In [1]:
import numpy as np, pandas as pd, os, time

### 1. Introduction

**Parallelism** means performing multiple tasks or calculations at the same time e.g. on different CPUs or cores simultaneously.

**Concurrency** means the ability to execute tasks out of order, possibly at the same time. In a concurrent application, two tasks can start, run, and complete in overlapping time periods, i.e. Task-2 can start even before Task-1 completes.

How concurrency is achieved varies across architectures. In a single-core environment, concurrency is achieved via a process called context switching. In a multi-core environment, concurrency can also be achieved through parallelism.

**Synchronous** programming model allows tasks to be created and executed in order: next task is executed only after current task has finished.

**Asynchronous** programming model allows task switching: new tasks can be started without waiting for current tasks to finish.
- Asynchronous programming model helps to achieve concurrency. In a multi-threaded environment, it allows parallelism.
- Can be achieved via context switching in OS threads, or cooperative multitasking in user space.

---

<img src="Sync.png" width="80%"> 

<img src="Async.png" width="80%"> 

To summarize:
- Single Threaded and Multi-Threaded -> The environment of task execution. CPUs, cores, etc.
- Concurrency and Parallelism -> The way tasks are executed in the environment. 
- Synchronous and Asynchronous -> Programming model.

The above introduction is derived from this nice article: https://medium.com/swift-india/concurrency-parallelism-threads-processes-async-and-sync-related-39fd951bc61d

#### 1.1 What does Python offer?


- Parallelism across different CPUs using multiprocessing.Process and concurrent.futures.ProcessPoolExecutor
- Concurrency, but unfortunately not parallelism, using *threading* module and concurrent.futures.ThreadPoolExecutor. 
    - Threads in general allow parallelism, but not in Python, because of the Global Interpreter Lock (GIL).
- Asynchronous programming (cooperative multitasking) using asyncio library.
- CPU-intensive vs IO-intensive tasks
    - "asyncio is often a perfect fit for IO-bound and high-level structured network code."

### 2. Multiprocessing

There are two main libraries for multiprocessing: 
- concurrent.futures, which provides high-level API and is easier to use
- multiprocessing, which gives more flexibility at the expense of more boilerplate code

Below I give two small examples using each of the libraries.

#### 2.1 Using concurrent.futures
- Provides ProcessPoolExecutor and ThreadPoolExecutor classes
    - Allows creating a pool of processes or threads. Distributing tasks to the pool and managing the processes in the pool is taken care of by the library.
    
- There are two ways to assign tasks to and gather results from the processes in the pool.
    - Executor.map function
    - Executor.submit function

**Executor.map(func, *iterables, timeout=None, chunksize=1)**
- Similar to map(func, *iterables)
- For each value in the iterables, the callable func is executed on different processes in the pool.

In [None]:
# The built-in map is lazy; wrapping it in list() forces evaluation so the cell shows the squares.
result = map(lambda value: value ** 2, range(1, 6))
list(result)

In [None]:
from concurrent.futures import ProcessPoolExecutor

# When worker processes are forked, each one gets its own copy of module-level globals; the data is available to every process but is not shared between them
DIM = 10000

def worker(vector):
    """Return ``np.linalg.norm(vector)`` after a 10 ms pause.

    The pause stands in for per-task work, so that the cost of handing a task
    to another process can be compared against the cost of the task itself.
    """
    time.sleep(0.01)  ## Comment this line and run
    norm = np.linalg.norm(vector)
    return norm

def parallel():
    """Compute the norms of 1000 random vectors on a four-process pool and print their sum."""
    vectors = [np.random.rand(DIM) for _ in range(1000)]
    # usually more workers than CPUs is not a good idea
    with ProcessPoolExecutor(max_workers=4) as executor:
        # executor.map yields the results in the same order as the inputs
        total = sum(executor.map(worker, vectors))
        print(f'Sum of Norms: {total}')

Let's look at how fast it is compared to sequential approach.

In [None]:
%%timeit -n 1 -r 5
parallel()

In [None]:
def sequential():
    """Compute the norms of 1000 random vectors one after another and print their sum."""
    vectors = [np.random.rand(DIM) for _ in range(1000)]
    total = sum(map(worker, vectors))
    print(f'Sum of Norms: {total}')

In [None]:
%%timeit -n 1 -r 5
sequential()

