Skip to content

Commit

Permalink
New nodes class for better node url handling
Browse files Browse the repository at this point in the history
Wallet
* getKeysForAccount added
* getOwnerKeysForAccount, getActiveKeysForAccount, getPostingKeysForAccount added
beemapi
* WorkingNodeMissing is raised when no working node could be found
GrapheneRPC
* cycle([urls]) is replaced by the nodes class
Nodes
* Node handling and management of url and error_counts is performed by the nodes class
* sleep_and_check_retries is moved to the nodes class
* Websocket, steemnodrpc were adpapted to the changes
Unit tests
* new tests for the nodes class
* tests adapted for websocket and rpcutils
  • Loading branch information
holgern committed May 24, 2018
1 parent 1b7ab8c commit a261c53
Show file tree
Hide file tree
Showing 11 changed files with 337 additions and 105 deletions.
2 changes: 1 addition & 1 deletion beem/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__(

# Open the websocket
self.websocket = SteemWebsocket(
urls=self.steem.rpc.urls,
urls=self.steem.rpc.nodes,
user=self.steem.rpc.user,
password=self.steem.rpc.password,
only_block_id=only_block_id,
Expand Down
51 changes: 51 additions & 0 deletions beem/wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,42 @@ def getKeyForAccount(self, name, key_type):
raise MissingKeyError("No private key for {} found".format(name))
return

def getKeysForAccount(self, name, key_type):
""" Obtain a List of `key_type` Private Keys for an account from the wallet database
"""
if key_type not in ["owner", "active", "posting", "memo"]:
raise AssertionError("Wrong key type")
if key_type in Wallet.keyMap:
return Wallet.keyMap.get(key_type)
else:
if self.rpc.get_use_appbase():
account = self.rpc.find_accounts({'accounts': [name]}, api="database")['accounts']
else:
account = self.rpc.get_account(name)
if not account:
return
if len(account) == 0:
return
if key_type == "memo":
key = self.getPrivateKeyForPublicKey(
account[0]["memo_key"])
if key:
return [key]
else:
keys = []
key = None
for authority in account[0][key_type]["key_auths"]:
try:
key = self.getPrivateKeyForPublicKey(authority[0])
if key:
keys.append(key)
except MissingKeyError:
key = None
if key is None:
raise MissingKeyError("No private key for {} found".format(name))
return keys
return

def getOwnerKeyForAccount(self, name):
""" Obtain owner Private Key for an account from the wallet database
"""
Expand All @@ -406,6 +442,21 @@ def getPostingKeyForAccount(self, name):
"""
return self.getKeyForAccount(name, "posting")

def getOwnerKeysForAccount(self, name):
""" Obtain list of all owner Private Keys for an account from the wallet database
"""
return self.getKeysForAccount(name, "owner")

def getActiveKeysForAccount(self, name):
""" Obtain list of all owner Active Keys for an account from the wallet database
"""
return self.getKeysForAccount(name, "active")

def getPostingKeysForAccount(self, name):
""" Obtain list of all owner Posting Keys for an account from the wallet database
"""
return self.getKeysForAccount(name, "posting")

def getAccountFromPrivateKey(self, wif):
""" Obtain account name from private key
"""
Expand Down
4 changes: 4 additions & 0 deletions beemapi/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,7 @@ class InvalidEndpointUrl(Exception):

class UnnecessarySignatureDetected(Exception):
pass


class WorkingNodeMissing(Exception):
pass
93 changes: 49 additions & 44 deletions beemapi/graphenerpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
import time
import warnings
from .exceptions import (
UnauthorizedError, RPCConnection, RPCError, RPCErrorDoRetry, NumRetriesReached, CallRetriesReached
UnauthorizedError, RPCConnection, RPCError, RPCErrorDoRetry, NumRetriesReached, CallRetriesReached, WorkingNodeMissing
)
from .rpcutils import (
is_network_appbase_ready, sleep_and_check_retries,
is_network_appbase_ready,
get_api_name, get_query
)
from .node import Nodes
from beemgraphenebase.version import version as beem_version
from beemgraphenebase.chains import known_chains

Expand Down Expand Up @@ -123,38 +124,39 @@ def __init__(self, urls, user=None, password=None, **kwargs):
self.rpc_methods = {'offline': -1, 'ws': 0, 'jsonrpc': 1, 'wsappbase': 2, 'appbase': 3}
self.current_rpc = self.rpc_methods["ws"]
self._request_id = 0
if isinstance(urls, str):
url_list = re.split(r",|;", urls)
self.n_urls = len(url_list)
self.urls = cycle(url_list)
if self.urls is None:
self.n_urls = 1
self.urls = cycle([urls])
elif isinstance(urls, (list, tuple, set)):
self.n_urls = len(urls)
self.urls = cycle(urls)
elif urls is not None:
self.n_urls = 1
self.urls = cycle([urls])
else:
self.n_urls = 0
self.urls = None
self.timeout = kwargs.get('timeout', 60)
num_retries = kwargs.get("num_retries", -1)
num_retries_call = kwargs.get("num_retries_call", 5)
self.use_condenser = kwargs.get("use_condenser", False)
self.nodes = Nodes(urls, num_retries, num_retries_call)
if self.nodes.working_nodes_count == 0:
self.current_rpc = self.rpc_methods["offline"]

self.user = user
self.password = password
self.ws = None
self.url = None
self.session = None
self.rpc_queue = []
self.timeout = kwargs.get('timeout', 60)
self.num_retries = kwargs.get("num_retries", -1)
self.use_condenser = kwargs.get("use_condenser", False)
self.error_cnt = {}
self.num_retries_call = kwargs.get("num_retries_call", 5)
self.error_cnt_call = 0
if kwargs.get("autoconnect", True):
self.rpcconnect()

@property
def num_retries(self):
return self.nodes.num_retries

@property
def num_retries_call(self):
return self.nodes.num_retries_call

@property
def error_cnt_call(self):
return self.nodes.error_cnt_call

@property
def error_cnt(self):
return self.nodes.error_cnt

def get_request_id(self):
"""Get request id."""
self._request_id += 1
Expand All @@ -179,14 +181,12 @@ def get_use_appbase(self):

def rpcconnect(self, next_url=True):
"""Connect to next url in a loop."""
if self.urls is None:
if self.nodes.working_nodes_count == 0:
return
while True:
if next_url:
self.url = next(self.urls)
self.error_cnt_call = 0
if self.url not in self.error_cnt:
self.error_cnt[self.url] = 0
self.url = next(self.nodes)
self.nodes.reset_error_cnt_call()
log.debug("Trying to connect to node %s" % self.url)
if self.url[:3] == "wss":
self.ws = create_ws_instance(use_ssl=True)
Expand Down Expand Up @@ -226,9 +226,9 @@ def rpcconnect(self, next_url=True):
except KeyboardInterrupt:
raise
except Exception as e:
self.error_cnt[self.url] += 1
do_sleep = not next_url or (next_url and self.n_urls == 1)
sleep_and_check_retries(self.num_retries, self.error_cnt[self.url], self.url, str(e), sleep=do_sleep)
self.nodes.increase_error_cnt()
do_sleep = not next_url or (next_url and self.nodes.working_nodes_count == 1)
self.nodes.sleep_and_check_retries(str(e), sleep=do_sleep)
next_url = True

def rpclogin(self, user, password):
Expand Down Expand Up @@ -322,22 +322,24 @@ def rpcexec(self, payload):
:raises RPCError: if the server returns an error
"""
log.debug(json.dumps(payload))
if self.nodes.working_nodes_count == 0:
raise WorkingNodeMissing
if self.url is None:
self.rpcconnect()
reply = {}
while True:
self.error_cnt_call += 1
self.nodes.increase_error_cnt_call()
try:
if self.current_rpc == 0 or self.current_rpc == 2:
reply = self.ws_send(json.dumps(payload, ensure_ascii=False).encode('utf8'))
else:
reply = self.request_send(json.dumps(payload, ensure_ascii=False).encode('utf8'))
if not bool(reply):
try:
sleep_and_check_retries(self.num_retries_call, self.error_cnt_call, self.url, "Empty Reply", call_retry=True)
self.nodes.sleep_and_check_retries("Empty Reply", call_retry=True)
except CallRetriesReached:
self.error_cnt[self.url] += 1
sleep_and_check_retries(self.num_retries, self.error_cnt[self.url], self.url, "Empty Reply", sleep=False, call_retry=False)
self.nodes.increase_error_cnt()
self.nodes.sleep_and_check_retries("Empty Reply", sleep=False, call_retry=False)
self.rpcconnect()
else:
break
Expand All @@ -347,12 +349,12 @@ def rpcexec(self, payload):
# self.error_cnt[self.url] += 1
self.rpcconnect(next_url=False)
except ConnectionError as e:
self.error_cnt[self.url] += 1
sleep_and_check_retries(self.num_retries, self.error_cnt[self.url], self.url, str(e), sleep=False, call_retry=False)
self.nodes.increase_error_cnt()
self.nodes.sleep_and_check_retries(str(e), sleep=False, call_retry=False)
self.rpcconnect()
except Exception as e:
self.error_cnt[self.url] += 1
sleep_and_check_retries(self.num_retries, self.error_cnt[self.url], self.url, str(e), sleep=False, call_retry=False)
self.nodes.increase_error_cnt()
self.nodes.sleep_and_check_retries(str(e), sleep=False, call_retry=False)
self.rpcconnect()

ret = {}
Expand Down Expand Up @@ -381,15 +383,15 @@ def rpcexec(self, payload):
ret_list.append(r["result"])
else:
ret_list.append(r)
self.error_cnt_call = 0
self.nodes.reset_error_cnt_call()
return ret_list
elif isinstance(ret, dict) and "result" in ret:
self.error_cnt_call = 0
self.nodes.reset_error_cnt_call()
return ret["result"]
elif isinstance(ret, int):
raise RPCError("Client returned invalid format. Expected JSON! Output: %s" % (str(ret)))
else:
self.error_cnt_call = 0
self.nodes.reset_error_cnt_call()
return ret
return ret

Expand All @@ -404,16 +406,19 @@ def method(*args, **kwargs):
api_name = "condenser_api"

# let's be able to define the num_retries per query
self.num_retries_call = kwargs.get("num_retries_call", self.num_retries_call)
stored_num_retries_call = self.nodes.num_retries_call
self.nodes.num_retries_call = kwargs.get("num_retries_call", stored_num_retries_call)
add_to_queue = kwargs.get("add_to_queue", False)
query = get_query(self.is_appbase_ready() and not self.use_condenser, self.get_request_id(), api_name, name, args)
if add_to_queue:
self.rpc_queue.append(query)
self.nodes.num_retries_call = stored_num_retries_call
return None
elif len(self.rpc_queue) > 0:
self.rpc_queue.append(query)
query = self.rpc_queue
self.rpc_queue = []
r = self.rpcexec(query)
self.nodes.num_retries_call = stored_num_retries_call
return r
return method

0 comments on commit a261c53

Please sign in to comment.