![Erudio logo](../img/erudio-logo-small.png)

# Programming Concurrency in Python

This module is aimed at intermediate Python programmers.  We look at several models and paradigms for writing concurrent programs using tools in the Python standard library.  

The course addresses several pitfalls developers face with concurrency, including race conditions and deadlocks.  As well, it explains the pros and cons of working with threads versus processes as concurrency models.

# What will be covered in this course

* High-level abstraction `concurrent.futures`.
* Basics of threading, using `threading`.
* Sharing data among threads and race conditions.
* Locks, deadlocks, and circular dependencies.
* The Python Global Interpreter Lock.
* Process parallelism with `multiprocessing`.

# 1.1 Concurrent.futures

In this lesson, we jump immediately to the highest-level abstraction for concurrency that the Python standard library provides: the `concurrent.futures` module.  

Some of the terms presented here may not be entirely familiar yet, but I believe you can understand them in context.

**Most** of the time, when you write concurrent programs in Python, you should use `concurrent.futures`.  

The module provides a beautiful and Pythonic interface that makes concurrency easy, while hiding most of the messy details of threads, processes, locks, deadlocks, race conditions, and data sharing.

...Of course, most of the time is not **all** of the time.  

You will sometimes need to reach down to lower-level interfaces provided by other modules that `concurrent.futures` is built on top of.  Those building blocks make up the remaining lessons of this course.  

It might be wise to return to this lesson at the end, after you have completed the other lessons.

## Parallel and Sequential

The problems that `concurrent.futures` best addresses are ones that are *embarrassingly parallel* (or nearly so).  If you can express your problems as a large number of "tasks" each of which is already bundled with the data it needs, concurrency is easiest.  

On the other hand, if every task depends on the result of its predecessor, a program is *strictly sequential*.  Many real computations are somewhere in the middle.

### Example: Embarrassingly Parallel

Problems that are embarrassingly parallel include Monte Carlo simulations, web scraping or distributed data acquisition, many types of graphic rendering (with pixels independent), and other domains.

A diagram of such tasks might look like this (an arrow indicates one task depends on the output of another task).

<img src="../img/embarrasingly-parallel.png"/>

### Example: Strictly Sequential

Other problems are sequential by nature, and cannot be made concurrent in any meaningful way.  

For example, most pseudo-random number generators keep internal state, and perform a complex mathematical modification of that state each time they move to the next state (typically not reversibly, but that is inessential here).

<img src="../img/strictly-sequential.png"/>
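As a rough illustration of why such a generator is strictly sequential, here is a minimal sketch of a linear congruential generator. The function names and the constants are illustrative assumptions, not taken from any particular library; the point is only that each state is computed from the previous one, so step *n* cannot start before step *n-1* has finished.

```python
def lcg_step(state, a=1664525, c=1013904223, m=2**32):
    """Compute the next state from the current one (illustrative constants)."""
    return (a * state + c) % m


def lcg_sequence(seed, n):
    """Produce n successive states; every iteration needs the previous result."""
    values = []
    state = seed
    for _ in range(n):
        state = lcg_step(state)  # depends on the state from the prior iteration
        values.append(state)
    return values


sequence = lcg_sequence(seed=1, n=5)
```

Because of that chain of dependencies, handing the individual steps to separate workers would gain nothing: each worker would simply wait for its predecessor.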

### Example: Mixed Data Flow

Many problems fall between these two pictures.  Some tasks have sequential dependencies, but others are independent.  

For example, perhaps you have to aggregate and process per-second data sequentially for a day, but then you need to reaggregate daily data into decades in a similar way.  Some things can be concurrent, but others have dependencies.

<img src="../img/mixed-data-flow.png"/>
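A mixed data flow like the one described above might be sketched as follows. This is only a toy, under the assumption that each "day" is a small list of numbers; `aggregate_day` and `aggregate_all` are hypothetical names. Within a day the running total is sequential, but the days are independent of each other, so they can be handed to an executor before a final sequential combination.

```python
from concurrent.futures import ThreadPoolExecutor


def aggregate_day(samples):
    """Sequential part: each step of the running total depends on the last."""
    total = 0
    for value in samples:
        total = total + value
    return total


def aggregate_all(days):
    """Concurrent part: days are independent, so aggregate them in a pool."""
    with ThreadPoolExecutor() as executor:
        daily_totals = list(executor.map(aggregate_day, days))
    # Final combination depends on every daily total being available
    return sum(daily_totals), daily_totals


grand_total, per_day = aggregate_all([[1, 2, 3], [4, 5], [6]])
```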

Let us load the various modules and names we use in this lesson.

In [None]:
!pip install ipywidgets

In [34]:
#----- Concurrency facilities
from concurrent.futures import (
    ThreadPoolExecutor, ProcessPoolExecutor, TimeoutError, as_completed)
from multiprocessing import cpu_count
from threading import current_thread, Thread
from queue import Queue, SimpleQueue, Empty
#----- General utilities
from datetime import datetime, timedelta
import requests
from pprint import pprint
from time import sleep, perf_counter
from collections import namedtuple
from random import sample
#----- Some pretty display later
from ipywidgets import IntProgress, Layout, Label
from IPython.display import display
from random import random
from multiprocessing.pool import ThreadPool


## Executors

Executors are the main construct in `concurrent.futures`. They are similar to `multiprocessing.Pool`, which we look at in a later lesson. Once an executor has been instantiated, we can `submit` jobs, or even `map` tasks.

In the module are two executors: `ThreadPoolExecutor` and `ProcessPoolExecutor`. They have the same interface, but use different concurrency mechanisms.  The trade-offs of processes and threads are discussed in later lessons.
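Because the two executors share an interface, code can often be written against "an executor" without caring which kind it gets. The helper below is a minimal sketch of that idea; `run_all` and `cube` are hypothetical names introduced only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def run_all(executor_class, func, items, max_workers=4):
    """Run func over items with the given executor class, keeping input order."""
    with executor_class(max_workers=max_workers) as executor:
        return list(executor.map(func, items))


def cube(n):
    return n ** 3


thread_results = run_all(ThreadPoolExecutor, cube, [1, 2, 3])
```

Passing `ProcessPoolExecutor` instead of `ThreadPoolExecutor` should work the same way, provided the function and its arguments can be pickled and the call is made from a module's main guard on platforms that spawn new processes.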

We need to define a task to perform, using a Python callable. In this example, we use multithreading to run two calls of the same function concurrently. When tasks spend most of their time waiting (here, sleeping), running them in separate threads lets the waits overlap, so the total elapsed time drops.

For this lesson, the task stands in for a query to a server that reports historical cryptocurrency prices on different markets and takes about a second to return a response to each query; we simulate that delay with `sleep(1)`.

In [33]:
def task(id):
    print(f'Starting the task {id}...')
    sleep(1)
    return f'Done with task {id}'

In [8]:
%%time 
start = perf_counter()

print(task(1))
print(task(2))

finish = perf_counter()

print(f"It took {finish-start} second(s) to finish.")

Starting the task 1...
Done with task 1
Starting the task 2...
Done with task 2
It took 1.9959884000018064 second(s) to finish.
CPU times: total: 15.6 ms
Wall time: 2.01 s


We can call the function with given arguments by submitting it to an executor.  There is no advantage in doing so for a single function call, but it is a starting point.
We are going to use `ThreadPoolExecutor`. The `ThreadPoolExecutor` class extends the `Executor` class, and its `submit()` method returns a `Future` object.

### Executor
The `Executor` class has three methods to control the thread pool:

* `submit()`: dispatch a function to be executed asynchronously and return a `Future` object.
* `map()`: execute a function asynchronously for each element in an iterable.
* `shutdown()`: shut down the executor.

When you create a new instance of the ThreadPoolExecutor class, Python starts the Executor.

Once you have finished working with the executor, you must call the `shutdown()` method to release the resources it holds. To avoid calling `shutdown()` explicitly, you can use the executor as a context manager.
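To make the two styles concrete, here is a small sketch that does the same work twice: once with an explicit `shutdown()` in a `finally` clause, and once with a context manager. The `square` function is a hypothetical stand-in for real work.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


# Style 1: explicit shutdown, guarded so it runs even if an error occurs
executor = ThreadPoolExecutor(max_workers=2)
try:
    explicit_result = executor.submit(square, 7).result()
finally:
    executor.shutdown(wait=True)

# Style 2: the context manager calls shutdown(wait=True) on exit
with ThreadPoolExecutor(max_workers=2) as managed:
    managed_result = managed.submit(square, 7).result()

# After shutdown, the executor refuses new work
try:
    executor.submit(square, 8)
    rejected = False
except RuntimeError:
    rejected = True
```

The context-manager form is usually preferable because it is shorter and cannot forget the cleanup.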

#### Using the submit() method example


In [9]:
start = perf_counter()

with ThreadPoolExecutor() as executor:
    f1 = executor.submit(task, 1)
    f2 = executor.submit(task, 2)

    print(f1.result())
    print(f2.result())    

finish = perf_counter()

print(f"It took {finish-start} second(s) to finish.")

Starting the task 1...
Starting the task 2...
Done with task 1
Done with task 2
It took 1.0266852999993716 second(s) to finish.


## Executor .map()

The `submit()` method returns a `Future` object. In the previous example, we had two `Future` objects, `f1` and `f2`. To get the result from each `Future`, we called its `result()` method.

While you can perfectly well call `.submit()` manually to create many futures, very often it is easier and more clear to create an entire family of implicit futures for different data you wish to process concurrently.  

When using `.map()` with a single iterable, each call to the worker function receives a single argument.  If a task needs several values, you can package each *data value* into a collection like a tuple or dictionary that is destructured within the worker function (or pass several iterables to `.map()`, as with the built-in `map()`).

In [11]:
start = perf_counter()

with ThreadPoolExecutor() as executor:
    results = executor.map(task, [1,2])
    for result in results:
        print(result)

finish = perf_counter()

print(f"It took {finish-start} second(s) to finish.")

Starting the task 1...
Starting the task 2...
Done with task 1
Done with task 2
It took 1.0076016999992135 second(s) to finish.
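When a worker needs more than one value, the packaging described above might look like the following sketch. The functions `power_packed` and `power` are hypothetical; the second call shows the alternative of giving `.map()` one iterable per parameter, which it accepts in the same way as the built-in `map()`.

```python
from concurrent.futures import ThreadPoolExecutor


def power_packed(args):
    # Destructure the single tuple argument inside the worker
    base, exponent = args
    return base ** exponent


def power(base, exponent):
    return base ** exponent


pairs = [(2, 3), (3, 2), (10, 0)]

with ThreadPoolExecutor() as executor:
    # One iterable of tuples: each worker call gets one tuple
    packed = list(executor.map(power_packed, pairs))
    # Several iterables: each worker call gets one item from each
    bases, exponents = zip(*pairs)
    unpacked = list(executor.map(power, bases, exponents))
```

In both cases the results come back in the order of the inputs, regardless of which task happened to finish first.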


## Another Practical Example
We will use the `urllib` package from the standard library to download five images in parallel.

In [12]:
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen
import time
import os

def download_image(url):
    image_data = None
    with urlopen(url) as f:
        image_data = f.read()

    if not image_data:
        raise Exception(f"Error: could not download the image from {url}")

    filename = os.path.basename(url)
    with open(filename, 'wb') as image_file:
        image_file.write(image_data)
        print(f'{filename} was downloaded...')

start = time.perf_counter()

urls = ['https://upload.wikimedia.org/wikipedia/commons/9/9d/Python_bivittatus_1701.jpg',
        'https://upload.wikimedia.org/wikipedia/commons/4/48/Python_Regius.jpg',
        'https://upload.wikimedia.org/wikipedia/commons/d/d3/Baby_carpet_python_caudal_luring.jpg',
        'https://upload.wikimedia.org/wikipedia/commons/f/f0/Rock_python_pratik.JPG',
        'https://upload.wikimedia.org/wikipedia/commons/0/07/Dulip_Wilpattu_Python1.jpg']

with ThreadPoolExecutor() as executor:
    executor.map(download_image, urls)

finish = time.perf_counter()    

print(f'It took {finish-start} second(s) to finish.')

Python_Regius.jpg was downloaded...
Python_bivittatus_1701.jpg was downloaded...
Dulip_Wilpattu_Python1.jpg was downloaded...
Rock_python_pratik.JPG was downloaded...
Baby_carpet_python_caudal_luring.jpg was downloaded...
It took 1.9715921999995771 second(s) to finish.


## Futures

In the examples above, the `submit` method immediately returns a `Future` object. These objects are an abstraction of a task that is being processed. 

They have multiple useful methods; the most important is `.result(timeout=None)`. 

The `timeout` argument lets us wait a finite number of seconds until a result is produced. If no result is generated in that time, a `TimeoutError` is raised.

In [16]:
try:
    ex = ThreadPoolExecutor()
    future = ex.submit(download_image, urls[0])
    data = future.result(timeout=0.5)
except TimeoutError as err:
    pprint(err)

TimeoutError()
Python_bivittatus_1701.jpg was downloaded...
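Notice in the output above that the download still completed after the `TimeoutError` was reported: the timeout only limits how long the caller waits, it does not cancel the task. A minimal sketch of that behavior, using a hypothetical `slow_square` function with short delays:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from time import sleep


def slow_square(x, delay=0.3):
    sleep(delay)  # simulate a slow operation
    return x * x


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_square, 6)
    try:
        future.result(timeout=0.05)  # give up waiting long before it is ready
        timed_out = False
    except TimeoutError:
        timed_out = True
    # The task kept running in its thread; waiting again retrieves the value
    late_result = future.result()
```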


Another important method of a `Future` is `.done()`.  Notice that we might submit multiple tasks, and each might become "done" after different durations.

In [19]:
with ThreadPoolExecutor() as ex:
    future1 = ex.submit(download_image, urls[0])
    future2 = ex.submit(download_image, urls[1])
    print("Just submitted: done?", future1.done())
    sleep(3)
    print("Slept a while: done?", future1.done())
    print(f"Image downloaded from URL 1: {future1.result()}")
    print(f"Image downloaded from URL 2: {future2.result()}")
    print("Waited on result: done?", future1.done())

Just submitted: done? False
Python_Regius.jpg was downloaded...
Python_bivittatus_1701.jpg was downloaded...
Slept a while: done? True
Image downloaded from URL 1: None
Image downloaded from URL 2: None
Waited on result: done? True


## Executor .map()

When you call `.map()` it is as if you called `.result()` on each future, although the futures are not named.

### Futures as_completed()

The `.map()` method is very concise for mapping one function to multiple data sets it should process.  It can, however, require refactoring the function to unpack a single *data* object.  

A bit more flexible is using the function `as_completed()` to iterate over results as they become available.  This will block on the next result becoming available, but the threads or processes generating those results will keep running while your code handles an available one.

We will create another `task` function that returns a value after sleeping for a random amount of time.

In [35]:
# custom task that will sleep for a variable amount of time
def task(name):
    value = random()
    sleep(value * 10)
    return f'Task={name}: {value:.2f}'

In [25]:
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # process task results as they are available
    for future in as_completed(futures):
        # retrieve the result
        result = future.result()
        print(result)

Task=8: 0.14
Task=3: 0.22
Task=4: 0.26
Task=5: 0.39
Task=2: 0.43
Task=6: 0.72
Task=7: 0.93
Task=0: 0.97
Task=1: 0.97
Task=9: 0.99
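Since `as_completed()` yields futures in completion order, it is often useful to remember which input each future belongs to. One common way, sketched here under the assumption of a hypothetical `slow_double` worker and made-up job names, is a dictionary keyed by the future.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep


def slow_double(x, delay):
    sleep(delay)
    return 2 * x


# Hypothetical jobs: name -> (value, seconds to sleep)
jobs = {"slow": (1, 0.3), "fast": (2, 0.01)}

with ThreadPoolExecutor(max_workers=2) as executor:
    # Map each future back to the name of the job that created it
    future_to_name = {
        executor.submit(slow_double, x, delay): name
        for name, (x, delay) in jobs.items()
    }
    finished = [
        (future_to_name[future], future.result())
        for future in as_completed(future_to_name)
    ]
```

With these delays the "fast" job will normally be reported first, even though it was submitted second.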


## Producer/Consumer Pattern

What we did so far supposed that we knew the data associated with our overall processing in advance of launching workers.  That will not always be the case; in particular, some workers may generate the data for other workers to process.

The trick to allowing this setup is the use of **queues** (or a similar data structure) that allow safe concurrent access.  A queue allows one worker to push data into a collection, and another worker to pop data, without risking one overwriting the other or other data integrity problems.
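As a minimal sketch of that idea, the following uses one producer thread and one consumer thread sharing a `queue.Queue`. The names `produce` and `consume`, and the use of `None` as an end-of-work marker, are assumptions made for this illustration.

```python
from queue import Queue
from threading import Thread


def produce(q, items):
    for item in items:
        q.put(item)   # safe to call while another thread is reading
    q.put(None)       # marker meaning "no more items"


def consume(q, out):
    while True:
        item = q.get()  # blocks until something is available
        if item is None:
            break
        out.append(item * item)


q = Queue()
squares = []
threads = [
    Thread(target=produce, args=(q, [1, 2, 3])),
    Thread(target=consume, args=(q, squares)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With a single consumer, the first-in, first-out ordering of the queue means the results arrive in the order they were produced.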

We will build an example with **producers** that generate work items at the same time as other **consumers** are processing them.

The architecture of this example is a bit more detailed, to resemble real programs you will write.  We will launch three types of workers as part of this overall system.

* A **monitor** that will simply show the evolving queues
* A **producer** that will feed requests into the TODO queue
* Multiple **consumers** that will act on requests, and add to the RESULTS queue

Let us first create the queues that the tasks will work with.

In [4]:
Q_todo, Q_results, Q_info = Queue(), SimpleQueue(), SimpleQueue()

While it might be more robust to put the initial introspection of the server into a concurrent task, here we simplify slightly and perform this small task in a non-concurrent way first. In this example, we will define a simple producer task that generates a random number between 0 and 10, blocks for that many seconds, then places the generated value on the shared queue.

In [27]:
# producer task
def producer_task(queue):
    # generate a random number between 0 and 10
    value = random() * 10
    # block for a moment to simulate work
    sleep(value)
    # push data into queue
    queue.put(value)

Next let us define the monitor that reports progress. This uses some magic with IPython widgets that are not the subject of this lesson; do not worry about those.

In [15]:
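def monitor(Q_todo, Q_results, Q_info, nreqs):
    # nreqs: expected total number of requests, used as the maximum of both
    # progress bars (taken as a parameter since no global `nreqs` is defined)
    # Create the visual monitor widgets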
    todo = IntProgress(value=0, min=0, max=nreqs, step=1, description='TODO', 
                orientation='horizontal', bar_style='info', layout=Layout(width='50%'))
    done = IntProgress(value=0, min=0, max=nreqs, step=1, description='DONE', 
                orientation='horizontal', bar_style='success', layout=Layout(width='50%'))
    info = Label(value='STARTING...')
    display(todo); display(done); display(info)
    
    while True:
        todo.value = Q_todo.qsize()
        done.value = Q_results.qsize()
        try:
            info.value = f"{Q_info.get(timeout=3)}"
        except Empty:
            break

We will now define a thread pool with 10 workers and issue 20 producer tasks into the pool to complete as fast as they are able. Each producer task feeds one item into the shared queue.  In this case, the data involved is a single number; in other cases, the data itself might be substantial (such as a numeric array or a large text).  Each producer sleeps before adding to the queue just to simulate real-world applications.

In [37]:
# producer manager task
def producer_manager(queue):
    with ThreadPool(10) as pool:
        # use threads to generate items and put into the queue
        _ = [pool.apply_async(producer_task, args=(queue,)) for _ in range(20)]
        # wait for all tasks to complete
        pool.close()
        pool.join()
    # put a signal to expect no further tasks
    queue.put(None)
    print('>producer_manager done.')

The consumer task will run in a loop until explicitly stopped.

In each iteration, the consumer task will retrieve an item from the shared queue. If the item is a message that indicates no further work, it will re-add the message to the queue for other consumers to process and exit. Otherwise, it will retrieve the value, block for that number of seconds to simulate work, and repeat the process.

In [30]:
# consumer task
def consumer_task(queue):
    # run until there is no more work
    while True:
        # retrieve one item from the queue
        value = queue.get()
        # check for signal of no more work; compare to None explicitly so
        # that a legitimate value of 0.0 is not mistaken for the signal
        if value is None:
            # put back on the queue for other consumers
            queue.put(value)
            # shutdown
            return
        # block for a moment to simulate work
        sleep(value)
        print(f'Consumer got: {value}')

The consumer manager will create and manage a pool of 5 consumer tasks which will run for as long as there is work on the shared queue to complete. 

In [29]:
# consumer manager
def consumer_manager(queue):
    # create thread pool
    with ThreadPool(5) as pool:
        # start consumer tasks
        _ = [pool.apply_async(consumer_task, args=(queue,)) for _ in range(5)]
        pool.close()
        pool.join()
    print('>consumer_manager done.')

Finally, the main process will start a separate thread for each of the producer and consumer managers and wait for all work to be completed.

In [36]:
%%time
queue = Queue()
# run the consumer
consumer = Thread(target=consumer_manager, args=(queue,))
consumer.start()
# run the producer
producer = Thread(target=producer_manager, args=(queue,))
producer.start()
# wait for the producer to finish
producer.join()
# wait for the consumer to finish
consumer.join()
# report a final message
print('>main done.')

Consumer got: 1.494394968076238
Consumer got: 2.428691892073779
Consumer got: 2.4623583161935345
Consumer got: 2.7045256117771235
Consumer got: 3.1714995645144684
Consumer got: 4.146393961968611
>producer_manager done.
Consumer got: 4.695807546849725
Consumer got: 5.008894425015034
Consumer got: 5.250008310811001
Consumer got: 5.9365590283533525
Consumer got: 6.009300076329285
Consumer got: 6.156578136946722
Consumer got: 6.244832190579518
Consumer got: 7.214463435925107
Consumer got: 7.506665065591452
Consumer got: 7.906552597376182
Consumer got: 8.507827081841954
Consumer got: 8.765992234212824
Consumer got: 9.134922486960907
Consumer got: 9.125252527802132
>consumer_manager done.
>main done.
CPU times: total: 15.6 ms
Wall time: 29.1 s


### Recap of the example
Running the example first creates the shared queue. Next, the consumer manager thread is started. This in turn starts the consumer thread pool and all five consumer workers. The workers run and await items to appear on the shared queue.

Next, a new thread is created and started for the producer manager. This starts a thread pool with 10 workers and issues 20 tasks to the pool, each producing one item onto the shared queue as fast as it is able. The main thread then waits for the threads to complete.

Each producer task generates a random number, sleeps, and places the value on the shared queue before exiting. Consumers read a value from the shared queue, sleep, then report the message before repeating the process.

All producers finish and the producer manager places a signal to expect no further tasks on the shared queue, then shuts down. The consumer tasks continue to retrieve items from the queue until the shutdown message is read. Each consumer then shuts down in turn and finally, the consumer manager shuts down.
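The example uses `multiprocessing.pool.ThreadPool` and `threading.Thread`. Purely as a sketch, the same pattern could also be expressed with `ThreadPoolExecutor` from this lesson. The names `producer`, `consumer`, and `run_pipeline` are hypothetical, and the sleeps are omitted so that it runs quickly.

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

SENTINEL = None  # signal meaning "no further work"


def producer(q, value):
    q.put(value)


def consumer(q):
    seen = []
    while True:
        item = q.get()
        if item is SENTINEL:
            q.put(SENTINEL)  # pass the signal on to the other consumers
            return seen
        seen.append(item)


def run_pipeline(n_items=20, n_consumers=5):
    q = Queue()
    with ThreadPoolExecutor(max_workers=n_consumers) as consumers:
        consumer_futures = [consumers.submit(consumer, q) for _ in range(n_consumers)]
        with ThreadPoolExecutor(max_workers=10) as producers:
            # Leaving this block waits for every producer task to finish
            for value in range(1, n_items + 1):
                producers.submit(producer, q, value)
        q.put(SENTINEL)  # only after all producers are done
        chunks = [future.result() for future in consumer_futures]
    return sorted(item for chunk in chunks for item in chunk)


processed = run_pipeline()
```

Each consumer returns the items it handled through its future, so no separate results queue is needed in this variant.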

## Summary

The `concurrent.futures` module is the most abstract, highest-level concurrency module in the Python standard library. It **should be** your default option when writing concurrent code.  

Only when you need more advanced capabilities will you need to use the `threading` or `multiprocessing` modules directly.

# Exercise

## Description

For this exercise you will parallelize a toy problem to show the pattern of `concurrent.futures` use.  For this task, a simple serial approach will probably be faster, because the overhead of creating threads outweighs the small amount of work.  However, it is easy to imagine reading much larger files where disk I/O time is significant enough to change this balance.

The code in the Setup generates 1000 files, each of which contains 20 integers, one per line.  You will read each file, and multiply together the numbers on lines 3 and 17 (one-based indexing of line number).  In turn, you want the sum of all these multiplications as the return value of your function.

For the task, use however many workers you think is most appropriate (pretending the files were much larger and the disk much slower).  The function `sum_of_products()` should return the computed answer, calculated in a multi-threaded manner.

A hint when writing this: later lessons talk about race conditions, but as a rule of thumb, do not rely on it being safe to collect partial results in a plain list from different threads.  A `collections.deque`, on the other hand, is documented as supporting thread-safe appends, and uses the same `.append()` method to add things.
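The hint about `collections.deque` can be illustrated with a small sketch that is unrelated to the files in the exercise. The `record` function and the squares it computes are assumptions made up for this illustration.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor

partials = deque()


def record(n):
    # Each worker thread appends its partial result to the shared deque
    partials.append(n * n)


with ThreadPoolExecutor(max_workers=8) as executor:
    # Consume the iterator so any exception in a worker would surface here
    list(executor.map(record, range(100)))

total = sum(partials)
```

The order of items in `partials` depends on thread scheduling, but a sum does not care about order.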

# Setup

In [22]:
from concurrent.futures import ThreadPoolExecutor
from collections import deque

from random import sample, seed, randint
from string import ascii_letters
from time import time

def create_files(random_state=0):
    seed(random_state)
    for _files in range(1000):
        name = "".join(sample(ascii_letters, 5))
        with open(f"tmp-{name}.numbers", 'w') as fh:
            for _lines in range(20):
                print(randint(1, 99), file=fh)
    return time()

created = create_files()
    
def sum_of_products():
    ThreadPoolExecutor  # Use this for something
    return 2481234

# Solution

In [23]:
def sum_of_products():
    from glob import glob
    def getsum(fname):
        # Close each file promptly rather than leaving it to garbage collection
        with open(fname) as fh:
            nums = [int(n) for n in fh]
        return nums[2] * nums[16]

    with ThreadPoolExecutor(max_workers=64) as ex:
        final = sum(ex.map(getsum, glob('tmp-*.numbers')))

    return final

# Test Cases

In [24]:
def test_final():
    assert sum_of_products() == 2483973, "Wrong total computed"
    
test_final()

In [25]:
def test_files_touched():
    import os
    for id_ in "uTHni AgwYn yiQnJ nlrgE wzXTs".split():
        assert os.stat(f"tmp-{id_}.numbers").st_atime > created, \
                "Files not read after creation"

test_files_touched()

-------------
Materials licensed under [CC BY-NC-ND 4.0](https://creativecommons.org/licenses/by-nc-nd/4.0/) by the authors