Remarks:
- Parallelism is best achieved when execution time per process >> communication time between processes

---

**Executor.submit(func, *args, **kwargs)**
- Schedules the callable, func, to be executed as func(*args, **kwargs) and returns a Future object representing the execution of the callable.
- A Future object encapsulates the asynchronous execution of a callable: results as well as exceptions are wrapped into the object.


In [None]:
from concurrent.futures import as_completed

# An example showing how an exception raised in a worker is captured and handed back to the caller
def worker(inp):
    """Square ``inp``, failing at random roughly 30% of the time.

    Returns a dict holding the id of the process that did the work ('pid')
    and the square of the input ('result'). Raises Exception('Bad luck!')
    when the random draw falls below 0.3.
    """
    unlucky = np.random.random() < 0.3
    if unlucky:
        raise Exception('Bad luck!')
    pid = os.getpid()
    return {'pid': pid, 'result': inp*inp}

def master():
    """Submit ten jobs to a four-process pool and report each outcome as it finishes.

    Both return values and exceptions raised inside a worker travel back to
    this process wrapped in the Future, so failures are printed instead of
    propagating.
    """
    # Local import keeps this cell self-contained.
    from concurrent.futures import FIRST_COMPLETED, wait

    with ProcessPoolExecutor(max_workers=4) as executor:
        pending = {executor.submit(worker, i) for i in range(10)}

        while pending:
            # Sleep until at least one future is done rather than spinning on
            # future.done(), which would burn a CPU core while the workers run.
            finished, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in finished:
                # exception() and result() block on unfinished futures, but
                # everything in `finished` is already complete.
                exception = future.exception()
                if exception is None:
                    print('Result: %s' % future.result())
                else:
                    print('Exception: %s' % exception)

In [None]:
master()

Following is an easier way to do the same thing:

In [None]:
from concurrent.futures import as_completed

def worker(inp):
    """Return {'pid': <current process id>, 'result': inp squared}.

    Roughly three calls in ten raise Exception('Bad luck!') instead, to
    simulate a task that fails.
    """
    draw = np.random.random()
    if not draw >= 0.3:
        raise Exception('Bad luck!')
    outcome = {'pid': os.getpid(), 'result': inp*inp}
    return outcome

def master():
    """Run ten jobs on a four-process pool and print each outcome in completion order."""
    with ProcessPoolExecutor(max_workers=4) as executor:
        jobs = [executor.submit(worker, n) for n in range(10)]
        # as_completed yields each future once it has finished
        for done in as_completed(jobs):
            error = done.exception()
            if error is not None:
                print('Exception: %s' % error)
                continue
            print('Result: %s' % done.result())

In [None]:
master()

Remarks:
- There are some more useful methods on Future object: add_done_callback
- *timeout* argument to result and exception methods is also very useful

#### 2.3 Using multiprocessing

A very simple example is shown below. However, in real applications, much more needs to be done to prevent deadlocks.

In [None]:
from multiprocessing import Process, Queue

def worker(qin, qout):
    """Consume inputs from ``qin`` until a ``None`` sentinel arrives.

    For every input, a dict with the worker's process id ('pid') and the
    squared input ('result') is put on ``qout``.
    """
    # Exceptions are deliberately not simulated here: without Futures there is
    # no ready-made channel for sending them back to the parent.
    item = qin.get()
    while item is not None:
        qout.put({'pid': os.getpid(), 'result': item*item})
        item = qin.get()

def master():
    """Fan ten inputs out to four worker processes through queues and print the results."""
    n_workers, n_tasks = 4, 10
    qin, qout = Queue(), Queue()

    procs = []
    for _ in range(n_workers):
        procs.append(Process(target=worker, args=(qin, qout)))
    for proc in procs:
        proc.start()

    for task in range(n_tasks):
        qin.put(task)
    # One result is expected for each input that was queued.
    for _ in range(n_tasks):
        print(qout.get())

    # One sentinel for each worker so that every worker loop terminates.
    for _ in procs:
        qin.put(None)
    for proc in procs:
        proc.join()

In [None]:
master()

Remarks:
- Notice a lot of boilerplate when using multiprocessing as compared to concurrent.futures

#### 2.4 Pure python vs zeromq
- Advantages of python standard libraries:
    - No external dependencies
    - Quick and easy to implement
