From 188c469fa914427901f29a085c99f210d3470e8f Mon Sep 17 00:00:00 2001 From: Ariel Date: Wed, 18 Dec 2019 12:35:35 +0200 Subject: [PATCH 1/3] Added pipeline + tests --- redistimeseries/client.py | 32 +++++++++++++++++++++++++++++--- test_commands.py | 17 ++++++++++++++++- 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/redistimeseries/client.py b/redistimeseries/client.py index 9b5cc7a..fe3626b 100644 --- a/redistimeseries/client.py +++ b/redistimeseries/client.py @@ -1,6 +1,7 @@ import six import redis -from redis import Redis, RedisError +from redis import Redis, RedisError +from redis.client import Pipeline from redis.client import bool_ok from redis.client import int_or_none from redis._compat import (long, nativestr) @@ -11,8 +12,11 @@ class TSInfo(object): labels = [] sourceKey = None chunk_count = None - last_time_stamp = None + memory_usage = None + total_samples = None retention_msecs = None + last_time_stamp = None + first_time_stamp = None max_samples_per_chunk = None def __init__(self, args): @@ -20,9 +24,12 @@ def __init__(self, args): self.rules = response['rules'] self.sourceKey = response['sourceKey'] self.chunkCount = response['chunkCount'] + self.memory_usage = response['memoryUsage'] + self.total_samples = response['totalSamples'] self.labels = list_to_dict(response['labels']) - self.lastTimeStamp = response['lastTimestamp'] self.retention_msecs = response['retentionTime'] + self.lastTimeStamp = response['lastTimestamp'] + self.first_time_stamp = response['firstTimestamp'] self.maxSamplesPerChunk = response['maxSamplesPerChunk'] def list_to_dict(aList): @@ -262,3 +269,22 @@ def info(self, key): def queryindex(self, filters): """Get all the keys matching the ``filter`` list.""" return self.execute_command(self.QUERYINDEX_CMD, *filters) + + def pipeline(self, transaction=True, shard_hint=None): + """ + Return a new pipeline object that can queue multiple commands for + later execution. ``transaction`` indicates whether all commands + should be executed atomically. Apart from making a group of operations + atomic, pipelines are useful for reducing the back-and-forth overhead + between the client and server. + Overridden in order to provide the right client through the pipeline. + """ + p = Pipeline( + connection_pool=self.connection_pool, + response_callbacks=self.response_callbacks, + transaction=transaction, + shard_hint=shard_hint) + return p + +class Pipeline(Pipeline, Client): + "Pipeline for ReJSONClient" \ No newline at end of file diff --git a/test_commands.py b/test_commands.py index 863b0f3..db74ec1 100644 --- a/test_commands.py +++ b/test_commands.py @@ -35,7 +35,8 @@ def testAlter(self): rts.alter(1, labels={'Time':'Series'}) self.assertEqual('Series', rts.info(1).labels['Time']) self.assertEqual(10, rts.info(1).retention_msecs) - + pipe = rts.pipeline() + self.assertTrue(pipe.create(2)) def testAdd(self): '''Test TS.ADD calls''' @@ -141,11 +142,25 @@ def testInfo(self): self.assertEqual(info.labels['currentLabel'], 'currentData') def testQueryIndex(self): + '''Test TS.QUERYINDEX calls''' rts.create(1, labels={'Test':'This'}) rts.create(2, labels={'Test':'This', 'Taste':'That'}) self.assertEqual(2, len(rts.queryindex(['Test=This']))) self.assertEqual(1, len(rts.queryindex(['Taste=That']))) self.assertEqual(['2'], rts.queryindex(['Taste=That'])) + def testPipeline(self): + '''Test pipeline''' + pipeline = rts.pipeline() + pipeline.create('with_pipeline') + for i in range(100): + pipeline.add('with_pipeline', i, 1.1 * i) + pipeline.execute() + + info = rts.info('with_pipeline') + self.assertEqual(info.lastTimeStamp, 99) + self.assertEqual(info.total_samples, 100) + self.assertEqual(rts.get('with_pipeline')[1], 99 * 1.1) + if __name__ == '__main__': unittest.main() \ No newline at end of file From 44cc3daecc943ee14c3de45492c7ff69198b3781 Mon Sep 17 00:00:00 2001 From: Ariel Date: Wed, 18 Dec 2019 15:10:10 +0200 Subject: [PATCH 2/3] Correct string --- redistimeseries/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redistimeseries/client.py b/redistimeseries/client.py index fe3626b..51437e5 100644 --- a/redistimeseries/client.py +++ b/redistimeseries/client.py @@ -287,4 +287,4 @@ def pipeline(self, transaction=True, shard_hint=None): return p class Pipeline(Pipeline, Client): - "Pipeline for ReJSONClient" \ No newline at end of file + "Pipeline for Redis TimeSeries Client" \ No newline at end of file From 49018a116d66724f40e39b5e85d6fab6ff2c71ad Mon Sep 17 00:00:00 2001 From: Ariel Date: Wed, 18 Dec 2019 15:22:38 +0200 Subject: [PATCH 3/3] remove unused packages --- redistimeseries/client.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/redistimeseries/client.py b/redistimeseries/client.py index 51437e5..9530672 100644 --- a/redistimeseries/client.py +++ b/redistimeseries/client.py @@ -1,11 +1,8 @@ import six -import redis -from redis import Redis, RedisError +from redis import Redis from redis.client import Pipeline from redis.client import bool_ok -from redis.client import int_or_none -from redis._compat import (long, nativestr) -from redis.exceptions import DataError +from redis._compat import nativestr class TSInfo(object): rules = []