In [13]:
import asyncio
import aiohttp
from aiohttp import ClientSession

import time
import json

from itertools import cycle
import numpy as np
import pandas as pd
import random 

from IPython.display import display, clear_output

In [None]:
# ANSI escape sequences used to colour each coroutine's log lines.
# Index 0 resets the colour; indices 1.. are the per-coroutine colours.
c = (
    "\033[0m",   # End of color
    "\033[36m",  # Cyan
    "\033[91m",  # Red
    "\033[35m",  # Magenta
)

async def makerandom(idx: int, threshold: int = 6) -> int:
    """Draw random integers in [0, 10] until one is strictly greater than ``threshold``.

    Each rejected draw is logged and followed by ``asyncio.sleep(idx + 1)``, so
    coroutines with a higher ``idx`` retry more slowly.

    Args:
        idx: Identifier of this coroutine; selects the log colour and the retry delay.
            Any integer is accepted: colours are reused cyclically once ``idx``
            exceeds the number of colours in ``c``.
        threshold: Largest value that is still rejected.

    Returns:
        The first draw that is greater than ``threshold``.

    Raises:
        ValueError: If ``threshold >= 10``; no draw from ``randint(0, 10)`` could
            ever exceed it, so the retry loop would never finish.
    """
    if threshold >= 10:
        raise ValueError(
            f"threshold must be below 10 (got {threshold}); randint(0, 10) can never exceed it"
        )

    # Skip slot 0 (the reset code) and wrap around so that idx >= 3 does not
    # index past the end of the palette.
    color = c[1 + idx % (len(c) - 1)]

    print(color + f"Initiated makerandom({idx}).")
    i = random.randint(0, 10)
    while i <= threshold:
        print(color + f"makerandom({idx}) == {i} too low; retrying.")
        await asyncio.sleep(idx + 1)
        i = random.randint(0, 10)
    print(color + f"---> Finished: makerandom({idx}) == {i}" + c[0])
    return i

async def main():
    """Run three ``makerandom`` coroutines concurrently and return their results.

    Coroutine ``i`` (for ``i`` in 0, 1, 2) is given the threshold ``10 - i - 1``.
    The returned list follows the order in which the coroutines were created.
    """
    workers = [makerandom(i, 10 - i - 1) for i in range(3)]
    return await asyncio.gather(*workers)

In [None]:
# Fix the seed of the `random` module before the coroutines start drawing numbers.
random.seed(444)
# Script-style alternative, kept for reference; this cell uses top-level `await` instead.
#r1, r2, r3 = asyncio.run(main())
r1, r2, r3 = await main()
# Blank line between the coroutine log and the summary.
print('')
print(f"r1: {r1}, r2: {r2}, r3: {r3}")

---

In [None]:
# Issue one GET against the Binance depth endpoint (symbol=BTCEUR, limit=5) through a
# short-lived session; the response object is left in `res` for inspection.
async with ClientSession() as session:
    res = await session.request(method="GET", url='https://api.binance.com/api/v3/depth?symbol=BTCEUR&limit=5')

In [None]:
# Show the HTTP status code of the response stored in `res`.
res.status

In [None]:
async def gather_symbol(symbol):
    """Fetch the Binance depth snapshot (``limit=5``) for one trading symbol.

    Args:
        symbol: Trading pair identifier, sent as the ``symbol`` query parameter.

    Returns:
        ``(symbol, parsed_json)`` when the server answers with HTTP 200.
        For any other status the code is printed and ``raise_for_status()`` is
        called; if that call does not raise, an empty dict is returned.

    Raises:
        Whatever ``raise_for_status()`` raises for an error status, and
        ``json.JSONDecodeError`` if the body is not valid JSON.
    """
    url = 'https://api.binance.com/api/v3/depth'
    # Let the HTTP client encode the query string instead of formatting it by hand.
    query = {'symbol': symbol, 'limit': 5}
    async with ClientSession() as session:
        # Using the response as a context manager releases it on every exit path,
        # including the early return and the raise below.
        async with session.request(method="GET", url=url, params=query) as res:
            if res.status != 200:
                print(res.status)
                res.raise_for_status()
                return {}
            res_json = await res.text()
            return (symbol, json.loads(res_json))

In [None]:
# Measure how long it takes to fetch 300 order books (3 symbols repeated 100 times).
time_i = time.perf_counter()

symbol_list = ['BTCEUR', 'ETHBTC', 'ADABTC']*100

