/
rest_client.py
132 lines (102 loc) · 3.14 KB
/
rest_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import hmac
import hashlib
import time
import urllib
import json
import datetime
import logging
import sys
from asyncio import *
import aiohttp
from cexio.exceptions import *
# Module-level logger. Importing this module sets it to DEBUG and attaches a
# handler that writes every record to stdout.
logger = logging.getLogger(__name__)
logger.level = logging.DEBUG
logger.addHandler(logging.StreamHandler(sys.stdout))
# Only the REST client is exported by a star import of this module.
__all__ = [
'CEXRestClient',
]
# Fail at import time if hashlib does not guarantee SHA-256, which the
# request signature (HMAC-SHA256) relies on.
# NOTE: assert statements are skipped when Python runs with -O.
assert 'sha256' in hashlib.algorithms_guaranteed
class CEXRestAuth:
    """
    Builds the authentication parameters for signed REST requests.

    Credentials are read once from ``config['auth']``. Every call to
    ``get_params`` produces a new nonce (millisecond timestamp) and the
    matching HMAC-SHA256 signature.
    """

    # Template of the authentication payload. It is never mutated:
    # get_params() fills in a copy, so instances and callers do not share
    # (and cannot overwrite) each other's credentials.
    _json = {
        'key': None,
        'signature': None,
        'nonce': None,
    }

    def __init__(self, config):
        """
        :param config: mapping with an ``'auth'`` section that holds the
            ``'user_id'``, ``'key'`` and ``'secret'`` entries.
        :raises ConfigError: if any of those keys is missing.
        """
        try:
            self._user_id = config['auth']['user_id']
            self._key = config['auth']['key']
            self._secret = config['auth']['secret']
        except KeyError as ex:
            # Chain explicitly so the missing key is reported as the cause.
            raise ConfigError('Missing key in _config file', ex) from ex

    def get_curr_timestamp(self):
        """Return the current time as an integer number of milliseconds."""
        return int(datetime.datetime.now().timestamp() * 1000)

    def get_timed_signature(self):
        """
        Return a ``(timestamp, signature)`` pair.

        ``timestamp`` is the current time in milliseconds. ``signature`` is
        the hex HMAC-SHA256 digest, keyed with the secret, of the string
        made of the timestamp, the user ID and the public key, in that order.
        """
        timestamp = self.get_curr_timestamp()
        message = "{}{}{}".format(timestamp, self._user_id, self._key)
        digest = hmac.new(
            self._secret.encode(), message.encode(), hashlib.sha256
        ).hexdigest()
        return timestamp, digest

    def get_params(self):
        """
        Return a new dict with the ``'key'``, ``'signature'`` and ``'nonce'``
        entries for one request.

        A fresh dict is built on every call, so the caller may modify it
        freely. According to the original note, a request signed this way
        is valid within ~20 seconds.
        """
        timestamp, signature = self.get_timed_signature()
        params = dict(self._json)
        params['key'] = self._key
        params['signature'] = signature
        params['nonce'] = timestamp
        return params
class CEXRestClient:
    """
    Small asynchronous REST client built on aiohttp.

    ``config`` must provide ``config['rest']['uri']`` (the base URI that
    resource paths are appended to) and ``config['authorize']``. When
    ``authorize`` is truthy, POST requests carry the authentication
    parameters produced by ``CEXRestAuth``.
    """

    def __init__(self, config):
        """
        :param config: mapping with the ``'rest'`` / ``'authorize'`` entries
            (and an ``'auth'`` section when authorization is enabled).
        :raises ConfigError: if a required key is missing.
        """
        # Kept on the instance; the previous module-level ``global headers``
        # was shared and re-created by every client constructed.
        self._headers = {'content-type': 'application/json'}
        try:
            self._uri = config['rest']['uri']
            self._need_auth = config['authorize']
            if self._need_auth:
                self._auth = CEXRestAuth(config)
        except KeyError as ex:
            raise ConfigError('Missing key in _config file', ex) from ex

    async def get(self, resource):
        """
        GET ``resource`` (appended to the base URI) and return the decoded
        JSON body.

        :raises InvalidResponseError: if the response fails ``_validate``.
        """
        url = self._uri + resource
        logger.debug("REST.Get> %s", url)
        # ClientSession must be entered with ``async with``; the plain
        # ``with`` form is rejected by current aiohttp releases.
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=self._headers) as response:
                self._validate(url, response)
                # _validate already checked the content type ('text/json'),
                # so disable aiohttp's own 'application/json' check.
                body = await response.json(content_type=None)
                logger.debug("REST.Resp> Response: %s", body)
                return body

    async def post(self, resource, params=None):
        """
        POST ``params`` to ``resource`` (appended to the base URI) and return
        the decoded JSON body.

        :param params: optional mapping of form fields. It is copied, never
            modified; authentication parameters are added to the copy when
            authorization is enabled.
        :raises InvalidResponseError: if the response fails ``_validate``.
        """
        url = self._uri + resource
        logger.debug("REST.Post> %s", url)
        # Work on a private copy: neither the caller's dict nor a shared
        # default may accumulate key/signature/nonce between calls.
        payload = dict(params) if params else {}
        if self._need_auth:
            payload.update(self._auth.get_params())
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=payload) as response:
                self._validate(url, response)
                body = await response.json(content_type=None)
                logger.debug("REST.Resp> %s", body)
                return body

    @staticmethod
    def _validate(url, response):
        """
        Check that ``response`` has status 200 and content type 'text/json'.

        :param url: request URL, used only in the error message.
        :param response: object exposing ``status``, ``reason`` and
            ``headers``.
        :raises InvalidResponseError: on any other status, or when the
            Content-Type header is missing or different.
        """
        if response.status != 200:
            error = "Error response code {}: {} at: {}".format(response.status, response.reason, url)
            logger.debug(error)
            raise InvalidResponseError(error)
        # .get(): a missing header is reported as an invalid response
        # instead of escaping as a bare KeyError.
        content_type = response.headers.get('CONTENT-TYPE')
        if content_type != 'text/json':
            error = "Invalid response content-type \'{}\' of: {}".format(content_type, url)
            logger.debug(error)
            raise InvalidResponseError(error)
# This module is a library: nothing runs when it is executed as a script.
# The former ``else: pass`` branch was a no-op and has been dropped.
if __name__ == "__main__":
    pass