- Cons
    - Not robust. If a parent or one of the children processes dies, everything has to be launched again. Compare this to zeromq: when using zeromq, master or worker processes can be stopped and started at any time.
    - IO between processes is not as fast as zeromq

An excellent guide to zeromq, a lightweight networking library and concurrency framework, is available here:
- http://zguide.zeromq.org/page:all

### 3. Asynchronous Programming using asyncio

#### 3.0 An example of undesirable context-switching when using Threading
Here we run a function in two threads and both are sharing the common resource, stdout, where they print a line. Ideally, we want each function to print the line fully and not be interrupted in the middle.

In [None]:
from concurrent.futures import ThreadPoolExecutor

def print_func(s):
    """Print ``s`` ten times, one character per print call, pausing 0.1 s after each line.

    Writing character by character leaves room for another thread to write
    to stdout in the middle of a line.
    """
    for _ in range(10):
        for ch in s:
            print(ch, end='')
        print()
        time.sleep(0.1)

    
# Two workers, so both print jobs run at the same time; try max_workers=1 to compare
with ThreadPoolExecutor(max_workers=2) as executor:
    for digit in ('0', '1'):
        executor.submit(print_func, digit*50)

#### 3.1 Generators
Generators are functions that generate values. A regular function returns a value and then its scope is destroyed; when we call it again, it starts from scratch. It’s a one-time execution. A generator function, however, can yield a value and pause its execution, returning control to the calling scope.

In [None]:
def gen_squares(max_value=None):
    """Yield the squares 0, 1, 4, 9, ... in increasing order.

    Args:
        max_value: largest square to produce (inclusive). When ``None`` (the
            default) the generator never stops on its own.

    Yields:
        int: the next square. 0 is always yielded first, regardless of
        ``max_value``.
    """
    n = 0
    while True:
        yield n * n
        n += 1
        # Apply the bound only when one was given; comparing an int with None
        # would raise TypeError on the second next() call.
        if max_value is not None and n * n > max_value:
            break

A generator function doesn’t directly return any values but when we call it, we get a generator object which is like an iterable. So we can call next() on a generator object to iterate over the values. Or run a for loop.

In [None]:
squares = gen_squares()

In [None]:
next(squares)

In [None]:
squares_upto_100 = gen_squares(max_value=100)

In [None]:
next(squares_upto_100)

In [None]:
for s in gen_squares(max_value=200):
    print(s)

#### 3.2 Naive task switching using generators

In [None]:
from itertools import cycle

def print_func(s):
    """Generator that prints ``s`` ten times, yielding control after each line.

    Each line is followed by a blocking sleep of a random length below one
    second; only afterwards is control handed back to the caller.
    """
    for _ in range(10):
        for ch in s:
            print(ch, end='')
        print()
        pause = np.random.random()
        time.sleep(pause)  # Ideally we want to yield here
        yield  # the yielded value (None) is irrelevant, only the work above matters
        
def print_func2():
    """Generator that prints a line of fifty '+' characters ten times, yielding after each line."""
    line = '+'*50
    for _ in range(10):
        for ch in line:
            print(ch, end='')
        print()
        pause = np.random.random()
        time.sleep(pause)
        yield

def loop(generators):
    """Drive several generators round-robin until every one of them is exhausted.

    Each generator is advanced one step per round, in the order given
    ([A, B, C] -> A, B, C, A, B, C, ...). A generator that finishes is dropped
    from the rotation while the others keep running, so generators of
    different lengths all run to completion.

    Args:
        generators: iterable of generator objects; yielded values are ignored.
    """
    pending = list(generators)
    while pending:
        still_running = []
        for gen in pending:
            try:
                next(gen)  # we don't care about the value that is yielded
            except StopIteration:
                continue  # this one is done; keep scheduling the rest
            still_running.append(gen)
        pending = still_running

loop([print_func('0'*50), 
      print_func('1'*50), 
      print_func2()])

---

One of the many problems with the above naive approach is that tasks are not scheduled optimally. Ideally, we would like to "yield" from the generator above while it is waiting (wasting time) in the *sleep* call. This is what coroutines allow us to achieve: we can yield whenever we need to wait for something, for example, the response to an HTTP request.

#### 3.3 Async programming using Coroutines and asyncio

