<a href="https://colab.research.google.com/github/AnupJoseph/adv-python/blob/master/Concurrency.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install aiohttp, which the asyncio example further down imports and uses.
!pip install aiohttp

Collecting aiohttp
[?25l  Downloading https://files.pythonhosted.org/packages/7c/39/7eb5f98d24904e0f6d3edb505d4aa60e3ef83c0a58d6fe18244a51757247/aiohttp-3.6.2-cp36-cp36m-manylinux1_x86_64.whl (1.2MB)
[K     |████████████████████████████████| 1.2MB 2.8MB/s 
Collecting yarl<2.0,>=1.0
[?25l  Downloading https://files.pythonhosted.org/packages/a0/b4/2cbeaf2c3ea53865d9613b315fe24e78c66acedb1df7e4be4e064c87203b/yarl-1.5.1-cp36-cp36m-manylinux1_x86_64.whl (257kB)
[K     |████████████████████████████████| 266kB 16.2MB/s 
[?25hCollecting idna-ssl>=1.0; python_version < "3.7"
  Downloading https://files.pythonhosted.org/packages/46/03/07c4894aae38b0de52b52586b24bf189bb83e4ddabfe2e2c8f2419eec6f4/idna-ssl-1.1.0.tar.gz
Collecting async-timeout<4.0,>=3.0
  Downloading https://files.pythonhosted.org/packages/e1/1e/5a4441be21b0726c4464f3f23c8b19628372f606755a9d2e46c187e65ec4/async_timeout-3.0.1-py3-none-any.whl
Collecting multidict<5.0,>=4.5
[?25l  Downloading https://files.pythonhosted.org/pack

In [None]:
# Speeding up I/O-bound operations

In [2]:
import requests
import time
import concurrent.futures
import threading
import asyncio
import aiohttp
import multiprocessing

In [3]:
# Non-concurrent program to download 160 sites
def download_site(url,session):
  """Fetch ``url`` through the given requests ``session`` and print the body size in bytes."""
  with session.get(url) as reply:
    body = reply.content
    print(f"Read {len(body)} from {url}")

def download_all_sites(sites):
  """Download every URL in ``sites`` one after another, reusing a single requests session."""
  with requests.Session() as shared:
    for site_url in sites:
      download_site(site_url, shared)

if __name__ == "__main__":
  # Two pages repeated 80 times -> 160 requests, fetched sequentially.
  sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice"
  ]*80
  # perf_counter() is a monotonic, high-resolution clock intended for timing
  # intervals; time.time() follows the wall clock and can jump if it is adjusted.
  start_time = time.perf_counter()
  download_all_sites(sites)
  duration = time.perf_counter() - start_time
  print(f"Downloaded {len(sites)} in {duration} seconds")

Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jyth

In [4]:
thread_local = threading.local()

def get_session():
  """Return the ``requests.Session`` belonging to the calling thread.

  The session is created lazily on the first call from a thread and stored on
  the ``threading.local`` object, so later calls from the same thread reuse it
  while other threads get their own (presumably because requests does not
  promise that one Session is safe to share across threads).
  """
  # The session must be stored as an attribute of thread_local; assigning it to
  # a plain local variable would create a fresh session on every call and never
  # satisfy the hasattr() check.
  if not hasattr(thread_local, "session"):
    thread_local.session = requests.Session()
  return thread_local.session

def download_site(url):
  """Fetch ``url`` with the calling thread's session and print the body size in bytes."""
  http = get_session()
  with http.get(url) as reply:
    nbytes = len(reply.content)
    print(f"Read {nbytes} from {url}")

def download_all_sites(sites, max_workers=5):
  """Download every URL in ``sites`` concurrently on a pool of worker threads.

  Args:
    sites: iterable of URLs, each passed to ``download_site``.
    max_workers: number of threads in the pool (default 5, the previous
      hard-coded value).

  Raises:
    The first exception (in input order) raised by ``download_site``.
  """
  with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    # Executor.map re-raises a worker's exception only when that result is
    # retrieved from the iterator; draining it keeps failed downloads from
    # being silently discarded.
    for _ in executor.map(download_site, sites):
      pass

if __name__ == "__main__":
  # Same 160-request workload as the sequential cell, now spread over threads.
  sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice"
  ]*80
  # perf_counter() is a monotonic, high-resolution clock intended for timing
  # intervals; time.time() follows the wall clock and can jump if it is adjusted.
  start_time = time.perf_counter()
  download_all_sites(sites)
  duration = time.perf_counter() - start_time
  print(f"Downloaded {len(sites)} in {duration} seconds")

Read 275 from http://olympus.realpython.org/diceRead 275 from http://olympus.realpython.org/dice

Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 10267 from https://www.jython.org
Read 10267 from https://www.jython.org
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 275 from http://olympus.realpython.org/dice
Read 275 from http://olympus.realpython.org/dice
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 275 from http://olympus.realpython.org/diceRead 10267 from https://www.jython.org

Read 275 from http://olympus.realpython.org/dice
Read 275 from http://olympus.realpython.org/dice
Read 10267 from https://www.jython.org
Read 10267 from https:

In [None]:
# Asyncio version (this cell has some unresolved errors)
async def download_site(session, url):
    """Fetch ``url`` with the aiohttp ``session`` and print the body size in bytes.

    The body is actually read, so the timing includes the transfer, matching
    the requests-based versions that report ``len(response.content)``.
    ``response.content_length`` would only echo the Content-Length header,
    which is None when the server omits it, and would leave the body unread.
    """
    async with session.get(url) as response:
        body = await response.read()
        print(f"Read {len(body)} from {url}")


async def download_all_sites(sites):
    """Download every URL in ``sites`` concurrently over one aiohttp session.

    A failing download does not cancel the others (``return_exceptions=True``),
    but each failure is printed instead of being silently dropped.

    Returns:
        The list from ``asyncio.gather``: one entry per URL, in input order,
        holding either that download's result or the exception it raised.
    """
    # Materialize once: the URLs are iterated to launch downloads and again to report failures.
    sites = list(sites)
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(download_site(session, url) for url in sites),
            return_exceptions=True,
        )
    for url, outcome in zip(sites, results):
        if isinstance(outcome, BaseException):
            print(f"Failed {url}: {outcome!r}")
    return results


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80

    def _run_in_fresh_loop(coro):
        """Run ``coro`` to completion on a brand-new event loop, then close that loop."""
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.close()

    start_time = time.perf_counter()
    # A Jupyter/Colab kernel may already be running an asyncio loop in this
    # thread, in which case get_event_loop().run_until_complete() raises
    # "This event loop is already running". Driving the coroutine on a private
    # loop in a helper thread works both in a notebook and in a plain script.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as runner:
        runner.submit(_run_in_fresh_loop, download_all_sites(sites)).result()
    duration = time.perf_counter() - start_time
    print(f"Downloaded {len(sites)} sites in {duration} seconds")

In [6]:
# Multiprocessing version: each worker process gets its own requests session
session = None  # this process's requests.Session, created by set_global_session()

def set_global_session():
  """Pool initializer that gives the current process its own module-level session.

  The session is created only once; if one already exists it is left untouched.
  """
  global session
  if session:
    return
  session = requests.Session()

def download_site(url):
  """Fetch ``url`` with this process's global session and print which worker read how many bytes."""
  with session.get(url) as response:
    worker = multiprocessing.current_process().name
    size = len(response.content)
    print(f"{worker}:Read {size} from {url}")

def download_all_sites(sites):
  """Download ``sites`` across a process pool, each worker initialised with its own session.

  NOTE(review): the recorded output shows ForkPoolWorker processes. Functions
  defined in a notebook's __main__ generally cannot be located by workers
  started with the "spawn" method, so confirm the start method before running
  this outside Linux/Colab.
  """
  pool = multiprocessing.Pool(initializer=set_global_session)
  with pool:
    pool.map(download_site, sites)

if __name__ == "__main__":
  # Same 160-request workload, now spread across worker processes.
  sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
  ] * 80
  # perf_counter() is a monotonic, high-resolution clock intended for timing
  # intervals; time.time() follows the wall clock and can jump if it is adjusted.
  start_time = time.perf_counter()
  download_all_sites(sites)
  duration = time.perf_counter() - start_time
  print(f"Downloaded {len(sites)} in {duration} seconds")

ForkPoolWorker-1:Read 10267 from https://www.jython.org
ForkPoolWorker-2:Read 10267 from https://www.jython.org
ForkPoolWorker-1:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-2:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-1:Read 10267 from https://www.jython.org
ForkPoolWorker-2:Read 10267 from https://www.jython.org
ForkPoolWorker-1:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-2:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-1:Read 10267 from https://www.jython.org
ForkPoolWorker-2:Read 10267 from https://www.jython.org
ForkPoolWorker-1:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-2:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-1:Read 10267 from https://www.jython.org
ForkPoolWorker-2:Read 10267 from https://www.jython.org
ForkPoolWorker-1:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-2:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-1:Read 10

In [None]:
# Speeding up CPU-bound execution

In [11]:
def cpu_bound(number):
  """Return 0*0 + 1*1 + ... + (number-1)*(number-1), summed in pure Python.

  The brute-force sum is deliberate: it keeps the interpreter busy so the cell
  measures CPU-bound work (a closed-form formula would be instant).
  """
  squares = (k * k for k in range(number))
  return sum(squares)

def find_sums(numbers):
  """Run ``cpu_bound`` on each number in turn in this process, discarding the results."""
  for _ in map(cpu_bound, numbers):
    pass

if __name__ == "__main__":
  # 20 inputs of a little over five million each.
  numbers = [5_000_000+x for x in range(20)]

  # perf_counter() is a monotonic, high-resolution clock intended for timing
  # intervals; time.time() follows the wall clock and can jump if it is adjusted.
  start_time = time.perf_counter()
  find_sums(numbers)
  duration = time.perf_counter() - start_time
  print(f"Duration {duration} seconds")

Duration 9.543235063552856 seconds


In [12]:
def cpu_bound(number):
    """Sum the squares of 0 .. number-1 with a deliberately slow pure-Python generator.

    Kept at module level so ``multiprocessing.Pool.map`` can pickle it by reference.
    """
    squared_terms = (value * value for value in range(number))
    return sum(squared_terms)


def find_sums(numbers):
    """Spread the ``cpu_bound`` calls for ``numbers`` over a pool of worker processes.

    ``Pool()`` with no arguments chooses the worker count itself (os.cpu_count()
    per the stdlib docs). The results are discarded.
    """
    pool = multiprocessing.Pool()
    with pool:
        pool.map(cpu_bound, numbers)


if __name__ == "__main__":
    # Same 20 inputs as the sequential cell, so the two durations are comparable.
    # NOTE(review): the recorded run here was slower than the sequential one;
    # presumably the runtime had few CPU cores. Check os.cpu_count() when comparing.
    numbers = [5_000_000 + x for x in range(20)]

    # perf_counter() is a monotonic, high-resolution clock intended for timing
    # intervals; time.time() follows the wall clock and can jump if it is adjusted.
    start_time = time.perf_counter()
    find_sums(numbers)
    duration = time.perf_counter() - start_time
    print(f"Duration {duration} seconds")

Duration 10.451621770858765 seconds
