This repository has been archived by the owner on Mar 12, 2022. It is now read-only.
/
redis_db.py
105 lines (82 loc) · 3.72 KB
/
redis_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/env python3
import asyncio
import aioredis
# Value of a service hash's "public" field that marks the service as public.
SERVICE_PUBLIC = "Y"
class DpowRedis(object):
    """Thin async wrapper around an aioredis connection pool.

    ``await setup()`` must be called after construction and before any other
    method: until then ``self.pool`` holds the not-yet-awaited result of
    ``aioredis.create_pool`` instead of a usable pool.
    """

    def __init__(self, server, loop):
        """Prepare (without opening) a pool to *server* with minsize=5, maxsize=15."""
        # Nothing can be awaited inside __init__, so the awaitable is stored
        # here and resolved into the real pool by setup().
        self.pool = aioredis.create_pool(
            server,
            minsize=5, maxsize=15,
            loop=loop,
        )

    async def setup(self):
        """Await the pending pool creation and keep the resulting pool."""
        self.pool = await self.pool

    async def close(self):
        """Close the pool and wait until it has finished closing."""
        self.pool.close()
        await self.pool.wait_closed()

    async def all_statistics(self):
        """Return global work totals plus per-service work counters.

        Services whose "public" field equals SERVICE_PUBLIC are listed
        individually (display, website, precache, ondemand). All other
        services are only aggregated into a count and summed counters.
        """
        precache_total = await self.get("stats:precache") or 0
        on_demand_total = await self.get("stats:ondemand") or 0

        public_services = []
        private_services = {"count": 0, "precache": 0, "ondemand": 0}

        for service in await self.set_members("services"):
            info = await self.hash_getmany(
                f"service:{service}",
                "public", "display", "website", "precache", "ondemand"
            )
            # A counter field that has not been written yet counts as zero,
            # mirroring the `or 0` fallback used for the global totals.
            info["precache"] = int(info["precache"] or 0)
            info["ondemand"] = int(info["ondemand"] or 0)

            if info.pop("public") == SERVICE_PUBLIC:
                public_services.append(info)
            else:
                private_services["count"] += 1
                private_services["precache"] += info["precache"]
                private_services["ondemand"] += info["ondemand"]

        return dict(
            services={
                "public": public_services,
                "private": private_services
            },
            work={
                "precache": int(precache_total),
                "ondemand": int(on_demand_total)
            }
        )

    async def insert(self, key: str, value: str):
        """SET *key* to *value* and return the server reply."""
        return await self.pool.execute('set', key, value)

    async def insert_expire(self, key: str, value: str, seconds: int):
        """SETEX *key* to *value* with a time-to-live of *seconds*."""
        return await self.pool.execute('setex', key, seconds, value)

    async def insert_if_noexist(self, key: str, value: str) -> bool:
        """Returns True if the key was inserted, False if it was already there"""
        key_set = await self.pool.execute('setnx', key, value)
        return key_set == 1

    async def insert_if_noexist_expire(self, key: str, value: str, seconds: int) -> bool:
        """Returns True if the key was inserted, False if it was already there

        The value and its time-to-live are written with one
        ``SET key value EX seconds NX`` command. A separate SETNX followed by
        EXPIRE could leave the key without any expiry if the second command
        never ran. *seconds* must be a positive integer.
        """
        # With NX the server replies OK when the key was set and nil when
        # the key already existed.
        reply = await self.pool.execute('set', key, value, 'EX', seconds, 'NX')
        return reply is not None

    async def delete(self, key: str):
        """DEL *key* and return the server reply."""
        return await self.pool.execute('del', key)

    async def get(self, key: str):
        """Return the value of *key* decoded as UTF-8.

        Returns None when the reply is falsy, i.e. for a missing key and
        also for an empty value.
        """
        val = await self.pool.execute('get', key)
        return val.decode("utf-8") if val else None

    async def exists(self, key: str) -> bool:
        """Return True if *key* exists."""
        reply = await self.pool.execute('exists', key)
        return reply == 1

    async def increment(self, key: str):
        """INCR *key* and return the server reply."""
        return await self.pool.execute('incr', key)

    async def hash_increment(self, key: str, field: str, by=1):
        """HINCRBY *field* of hash *key* by *by* and return the server reply."""
        return await self.pool.execute('hincrby', key, field, by)

    async def hash_getall(self, key: str) -> dict:
        """Return every field of hash *key* as a dict of UTF-8 decoded strings."""
        arr = await self.pool.execute('hgetall', key)
        # The reply is a flat [field, value, field, value, ...] sequence;
        # zipping one iterator with itself walks it two items at a time.
        flat = iter(arr)
        return {
            field.decode("utf-8"): value.decode("utf-8")
            for field, value in zip(flat, flat)
        }

    async def hash_getmany(self, key: str, *fields, decode=True) -> dict:
        """Return the requested *fields* of hash *key* as a dict keyed by field name.

        A field that is missing from the hash maps to None. Present values
        are decoded as UTF-8 unless *decode* is false, in which case they are
        returned exactly as the pool delivered them.
        """
        values = await self.pool.execute('hmget', key, *fields)
        if decode:
            values = [
                value.decode("utf-8") if value is not None else None
                for value in values
            ]
        return dict(zip(fields, values))

    async def hash_get(self, key: str, field: str):
        """Return *field* of hash *key* exactly as the pool delivers it.

        Unlike get() and hash_getmany(), no decoding is applied here.
        """
        return await self.pool.execute('hget', key, field)

    async def set_add(self, key: str, value: str):
        """SADD *value* to the set *key* and return the server reply."""
        return await self.pool.execute('sadd', key, value)

    async def set_members(self, key: str) -> set:
        """Return the members of set *key* as a set of UTF-8 decoded strings."""
        members = await self.pool.execute('smembers', key)
        return {member.decode("utf-8") for member in members}