Skip to content

Commit

Permalink
Merge pull request #6 from emre/async-node-selection
Browse files Browse the repository at this point in the history
Make node selection async
  • Loading branch information
emre committed Dec 30, 2021
2 parents a0e38ab + 44ccd5d commit 4cc637c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 51 deletions.
27 changes: 14 additions & 13 deletions docs/gettingstarted.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@ That way, lighthive requests a ``get_dynamic_global_properties`` call to the eac

.. code-block:: bash
2021-12-30 15:12:10,203 lighthive INFO Measurements:
https://api.openhive.network: 0.13 [s]
https://rpc.ausbit.dev: 0.14 [s]
https://hive-api.arcange.eu: 0.14 [s]
https://api.deathwing.me: 0.17 [s]
https://rpc.ecency.com: 0.19 [s]
https://api.hive.blue: 0.21 [s]
https://api.pharesim.me: 0.27 [s]
https://hived.emre.sh: 0.28 [s]
https://api.hive.blog: 0.49 [s]
https://techcoderx.com: 0.74 [s]
2021-12-30 15:12:10,203 lighthive INFO Automatic node selection took 2.85 seconds.
2021-12-30 15:12:10,203 lighthive INFO Node set as https://api.openhive.network
2021-12-30 17:20:28,515 lighthive INFO Measurements:
https://rpc.ausbit.dev: 0.12 [s]
https://api.openhive.network: 0.12 [s]
https://hive-api.arcange.eu: 0.12 [s]
https://hived.emre.sh: 0.14 [s]
https://api.deathwing.me: 0.15 [s]
https://rpc.ecency.com: 0.16 [s]
https://api.hive.blue: 0.19 [s]
https://api.pharesim.me: 0.28 [s]
https://api.hive.blog: 0.46 [s]
https://techcoderx.com: 0.77 [s]
2021-12-30 17:20:28,516 lighthive INFO Automatic node selection took 0.81 seconds.
2021-12-30 17:20:28,516 lighthive INFO Node set as https://rpc.ausbit.dev
Since it's a time-consuming operation, this is an optional flag, and it's default is False.
Expand Down
77 changes: 43 additions & 34 deletions lighthive/node_picker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,59 +7,68 @@


async def rpc_request(client, node, call, params):
response = await client.post(
node,
json={
"jsonrpc": "2.0",
"method": call,
"params": params,
"id": str(uuid.uuid4())
}
)
try:
response = await client.post(
node,
json={
"jsonrpc": "2.0",
"method": call,
"params": params,
"id": str(uuid.uuid4())
}
)

if response.status_code != 200:
raise Exception("Invalid status code!")
if response.status_code != 200:
raise Exception("Invalid status code!")

response_json = response.json()
response_json = response.json()

# sanity check
head_block_number = response_json.get("result", {}).get("head_block_number")
if not head_block_number:
raise Exception("Malformed response!")
# sanity check
head_block_number = response_json.get("result", {}).get("head_block_number")
if not head_block_number:
raise Exception("Malformed response!")

if 'error' in response_json:
raise Exception(response.get("error"))
if 'error' in response_json:
raise Exception(response.get("error"))

return response
return response
except Exception as e:
e.url = node
raise
finally:
await client.aclose()


async def compare_nodes(nodes, logger):
start_time = time.time()
node_performance_results = {}
method = "database_api.get_dynamic_global_properties"

args = [rpc_request(httpx.AsyncClient(), node, method, {}) for node in nodes]
responses = await asyncio.gather(*args, return_exceptions=True)

for node in nodes:
async with httpx.AsyncClient() as client:
try:
response = await rpc_request(
client, node, "database_api.get_dynamic_global_properties",
{})
measured_time_in_seconds = response.elapsed.total_seconds()
except Exception as e:
logger.error(e)
measured_time_in_seconds = -1
for response in responses:
if isinstance(response, Exception):
logger.error(response)
measured_time_in_seconds = -1
else:
measured_time_in_seconds = response.elapsed.total_seconds()

node_performance_results[node] = measured_time_in_seconds
node_performance_results[response.url] = measured_time_in_seconds

node_performance_results_sorted = OrderedDict(sorted(node_performance_results.items(), key=lambda x: x[1]))
measurements_in_str = ""
for node, time_elapsed in node_performance_results_sorted.items():
measurements_in_str += f"{node}: {time_elapsed:.2f} [s]\n"
logger.info("Measurements: \n%s", measurements_in_str)
total_time_elapsed = time.time() - start_time
logger.info(f"Automatic node selection took {total_time_elapsed:.2f} seconds.")

return list(node_performance_results_sorted.keys())


def sort_nodes_by_response_time(nodes, logger):
start_time = time.time()
loop = asyncio.get_event_loop()
return loop.run_until_complete(compare_nodes(nodes, logger))
nodes = loop.run_until_complete(compare_nodes(nodes, logger))
total_time_elapsed = time.time() - start_time
logger.info(f"Automatic node selection took {total_time_elapsed:.2f} seconds.")

return nodes
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='lighthive',
version='0.2.9',
version='0.3.0',
packages=find_packages('.'),
url='http://github.com/emre/lighthive',
license='MIT',
Expand Down
11 changes: 8 additions & 3 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,15 @@ def test_sort_nodes_by_response_time(self, rpc_request_mock):

elapsed_mock = unittest.mock.MagicMock()
elapsed_mock.total_seconds.side_effect = [5, 10, 2]
f = asyncio.Future()
f.elapsed = elapsed_mock

rpc_request_mock.return_value = f
def rpc_request(*args, **kwargs):
f = asyncio.Future()
f.elapsed = elapsed_mock
f.url = args[1]

return f

rpc_request_mock.side_effect = rpc_request
nodes = lighthive.node_picker.sort_nodes_by_response_time(test_nodes, unittest.mock.MagicMock())

self.assertListEqual(nodes, ["c", "a", "b"])
Expand Down

0 comments on commit 4cc637c

Please sign in to comment.