# Concurrency in Python

In computer science, concurrency is about managing multiple tasks in a program, while parallelism is about actually executing them simultaneously.

Here's a breakdown of concurrency:

- Multiple tasks: A program is broken down into smaller tasks that can be run independently.
- Out-of-order execution: The order in which these tasks are completed isn't necessarily important. As long as the final outcome is correct, it doesn't matter if task B finishes before task A.
- Partial order: There might be some dependencies between tasks, requiring a certain level of order. Task B might need the results of task A before it can start.

There are several benefits to using concurrency:

- Improved responsiveness: If a program encounters a long-running task, concurrency allows it to continue working on other tasks while waiting. This makes the program feel more responsive to the user.
- Efficient resource utilization: By keeping the processor busy with multiple tasks, concurrency can improve overall performance.

However, concurrency also introduces challenges:

- Coordination: When multiple tasks are accessing shared resources, there's a risk of conflicts. Careful coordination is needed to ensure data consistency and prevent issues like deadlocks.
- Complexity: Reasoning about concurrent programs can be difficult. The out-of-order execution can lead to unexpected behavior if not managed properly.
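
The coordination problem can be illustrated with a minimal sketch (the names `SafeCounter` and `add_many` are made up for this illustration, they are not part of any library): several threads increment a shared counter, and a `threading.Lock` ensures that each read-modify-write step is performed by one thread at a time.

```python
import threading

class SafeCounter:
    """A counter whose increments are protected by a lock (illustrative helper)."""
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        # only one thread at a time may run the read-modify-write below
        with self._lock:
            self.value += 1

def add_many(counter, n):
    for _ in range(n):
        counter.increment()

counter = SafeCounter()
threads = [threading.Thread(target=add_many, args=(counter, 10_000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)  # 4 threads x 10,000 increments = 40000
```

Without the lock, `self.value += 1` is not guaranteed to be atomic, so concurrent increments could be lost.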


![Deadlock 1](../data/images/deadlock.jpeg)

![Deadlock 2](../data/images/deadlock_2.png)

# Threading

Threads are very useful for maintaining multiple program flows running (quasi-)simultaneously. 
In Python, threads are real system threads and are managed by the operating system.

But the memory management of CPython, the standard Python implementation, is not thread-safe, so the [Global Interpreter Lock (**GIL**)](http://www.dabeaz.com/GIL) allows only one thread to execute Python bytecode at any given time. Therefore, the main benefit of threading is that one waiting job (I/O, sleep, waiting for a user event) doesn't block other jobs from running. Or as the saying goes, threads are good for doing nothing: waiting, mostly.
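
As a rough sketch of "threads are good for waiting" (the function name `wait_a_bit` is made up for this illustration), five threads that each sleep for 0.2 seconds should finish in roughly 0.2 seconds overall rather than 1 second, because a sleeping thread does not hold the GIL:

```python
import threading
import time

def wait_a_bit(seconds):
    time.sleep(seconds)  # a sleeping thread releases the GIL, so other threads can run

start = time.perf_counter()
threads = [threading.Thread(target=wait_a_bit, args=(0.2,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
print("5 waits of 0.2 s took {:.2f} s in total".format(elapsed))
```

The exact timing depends on the machine, so no specific output is shown here.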

## Simple example

A worker thread counts from 1 to 9, waiting one second between numbers, without blocking the main thread, which counts from 11 to 19 (also waiting).

We use the [threading](https://docs.python.org/3.5/library/threading.html) module from the standard library.

In [1]:
import threading
import time

def task(start, end):
    for i in range(start, end):
        print(" {} ".format(i), end="", flush=True)
        time.sleep(1)

worker = threading.Thread(target=task, args=(1, 10)) # not the main thread
worker.start()
task(11, 20) # the main thread
worker.join()
# the threads take turns: while one sleeps, the other runs
# this is concurrency, *not* parallel execution

 1  11  12  2  13  3  14  4  15  5  6  16  7  17  18  8  9  19 

### I/O-bound programs

When we do any kind of I/O, the GIL is released as soon as control is given to the OS or to lower-level C code. So threads are great for concurrency in I/O-bound programs: while one thread waits for I/O, the GIL is released and other threads can go on doing their jobs. This holds as long as the I/O is not very quick and there are not too many concurrent jobs; many concurrent short jobs will start a [GIL war](http://www.dabeaz.com/GIL/), which is bad for performance.

Let's start with a synchronous program that reads books from the Gutenberg project and finds the most common word in each. Finding the most common word takes a while, but a lot less than reading the data from the web, so this is definitely an I/O-bound program.

In [2]:
from collections import Counter # a dict for counting
import urllib.request # for opening URLs
import urllib.parse # for quoting URLs
import time

We download and parse a [set of stop words](https://github.com/Alir3z4/stop-words/raw/25c6a0aea665871e887f155b883e950c3743ce50/english.txt) not to be included in the analysis:

For this we use `urlopen`, which opens a remote URL as if it were a file, allowing us to read it line-by-line. `urlopen` reads data as `bytes` rather than `str`, so we `decode` each line to turn those bytes into a string.
We then use `strip` to remove whitespace. `words` is therefore a generator expression over the lines of the URL, each holding a single word; we consume the generator with the constructor of `frozenset`, an immutable set.

In [4]:
stop_words_url = 'https://github.com/Alir3z4/stop-words/raw/25c6a0aea665871e887f155b883e950c3743ce50/english.txt'

with urllib.request.urlopen(stop_words_url) as f:
    words = (line.decode().strip() for line in f)
    stop_words = frozenset(words) # frozenset is an immutable set of unique elements

print(list(stop_words)[:5])

['theirs', 'are', 'its', "let's", 'his']
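
The same decode, strip and `frozenset` steps can be tried without a network connection. The sketch below is an illustration only: it uses `io.BytesIO` as a stand-in for the object returned by `urlopen`, and the names `fake_response` and `demo_stop_words` are made up.

```python
import io

# a fake "remote file": iterating over it yields lines as bytes, like urlopen
fake_response = io.BytesIO(b"the\nand\n of \nthe\n")

words = (line.decode().strip() for line in fake_response)  # lazy: nothing is read yet
demo_stop_words = frozenset(words)  # consumes the generator and drops duplicates

print(sorted(demo_stop_words))  # ['and', 'of', 'the']
```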


We will read a bunch of books (see `names`) and parse them for the most common word (`most_common_word`).

First, create a tuple of the book names, and a dictionary that maps book names to book URLs.

In [5]:
names = (
    'Gulliver',
    'Alice in Wonderland',
    'Pride and prejudice',
    'Yellow wallpaper',
    'Metamorphosis',
    'A Tale of Two Cities',
    'The Importance of Being Earnest',
    'Frankenstein'
)
url_template = 'https://raw.githubusercontent.com/yoavram/Py4Eng/master/data/{}.txt'
urls = {
    name: urllib.parse.quote(url_template.format(name), safe=":/") 
    for name in names
} # for each name of the above (the keys) its value is the URL of the book
print('Gulliver:', urls['Gulliver'])

Gulliver: https://raw.githubusercontent.com/yoavram/Py4Eng/master/data/Gulliver.txt


The `most_common_word` function accepts a `book_item`, which is a tuple of `(name, url)`, opens the URL for reading, reads the "file" line-by-line, and decodes, strips, lowers, and splits each line. Therefore, each line is converted to a list of lowercase words.
These words are then added to a `Counter` object, which is similar to a `set`, only it remembers **how many times** each element was added, and it allows us to query the number of occurrences and the most common elements.

The function returns the name of the book, the most common word, and the count for the most common word, after zeroing the count for stop-words.

In [10]:
def most_common_word(book_item):
    name, url = book_item
    counter = Counter()
    with urllib.request.urlopen(url) as f:        
        for line in f:
            if not line:
                break
            line = line.decode().strip().lower().split()
            counter.update(line) # add the words of this line to the counter, which tracks how many times each word was seen
    for word in stop_words:
        counter[word] = 0
    word, count = counter.most_common(1)[0] # [0] gives only the first most common word
    return name, word, count
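
The counting logic can be checked on a tiny in-memory input. This is a sketch under assumptions: the two byte strings and the `ignored` set are made-up stand-ins for a downloaded book and for `stop_words`.

```python
from collections import Counter

ignored = frozenset({"the", "and"})  # hypothetical stop words for this demo
lines = [b"The cat and the hat\n", b"and the cat\n"]  # bytes, as read from a URL

counter = Counter()
for line in lines:
    counter.update(line.decode().strip().lower().split())
# counts so far: the=3, cat=2, and=2, hat=1

for word in ignored:
    counter[word] = 0  # stop words can no longer be the most common

word, count = counter.most_common(1)[0]
print(word, count)  # cat 2
```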

In [11]:
most_common_word(list(urls.items())[0])


Here is a simple [context manager](https://docs.python.org/3.5/library/contextlib.html) for measuring time (`%timeit` is less useful for concurrency):

In [12]:
from contextlib import contextmanager

@contextmanager
def tictoc():
    tic = time.time()
    yield
    toc = time.time()
    print("Elapsed time: {:.2f} seconds".format(toc - tic))

In [13]:
with tictoc():
    print("hey")

hey
Elapsed time: 0.00 seconds


### Sequential run

We start by running the analysis in sequence using a single thread to get a baseline.

Note that `map` applies a function to the elements of an iterable using lazy evaluation:

```python
results = map(most_common_word, urls.items())
```

This is very similar to the generator expression:

```python
results = (most_common_word(item) for item in urls.items())
```

In [14]:
cube_map = map(lambda x: x**3, [1,2,3])  # map returns a lazy iterator: values are computed one at a time, on demand, and are not stored in memory
# lambda is an anonymous function that takes x and returns x**3.
print(list(cube_map))

[1, 8, 27]
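
To see the lazy evaluation at work, the following sketch records each call made by `map` (the helper `cube` and the list `calls` are made up for this illustration):

```python
calls = []

def cube(x):
    calls.append(x)  # record when the function is actually called
    return x ** 3

lazy = map(cube, [1, 2, 3])
print(calls)        # []: nothing has been computed yet
print(next(lazy))   # 1: only the first element is computed
print(calls)        # [1]
print(list(lazy))   # [8, 27]: the remaining elements
print(list(lazy))   # []: the iterator is now exhausted
```

This also shows that a `map` object can be consumed only once.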


In [15]:
with tictoc():
    results = map(most_common_word, urls.items())
    for name, word, count in results:
        print('Most common word in {} is "{}" ({} appearances)'.format(name, word, count))

Most common word in Gulliver is "upon" (201 appearances)
Most common word in Alice in Wonderland is "said" (130 appearances)
Most common word in Pride and prejudice is "mr." (766 appearances)
Most common word in Yellow wallpaper is "project" (82 appearances)
Most common word in Metamorphosis is "gregor" (168 appearances)
Most common word in A Tale of Two Cities is "mr." (602 appearances)
Most common word in The Importance of Being Earnest is "jack." (224 appearances)
Most common word in Frankenstein is "will" (194 appearances)
Elapsed time: 14.11 seconds


### Multi-threading

To run a multi-threaded version of the above, we could use `threading` and create our threads etc., but there is a lot of boilerplate. This boilerplate can be handled by a thread pool from the [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) module.
The thread pool executor is created using a context manager, so that all the threads in the pool will be closed when we are done.
Using the executor is really easy if we already used the `map` pattern.

In [17]:
import concurrent.futures
n_workers = len(urls)

with tictoc():
    with concurrent.futures.ThreadPoolExecutor(n_workers) as executor:
        results = executor.map(most_common_word, urls.items())
        for name, word, count in results:
            print('Most common word in {} is "{}" ({} appearances)'.format(name, word, count))


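Much better: an almost 5-fold improvement in running time (compare the 14.11 seconds of the sequential run with the 2.92 seconds measured for the equivalent thread pool run in the next section).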
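
The effect can be reproduced without a network connection. In the sketch below, `fake_download` is a made-up stand-in for `most_common_word` that simply sleeps to simulate waiting for the network, and `demo_names` is an arbitrary list:

```python
import concurrent.futures
import time

def fake_download(name):
    time.sleep(0.2)  # stands in for waiting on the network
    return name, len(name)

demo_names = ["Gulliver", "Metamorphosis", "Frankenstein", "Alice"]

start = time.perf_counter()
sequential = list(map(fake_download, demo_names))
sequential_time = time.perf_counter() - start

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(len(demo_names)) as executor:
    threaded = list(executor.map(fake_download, demo_names))
threaded_time = time.perf_counter() - start

print(threaded == sequential)  # True: executor.map keeps the input order
print("sequential: {:.2f} s, threaded: {:.2f} s".format(sequential_time, threaded_time))
```

With four workers the four waits overlap, so the threaded run should take roughly a quarter of the sequential time; the exact numbers depend on the machine.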

### What are the futures?

The module is called `concurrent.futures` and in the documentation you can read that you are actually creating `Future` objects. These are like promises - they represent computational tasks that will be completed; therefore, they allow for an asynchronous style of programming, as we can start a task, go on to do something else, and then either check if it finished, wait for it to finish, or assign a callback to be called when it is finished.

In the above, the `Future`s were handled by the executor `map` function, which creates `Future`s and waits for them to finish working. Now we will use them directly:

In [18]:
with tictoc():
    with concurrent.futures.ThreadPoolExecutor(n_workers) as executor:
        futures = [
            executor.submit(most_common_word, item) 
            for item in urls.items()
        ]
        for future in concurrent.futures.as_completed(futures):
            name, word, count = future.result()
            print('Most common word in {} is "{}" ({} appearances)'.format(name, word, count))

Most common word in Yellow wallpaper is "project" (82 appearances)
Most common word in Alice in Wonderland is "said" (130 appearances)
Most common word in Metamorphosis is "gregor" (168 appearances)
Most common word in The Importance of Being Earnest is "jack." (224 appearances)
Most common word in Gulliver is "upon" (201 appearances)
Most common word in Frankenstein is "will" (194 appearances)
Most common word in Pride and prejudice is "mr." (766 appearances)
Most common word in A Tale of Two Cities is "mr." (602 appearances)
Elapsed time: 2.92 seconds


Here, `as_completed` allows us to iterate over the futures as they are completed, i.e. in roughly the order they finished their tasks, rather than the order they were created (which is the case in the previous example).
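
The difference between submission order and completion order, and the use of a callback, can be sketched with tasks of known duration (`slow_echo`, `delays` and `finished` are made-up names for this illustration):

```python
import concurrent.futures
import time

def slow_echo(delay):
    time.sleep(delay)
    return delay

delays = [0.6, 0.2, 0.4]  # submission order
finished = []             # filled by the callback, in completion order

with concurrent.futures.ThreadPoolExecutor(len(delays)) as executor:
    futures = [executor.submit(slow_echo, d) for d in delays]
    for future in futures:
        # the callback receives the finished future as its only argument
        future.add_done_callback(lambda f: finished.append(f.result()))
    completed = [f.result() for f in concurrent.futures.as_completed(futures)]

print(completed)  # [0.2, 0.4, 0.6]: completion order, not submission order
print(finished)   # [0.2, 0.4, 0.6]: callbacks fired in the same order
print(all(f.done() for f in futures))  # True
```

The delays are spaced widely enough that the completion order should be stable on an ordinary machine.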

### CPU-bound program

In some cases running multiple threads actually helps even in a CPU-bound scenario, because the OS may run these threads on separate cores, and **if the code that we use releases the GIL** in some way, then we can achieve "true multi-threading". Note: if the code doesn't release the GIL, we will get into a [GIL war](http://www.dabeaz.com/GIL/) and performance will suffer compared to a single-core single-thread program!

In the following example we calculate a hash of our books using the *very slow* function, `pbkdf2_hmac`. The [`hashlib` docs](https://docs.python.org/3/library/hashlib.html) specify that if the data is larger than 2047 bytes, **the GIL is released** (the computation is done in C, so the GIL can be explicitly released) and therefore if we use threads we will see an improvement on multi-core machines.
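
Before turning to the real books, here is a small self-contained sketch of the same idea. The data in `demo_books` is made up, the helper `fingerprint` is hypothetical, and the iteration count is far lower than the 1,000,000 used below so that it runs quickly:

```python
import concurrent.futures
import hashlib
import time

# made-up in-memory "books", so that no files are needed
demo_books = {
    "book {}".format(i): ("text {} ".format(i) * 1000).encode("utf8")
    for i in range(4)
}

def fingerprint(data):
    # low iteration count, chosen only to keep the demo short
    return hashlib.pbkdf2_hmac("sha512", data, b"salt", 20_000).hex()

start = time.perf_counter()
sequential = [fingerprint(data) for data in demo_books.values()]
sequential_time = time.perf_counter() - start

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor() as executor:
    threaded = list(executor.map(fingerprint, demo_books.values()))
threaded_time = time.perf_counter() - start

print(threaded == sequential)  # True: the hashes do not depend on how they were computed
print("sequential: {:.2f} s, threaded: {:.2f} s".format(sequential_time, threaded_time))
```

Whether the threaded run is faster depends on the number of available cores, so no timing is claimed here.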

In [19]:
import hashlib
import binascii
import concurrent.futures
import time
import multiprocessing
multiprocessing.cpu_count()

8

Of course, if the number of CPUs is 1 (see last command in previous cell), we won't get any benefit from a multi-threading approach - on the contrary.

We start by reading books to memory so that I/O won't be an issue. 

In [20]:
names = (
    'Gulliver',
    'Alice in Wonderland',
    'Pride and prejudice',
    'Yellow wallpaper',
    'Metamorphosis',
    'The Importance of Being Earnest'
)
filenames = {
    name: '../data/books/{}.txt'.format(name) 
    for name in names
}
print('Gulliver:', filenames['Gulliver'])

def read_book(item):
    name, filename = item    
    with open(filename) as f:
        data = f.read()        
    return name, data

books = dict(read_book(item) for item in filenames.items())

Gulliver: ../data/books/Gulliver.txt


`hash_book` is a **slow** function that takes an entire book and performs a specific [hash function](https://docs.python.org/3/library/hashlib.html#hashlib.pbkdf2_hmac) on it with multiple iterations.

In [21]:
def hash_book(item):
    name, data = item    
    # very slow function
    fingerprint = hashlib.pbkdf2_hmac('sha512', data.encode('utf8'), b'salt', 1000000)
    return name, binascii.hexlify(fingerprint).decode()

%timeit -n 3 hash_book(('Gulliver', books['Gulliver']))

910 ms ± 39.9 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)


### Sequential

Running in a single-thread mode - open your process monitor to see that only one core is used:

In [22]:
with tictoc():
    results = map(hash_book, books.items())
    for name, fingerprint in results:
        print('Fingerprint for {} is "{}"'.format(name, fingerprint))

Fingerprint for Gulliver is "279397822ce37e80b9235eb01370875546d5d5e92bb6b6d1c4d74d1fcc21a397ef42edbff782b18d27a1aa9941d73c65941f96717da62f680c11defa4c285ba1"
Fingerprint for Alice in Wonderland is "71a49a0fa0d36338e3072c4cff1649b2db1ad788b829613015d5e557bcf8c03ff498ac1ec1d11365ca9d70be73b791b260976caf5322d624fc21aebdf99bab9d"
Fingerprint for Pride and prejudice is "216f57cef336e96218f07d50ae1f7ab34fc42d5d4144e803595139dbd59010b8861a0ce14c6991ac5ba394ea1e414eefee88a2e1d54d1fe6887276c55707fccc"
Fingerprint for Yellow wallpaper is "6830e20c22d552077a4c64b2bfff289c8776e2f83d0e9793fa634121ec639f11133fb860416a386669b791647d7c53acdea8d9c086b90ba1d8ef20b92b2ff161"
Fingerprint for Metamorphosis is "834f837094aaa5dac36fbcf090c572a40c7ac5274267e311ce26db937e3153c4ae2c1fef1453a83f56d4d46f718f337889c170c7e7a5174278f0512d3451c599"
Fingerprint for The Importance of Being Earnest is "2205918c9d6ef846723b02d306320bc2560e06d53dffe74dcc64c7d0796da20438cd9b679537e56bfa2ae69c5a5a9cdc840ecb62a3ede42404676f

### Multi-threaded

In multi-threaded mode, you'll see that all the cores are used, at least on some operating systems (this is OS-dependent, and requires multiple cores):

In [23]:
with tictoc():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = executor.map(
            hash_book, books.items()
        )
        for name, fingerprint in results:
            print('Fingerprint for {} is "{}"'.format(name, fingerprint))

Fingerprint for Gulliver is "279397822ce37e80b9235eb01370875546d5d5e92bb6b6d1c4d74d1fcc21a397ef42edbff782b18d27a1aa9941d73c65941f96717da62f680c11defa4c285ba1"
Fingerprint for Alice in Wonderland is "71a49a0fa0d36338e3072c4cff1649b2db1ad788b829613015d5e557bcf8c03ff498ac1ec1d11365ca9d70be73b791b260976caf5322d624fc21aebdf99bab9d"
Fingerprint for Pride and prejudice is "216f57cef336e96218f07d50ae1f7ab34fc42d5d4144e803595139dbd59010b8861a0ce14c6991ac5ba394ea1e414eefee88a2e1d54d1fe6887276c55707fccc"
Fingerprint for Yellow wallpaper is "6830e20c22d552077a4c64b2bfff289c8776e2f83d0e9793fa634121ec639f11133fb860416a386669b791647d7c53acdea8d9c086b90ba1d8ef20b92b2ff161"
Fingerprint for Metamorphosis is "834f837094aaa5dac36fbcf090c572a40c7ac5274267e311ce26db937e3153c4ae2c1fef1453a83f56d4d46f718f337889c170c7e7a5174278f0512d3451c599"
Fingerprint for The Importance of Being Earnest is "2205918c9d6ef846723b02d306320bc2560e06d53dffe74dcc64c7d0796da20438cd9b679537e56bfa2ae69c5a5a9cdc840ecb62a3ede42404676f

The `concurrent.futures` module has another pool executor - a `ProcessPoolExecutor` that uses processes for the jobs. It's as easy to use as the `ThreadPoolExecutor`, but in this case no further improvement can be had by replacing `ThreadPoolExecutor` with `ProcessPoolExecutor`, at least on my machine.

## Multi-processing

From the [threading](https://docs.python.org/3/library/threading.html) module:
> CPython implementation detail: In CPython, due to the Global Interpreter Lock, **only one thread can execute Python code at once**... If you want your application to make better use of the **computational resources of multi-core machines**, you are advised to use `multiprocessing` or `concurrent.futures.ProcessPoolExecutor`.

The standard library module, [multiprocessing](https://docs.python.org/3/library/multiprocessing.html), provides low-level interfaces for the use of multiple processes. 

We will use [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) which provides a high-level API - a process pool. If you get a persistent error about broken processes, try to restart the kernel and possibly the notebook server, then debug without the executor (non-parallel) and when it works, re-insert the executor.

In [24]:
import concurrent.futures
import math

primes = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
    3399726899288419,
    1125828054422712,
    237397848077029,
]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = math.isqrt(n) # exact integer square root (Python 3.8+)
    for i in range(3, sqrt_n + 1, 2): # test odd divisors only
        if n % i == 0:
            return False
    return True

In [25]:
with tictoc():
    results = map(
        is_prime, primes
    )
    for n, p in zip(primes, results):
        print('{} is prime: {}'.format(n, p))

112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
3399726899288419 is prime: False
1125828054422712 is prime: False
237397848077029 is prime: False
Elapsed time: 2.36 seconds


In [26]:
with tictoc():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(
            is_prime, primes
        )
        for n, p in zip(primes, results):
            print('{} is prime: {}'.format(n, p))

Process SpawnProcess-1:
Process SpawnProcess-2:
Process SpawnProcess-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.11/concurrent/futures/process.py", line 249, in _process_worker
    call_item = call_queue.get(block=True)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.11/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'is_prime' on <module '__main__' (built-in)>
  File "/opt/anaconda3/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.11/multiprocessing/pr

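BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

This error typically appears when the worker processes are started with the `spawn` method (the default on macOS and Windows) and the function was defined interactively in the notebook: each worker starts a fresh interpreter and cannot find `is_prime` in its own `__main__` module. Defining the function in an importable module (a `.py` file) and importing it from there usually resolves the problem.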

In [36]:
from multiprocessing import Pool
with tictoc():
    with Pool() as p:
        results = p.map(is_prime, primes)
    print(results)

[True, True, True, True, True, False, False, False, False]
Elapsed time: 0.44 seconds


![thread vs process](https://i.ytimg.com/vi/4rLW7zg21gI/maxresdefault.jpg)