Skip to content

Commit

Permalink
basic functionallity: insert,query, max retention
Browse files Browse the repository at this point in the history
  • Loading branch information
danni-m committed Feb 4, 2017
1 parent a079f03 commit 7224436
Show file tree
Hide file tree
Showing 12 changed files with 1,502 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
@@ -0,0 +1,3 @@
[submodule "RedisModulesSDK"]
path = RedisModulesSDK
url = ./RedisModulesSDK
661 changes: 661 additions & 0 deletions LICENSE

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions RedisModulesSDK
Submodule RedisModulesSDK added at 4c62df
53 changes: 53 additions & 0 deletions readme.md
@@ -0,0 +1,53 @@
# Redis TSDB Module

## Overview
With this module you can now store timeseries data effiecently in redis.
The data is stored in a compact way.

## License: AGPL

## Memory model
Each series has a linked list of chunks.
Each chunk has 1+ samples.
Sample is a timestamp+value.

## Features
* Quick inserts (50K samples per sec)
* Query by start time and end-time
* Configurable max retention period

## Build
1. `cd src`
2. `make`
3. `loadmodule redis-tsdb-module.so`

## Cmds
```sql
TS.INSERT key TIMESTAMP value
```
```sql
TS.QUERY key FROM_TIMESTAMP TO_TIMESTAMP
1) (integer) 1486289260
2) (integer) 49994
3) (integer) 1486289261
4) (integer) 49995
5) (integer) 1486289262
6) (integer) 49996
7) (integer) 1486289263
8) (integer) 49997
9) (integer) 1486289264
10) (integer) 49998
11) (integer) 1486289265
12) (integer) 49999
```
```sql
TS.META key
1) lastTimestamp
2) (integer) 1486289265
3) retentionSecs
4) (integer) 0
5) chunkCount
6) (integer) 139
7) maxSamplesPerChunk
8) (integer) 360
```
35 changes: 35 additions & 0 deletions src/Makefile
@@ -0,0 +1,35 @@
#set environment variable RM_INCLUDE_DIR to the location of redismodule.h
ifndef RM_INCLUDE_DIR
RM_INCLUDE_DIR=../RedisModulesSDK
endif

ifndef RMUTIL_LIBDIR
RMUTIL_LIBDIR=../RedisModulesSDK/rmutil
endif

# find the OS
uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')

# Compile flags for linux / osx
ifeq ($(uname_S),Linux)
SHOBJ_CFLAGS ?= -fno-common -g -ggdb
SHOBJ_LDFLAGS ?= -shared -Bsymbolic
else
SHOBJ_CFLAGS ?= -dynamic -fno-common -g -ggdb
SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup
endif
CFLAGS = -I$(RM_INCLUDE_DIR) -Wall -g -fPIC -lc -lm -std=gnu99 -DREDIS_MODULE_TARGET
CC=gcc

all: rmutil redis-tsdb-module.so

rmutil: FORCE
$(MAKE) -C $(RMUTIL_LIBDIR)

redis-tsdb-module.so: module.o tsdb.o
$(LD) -o $@ module.o tsdb.o $(SHOBJ_LDFLAGS) $(LIBS) -L$(RMUTIL_LIBDIR) -lrmutil -lc

clean:
rm -rf *.xo *.so *.o

FORCE:
5 changes: 5 additions & 0 deletions src/README.md
@@ -0,0 +1,5 @@
# An Example Redis Module

This project is a simple redis module demonstrating basic API usage and `librmutil`.

You can treat it as a basic module template. See the project's [README](../README.md) for more details.
167 changes: 167 additions & 0 deletions src/module.c
@@ -0,0 +1,167 @@
#include "redismodule.h"
#include "module.h"
#include "rmutil/util.h"
#include "rmutil/strings.h"
#include "rmutil/alloc.h"
// #include "rmutil/test_util.h"

#include "tsdb.h"

#define MYTYPE_ENCODING_VERSION 0

static RedisModuleType *SeriesType;

int TSDB_Meta(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);

if (argc != 2) return RedisModule_WrongArity(ctx);

RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ|REDISMODULE_WRITE);
Series *series;

if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY){
return RedisModule_ReplyWithError(ctx, "TSDB: key does not exist");
} else if (RedisModule_ModuleTypeGetType(key) != SeriesType){
return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
} else {
series = RedisModule_ModuleTypeGetValue(key);
}

RedisModule_ReplyWithArray(ctx, 4*2);

RedisModule_ReplyWithSimpleString(ctx, "lastTimestamp");
RedisModule_ReplyWithLongLong(ctx, series->lastTimestamp);
RedisModule_ReplyWithSimpleString(ctx, "retentionSecs");
RedisModule_ReplyWithLongLong(ctx, series->retentionSecs);
RedisModule_ReplyWithSimpleString(ctx, "chunkCount");
RedisModule_ReplyWithLongLong(ctx, series->chunkCount);
RedisModule_ReplyWithSimpleString(ctx, "maxSamplesPerChunk");
RedisModule_ReplyWithLongLong(ctx, series->maxSamplesPerChunk);

