Skip to content
This repository was archived by the owner on Jan 24, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions redistimeseries/client.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
import six
import redis
from redis import Redis, RedisError
from redis import Redis
from redis.client import Pipeline
from redis.client import bool_ok
from redis.client import int_or_none
from redis._compat import (long, nativestr)
from redis.exceptions import DataError
from redis._compat import nativestr

class TSInfo(object):
rules = []
labels = []
sourceKey = None
chunk_count = None
last_time_stamp = None
memory_usage = None
total_samples = None
retention_msecs = None
last_time_stamp = None
first_time_stamp = None
max_samples_per_chunk = None

def __init__(self, args):
response = dict(zip(map(nativestr, args[::2]), args[1::2]))
self.rules = response['rules']
self.sourceKey = response['sourceKey']
self.chunkCount = response['chunkCount']
self.memory_usage = response['memoryUsage']
self.total_samples = response['totalSamples']
self.labels = list_to_dict(response['labels'])
self.lastTimeStamp = response['lastTimestamp']
self.retention_msecs = response['retentionTime']
self.lastTimeStamp = response['lastTimestamp']
self.first_time_stamp = response['firstTimestamp']
self.maxSamplesPerChunk = response['maxSamplesPerChunk']

def list_to_dict(aList):
Expand Down Expand Up @@ -262,3 +266,22 @@ def info(self, key):
def queryindex(self, filters):
"""Get all the keys matching the ``filter`` list."""
return self.execute_command(self.QUERYINDEX_CMD, *filters)

def pipeline(self, transaction=True, shard_hint=None):
"""
Return a new pipeline object that can queue multiple commands for
later execution. ``transaction`` indicates whether all commands
should be executed atomically. Apart from making a group of operations
atomic, pipelines are useful for reducing the back-and-forth overhead
between the client and server.
Overridden in order to provide the right client through the pipeline.
"""
p = Pipeline(
connection_pool=self.connection_pool,
response_callbacks=self.response_callbacks,
transaction=transaction,
shard_hint=shard_hint)
return p

class Pipeline(Pipeline, Client):
"Pipeline for Redis TimeSeries Client"
17 changes: 16 additions & 1 deletion test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def testAlter(self):
rts.alter(1, labels={'Time':'Series'})
self.assertEqual('Series', rts.info(1).labels['Time'])
self.assertEqual(10, rts.info(1).retention_msecs)

pipe = rts.pipeline()
self.assertTrue(pipe.create(2))
def testAdd(self):
'''Test TS.ADD calls'''

Expand Down Expand Up @@ -141,11 +142,25 @@ def testInfo(self):
self.assertEqual(info.labels['currentLabel'], 'currentData')

def testQueryIndex(self):
'''Test TS.QUERYINDEX calls'''
rts.create(1, labels={'Test':'This'})
rts.create(2, labels={'Test':'This', 'Taste':'That'})
self.assertEqual(2, len(rts.queryindex(['Test=This'])))
self.assertEqual(1, len(rts.queryindex(['Taste=That'])))
self.assertEqual(['2'], rts.queryindex(['Taste=That']))

def testPipeline(self):
'''Test pipeline'''
pipeline = rts.pipeline()
pipeline.create('with_pipeline')
for i in range(100):
pipeline.add('with_pipeline', i, 1.1 * i)
pipeline.execute()

info = rts.info('with_pipeline')
self.assertEqual(info.lastTimeStamp, 99)
self.assertEqual(info.total_samples, 100)
self.assertEqual(rts.get('with_pipeline')[1], 99 * 1.1)

if __name__ == '__main__':
unittest.main()