-
Notifications
You must be signed in to change notification settings - Fork 92
/
tectonic.py
183 lines (148 loc) · 5.33 KB
/
tectonic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
"""
python client for tectonic server
"""
import ffi
import socket
import json
import struct
import time
import sys
import asyncio
from io import StringIO
import pandas as pd
import numpy as np
class TectonicDB():
"""
Example Usage:
from tectonic import TectonicDB
import json
import asyncio
async def subscribe(name):
db = TectonicDB()
print(await db.subscribe(name))
while 1:
_, item = await db.poll()
if item == b"NONE":
await asyncio.sleep(0.01)
else:
yield json.loads(item)
class TickBatcher(object):
def __init__(self, db_name):
self.one_batch = []
self.db_name = db_name
async def batch(self):
generator = subscribe(self.db_name)
async for item in generator:
self.one_batch.append(item)
async def timer(self):
while 1:
await asyncio.sleep(5)
print(len(self.one_batch))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
proc = TickBatcher("bnc_xrp_btc")
loop.create_task(proc.batch())
loop.create_task(proc.timer())
loop.run_forever()
loop.close()
"""
def __init__(self, host="localhost", port=9001):
self.subscribed = False
self.host = host
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = (host, port)
self.sock.connect(server_address)
async def cmd(self, cmd):
loop = asyncio.get_event_loop()
if type(cmd) != str:
message = (cmd.decode() + '\n').encode()
else:
message = (cmd+'\n').encode()
loop.sock_sendall(self.sock, message)
if "GET" in cmd and "JSON" not in cmd and "CSV" not in cmd:
return await self._recv_dtf()
else:
return await self._recv_text()
async def _recv_dtf(self):
success, data = await self._recv_text()
ups = ffi.parse_stream(data)
return success, ups
async def _recv_text(self):
loop = asyncio.get_event_loop()
header = await loop.sock_recv(self.sock, 9)
current_len = len(header)
while current_len < 9:
header += await loop.sock_recv(self.sock, 9-current_len)
current_len = len(header)
success, bytes_to_read = struct.unpack('>?Q', header)
if bytes_to_read == 0:
return success, ""
body = await loop.sock_recv(self.sock, 1)
body_len = len(body)
while body_len < bytes_to_read:
len_to_read = bytes_to_read - body_len
if len_to_read > 32:
len_to_read = 32
body += await loop.sock_recv(self.sock, len_to_read)
body_len = len(body)
return success, body
def destroy(self):
self.sock.close()
async def info(self):
return await self.cmd("INFO")
async def countall(self):
return await self.cmd("COUNT ALL")
async def countall_in_mem(self):
return await self.cmd("COUNT ALL IN MEM")
async def ping(self):
return await self.cmd("PING")
async def help(self):
return await self.cmd("HELP")
async def insert(self, ts, seq, is_trade, is_bid, price, size, dbname):
return await self.cmd("INSERT {}, {}, {} ,{}, {}, {}; INTO {}"
.format( ts, seq,
't' if is_trade else 'f',
't' if is_bid else 'f', price, size,
dbname))
async def add(self, ts, seq, is_trade, is_bid, price, size):
return await self.cmd("ADD {}, {}, {} ,{}, {}, {};"
.format( ts, seq,
't' if is_trade else 'f',
't' if is_bid else 'f', price, size))
async def getall(self):
success, ret = await self.cmd("GET ALL")
return success, list(map(lambda x:x.to_dict(), ret))
async def get(self, n):
success, ret = await self.cmd("GET {}".format(n))
if success:
return success, list(map(lambda x:x.to_dict(), ret))
else:
return False, None
async def clear(self):
return await self.cmd("CLEAR")
async def clearall(self):
return await self.cmd("CLEAR ALL")
async def flush(self):
return await self.cmd("FLUSH")
async def flushall(self):
return await self.cmd("FLUSH ALL")
async def create(self, dbname):
return await self.cmd("CREATE {}".format(dbname))
async def use(self, dbname):
return await self.cmd("USE {}".format(dbname))
async def unsubscribe(self):
await self.cmd("UNSUBSCRIBE")
self.subscribed = False
async def subscribe(self, dbname):
res = await self.cmd("SUBSCRIBE {}".format(dbname))
if res[0]:
self.subscribed = True
return res
async def poll(self):
return await self.cmd("")
async def range(self, dbname, start, finish):
self.use(dbname)
data = await self.cmd("GET ALL FROM {} TO {} AS CSV".format(start, finish).encode())
data = data[1]
return data