# One gather_symbol() coroutine per list entry; gather awaits them all together and
# returns the results in the same order as symbol_list.
responses = await asyncio.gather(*(gather_symbol(symbol) for symbol in symbol_list))
time_e = time.perf_counter()
print(f'{time_e-time_i:.4} seconds')
######
# Display the collected results as the cell output.
responses

---

In [None]:
def endless_loop(iterable):
    """Yield the items of ``iterable`` over and over, restarting after the last one.

    An empty ``iterable`` produces a generator that finishes immediately.
    """
    for item in cycle(iterable):
        yield item

In [None]:
def _weighted_mean_price(levels):
    """Return the quantity-weighted mean price of ``(price, quantity)`` pairs."""
    prices = [float(price) for price, _ in levels]
    weights = [float(quantity) for _, quantity in levels]
    return np.average(prices, weights=weights)


async def get_symbol_prices(symbol):
    """Fetch the depth snapshot (``limit=5``) for ``symbol`` and summarise both sides.

    Args:
        symbol: Trading pair identifier placed in the request's query string.

    Returns:
        ``(buy, sell, timestamp)`` where ``buy`` and ``sell`` are the
        quantity-weighted mean prices of the ``'bids'`` and ``'asks'`` entries, and
        ``timestamp`` is ``time.perf_counter()`` read after the response arrived
        and before its body was read.

    Raises:
        Whatever ``raise_for_status()`` raises for an HTTP error status.
    """
    binance_uri = f'https://api.binance.com/api/v3/depth?symbol={symbol}&limit=5'

    async with ClientSession() as session:
        # Scope the response so it is released even if reading it fails.
        async with session.request(method="GET", url=binance_uri) as res:
            # Surface HTTP errors here; an error payload would presumably lack
            # the 'bids'/'asks' keys and fail later with a less helpful KeyError.
            res.raise_for_status()
            timestamp = time.perf_counter()
            res_json = await res.text()
    JSON_object = json.loads(res_json)

    buy = _weighted_mean_price(JSON_object['bids'])
    sell = _weighted_mean_price(JSON_object['asks'])

    return buy, sell, timestamp

In [None]:
async def gather_symbols_endless_loop(symbol_dict):
    """Keep refreshing the price entry of every symbol in ``symbol_dict``, round-robin.

    Args:
        symbol_dict: Maps each symbol to a mutable 4-item list. Items 0-2 receive
            the tuple returned by ``get_symbol_prices``; item 3 is a busy flag that
            is True while a refresh of that symbol is in flight.

    The function only returns if ``symbol_dict`` is empty; otherwise it loops until
    it is cancelled or a fetch raises.
    """
    symbol_list = symbol_dict.keys()
    for sym in endless_loop(symbol_list):
        if symbol_dict[sym][3]:
            # Another refresh of this symbol is in flight. Yield to the event loop
            # so it can finish, instead of spinning without ever awaiting.
            await asyncio.sleep(0)
            continue
        symbol_dict[sym][3] = True
        try:
            symbol_dict[sym][:3] = await get_symbol_prices(sym)
        finally:
            # Clear the flag even when the fetch fails or is cancelled; otherwise
            # the symbol would stay marked busy and never be refreshed again.
            symbol_dict[sym][3] = False

In [None]:
# Shared state: one 4-item list per symbol. A later cell labels the positions
# 'buy', 'sell', 'timestamp', 'updating'; everything starts at 0 / False.
symbol_list = ['BTCEUR', 'ETHBTC', 'ADABTC']
symbol_dict = {s:[0,0,0,False] for s in symbol_list}

# Hand the refresh loop to asyncio without awaiting it here; the returned future is
# kept in `future` so that it can be cancelled later.
future = asyncio.gather(gather_symbols_endless_loop(symbol_dict))

In [None]:
# Display the current contents of the shared state dict.
symbol_dict

In [None]:
# Build one row per symbol: label the four list positions, transpose so that the
# symbols become rows, then move the symbol out of the index into its own column.
df_prices = (
    pd.DataFrame(symbol_dict, index=['buy', 'sell', 'timestamp', 'updating'])
    .T
    .reset_index()
    .rename(columns={'index': 'symbol'})
)
# Stored timestamp minus the current perf_counter() reading.
df_prices['timestamp_delta'] = df_prices['timestamp'] - time.perf_counter()
df_prices

In [None]:
# Request cancellation of the background work referenced by `future`.
future.cancel()

