# "Asynchronous I/O"
> "Experimenting with the asyncio library to improve API and database storage performance"

- toc: false
- branch: master
- comments: true
- categories: [project, ETL]
- hide: false


[Asyncio](https://docs.python.org/3/library/asyncio.html) is a Python library that runs code concurrently through the use of coroutines. Coroutines are functions that can suspend their execution and hand control of the program to another coroutine while they wait for something to happen. This is powerful because the program can keep executing code elsewhere while it waits for an event such as an API response.

In this example I am making 214 requests to the Alpaca API, each downloading roughly two months of stock price data in 15-minute intervals. I then use the data to insert over 210,000 records into a PostgreSQL database. Using synchronous code, this took 203.6 seconds to complete, compared to 32.8 seconds using coroutines with the asyncio library. That is about 6.2 times faster, or a 520% improvement in performance.
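
As a quick check of that figure, the arithmetic can be worked through using only the two timings quoted above. The variable names are just for illustration.

```python
# Timings quoted in the text, in seconds
sync_seconds = 203.6
async_seconds = 32.8

# How many times faster the async run was
speedup = sync_seconds / async_seconds

# The same result expressed as a percentage improvement in speed
percent_faster = (speedup - 1) * 100

# And as a percentage reduction in run time
percent_less_time = (1 - async_seconds / sync_seconds) * 100

print(f"{speedup:.1f}x faster")
print(f"{percent_faster:.0f}% improvement in speed")
print(f"{percent_less_time:.0f}% less run time")
```

The same result can be read either as a speed increase or as a cut in run time, which is why the two percentages differ.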

## Syntax and examples
To declare a coroutine, the ```async``` keyword is used.

To suspend a coroutine and hand control back so that another coroutine can run, the ```await``` keyword is used.

The ```asyncio.run()``` call creates an event loop and runs the coroutine inside it. The event loop is like a ```while True``` statement that runs continuously, checking whether any waiting coroutine is ready to resume, for example because a sleep has finished or a network response has arrived.

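In the example below, ```await asyncio.sleep(1)``` suspends ```main``` for one second without blocking the event loop. Because ```main``` is the only coroutine here, nothing else runs in the meantime, but any other scheduled coroutine would be free to run during the sleep.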

In [19]:
import asyncio
import time

async def main():
  print(f"Started at {time.strftime('%X')}")
  print('Hello')
  await asyncio.sleep(1)
  print('World')

  print(f"Finished at {time.strftime('%X')}")

asyncio.run(main())

Started at 05:58:09
Hello
World
Finished at 05:58:10
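
In the cell above, ```main``` is the only coroutine, so the event loop has nothing else to run during the sleep. As a minimal sketch of what happens when there is other work, the version below adds a second coroutine, ```ticker``` (a hypothetical name for this illustration), and schedules it with ```asyncio.create_task```. The delays are shortened, and events are collected in a list so the order is easy to inspect.

```python
import asyncio

events = []  # records the order in which things happen

async def ticker():
    # Hypothetical background coroutine: records a tick, then yields to the loop
    for i in range(3):
        events.append(f"tick {i}")
        await asyncio.sleep(0.1)

async def main():
    # Schedule ticker on the event loop; it starts once main awaits something
    task = asyncio.create_task(ticker())
    events.append("Hello")
    # While main is suspended here, the event loop runs ticker
    await asyncio.sleep(0.35)
    events.append("World")
    await task  # make sure ticker has finished before main returns

asyncio.run(main())
print(events)
```

All three ticks are recorded between "Hello" and "World", because the event loop switches to ```ticker``` each time ```main``` is waiting.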


Here, ```asyncio.gather``` is used to run the ```say_after``` coroutine concurrently with two different inputs. Even though the call that prints "...World" is passed first, its output appears last because it has the longer delay and both calls are waiting at the same time.

In [16]:
async def say_after(delay, text):
  await asyncio.sleep(delay)
  print(text)

async def main():
  await asyncio.gather(
    say_after(2, '...World'),
    say_after(1, 'Hello...')
  )

asyncio.run(main())

Hello...
...World
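
Two details of ```asyncio.gather``` are not visible in the output above: it returns results in the order the coroutines were passed in, not the order they finished, and the total wait is roughly the longest delay rather than the sum of the delays. The sketch below illustrates both, using a hypothetical ```delayed``` coroutine that returns its text instead of printing it, with shortened delays.

```python
import asyncio
import time

async def delayed(delay, value):
    # Hypothetical helper: wait, then return the value instead of printing it
    await asyncio.sleep(delay)
    return value

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        delayed(0.4, "...World"),
        delayed(0.2, "Hello..."),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)                    # results follow the input order
print(f"{elapsed:.2f} seconds")   # close to the longest delay, not the sum
```

This matters for the script in the next section, where the list returned by ```gather``` lines up with the list of coroutines passed to it.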


## Async stock price dump
Below is the script I used to asynchronously get my data and insert it into my database, with a few explanatory comments. Along with asyncio, I needed aiohttp, which is an HTTP client/server for asyncio, and asyncpg, which is the asyncio counterpart to psycopg2 for talking to PostgreSQL.

In [None]:
import asyncio
import json
import datetime, time
import aiohttp, asyncpg
import config  # local module holding the API keys and database credentials

headers = {
    "APCA-API-KEY-ID": config.API_KEY,
    "APCA-API-SECRET-KEY": config.API_SECRET
  }

async def write_to_db(connection, params):
    await connection.copy_records_to_table('stock_price', records=params)


async def get_price(pool, stock_id, url):
    try:
        async with pool.acquire() as connection:
            async with aiohttp.ClientSession(headers=headers) as session:
                async with session.get(url=url) as response:
                    resp = await response.read()
                    data = json.loads(resp)
                    # the response maps each symbol to its list of bars
                    for key in data.keys():
                        # build the rows for this symbol and write them to the database
                        params = [(stock_id, datetime.datetime.fromtimestamp(bar['t'] / 1000.0), round(bar['o'], 2), round(bar['h'], 2), round(bar['l'], 2), round(bar['c'], 2), bar['v']) for bar in data[key]]
                        await write_to_db(connection, params)

    except Exception as e:
        print("Unable to get url {} due to {}.".format(url, e.__class__))


async def get_prices(pool, symbol_urls):
    try:
        # schedule aiohttp requests to run concurrently for all stocks
        # like above, gather is used to send many inputs to a coroutine and run them all concurrently
        ret = await asyncio.gather(*[get_price(pool, stock_id, symbol_urls[stock_id]) for stock_id in symbol_urls])
        print("Finalized all. Returned list of {} outputs.".format(len(ret)))
    except Exception as e:
        print(e)



async def get_stocks():
    # create database connection pool (this is a shared pool of database connections so that many inserts can happen concurrently)
    pool = await asyncpg.create_pool(user=config.DB_USER, password=config.DB_PASS, database=config.DB_NAME,
                                     host=config.DB_HOST, command_timeout=60)

    # get a connection
    async with pool.acquire() as connection:
        stocks = await connection.fetch("SELECT * FROM stock WHERE id IN (SELECT holding_id FROM etf_holding)")

        symbol_urls = {}
        for stock in stocks:
            # create a dictionary of urls for the API calls
            symbol_urls[stock['id']] = f'https://data.alpaca.markets/v1/bars/15Min?symbols={stock["symbol"]}&limit=1000'


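    await get_prices(pool, symbol_urls)
    # close the pool so every connection is released before the event loop shuts down
    await pool.close()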

start = time.time()
asyncio.run(get_stocks())
end = time.time()

print(f'Took {end-start} seconds to run.')
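
The script above needs API keys and a database, so it cannot be run on its own. The sketch below mimics its structure with stand-ins so the timing difference can be reproduced anywhere. It rests on several assumptions: ```asyncio.sleep``` takes the place of the API request and database write, an ```asyncio.Semaphore``` of size 10 takes the place of the connection pool, and the names ```fake_fetch_and_store```, ```run_sequential```, ```run_concurrent``` and ```timed``` are hypothetical.

```python
import asyncio
import time

async def fake_fetch_and_store(pool_slots, stock_id):
    # pool_slots stands in for the connection pool: at most 10 holders at once
    async with pool_slots:
        # stands in for the API request plus the database write
        await asyncio.sleep(0.05)
        return stock_id

async def run_sequential(ids):
    # Await each job before starting the next, as synchronous code would
    pool_slots = asyncio.Semaphore(10)
    return [await fake_fetch_and_store(pool_slots, i) for i in ids]

async def run_concurrent(ids):
    # Start every job at once and let the event loop interleave them
    pool_slots = asyncio.Semaphore(10)
    return await asyncio.gather(*[fake_fetch_and_store(pool_slots, i) for i in ids])

def timed(coro_fn, ids):
    # Run a coroutine function to completion and measure the wall-clock time
    start = time.perf_counter()
    result = asyncio.run(coro_fn(ids))
    return result, time.perf_counter() - start

ids = list(range(20))
seq_result, seq_time = timed(run_sequential, ids)
con_result, con_time = timed(run_concurrent, ids)

print(f"sequential: {seq_time:.2f} seconds")
print(f"concurrent: {con_time:.2f} seconds")
```

With 20 jobs and 10 slots, the concurrent version finishes in about two rounds of waiting instead of twenty. The same limit applies to the real script: ```get_price``` acquires a pool connection before making its request, so the number of requests in flight is capped by the size of the pool.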

## Synchronous run time


![picture](https://drive.google.com/uc?id=13v6MLeFAPlMQDoN8eNkFULu7pdsGffWm)


## Asynchronous run time

![picture](https://drive.google.com/uc?id=1k3WnrHn2BbDqWnGeUXq0ceRSzcdBcsIb)