# Concurrency Patterns in Python

## Operations

1. CPU Bound Operations
    - eg: calculations, data processing
2. I/O Bound Operations
    - eg: http requests, disk read operations

## Threads

Threads are a seperate flow of execution.

In [1]:
import threading
import time

def thread_function(name):
    time.sleep(2)
    print('Thread completed execution',threading.get_ident())

In [39]:
import dis

dis.dis(thread_function)

  5           0 LOAD_GLOBAL              0 (time)
              2 LOAD_ATTR                1 (sleep)
              4 LOAD_CONST               1 (2)
              6 CALL_FUNCTION            1
              8 POP_TOP

  6          10 LOAD_GLOBAL              2 (print)
             12 LOAD_CONST               2 ('Thread completed execution')
             14 LOAD_GLOBAL              3 (threading)
             16 LOAD_ATTR                4 (get_ident)
             18 CALL_FUNCTION            0
             20 CALL_FUNCTION            2
             22 POP_TOP
             24 LOAD_CONST               0 (None)
             26 RETURN_VALUE


### starting a single thread

In [36]:
x = threading.Thread(target=thread_function, args=(1,),daemon=False)
x.start()
# x.join()

print(x.is_alive())
print('hi')

True
hi
Thread completed execution 140010548352768


### starting multiple threads

In [37]:
threads = []
for i in range(3):
    x = threading.Thread(target=thread_function, args=(1,))
    x.start()
    threads.append(x)

for x in threads:
    x.join()

Thread completed execution 140010548352768
Thread completed execution 140010565138176
Thread completed execution 140010556745472


### using thread pool executor

In [38]:
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))

Thread completed execution 140010556745472
Thread completed execution 140010565138176
Thread completed execution 140010548352768


## Race Conditions

In [41]:
class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy

In [44]:
database = FakeDatabase()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    for index in range(10):
        executor.submit(database.update, index)

database.value

10

## Locks

a lock or mutex (from mutual exclusion) is a synchronization mechanism for enforcing limits on access to a resource in an environment where there are many threads of execution

In [43]:
class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def update(self, name):
        with self._lock:
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy

In [45]:
l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()
print("acquired lock twice")

before first acquire
before second acquire


KeyboardInterrupt: 

### RLock

can be aquired and released multiple times

#### Example 1: scrape websites

In [9]:
##syncronous version

import requests
import time


def download_site(url, session):
    with session.get(url) as response:
        pass #do something


def download_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)

sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")

Downloaded 160 in 38.64874482154846 seconds


In [54]:
##threading version

import concurrent.futures
import requests
import threading
import time


thread_local = threading.local()

variable_list = []

def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
        variable_list.append(url)
#         pass #do something


def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=160) as executor:
        executor.map(download_site, sites)

sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")
print(len(variable_list))

Downloaded 160 in 2.54917049407959 seconds
160


## Forms of multitasking

1. Pre-emptive multitasking
    - switches threads at interval of 1000 bytecodes. In Python 3 the GIL’s implementation is more complex, and the check interval is not a fixed number of bytecodes but 15 milliseconds.
    
2. Cooperative multitasking
    - tasks define when they are finished

### Asyncio

**levels of abstraction**

1. futures
2. coroutines
3. Tasks

**Special Keywords**

- async
- await

In [18]:
import sys
!{sys.executable} -m pip install asyncio
!{sys.executable} -m pip install aiohttp
!{sys.executable} -m pip install nest_asyncio

Defaulting to user installation because normal site-packages is not writeable


In [19]:
import nest_asyncio
nest_asyncio.apply()

In [51]:
import asyncio
import time
import aiohttp

async def download_site(session, url):
    async with session.get(url) as response:
        pass #do something


async def download_all_sites(sites):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in sites:
            task = asyncio.ensure_future(download_site(session, url))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == '__main__': 
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    asyncio.get_event_loop().run_until_complete(download_all_sites(sites))
#     asyncio.run(download_all_sites(sites))
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {duration} seconds")

Downloaded 160 sites in 2.0065319538116455 seconds


