# Concurrency and Parallelism

## Resources

- Concurrency is not parallelism (Rob Pike, Heroku's Waza talk, 2012) - https://blog.golang.org/concurrency-is-not-parallelism

- Latency vs. Throughput
    - https://community.cadence.com/cadence_blogs_8/b/sd/archive/2010/09/13/understanding-latency-vs-throughput
    - http://www.futurechips.org/thoughts-for-researchers/clarifying-throughput-vs-latency.html

- IO- vs. CPU-bound
    - https://conferences.oreilly.com/oscon/oscon2014/public/schedule/detail/34040
    - http://stackoverflow.com/questions/868568/what-do-the-terms-cpu-bound-and-i-o-bound-mean
    
- Processes vs. Threads
    - http://stackoverflow.com/questions/200469/what-is-the-difference-between-a-process-and-a-thread
    - https://www.ploggingdev.com/2017/01/multiprocessing-and-multithreading-in-python-3/
    - https://www.blog.pythonlibrary.org/2016/08/02/python-201-a-multiprocessing-tutorial/
    
- Apache vs. Nginx
    - https://www.rootusers.com/linux-web-server-performance-benchmark-2016-results/
    
- GIL
    - http://www.dabeaz.com/python/UnderstandingGIL.pdf
    - http://python-notes.curiousefficiency.org/en/latest/python3/multicore_python.html
    - https://lwn.net/Articles/689548/

- AsyncIO
    - http://masnun.rocks/2016/10/06/async-python-the-different-forms-of-concurrency/
    - https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/
    

## asyncio
- https://understandingdata.com/posts/asynchronous-web-scraping-in-python/
- https://builtin.com/data-science/asyncio-python

## What to improve

**Latency** and/or **Throughput**


## Latency

The time required to perform some action or to produce some result

Measured in units of time: hours, minutes, seconds, nanoseconds, or clock periods

## Throughput

The number of such actions executed or results produced per unit of time

- Measured in units of whatever is being produced (cars, motorcycles, I/O samples, memory words, iterations) per unit of time

The term **memory bandwidth** is sometimes used to specify the throughput of memory systems


## Water pipe example

When you go to buy a water pipe, there are two completely independent parameters that you look at:

- diameter - determines the throughput of the pipe
- length - determines the latency, i.e., the time it will take for a water droplet to travel across the pipe

## Pizza delivery example

**Two different delivery strategies for pizza company**

Do you want your pizza hot?
- Low latency

Or do you want your pizza to be inexpensive?
- High throughput – lots of pizzas per hour 

## Latency and Throughput are independent

A system consists of 8 service stations that take 1 unit of time each

- If each job must pass through all 8 stations in sequence (a pipeline), the throughput can be 1 job per unit time, while the latency is 8

- If each job needs only 1 service station and any service station will do, the throughput can be 8 jobs per unit time, and the latency is 1
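The two configurations can be checked with a back-of-the-envelope model. This is a sketch under idealized assumptions (jobs are always waiting, stations never idle, no hand-off cost); the names `pipelined` and `replicated` are illustrative, not from the source.

```python
import math

STATIONS = 8      # service stations in the system
SERVICE_TIME = 1  # time units a station spends on one job

def pipelined(n_jobs):
    """Every job visits all stations in order; each station works on a different job."""
    latency = STATIONS * SERVICE_TIME
    # The first job leaves after `latency`; after that one job leaves every SERVICE_TIME.
    makespan = latency + (n_jobs - 1) * SERVICE_TIME
    return latency, n_jobs / makespan

def replicated(n_jobs):
    """Every job needs exactly one station, and any free station will do."""
    latency = SERVICE_TIME
    # Jobs are served in waves of STATIONS jobs at a time.
    makespan = math.ceil(n_jobs / STATIONS) * SERVICE_TIME
    return latency, n_jobs / makespan

for n in (8, 8000):
    lat_p, thr_p = pipelined(n)
    lat_r, thr_r = replicated(n)
    print(f'{n} jobs | pipelined: latency {lat_p}, throughput {thr_p:.3f} | '
          f'replicated: latency {lat_r}, throughput {thr_r:.3f}')
```

With few jobs the pipeline has not filled up yet (8 jobs give a throughput of only about 0.53), but with many jobs it approaches 1 job per unit time while each job still takes 8 units.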

### The modern world is parallel
 Multicore. 

 Networks. 

 Clouds of CPUs. 

 Loads of users. 

 Our technology should help. 

 That's where concurrency comes in. 

### Concurrency
 Programming as the composition of independently executing processes. 

 (Processes in the general sense, not Linux processes. Famously hard to define.) 

### Parallelism
 Programming as the simultaneous execution of (possibly related) computations. 

### Concurrency vs. parallelism
 Concurrency is about **dealing with** lots of things at once. 

 Parallelism is about **doing** lots of things at once. 

 Not the same, but related. 

### Concurrency vs. parallelism

 Concurrency is about structure, parallelism is about execution. 

 Concurrency provides a way to structure a solution to a problem that may (but need not) be parallelizable. 

### An analogy
 Concurrent: Mouse, keyboard, display, and disk drivers. 

 Parallel: Vector dot product. 

### Concurrency plus communication
 Concurrency is a way to structure a program by breaking it into pieces that can be executed independently. 

 Communication is the means to coordinate the independent executions. 

### Our problem
 Move a pile of obsolete language manuals to the incinerator. 

![](isj_concurrency_parallelism/gophersimple1.jpg) With only one gopher this will take too long. 

### More gophers!
![](isj_concurrency_parallelism/gophersimple3.jpg) More gophers are not enough; they need more carts. 

### More gophers and more carts
![](isj_concurrency_parallelism/gophersimple2.jpg) This will go faster, but there will be bottlenecks at the pile and incinerator. 

 Also need to synchronize the gophers. 

 A message (that is, a communication between the gophers) will do. 

### Double everything
 Remove the bottleneck; make them really independent. 

![](isj_concurrency_parallelism/gophersimple4.jpg) This will consume input twice as fast. 

### Concurrent composition
![](isj_concurrency_parallelism/gophersimple4.jpg) The concurrent composition of two gopher procedures. 

### Concurrent composition
 This design is not automatically parallel! 

 What if only one gopher is moving at a time? 

 Then it's still concurrent (that's in the design), just not parallel. 

 However, it's automatically parallelizable! 

 Moreover, the concurrent composition suggests other models. 

### Another design
![](isj_concurrency_parallelism/gophercomplex0.jpg) Three gophers in action, but with likely delays. 

 Each gopher is an independently executing procedure, 

 plus coordination (communication). 

### Finer-grained concurrency
 Add another gopher procedure to return the empty carts. 

![](isj_concurrency_parallelism/gophercomplex1.jpg) Four gophers in action for better flow, each doing one simple task. 

 If we arrange everything right (implausible but not impossible), that's four times faster than our original one-gopher design. 

### Observation
 We improved performance by adding a concurrent procedure to the existing design. 

 More gophers doing more work; it runs better. 

 This is a deeper insight than mere parallelism. 

### Concurrent procedures
 Four distinct gopher procedures: 


* load books onto cart
* move cart to incinerator
* unload cart into incinerator
* return empty cart

 Different concurrent designs enable different ways to parallelize. 
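The four gopher procedures map naturally onto threads that communicate through queues. The following is a minimal sketch of that idea, assuming Python threads stand in for gophers, lists for carts, and `queue.Queue` objects for the messages between them; all names (`load`, `move`, `unload`, `return_cart`, the `None` stop signal) are hypothetical.

```python
import queue
import threading

# Hypothetical model: each gopher procedure is a thread, carts are lists,
# and queues carry the "messages" that coordinate the gophers.
# None is used as an illustrative "no more carts" signal.
BOOKS = list(range(20))   # the pile of obsolete manuals
CART_SIZE = 5

def load(books, empty_carts, loaded):
    for start in range(0, len(books), CART_SIZE):
        cart = empty_carts.get()              # wait for an empty cart
        cart.extend(books[start:start + CART_SIZE])
        loaded.put(cart)
    loaded.put(None)

def move(loaded, at_incinerator):
    while (cart := loaded.get()) is not None:
        at_incinerator.put(cart)              # "drive" the cart to the incinerator
    at_incinerator.put(None)

def unload(at_incinerator, emptied, burned):
    while (cart := at_incinerator.get()) is not None:
        burned.extend(cart)                   # into the incinerator
        cart.clear()
        emptied.put(cart)
    emptied.put(None)

def return_cart(emptied, empty_carts):
    while (cart := emptied.get()) is not None:
        empty_carts.put(cart)                 # back to the pile for reloading

empty_carts, loaded, at_incinerator, emptied = (queue.Queue() for _ in range(4))
for _ in range(2):                            # two carts in circulation
    empty_carts.put([])
burned = []

gophers = [
    threading.Thread(target=load, args=(BOOKS, empty_carts, loaded)),
    threading.Thread(target=move, args=(loaded, at_incinerator)),
    threading.Thread(target=unload, args=(at_incinerator, emptied, burned)),
    threading.Thread(target=return_cart, args=(emptied, empty_carts)),
]
for g in gophers:
    g.start()
for g in gophers:
    g.join()
print(len(burned), 'books burned')
```

The design stays correct whether the threads actually run in parallel or take turns on one core; only the speed changes.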

### More parallelization!
 We can now parallelize on the other axis; the concurrent design makes it easy. Eight gophers, all busy. 

![](isj_concurrency_parallelism/gophercomplex2.jpg)

### Or maybe no parallelization at all
 Keep in mind, even if only one gopher is active at a time (zero parallelism), it's still a correct and concurrent solution. 

![](isj_concurrency_parallelism/gophercomplex2.jpg)

### Another design
 Here's another way to structure the problem as the concurrent composition of gopher procedures. 

 Two gopher procedures, plus a staging pile. 

![](isj_concurrency_parallelism/gophercomplex3.jpg)

### Parallelize the usual way
 Run more concurrent procedures to get more throughput. 

![](isj_concurrency_parallelism/gophercomplex4.jpg)

### Or a different way
 Bring the staging pile to the multi-gopher concurrent model: 

![](isj_concurrency_parallelism/gophercomplex5.jpg)

### Full on optimization
 Use all our techniques. Sixteen gophers hard at work! 

![](isj_concurrency_parallelism/gophercomplex6.jpg)

### Lesson
 There are many ways to break the processing down. 

 That's concurrent design. 

 Once we have the breakdown, parallelization can fall out and correctness is easy. 

### Back to Computing
 In our book transport problem, substitute: 


* book pile => web content
* gopher => CPU
* cart => marshaling, rendering, or networking
* incinerator => proxy, browser, or other consumer

 It becomes a concurrent design for a scalable web service. 

 Gophers serving web content. 

![Subs](isj_concurrency_parallelism/what-is-async-how-does-it-work-and-when-should-i-use-it-13-638.jpg)

![Subs](isj_concurrency_parallelism/what-is-async-how-does-it-work-and-when-should-i-use-it-14-638.jpg)

## Process vs. Thread

- Threads share the address space of the process that created them; processes have their own address space.
- Threads have direct access to the data segment of their process; processes have their own copy of the data segment of the parent process.
- Threads can communicate directly with other threads of their process; processes must use interprocess communication (IPC) to communicate with sibling processes.
- Threads have relatively little overhead; processes have considerable overhead.
- New threads are easily created; new processes require duplicating the parent process.
- Threads can exercise considerable control over other threads of the same process; processes can only exercise control over their child processes.
- Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process do not affect child processes.
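The first difference (shared vs. separate address space) can be observed directly. This sketch assumes it is run as a script: on platforms that start processes with the spawn method (Windows, macOS), the `if __name__ == '__main__'` guard is required and interactive sessions may not work.

```python
import multiprocessing
import threading

counter = 0  # module-level data: shared by threads, copied into a child process

def bump():
    global counter
    counter += 1

if __name__ == '__main__':
    t = threading.Thread(target=bump)
    t.start()
    t.join()
    print('after thread: ', counter)  # 1: the thread updated our address space

    p = multiprocessing.Process(target=bump)
    p.start()
    p.join()
    print('after process:', counter)  # still 1: the child updated its own copy
```

To get data back from a child process you need explicit IPC, such as the `Pipe`, `Queue`, or `Manager` objects used later in this notebook.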

## Context Switching

Context switching is the process of storing and restoring the state (more specifically, the execution context) of a process or thread so that execution can be resumed from the same point at a later time

It involves saving and loading registers and memory maps and updating various tables and lists

## OS vs. Green/Light-weight Threads

**Green threads** (light-weight, user-level threads) are scheduled by a runtime library or virtual machine instead of natively by the underlying OS



![Web Server Use 2017](isj_concurrency_parallelism/web_server_use_2017.PNG)

## Apache vs. Nginx

With its classic prefork MPM, Apache dedicates one process to each connection; its worker and event MPMs dedicate threads instead

NGINX uses an asynchronous, event-driven architecture, which means it doesn't need to spawn a new process or thread for each request

Instead, each NGINX worker process can serve many requests concurrently, which reduces the load on the hardware and improves performance

NGINX can cope with around four times as many requests per second as Apache

![web-server-performance-benchmark-4-cpu-cores-2.jpg](isj_concurrency_parallelism/web-server-performance-benchmark-4-cpu-cores-2.jpg)

![web-server-benchmark-memory-usage.jpg](isj_concurrency_parallelism/web-server-benchmark-memory-usage.jpg)
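The event-driven idea can be illustrated in Python with `asyncio`: one thread, one event loop, and a coroutine per connection instead of a process per request. This is only an analogy under simplifying assumptions (a toy upper-casing echo protocol on localhost), not how NGINX is implemented; `handle` and `client` are hypothetical names.

```python
import asyncio

async def handle(reader, writer):
    # One coroutine per connection; all of them share a single thread's event loop.
    data = await reader.readline()
    writer.write(data.upper())
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def client(port, msg):
    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(msg.encode() + b'\n')
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return reply.decode().strip()

async def main():
    server = await asyncio.start_server(handle, '127.0.0.1', 0)  # port 0: let the OS pick
    port = server.sockets[0].getsockname()[1]
    async with server:
        # 50 clients talk to the server concurrently, still on one thread
        return await asyncio.gather(*(client(port, f'req{i}') for i in range(50)))

replies = asyncio.run(main())
print(replies[:3])  # ['REQ0', 'REQ1', 'REQ2']
```

While one connection waits for data, the loop simply runs another coroutine, so no extra process or thread is needed per request.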

## Multitasking

- **Cooperative** - tasks voluntarily yield control periodically or when idle, so that multiple applications can run concurrently

- **Preemptive** - involves the use of an interrupt mechanism which suspends the currently executing process and invokes a scheduler to determine which process should execute next
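Cooperative multitasking can be sketched in a few lines with generators acting as tasks and a round-robin scheduler written in plain Python (which also makes them a simple form of green threads, since no OS scheduler is involved). The names `task` and `run` are illustrative only.

```python
from collections import deque

log = []

def task(name, steps):
    """A generator-based task that yields control after every step."""
    for i in range(steps):
        log.append(f'{name}{i}')
        yield                      # voluntarily hand control back to the scheduler

def run(tasks):
    """Round-robin scheduler: no interrupts, tasks switch only at their yields."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)          # run the task until its next yield
        except StopIteration:
            continue               # the task has finished; drop it
        ready.append(current)      # otherwise send it to the back of the queue

run([task('A', 2), task('B', 3)])
print(log)  # ['A0', 'B0', 'A1', 'B1', 'B2']
```

A task that never yields would starve all the others, which is exactly the weakness that preemptive multitasking removes.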

## Task communication
- Shared memory
- Message passing

## Shared memory
- Exclusive access
- Locking
- Deadlock
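A common way to combine exclusive access with deadlock avoidance is a fixed lock ordering. The sketch below assumes a toy `Account` class (hypothetical) with one lock per account; both threads always acquire the two locks in the same order, so neither can end up holding one lock while waiting for the other.

```python
import threading

class Account:
    def __init__(self, ident, balance):
        self.ident = ident
        self.balance = balance
        self.lock = threading.Lock()   # exclusive access to this account

def transfer(src, dst, amount):
    # Lock ordering: always take the lock of the account with the smaller ident first.
    # Locking "src, then dst" instead could deadlock when one thread moves a->b while
    # another moves b->a: each would hold one lock and wait forever for the other.
    first, second = sorted((src, dst), key=lambda acct: acct.ident)
    with first.lock, second.lock:
        src.balance -= amount
        dst.balance += amount

def repeat_transfer(src, dst, times):
    for _ in range(times):
        transfer(src, dst, 1)

a, b = Account(1, 1000), Account(2, 1000)
threads = [threading.Thread(target=repeat_transfer, args=(a, b, 1000)),
           threading.Thread(target=repeat_transfer, args=(b, a, 1000))]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(a.balance, b.balance)  # 1000 1000
```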

## Python - threading

https://docs.python.org/3/library/threading.html

- interface for OS threads
- Thread
- Lock - exclusive access
- RLock - a synchronization primitive that may be acquired multiple times by the same thread
- Condition - lets threads wait until another thread notifies them that some state has changed
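A short sketch of `Condition`, assuming a toy producer that appends three items to a shared list and a consumer that waits until all three are there. `wait_for()` and `notify()` are part of the standard `threading.Condition` API; the rest is illustrative.

```python
import threading

items = []
cond = threading.Condition()   # uses an RLock internally unless you pass a lock

def consumer(results):
    with cond:
        # wait_for() releases the lock while sleeping, reacquires it when notified,
        # and re-checks the predicate, so spurious wake-ups are harmless
        cond.wait_for(lambda: len(items) >= 3)
        results.extend(items)

def producer():
    for i in range(3):
        with cond:
            items.append(i)
            cond.notify()      # wake a waiting thread so it re-checks the predicate

results = []
c = threading.Thread(target=consumer, args=(results,))
p = threading.Thread(target=producer)
c.start()
p.start()
c.join()
p.join()
print(results)  # [0, 1, 2]
```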

In [1]:
import threading
from queue import Queue
import time
import shutil

print_lock = threading.Lock()

def copy_op(file_data):
    with print_lock:
        print("Starting thread : {}".format(threading.current_thread().name))

    # Each dict holds a single {source: destination} pair. Local variables are
    # already private to the running thread, so threading.local() is not needed here.
    src, dst = next(iter(file_data.items()))

    shutil.copy(src, dst)

    with print_lock:
        print("Finished thread : {}".format(threading.current_thread().name))

def process_queue():
    while True:
        file_data = compress_queue.get()
        copy_op(file_data)
        compress_queue.task_done()

compress_queue = Queue()

output_names = [{'v1.avi' : 'v11.avi'},{'v2.avi' : 'v22.avi'}]

for i in range(2):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()

for file_data in output_names:
    compress_queue.put(file_data)

compress_queue.join()

print("Execution time = {0:.5f}".format(time.time() - start))

Starting thread : Thread-5 (process_queue)
Starting thread : Thread-6 (process_queue)
Finished thread : Thread-6 (process_queue)
Finished thread : Thread-5 (process_queue)
Execution time = 0.04897


In [2]:

# !python -m pip install requests beautifulsoup4
import threading
from queue import Queue
import requests
import bs4
import time

print_lock = threading.Lock()

def get_url(current_url):

    with print_lock:
        print("\nStarting thread {}".format(threading.current_thread().name))
    res = requests.get(current_url)
    res.raise_for_status()

    current_page = bs4.BeautifulSoup(res.text,"html.parser")
    current_title = current_page.select('title')[0].getText()

    with print_lock:
        print("{}\n".format(threading.current_thread().name))
        print("{}\n".format(current_title))
        print("\nFinished fetching : {}".format(current_url))

def process_queue():
    while True:
        current_url = url_queue.get()
        get_url(current_url)
        url_queue.task_done()

url_queue = Queue()

url_list = ["https://www.google.com"]*5

for i in range(2):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()

for current_url in url_list:
    url_queue.put(current_url)

url_queue.join()

print(threading.enumerate())

print("Execution time = {0:.5f}".format(time.time() - start))


In [3]:
import threading
from queue import Queue
import time

list_lock = threading.Lock()

def find_rand(num):
    sum_of_primes = 0

    ix = 2

    while ix <= num:
        if is_prime(ix):
            sum_of_primes += ix
        ix += 1

    with list_lock:
        sum_primes_list.append(sum_of_primes)

def is_prime(num):
    if num <= 1:
        return False
    elif num <= 3:
        return True
    elif num%2 == 0 or num%3 == 0:
        return False
    i = 5
    while i*i <= num:
        if num%i == 0 or num%(i+2) == 0:
            return False
        i += 6
    return True

def process_queue():
    while True:
        rand_num = min_nums.get()
        find_rand(rand_num)
        min_nums.task_done()

min_nums = Queue()

rand_list = [400000, 500000, 600000]
sum_primes_list = list()

for i in range(2):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()

for rand_num in rand_list:
    min_nums.put(rand_num)

min_nums.join()

end_time = time.time()

sum_primes_list.sort()
print(sum_primes_list)

print("Execution time = {0:.5f}".format(end_time - start))

[6458901531, 9914236195, 14071826345]
Execution time = 1.35991


## Global Interpreter Lock (GIL)

- The GIL ensures that only one thread executes Python bytecode at a time

- Simplifies many low-level details (memory management, callouts to C extensions, etc.)

- Historically, every attempt to remove the GIL from CPython cost single-threaded programs too much performance to be worth the gains for multithreading; PEP 703 later added an optional, experimental free-threaded build in Python 3.13

## GIL

- https://github.com/python/cpython/blob/master/Python/ceval_gil.h
- https://github.com/python/cpython/blob/master/Python/ceval.c

Whenever a thread begins sleeping or waiting for network I/O, it releases the GIL, giving another thread the chance to take it and execute some Python code (cooperative multitasking)

CPython also has preemptive multitasking: if a thread holds the GIL for longer than the switch interval (5 ms by default in Python 3, see `sys.getswitchinterval()`), it is asked to give up the GIL so that another thread may run
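The effect of the GIL on CPU-bound threads can be measured with a small experiment. This is a sketch under assumptions: on a standard (GIL) CPython build the two-thread run is typically no faster than the sequential one, while on a free-threaded build it may be; exact timings depend on the machine, so none are claimed here.

```python
import sys
import threading
import time

print(sys.getswitchinterval())  # 0.005 unless changed with sys.setswitchinterval()

def count_down(n):
    # Pure-Python loop: it keeps the GIL until it is asked to switch
    while n > 0:
        n -= 1

N = 3_000_000

start = time.perf_counter()
count_down(N)
count_down(N)
sequential = time.perf_counter() - start

start = time.perf_counter()
threads = [threading.Thread(target=count_down, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.perf_counter() - start

print(f'sequential: {sequential:.2f} s, two threads: {threaded:.2f} s')
```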


In [None]:
# Matrix products in pure Python: CPU-bound bytecode, so the GIL serializes the threads

import threading
from queue import Queue
import time
import numpy as np

def matrix_product(A, B):
    """Naive triple-loop product of two matrices stored as lists of lists."""
    C = [[0 for i in range(len(B[0]))] for j in range(len(A))]
    for i in range(len(A)):
        for j in range(len(B[0])):
            for k in range(len(B)):
                C[i][j] += A[i][k] * B[k][j]
    return C

def compute_cubic(m):
    return matrix_product(matrix_product(m, m), m)

def process_queue():
    while True:
        matrix = matrix_q.get()
        compute_cubic(matrix)
        matrix_q.task_done()

matrix_q = Queue()

# Keep this small: the naive algorithm is O(dim**3), so dim = 10_000 would never finish
dim = 100
matrix_list = [np.random.rand(dim, dim).tolist() for _ in range(4)]

for i in range(4):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()

for m in matrix_list:
    matrix_q.put(m)

matrix_q.join()

end_time = time.time()

print("Execution time = {0:.5f}".format(end_time - start))

In [1]:
# Matrix products with NumPy: its compiled routines release the GIL, so threads can overlap
# (NumPy's BLAS backend may also use several cores on its own)
# !python -m pip install numpy

import threading
from queue import Queue
import time
import numpy as np

def compute_cubic(m):
    # m @ m @ m, computed in NumPy's compiled code
    return np.dot(np.dot(m, m), m)

def process_queue():
    while True:
        matrix = matrix_q.get()
        compute_cubic(matrix)
        matrix_q.task_done()

matrix_q = Queue()

# Four 10_000 x 10_000 matrices need about 3.2 GB and minutes of compute; 2_000 keeps the demo short
dim = 2_000
matrix_list = [np.random.rand(dim, dim) for _ in range(4)]

for i in range(4):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()

for m in matrix_list:
    matrix_q.put(m)

matrix_q.join()

end_time = time.time()

print("Execution time = {0:.5f}".format(end_time - start))


## Thread-safe

If a thread can lose the GIL at any moment, you must make your code thread-safe

Python programmers think differently about thread safety than C or Java programmers do, however, because many Python operations are “atomic”

## Atomic operations
An example of an atomic operation is calling sort() on a list of built-in objects such as numbers or strings: comparing them never runs Python code, so the sort completes without giving up the GIL

A thread cannot be interrupted in the middle of sorting, and other threads never see a partly-sorted list, nor see stale data from before the list was sorted


## Atomic operations
Atomic operations simplify our lives, but there are surprises

For example, += seems simpler than sort(), but += is not atomic!

How can you know which operations are atomic and which are not?

In [2]:
n = 0

def foo():
    global n
    n += 1
     
import dis
dis.dis(foo)

  3           0 RESUME                   0

  5           2 LOAD_GLOBAL              0 (n)
             14 LOAD_CONST               1 (1)
             16 BINARY_OP               13 (+=)
             20 STORE_GLOBAL             0 (n)
             22 LOAD_CONST               0 (None)
             24 RETURN_VALUE


In [3]:
lst = [4, 1, 3, 2]

def foo():
    lst.sort()
    
dis.dis(foo)

  3           0 RESUME                   0

  4           2 LOAD_GLOBAL              0 (lst)
             14 LOAD_METHOD              1 (sort)
             36 PRECALL                  0
             40 CALL                     0
             50 POP_TOP
             52 LOAD_CONST               0 (None)
             54 RETURN_VALUE
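The bytecode above shows why `+=` needs a lock: the load, add, and store are separate instructions, and a thread switch between them can lose an update. In this sketch the unlocked version may or may not lose updates on a given run (recent CPython versions switch threads less often inside such a loop), which does not make it safe; the locked version is always exact.

```python
import threading

N_THREADS, N_INCREMENTS = 4, 100_000
counter = 0
counter_lock = threading.Lock()

def unsafe_increment():
    global counter
    for _ in range(N_INCREMENTS):
        counter += 1          # LOAD / BINARY_OP / STORE: a switch in between loses an update

def safe_increment():
    global counter
    for _ in range(N_INCREMENTS):
        with counter_lock:    # only one thread at a time runs the read-modify-write
            counter += 1

def run(worker):
    global counter
    counter = 0
    threads = [threading.Thread(target=worker) for _ in range(N_THREADS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter

expected = N_THREADS * N_INCREMENTS
print('unsafe:', run(unsafe_increment), 'expected:', expected)  # may be lower
print('safe:  ', run(safe_increment), 'expected:', expected)
```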


In [4]:
from multiprocessing import Process, Pipe
# A Pipe connects exactly two endpoints and is usually faster than a Queue,
# which is built on top of a pipe plus locks and a feeder thread

def ff(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=ff, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

[42, None, 'hello']


In [5]:
from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    l = manager.list(range(10))

    p = Process(target=f,args=(d, l))
    p.start()
    p.join()
    print(d)
    print(l)

{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]


In [None]:
import os

from multiprocessing import Process

def doubler(number):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc = os.getpid()
    print('{0} doubled to {1} by process id: {2}'.format(
        number, result, proc))

# The guard is required on platforms that start processes with "spawn" (Windows, macOS)
if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []

    for number in numbers:
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()

    for proc in procs:
        proc.join()

In [None]:
from multiprocessing import Process, current_process


def doubler(number):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc_name = current_process().name
    print('{0} doubled to {1} by: {2}'.format(
        number, result, proc_name))


if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []

    # Unnamed processes get default names such as "Process-1"
    for number in numbers:
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()

    # The name argument overrides the default name
    proc = Process(target=doubler, name='Test', args=(2,))
    proc.start()
    procs.append(proc)

    for proc in procs:
        proc.join()

In [None]:
from multiprocessing import Process, Lock
 
 
def printer(item, lock):
    """
    Prints out the item that was passed in
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()
 
if __name__ == '__main__':
    lock = Lock()
    items = ['tango', 'foxtrot', 10]
    for item in items:
        p = Process(target=printer, args=(item, lock))
        p.start()

In [6]:
import logging
import multiprocessing
 
from multiprocessing import Process, Lock
 
def printer(item, lock):
    """
    Prints out the item that was passed in
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()
 
if __name__ == '__main__':
    lock = Lock()
    items = ['tango', 'foxtrot', 10]
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    for item in items:
        p = Process(target=printer, args=(item, lock))
        p.start()
        
    logger.setLevel(logging.CRITICAL)

[INFO/Process-4] child process calling self.run()


tango


[INFO/Process-5] child process calling self.run()
[INFO/Process-4] process shutting down


foxtrot


[INFO/Process-5] process shutting down
[INFO/Process-4] process exiting with exitcode 0
[INFO/Process-5] process exiting with exitcode 0
[INFO/Process-6] child process calling self.run()


10


[INFO/Process-6] process shutting down
[INFO/Process-6] process exiting with exitcode 0


In [None]:
# Creator-Consumer: multiprocessing example with Queue

from multiprocessing import Process, Queue

sentinel = -1  # special value telling the consumer to stop


def creator(data, q):
    """
    Creates data to be consumed, then puts the sentinel on the queue
    """
    print('Creating data and putting it on the queue')
    for item in data:
        print('putting in {}'.format(item))
        q.put(item)
    q.put(sentinel)


def my_consumer(q):
    """
    Consumes some data and works on it

    In this case, all it does is double the input
    """
    while True:
        data = q.get()
        if data == sentinel:  # check first, so the sentinel itself is not processed
            break
        print('data found to be processed: {}'.format(data))
        processed = data * 2
        print(processed)


if __name__ == '__main__':
    q = Queue()
    data = [5, 10, 13]
    process_one = Process(target=creator, args=(data, q))
    process_two = Process(target=my_consumer, args=(q,))
    process_one.start()
    process_two.start()

    process_one.join()
    process_two.join()

## The Pool Class

The Pool class is used to represent a pool of worker processes

It has methods that let you offload tasks to the worker processes.

In [None]:
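from multiprocessing import Pool

def doubler(number):
    return number * 2

if __name__ == '__main__':
    numbers = [5, 10, 20]
    # Leaving the with block terminates the worker processes
    with Pool(processes=3) as pool:
        print(pool.map(doubler, numbers))   # [10, 20, 40]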

## concurrent.futures

https://docs.python.org/3/library/concurrent.futures.html

A high-level interface for asynchronously executing callables

The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or with separate processes, using ProcessPoolExecutor

In both cases, we get a pool of threads or processes and we can submit tasks to this pool

The pool assigns tasks to the available resources (threads or processes) and schedules them to run

http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html

In [2]:
from concurrent.futures import ThreadPoolExecutor
from time import sleep
 
def return_after_5_secs(message):
    sleep(5)
    return message
 
pool = ThreadPoolExecutor(max_workers=3)
 
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(6)
print(future.done())
print("Result: " + future.result())

False
True
Result: hello


When we submit() a task, we get back a Future

The Future object has a done() method, which tells us whether the future has resolved, that is, whether a result has been set on that particular future object

When a task finishes (returns a value or is interrupted by an exception), the thread pool executor sets the value (or the exception) on the future object
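Both outcomes can be inspected on the future. This sketch uses a hypothetical `divide` task; `result()`, `exception()`, `done()`, and `add_done_callback()` are standard `concurrent.futures.Future` methods.

```python
from concurrent.futures import ThreadPoolExecutor

def divide(a, b):
    return a / b

seen = []

with ThreadPoolExecutor(max_workers=2) as pool:
    ok = pool.submit(divide, 1, 2)
    bad = pool.submit(divide, 1, 0)
    # Runs once the future is done (immediately, in this thread, if it already is)
    ok.add_done_callback(lambda f: seen.append(f.result()))
    print(ok.result())               # 0.5: blocks until the value has been set
    print(type(bad.exception()))     # the exception is stored instead of a value

print(ok.done(), bad.done(), seen)   # True True [0.5]
```

Calling `bad.result()` would re-raise the stored `ZeroDivisionError` in the calling thread.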

In [4]:
from concurrent.futures import ProcessPoolExecutor
from time import sleep
 
def return_after_5_secs(message):
    sleep(5)
    return message
 
pool = ProcessPoolExecutor(3)
 
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)  # process start-up adds overhead, so the 5-second task is usually still running here
print(future.done())

False
False


In [11]:
# Nothing is happening, why?
def wait_on_future():
    f = executor.submit(pow, 5, 2)
    print(f.result())
    print("DONE!")


executor = ThreadPoolExecutor(max_workers=1)
t = executor.submit(wait_on_future)

In [5]:
# What is the problem over here?
from concurrent.futures import ThreadPoolExecutor
import time
def wait_on_b():
    time.sleep(5)
    print(b.result())
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())
    return 6

executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
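Both cells above deadlock. In the first, the single worker is busy running `wait_on_future`, which waits for a task that can only run on that same (busy) worker. In the second, `a` waits for `b` while `b` waits for `a`: a circular wait that no number of workers resolves. A sketch of one way to fix the first case, assuming it is acceptable to give the pool a spare worker:

```python
from concurrent.futures import ThreadPoolExecutor

def wait_on_future(executor):
    # Submits a second task to the *same* executor and blocks until it finishes
    f = executor.submit(pow, 5, 2)
    return f.result()

# With max_workers=1 the only worker is busy running wait_on_future, so the
# inner pow task can never start. One spare worker breaks the deadlock.
with ThreadPoolExecutor(max_workers=2) as executor:
    outer = executor.submit(wait_on_future, executor)
    print(outer.result(timeout=5))  # 25
```

The `timeout` turns an accidental deadlock into a `TimeoutError` instead of a hang.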

In [6]:
executor = ThreadPoolExecutor() 
print(executor._max_workers)
executor = ProcessPoolExecutor() 
print(executor._max_workers)

8
4


__Changed in version 3.5__:   
If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor

__Changed in version 3.8__:  
Default value of max_workers is changed to `min(32, os.cpu_count() + 4)`. This default value preserves at least 5 workers for I/O bound tasks. It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using very large resources implicitly on many-core machines.

In [15]:
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://www.dw.com/']

# Retrieve a single page and report the url and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

The __as_completed() function__ takes an iterable of Future objects and yields each future as soon as it completes, in completion order.

In contrast, the executor's __map() method__ yields results in the order of the input iterables: each result is produced once it and all earlier ones are done, so one slow early task holds back the rest.

In [11]:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
 
def return_after_5_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)
 
pool = ThreadPoolExecutor(5)

#1. 
#print([f for f in pool.map(return_after_5_secs, range(5))])

#2. 
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))

# print([f.result() for f in futures])    
    
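# wait() blocks until all futures are done (return_when=ALL_COMPLETED by default),
# so reading the results afterwards gives them in submission order, like map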
# wait(futures) 
# print([f.result() for f in futures])

#3.
# as_completed
print([f.result() for f in as_completed(futures)])




['Return of 0', 'Return of 2', 'Return of 1', 'Return of 3', 'Return of 4']


In [16]:
import concurrent.futures
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

'http://europe.wsj.com/' generated an exception: HTTP Error 403: Forbidden
'http://www.cnn.com/' page is 1851184 bytes
'http://www.foxnews.com/' page is 535370 bytes
'http://www.dw.com/' page is 13050 bytes
'http://www.bbc.co.uk/' page is 523511 bytes


ProcessPoolExecutor side-steps the Global Interpreter Lock, but this also means that __only picklable (serializable with the pickle library) objects__ can be sent to the worker processes and returned from them: the callable, its arguments, and its result
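A quick way to check whether something can cross the process boundary is to try `pickle.dumps` on it. This sketch assumes it is run as a script, so that the module-level function is importable by name; lambdas and nested functions have no importable name and are rejected.

```python
import pickle

def square(x):
    # Module-level function: pickled by reference (module name + function name)
    return x * x

candidates = {'function': square, 'list': [1, 2, 3], 'lambda': lambda x: x * x}

for label, obj in candidates.items():
    try:
        pickle.dumps(obj)
        print(f'{label}: picklable')
    except (pickle.PicklingError, AttributeError, TypeError) as exc:
        # Lambdas and nested functions have no importable name, so pickle refuses them
        print(f'{label}: not picklable ({type(exc).__name__})')
```

So, for example, `executor.submit(lambda x: x * x, 3)` on a ProcessPoolExecutor would fail, while `executor.submit(square, 3)` works.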

In [None]:
# %load pr.py
import math
import concurrent.futures

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

In [None]:
def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

In [None]:
# with context manager for futures!
def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

## Coroutines
- Coroutines are computer program components that generalize subroutines for non-preemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations

- Functions whose execution you can pause

- Closely connected to generators: in Python, a generator is already a function whose execution pauses at each `yield`

In [17]:
def eager_range(up_to):  
    """Create a list of integers, from 0 to up_to, exclusive."""
    sequence = []
    index = 0
    while index < up_to:
        sequence.append(index)
        index += 1
    return sequence

In [18]:
def lazy_range(up_to):  
    """Generator to return the sequence of integers from 0 to up_to, exclusive."""
    index = 0
    while index < up_to:
        yield index
        index += 1

In [20]:
if __name__ == '__main__':  
    iterator = lazy_range(5)
    print(next(iterator))  # 0
    print(next(iterator))  # 1
    for x in iterator:
        print(x)  # 2,3,4
    print(next(iterator))  # StopIteration


0
1
2
3
4


In [21]:
# simple coroutine
def jumping_range(up_to):  
    """Generator for the sequence of integers from 0 to up_to, exclusive.

    Sending a value into the generator will shift the sequence by that amount.
    """
    index = 0
    while index < up_to:
        jump = yield index
        if jump is None:
            jump = 1
        index += jump


if __name__ == '__main__':  
    iterator = jumping_range(5)
    print(next(iterator))  # 0
    print(iterator.send(2))  # 2
    print(next(iterator))  # 3
    print(iterator.send(-1))  # 2
    for x in iterator:
        print(x)  # 3, 4

0
2
3
2
3
4


In [23]:
def lazy_range(up_to):  
    """Generator to return the sequence of integers from 0 to up_to, exclusive."""
    index = 0
    def gratuitous_refactor():
        nonlocal index 
        while index < up_to:
            yield index
            index += 1
            #if index>3:
            #    raise ValueError("Hello!")
    yield from gratuitous_refactor()  # syntax for delegating to a subgenerator
if __name__ == '__main__':  
    iterator = lazy_range(5)
    print(next(iterator))  # 0
    print(next(iterator))  # 1
    for x in iterator:
        print(x)  # 2,3,4
    print(next(iterator))  # StopIteration

0
1
2
3
4


StopIteration: 

In [None]:
def lazy_range(up_to):  
    """Generator to return the sequence of integers from 0 to up_to, exclusive."""
    index = 0
    def gratuitous_refactor():
        nonlocal index 
        while index < up_to:
            yield index
            index += 1
    for x in gratuitous_refactor():
        yield x
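    # Is `yield from` just syntax sugar for this form? Only for plain iteration:
    # this loop does not forward send()/throw() to the subgenerator or capture its return value.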

if __name__ == '__main__':  
    iterator = lazy_range(5)
    print(next(iterator))  # 0
    print(next(iterator))  # 1
    for x in iterator:
        print(x)  # 2,3,4
    print(next(iterator))  # StopIteration
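A generator that reacts to `send()` settles whether the `for` form is just syntax sugar. The sketch below redefines `jumping_range` from earlier so that it is self-contained. `via_for`, `via_yield_from` and `drive` are hypothetical helpers used only for this comparison. The plain loop drops the sent value, while `yield from` forwards it into the subgenerator.

```python
def jumping_range(up_to):
    index = 0
    while index < up_to:
        jump = yield index
        if jump is None:
            jump = 1
        index += jump

def via_for(up_to):
    for n in jumping_range(up_to):
        yield n  # a value sent here is received by via_for and silently dropped

def via_yield_from(up_to):
    yield from jumping_range(up_to)  # send() goes straight through to jumping_range

def drive(gen):
    """Advance once, send a jump of 2, then collect the remaining values."""
    values = [next(gen), gen.send(2)]
    values.extend(gen)
    return values

print(drive(via_for(6)))         # [0, 1, 2, 3, 4, 5]
print(drive(via_yield_from(6)))  # [0, 2, 3, 4, 5]
```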

In [None]:
def lazy_range(up_to):  
    """Generator to return the sequence of integers from 0 to up_to, exclusive."""
    index = 0
    def gratuitous_refactor():
        nonlocal index 
        while index < up_to:
            offset = yield index
            if offset is None:
                offset = 0 
            index += 1 - offset
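    itr = gratuitous_refactor()
    # Forward every value received from the caller into the subgenerator.
    # A plain "for x in itr: y = yield x; itr.send(y)" skips values, because
    # send() also advances itr and the value it yields would be discarded.
    # This is roughly what "yield from" does for send(); the full expansion in
    # PEP 380 also forwards throw() and close() and returns the subgenerator's result.
    try:
        x = next(itr)
    except StopIteration:
        return
    while True:
        y = yield x
        try:
            x = itr.send(y)
        except StopIteration:
            return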


if __name__ == '__main__':  
    iterator = lazy_range(5)
    #iterator.send(None)
    print(next(iterator))  # 0
    print(next(iterator))  # 1
    for x in iterator:
        print(x)  # 2,3,4
    print(next(iterator))  # StopIteration

https://stackoverflow.com/questions/9708902/in-practice-what-are-the-main-uses-for-the-yield-from-syntax-in-python-3-3

In [None]:
def bottom():  
    # Returning the yield lets the value that goes up the call stack come right back
    # down.
    return (yield 42)

def middle():  
    return (yield from bottom())

def top():  
    return (yield from middle())

# Get the generator.
gen = top()  
value = next(gen)  
print(value)  # Prints '42'.  
try:  
    value = gen.send(value * 2)
except StopIteration as exc:  
    value = exc.value
print(value)  # Prints '84'.  
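`yield from` forwards more than `send()`. It also passes exceptions delivered with `throw()` down the chain, so the innermost generator can handle them, and that generator's return value still travels back up. The sketch below follows the `bottom`/`middle`/`top` example. The `ValueError` and the messages are made up for illustration.

```python
def bottom():
    try:
        yield "working"
    except ValueError as exc:
        # the exception thrown into top() arrives here, at the innermost yield
        return f"bottom handled {exc}"

def middle():
    return (yield from bottom())

def top():
    return (yield from middle())

gen = top()
first = next(gen)
print(first)  # working
try:
    gen.throw(ValueError("boom"))
except StopIteration as exc:
    outcome = exc.value  # bottom's return value, passed up through middle and top
print(outcome)  # bottom handled boom
```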

## Asyncio - motivation

- Processes are costly to spawn
- So for I/O-bound work, threads are usually chosen instead
- I/O depends on external factors: slow disks or network lag often make it unpredictable

## Asyncio - motivation
- Let’s assume that we are using threads for I/O-bound operations
- 3 threads are doing different I/O tasks
- The interpreter needs to switch between the concurrent threads and give each of them some time in turn
- Let’s call the threads - T1, T2 and T3

## Asyncio - motivation
- The three threads have started their I/O operations
- T3 completes first; T2 and T1 are still waiting for I/O
- The Python interpreter switches to T1, but it’s still waiting
- So it moves to T2, which is also still waiting, and then to T3, which is ready, and executes its code

## Asyncio - motivation
- T3 was ready, but the interpreter switched to T1 and T2 first
- That incurred switching costs, which we could have avoided if the interpreter had moved to T3 first

## What is asyncio
- Asyncio provides an event loop (along with other utilities)
- The event loop tracks different I/O events, switches to tasks that are ready, and pauses the ones that are waiting on I/O
- No time is wasted on tasks that are not ready to run right now

## What is asyncio
- We have functions that perform asynchronous I/O operations
- We give our functions to the event loop and ask it to run them for us
- The event loop gives us back a Future object; it’s like a promise that we will get something back in the future
- We hold on to the promise, check from time to time whether it has a value (when we feel impatient), and finally, when the future has a value, use it in other operations
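The promise idea maps onto `asyncio.Future`. In this minimal sketch, `loop.call_later()` stands in for an I/O operation that completes 0.1 s later. That timer is an assumption for illustration; in real code the event loop or an I/O library sets the result.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()                    # an empty "promise"
    loop.call_later(0.1, fut.set_result, "data")  # simulated I/O finishing later
    print("done yet?", fut.done())                # done yet? False
    value = await fut                             # pause main() until the result arrives
    print("done yet?", fut.done(), "value:", value)  # done yet? True value: data
    return value

result = asyncio.run(main())
```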

In [2]:
import asyncio

# only needed in a Jupyter notebook, which already runs an event loop;
# nest_asyncio allows that loop to be re-entered, so asyncio.run() works inside it
import nest_asyncio
nest_asyncio.apply()


In [26]:

import datetime
import random


async def my_sleep_func():
    await asyncio.sleep(random.randint(0, 5))


async def display_date(num, loop):
    end_time = loop.time() + 50.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        await my_sleep_func()
    if num==2:
        loop.stop()

loop = asyncio.get_event_loop()

asyncio.ensure_future(display_date(1, loop))
asyncio.ensure_future(display_date(2, loop))

loop.run_forever()
print(loop.is_running())

Loop: 1 Time: 2023-04-27 09:35:42.362080
Loop: 2 Time: 2023-04-27 09:35:42.362147
Loop: 1 Time: 2023-04-27 09:35:43.363566
Loop: 2 Time: 2023-04-27 09:35:47.367109
Loop: 1 Time: 2023-04-27 09:35:48.364740
Loop: 1 Time: 2023-04-27 09:35:51.368303
Loop: 2 Time: 2023-04-27 09:35:52.368864
Loop: 2 Time: 2023-04-27 09:35:54.371437
Loop: 1 Time: 2023-04-27 09:35:56.370967
Loop: 1 Time: 2023-04-27 09:35:56.371230
Loop: 2 Time: 2023-04-27 09:35:57.374612
Loop: 1 Time: 2023-04-27 09:36:00.375125
Loop: 2 Time: 2023-04-27 09:36:00.375391
Loop: 2 Time: 2023-04-27 09:36:00.375494
Loop: 1 Time: 2023-04-27 09:36:02.377692
Loop: 2 Time: 2023-04-27 09:36:05.379011
Loop: 2 Time: 2023-04-27 09:36:05.379716
Loop: 1 Time: 2023-04-27 09:36:06.379116
Loop: 2 Time: 2023-04-27 09:36:06.380638
Loop: 2 Time: 2023-04-27 09:36:10.385014
Loop: 1 Time: 2023-04-27 09:36:11.381505
Loop: 1 Time: 2023-04-27 09:36:12.382846
Loop: 2 Time: 2023-04-27 09:36:12.386158
Loop: 2 Time: 2023-04-27 09:36:12.386200
Loop: 2 Time: 20

## Walk through the code
- the async function `display_date` takes a number (as an identifier) and the event loop as parameters
- the function has an otherwise infinite `while True` loop that breaks after about 50 seconds
- during this 50-second period, it repeatedly prints out the time and sleeps for a random 0 to 5 seconds
- the `await` keyword suspends `display_date` until another awaitable (here the coroutine `my_sleep_func`) completes

```python
async def my_sleep_func():
    await asyncio.sleep(random.randint(0, 5))


async def display_date(num, loop):
    end_time = loop.time() + 50.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        await my_sleep_func()
    if num==2:
        loop.stop()
```

## Walk through the code
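- wrap each coroutine in a task and schedule it on the event loop (using `asyncio.ensure_future`)
- start running the event loop with `run_forever()`; it keeps running until `display_date(2, loop)` calls `loop.stop()`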

```python
loop = asyncio.get_event_loop()

asyncio.ensure_future(display_date(1, loop))
asyncio.ensure_future(display_date(2, loop))

loop.run_forever()
```
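Recent Python versions deprecate relying on `asyncio.get_event_loop()` to create a new loop, and the usual entry point is now `asyncio.run()`. Below is a sketch of the same two coroutines in that style. `asyncio.gather()` waits for both coroutines instead of one of them calling `loop.stop()`. The shorter durations (2 s total, sleeps of up to 0.5 s) and the returned print count are assumptions that keep the demo quick.

```python
import asyncio
import datetime
import random

async def my_sleep_func():
    await asyncio.sleep(random.uniform(0, 0.5))

async def display_date(num, duration):
    loop = asyncio.get_running_loop()
    end_time = loop.time() + duration
    prints = 0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        prints += 1
        if (loop.time() + 0.1) >= end_time:
            break
        await my_sleep_func()
    return prints

async def main():
    # both coroutines run concurrently; gather returns once both have finished
    return await asyncio.gather(display_date(1, 2.0), display_date(2, 2.0))

counts = asyncio.run(main())
print(counts)  # how many times each coroutine printed
```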

## Walk through the code
- Whenever an `await` is reached, asyncio knows the function is probably going to need some time
- It pauses the function, starts monitoring any I/O event related to it, and lets other tasks run
- When asyncio notices that the paused function’s I/O is ready, it resumes the function
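This only works if the coroutine actually reaches an `await`. A blocking call such as `time.sleep()` inside an `async def` never hands control back to the event loop, so other tasks cannot run in the meantime. In the comparison below, the 0.1 s delays and the function names are illustrative. Expect roughly 0.1 s for the cooperative version and 0.3 s for the blocking one.

```python
import asyncio
import time

async def cooperative():
    await asyncio.sleep(0.1)  # suspends; the event loop runs other tasks meanwhile

async def blocking():
    time.sleep(0.1)  # blocks the thread that runs the event loop

async def run_three(worker):
    start = time.perf_counter()
    await asyncio.gather(worker(), worker(), worker())
    return time.perf_counter() - start

cooperative_time = asyncio.run(run_three(cooperative))
blocking_time = asyncio.run(run_three(blocking))
print(f"asyncio.sleep: {cooperative_time:.2f} s, time.sleep: {blocking_time:.2f} s")
```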

In [3]:
# minitask 7
import asyncio
import time

async def perform_task(duration, task_name):
    await asyncio.sleep(duration)
    print(task_name, 'is done')

async def main():
    await perform_task(3, 'boiling kettle')
    await perform_task(2, 'cleaning cups')
    
s = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - s
print(f"Executed in {elapsed:0.2f} seconds.")

boiling kettle is done
cleaning cups is done
Executed in 5.00 seconds.


## minitask 7
Change the code in the `main` function so that the tasks run concurrently (put the kettle on, wash the cups in the meantime, then the water finishes boiling), so that the output is:
```
boiling kettle starting, cleaning cups starting, cleaning cups is done, boiling kettle is done  
Executed in 3.02 seconds.  
```
```python
import asyncio
import time

async def perform_task(duration, task_name):
    await asyncio.sleep(duration)
    print(task_name, 'is done')

async def main():
    await perform_task(3, 'boiling kettle')
    await perform_task(2, 'cleaning cups')

s = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - s
print(f"Executed in {elapsed:0.2f} seconds.")
```

In [4]:
# minitask 7
import asyncio
import time

async def perform_task(duration, task_name):
    await asyncio.sleep(duration)
    print(task_name, 'is done')

async def main():
    task1 = asyncio.create_task(
        perform_task(3, 'boiling kettle'))

    task2 = asyncio.create_task(
        perform_task(2, 'cleaning cups'))
    
    await task1
    await task2
    
s = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - s
print(f"Executed in {elapsed:0.2f} seconds.")

cleaning cups is done
boiling kettle is done
Executed in 3.00 seconds.


In [5]:
# alternative "async with" syntax using asyncio.TaskGroup (Python 3.11+)
!python --version
# minitask 7
import asyncio
import time

async def perform_task(duration, task_name):
    await asyncio.sleep(duration)
    print(task_name, 'is done')

async def main():

    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(
            perform_task(3, 'boiling kettle'))

        task2 = tg.create_task(
            perform_task(2, 'cleaning cups'))

    # The await is implicit when the context manager exits.
    
s = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - s
print(f"Executed in {elapsed:0.2f} seconds.")

Python 3.11.3
cleaning cups is done
boiling kettle is done
Executed in 3.00 seconds.
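When the tasks return values, `asyncio.gather()` is a third way to run them concurrently. It returns the results in the order the awaitables were passed, not the order in which they finished. The sketch reuses the minitask timings; having `perform_task` return a value is an addition for illustration.

```python
import asyncio
import time

async def perform_task(duration, task_name):
    await asyncio.sleep(duration)
    print(task_name, 'is done')
    return task_name

async def main():
    return await asyncio.gather(
        perform_task(3, 'boiling kettle'),
        perform_task(2, 'cleaning cups'),
    )

s = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - s
print(results)  # ['boiling kettle', 'cleaning cups']
print(f"Executed in {elapsed:0.2f} seconds.")
```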


In [6]:
import aiohttp
import asyncio

async def main():

    async with aiohttp.ClientSession() as session:
        async with session.get('http://python.org') as response:

            print("Status:", response.status)
            print("Content-type:", response.headers['content-type'])

            html = await response.text()
            print("Body:", html[:15], "...")

# for MS Windows
#asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

asyncio.run(main())

Status: 200
Content-type: text/html; charset=utf-8
Body: <!doctype html> ...


In [None]:
movies_list = ['tt2935510','tt7131622','tt5463162','tt4758646','tt3640424','tt6024606','tt1596363','tt3707106','tt2713180','tt2193215','tt2024544','tt0816711','tt1764234','tt1402488','tt1210166','tt0478304','tt1001526','tt0361748','tt0421715','tt0887883','tt0443680','tt0496806','tt0449467','tt0356910','tt0349903','tt0332452','tt0165982','tt0270288','tt0240772','tt0266987','tt0236493','tt0208092','tt0137523','tt0120601','tt0119643','tt0120102','tt0118972','tt0117665','tt0114746','tt0114369','tt0110322','tt0110148','tt0109783','tt0108399','tt0107302','tt0105265','tt0104009','tt0104567','tt0103074','tt0101268','tt0097478','tt0097136','tt0118930','tt0093407','tt0093638','tt0093640','tt0093231']
import asyncio
import time
from aiohttp import ClientSession

async def get_html_by_movie_id_new(movie_id, session):
    url = f"https://www.imdb.com/title/{movie_id}/fullcredits"
    # "async with" releases the connection back to the session's pool when done
    async with session.get(url) as response:
        html = await response.text()
    return html

async def scrape_all_titles(movies_list):
    async with ClientSession() as session:
        # one coroutine per movie; gather runs them concurrently and keeps the input order
        tasks = [get_html_by_movie_id_new(movie_id, session) for movie_id in movies_list]
        result = await asyncio.gather(*tasks)
    return result

s = time.perf_counter()
# top-level "await" works in Jupyter/IPython; in a plain script use
# result = asyncio.run(scrape_all_titles(movies_list))
result = await scrape_all_titles(movies_list)
elapsed = time.perf_counter() - s
print(f"Executed in {elapsed:0.2f} seconds.")
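The `asyncio.gather()` call above fires all requests at once, and some servers may respond to that with throttling or errors. A common way to cap the number of requests in flight is `asyncio.Semaphore`. The sketch below is a hypothetical variant. `fake_fetch` replaces the real HTTP call with `asyncio.sleep` so it runs offline, and the limit of 5 is an arbitrary choice.

```python
import asyncio

MAX_IN_FLIGHT = 5  # arbitrary limit chosen for illustration

async def fake_fetch(movie_id, semaphore, stats):
    """Hypothetical stand-in for get_html_by_movie_id_new: no network access."""
    async with semaphore:  # waits here while MAX_IN_FLIGHT fetches are running
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.05)  # pretend to wait for the server
        stats["running"] -= 1
        return f"<html for {movie_id}>"

async def scrape_all(movie_ids):
    semaphore = asyncio.Semaphore(MAX_IN_FLIGHT)
    stats = {"running": 0, "peak": 0}
    pages = await asyncio.gather(*(fake_fetch(m, semaphore, stats) for m in movie_ids))
    return pages, stats["peak"]

ids = [f"tt{n:07d}" for n in range(20)]
pages, peak = asyncio.run(scrape_all(ids))
print(len(pages), "pages, at most", peak, "requests at a time")
```

In the real scraper, the `async with semaphore:` block would wrap the `session.get(...)` call.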

## Example of asyncio use - HTTP server japronto
- https://github.com/squeaky-pl/japronto
- https://medium.freecodecamp.com/million-requests-per-second-with-python-95c137af319
- based on uvloop, an ultra-fast implementation of the asyncio event loop on top of libuv: https://github.com/MagicStack/uvloop
![japronto](isj_concurrency_parallelism/japronto.png)


## Rule of Thumb from SO
https://stackoverflow.com/questions/27435284/multiprocessing-vs-multithreading-vs-asyncio
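```python
def choose_concurrency_model(io_bound, io_very_slow):
    """Rule of thumb from the Stack Overflow answer linked above."""
    if io_bound:
        if io_very_slow:
            return "Use Asyncio"
        return "Use Threads"
    return "Multi Processing"


print(choose_concurrency_model(io_bound=True, io_very_slow=True))  # Use Asyncio
```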

* CPU Bound => Multi Processing
* I/O Bound, Fast I/O, Limited Number of Connections => Multi Threading
* I/O Bound, Slow I/O, Many connections => Asyncio
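The middle case of the rule of thumb (blocking I/O, a modest number of connections) is often handled with a thread pool from `concurrent.futures`. In this sketch, `time.sleep(0.2)` stands in for a blocking network or disk call; that is an assumption, and `blocking_io` is a hypothetical function. `ProcessPoolExecutor` offers the same `Executor` interface for the CPU-bound case. It needs an `if __name__ == "__main__":` guard on platforms that spawn worker processes.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io(item):
    """Hypothetical blocking call, e.g. a request made with a synchronous library."""
    time.sleep(0.2)  # CPython releases the GIL while a thread sleeps or waits on I/O
    return item * 2

items = list(range(5))

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as pool:
    results = list(pool.map(blocking_io, items))  # results keep the input order
elapsed = time.perf_counter() - start

print(results)  # [0, 2, 4, 6, 8]
print(f"{elapsed:.2f} s instead of about {0.2 * len(items):.1f} s sequentially")
```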