- Event Loop: The event loop is the core of every asyncio application. Event loops run asynchronous tasks and callbacks and perform network IO operations.
- Coroutines are declared with async/await syntax. They are like functions, except that they can only be executed by scheduling them on the event loop. They yield control to the loop using "await" statements.

Revisiting the context-switching example

In [None]:
import asyncio

async def print_func(s):
    """Coroutine that prints ``s`` ten times, awaiting a random sub-second sleep after each line.

    Control returns to the event loop only at the ``await``, so a line is
    never interrupted by another coroutine part-way through.
    """
    for _ in range(10):
        for ch in s:
            print(ch, end='')
        print()
        pause = np.random.random()
        await asyncio.sleep(pause)
    
# Schedule the three printers on the event loop and keep references to the
# resulting tasks: the asyncio documentation advises holding on to the return
# value of ensure_future so that a task cannot disappear mid-execution.
print_tasks = [asyncio.ensure_future(print_func(ch*50)) for ch in '01+']

#### 3.5 Small example: Fetching webpages

In [None]:
import requests

URLS = ['https://facebook.com',
                'https://github.com',
                'https://google.com',
                'https://microsoft.com',
                'https://yahoo.com']

def sequential(urls, timeout=10.0):
    """Fetch every URL one after another and print the total elapsed time.

    Args:
        urls: iterable of URL strings, each passed to ``requests.get``.
        timeout: seconds to wait on each request before giving up. Without a
            timeout, a server that never answers would block this call forever.

    Any exception raised by ``requests.get`` (including a timeout) propagates
    to the caller.
    """
    # perf_counter is meant for measuring intervals; unlike time.time it is
    # unaffected by system clock adjustments.
    start = time.perf_counter()
    for url in urls:
        requests.get(url, timeout=timeout)
    print('Elapse time: %s' % (time.perf_counter()-start))

In [None]:
sequential(URLS)

In [2]:
import asyncio
from aiohttp import ClientSession

async def fetch(url, session):
    """Issue ``session.get(url)`` and return whatever the response's ``read()`` produces."""
    async with session.get(url) as response:
        return await response.read()
    
async def fetch_all(urls):
    """Fetch all URLs concurrently over one ClientSession and print the total elapsed time."""
    start = time.time()
    async with ClientSession() as session:
        # Schedule every request up front so that they overlap.
        pending = [asyncio.ensure_future(fetch(url, session)) for url in urls]
        # Wait for all of them before the session is closed.
        await asyncio.gather(*pending)
    print('Elapse time: %s' % (time.time()-start))

In [None]:
# Wrap the coroutine in a future scheduled on the event loop, then drive the
# loop until that future has finished.
future = asyncio.ensure_future(fetch_all(URLS)) # not blocking
loop = asyncio.get_event_loop()
# NOTE(review): run_until_complete presumably raises RuntimeError when the loop
# is already running (as the later cells in this notebook seem to assume it is)
# - confirm against the kernel version in use.
loop.run_until_complete(future) # blocking

In [None]:
asyncio.ensure_future(fetch_all(URLS))

### 4. Cool JupyterLab Trick

In [3]:
def some_cool_function(data):
    """Count upwards in ``data['x']`` forever, one tick every half second.

    The counter is reset to 0 on entry. Every tick increments it, sleeps
    (blocking) for 0.5 s and then prints '=+'. The function never returns;
    it has to be interrupted from outside.
    """
    data['x'] = 0
    while True:
        data['x'] = data['x'] + 1
        time.sleep(0.5)
        print('=+', end='')

In [4]:
data={}
some_cool_function(data)

=+=+=+=+=+

KeyboardInterrupt: 

In [5]:
data['x']

6

In [46]:
async def some_cool_function(data):
    """Coroutine that counts upwards in ``data['x']`` forever, one tick every half second.

    The counter is reset to 0 on entry. Between ticks the coroutine awaits
    ``asyncio.sleep``, handing control back to the event loop. It only ends
    when an exception, such as a cancellation, is raised at the await.
    """
    data['x'] = 0
    while True:
        data['x'] = data['x'] + 1
        await asyncio.sleep(0.5)
        # print('=+', end='')  # uncomment to trace every tick

In [40]:
task = asyncio.ensure_future(some_cool_function(data))

=+=+=+=+=+=+=+=+=+=+

In [44]:
data['x']

38

=+=+=+=+=+=+=+=+=+

In [45]:
task.cancel()

=+

True