# Parallelism, Asynchrony, Concurrency

## Parallelism

* doing things in parallel instead of sequential
* hope we reach the solution in less wallclock time

**Examples**

* parse many files simultaneously
* distribute a computational task over many processors or nodes

## Asynchrony
* reacting to things that will happen in future
* we do not know when these things will happen

**Examples**

* File change notifications
* Incoming requests to a server
* Incoming packets of data to a socket 
* GUI events

## Concurrency
* several computations are executed concurrently – during overlapping time periods – instead of sequentially
* concurrency control: coordinate access to a shared resource

**Examples**
* booking system for a flight
* banking account
* database updates

## Concurrent code in Python

* **`threading`**: run several «trains of thought» on a single processor, **pre-emptive multitasking**
* **`asyncio`**: run several «trains of thought» on a single processor, **cooperative multitasking**
* **`multiprocessing`**: run several «trains of thought» at the same time, using **multiple processors**



**pre-emptive multitasking**: The operating system knows about each thread and can *interrupt it at any time* to start running a different thread. The OS decides when to switch tasks.

**cooperative multitasking**: The tasks must cooperate by announcing when they are ready to be switched out. The tasks decide when to give up control.

**multiprocessing**:  Python creates new processes – a collection of resources where the resources include memory, file handles etc. Each process runs in its own Python interpreter

## Links

* https://realpython.com/python-concurrency/#what-is-concurrency
* https://realpython.com/async-io-python/#async-io-is-not-easy

In [None]:
import requests
import time


def download_site(url, session):
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

In [None]:
import concurrent.futures
import requests
import threading
import time


thread_local = threading.local()


def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_site, sites)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

In [11]:
import concurrent.futures


counter = 0


def increment_counter(fake_value):
    global counter
    for _ in range(100):
        counter += 1


if __name__ == "__main__":
    fake_data = [x for x in range(5000)]
    counter = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=500) as executor:
        executor.map(increment_counter, fake_data)

In [1]:
import asyncio
import time

async def count():
    print("One")
    #await asyncio.sleep(1)
    await time.sleep()
    print("Two")

async def main():
    await asyncio.gather(count(), count(), count())


import time
s = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - s
print(f"{__file__} executed in {elapsed:0.2f} seconds.")

RuntimeError: asyncio.run() cannot be called from a running event loop

In [4]:
import asyncio

In [40]:
import asyncio
import time

class Test():
    
    async def login(self, username, password):
        print("here in login")
        await self.keep_conn_alive()
        return
        
    async def keep_conn_alive(self):
        while True:
            await asyncio.sleep(3)
            print("Still awake...")

In [41]:
test = Test()

In [42]:
await test.login('username', 'password')

here in login
Still awake...
Still awake...


CancelledError: 

In [37]:
import asyncio

# convert to coroutine
async def reading_book():
    print("reading page 1")
    await asyncio.sleep(4)
    print("reading page 2")
    await asyncio.sleep(4)
    print("reading page 3")
    await asyncio.sleep(4)
    print("reading page 4")
    
async def seconds():
    i = 0
    while True:
        print(f"\t{i} seconds")
        i += 1
        await asyncio.sleep(1)

# convert to coroutine
async def checking_whatsapp():
    print("reading new message 1")
    await asyncio.sleep(2)
    print("reading new message 2")
    await asyncio.sleep(4)
    print("reading new message 3")
    await asyncio.sleep(1)
    print("reading new message 4")

In [38]:
async def main(tasks):
    await asyncio.gather(*[task for task in tasks])

In [39]:
await main([reading_book(), checking_whatsapp(), seconds()])

reading page 1
reading new message 1
	0 seconds
	1 seconds
reading new message 2
	2 seconds
	3 seconds
reading page 2
	4 seconds
	5 seconds
reading new message 3
	6 seconds
reading new message 4
	7 seconds
reading page 3
	8 seconds
	9 seconds
	10 seconds
	11 seconds