----

# Test asyncio data structure memory management

In [None]:
async def add_one(obj, q):
    """Add one to ``obj``, wait a random time below one second, then put the result on ``q``."""
    obj += 1
    pause = random.random()
    await asyncio.sleep(pause)
    await q.put(obj)
    
io_queue = asyncio.Queue()
asyncio.gather(*(add_one(i, io_queue) for i in range(10)))
for i in range(10):    
    queue_elements = []
    while True:
        try:
            queue_elements.append(io_queue.get_nowait())
        except asyncio.QueueEmpty:
            break
    print(queue_elements)
    await asyncio.sleep(0.5)

In [4]:
async def add_random(queue,key):
    """Forever: draw a random delay, sleep for it, then enqueue ``(key, delay)`` on ``queue``.

    The coroutine never returns by itself; it runs until it is cancelled.
    """
    while True:
        delay = random.random()
        await asyncio.sleep(delay)
        item = (key, delay)
        queue.put_nowait(item)
    
n = 3
io_queue = asyncio.Queue()
future = asyncio.gather(*(add_random(io_queue, i) for i in range(n)))
mydict = {i:0 for i in range(n)}
while True:
    # get all current elements on queue and print them forever
    try:
        queue_elements = []
        while True:
            try:
                queue_elements.append(io_queue.get_nowait())
            except asyncio.QueueEmpty:
                break
        for k,v in queue_elements:
            mydict[k] += v
        print(mydict, len(queue_elements))
        await asyncio.sleep(0.5)
    except Exception as e:
        future.cancel()
        break

{0: 0, 1: 0, 2: 0} 0
{0: 0, 1: 0, 2: 0.2849734772016188} 1
{0: 0.7836548310890649, 1: 0.779075155507138, 2: 0.8771648356430297} 5
{0: 1.0775043874587005, 1: 1.470960908904119, 2: 0.8771648356430297} 2
{0: 1.6971902274935036, 1: 1.7275470501851162, 2: 1.5932142586408564} 6
{0: 2.3736612217605657, 1: 2.2101989143456153, 2: 2.4259267889410463} 7
{0: 2.3736612217605657, 1: 2.651203548995049, 2: 2.7355008621095016} 2
{0: 3.252124979585039, 1: 2.651203548995049, 2: 3.427983138801674} 2
{0: 3.796885765311286, 1: 3.8409973455929896, 2: 3.8282353089648358} 6
{0: 4.442736838027203, 1: 4.200118615731745, 2: 3.8282353089648358} 3
{0: 4.851711568403019, 1: 4.980993041721247, 2: 4.650689051213988} 3
{0: 5.184438746406212, 1: 4.980993041721247, 2: 5.3258844912727} 2
{0: 5.184438746406212, 1: 5.799734817352582, 2: 5.808308841688194} 5
{0: 6.144078633796583, 1: 6.138236989664692, 2: 6.385092635874815} 4


_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError


In [None]:
async def add_dict(d,k):
    """Forever: draw a random delay, sleep for it, then add that delay to ``d[k]`` in place.

    The coroutine never returns by itself; it runs until it is cancelled.
    """
    while True:
        delay = random.random()
        await asyncio.sleep(delay)
        d[k] += delay
    
n = 3
mydict = {i:0 for i in range(n)}
future = asyncio.gather(*(add_dict(mydict, i) for i in range(n)))
while True:
    try:
        print(mydict)
        await asyncio.sleep(0.5)
    except Exception as e:
        future.cancel()
        break

---

In [18]:
# URL template for the Binance REST hosts; '<>' is the placeholder for the subdomain.
binance_base = "https://<>.binance.com"
binance_subdomains = ["api", "api1", "api2", "api3"]

# Only the first subdomain is used here.
binance_url = binance_base.replace('<>', binance_subdomains[0])

# Endpoint table: name -> (HTTP method, path[, {query parameter: flag}]).
# NOTE(review): the boolean flag presumably marks a parameter as required (True) or
# optional (False) - confirm against the API documentation.
binance_endpoints = {
    'ping': ('GET', '/api/v3/ping'),
    'server_time': ('GET', '/api/v3/time'),
    'exchange_info': ('GET', '/api/v3/exchangeInfo'),
    'order_book': ('GET', '/api/v3/depth', {'symbol': True, 'limit': False}),
    'recent_trades': ('GET', '/api/v3/trades', {'symbol': True, 'limit': False}),
    'average_price': ('GET', '/api/v3/avgPrice', {'symbol': True}),
    'price': ('GET', '/api/v3/ticker/price', {'symbol': False}),
    'best_book_price': ('GET', '/api/v3/ticker/bookTicker', {'symbol': False})
}

