## Lecture 9: Python Concurrency
### March 29, 2023

Partly based on [https://nyu-cds.github.io/python-concurrency/](https://nyu-cds.github.io/python-concurrency/)


## Improving performance by using concurrency

Concurrency vs parallelism:

    Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.
    
[source](https://medium.com/@itIsMadhavan/concurrency-vs-parallelism-a-brief-review-b337c8dac350)
- The linked article explains the distinction well.
- Concurrency is about dealing with different things at once, but it does not necessarily mean working on all of them at the same instant.
- With two tasks, you can make progress on both over the same period without working on both at every moment; for example, you can alternate between them at different time steps.
- Parallelism means doing two tasks at the same time, which requires multiple CPU cores.
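Here is a minimal sketch of concurrency without parallelism. Two tasks are written as Python generators and interleaved one step at a time on a single thread, so both make progress even though only one runs at any instant. The helpers `task` and `run_round_robin` are hypothetical and not part of the lecture code.

```python
from collections import deque


def task(name, steps):
    """A task that hands control back to the scheduler after every step."""
    for i in range(steps):
        yield "{} step {}".format(name, i)


def run_round_robin(tasks):
    """Run generator tasks on one thread, one step at a time, in turn."""
    log = []
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            log.append(next(current))  # do one step of this task
            ready.append(current)      # not finished: back to the end of the line
        except StopIteration:
            pass                       # task finished: drop it
    return log


log = run_round_robin([task("A", 3), task("B", 2)])
print(log)  # ['A step 0', 'B step 0', 'A step 1', 'B step 1', 'A step 2']
```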

We will illustrate some benefits of concurrency with a program downloading images from the `imgur.com` website.

For this you will need to:

- create an account in [imgur.com](https://imgur.com/)
- register your application [here](https://api.imgur.com/oauth2/addclient)
  - Authorization Type: __OAuth 2 authorization with a callback URL__
  - Authorization Callback URL: __https://www.getpostman.com/oauth2/callback__
  - email:
  - Description:
  

---
The functions below fetch a list of images and download them from the __imgur__ repository: 
[https://imgur.com/](https://imgur.com/)

- We will start with a version that downloads images sequentially, or one at a time

- Then improve the performance by introducing multiprocessing and threading

---
We will split the functionality into three separate functions (see the file `download.py`):
- get_links
- download_link
- setup_download_dir
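`download.py` itself is not shown in these notes. The following is a minimal standard-library sketch of what the three functions might look like; it is an assumption, not the course file. The endpoint URL is copied from the request visible in the log of the threaded run. The `Client-ID` authorization header and the response fields `data`, `link` and `is_album` are assumptions about the imgur API, and we have not checked that this endpoint still responds. The log lines in that run come from `urllib3`, which suggests the original uses `requests` instead.

```python
import json
import os
from pathlib import Path
from urllib.request import Request, urlopen

# Assumed endpoint, copied from the request visible in the log of the threaded run.
GALLERY_URL = "https://api.imgur.com/3/gallery/random/random/"


def setup_download_dir(path="images"):
    """Create the download directory (if needed) and return it as a Path."""
    download_dir = Path(path)
    download_dir.mkdir(parents=True, exist_ok=True)
    return download_dir


def get_links(client_id):
    """Yield image URLs from the gallery endpoint.

    Assumption: the JSON reply has a "data" list whose entries carry a
    "link" URL, and album entries are marked with "is_album": true.
    """
    request = Request(GALLERY_URL,
                      headers={"Authorization": "Client-ID {}".format(client_id)})
    with urlopen(request) as response:
        payload = json.load(response)
    for item in payload["data"]:
        if not item.get("is_album") and "link" in item:
            yield item["link"]


def download_link(directory, link):
    """Fetch one URL and save it under its original file name, e.g. 4lGmQfq.jpg."""
    download_path = Path(directory) / os.path.basename(link)
    with urlopen(link) as response:
        download_path.write_bytes(response.read())
    return download_path
```

`download_link` names each file after the last part of its URL, which matches the file names listed by `ls images/` below.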

In [2]:
from time import time

# 'replace with your client ID'
CLIENT_ID = '989ced793a1b0ef'
from download import setup_download_dir, get_links, download_link

ts = time()
download_dir = setup_download_dir()

links = [l for l in get_links(CLIENT_ID)]

for i, link in enumerate(links):
    print("%2d %s" % (i, link))
    download_link(download_dir, link)

print('Took {}s'.format(time() - ts))

 0 https://i.imgur.com/4lGmQfq.jpg
 1 https://i.imgur.com/4lyQK.png
 2 https://i.imgur.com/4laUQHt.jpg
 3 https://i.imgur.com/4l1xB.jpg
 4 https://i.imgur.com/4lapcy1.jpg
 5 https://i.imgur.com/4lzqjWi.png
 6 https://i.imgur.com/4lrBXJY.jpg
 7 http://i.imgur.com/4lkspcfh.gif
 8 https://i.imgur.com/4lGS2.jpg
 9 https://i.imgur.com/4ll0LsP.jpg
10 https://i.imgur.com/4lnOz6z.jpg
11 https://i.imgur.com/4lLayye.jpg
12 https://i.imgur.com/4l0vHEe.jpg
13 https://i.imgur.com/4lyXrC5.gif
14 http://i.imgur.com/4ldedsXh.gif
15 https://i.imgur.com/4lw7hzs.gif
Took 6.466162204742432s


In [3]:
ls images/

[0m[01;35m4l0vHEe.jpg[0m  [01;35m4lGmQfq.jpg[0m  [01;35m4lapcy1.jpg[0m   [01;35m4ll0LsP.jpg[0m  [01;35m4lw7hzs.gif[0m  [01;35m4lzqjWi.png[0m
[01;35m4l1xB.jpg[0m    [01;35m4lLayye.jpg[0m  [01;35m4ldedsXh.gif[0m  [01;35m4lnOz6z.jpg[0m  [01;35m4lyQK.png[0m    [01;35m59aAlYK.gif[0m
[01;35m4lGS2.jpg[0m    [01;35m4laUQHt.jpg[0m  [01;35m4lkspcfh.gif[0m  [01;35m4lrBXJY.jpg[0m  [01;35m4lyXrC5.gif[0m  [01;35mizybG.png[0m


---

- To improve the performance of the image downloader we can run **multiple copies** of the program at the same time. 


- However, we would need to know what images are available so that we could ensure that one process didn’t download an image that had already been downloaded by a different process.  


- Fortunately the multiprocessing module is available for this purpose.

---

- How can we decrease the amount of time it takes to download the images?

### Pool

- To use multiple processes we need a multiprocessing **Pool**. 


- The Pool class provides a map method that runs a function in separate worker processes, passing it each argument from a supplied iterable. 


- The iterable is divided into a number of chunks, so that each process gets roughly the same number of elements. 


- We will pass the list of URLs to the pool, which starts 8 new processes and uses each one to download the images in parallel.

- Here we are effectively just running many copies of the same code at once.
- We want to make sure we never download an image twice, so we use the `multiprocessing` module, which lets us run the same code many times without repeating work.
- `Pool` provides a `map` method.
- To use it, we need the function `download_link`, the directory where the images should go, and an iterable of links.
- We fix the directory argument of `download_link` and then supply our links.
- The iterable is divided into chunks, each containing a share of the elements.
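The notes above describe dividing the iterable into chunks so that each process gets roughly the same number of elements. A minimal sketch of that idea, using a hypothetical helper `split_into_chunks`, is a contiguous split whose chunk sizes differ by at most one. This is a simplification: by default `Pool.map` uses smaller chunks, several per worker, and hands them out as workers become free.

```python
def split_into_chunks(items, n_workers):
    """Split `items` into n_workers contiguous chunks whose sizes differ by at most one."""
    base, extra = divmod(len(items), n_workers)
    chunks, start = [], 0
    for w in range(n_workers):
        size = base + (1 if w < extra else 0)  # the first `extra` chunks get one more item
        chunks.append(items[start:start + size])
        start += size
    return chunks


links = ["link_{}".format(i) for i in range(16)]  # 16 links, as in the run above
for worker, chunk in enumerate(split_into_chunks(links, 8)):
    print(worker, chunk)
```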

In [4]:
from multiprocessing import cpu_count
print("number of CPU cores:", cpu_count())

number of CPU cores: 16


In [5]:
from functools import partial
from multiprocessing.pool import Pool

def multi_processes_download():
    ts = time()
    download_dir = setup_download_dir()
    links = list(get_links(CLIENT_ID))

    # functools.partial makes a new version of a function
    # with one or more fixed arguments: download(link) is the
    # same as download_link(download_dir, link)
    download = partial(download_link, download_dir)

    with Pool(8) as p:            # start 8 worker processes
        p.map(download, links)    # call download(link) for every link, spread across the workers

    print('Took {}s'.format(time() - ts))

multi_processes_download()  # about 8x faster than the sequential version in this run

Took 0.8304293155670166s
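`functools.partial` is what lets `Pool.map`, which passes a single argument per call, work with the two-argument `download_link`. A minimal sketch, using a hypothetical `save_path` function in place of `download_link` so that nothing is downloaded:

```python
from functools import partial
from pathlib import PurePosixPath


def save_path(directory, link):
    """Hypothetical stand-in for download_link: return where `link` would be saved."""
    return str(PurePosixPath(directory) / link.rsplit("/", 1)[-1])


download = partial(save_path, "images")  # `directory` is now fixed to "images"

print(download("https://i.imgur.com/4lGmQfq.jpg"))  # images/4lGmQfq.jpg
print(download.func.__name__, download.args)        # save_path ('images',)
```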


---

Although easy to implement, this form of parallelism has some drawbacks:
- each process has **its own copy of the program's memory**
- it does not handle processes that depend on each other

Those issues can be tackled by shared memory and message passing mechanisms, which we will cover in later lessons.

- This approach works well when the work can be divided cleanly into independent tasks; it does not handle processes with dependencies.
- Processes that depend on one another will be dealt with in a later class.

## Using Threads

Threading is a well-known approach to attaining concurrency: 
- typically threads are lighter weight than processes
- **lower memory requirements**, as **they share the same memory space**

A basic way to use threads is through `ThreadPoolExecutor` in `concurrent.futures`, which provides a similar interface to `multiprocessing.Pool`.

For more refined behavior we will rely on the `Thread` class from `threading`, which provides a `run` method that should be overridden with a method that does the actual work of the thread.

In [6]:
## Simple example with ThreadPoolExecutor

from functools import partial 
from concurrent.futures import ThreadPoolExecutor 

def multithreaded_download():
    ts = time()
    download_dir = setup_download_dir()
    links = [l for l in get_links(CLIENT_ID)]

    download = partial(download_link, download_dir)  # fix the download_dir argument
   
    with ThreadPoolExecutor(max_workers=8) as ex:
        ex.map(download, links)
        
    print('Took {}s'.format(time() - ts))

multithreaded_download()
# similar run time to the 8-process version above (0.72 s vs 0.83 s in these runs)

Took 0.717604398727417s
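Why do threads speed this up? A minimal sketch, assuming `time.sleep` is a fair stand-in for waiting on the network (`fake_download` is hypothetical): eight 0.2 s waits take about 1.6 s one after another, but only about 0.2 s with eight threads, because while one thread waits the others can run. Exact timings depend on the machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_download(link):
    """Hypothetical stand-in for a network request: just wait 0.2 s."""
    time.sleep(0.2)
    return link


links = ["link_{}".format(i) for i in range(8)]

t0 = time.perf_counter()
sequential = [fake_download(link) for link in links]
seq_time = time.perf_counter() - t0

t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as ex:
    threaded = list(ex.map(fake_download, links))  # results come back in input order
thr_time = time.perf_counter() - t0

print("sequential: {:.2f}s  threaded: {:.2f}s".format(seq_time, thr_time))
```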


### Thread Safety

- Variables in the program are shared by all the threads and should not be accessed the way you would normally access a variable. One thread may change the variable while another thread is reading it, or worse, two threads may try to update the variable at the same time. 


- This is known as a **race condition**; it is one of the leading sources of errors in threaded programs and needs to be handled properly.



- One way to deal with thread safety is to use the __Queue__ class from the `queue` module, whose `put` and `get` methods are thread-safe.

- In the image downloading example, thread safety is not really a concern.
- In general, if you are not careful about how you access shared variables, there can be errors.
- Different threads could be racing to change the same variable.
- One way to deal with this is to use a queue.
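Another standard tool, used later in this lecture, is `threading.Lock`. In this minimal sketch (hypothetical `counter` and `increment`), four threads each add 1 to a shared counter 10,000 times. `counter += 1` is a read, then an add, then a write; holding the lock around it means no other thread can slip in between, so no update is lost.

```python
import threading

counter = 0                      # variable shared by all threads
counter_lock = threading.Lock()  # guards every update to `counter`


def increment(times):
    global counter
    for _ in range(times):
        # Without the lock, two threads could read the same old value
        # and one of the two increments would be lost.
        with counter_lock:
            counter += 1


threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()   # wait for every thread to finish

print(counter)  # 40000
```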

In [7]:
# Understanding Queue 
from queue import Queue 

def do_work(q):  # while the queue is not empty: take an item, print it, and mark it as done
    while not q.empty():
        item = q.get()
        print(str(item)) 
        q.task_done()  # this is important when combining Queue with Threads

q = Queue() # FIFO queue

for i in range(20):
    q.put(i)

do_work(q)

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19


A simpler example before going back to the image downloader code

In [8]:
# in this example each thread prints an element of the queue

from time import sleep
from queue import Queue
from threading import Thread
import logging

# set up a logger that prints the thread name in front of each message
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logging.basicConfig(format='(%(threadName)-9s) %(message)s', level=logging.DEBUG)

def do_work(q):
    while True:
        item = q.get()                      # blocks until an item is available
        logger.debug("e" + str(item) + ' ')
        print(str(item) + ' ')
        q.task_done()                       # tell the queue this item is processed
        sleep(2)

q = Queue()
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=do_work, args=(q,), name='thread_' + str(i))
    worker.daemon = True   # daemon threads are stopped automatically when the program exits
    worker.start()         # start the thread

# now we have started 10 threads

for i in range(50):
    # 50 tasks: each thread takes one item, then sleeps 2 s, so items are handled
    # in batches of about 10; the order within a batch depends on thread scheduling
    q.put(i)

q.join()  # block until task_done() has been called for every item put in the queue

(thread_0 ) e0 
(thread_3 ) e1 
(thread_4 ) e2 
(thread_5 ) e3 
(thread_6 ) e4 
(thread_7 ) e5 
(thread_8 ) e6 
(thread_9 ) e7 
(thread_2 ) e8 
(thread_1 ) e9 


0 
1 
2 
3 
4 
5 
6 
7 
8 
9 


(thread_0 ) e10 
(thread_3 ) e11 
(thread_4 ) e12 
(thread_5 ) e13 
(thread_6 ) e14 
(thread_7 ) e15 
(thread_8 ) e16 
(thread_2 ) e17 
(thread_9 ) e18 
(thread_1 ) e19 


10 
11 
12 
13 
14 
15 
16 
17 
18 
19 


(thread_0 ) e20 
(thread_3 ) e21 
(thread_4 ) e22 
(thread_5 ) e23 
(thread_6 ) e24 
(thread_7 ) e25 
(thread_8 ) e26 
(thread_2 ) e27 
(thread_9 ) e28 
(thread_1 ) e29 


20 
21 
22 
23 
24 
25 
26 
27 
28 
29 


(thread_0 ) e30 
(thread_3 ) e31 
(thread_4 ) e32 
(thread_5 ) e33 
(thread_6 ) e34 
(thread_7 ) e35 
(thread_8 ) e36 
(thread_9 ) e37 
(thread_2 ) e38 
(thread_1 ) e39 


30 
31 
32 
33 
34 
35 
36 
37 
38 
39 


(thread_0 ) e40 
(thread_3 ) e41 
(thread_4 ) e42 
(thread_5 ) e43 
(thread_6 ) e44 
(thread_7 ) e45 
(thread_8 ) e46 
(thread_9 ) e47 
(thread_2 ) e48 
(thread_1 ) e49 


40 41 

42 
43 
44 
45 
46 
47 
48 
49 
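The daemon flag above lets the program exit while the workers are still blocked in `q.get()`. A common alternative, sketched here with hypothetical names, is to put one sentinel value per worker on the queue. Each thread leaves its loop when it receives a sentinel and can then be joined. The workers also send their results back through a second queue instead of writing to a shared variable.

```python
import threading
from queue import Queue

SENTINEL = None  # "stop" marker; any object that is never a real task works


def worker(tasks, results):
    while True:
        item = tasks.get()
        if item is SENTINEL:
            tasks.task_done()
            break                      # leave the loop so the thread can finish
        results.put(item * item)       # results go into a thread-safe queue
        tasks.task_done()


tasks, results = Queue(), Queue()
num_threads = 4
threads = [threading.Thread(target=worker, args=(tasks, results)) for _ in range(num_threads)]
for t in threads:
    t.start()

for i in range(20):
    tasks.put(i)
for _ in range(num_threads):
    tasks.put(SENTINEL)                # one stop signal per worker, after all real tasks

for t in threads:
    t.join()                           # the threads really finish; no daemon flag needed

squares = sorted(results.get() for _ in range(20))
print(squares)
```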


In [9]:
from queue import Queue
from threading import Thread

class DownloadWorker(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    
    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            (directory, link) = self.queue.get()
            # call the function download_link (from download.py)
            download_link(directory, link)
            self.queue.task_done()

            
def threaded_download():
    ts = time()
    download_dir = setup_download_dir()
    links = [l for l in get_links(CLIENT_ID)]
    
    # Create a queue to communicate with the worker threads
    queue = Queue()
    
    # Create 8 worker threads
    for _ in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit 
        # even if the workers are blocking
        worker.daemon = True
        worker.start()

    
    # Put the tasks into the queue as a tuple
    for link in links:
        print('Queueing: {}'.format(link))
        queue.put((download_dir, link))
    
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    
    print('Took {}s'.format(time() - ts))

threaded_download()

(MainThread) Starting new HTTPS connection (1): api.imgur.com:443
(MainThread) https://api.imgur.com:443 "GET /3/gallery/random/random/ HTTP/1.1" 200 22685
(Thread-13) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-14) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-10) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-12) Starting new HTTP connection (1): i.imgur.com:80
(Thread-8 ) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-7 ) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-11) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-9 ) Starting new HTTPS connection (1): i.imgur.com:443


Queueing: https://i.imgur.com/4lGmQfq.jpg
Queueing: https://i.imgur.com/4lyQK.png
Queueing: https://i.imgur.com/4laUQHt.jpg
Queueing: https://i.imgur.com/4l1xB.jpg
Queueing: https://i.imgur.com/4lapcy1.jpg
Queueing: https://i.imgur.com/4lzqjWi.png
Queueing: https://i.imgur.com/4lrBXJY.jpg
Queueing: http://i.imgur.com/4lkspcfh.gif
Queueing: https://i.imgur.com/4lGS2.jpg
Queueing: https://i.imgur.com/4ll0LsP.jpg
Queueing: https://i.imgur.com/4lnOz6z.jpg
Queueing: https://i.imgur.com/4lLayye.jpg
Queueing: https://i.imgur.com/4l0vHEe.jpg
Queueing: https://i.imgur.com/4lyXrC5.gif
Queueing: http://i.imgur.com/4ldedsXh.gif
Queueing: https://i.imgur.com/4lw7hzs.gif


(Thread-12) http://i.imgur.com:80 "GET /4lkspcfh.gif HTTP/1.1" 301 0
(Thread-12) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-13) https://i.imgur.com:443 "GET /4lapcy1.jpg HTTP/1.1" 200 121260
(Thread-14) https://i.imgur.com:443 "GET /4lzqjWi.png HTTP/1.1" 200 223369
(Thread-10) https://i.imgur.com:443 "GET /4laUQHt.jpg HTTP/1.1" 200 60516
(Thread-13) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-8 ) https://i.imgur.com:443 "GET /4lGmQfq.jpg HTTP/1.1" 200 131040
(Thread-11) https://i.imgur.com:443 "GET /4l1xB.jpg HTTP/1.1" 200 629559
(Thread-7 ) https://i.imgur.com:443 "GET /4lrBXJY.jpg HTTP/1.1" 200 152670
(Thread-10) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-14) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-9 ) https://i.imgur.com:443 "GET /4lyQK.png HTTP/1.1" 200 397450
(Thread-12) https://i.imgur.com:443 "GET /4lkspcfh.gif HTTP/1.1" 200 141001
(Thread-8 ) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-7 ) Starting

Took 0.7233140468597412s


## The Global Interpreter Lock
#### Not really parallel !

- Python has a **Global Interpreter Lock (GIL)**, which allows only **one thread to execute Python code at a time** within a process. Therefore, **this code is concurrent but not parallel**. 

- It is still faster because the image downloader is an input/output (I/O) bound task: 
the majority of the time is spent waiting for the network. This is why threading can provide a large speed increase. 

- **The interpreter can switch between the threads**: a thread that is waiting on the network releases the GIL, so another thread that is **ready** to do some work can run.



- If the program was performing a task that was CPU bound, using the threading module in Python or any other interpreted language with a GIL could actually result in reduced performance.

- For CPU bound tasks and truly parallel execution in Python, the multiprocessing module is a better option.

- Some parallelism is still possible with threads if the executed functions rely on low-level code that releases the GIL (e.g. many NumPy/SciPy functions). This includes custom Cython programs (see the `nogil` keyword [here](https://cython.readthedocs.io/en/latest/src/userguide/parallelism.html) and [here](https://cython.readthedocs.io/en/latest/src/userguide/numpy_tutorial.html))

- Other packages for parallelization: task/job queues (e.g. [python-rq](https://python-rq.org/)), [joblib](https://joblib.readthedocs.io/en/latest/parallel.html), [dask](https://dask.org/)

- Python has a GIL, which allows only one thread to execute at a time, so threaded code can be concurrent but not parallel.
- In C or C++ this is not as much of an issue.
- Image downloading depends more on waiting for the network than on the CPU core.
- That is an input/output (I/O) bound task.
- A CPU-bound task is pure computation: its run time is dominated by the processor, not by waiting on I/O.
- Multiprocessing is a better option for CPU-bound tasks.
- For CPU-bound tasks the processor really has to do the work; there is no waiting time for other threads to fill.
- The GIL is a big drawback of Python: threads running pure Python code can only be concurrent, not parallel.
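A minimal sketch of the CPU-bound case (hypothetical `count_up`; sizes chosen only so it runs in about a second). The same pure-Python loop runs four times sequentially and then in four threads. On a standard CPython build with the GIL we would expect the threaded time to be about the same as the sequential time, or slightly worse; the exact numbers depend on the machine and the Python build.

```python
import threading
import time


def count_up(n):
    """Pure-Python CPU-bound loop: returns 0 + 1 + ... + (n - 1)."""
    s = 0
    for i in range(n):
        s += i
    return s


N = 1_000_000
N_TASKS = 4

# Sequential: run the tasks one after another.
t0 = time.perf_counter()
seq_results = [count_up(N) for _ in range(N_TASKS)]
seq_time = time.perf_counter() - t0

# Threaded: one thread per task; each thread writes only its own slot.
thr_results = [None] * N_TASKS


def worker(idx):
    thr_results[idx] = count_up(N)


t0 = time.perf_counter()
threads = [threading.Thread(target=worker, args=(k,)) for k in range(N_TASKS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
thr_time = time.perf_counter() - t0

print("sequential: {:.2f}s  threaded: {:.2f}s".format(seq_time, thr_time))
```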


### Example: sum of array elements in parallel

In [10]:
n = int(1e8)

In [11]:
# Sequential version
from time import time

ts = time()
s = 0
for i in range(n):
    s = s + i
print(s, '-->', time()-ts,'s')   

# add up the integers from 0 to 10**8 - 1 and report how long it takes

KeyboardInterrupt: 

- This is a CPU-bound task, so multiprocessing is a good fit.
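For reference, the sum has a closed form, which gives the value every correct version of this computation should print:

$$
\sum_{i=0}^{n-1} i = \frac{n(n-1)}{2}, \qquad n = 10^8 \;\Rightarrow\; \frac{10^8\,(10^8 - 1)}{2} = 4\,999\,999\,950\,000\,000
$$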

In [None]:
# multiprocessing version
from time import time
from multiprocessing.pool import Pool

from download import sum_multi_processes_1, sum_multi_processes_2

# The two functions below are copies of the ones imported from download.py,
# shown here for reference (the pool uses the imported versions).
def sum_multi_processes_1_(chunk):
    y = 0
    for i in chunk:
        y = y + i
    return y
# Both functions add up the numbers in one chunk; the difference is the input.
# The first receives the chunk as a list of 100 numbers, which must be
# pickled and sent to a worker process.
# The second receives only a (start, end) pair and builds a range itself,
# so much less data is sent and it should be faster.

def sum_multi_processes_2_(start, end):
    y = 0
    for i in range(start, end):
        y = y + i
    return y

chunks1 = [list(range(i, i + 100)) for i in range(0, n, 100)]
chunks2 = [(i, i + 100) for i in range(0, n, 100)]

print(len(chunks1), 'chunks')

ts = time()
with Pool(8) as p:
    results = p.map(sum_multi_processes_1, chunks1)
    # results = p.starmap(sum_multi_processes_2, chunks2)

print(sum(results), '-->', time() - ts, 's')

1000000 chunks
4999999950000000 --> 11.718096494674683 s
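The run above creates 1,000,000 tiny chunks, so a large share of the time probably goes into creating, pickling and sending tasks rather than adding numbers. Here is a minimal sketch of the opposite extreme, assuming one `(start, end)` range per worker is enough. The helpers `make_ranges`, `sum_range` and `parallel_sum` are hypothetical; the block only defines them and does not start a pool.

```python
from multiprocessing import Pool


def make_ranges(n, n_chunks):
    """Split [0, n) into at most n_chunks contiguous (start, end) pairs."""
    step = -(-n // n_chunks)  # ceiling division
    return [(start, min(start + step, n)) for start in range(0, n, step)]


def sum_range(start, end):
    """Pure-Python loop, like sum_multi_processes_2: adds start, ..., end - 1."""
    y = 0
    for i in range(start, end):
        y += i
    return y


def parallel_sum(n, processes=8):
    """One task per worker: only `processes` small tuples are pickled and sent."""
    with Pool(processes) as p:
        partial_sums = p.starmap(sum_range, make_ranges(n, processes))
    return sum(partial_sums)


print(make_ranges(10**8, 8)[:2])  # [(0, 12500000), (12500000, 25000000)]
```

From a script, call `parallel_sum(10**8)` under an `if __name__ == "__main__":` guard, which is required with the `spawn` start method used on Windows and macOS. Because the ranges cover exactly 0 to 10**8 - 1, it should return 4999999950000000.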


In [14]:
# Thread version
from queue import Queue
from threading import Thread
from threading import Lock

x = 0
lock = Lock()

def sum_chunk(q):
    global x
    while True:
        start, end = q.get()
        for i in range(start, end):
            with lock:  # force synchronization (one lock acquisition per addition, which is very slow)
                x = x + i
        q.task_done()

n = int(1e8)
chunks = [(i, i + 100) for i in range(0, n, 100)]

ts = time()
q = Queue()
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=sum_chunk, args=(q,))
    worker.daemon = True   # daemon threads are stopped automatically when the program exits
    worker.start()         # start the thread

for chunk in chunks:
    q.put(chunk)

q.join()
print(x, '-->', time() - ts, 's')
# a threaded version of the same computation; try changing num_threads

KeyboardInterrupt: 
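Part of the reason this cell is so slow is that it acquires the lock once for every single addition (10^8 times). Below is a minimal sketch of a common fix, with hypothetical names and a smaller `n` so it finishes quickly. Each thread sums its chunk into a local variable and takes the lock only once per chunk. Because of the GIL, we would still not expect this to beat the sequential loop on a standard CPython build.

```python
import threading
from queue import Queue

total = 0
total_lock = threading.Lock()


def sum_chunk_local(q):
    global total
    while True:
        start, end = q.get()
        local = 0
        for i in range(start, end):  # no lock needed: `local` belongs to this thread
            local += i
        with total_lock:             # one lock acquisition per chunk
            total += local
        q.task_done()


n = 10**6            # much smaller than the lecture's 1e8 so the sketch finishes quickly
chunk_size = 10_000
q = Queue()
for _ in range(10):
    threading.Thread(target=sum_chunk_local, args=(q,), daemon=True).start()
for start in range(0, n, chunk_size):
    q.put((start, min(start + chunk_size, n)))
q.join()             # returns once task_done() has been called for every chunk
print(total)  # 499999500000
```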

### Example: Pi Simulation

In [None]:
from download import monte_carlo_pi
import numpy as np

def monte_carlo_pi_(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(0, 1)
        y = np.random.uniform(0, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
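The estimate uses the fact that a point drawn uniformly from the unit square lands inside the quarter circle x² + y² < 1 with probability π/4, so π ≈ 4 · (points inside) / n. Since the lecture already uses NumPy, a vectorized variant is worth comparing: it draws all the points at once and counts the hits in compiled code instead of a Python loop. `monte_carlo_pi_vectorized` is a hypothetical name. It uses `np.random.default_rng` (NumPy 1.17 or later) rather than the global `np.random` functions.

```python
import numpy as np


def monte_carlo_pi_vectorized(n, seed=None):
    """Estimate pi from n random points in the unit square, without a Python loop."""
    rng = np.random.default_rng(seed)
    x = rng.uniform(0.0, 1.0, size=n)
    y = rng.uniform(0.0, 1.0, size=n)
    inside = np.count_nonzero(x**2 + y**2 < 1.0)  # points inside the quarter circle
    return 4.0 * inside / n


print(monte_carlo_pi_vectorized(int(3e5), seed=0))
```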

In [None]:
%%time
result = [monte_carlo_pi(int(3e5)) for _ in range(10)]

In [None]:
np.array(result)

In [None]:
from multiprocessing.pool import Pool

In [None]:
%%time
with Pool(8) as pool:
    result = pool.map(monte_carlo_pi, [int(3e5) for _ in range(10)])

In [None]:
np.array(result)
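One caveat for this experiment: with the `fork` start method (the default on Linux), pool workers start from a copy of the parent's NumPy global random state. Tasks that call `np.random.uniform` in different workers may therefore draw the same numbers and return repeated estimates. The sketch below shows one remedy, assuming NumPy 1.17 or later: `SeedSequence.spawn` gives every task its own independent generator. `monte_carlo_pi_seeded` is a hypothetical name.

```python
import numpy as np


def monte_carlo_pi_seeded(n, seed_seq):
    """Estimate pi using a generator built from this task's own seed."""
    rng = np.random.default_rng(seed_seq)  # independent stream per task
    xy = rng.uniform(0.0, 1.0, size=(n, 2))
    inside = np.count_nonzero((xy**2).sum(axis=1) < 1.0)
    return 4.0 * inside / n


n_tasks = 10
child_seeds = np.random.SeedSequence(12345).spawn(n_tasks)  # one child seed per task
results = [monte_carlo_pi_seeded(int(3e5), s) for s in child_seeds]
print(np.array(results))
```

With the pool, each task would receive its seed as an argument, e.g. `pool.starmap(monte_carlo_pi_seeded, [(int(3e5), s) for s in child_seeds])`. This assumes the seed objects can be pickled, which we expect but have not checked on every NumPy version.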