forked from rotki/rotki
/
premium.py
176 lines (148 loc) · 5.77 KB
/
premium.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import time
import hashlib
import hmac
import base64
import requests
from http import HTTPStatus
from urllib.parse import urlencode
from binascii import Error as binascii_error
from rotkehlchen.utils import rlk_jsonloads
from rotkehlchen.constants import ROTKEHLCHEN_SERVER_TIMEOUT
import logging
logger = logging.getLogger(__name__)
HANDLABLE_STATUS_CODES = [
HTTPStatus.OK,
HTTPStatus.NOT_FOUND,
HTTPStatus.UNAUTHORIZED,
HTTPStatus.BAD_REQUEST,
]
def premium_create_and_verify(api_key, api_secret):
"""Create a Premium object with the key pairs and verify them.
Returns a tuple (premium_object, valid, empty_or_error) where:
- premium_object: Is the initialized premium_object
- valid: A boolean indicating if the api keys are actually valid. This
is found out by making an API call.
- empty_or_error: A string containing an error message if something went wrong
"""
try:
premium = Premium(api_key, api_secret)
valid, empty_or_error = premium.is_active()
except binascii_error:
return None, False, 'incorrect api key format'
return premium, valid, empty_or_error
class Premium(object):
def __init__(self, api_key, api_secret):
self.session = requests.session()
self.apiversion = '1'
self.uri = 'http://localhost:5001/api/{}/'.format(self.apiversion)
self.reset_credentials(api_key, api_secret)
def reset_credentials(self, api_key, api_secret):
self.api_key = api_key
self.secret = base64.b64decode(api_secret)
self.session.headers.update({
'API-KEY': self.api_key,
})
def set_credentials(self, api_key, api_secret):
old_api_key = self.api_key
old_secret = base64.b64encode(self.secret)
# Forget the cached active value since we are trying new credentials
if hasattr(self, 'active'):
del self.active
# If what's given is not even valid b64 encoding then stop here
try:
self.reset_credentials(api_key, api_secret)
except binascii_error as e:
return False, 'Secret Key formatting error: {}'.format(e)
active, empty_or_error = self.is_active()
if not active:
self.reset_credentials(old_api_key, old_secret)
return False, empty_or_error
return True, ''
def is_active(self):
if hasattr(self, 'active'):
return self.active, ''
else:
self.active, result_or_error = self.query_last_data_metadata()
emptystr_or_error = '' if self.active else result_or_error
return self.active, emptystr_or_error
def process_response(self, response):
result_or_error = ''
success = False
if response.status_code not in HANDLABLE_STATUS_CODES:
result_or_error = (
'Unexpected status response({}) from rotkehlchen server'.format(
response.status_code))
else:
result_or_error = rlk_jsonloads(response.text)
if 'error' in result_or_error:
result_or_error = result_or_error['error']
else:
success = True
return success, result_or_error
def sign(self, method, **kwargs):
urlpath = '/api/' + self.apiversion + '/' + method
req = kwargs
req['nonce'] = int(1000 * time.time())
# print('HASH OF BLOB: {}'.format(hashlib.sha256(req['data_blob']).digest()))
post_data = urlencode(req)
hashable = post_data.encode()
message = urlpath.encode() + hashlib.sha256(hashable).digest()
signature = hmac.new(
self.secret,
message,
hashlib.sha512
)
return signature, req
def upload_data(self, data_blob, our_hash, last_modify_ts, compression_type):
signature, data = self.sign(
'save_data',
data_blob=data_blob,
original_hash=our_hash,
last_modify_ts=last_modify_ts,
index=0,
length=len(data_blob),
compression=compression_type,
)
self.session.headers.update({
'API-SIGN': base64.b64encode(signature.digest()),
})
try:
response = self.session.put(
self.uri + 'save_data',
data=data,
timeout=ROTKEHLCHEN_SERVER_TIMEOUT,
)
except requests.ConnectionError:
return False, 'Could not connect to rotkehlchen server'
success, result_or_error = self.process_response(response)
return success, result_or_error
def pull_data(self):
signature, data = self.sign('get_saved_data')
self.session.headers.update({
'API-SIGN': base64.b64encode(signature.digest()),
})
try:
response = self.session.get(
self.uri + 'get_saved_data',
data=data,
timeout=ROTKEHLCHEN_SERVER_TIMEOUT,
)
except requests.ConnectionError:
return False, 'Could not connect to rotkehlchen server'
success, result_or_error = self.process_response(response)
return success, result_or_error
def query_last_data_metadata(self):
signature, data = self.sign('last_data_metadata')
self.session.headers.update({
'API-SIGN': base64.b64encode(signature.digest()),
})
try:
response = self.session.get(
self.uri + 'last_data_metadata',
data=data,
timeout=ROTKEHLCHEN_SERVER_TIMEOUT,
)
except requests.ConnectionError:
return False, 'Could not connect to rotkehlchen server'
success, result_or_error = self.process_response(response)
return success, result_or_error