# Chapter 13: Concurrency 🚀

- Threads
- Multiprocessing
- Futures 
- AsyncIO

**By Will Norris**

## Sequential Programming: 
- Sequential programming model is intuitive and natural
    - Do things **one step at a time** (The way humans think)
- In programming languages: 
    - Each of these real-world actions is an abstraction for a sequence of finer-grained actions
    - Flow: ```Open the cupboard, select a tea, check water level in kettle, if low: add more, boil water, pour water in cup, wait for tea...``` 
- **But**, what we do while the water is boiling is up to us
    - Do we simply wait? 
    - Or, do we do other tasks such as starting our toast or fetching the newspaper (asynchronous tasks)
        - The whole time aware that we are waiting for our water to boil!
- Tea kettle makers know people tend to operate asynchronously, so they add a warning for when the water has boiled, to remind you to come back to the task at hand.
    - Finding the right balance of sequentiality and asynchrony is a characteristic of efficient people. **The same is true of efficient programs.**

### Why Concurrency? 
- At some point it is not cost efficient to buy a faster machine (**scaling vertically**)
- Instead of scaling computation up, we can go out (**scaling horizontally**)
    - Allows us to use cheap hardware, and accomplish pieces of computation across a set of threads/processors/nodes 
- In modern computing, we can divide the problem entirely across nodes (processors) 
    - In legacy computing, we could take advantage of "switching", which means rapidly swapping between threads on a single processor to accomplish multiple things "at once" (time-sharing systems)
    

## Multiple Processes: 
__Motivating Factors:__
- **Resource Utilization:**
    - Programs often wait on external operations (such as file I/O) and can't do anything useful while they wait. Let's use that time!
- **Fairness:**
    - Multiple users and programs may have equal claim on the machine's resources. We want to let them share "slices" of time rather than let one run to completion before the next one starts
- **Convenience:**
    - It is easier to write several programs that each perform a single task and have them coordinate with each other when needed than to write one big program. 

## Threads: 
- Threads allow multiple streams of program control flow to coexist within a process. 
- They share process-wide resources (memory, file handles) 
    - But, each thread has its own program counter, stack, and local variables 
- Threads provide a natural decomposition for exploiting hardware parallelism when we have multiple processors
    - Multiple threads within the same program can be scheduled simultaneously on multiple CPUs
- Most modern OSes treat threads as **lightweight processes** and use them (not processes) as the basic unit of scheduling
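
The split between shared, process-wide memory and per-thread local variables can be seen in a minimal sketch. The names `worker` and `run_workers` are hypothetical and chosen only for illustration.

```python
import threading

def worker(worker_id, shared_log):
    # local_total is a local variable: it lives on this thread's own stack
    local_total = sum(range(worker_id + 1))
    # shared_log is one list object in process memory, visible to every thread
    shared_log.append((worker_id, local_total))

def run_workers(n=4):
    shared_log = []
    threads = [threading.Thread(target=worker, args=(i, shared_log))
               for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Threads may finish in any order, so sort before returning
    return sorted(shared_log)

if __name__ == "__main__":
    print(run_workers())
```

Every thread writes into the same list, while each `local_total` stays private to the thread that computed it.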

![](https://imgur.com/5mte34P.png)

In [None]:
from threading import Thread

class InputReader(Thread):
    def run(self):
        self.line_of_text = input()

In [None]:
print("Enter any text and press enter: ")
thread = InputReader()
thread.start()

count = result = 1
while thread.is_alive():
    result = count * count 
    count += 1

# count is incremented after the last square is computed, so report count - 1
print("calculated squares up to {0} * {0} = {1}".format(
    count - 1, result))
print("while you typed '{}'".format(thread.line_of_text))

In [None]:
import time
from threading import Thread

import pyowm  # third-party OpenWeatherMap client

CITIES = ['Edmonton', 'Victoria', 'Winnipeg', 'Fredericton',
          'Halifax', 'Toronto', 'Charlottetown',
          'Quebec', 'Regina']

class TempGetter(Thread):
    def __init__(self, city):
        super().__init__()
        self.city = city
        self.owm = pyowm.OWM('be06b12aa45e1ca05a8f972f81376c6d')
    def run(self):
        city = self.owm.weather_at_place(self.city)
        weather = city.get_weather()
        self.temperature = weather.get_temperature('celsius')['temp']
        
threads = [TempGetter(c) for c in CITIES]
start = time.time()
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
for thread in threads:
    print("it is {0.temperature:.0f}°C in {0.city}".format(thread))
print(
   "Got {} temps in {} seconds".format(
   len(threads), time.time() - start))


### What's actually happening here? 
- 9 threads are started, one per city in ```CITIES```
    - Remember to call ```super().__init__()``` so that the underlying ```Thread``` object is properly initialized.
    - We construct the 9 thread objects from within the main thread, then run them later.
        - Data constructed in one thread is accessible from other running threads
- The main thread then joins each thread in turn
    - Calling ```join()``` tells the main thread to "wait for this thread to complete before doing anything else"
    - This means the second for loop won't end until all 9 threads have finished
- **In threads, all state is shared by default**
    
    

### Running on one thread instead, much slower:

In [None]:
threads = [TempGetter(c) for c in CITIES]
start = time.time()
for thread in threads:
    # run() is called directly, so each request executes in the main thread, one after another
    thread.run()
for thread in threads:
    print("it is {0.temperature:.0f}°C in {0.city}".format(thread))
print(
   "Got {} temps in {} seconds".format(
   len(threads), time.time() - start))

## Threads sound great! What's the catch?? 
- Python programmers avoid threading for several reasons: 
    - Better alternative methods to concurrent programming in Python
    - Shared Memory
    - Global Interpreter Lock
    - Thread Overhead

**Shared Memory:**
- Shared memory is both a major advantage and disadvantage of threading
    - It is convenient to have access to all variables in memory from any thread 
    - However, this can cause horrible inconsistencies in the program state 
        - It is easy to let one thread change a value that another thread expected to stay the same, which can cause errors that are hard to reproduce.
- We can "synchronize" threads' access to variables; however, this can get complex, and improper synchronization can be hard to find.
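
One common way to synchronize access is a lock. The sketch below is illustrative only (`count_with_lock` is a hypothetical name): several threads increment one shared counter, and `threading.Lock` ensures that only one of them performs the read-modify-write at a time.

```python
import threading

def count_with_lock(num_threads=4, increments=10_000):
    counter = {"value": 0}   # state shared by all threads
    lock = threading.Lock()

    def add():
        for _ in range(increments):
            # Only one thread at a time may execute the block under the lock
            with lock:
                counter["value"] += 1

    threads = [threading.Thread(target=add) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter["value"]

if __name__ == "__main__":
    print(count_with_lock())
```

With the lock in place the final count is always `num_threads * increments`. Without it, updates from different threads could interleave and some increments could be lost.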

**The Global Interpreter Lock:**
- To efficiently manage memory, garbage collection, and calls to machine code in libraries, Python uses the Global Interpreter Lock (GIL)
- In the standard CPython interpreter it cannot be turned off, and it makes it impossible to properly use threads for parallel processing in Python
    - The GIL will prevent any two threads from doing work at the exact same time, even if they have work to do. ("doing work" == using CPU) 
    - The GIL is released as soon as the thread starts to wait for anything (for example I/O or ```time.sleep```)
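
Because the GIL is released while a thread waits, waiting threads can overlap even though computing threads cannot. A rough sketch, using `time.sleep` as a stand-in for a blocking I/O call (`timed_sleepers` is a hypothetical helper):

```python
import threading
import time

def timed_sleepers(num_threads=4, delay=0.2):
    """Start num_threads threads that each wait `delay` seconds; return the elapsed time."""
    def wait():
        time.sleep(delay)   # the GIL is released while the thread sleeps

    threads = [threading.Thread(target=wait) for _ in range(num_threads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    print("4 sleepers took {:.2f} seconds".format(timed_sleepers()))
```

The elapsed time should be close to a single `delay` and not four times that, since all four waits happen at once.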
    
**Why do we still have the GIL?**
- It makes the reference implementation (CPython) much easier to maintain
- It makes single-threaded Python faster

**Thread Overhead:**
- Each thread takes up some memory (both in python and the OS kernel) to keep track of the thread state 
- Switching (jumping between threads) uses some CPU time
    - This can be improved with proper thread management and the use of ```ThreadPool``` to help reuse threads
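
As a small illustration of thread reuse, the sketch below pushes 20 tasks through a `ThreadPool` of 3 threads and records which thread handled each task. The function names are hypothetical; `ThreadPool` comes from the standard library's `multiprocessing.pool` module.

```python
import threading
from multiprocessing.pool import ThreadPool

def which_thread(task_id):
    # Report the name of the pool thread that picked up this task
    return task_id, threading.current_thread().name

def run_in_pool(num_tasks=20, num_threads=3):
    # The pool creates num_threads threads once and reuses them for every task
    with ThreadPool(num_threads) as pool:
        return pool.map(which_thread, range(num_tasks))

if __name__ == "__main__":
    results = run_in_pool()
    print("{} tasks ran on {} threads".format(
        len(results), len({name for _, name in results})))
```

No more than 3 distinct thread names appear, so the cost of creating a thread is paid at most 3 times instead of 20.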

## The better tool: Multiprocessing
- Multiprocessing is designed for when CPU-intensive jobs can be run in parallel and multiple cores are available. 
    - There are almost always multiple cores available now! (4-core Raspberry Pi = $30)


In [None]:
from multiprocessing import Process, cpu_count
import time 
import os 

class MuchCPU(Process):
    def run(self):
        print(os.getpid())
        for i in range(200000000):
            pass
        
procs = [MuchCPU() for f in range(cpu_count())]
t = time.time()
for p in procs:
    p.start()
for p in procs: 
    p.join()
print("work took {} seconds".format(time.time()-t))

- I have a 2-core (4 virtual cores) machine, so Python will spin up 4 different processes, each with a unique pid (process ID) 
- This took me 8.28 seconds versus 12.96 seconds on a similar setup from 2014. 
    - That is a 36% reduction in run time ((12.96 - 8.28) / 12.96 ≈ 0.36) without increasing the core count
    
- When I run this on my laptop, we can see that all 4 cores hit 100% usage for a brief moment.
    - This is because each process is simultaneously counting through 200,000,000 numbers, one process per core
    
![](https://imgur.com/zzR5qxm.png)

In [None]:
class MuchCPU(Thread):
    def run(self):
        print(os.getpid())
        for i in range(200000000):
            pass
        
procs = [MuchCPU() for f in range(cpu_count())]
t = time.time()
for p in procs:
    p.start()
for p in procs: 
    p.join()
print("work took {} seconds".format(time.time()-t))

- Here we subclass ```Thread``` instead of ```Process```
    - This means that all of the threads share the same process (note that the same pid is printed each time), and because of the GIL only one of them can execute Python code at a time
- **Why is it not 4x slower?**
    - When we use all 4 cores on my laptop, the worker processes have to share computation with the other stuff happening on my laptop. 
    - When we use a single process and multithread, the remaining three cores can do laptop stuff 

## Multiprocessing Pools
- It can get really hard to allocate processes for each task. 
- We need something to manage where/when code is running and when the processes interact with each other.
- Pools are designed to distribute tasks to available processors! 
    - Use FIFO scheduling 
    - Map-reduce style architecture: 
        - Maps the input to different processors and collects the output from all of them
        - After execution, returns the output in the form of a list
        - ```map``` blocks: nothing is returned until all tasks have finished

### When Should I use Pool or Process???
**Pool**
- Pool allows you to do multiple jobs per process, which can make your program easier to parallelize
- Much better for large jobs
- Example: You have a million tasks to execute in parallel
    - You can create a ```Pool``` with one worker per core (```Pool(processes=cpu_count())```) and pass the tasks to ```pool.map```
    - The pool will distribute those tasks to the worker processes and collect the return values in a list for the parent process
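
A minimal sketch of that pattern follows. `cube` and `parallel_cubes` are hypothetical stand-ins for a real task. The `if __name__ == "__main__"` guard matters on platforms that start worker processes by re-importing the main module, and on those platforms the worker function generally needs to live in an importable module, not in a notebook cell.

```python
from multiprocessing import Pool, cpu_count

def cube(n):
    # Stand-in for a CPU-heavy task; it must be picklable to be sent to a worker
    return n ** 3

def parallel_cubes(numbers):
    # One worker process per core; map() returns the results in input order
    with Pool(processes=cpu_count()) as pool:
        return pool.map(cube, numbers)

if __name__ == "__main__":
    print(parallel_cubes(range(8)))
```

The parent process receives a single list once every task has finished.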
    
**Process**
- If you have a small number of tasks then a Pool may be overkill
    - All the pickling and moving and scheduling slows things down!
- File IO
    - In pooling, if there is a long IO operation, the FIFO scheduling means the worker waits for it to finish before it picks up another task
    - With one ```Process``` per task, the OS can suspend the process that is waiting on IO and schedule another one 

In [None]:
import random 
import time
import multiprocessing

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker(input1):
    inputs = input1[0]
    the_time = input1[1]
    print(" Process {}\tWaiting {} seconds".format(inputs, the_time))
    time.sleep(int(the_time))
    print(" Process {}\tDONE".format(inputs))

def mp_handler():
    # One pool with two worker processes; map() blocks until every task is done
    with multiprocessing.Pool(2) as p:
        p.map(mp_worker, data)

mp_handler()

#### So, what is happening here? 
- We have one pool with two worker processes to corral our tasks into. This means that each worker will automatically take the next task in our list of tasks as it finishes its current one.

## Queues: 
- Queues are good for sending messages from one process into one or more other processes
    - Any picklable object can be sent into a Queue
        - Pickling is expensive, especially on large objects!!!

**Queues Example:**
- We have a very simple reader and writer of messages off of a queue 
- The writer sends a lot of integers to the reader 
    - when the writer runs out of numbers, it sends a ```"DONE"``` message
    - when the reader receives a ```"DONE"``` message, it breaks the read loop

In [None]:
from multiprocessing import Process, Queue
import time
import sys 

def reader_proc(queue):
    # Read from the queue. Spawned as a separate process
    while True:
        msg = queue.get() # just read from queue 
        if (msg == "DONE"):
            break
            
def writer(count, queue):
    # Write to our queue
    for ii in range(0, count):
        queue.put(ii)
    queue.put('DONE')
        
pqueue = Queue()
for count in [10**4, 10**5, 10**6]:
    # reader_proc() reads pqueue in a separate process
    reader_p = Process(target=reader_proc, args=(pqueue,))
    reader_p.daemon = True
    reader_p.start()
    
    _start = time.time()
    writer(count, pqueue)
    reader_p.join()
    print("Sending {0} numbers to Queue() took {1} seconds".format(count,(time.time()-_start)))

## So... What's the Catch with Multiprocessing? 

- **"There is no best way to do concurrency; this is especially true in Python"**
    - Every problem is approached differently in parallel. Part of the challenge is choosing the best way to parallelize. 

- The primary drawback with multiprocessing is that sharing data between processes is super expensive. 
    - We have to employ some data structure that pickles data (queue, pipe, etc)

- Thus, due to the overhead of communication, **multiprocessing is best applied when:**
    - Information passed between processes is small 
    - The work done in each individual process is very large
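
To get a feel for the communication cost, the sketch below compares the pickled size of a small message with that of a large one. It uses only the standard `pickle` module; `pickled_size` is a hypothetical helper, and the two payloads are made-up examples.

```python
import pickle

def pickled_size(obj):
    """Number of bytes that must be serialized to send obj to another process."""
    return len(pickle.dumps(obj))

small_message = ("task", 42)            # e.g. a task name and one parameter
large_message = list(range(100_000))    # e.g. a big intermediate result

if __name__ == "__main__":
    print("small: {} bytes".format(pickled_size(small_message)))
    print("large: {} bytes".format(pickled_size(large_message)))
```

Every object placed on a `Queue` or sent through a pipe is serialized this way and rebuilt on the receiving side, so passing a small description of the work is much cheaper than passing the data itself.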

Additional Drawbacks:
- When processes do share state (for example through shared memory), it can be very difficult to tell which process is accessing each variable and when
    - It is easily possible for one process to overwrite a shared variable when another process expects it to remain unchanged 

## Asynchronous Concurrency: Futures
- Futures "Provide a high-level interface for asynchronously executing callables" - Python 3.x docs
- They allow us to structure our code such that it is easier to track down when we alter the shared state
- A future is just an object that wraps a function call. 
    - The function is run in the background in a thread or process 
    - We can check whether it has completed and get the result once it has
    
- Fun Fact: In Computer Science, a future refers to a construct that can be used for synchronization when using concurrent programming techniques
    - The future is a way to describe the result of a process or thread before it has finished processing (pending result)
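
The life cycle of a single future can be sketched in a few lines. `slow_square` and `future_demo` are hypothetical names; `submit()`, `done()`, and `result()` are the standard `concurrent.futures` calls.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(n, delay=0.1):
    time.sleep(delay)   # pretend this is slow work
    return n * n

def future_demo():
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(slow_square, 7)   # returns immediately
        was_pending = not future.done()            # most likely still running here
        value = future.result()                    # blocks until the call finishes
        return was_pending, value, future.done()

if __name__ == "__main__":
    print(future_demo())
```

`submit()` hands back the future before the work is done, and `result()` is the point where the caller actually waits.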

In [None]:
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import os 
import urllib.request

In [None]:
def downloader(url):
    # Validate the URL before opening a connection
    filename = os.path.basename(url)
    ext = os.path.splitext(url)[1]
    if not ext: 
        raise RuntimeError("URL does not contain an extension")
    
    # Stream the response to disk in 1 KB chunks; both handles are closed on exit
    with urllib.request.urlopen(url) as req, open(filename, 'wb') as file_handle:
        while True:
            chunk = req.read(1024)
            if not chunk:
                break
            file_handle.write(chunk)
    msg = 'Finished downloading {filename}'.format(filename=filename)
    return msg

def main(urls):
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(downloader, url) for url in urls]
        for future in as_completed(futures):
            print(future.result())

In [None]:
urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
main(urls)

### So, what's actually going on here? 
- In main we instantiate our thread pool: 
    - Our pool has 5 workers
    - We use a list comprehension to create a group of futures (jobs) and then we call ```as_completed()```.
        - This function is an iterator that yields the futures as they complete. 
        - When they complete, we print the result, which is the string returned from ```downloader()```

**Futures' Structure:** 
- Once the ```executor``` has been constructed, we can ```submit``` jobs to it
    - The ```submit()``` method immediately returns a ```Future``` object, which is a promise to give a result eventually
- Futures work well with a ```queue```: 
    - We can construct a ```queue``` of futures (essentially all running) and loop through the queue over and over
        - If the future at the end of the queue is still running, put it back at the front of the queue
        - If the future has finished, then get its ```result()```
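
The polling loop described above could look roughly like this. It is a sketch with hypothetical names (`nap`, `collect_by_polling`) that uses a `collections.deque` as the queue. In practice `as_completed()` does the same job with less code.

```python
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor

def nap(seconds):
    time.sleep(seconds)
    return seconds

def collect_by_polling(delays):
    results = []
    with ThreadPoolExecutor(max_workers=len(delays)) as executor:
        pending = deque(executor.submit(nap, d) for d in delays)
        while pending:
            future = pending.pop()              # take the future at the end
            if future.done():
                results.append(future.result())
            else:
                pending.appendleft(future)      # still running: back to the front
                time.sleep(0.01)                # avoid a busy loop
    return results

if __name__ == "__main__":
    print(collect_by_polling([0.05, 0.01, 0.03]))
```

Results are collected in whatever order the futures happen to finish, not in submission order.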

## AsyncIO: 
**Current state of the art in Python concurrent programming**

- Combines futures, coroutines, and an Event Loop
- Can be used for a few different concurrent tasks, but designed for network I/O
    - Most network apps spend a lot of time waiting for data to come in
    - We could handle each client in a separate thread, but threads use memory and other resources, so AsyncIO uses coroutines instead

**Example:** You have a single core machine. You receive a request and need to make two database queries. Each query takes 50ms. 
- Synchronously: You must completely resolve the first query before starting the second. Total time = 100ms
- Asynchronously: You can send off query one, send off query two, then wait for both. The waits overlap, so total time ≈ 50ms
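
The arithmetic in this example can be checked with a small simulation. `fake_query` is a hypothetical stand-in for a database call and simply sleeps for 50ms. In a notebook, where an event loop is already running, use `await sequential()` in a cell in place of `asyncio.run(sequential())`.

```python
import asyncio
import time

async def fake_query(name, seconds=0.05):
    await asyncio.sleep(seconds)   # simulated 50ms wait on the database
    return name

async def sequential():
    start = time.perf_counter()
    await fake_query("first")      # finish the first query...
    await fake_query("second")     # ...before starting the second
    return time.perf_counter() - start

async def concurrent():
    start = time.perf_counter()
    # Both queries are in flight at once, so their waits overlap
    await asyncio.gather(fake_query("first"), fake_query("second"))
    return time.perf_counter() - start

if __name__ == "__main__":
    print("sequential: {:.3f}s".format(asyncio.run(sequential())))
    print("concurrent: {:.3f}s".format(asyncio.run(concurrent())))
```

Even on a single core the concurrent version takes roughly half as long, because the time is spent waiting and not computing.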
    
**From Hackernoon.com:**

    "Asyncio is a beautiful symphony between an Event loop, Tasks and Coroutines all coming together so perfectly — its going to make you cry."

### Event Loop
- In its simplest form, an event loop is just a loop that runs tasks one at a time.
- The special part is what happens when the running task makes a blocking call (such as a network request)
    - The event loop lets another task take a turn, and comes back to the blocked task once the thing it was waiting for is ready
    
    *"The event loop time is precious. If you are not making progress, you should step off the loop, so that someone else can. Event loop is the measure of progress" -- Miklos Philips*
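
To make the idea concrete, here is a toy round-robin loop built from plain generators. It is only a sketch of the concept, not how `asyncio` is implemented: the real event loop also tracks timers and I/O readiness. All names here are hypothetical.

```python
from collections import deque

def task(name, steps, log):
    for step in range(steps):
        log.append("{} step {}".format(name, step))
        yield   # step off the loop so another task can take a turn

def run_loop(tasks):
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)           # run the task until its next yield
            ready.append(current)   # not finished: go to the back of the line
        except StopIteration:
            pass                    # finished: drop it from the loop

if __name__ == "__main__":
    log = []
    run_loop([task("A", 2, log), task("B", 1, log)])
    print(log)
```

Each task runs only until it yields, which is the cooperative "step off the loop" behaviour the quote describes.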

### The Coroutine & Task
- Coroutines are essentially *stateful* functions
    - When a coroutine is waiting for something it can give up control of the event loop, but save its state for when it is ready again
- To pause a coroutine we use ```await other_coroutine```
    - This tells our current coroutine to pause, and immediately starts running our ```other_coroutine```

In [1]:
import asyncio
import random

async def coroutine_1():
    print('coroutine_1 is active on the event loop')
    print('coroutine_1 yielding control. Going to be blocked for 4 seconds')
    await asyncio.sleep(4)
    print('coroutine_1 resumed. coroutine_1 exiting')
    
async def coroutine_2():
    print('coroutine_2 is active on the event loop')
    print('coroutine_2 yielding control. Going to be blocked for 5 seconds')
    await asyncio.sleep(5)
    print('coroutine_2 resumed. coroutine_2 exiting')

In [2]:
# create the event loop 
loop = asyncio.get_event_loop()

#schedule both coroutines to run on the event loop 
loop.run_until_complete(asyncio.gather(coroutine_1(), coroutine_2()))

coroutine_2 is active on the event loop
coroutine_2 yielding control. Going to be blocked for 5 seconds
coroutine_1 is active on the event loop
coroutine_1 yielding control. Going to be blocked for 4 seconds
coroutine_1 resumed. coroutine_1 exiting
coroutine_2 resumed. coroutine_2 exiting


[None, None]

In [9]:
import asyncio
import random

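# Legacy generator-based coroutine style (@asyncio.coroutine / yield from):
# deprecated since Python 3.8 and removed in Python 3.11.
@asyncio.coroutine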
def random_sleep(counter):
    delay = random.random() * 5
    print("{} sleeps for {:.2f} seconds".format(counter, delay))
    yield from asyncio.sleep(delay)
    print("{} awakens".format(counter))
    
@asyncio.coroutine
def five_sleepers(): 
    print("Creating five tasks") 
    tasks = [asyncio.ensure_future(random_sleep(i)) for i in range(5)]
    print("Sleeping after starting five tasks")
    yield from asyncio.sleep(2)
    print("Waking and waiting for five tasks")
    yield from asyncio.wait(tasks)
    
asyncio.get_event_loop().run_until_complete(five_sleepers())
print("Done five tasks")

Creating five tasks
Sleeping after starting five tasks
0 sleeps for 4.32 seconds
1 sleeps for 3.57 seconds
2 sleeps for 4.77 seconds
3 sleeps for 3.04 seconds
4 sleeps for 3.62 seconds
Waking and waiting for five tasks
3 awakens
1 awakens
4 awakens
0 awakens
2 awakens
Done five tasks


### Let's piece this out: 
- ```asyncio.get_event_loop()``` gets the event loop, and ```run_until_complete()``` instructs it to run a future until it's done
    - The future is ```five_sleepers()```
- Inside of the ```five_sleepers``` future, we construct five ```random_sleep``` futures
    - ```asyncio.ensure_future``` adds them to the loop's task queue so they can execute concurrently when control is returned to the event loop
- Control is returned when we call ```yield from asyncio.sleep``` to pause execution
    - During this break the event loop executes the tasks that are next in the queue (the five futures we just made)
- If any sleep calls inside ```random_sleep``` are shorter than 2 seconds, the event loop passes control back into the relevant future
    - This prints its awakening message before returning
- When ```five_sleepers``` wakes up, it waits (with ```asyncio.wait```) until every ```random_sleep``` task has finished
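
For readers on newer Python versions, the same flow can be sketched with ```async def``` / ```await```. The `max_delay` and `pause` parameters are hypothetical additions so the sketch can be run quickly; with the defaults it behaves like the example above. In a notebook cell, use `await five_sleepers()` in place of `asyncio.run(five_sleepers())`.

```python
import asyncio
import random

async def random_sleep(counter, max_delay=5.0):
    delay = random.random() * max_delay
    print("{} sleeps for {:.2f} seconds".format(counter, delay))
    await asyncio.sleep(delay)      # hand control back to the event loop
    print("{} awakens".format(counter))
    return counter

async def five_sleepers(max_delay=5.0, pause=2.0):
    print("Creating five tasks")
    # create_task schedules each coroutine on the running event loop
    tasks = [asyncio.create_task(random_sleep(i, max_delay)) for i in range(5)]
    print("Sleeping after starting five tasks")
    await asyncio.sleep(pause)
    print("Waking and waiting for five tasks")
    # gather waits for every task and returns their results in task order
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(five_sleepers())
    print("Done five tasks")
```

Here `asyncio.create_task` plays the role of `asyncio.ensure_future`, and `await` replaces `yield from`.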