-
Notifications
You must be signed in to change notification settings - Fork 0
/
redis.py
61 lines (54 loc) · 2.04 KB
/
redis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from redis_collections import Dict
from redis import StrictRedis
class Database:
def __init__(self, redis: StrictRedis,
consumer: str,
key: str = 'water-healer', **kwargs):
"""
Parameters
----------
redis: redis.StrictRedis
redis.StrictRedis object.
consumer: str
consumer name. Make sure `consumer` same as `consumer` use in kafka consumer.
key: str, optional (default='water-healer')
Redis key, to distinguish redis database.
"""
self.redis = redis
self.consumer = consumer
self.dict = Dict(redis=redis, key=key)
self.list_partitions = self.dict.get(consumer, '').split(',')
self.partitions = {p: Dict(redis=redis, key=f'{consumer}-{p}') for p in self.list_partitions}
def __contains__(self, key):
"""
Return True if the dict has a key, else return False.
key in d
"""
return key in self.partitions
def __getitem__(self, key):
"""
Return the item of the dict.
Raises a KeyError if key is not in the map.
d[key]
"""
if key not in self.partitions:
self.list_partitions = self.list_partitions + [key]
self.dict[self.consumer] = ','.join(self.list_partitions)
self.partitions[key] = Dict(redis=self.redis, key=f'{self.consumer}-{key}')
return self.partitions[key]
def pop(self, key, default=None):
if self.__contains__(key):
value = self.partitions.pop(key)
self.list_partitions = list(self.partitions.keys())
self.dict[self.consumer] = ','.join(self.list_partitions)
self.redis.delete(f'{self.consumer}-{key}')
return value
else:
return default
def get(self, key, default=None):
if self.__contains__(key):
return self.__getitem__(key)
else:
return default
def keys(self):
return [p for p in self.list_partitions if len(p)]