# Concurrency and Parralism in Python

## pyconil 2019
### by Guy Doulberg

# Talk Metadata 

# Who am I?


* My name is Guy Doulberg
* 15 years of development experience
* I have been developing in python in the last 3 years



* Working for [Satellogic](https://satellogic.com/) 
* A startup that want to capture frequent high quality images of the earth
* We use satellites to produce images
* I am resposible for some of the data processing that is being done after the cpature was taken
* We used the following methods in our system.

An image that was captured by one of Satellogic's sattellites 
![satellogic](https://satellogic.com/wp-content/themes/satellogic-theme/dist/assets/images/what-we-do.jpg)

# Why am I doing this talk?



* Because it is important for every python developer
* Becasue I was happy to hear this talk about 2 years ago when I had the need to concurrency in python
* Because it is fun


# What am I going to talk about?

* What is Concucurency and Parralism
* How to implement softwares that use Concurrency and Parraslism in the python stadard library 
* How to implement Concurrency and Parraslism acrross hosts (cluster)


# The examples you are going to see are made up, synthesised to make points

# Which python?


* python 3.7.3
* I don't know enough about python 2.7

# Resources:

* Git repo: https://github.com/guydou/pycon2019_con_para
* Slides: https://guydou.github.io/pycon2019_con_para/index.html#/ ![QRCODE](https://guydou.github.io/pycon2019_con_para//images/qrcode.png)

# Theortical background

# Concurrency

Consider you have two or more tasks that you need to execute on the same time. The tasks can start and be executed on the same time. 


# Parralism 

Consider you have two or more tasks that run exactly on the same time (in parralel paths).

![image](http://alvinalexander.com/sites/default/files/photos/parallelism-vs-concurrency.png)

image taken from "Parallelism vs concurrency in programming" [link](https://alvinalexander.com/photos/parallelism-vs-concurrency-programming)

# IO bound vs CPU bound

When considering optimiziation of a code block by either running the code in parrallel or concurrently, you should indentify your **bottleneck** properly. 






* If your program can run faster when using more computing units (CPUs) - your program is considered to be **CPU bound**


* If your program can run faster when using more bandwidth or reading/writing to/from several sources. Your program is considered to be **IO bound**

It is important to identify the nature of your program becasue:



* Adding more computing units to IO bounded program will not help -  (might do worst)

* Running a CPU bounded program using a single CPU is a sub-optimal.

# Memory Bounded 


* A program can also be bounded by the available memory of the machine

* If you program can run faster when using more memory (RAM)

# Stay Tuned:

* In one of the last slides I will show how to use cluster of machines, that could be a solution for such programs. 



# Dealing with IO Bounded tasks


* Basically when dealing with IO bounded tasks I would like to break the code that is boudned by IO and run it *Concurrently*.



## Physics 
 * In a single machine you can't utilize more than your avialable disk I/O
 
 * In a single machine you can't utilize more than your avialable network I/O
 
 * In a client server architecture, a client can't work faster than the server

Let's do it in python

First I will create a usecase of I/O bounded task,

A flask server that runs the following code:

```python
from flask import Flask
app = Flask(__name__)
import time

@app.route("/")
def sleep_well():
    sleep_duration = 2
    time.sleep(sleep_duration)
    return "Hello World!"
```

I am running this flask server behind a gunicorn, in order to control the number of requests it can handle:

```shell
gunicorn --bind 0.0.0.0:5000  -w 10 wsgi
```

# Naive implemetation will be **sequential** 

In [1]:
import time
import requests

host = "http://localhost:5000"
start_time = time.time()

for i in range(10):
    body = requests.get(host)
    assert body.text == "Hello World!"
    
end_time = time.time()
total = end_time - start_time

print("Total time of execution: %s" % (total))

Total time of execution: 20.102389812469482


# Using concurrent.futures.ThreadPoolExecutor


In [1]:
from concurrent.futures import ThreadPoolExecutor, as_completed 

* **ThreadPoolExecutor** - An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously ....
* **as_completed** - Returns an iterator over the Future instances (possibly created by different Executor instances) given by fs that yields futures as they complete (finished or were cancelled). 


## The code I want to run concurrently is the code that is actaully doing the calls to the server:

In [3]:
def io_bounded_task(host):
    body = requests.get(host)
    return body

## Now I am going to submit this code 10 times


## The thread pool will exectute the code concurrently 

## And my code will wait for all the executions to finish

In [4]:

start_time = time.time()
with ThreadPoolExecutor() as e:
    futures = {e.submit(io_bounded_task, host) for i in range(10)}
    for future in as_completed(futures):
        assert future.result().text == "Hello World!"
    
end_time = time.time()
total = end_time - start_time
print("Total time of execution: %s" % (total))

Total time of execution: 2.0783989429473877


# What happens when we reach the limit?

* Depends on what is blocking us
* If we just exceedrd the available bandwidth (by the server/client ISP, network card) - we just wouldn't be able to run more concurrent tasks
* We could also be blocked by the server we are trying to reach 

It might make sense to **throttle** our requests


## In the following code we run 20 threads while our server can handle only 10

In [5]:
start_time = time.time()
with ThreadPoolExecutor() as e:
    futures = {e.submit(io_bounded_task, host) for i in range(20)}
    for future in as_completed(futures):
        assert future.result().text == "Hello World!"
    
end_time = time.time()
total = end_time - start_time

# we are being throttled by the server so it will not be broken
#It is not healthy to count on the server for these things, we should count on our own
print("Total time of execution: %s" % (total))

Total time of execution: 4.043481826782227


## It doesn't make sense to run with 20 threads lets run only with 10

In [6]:
start_time = time.time()
with ThreadPoolExecutor(max_workers=10) as e:
    futures = {e.submit(io_bounded_task, host) for i in range(20)}
    for future in as_completed(futures):
        future.result().text == "Hello World!"
    
end_time = time.time()
total = end_time - start_time

print("Total time of execution: %s" % (total))

Total time of execution: 4.078224420547485


# asyncio
## When dealing with IO bounded tasks we can also use the asyncio module



## asyncio theory 

1. The idea is that there is an event loop(main thread), 

1. The event loop deligates iobounded tasks to other threads and continues with its exectution sequence. 

1. When result is ready, the event loop goes back to the the code that called the task and continues its sequence.

1. The event loop decides when and which code it runs, the code it runs must not block! 

1. Conisder we have 10 non blocking methods that fetch data from a server, the main loop runs them concurrently. 


In [13]:
import asyncio
import aiohttp

## define a coroutine that accesses the host
A function that can be entered and exited multiple times, suspended and resumed each time, is called a coroutine


In [52]:
async def async_io_bounded_task(host):
    async with aiohttp.ClientSession() as session:
        async with session.get(host) as response:
            text = await response.read()
            assert text == b"Hello World!"

## await for all the corutines to return

* Await keyword works only if you are in the an active event loop

In [56]:

# creating a coroutine mehtod to run several corutines
async def main():
    tasks = []
    await asyncio.gather(*[async_io_bounded_task(host) for i in range(10)])
    


## Create an event loop

The event loop is the core of every asyncio application. Event loops run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses.

Application developers should typically use the high-level asyncio functions, such as asyncio.run(), and should rarely need to reference the loop object or call its methods. This section is intended mostly for authors of lower-level code, libraries, and frameworks, who need finer control over the event loop behavior.

In this example I am using the event loop of jupyter

In [57]:
start_time = time.time()
# I am running in a jupyter notebook so I have already event loop running:
await main()

# if you don't have event loop then:
# asyncio.run(main())
end_time = time.time()
total = end_time - start_time

print("Total time of execution: %s" % (total))

Total time of execution: 2.018639087677002


# Which module to choose?

I think it is a matter of taste, 

Maybe I am old fashined, but I prefer the the Exector module becasue it is easier for me to folow the code path 



# CPU bounded jobs


## For the propuse of the talk I am going to produce a big array of random floats





In [3]:
import numpy
random_numbers = numpy.random.random(300000000)

* I am using numpy to speed things up, 

* The numpy object is also an object which is easier to share accross processes - I will use this feature

# Let's  try to get the maximal value out of the array

# First Attempt: sequential approach

In [4]:
import time
start_time = time.time()
print("Max values is %s " % max(random_numbers))

end_time = time.time()
total = end_time - start_time

print("Total time of execution: %s" % (total))


Max values is 0.9999999963405415 
Total time of execution: 15.650021076202393


## CPU utilization when running the sequential max:

![image](https://guydou.github.io/pycon2019_con_para//images/max_seq.png)

# Lets use threads


## From the pthread library of C/C++ description:

```
... It is most effective on multi-processor or multi-core systems where the process flow can be scheduled to run on another processor thus gaining speed through parallel or distributed processing...
```



## Proposed implementation:

1. Split the list to chunks

1. Find the max value in each chunk

1. Find the maximal value in all maximal values



### First lets check how much time it takes to run on a single chunk

In [5]:
num_chunks = 100
chunk_number = 10
def slice_chunk(chunk_num, array, num_chunks):
    offset = int(len(array)/num_chunks)
    return array[chunk_number*offset:(chunk_number+1)*offset]

start_time = time.time()
max(slice_chunk(chunk_number, random_numbers, num_chunks))
end_time = time.time()
total = end_time - start_time
print("Total time of execution: %s" % (total))

Total time of execution: 0.16901397705078125


**Can running on these kind of chunks in parllel can help us?**

1. We have 4 cpus
1. Finding a max value out of a chunk take 0.2 seconds
1. We have 100 chunks
1. We can run 25 groups of chunk in parallel 

The total time of execution should be:
```python
25*0.2 = 5 #seconds
```

### Run the on chunk in parllel wait for thre max values to return and find the maximal value

In [12]:

start_time = time.time()

with ThreadPoolExecutor() as e:
    futures = {e.submit(max, slice_chunk(chunk_number, random_numbers, num_chunks)) for chunk_number in range(num_chunks)}
    max_values = [future.result() for future in as_completed(futures)]

print("Max values is %s " % max(max_values))
        
end_time = time.time()
total = end_time - start_time
print("Total time of execution: %s" % (total))

Max values is 0.99999934045655 
Total time of execution: 16.2425696849823


![oops](https://thumbs.dreamstime.com/z/vector-oops-symbol-over-white-29840798.jpg)

# Say hello to the GIL:  or Global Interperter lock

* The GIL is a mutex (lock) that prevent access to python objects from different threads.

* In our case we need access from different threads, but the GIL in practice makes the threds code to run sequetialy using a single thread

## CPU utilization when running mutlithreading code:

![image](https://guydou.github.io/pycon2019_con_para/images/threads_max.png)

## Lets try the same thing using processes 

In [7]:
from concurrent.futures import ProcessPoolExecutor

In [8]:
start_time = time.time()    

with ProcessPoolExecutor() as e:
    futures = {e.submit(max, slice_chunk(chunk_number, random_numbers, num_chunks)) for chunk_number in range(num_chunks)}
    max_values = [future.result() for future in as_completed(futures)]

print("Max values is %s " % max(max_values))

end_time = time.time()
total = end_time - start_time
              
print("Total time of execution: %s" % (total))

Max values is 0.99999934045655 
Total time of execution: 8.223525524139404


## Why is it not faster

* Spawning proceses is an heavy task 
* Marshaling/Unmarshaling of chunk of data is heavy

## What to do?

* Initialize the processes in advance and reuse the proceses
* Use shared memory and pointer 

In [35]:
import SharedArray as sa
array_key = "shm://test"
sa.delete(array_key)


In [9]:
import SharedArray as sa
array_key = "shm://test"

array = sa.create(array_key, random_numbers.shape, random_numbers.dtype)
array[:] = random_numbers[:]

## Now we need to retrieve the chunk of array from the from the shared array

In [10]:

def max_shared_array(chunk_num, array_key, num_chunks):
    array = sa.attach(array_key)
    return max(slice_chunk(chunk_num, array, num_chunks))

In [11]:

start_time = time.time()

with ProcessPoolExecutor() as e:
    futures = {e.submit(max_shared_array, chunk_number, array_key, num_chunks)
               for chunk_number in range(num_chunks)}
    max_values = [future.result() for future in as_completed(futures)]

print("Max values is %s " % max(max_values))

end_time = time.time()
total = end_time - start_time


              
print("Total time of execution: %s" % (total))

Max values is 0.99999934045655 
Total time of execution: 6.6549787521362305


## CPU utilization when running multiprocess code:


![image](https://guydou.github.io/pycon2019_con_para/images/processes_max.png)

## Now lets check what happens when running with Numpy

In [83]:
start_time = time.time()
    

print("Max values is %s " % numpy.max(random_numbers))

end_time = time.time()
total = end_time - start_time
              
print("Total time of execution: %s" % (total))

Max values is 0.9999999998221734 
Total time of execution: 0.1670057773590088


# CPU Bound conclusion:

1. Don't forget the GIL
2. Multiprocessing is your friend
3. Use native C/C++ labraries suche as: numpy, pandas.

# Dask


* **Dynamic task scheduling optimized for computation**
* **“Big Data” collections**


![architecture](https://docs.dask.org/en/latest/_images/collections-schedulers.png)

* In other words, you can run a complex excution grpah on many data types using the same code on you laptop and on a multi host cluster

* A way to solve memory bounded problems




### I will demo only the library **dask.distirbuted** that can help you run you code utilizing CPUs and Hosts

# Cluster setup:


In [23]:
import dask

from dask.distributed import Client

client = Client()  # set up local cluster on your laptop
# client = Client(Cluter Host) 
client

0,1
Client  Scheduler: tcp://127.0.0.1:37657,Cluster  Workers: 4  Cores: 4  Memory: 16.68 GB


# Lets try to find the maximal value out a the randon_numbers. 


1. Find the maximal value in each chunk

1. Find the maximal value out of all maximal values



In [21]:
def dask_mask_array(chunk_num):
    array_key = "shm://test"
    return max_shared_array(chunk_num, array_key, num_chunks)

In [20]:
start_time = time.time()
max_values = client.map(dask_mask_array, range(num_chunks) )
max_value = client.submit(max, max_values)

print(max_value.result())

end_time = time.time()
total = end_time - start_time
print("Total time of execution: %s" % (total))

0.999999955677862
Total time of execution: 7.13366961479187


## CPU utilization when running dask code:


![image](https://guydou.github.io/pycon2019_con_para/images/dask.png)

## Thank You