reading page 4
	12 seconds
	13 seconds
	14 seconds
	15 seconds
	16 seconds


CancelledError: 

In [23]:
output = await asyncio.gather(*[reading_book(), checking_whatsapp()])

reading page 1
reading new message 1
reading page 2
reading new message 2
reading page 3
reading new message 3
reading page 4
reading new message 4


In [50]:
import asyncio
import random
import time

async def part1(n: int) -> str:
    i = random.randint(0, 10)
    print(f"part1({n}) sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-1"
    print(f"Returning part1({n}) == {result}.")
    return result

async def part2(n: int, arg: str) -> str:
    i = random.randint(0, 10)
    print(f"part2{n, arg} sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-2 derived from {arg}"
    print(f"Returning part2{n, arg} == {result}.")
    return result

async def chain(n: int) -> None:
    start = time.perf_counter()
    p1 = await part1(n)
    p2 = await part2(n, p1)
    end = time.perf_counter() - start
    print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")

async def main(*args):
    await asyncio.gather(*(chain(n) for n in args))

In [57]:
import sys
random.seed(444)
args = [1,2,3]

In [60]:
await main(*args)

part1(1) sleeping for 4 seconds.
part1(2) sleeping for 8 seconds.
part1(3) sleeping for 10 seconds.
Returning part1(1) == result1-1.
part2(1, 'result1-1') sleeping for 7 seconds.
Returning part1(2) == result2-1.
part2(2, 'result2-1') sleeping for 8 seconds.
Returning part1(3) == result3-1.
part2(3, 'result3-1') sleeping for 4 seconds.
Returning part2(1, 'result1-1') == result1-2 derived from result1-1.
-->Chained result1 => result1-2 derived from result1-1 (took 11.01 seconds).
Returning part2(3, 'result3-1') == result3-2 derived from result3-1.
-->Chained result3 => result3-2 derived from result3-1 (took 14.01 seconds).
Returning part2(2, 'result2-1') == result2-2 derived from result2-1.
-->Chained result2 => result2-2 derived from result2-1 (took 16.01 seconds).


In [62]:
#!/usr/bin/env python3
# asyncq.py

import asyncio
import itertools as it
import os
import random
import time

async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()

async def randsleep(a: int = 1, b: int = 5, caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)

async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")

async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now-t:0.5f} seconds.")
        q.task_done()

async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()


import argparse
random.seed(444)
#parser = argparse.ArgumentParser()
#parser.add_argument("-p", "--nprod", type=int, default=5)
#parser.add_argument("-c", "--ncon", type=int, default=10)
#ns = parser.parse_args()
start = time.perf_counter()
#asyncio.run(main(**ns.__dict__))
await main(nprod=2, ncon=5)
elapsed = time.perf_counter() - start
print(f"Program completed in {elapsed:0.5f} seconds.")

Producer 0 sleeping for 4 seconds.
Consumer 0 sleeping for 4 seconds.
Consumer 1 sleeping for 7 seconds.
Consumer 2 sleeping for 4 seconds.
Consumer 3 sleeping for 4 seconds.
Consumer 4 sleeping for 8 seconds.
Producer 0 added <d0bedca904> to queue.
Producer 0 sleeping for 10 seconds.
Consumer 0 got element <d0bedca904> in 0.00029 seconds.
Consumer 0 sleeping for 7 seconds.
Producer 0 added <fc50500687> to queue.
Producer 0 sleeping for 8 seconds.
Consumer 2 got element <fc50500687> in 0.00033 seconds.
Consumer 2 sleeping for 4 seconds.
Producer 0 added <98d5adf472> to queue.
Producer 0 sleeping for 7 seconds.
Consumer 3 got element <98d5adf472> in 0.00032 seconds.
Consumer 3 sleeping for 1 seconds.
Producer 0 added <0a3549c320> to queue.
Consumer 1 got element <0a3549c320> in 0.00024 seconds.
Consumer 1 sleeping for 6 seconds.
Program completed in 29.01676 seconds.
