From d8cb514d87e1a33a62c8483e61285c0fd8f44ad9 Mon Sep 17 00:00:00 2001 From: Danni Moiseyev Date: Sun, 18 Nov 2018 17:53:05 +0200 Subject: [PATCH] Add support for Range aggregation --- README.md | 2 +- src/compaction.c | 151 +++++++++++++++++++++++++++------------ src/consts.h | 3 +- src/tests.c | 1 + src/tests/test_module.py | 10 ++- 5 files changed, 119 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index 5e8348e5d..c7a382aae 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ Optional args: TS.CREATERULE SOURCE_KEY AGG_TYPE BUCKET_SIZE_SEC DEST_KEY ``` * SOURCE_KEY - key name for source time series -* AGG_TYPE - aggregation type one of the following: avg, sum, min, max, count, first, last +* AGG_TYPE - aggregation type one of the following: avg, sum, min, max, range, count, first, last * BUCKET_SIZE_SEC - time bucket for aggregated compaction, * DEST_KEY - key name for destination time series diff --git a/src/compaction.c b/src/compaction.c index 6e6a86391..23034cedd 100644 --- a/src/compaction.c +++ b/src/compaction.c @@ -4,15 +4,49 @@ #include "rmutil/alloc.h" typedef struct MaxMinContext { - double value; + double minValue; + double maxValue; char isResetted; } MaxMinContext; +typedef struct SingleValueContext { + double value; + char isResetted; +} SingleValueContext; + typedef struct AvgContext { double val; double cnt; } AvgContext; +void *SingleValueCreateContext() { + SingleValueContext *context = (SingleValueContext*)malloc(sizeof(SingleValueContext)); + context->value = 0; + context->isResetted = TRUE; + return context; +} + +void SingleValueReset(void *contextPtr) { + SingleValueContext *context = (SingleValueContext*)contextPtr; + context->value = 0; + context->isResetted = TRUE; +} + +double SingleValueFinlize(void *contextPtr) { + SingleValueContext *context = (SingleValueContext *)contextPtr; + return context->value; +} + +void SingleValueWriteContext(void *contextPtr, RedisModuleIO * io) { + SingleValueContext *context = (SingleValueContext *)contextPtr; + RedisModule_SaveDouble(io, context->value); +} + +void SingleValueReadContext(void *contextPtr, RedisModuleIO * io){ + SingleValueContext *context = (SingleValueContext *)contextPtr; + context->value = RedisModule_LoadDouble(io); +} + void *AvgCreateContext() { AvgContext *context = (AvgContext*)malloc(sizeof(AvgContext)); context->cnt = 0; @@ -65,66 +99,77 @@ static AggregationClass aggAvg = { void *MaxMinCreateContext() { MaxMinContext *context = (MaxMinContext *)malloc(sizeof(MaxMinContext)); - context->value = 0; + context->minValue = 0; + context->maxValue = 0; context->isResetted = TRUE; return context; } -void MaxAppendValue(void *contextPtr, double value) { +void MaxMinAppendValue(void *contextPtr, double value) { MaxMinContext *context = (MaxMinContext *)contextPtr; if (context->isResetted) { context->isResetted = FALSE; - context->value = value; - } else if (value > context->value) { - context->value = value; + context->maxValue = value; + context->minValue = value; + } else { + if (value > context->maxValue) { + context->maxValue = value; + } + if (value < context->minValue) { + context->minValue = value; + } } } -double MaxMinFinalize(void *contextPtr) { +double MaxFinalize(void *contextPtr) { MaxMinContext *context = (MaxMinContext *)contextPtr; - return context->value; + return context->maxValue; } -void MaxMinReset(void *contextPtr) { +double MinFinalize(void *contextPtr) { MaxMinContext *context = (MaxMinContext *)contextPtr; - context->value = 0; - context->isResetted = TRUE; + return context->minValue; } -void MinAppendValue(void *contextPtr, double value) { +double RangeFinalize(void *contextPtr) { MaxMinContext *context = (MaxMinContext *)contextPtr; - if (context->isResetted) { - context->isResetted = FALSE; - context->value = value; - } else if (value < context->value) { - context->value = value; - } + return context->maxValue - context->minValue; +} + +void MaxMinReset(void *contextPtr) { + MaxMinContext *context = (MaxMinContext *)contextPtr; + context->maxValue = 0; + context->minValue = 0; + context->isResetted = TRUE; } void MaxMinWriteContext(void *contextPtr, RedisModuleIO * io) { MaxMinContext *context = (MaxMinContext *)contextPtr; - RedisModule_SaveDouble(io, context->value); + RedisModule_SaveDouble(io, context->maxValue); + RedisModule_SaveDouble(io, context->minValue); RedisModule_SaveStringBuffer(io, &context->isResetted, 1); } void MaxMinReadContext(void *contextPtr, RedisModuleIO * io) { MaxMinContext *context = (MaxMinContext *)contextPtr; size_t len = 1; - context->value = RedisModule_LoadDouble(io); + context->maxValue = RedisModule_LoadDouble(io); + context->minValue = RedisModule_LoadDouble(io); context->isResetted = RedisModule_LoadStringBuffer(io, &len)[0]; } + void SumAppendValue(void *contextPtr, double value) { - MaxMinContext *context = (MaxMinContext *)contextPtr; + SingleValueContext *context = (SingleValueContext *)contextPtr; context->value += value; } void CountAppendValue(void *contextPtr, double value) { - MaxMinContext *context = (MaxMinContext *)contextPtr; + SingleValueContext *context = (SingleValueContext *)contextPtr; context->value++; } void FirstAppendValue(void *contextPtr, double value) { - MaxMinContext *context = (MaxMinContext *)contextPtr; + SingleValueContext *context = (SingleValueContext *)contextPtr; if (context->isResetted) { context->isResetted = FALSE; context->value = value; @@ -132,15 +177,15 @@ void FirstAppendValue(void *contextPtr, double value) { } void LastAppendValue(void *contextPtr, double value) { - MaxMinContext *context = (MaxMinContext *)contextPtr; + SingleValueContext *context = (SingleValueContext *)contextPtr; context->value = value; } static AggregationClass aggMax = { .createContext = MaxMinCreateContext, - .appendValue = MaxAppendValue, + .appendValue = MaxMinAppendValue, .freeContext = rm_free, - .finalize = MaxMinFinalize, + .finalize = MaxFinalize, .writeContext = MaxMinWriteContext, .readContext = MaxMinReadContext, .resetContext = MaxMinReset @@ -148,49 +193,59 @@ static AggregationClass aggMax = { static AggregationClass aggMin = { .createContext = MaxMinCreateContext, - .appendValue = MinAppendValue, + .appendValue = MaxMinAppendValue, .freeContext = rm_free, - .finalize = MaxMinFinalize, + .finalize = MinFinalize, .writeContext = MaxMinWriteContext, .readContext = MaxMinReadContext, .resetContext = MaxMinReset }; static AggregationClass aggSum = { - .createContext = MaxMinCreateContext, + .createContext = SingleValueCreateContext, .appendValue = SumAppendValue, .freeContext = rm_free, - .finalize = MaxMinFinalize, - .writeContext = MaxMinWriteContext, - .readContext = MaxMinReadContext, - .resetContext = MaxMinReset + .finalize = SingleValueFinlize, + .writeContext = SingleValueWriteContext, + .readContext = SingleValueReadContext, + .resetContext = SingleValueReset }; static AggregationClass aggCount = { - .createContext = MaxMinCreateContext, + .createContext = SingleValueCreateContext, .appendValue = CountAppendValue, .freeContext = rm_free, - .finalize = MaxMinFinalize, - .writeContext = MaxMinWriteContext, - .readContext = MaxMinReadContext, - .resetContext = MaxMinReset + .finalize = SingleValueFinlize, + .writeContext = SingleValueWriteContext, + .readContext = SingleValueReadContext, + .resetContext = SingleValueReset }; static AggregationClass aggFirst = { - .createContext = MaxMinCreateContext, + .createContext = SingleValueCreateContext, .appendValue = FirstAppendValue, .freeContext = rm_free, - .finalize = MaxMinFinalize, - .writeContext = MaxMinWriteContext, - .readContext = MaxMinReadContext, - .resetContext = MaxMinReset + .finalize = SingleValueFinlize, + .writeContext = SingleValueWriteContext, + .readContext = SingleValueReadContext, + .resetContext = SingleValueReset }; static AggregationClass aggLast = { - .createContext = MaxMinCreateContext, + .createContext = SingleValueCreateContext, .appendValue = LastAppendValue, .freeContext = rm_free, - .finalize = MaxMinFinalize, + .finalize = SingleValueFinlize, + .writeContext = SingleValueWriteContext, + .readContext = SingleValueReadContext, + .resetContext = SingleValueReset +}; + +static AggregationClass aggRange = { + .createContext = MaxMinCreateContext, + .appendValue = MaxMinAppendValue, + .freeContext = rm_free, + .finalize = RangeFinalize, .writeContext = MaxMinWriteContext, .readContext = MaxMinReadContext, .resetContext = MaxMinReset @@ -227,6 +282,8 @@ int StringLenAggTypeToEnum(const char *agg_type, size_t len) { result = TS_AGG_FIRST; } else if (strncmp(agg_type_lower, "last", len) == 0) { result = TS_AGG_LAST; + } else if (strncmp(agg_type_lower, "range", len) == 0) { + result = TS_AGG_RANGE; } else { result = TS_AGG_INVALID; } @@ -250,6 +307,8 @@ const char * AggTypeEnumToString(int aggType) { return "FIRST"; case TS_AGG_LAST: return "LAST"; + case TS_AGG_RANGE: + return "RANGE"; default: return "Unknown"; } @@ -280,6 +339,8 @@ AggregationClass* GetAggClass(int aggType) { case AGG_LAST: return &aggLast; break; + case TS_AGG_RANGE: + return &aggRange; default: return NULL; } diff --git a/src/consts.h b/src/consts.h index 2446c1c6a..eb88a95b0 100644 --- a/src/consts.h +++ b/src/consts.h @@ -27,7 +27,8 @@ typedef enum { TS_AGG_COUNT, TS_AGG_FIRST, TS_AGG_LAST, - TS_AGG_TYPES_MAX // 8 + TS_AGG_RANGE, + TS_AGG_TYPES_MAX // 9 } TS_AGG_TYPES_T; #endif \ No newline at end of file diff --git a/src/tests.c b/src/tests.c index ecae03b91..122be9e65 100644 --- a/src/tests.c +++ b/src/tests.c @@ -46,6 +46,7 @@ MU_TEST(test_StringLenAggTypeToEnum) { mu_check(StringAggTypeToEnum("count") == TS_AGG_COUNT); mu_check(StringAggTypeToEnum("first") == TS_AGG_FIRST); mu_check(StringAggTypeToEnum("last") == TS_AGG_LAST); + mu_check(StringAggTypeToEnum("range") == TS_AGG_RANGE); } MU_TEST_SUITE(test_suite) { diff --git a/src/tests/test_module.py b/src/tests/test_module.py index e37866000..2d8ff2b05 100644 --- a/src/tests/test_module.py +++ b/src/tests/test_module.py @@ -6,7 +6,7 @@ import math from rmtest import ModuleTestCase -class MyTestCase(ModuleTestCase(os.path.dirname(os.path.abspath(__file__)) + '/../redis-tsdb-module.so')): +class RedisTimeseriesTests(ModuleTestCase(os.path.dirname(os.path.abspath(__file__)) + '/../redis-tsdb-module.so')): def _get_ts_info(self, redis, key): info = redis.execute_command('TS.INFO', key) return dict([(info[i], info[i+1]) for i in range(0, len(info), 2)]) @@ -325,6 +325,14 @@ def test_agg_last(self): actual_result = r.execute_command('TS.RANGE', agg_key, 10, 50) assert expected_result == actual_result + def test_agg_range(self): + with self.redis() as r: + agg_key = self._insert_agg_data(r, 'tester', 'range') + + expected_result = [[10, '74'], [20, '74'], [30, '74'], [40, '74']] + actual_result = r.execute_command('TS.RANGE', agg_key, 10, 50) + assert expected_result == actual_result + def test_downsampling_rules(self): """ Test downsmapling rules - avg,min,max,count,sum with 4 keys each.