<a href="https://colab.research.google.com/github/anilaksu/Algorithmic-Trading-Codes/blob/Web-scraping-with-Asyncio-in-Python/Asynchronous_Python.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Asynchronous Python with Asyncio**


Anil Aksu

Personal e-mail: aaa293@cornell.edu

**Asyncio allows us to write concurrent programs in Python**

Some tasks require waiting:

*   HTTP requests
*   Database connections
*   Writing to log files




**Outline:**


1.   Fundamentals
  * Synchronous vs Asynchronous
  * Blocking & Timeouts
  * Scraping with Selenium
  * Asynchronous Functions
  * Asynchronous Iterators
2. Coroutines and Awaitables
  * Coroutine Objects and Async Functions
  * Ways of running coroutines
  * Cancelling coroutines
  * Awaitable Objects
3. Tasks, Futures and the Event Loop
  * Using Task Objects
  * Interacting with the Event Loop
  * The concept of the Future




In [1]:
from google.colab import drive
drive.mount('/content/gdrive')
%cd /content/gdrive/MyDrive/ColabNotebooks/FinanceAlgorithms
!ls # shell command that lists the files in the current working directory

Mounted at /content/gdrive
/content/gdrive/MyDrive/ColabNotebooks/FinanceAlgorithms
 2013-03-08options.csv	      EURUSD_Options_Data.csv	   OptionsTrading.ipynb
 2013-03-08stocks.csv	      EURUSD_Options_Data.gsheet   PriceJump.gdraw
'Asynchronous Python.ipynb'  'ForEx&IndexData.xls'	  'Stock Markets Codes.ipynb'
 async_scrape.py	      local.csv


#**1. Fundamentals**

In [52]:
# Here we install required libraries for asynchronous programming
!python3 -V
!which pip3
!pip3 install requests-html --upgrade --no-cache-dir
!pip3 install selenium --upgrade --no-cache-dir
!pip3 install arsenic --upgrade --no-cache-dir
!pip3 install chromium-chromedriver --upgrade --no-cache-dir
!pip3 install pyuac --upgrade --no-cache-dir
!pip3 install pypiwin32 --upgrade --no-cache-dir

Python 3.10.12
/usr/local/bin/pip3
ERROR: Could not find a version that satisfies the requirement chromium-chromedriver (from versions: none)
ERROR: No matching distribution found for chromium-chromedriver
Collecting pyuac
  Downloading pyuac-0.0.3-py2.py3-none-any.whl (11 kB)
Collecting tee (from pyuac)
  Downloading tee-0.0.3.tar.gz (3.1 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: tee
  Building wheel for tee (setup.py) ... done
  Created wheel for tee: filename=tee-0.0.3-py3-none-any.whl size=3053 sha256=4a896cb1d8c6a094d69a02d84fa7ce27355c2095109fc6c80d305148369ca226
  Stored in directory: /tmp/pip-ephem-wheel-cache-ganwa861/wheels/04/85/14/def7ad66af0388d8f597af90753caaf2c4fd289d4c084162db
Successfully built tee
Installing collected packages: tee, pyuac
Successfully installed pyuac-0.0.3 tee-0.0.3
Collecting pypiwin32
  Downloading pypiwin32-223-py3-none-any.whl (1.7 kB)
INFO: pip 

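##1.1 Sync vs Async

Synchronous code runs tasks consecutively: each one blocks until it finishes. Asynchronous code lets tasks wait concurrently, so one task's waiting time can be used by the others.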

**Synchronous Code**

In [2]:
import time # provides time.sleep, used here to simulate slow work

iteration_times = [1, 3, 2, 4]

# Emulates a process that takes some time
def sleeper(seconds, i = -1):
  if i != -1:
    print(f"{i}\t{seconds}s")
  time.sleep(seconds)

def run():
  for i, second in enumerate(iteration_times):
    sleeper(second, i=i)

run()

0	1s
1	3s
2	2s
3	4s


**Asynchronous Code**

In [3]:
import asyncio

iteration_times = [1, 3, 2, 4]

async def a_sleeper(seconds, i = -1):
  if i != -1:
    print(f"{i}\t{seconds}s")
  await asyncio.sleep(seconds)

async def a_run():
  results = []
  for i, second in enumerate(iteration_times):
    results.append(
        asyncio.create_task(a_sleeper(second, i=i))
        )
  return results

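# a_run() only schedules the tasks and returns them right away,
# so they are still pending when printed and finish in the background
results = await a_run()

print(results)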

[<Task pending name='Task-2' coro=<a_sleeper() running at <ipython-input-3-2040a03a9e27>:5>>, <Task pending name='Task-3' coro=<a_sleeper() running at <ipython-input-3-2040a03a9e27>:5>>, <Task pending name='Task-4' coro=<a_sleeper() running at <ipython-input-3-2040a03a9e27>:5>>, <Task pending name='Task-5' coro=<a_sleeper() running at <ipython-input-3-2040a03a9e27>:5>>]
0	1s
1	3s
2	2s
3	4s
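To make the difference measurable, here is a minimal sketch that waits for all the sleepers with `asyncio.gather` and times the whole run. The delays are scaled down to fractions of a second and the helper name `a_run_gathered` is introduced only for this illustration. The script form uses `asyncio.run`; in a notebook cell, `await a_run_gathered()` would be used instead.

```python
import asyncio
import time

iteration_times = [0.1, 0.3, 0.2, 0.4]  # scaled-down delays (assumption, keeps the demo fast)

async def a_sleeper(seconds):
    await asyncio.sleep(seconds)
    return seconds

async def a_run_gathered():
    # gather schedules every coroutine and waits until all of them have finished
    return await asyncio.gather(*(a_sleeper(s) for s in iteration_times))

start = time.perf_counter()
results = asyncio.run(a_run_gathered())  # in a notebook cell: await a_run_gathered()
elapsed = time.perf_counter() - start

print(results)  # results come back in the order the coroutines were passed in
print(f"elapsed: {elapsed:.2f}s, sum of delays: {sum(iteration_times):.2f}s")
```

The elapsed time should be close to the longest delay rather than the sum of all delays, which is what running the sleepers consecutively would cost.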


##1.2 Blocking & Timeouts

In [4]:
def sleeper(seconds, i = -1):
  if i != -1:
    print(f"{i}\t{seconds}s")
  time.sleep(seconds)

In [5]:
async def a_sleeper(seconds, i = -1, timeout = 4):
  if i != -1:
    print(f"{i}\t{seconds}s")
  await asyncio.wait_for(asyncio.sleep(seconds), timeout=timeout)

await a_sleeper(3) # Awaiting blocks this cell until the coroutine finishes

In [6]:
# Scheduling a task on the event loop lets this cell return immediately, so the rest of the notebook stays usable while the task runs in the background
loop = asyncio.get_event_loop()
# loop = asyncio.new_event_loop()
# asyncio.run()
loop.create_task(a_sleeper(123))

<Task pending name='Task-8' coro=<a_sleeper() running at <ipython-input-5-9ff22a539f79>:1>>

In [7]:
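# asyncio.wait splits the tasks into done and pending once the timeout expires
# (it expects Task objects; passing bare coroutines is deprecated)
tasks = [asyncio.create_task(a_sleeper(1)), asyncio.create_task(a_sleeper(4))]
done, pending = await asyncio.wait(tasks, timeout = 2)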



In [8]:
done

{<Task finished name='Task-12' coro=<a_sleeper() done, defined at <ipython-input-5-9ff22a539f79>:1> result=None>}

In [9]:
pending

{<Task finished name='Task-11' coro=<a_sleeper() done, defined at <ipython-input-5-9ff22a539f79>:1> exception=TimeoutError()>}

In [86]:
# Wait until the pending task finishes
await asyncio.wait(pending)

({<Task finished name='Task-11' coro=<a_sleeper() done, defined at <ipython-input-5-9ff22a539f79>:1> exception=TimeoutError()>},
 set())

In [85]:
# wait_for raises TimeoutError when the task exceeds the timeout
try:
  await asyncio.wait_for(a_sleeper(5), timeout = 3)
except asyncio.TimeoutError:
  print("Task failed")

Task failed
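Unlike `asyncio.wait_for`, `asyncio.wait` does not cancel anything when its timeout expires, so the pending tasks keep running. The sketch below shows one way to enforce a deadline by cancelling whatever is left. The names `job` and `run_with_deadline` and the short delays are assumptions made for this example.

```python
import asyncio

async def job(seconds):
    await asyncio.sleep(seconds)
    return seconds

async def run_with_deadline():
    tasks = [asyncio.create_task(job(s)) for s in (0.1, 0.5)]
    done, pending = await asyncio.wait(tasks, timeout=0.2)
    for task in pending:
        task.cancel()  # request cancellation of everything that missed the deadline
    # Awaiting the cancelled tasks lets the cancellation actually complete
    await asyncio.gather(*pending, return_exceptions=True)
    return sorted(t.result() for t in done), len(pending)

finished, n_cancelled = asyncio.run(run_with_deadline())  # in a notebook cell: await run_with_deadline()
print(finished, n_cancelled)  # [0.1] 1
```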


##1.3 Scraping with Selenium

In [100]:
url = 'https://www.spoonflower.com/en/shop?on=fabric%27'

In [101]:
import re
import requests
from requests_html import HTML
import pandas as pd

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

In [102]:
def scraper(url):
  options = Options()
  options.add_argument("--headless")
  options.add_argument("--no-sandbox")
  options.add_argument("--disable-dev-shm-usage")
  driver = webdriver.Chrome(options = options)
  try:
    driver.get(url)
    return driver.page_source
  finally:
    driver.quit() # always close the browser, even if the page load fails

def extract_id_slug(url_path):
  regex = r"^[^\s]+/(?P<id>\d+)-(?P<slug>[\w_-]+)$"
  group = re.match(regex, url_path)
  if not group:
    return None, None
  return group['id'], group['slug']

In [103]:
# Here we pull the data from the page
content = scraper(url)
html_r = HTML(html = content)

In [104]:
fabric_links = [x for x in list(html_r.links) if x.startswith("/en/fabric")]   # Links for fabric images

datas = []

for path in fabric_links:
  id_, slug_ = extract_id_slug(path) # product id and slug
  data = {
      "id" : id_,
      "slug" : slug_,
      "path" : path,
      "scraped" : 0 # True / False -> 1 / 0
  }
  datas.append(data)
  #print(id_, slug_)

In [105]:
# Here we load the records into a DataFrame
df = pd.DataFrame(datas)
df.head()

| | id | slug | path | scraped |
|---|---|---|---|---|
| 0 | 16194112 | block-print-tortoise-hare-black-spice-large-sc... | /en/fabric/16194112-block-print-tortoise-hare-... | 0 |
| 1 | 6851270 | floral-wallroll-dark-by-crumpetsandcrabsticks | /en/fabric/6851270-floral-wallroll-dark-by-cru... | 0 |
| 2 | 9575438 | rustic-forest-trees-white-woodland-winter-chri... | /en/fabric/9575438-rustic-forest-trees-white-w... | 0 |
| 3 | 13488971 | retro-whimsy-daisy-flower-power-on-blue-eggshe... | /en/fabric/13488971-retro-whimsy-daisy-flower-... | 0 |
| 4 | 9709002 | tiger-peacock-large-scale-by-sveta_aho | /en/fabric/9709002-tiger-peacock-large-scale-b... | 0 |


In [93]:
# Here we save it to a local CSV file and read it back
df.to_csv("local.csv", index = False)
df = pd.read_csv("local.csv")
df.head(5)

| | id | slug | path | scraped |
|---|---|---|---|---|
| 0 | 16194112 | block-print-tortoise-hare-black-spice-large-sc... | /en/fabric/16194112-block-print-tortoise-hare-... | 0 |
| 1 | 6851270 | floral-wallroll-dark-by-crumpetsandcrabsticks | /en/fabric/6851270-floral-wallroll-dark-by-cru... | 0 |
| 2 | 9575438 | rustic-forest-trees-white-woodland-winter-chri... | /en/fabric/9575438-rustic-forest-trees-white-w... | 0 |
| 3 | 13488971 | retro-whimsy-daisy-flower-power-on-blue-eggshe... | /en/fabric/13488971-retro-whimsy-daisy-flower-... | 0 |
| 4 | 9709002 | tiger-peacock-large-scale-by-sveta_aho | /en/fabric/9709002-tiger-peacock-large-scale-b... | 0 |
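The Selenium `scraper` above is a blocking call, so calling it directly inside a coroutine would stall the event loop. One possible bridge is `asyncio.to_thread` (Python 3.9+), which runs a blocking function in a worker thread. The sketch below uses a hypothetical `blocking_fetch` with `time.sleep` as a stand-in for the real scraper, and the paths are made-up placeholders, not scraped values.

```python
import asyncio
import time

def blocking_fetch(path):
    # Hypothetical stand-in for scraper(url): blocks the calling thread
    time.sleep(0.2)
    return f"<html>{path}</html>"

async def fetch_all(paths):
    # to_thread runs each blocking call in a worker thread, keeping the event loop free
    return await asyncio.gather(*(asyncio.to_thread(blocking_fetch, p) for p in paths))

paths = ["/en/fabric/1-a", "/en/fabric/2-b", "/en/fabric/3-c"]  # placeholder paths (assumption)

start = time.perf_counter()
pages = asyncio.run(fetch_all(paths))  # in a notebook cell: await fetch_all(paths)
elapsed = time.perf_counter() - start

print(len(pages), f"{elapsed:.2f}s")
```

The three fetches overlap, so the total should be well under the 0.6 seconds that three consecutive calls would take. Whether this scales for real browser sessions depends on memory and on the site, so it is only a starting point.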


##1.4 Asynchronous Functions

In [116]:
import asyncio

# This is how we define an async function
async def main():
  print('hello...')
  await asyncio.sleep(2) # Here we give a pause for 2 seconds
  print('...world')
  return 'bye'


# In a notebook cell we can run an asynchronous function by awaiting it directly:
await main()

# Here we create a coroutine object
# result =  main()
# try:
#   result.send(None)
# except StopIteration as e:
#   print('result was:', e.value)

hello...
...world


'bye'
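The commented-out lines in the cell above hint at what happens underneath: a coroutine object can be driven by hand with `send(None)`, and its return value arrives inside `StopIteration`. Here is a minimal sketch of that idea, using a hypothetical `farewell` coroutine with no `await` inside so that it finishes on the first `send`. A coroutine that awaits `asyncio.sleep` would instead hand a future back to the caller, which is the part the event loop normally takes care of.

```python
async def farewell():
    # No await inside, so the coroutine runs to completion on the first send()
    return 'bye'

coro = farewell()  # nothing runs yet; this only creates the coroutine object
try:
    coro.send(None)  # start the coroutine manually, without an event loop
except StopIteration as exc:
    result = exc.value  # the return value travels inside StopIteration

print('result was:', result)  # result was: bye
```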

##1.5 Asynchronous Iterators

In [132]:
# Asynchronous For loop
import asyncio
import random

class EggBoiler:
  def __init__(self, amount):
    self.eggs = iter(range(1,amount+1)) # Number the eggs from 1 to amount

  # Return the asynchronous iterator (the object itself)
  def __aiter__(self):
    return self

  # Return the next item, or raise StopAsyncIteration when the eggs run out
  async def __anext__(self):
    try:
      egg = next(self.eggs)
    except StopIteration:
      raise StopAsyncIteration
    return self.boil(egg) # a coroutine that is not awaited yet; the caller decides when

  # Simulate boiling with a random delay
  async def boil(self, egg):
    await asyncio.sleep(random.randint(2, 5))
    print(f'Egg #{egg} is boiling')

# Boil the eggs one at a time: each egg is awaited before the next one is requested
async def main_1():
  async for egg in EggBoiler(4):
      await egg

# Collect all the eggs first, then boil them concurrently with gather
async def main_2():
  eggs = []
  async for egg in EggBoiler(4):
      eggs.append(egg)
  print('We wait for the eggs to boil...')
  await asyncio.gather(*eggs)

# In a notebook cell we can run an asynchronous function by awaiting it directly:
await main_2()

We wait for the eggs to boil...
Egg #1 is boiling
Egg #4 is boiling
Egg #2 is boiling
Egg #3 is boiling


In [134]:
# The @asynccontextmanager Decorator
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def connection():
  print('Setting up connection')
  await asyncio.sleep(1)
  yield{ 'drive': 'sqlite' }
  await asyncio.sleep(1)
  print('Shutting down')

async def main():
  async with connection() as db:
    print(db, 'is ready')

await main()

Setting up connection
{'drive': 'sqlite'} is ready
Shutting down
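In the cell above, the teardown code after `yield` only runs if the body of the `async with` block exits without an exception. A common way to make the cleanup unconditional is to wrap the `yield` in `try`/`finally`. The sketch below records events in a list (the `events` name is an assumption for this example) to show the order in which things happen when the body raises.

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def connection():
    events.append('setup')
    try:
        yield {'driver': 'sqlite'}
    finally:
        # Runs whether the body exits normally or raises
        await asyncio.sleep(0)
        events.append('teardown')

async def main():
    try:
        async with connection() as db:
            raise RuntimeError('query failed')  # simulated failure inside the block
    except RuntimeError:
        events.append('handled')

asyncio.run(main())  # in a notebook cell: await main()
print(events)  # ['setup', 'teardown', 'handled']
```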


In [135]:
# The Asynchronous Context Manager Classes
import asyncio

class Connection:
  # Called when entering the "async with" block
  async def __aenter__(self):
    print('Setting up a connection')
    await asyncio.sleep(1)
    return {'driver' : 'sqlite'}

  # Called when leaving the "async with" block
  async def __aexit__(self, exc_type, exc, tb):
    await asyncio.sleep(1)
    print('Connection is closed')

async def main():
  async with Connection() as db:
    print(db, 'is ready')

await main()

Setting up a connection
{'driver': 'sqlite'} is ready
Connection is closed


In [141]:
# The Asynchronous Generators
import asyncio

async def download(urls):
  for url in urls:
    await asyncio.sleep(1)
    response = {'status': 200, 'data': f'content of {url}'}
    if url == 'bing.com':
      response['status'] = 500
    yield response

async def main():
  urls = [
      'google.com',
      'bing.com',
      'duckduckgo.com'
  ]

  responses = [value async for value in download(urls) if value['status'] != 200]
  print(responses)

await main()

[{'status': 500, 'data': 'content of bing.com'}]


#**2. Coroutines & Awaitables**

##2.1 Coroutine Objects and Async Functions

In [143]:
import asyncio
import inspect

async def main():
  pass

print(type(main))
print(inspect.iscoroutinefunction(main))
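coro = main() # calling an async function only creates a coroutine object
print(dir(coro))
coro.close() # close it explicitly to avoid a "never awaited" warning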

<class 'function'>
True
['__await__', '__class__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__name__', '__ne__', '__new__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'close', 'cr_await', 'cr_code', 'cr_frame', 'cr_origin', 'cr_running', 'send', 'throw']
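The distinction between the async function and the coroutine object it returns can also be checked with the `inspect` module. The following sketch assumes a trivial `main` that returns 42 and follows the coroutine object from creation to completion.

```python
import asyncio
import inspect

async def main():
    return 42

coro = main()  # calling the async function creates a coroutine object

print(inspect.iscoroutinefunction(main))  # True: main is an async function
print(inspect.iscoroutine(coro))          # True: coro is a coroutine object
print(inspect.isawaitable(coro))          # True: it can be used with await
print(inspect.getcoroutinestate(coro))    # CORO_CREATED

value = asyncio.run(coro)  # in a notebook cell: value = await coro
print(value)                              # 42
print(inspect.getcoroutinestate(coro))    # CORO_CLOSED
```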




##2.2 Ways of Running Coroutines

In [145]:
import asyncio
import inspect

async def main():
  print('The main function')

# 1. using await
await main()
# 2. creating a task
asyncio.create_task(main())

The main function


<Task pending name='Task-58' coro=<main() running at <ipython-input-145-bc733164109c>:4>>

The main function
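Two further options are worth a sketch: `asyncio.gather` for running several coroutines concurrently, and `asyncio.run` for starting an event loop from ordinary synchronous code. Note that `asyncio.run` is meant for scripts; inside a notebook a loop is already running, so `await` is used there instead. The `supervisor` helper and the labels are assumptions for this example.

```python
import asyncio

async def main(label):
    await asyncio.sleep(0.1)
    return f'{label} done'

async def supervisor():
    # 3. gathering: run several coroutines concurrently and collect their results
    return await asyncio.gather(main('a'), main('b'))

# 4. asyncio.run: start an event loop from synchronous code (a script, not a notebook cell)
results = asyncio.run(supervisor())
print(results)  # ['a done', 'b done']
```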


##2.3 Cancelling Coroutines

In [146]:
import asyncio

async def stopwatch():
  count = 0
  while True:
      await asyncio.sleep(1)
      count += 1
      print(count)

async def main():
  task = asyncio.create_task(stopwatch())
  await asyncio.sleep(3)
  task.cancel()   # Cancel the task after 3 seconds

await main()

1
2
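Cancellation is delivered to the coroutine as an `asyncio.CancelledError` raised at its current `await`, which gives it a chance to clean up. Here is a minimal sketch, with shortened delays and a `log` list introduced for illustration, that catches the error, re-raises it, and then confirms the cancellation from the outside.

```python
import asyncio

log = []

async def stopwatch():
    try:
        while True:
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        log.append('cleanup')  # release resources here
        raise                  # re-raise so the task is marked as cancelled

async def main():
    task = asyncio.create_task(stopwatch())
    await asyncio.sleep(0.25)
    task.cancel()
    try:
        await task  # wait until the cancellation has actually been processed
    except asyncio.CancelledError:
        log.append('cancelled')
    return task.cancelled()

was_cancelled = asyncio.run(main())  # in a notebook cell: await main()
print(log, was_cancelled)  # ['cleanup', 'cancelled'] True
```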


##2.4 Awaitable Objects

In [147]:
import asyncio

class Stopwatch:
  def __await__(self):
    yield

async def main():
  await Stopwatch()

await main()
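An awaitable class becomes more useful when `__await__` delegates to a coroutine, because the object can then wait on real work and return a value. The sketch below is one possible variant of `Stopwatch`; the `seconds` parameter and the `_run` helper are assumptions added for this example.

```python
import asyncio

class Stopwatch:
    def __init__(self, seconds):
        self.seconds = seconds

    async def _run(self):
        await asyncio.sleep(self.seconds)
        return f'waited {self.seconds}s'

    def __await__(self):
        # Delegate to a coroutine and hand back its iterator
        return self._run().__await__()

async def main():
    return await Stopwatch(0.1)

message = asyncio.run(main())  # in a notebook cell: await main()
print(message)  # waited 0.1s
```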

#**3. Tasks, Futures and the Event Loop**

##3.1 Using Task Objects

In [150]:
import asyncio

async def stopwatch():
  count = 0
  while count < 4:
      await asyncio.sleep(1)
      count += 1
      print(count)

def callb(task):
  print('task is done', task)

async def main():
  task = asyncio.create_task(stopwatch())
  task.set_name('My Task')
  task.add_done_callback(callb)
  print(task.get_name())
  print(task.get_coro())
  print(task.get_name())
  await task

await main()

My Task
<coroutine object stopwatch at 0x7ea2787a4660>
My Task
1
2
3
4
task is done <Task finished name='My Task' coro=<stopwatch() done, defined at <ipython-input-150-b87e2fe8d0a6>:3> result=None>
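Task objects also expose the outcome of the wrapped coroutine. The following sketch, using two hypothetical coroutines `compute` and `fail`, reads a return value with `result()` and a stored error with `exception()` once both tasks have finished.

```python
import asyncio

async def compute():
    await asyncio.sleep(0.1)
    return 42

async def fail():
    await asyncio.sleep(0.1)
    raise ValueError('boom')

async def main():
    ok = asyncio.create_task(compute(), name='ok-task')
    bad = asyncio.create_task(fail(), name='bad-task')
    # return_exceptions=True keeps the failure from propagating out of gather
    await asyncio.gather(ok, bad, return_exceptions=True)
    return ok.done(), ok.result(), type(bad.exception()).__name__

summary = asyncio.run(main())  # in a notebook cell: await main()
print(summary)  # (True, 42, 'ValueError')
```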


##3.2 Interacting with the Event Loop

In [151]:
import asyncio

async def stopwatch():
  count = 0
  while count < 4:
      await asyncio.sleep(1)
      count += 1
      print(count)

def callb(task):
  print('task is done', task)

async def main():
  print(asyncio.get_running_loop())
  task = asyncio.create_task(stopwatch())
  task.add_done_callback(callb)

  await task

await main()

<_UnixSelectorEventLoop running=True closed=False debug=False>
1
2
3
4
task is done <Task finished name='Task-69' coro=<stopwatch() done, defined at <ipython-input-151-f16058d61056>:3> result=None>
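Besides running tasks, the loop object can schedule plain callbacks. Here is a small sketch with `call_soon` and `call_later`; the `order` list is an assumption used to record the sequence in which things run.

```python
import asyncio

order = []

async def main():
    loop = asyncio.get_running_loop()
    loop.call_later(0.1, order.append, 'later')  # run after roughly 0.1 seconds
    loop.call_soon(order.append, 'soon')         # run on the next loop iteration
    order.append('main')                         # runs immediately, before any callback
    await asyncio.sleep(0.2)                     # yield to the loop so the callbacks can fire

asyncio.run(main())  # in a notebook cell: await main()
print(order)  # ['main', 'soon', 'later']
```

Callbacks only run when the current coroutine yields control, which is why `'main'` is recorded first.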


##3.3 The concept of the Future

In [152]:
import asyncio

async def stopwatch():
  count = 0
  while count < 4:
      await asyncio.sleep(1)
      count += 1
      print(count)

def callb(task):
  print('task is done', task)

async def main():
  task = asyncio.create_task(stopwatch())
  task.add_done_callback(callb)
  print(asyncio.isfuture(task))

  await task

await main()

True
1
2
3
4
task is done <Task finished name='Task-71' coro=<stopwatch() done, defined at <ipython-input-152-fd9b0fb67711>:3> result=None>
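A Task is a Future that drives a coroutine, but a bare Future can also be created and resolved by hand. The sketch below, a minimal illustration rather than a typical application pattern, creates a Future on the running loop and lets a scheduled callback set its result while `main` awaits it.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()  # a placeholder for a result that does not exist yet
    loop.call_later(0.1, future.set_result, 'ready')  # resolve it after roughly 0.1 seconds
    print(future.done())   # False: nothing has set the result yet
    result = await future  # suspends main until set_result is called
    print(future.done())   # True
    return result

value = asyncio.run(main())  # in a notebook cell: await main()
print(value)  # ready
```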
