Skip to content

Commit

Permalink
Merge pull request #19 from RedisLabsModules/range_aggregation
Browse files Browse the repository at this point in the history
Add support for Range aggregation
  • Loading branch information
danni-m committed Nov 18, 2018
2 parents da554db + d8cb514 commit 097da57
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 48 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Optional args:
TS.CREATERULE SOURCE_KEY AGG_TYPE BUCKET_SIZE_SEC DEST_KEY
```
* SOURCE_KEY - key name for source time series
* AGG_TYPE - aggregation type one of the following: avg, sum, min, max, count, first, last
* AGG_TYPE - aggregation type one of the following: avg, sum, min, max, range, count, first, last
* BUCKET_SIZE_SEC - time bucket for aggregated compaction,
* DEST_KEY - key name for destination time series

Expand Down
151 changes: 106 additions & 45 deletions src/compaction.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,49 @@
#include "rmutil/alloc.h"

typedef struct MaxMinContext {
double value;
double minValue;
double maxValue;
char isResetted;
} MaxMinContext;

typedef struct SingleValueContext {
double value;
char isResetted;
} SingleValueContext;

typedef struct AvgContext {
double val;
double cnt;
} AvgContext;

void *SingleValueCreateContext() {
SingleValueContext *context = (SingleValueContext*)malloc(sizeof(SingleValueContext));
context->value = 0;
context->isResetted = TRUE;
return context;
}

void SingleValueReset(void *contextPtr) {
SingleValueContext *context = (SingleValueContext*)contextPtr;
context->value = 0;
context->isResetted = TRUE;
}

double SingleValueFinlize(void *contextPtr) {
SingleValueContext *context = (SingleValueContext *)contextPtr;
return context->value;
}

void SingleValueWriteContext(void *contextPtr, RedisModuleIO * io) {
SingleValueContext *context = (SingleValueContext *)contextPtr;
RedisModule_SaveDouble(io, context->value);
}

void SingleValueReadContext(void *contextPtr, RedisModuleIO * io){
SingleValueContext *context = (SingleValueContext *)contextPtr;
context->value = RedisModule_LoadDouble(io);
}

void *AvgCreateContext() {
AvgContext *context = (AvgContext*)malloc(sizeof(AvgContext));
context->cnt = 0;
Expand Down Expand Up @@ -65,132 +99,153 @@ static AggregationClass aggAvg = {

void *MaxMinCreateContext() {
MaxMinContext *context = (MaxMinContext *)malloc(sizeof(MaxMinContext));
context->value = 0;
context->minValue = 0;
context->maxValue = 0;
context->isResetted = TRUE;
return context;
}

void MaxAppendValue(void *contextPtr, double value) {
void MaxMinAppendValue(void *contextPtr, double value) {
MaxMinContext *context = (MaxMinContext *)contextPtr;
if (context->isResetted) {
context->isResetted = FALSE;
context->value = value;
} else if (value > context->value) {
context->value = value;
context->maxValue = value;
context->minValue = value;
} else {
if (value > context->maxValue) {
context->maxValue = value;
}
if (value < context->minValue) {
context->minValue = value;
}
}
}

double MaxMinFinalize(void *contextPtr) {
double MaxFinalize(void *contextPtr) {
MaxMinContext *context = (MaxMinContext *)contextPtr;
return context->value;
return context->maxValue;
}

void MaxMinReset(void *contextPtr) {
double MinFinalize(void *contextPtr) {
MaxMinContext *context = (MaxMinContext *)contextPtr;
context->value = 0;
context->isResetted = TRUE;
return context->minValue;
}

void MinAppendValue(void *contextPtr, double value) {
double RangeFinalize(void *contextPtr) {
MaxMinContext *context = (MaxMinContext *)contextPtr;
if (context->isResetted) {
context->isResetted = FALSE;
context->value = value;
} else if (value < context->value) {
context->value = value;
}
return context->maxValue - context->minValue;
}

void MaxMinReset(void *contextPtr) {
MaxMinContext *context = (MaxMinContext *)contextPtr;
context->maxValue = 0;
context->minValue = 0;
context->isResetted = TRUE;
}

void MaxMinWriteContext(void *contextPtr, RedisModuleIO * io) {
MaxMinContext *context = (MaxMinContext *)contextPtr;
RedisModule_SaveDouble(io, context->value);
RedisModule_SaveDouble(io, context->maxValue);
RedisModule_SaveDouble(io, context->minValue);
RedisModule_SaveStringBuffer(io, &context->isResetted, 1);
}

void MaxMinReadContext(void *contextPtr, RedisModuleIO * io) {
MaxMinContext *context = (MaxMinContext *)contextPtr;
size_t len = 1;
context->value = RedisModule_LoadDouble(io);
context->maxValue = RedisModule_LoadDouble(io);
context->minValue = RedisModule_LoadDouble(io);
context->isResetted = RedisModule_LoadStringBuffer(io, &len)[0];
}

void SumAppendValue(void *contextPtr, double value) {
MaxMinContext *context = (MaxMinContext *)contextPtr;
SingleValueContext *context = (SingleValueContext *)contextPtr;
context->value += value;
}

void CountAppendValue(void *contextPtr, double value) {
MaxMinContext *context = (MaxMinContext *)contextPtr;
SingleValueContext *context = (SingleValueContext *)contextPtr;
context->value++;
}

void FirstAppendValue(void *contextPtr, double value) {
MaxMinContext *context = (MaxMinContext *)contextPtr;
SingleValueContext *context = (SingleValueContext *)contextPtr;
if (context->isResetted) {
context->isResetted = FALSE;
context->value = value;
}
}

void LastAppendValue(void *contextPtr, double value) {
MaxMinContext *context = (MaxMinContext *)contextPtr;
SingleValueContext *context = (SingleValueContext *)contextPtr;
context->value = value;
}

static AggregationClass aggMax = {
.createContext = MaxMinCreateContext,
.appendValue = MaxAppendValue,
.appendValue = MaxMinAppendValue,
.freeContext = rm_free,
.finalize = MaxMinFinalize,
.finalize = MaxFinalize,
.writeContext = MaxMinWriteContext,
.readContext = MaxMinReadContext,
.resetContext = MaxMinReset
};

static AggregationClass aggMin = {
.createContext = MaxMinCreateContext,
.appendValue = MinAppendValue,
.appendValue = MaxMinAppendValue,
.freeContext = rm_free,
.finalize = MaxMinFinalize,
.finalize = MinFinalize,
.writeContext = MaxMinWriteContext,
.readContext = MaxMinReadContext,
.resetContext = MaxMinReset
};

static AggregationClass aggSum = {
.createContext = MaxMinCreateContext,
.createContext = SingleValueCreateContext,
.appendValue = SumAppendValue,
.freeContext = rm_free,
.finalize = MaxMinFinalize,
.writeContext = MaxMinWriteContext,
.readContext = MaxMinReadContext,
.resetContext = MaxMinReset
.finalize = SingleValueFinlize,
.writeContext = SingleValueWriteContext,
.readContext = SingleValueReadContext,
.resetContext = SingleValueReset
};

static AggregationClass aggCount = {
.createContext = MaxMinCreateContext,
.createContext = SingleValueCreateContext,
.appendValue = CountAppendValue,
.freeContext = rm_free,
.finalize = MaxMinFinalize,
.writeContext = MaxMinWriteContext,
.readContext = MaxMinReadContext,
.resetContext = MaxMinReset
.finalize = SingleValueFinlize,
.writeContext = SingleValueWriteContext,
.readContext = SingleValueReadContext,
.resetContext = SingleValueReset
};

static AggregationClass aggFirst = {
.createContext = MaxMinCreateContext,
.createContext = SingleValueCreateContext,
.appendValue = FirstAppendValue,
.freeContext = rm_free,
.finalize = MaxMinFinalize,
.writeContext = MaxMinWriteContext,
.readContext = MaxMinReadContext,
.resetContext = MaxMinReset
.finalize = SingleValueFinlize,
.writeContext = SingleValueWriteContext,
.readContext = SingleValueReadContext,
.resetContext = SingleValueReset
};

static AggregationClass aggLast = {
.createContext = MaxMinCreateContext,
.createContext = SingleValueCreateContext,
.appendValue = LastAppendValue,
.freeContext = rm_free,
.finalize = MaxMinFinalize,
.finalize = SingleValueFinlize,
.writeContext = SingleValueWriteContext,
.readContext = SingleValueReadContext,
.resetContext = SingleValueReset
};

static AggregationClass aggRange = {
.createContext = MaxMinCreateContext,
.appendValue = MaxMinAppendValue,
.freeContext = rm_free,
.finalize = RangeFinalize,
.writeContext = MaxMinWriteContext,
.readContext = MaxMinReadContext,
.resetContext = MaxMinReset
Expand Down Expand Up @@ -227,6 +282,8 @@ int StringLenAggTypeToEnum(const char *agg_type, size_t len) {
result = TS_AGG_FIRST;
} else if (strncmp(agg_type_lower, "last", len) == 0) {
result = TS_AGG_LAST;
} else if (strncmp(agg_type_lower, "range", len) == 0) {
result = TS_AGG_RANGE;
} else {
result = TS_AGG_INVALID;
}
Expand All @@ -250,6 +307,8 @@ const char * AggTypeEnumToString(int aggType) {
return "FIRST";
case TS_AGG_LAST:
return "LAST";
case TS_AGG_RANGE:
return "RANGE";
default:
return "Unknown";
}
Expand Down Expand Up @@ -280,6 +339,8 @@ AggregationClass* GetAggClass(int aggType) {
case AGG_LAST:
return &aggLast;
break;
case TS_AGG_RANGE:
return &aggRange;
default:
return NULL;
}
Expand Down
3 changes: 2 additions & 1 deletion src/consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ typedef enum {
TS_AGG_COUNT,
TS_AGG_FIRST,
TS_AGG_LAST,
TS_AGG_TYPES_MAX // 8
TS_AGG_RANGE,
TS_AGG_TYPES_MAX // 9
} TS_AGG_TYPES_T;

#endif
1 change: 1 addition & 0 deletions src/tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ MU_TEST(test_StringLenAggTypeToEnum) {
mu_check(StringAggTypeToEnum("count") == TS_AGG_COUNT);
mu_check(StringAggTypeToEnum("first") == TS_AGG_FIRST);
mu_check(StringAggTypeToEnum("last") == TS_AGG_LAST);
mu_check(StringAggTypeToEnum("range") == TS_AGG_RANGE);
}

MU_TEST_SUITE(test_suite) {
Expand Down
10 changes: 9 additions & 1 deletion src/tests/test_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import math
from rmtest import ModuleTestCase

class MyTestCase(ModuleTestCase(os.path.dirname(os.path.abspath(__file__)) + '/../redis-tsdb-module.so')):
class RedisTimeseriesTests(ModuleTestCase(os.path.dirname(os.path.abspath(__file__)) + '/../redis-tsdb-module.so')):
def _get_ts_info(self, redis, key):
info = redis.execute_command('TS.INFO', key)
return dict([(info[i], info[i+1]) for i in range(0, len(info), 2)])
Expand Down Expand Up @@ -325,6 +325,14 @@ def test_agg_last(self):
actual_result = r.execute_command('TS.RANGE', agg_key, 10, 50)
assert expected_result == actual_result

def test_agg_range(self):
with self.redis() as r:
agg_key = self._insert_agg_data(r, 'tester', 'range')

expected_result = [[10, '74'], [20, '74'], [30, '74'], [40, '74']]
actual_result = r.execute_command('TS.RANGE', agg_key, 10, 50)
assert expected_result == actual_result

def test_downsampling_rules(self):
"""
Test downsmapling rules - avg,min,max,count,sum with 4 keys each.
Expand Down

0 comments on commit 097da57

Please sign in to comment.