def _weighted_mean_price(levels):
    """Return the quantity-weighted mean price of ``(price, quantity)`` pairs."""
    prices = [float(price) for price, _ in levels]
    weights = [float(quantity) for _, quantity in levels]
    return np.average(prices, weights=weights)


async def get_symbol_prices(q, symbol, poll_interval=0.0):
    """Poll the order-book endpoint for ``symbol`` forever and publish each reading on ``q``.

    NOTE(review): this redefines a ``get_symbol_prices(symbol)`` from an earlier cell
    with a different signature; code that calls the one-argument form will break once
    this cell has run.

    Args:
        q: Queue that receives one ``(symbol, buy, sell, timestamp)`` tuple per poll.
        symbol: Trading pair identifier placed in the request's query string.
        poll_interval: Seconds to wait between two polls. The default of 0 polls
            back-to-back, as before.

    ``buy`` and ``sell`` are the quantity-weighted mean prices of the ``'bids'`` and
    ``'asks'`` entries; ``timestamp`` is ``time.perf_counter()`` read after the
    response arrived and before its body was read.

    Raises:
        Whatever ``raise_for_status()`` raises for an HTTP error status. The
        coroutine otherwise runs until it is cancelled.
    """
    binance_uri = binance_url + binance_endpoints['order_book'][1] + f'?symbol={symbol}&limit=5'

    # One session for the whole polling loop, so connections can be reused instead of
    # building and tearing down a session for every single request.
    async with ClientSession() as session:
        while True:
            # Scope the response so it is released even if the status check raises.
            async with session.request(method="GET", url=binance_uri) as res:
                if res.status != 200:
                    print(res.status, dict(res.headers))
                    res.raise_for_status()
                timestamp = time.perf_counter()
                res_json = await res.text()
            JSON_object = json.loads(res_json)

            buy = _weighted_mean_price(JSON_object['bids'])
            sell = _weighted_mean_price(JSON_object['asks'])

            await q.put((symbol, buy, sell, timestamp))
            if poll_interval > 0:
                await asyncio.sleep(poll_interval)
    
symbols = ['ADABTC', 'BTCEUR', 'ETHBTC']
io_queue = asyncio.Queue()
future = asyncio.gather(*(get_symbol_prices(io_queue, s) for s in symbols))
mydict = {s:0 for s in symbols}
while True:
    # get all current elements on queue and print them forever
    try:
        queue_elements = []
        while True:
            try:
                queue_elements.append(io_queue.get_nowait())
            except asyncio.QueueEmpty:
                break
        for t in queue_elements:
            k = t[0]
            v = t[1:]
            mydict[k] = v
        print(mydict, len(queue_elements))
        await asyncio.sleep(0.5)
    except Exception as e:
        future.cancel()
        break

{'ADABTC': 0, 'BTCEUR': 0, 'ETHBTC': 0} 0
{'ADABTC': (2.0449916249616997e-05, 2.051752625970467e-05, 190118.850191518), 'BTCEUR': 0, 'ETHBTC': (0.035414311021090536, 0.03542945771981489, 190118.838418096)} 2
{'ADABTC': (2.0449916249616997e-05, 2.051752625970467e-05, 190118.850191518), 'BTCEUR': (50280.904172974544, 50299.097249502185, 190119.269616284), 'ETHBTC': (0.035414311021090536, 0.035429447928155035, 190119.496353979)} 3
{'ADABTC': (2.045155226852299e-05, 2.051752625970467e-05, 190119.918447046), 'BTCEUR': (50280.904172974544, 50299.097249502185, 190119.269616284), 'ETHBTC': (0.035414311021090536, 0.035429440113320394, 190119.802934451)} 3
{'ADABTC': (2.0453019524671832e-05, 2.051752625970467e-05, 190120.234278015), 'BTCEUR': (50281.953475947936, 50299.588822968784, 190120.347450313), 'ETHBTC': (0.03541478955147154, 0.03542943075713912, 190120.439120732)} 5
{'ADABTC': (2.0464342418077988e-05, 2.0521711108302792e-05, 190120.857223163), 'BTCEUR': (50282.04497411795, 50297.97748009

_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
