switch from tornado http client to aiohttp (with statement support, much lower object usage)
hclivess committed Feb 7, 2023
1 parent 7d75fd9 commit 77fc179
Showing 5 changed files with 118 additions and 136 deletions.
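The same mechanical swap is applied in every file below: tornado's AsyncHTTPClient, which had to be created per call and discarded by hand, is replaced by aiohttp's context-managed session. A minimal sketch of the before/after pattern (the URL and function name are illustrative, not taken from the diff):

import asyncio

import aiohttp

# Before (tornado): no `with` support, so the client was torn down manually:
#     http_client = AsyncHTTPClient()
#     try:
#         response = await http_client.fetch(url, request_timeout=5)
#         body = response.body
#     finally:
#         del http_client

# After (aiohttp): `async with` scopes both the session and the response,
# so connections and response objects are released deterministically.
async def fetch_body(url: str) -> bytes:
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
        async with session.get(url) as response:
            return await response.read()

if __name__ == "__main__":
    print(asyncio.run(fetch_body("http://example.com")))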
81 changes: 33 additions & 48 deletions compounder.py
@@ -3,8 +3,7 @@
from urllib.parse import quote

import msgpack
from tornado.httpclient import AsyncHTTPClient

import aiohttp
from ops.data_ops import sort_list_dict
from ops.log_ops import get_logger
"""this module is optimized for low memory and bandwidth usage"""
@@ -21,24 +20,20 @@ async def get_list_of(key, peer, port, fail_storage, logger, semaphore, compress=None):
url_construct = f"http://{peer}:{port}/{key}"

try:
http_client = AsyncHTTPClient()
async with semaphore:
response = await http_client.fetch(url_construct, request_timeout=5)

if compress == "msgpack":
fetched = msgpack.unpackb(response.body)
else:
fetched = json.loads(response.body.decode())[key]
return fetched


        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
            async with session.get(url_construct) as response:
                if compress == "msgpack":
                    fetched = msgpack.unpackb(await response.read())
                else:
                    fetched = json.loads(await response.text())[key]
                return fetched

except Exception as e:
if peer not in fail_storage:
logger.info(f"Compounder: Failed to get {key} of {peer} from {url_construct}: {e}")
fail_storage.append(peer)
finally:
del http_client


async def compound_get_list_of(key, entries, port, logger, fail_storage, semaphore, compress=None):
"""returns a list of lists of raw peers from multiple peers at once"""
@@ -67,25 +62,19 @@ async def get_url(peer, port, url, logger, fail_storage, semaphore, compress=None):
"""method compounded by compound_get_url"""

url_construct = f"http://{peer}:{port}/{url}"

try:
http_client = AsyncHTTPClient()
async with semaphore:
response = await http_client.fetch(url_construct, request_timeout=5)

fetched = response.body.decode()

return peer, fetched

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
            async with session.get(url_construct) as response:
                fetched = await response.text()
                return peer, fetched

except Exception as e:
if peer not in fail_storage:
logger.info(f"Compounder: Failed to get URL {url_construct}: {e}")
fail_storage.append(peer)

finally:
del http_client


async def compound_get_url(ips, port, url, logger, fail_storage, semaphore, compress=None):
"""returns result of urls with arbitrary data past slash"""
result = list(
@@ -108,18 +97,17 @@ async def send_transaction(peer, port, logger, fail_storage, transaction, semaphore, compress=None):
url_construct = f"http://{peer}:{port}/submit_transaction?data={quote(json.dumps(transaction))}"

try:
http_client = AsyncHTTPClient()
async with semaphore:
response = await http_client.fetch(url_construct, request_timeout=5)
fetched = json.loads(response.body)["message"]
return peer, fetched

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
            async with session.get(url_construct) as response:
                fetched = json.loads(await response.text())["message"]
                return peer, fetched

except Exception as e:
if peer not in fail_storage:
logger.info(f"Compounder: Failed to send transaction to {url_construct}: {e}")
fail_storage.append(peer)
finally:
del http_client

async def compound_send_transaction(ips, port, logger, fail_storage, transaction, semaphore, compress=None):
"""returns a list of dicts where ip addresses are keys"""
@@ -145,26 +133,24 @@ async def get_status(peer, port, logger, fail_storage, semaphore, compress=None):
url_construct = f"http://{peer}:{port}/status?compress={compress}"
else:
url_construct = f"http://{peer}:{port}/status"

try:
http_client = AsyncHTTPClient()
async with semaphore:
response = await http_client.fetch(url_construct, request_timeout=5)

if compress == "msgpack":
fetched = msgpack.unpackb(response.body)
else:
fetched = json.loads(response.body.decode())

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
            async with session.get(url_construct) as response:
                if compress == "msgpack":
                    fetched = msgpack.unpackb(await response.read())
                else:
                    fetched = json.loads(await response.text())

                return peer, fetched

except Exception as e:
if peer not in fail_storage:
logger.info(f"Compounder: Failed to get status from {url_construct}: {e}")
fail_storage.append(peer)

finally:
del http_client

async def compound_get_status_pool(ips, port, logger, fail_storage, semaphore, compress=None):
"""returns a list of dicts where ip addresses are keys"""
result = list(
@@ -189,18 +175,17 @@ async def announce_self(peer, port, my_ip, logger, fail_storage, semaphore):
)

try:
http_client = AsyncHTTPClient()
async with semaphore:
response = await http_client.fetch(url_construct, request_timeout=5)
fetched = response.body.decode()
return fetched

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
            async with session.get(url_construct) as response:
                fetched = await response.text()
                return fetched

except Exception:
if peer not in fail_storage:
# logger.info(f"Failed to announce self to {url_construct}: {e}")
fail_storage.append(peer)
finally:
del http_client


async def compound_announce_self(ips, port, my_ip, logger, fail_storage, semaphore):
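The compound_* wrappers (collapsed in this view) fan the helpers above out across many peers at once. A hypothetical driver in the same spirit (fan_out is an illustrative name, not the repository's API) shows how a shared semaphore bounds concurrency:

import asyncio

async def fan_out(peers, port, key, fail_storage, logger, limit=20):
    # One semaphore is shared by every request so that at most `limit`
    # fetches are in flight; 20 is an assumed value, not from the diff.
    semaphore = asyncio.Semaphore(limit)
    tasks = [
        get_list_of(key, peer, port, fail_storage, logger, semaphore)
        for peer in peers
    ]
    # return_exceptions=True keeps one unreachable peer from cancelling
    # the rest of the batch.
    return await asyncio.gather(*tasks, return_exceptions=True)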
104 changes: 51 additions & 53 deletions ops/block_ops.py
@@ -6,7 +6,7 @@
import msgpack
import requests
from tornado.httpclient import AsyncHTTPClient

import aiohttp
from .account_ops import get_account_value
from config import get_timestamp_seconds, get_config
from .data_ops import set_and_sort, average, get_home
@@ -317,19 +317,19 @@ def construct_block(

async def knows_block(target_peer, port, hash, logger):
try:
http_client = AsyncHTTPClient()
url = f"http://{target_peer}:{port}/get_block?hash={hash}"
result = await http_client.fetch(url, request_timeout=5)
if result.code == 200:
return True
else:
return False
url_construct = f"http://{target_peer}:{port}/get_block?hash={hash}"


        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
            async with session.get(url_construct) as response:
                if response.status == 200:
                    return True
                else:
                    return False

except Exception as e:
logger.error(f"Failed to check block {hash} from {target_peer}: {e}")
return False
finally:
del http_client


def update_child_in_latest_block(child_hash, logger, parent):
@@ -344,73 +344,71 @@ def update_child_in_latest_block(child_hash, logger, parent):


async def get_blocks_after(target_peer, from_hash, logger, count=50, compress="msgpack"):

try:
http_client = AsyncHTTPClient()
url = f"http://{target_peer}:{get_config()['port']}/get_blocks_after?hash={from_hash}&count={count}&compress={compress}"
result = await http_client.fetch(url, request_timeout=5)
code = result.code

if code == 200 and compress == "msgpack":
read = result.body
return msgpack.unpackb(read)
elif code == 200:
text = result.body.decode()
return json.loads(text)["blocks_after"]
else:
return False
url_construct = f"http://{target_peer}:{get_config()['port']}/get_blocks_after?hash={from_hash}&count={count}&compress={compress}"

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
            async with session.get(url_construct) as response:
                code = response.status

                if code == 200 and compress == "msgpack":
                    return msgpack.unpackb(await response.read())
                elif code == 200:
                    return json.loads(await response.text())["blocks_after"]
                else:
                    return False

except Exception as e:
logger.error(f"Failed to get blocks after {from_hash} from {target_peer}: {e}")
finally:
del http_client


async def get_blocks_before(target_peer, from_hash, count=50, compress="true"):
try:
http_client = AsyncHTTPClient()
url = f"http://{target_peer}:{get_config()['port']}/get_blocks_before?hash={from_hash}&count={count}&compress={compress}"
result = await http_client.fetch(url, request_timeout=5)

code = result.code

if code == 200 and compress == "msgpack":
read = result.body
return msgpack.unpackb(read)
elif code == 200:
text = result.body.decode()
return json.loads(text)["blocks_before"]
else:
return False
url_construct = f"http://{target_peer}:{get_config()['port']}/get_blocks_before?hash={from_hash}&count={count}&compress={compress}"


        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
            async with session.get(url_construct) as response:
                code = response.status

                if code == 200 and compress == "msgpack":
                    return msgpack.unpackb(await response.read())
                elif code == 200:
                    return json.loads(await response.text())["blocks_before"]
                else:
                    return False

except Exception as e:
logger.error(f"Failed to get blocks before {from_hash} from {target_peer}: {e}")
return False
finally:
del http_client


async def get_from_single_target(key, target_peer, logger) -> list: # todo add msgpack support
"""obtain from a single target, returns list"""

try:
http_client = AsyncHTTPClient()
url = f"http://{target_peer}:{get_config()['port']}/{key}"
result = await http_client.fetch(url, request_timeout=5)
text = result.body.decode()
code = result.code
url_construct = f"http://{target_peer}:{get_config()['port']}/{key}"

if code == 200:
return json.loads(text)[key]
else:
return []

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
            async with session.get(url_construct) as response:
                if response.status == 200:
                    return json.loads(await response.text())[key]
                else:
                    return []

except Exception as e:
logger.error(f"Failed to get {key} from {target_peer}: {e}")
return []

finally:
del http_client


def get_ip_penalty(producer, logger, blocks_backward=50):
"""calculates how many blocks an ip received over a given period"""
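One semantic difference in the swap above: tornado's request_timeout=5 capped each request at five seconds, while aiohttp.ClientTimeout(total=10) caps the whole operation (connect, send, and body read) at ten. A short sketch of the timeout knobs; sock_connect and sock_read are optional extras shown for illustration, not part of this commit:

import aiohttp

# total is a hard ceiling on the entire request lifecycle.
timeout = aiohttp.ClientTimeout(
    total=10,        # mirrors the diff's ClientTimeout(total=10)
    sock_connect=5,  # illustrative: time allowed to establish the connection
    sock_read=5,     # illustrative: max gap between reads of the body
)

async def check(url: str) -> int:
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as response:
            await response.read()  # the body read is covered by `total` too
            return response.status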
42 changes: 21 additions & 21 deletions ops/peer_ops.py
@@ -15,7 +15,7 @@
from hashing import base64encode, blake2b_hash
from .key_ops import load_keys


import aiohttp

def validate_dict_structure(dictionary: dict, requirements: list) -> bool:
if not all(key in requirements for key in dictionary):
@@ -41,25 +41,25 @@ def update_local_address(logger):


async def get_remote_status(target_peer, logger) -> [dict, bool]: # todo add msgpack support

try:
url = f"http://{target_peer}:{get_port()}/status"
http_client = AsyncHTTPClient()
result = await http_client.fetch(url, request_timeout=5)
text = result.body.decode()
code = result.code
url_construct = f"http://{target_peer}:{get_port()}/status"

if code == 200:
return json.loads(text)
else:
return False

        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
            async with session.get(url_construct) as response:
                if response.status == 200:
                    return json.loads(await response.text())
                else:
                    return False

except Exception as e:
logger.error(f"Failed to get status from {target_peer}: {e}")
return False

finally:
del http_client


def delete_peer(ip, logger):
peer_path = f"{get_home()}/peers/{base64encode(ip)}.dat"
@@ -321,16 +321,16 @@ def check_ip(ip):

async def get_public_ip(logger):
urls = ["https://api.ipify.org", "https://ipinfo.io/ip"]
for url in urls:

for url_construct in urls:
try:
http_client = AsyncHTTPClient()
ip = await http_client.fetch(url, request_timeout=5)
return ip.body.decode()
except Exception as e:
logger.error(f"Unable to fetch IP from {url}: {e}")
finally:
del http_client
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
                async with session.get(url_construct) as response:
                    ip = await response.text()
                    return ip

except Exception as e:
logger.error(f"Unable to fetch IP from {url_construct}: {e}")

def update_local_ip(ip, logger):
old_ip = get_config()["ip"]
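A note on the design choice: every helper in this commit opens a fresh ClientSession per request, which is simple and leaks nothing, but gives up connection reuse across calls. The usual aiohttp idiom for reuse is one long-lived session; a minimal sketch under that assumption (PeerClient is a hypothetical wrapper, not part of this commit):

import aiohttp

class PeerClient:
    """Shares one ClientSession across many requests (hypothetical helper)."""

    async def __aenter__(self):
        self._session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=10)
        )
        return self

    async def __aexit__(self, *exc):
        await self._session.close()

    async def get_text(self, url: str) -> str:
        async with self._session.get(url) as response:
            return await response.text()

# usage: async with PeerClient() as client: text = await client.get_text(url)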