return REDISMODULE_OK;
}

int TSDB_Query(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);

if (argc != 4) return RedisModule_WrongArity(ctx);

long long start_ts, end_ts;

if (RedisModule_StringToLongLong(argv[2], &start_ts) != REDISMODULE_OK)
RedisModule_ReplyWithError(ctx, "TSDB: start-timestamp is invalid");
if (RedisModule_StringToLongLong(argv[3], &end_ts) != REDISMODULE_OK)
RedisModule_ReplyWithError(ctx, "TSDB: end-timestamp is invalid");

RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ|REDISMODULE_WRITE);
Series *series;

if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY){
return RedisModule_ReplyWithError(ctx, "TSDB: key does not exist");
} else if (RedisModule_ModuleTypeGetType(key) != SeriesType){
return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
} else {
series = RedisModule_ModuleTypeGetValue(key);
}

long long arraylen = 0;
RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN);
SeriesItertor iterator = SeriesQuery(series, start_ts, end_ts);
Sample *sample;
while ((sample = SeriesItertorGetNext(&iterator)) != NULL ) {
RedisModule_ReplyWithLongLong(ctx, sample->timestamp);
RedisModule_ReplyWithLongLong(ctx, sample->data);
arraylen++;
}

RedisModule_ReplySetArrayLength(ctx,arraylen*2);
return REDISMODULE_OK;
}

int TSDB_Insert(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);

if (argc != 4) {
return RedisModule_WrongArity(ctx);
}

RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ|REDISMODULE_WRITE);

double timestamp, value;
if ((RedisModule_StringToDouble(argv[3], &value) != REDISMODULE_OK))
return RedisModule_ReplyWithError(ctx,"TSDB: invalid value");

if ((RedisModule_StringToDouble(argv[2], &timestamp) != REDISMODULE_OK))
return RedisModule_ReplyWithError(ctx,"TSDB: invalid timestamp");

Series *series;

if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY){
series = TSDB_Create(ctx, argv[1]);
} else if (RedisModule_ModuleTypeGetType(key) != SeriesType){
return RedisModule_ReplyWithError(ctx, "TSDB: the key is not a TSDB key");
} else {
series = RedisModule_ModuleTypeGetValue(key);
}

int retval = SeriesAddSample(series, timestamp, value);
if (retval == TSDB_ERR_TIMESTAMP_TOO_OLD) {
RedisModule_ReplyWithError(ctx, "TSDB: timestamp is too old");
return REDISMODULE_ERR;
} else if (retval != TSDB_OK) {
RedisModule_ReplyWithError(ctx, "TSDB: Unknown Error");
return REDISMODULE_ERR;
} else {
RedisModule_ReplyWithSimpleString(ctx, "OK");
RedisModule_ReplicateVerbatim(ctx);
return REDISMODULE_OK;
}
}

Series* TSDB_Create(RedisModuleCtx *ctx, RedisModuleString *key) {
RedisModuleKey *series = RedisModule_OpenKey(ctx, key, REDISMODULE_READ|REDISMODULE_WRITE);

if (RedisModule_KeyType(series) != REDISMODULE_KEYTYPE_EMPTY) {
return NULL;
}

Series *newSeries = NewSeries();
RedisModule_ModuleTypeSetValue(series, SeriesType, newSeries);

RedisModule_Log(ctx, "info", "created new series");
return newSeries;
}
void series_aof_rewrite(RedisModuleIO *aof, RedisModuleString *key, void *value)
{}

void series_rdb_save(RedisModuleIO *rdb, void *value)
{}

void *series_rdb_load(RedisModuleIO *rdb, int encver)
{
return NULL;
}

int RedisModule_OnLoad(RedisModuleCtx *ctx) {
if (RedisModule_Init(ctx, "tsdb", 1, REDISMODULE_APIVER_1) ==
REDISMODULE_ERR) {
return REDISMODULE_ERR;
}

RedisModuleTypeMethods tm = {
.version = REDISMODULE_TYPE_METHOD_VERSION,
.rdb_load = series_rdb_load,
.rdb_save = series_rdb_save,
.aof_rewrite = series_aof_rewrite,
.mem_usage = SeriesMemUsage,
.free = SeriesFree
};

SeriesType = RedisModule_CreateDataType(ctx, "TSDB-TYPE", 0, &tm);
if (SeriesType == NULL) return REDISMODULE_ERR;
RMUtil_RegisterWriteCmd(ctx, "ts.insert", TSDB_Insert);
RMUtil_RegisterWriteCmd(ctx, "ts.query", TSDB_Query);
RMUtil_RegisterWriteCmd(ctx, "ts.meta", TSDB_Meta);

return REDISMODULE_OK;
}
3 changes: 3 additions & 0 deletions src/module.h
@@ -0,0 +1,3 @@
#include "tsdb.h"

Series * TSDB_Create(RedisModuleCtx *ctx, RedisModuleString *key);

0 comments on commit 7224436

Please sign in to comment.