In [58]:
import requests
import multiprocessing
import time

session = None

def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        name = multiprocessing.current_process().name
        pass #do something


def download_all_sites(sites):
    with multiprocessing.Pool(20, initializer=set_global_session) as pool:
        pool.map(download_site, sites)

sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")

Downloaded 160 in 2.94077467918396 seconds


### Message Passing between threads

In [24]:
import concurrent.futures
import queue
import random
import threading
import time

def producer(queue, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        print("Producer got message: ", message)
        queue.put(message)

def consumer(queue, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not queue.empty():
        message = queue.get()
        print("Consumer storing message: {} (size={})".format(message, queue.qsize()))

pipeline = queue.Queue(maxsize=10)
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer, pipeline, event)
    executor.submit(consumer, pipeline, event)

    time.sleep(0.1)
    event.set()

Producer got message:  41
Producer got message:  87
Producer got message:  50
Producer got message:  81
Producer got message:  25
Producer got message:  20
Producer got message:  51
Producer got message:  16
Producer got message:  8
Producer got message:  17
Producer got message:  101
Consumer storing message: 41 (size=9)
Producer got message: Consumer storing message: 87 (size=9) 47
Producer got message:  57

Consumer storing message: 50 (size=9)
Consumer storing message: 81 (size=8)
Consumer storing message: 25 (size=7)
Consumer storing message: 20 (size=6)
Consumer storing message: 51 (size=5)
Consumer storing message: 16 (size=4)
Consumer storing message: 8 (size=3)
Consumer storing message: 17 (size=2)
Consumer storing message: 101 (size=1)
Consumer storing message: 47 (size=0)
Producer got message:  Consumer storing message: 57 (size=0)96
Producer got message:  20
Producer got message:  8
Producer got message:  41
Producer got message:  80
Producer got message:  26
Producer got m

## Actor Model

In [26]:
import sys
!{sys.executable} -m pip install thespian

Defaulting to user installation because normal site-packages is not writeable
Collecting thespian
  Downloading thespian-3.10.1.zip (493 kB)
[K     |████████████████████████████████| 493 kB 32 kB/s  eta 0:00:01
[?25hBuilding wheels for collected packages: thespian
  Building wheel for thespian (setup.py) ... [?25ldone
[?25h  Created wheel for thespian: filename=thespian-3.10.1-py3-none-any.whl size=266580 sha256=9d1467953480826472a4a4bfd044a085852b527f9c6435b371a7c21d10e1cb73
  Stored in directory: /home/anmol/.cache/pip/wheels/d1/0b/d6/3c7a8ef29276b6bd1c8cb51b74315b58a078fe6b9be92a9d22
Successfully built thespian
Installing collected packages: thespian
Successfully installed thespian-3.10.1


In [27]:
from thespian.actors import Actor
from thespian.actors import ActorSystem

class Hello(Actor):
    def receiveMessage(self, message, sender):
        if message.startswith("Hello! My name is"):
            your_name = self.createActor(YourName)
            your_name_msg = (sender, "Hello, ", message)
            self.send(your_name, your_name_msg)

class YourName(Actor):
    def receiveMessage(self, message, sender):
        if isinstance(message, tuple):
            orig_sender, pre_hello, orig_message = message
            orig_name = orig_message.lstrip("Hello! My name is ")
            self.send(orig_sender, pre_hello + orig_name)

ActorSystem()
hello = ActorSystem().createActor(Hello)
ActorSystem().ask(hello, "Hello! My name is Anmol")

'Hello, Anmol'

In [59]:
##satyam's code
#https://paste.ofcode.org/PwX4c3mSAGjBhZQxs6Tq3x

import asyncio
import time
import os
from concurrent.futures import ThreadPoolExecutor


def blocking_code():
    time.sleep(3)
    print("task completed")
    return time.time_ns()


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=os.cpu_count()) as exe:
        results = await asyncio.gather(*[loop.run_in_executor(exe, blocking_code) for i in range(10)])
        print(results)


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(main()))

AttributeError: module 'asyncio' has no attribute 'get_running_loop'