# Concurrent.futures

In this lesson, we jump immediately to the highest-level abstraction for concurrency that the Python standard library provides: the `concurrent.futures` module.  

Some of the terms presented here may not be entirely familiar yet, but I believe you can understand them in context.

**Most** of the time, when you write concurrent programs in Python, you should use `concurrent.futures`.  

The module provides a beautiful and Pythonic interface that makes concurrency easy, while hiding most of the messy details of threads, processes, locks, deadlocks, race conditions, and data sharing.

...Of course, most of the time is not **all** of the time.  

You will sometimes need to reach down to lower-level interfaces provided by other modules that `concurrent.futures` is built on top of.  Those building blocks make up the remaining lessons of this course.  

It might be wise to return to this lesson at the end, after you have completed the other lessons.

## Parallel and Sequential

The problems that `concurrent.futures` best addresses are ones that are *embarrassingly parallel* (or nearly so).  If you can express your problem as a large number of "tasks," each of which is already bundled with the data it needs, concurrency is easiest.  

On the other hand, if every task depends on the result of its predecessor, a program is *strictly sequential*.  Many real computations are somewhere in the middle.

### Example: Embarrassingly Parallel

Problems that are embarrassingly parallel include Monte Carlo simulations, web scraping or distributed data acquisition, many types of graphic rendering (where each pixel is computed independently), and other domains.

A diagram of such tasks might look like this (an arrow indicates one task depends on the output of another task).

<img src="embarrasingly-parallel.png"/>
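As a minimal sketch of what "each task is bundled with its data" means, here is a Monte Carlo estimate of pi in which every task carries its own seed and sample count and shares nothing with the others. The function name `estimate_pi` and the task sizes are illustrative choices, not part of the lesson. Threads keep the sketch simple; for CPU-bound work like this, a `ProcessPoolExecutor` would usually be the better fit, since threads in CPython share the global interpreter lock.

```python
from concurrent.futures import ThreadPoolExecutor
import random

def estimate_pi(task):
    """One self-contained Monte Carlo task: everything it needs is in `task`."""
    seed, n_samples = task
    rng = random.Random(seed)   # private generator, no shared state
    inside = 0
    for _ in range(n_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return inside

# Eight independent tasks, each bundled with its own seed and sample count
tasks = [(seed, 20_000) for seed in range(8)]

with ThreadPoolExecutor(max_workers=4) as ex:
    counts = list(ex.map(estimate_pi, tasks))

# Combining the independent results is one cheap step at the very end
pi_estimate = 4 * sum(counts) / sum(n for _, n in tasks)
print(f"pi is roughly {pi_estimate:.3f}")
```

Because no task reads another task's output, the executor is free to run them in any order; only the final sum needs all of them.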

### Example: Strictly Sequential

Other problems are sequential by nature, and cannot be made concurrent in any meaningful way.  

For example, most pseudo-random number generators keep internal state, and perform a complex mathematical modification of that state each time they move to the next state (typically not reversibly, but that is inessential here).

<img src="strictly-sequential.png"/>
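The dependency can be sketched with a hash chain, used here purely as an illustrative stand-in for a generator's state update (it is not how any particular pseudo-random number generator is implemented): each state is the SHA-256 digest of the previous one, so no step can start before its predecessor finishes.

```python
import hashlib

def hash_chain(seed: bytes, n: int) -> list:
    """Return n successive states, each the SHA-256 digest of the previous one."""
    states = []
    state = seed
    for _ in range(n):
        # The next state needs the current one, so step k cannot begin
        # until step k-1 has finished: there is nothing to run concurrently.
        state = hashlib.sha256(state).digest()
        states.append(state)
    return states

chain = hash_chain(b"seed", 5)
print([s.hex()[:8] for s in chain])
```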

### Example: Mixed Data Flow

Many problems fall between these two pictures.  Some tasks have sequential dependencies, but others are independent.  

For example, perhaps you have to aggregate and process per-second data sequentially for a day, but then you need to reaggregate the daily data into decades in a similar way.  Some steps can run concurrently, but others must wait for their dependencies.

<img src="mixed-data-flow.png"/>
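A minimal sketch of this shape, assuming synthetic per-second readings (the data and the `summarize_day` helper are hypothetical): work inside each day runs sequentially, different days run concurrently, and the final coarser aggregation waits for all of the daily results.

```python
from concurrent.futures import ThreadPoolExecutor
from statistics import mean

def summarize_day(readings):
    """Sequential within one day: a running total over that day's readings."""
    total = 0.0
    for value in readings:        # each step depends on the previous total
        total += value
    return total / len(readings)  # the day's average

# Hypothetical per-second data: 10 days of 86,400 synthetic readings each
days = [[float(day)] * 86_400 for day in range(10)]

# Days do not depend on each other, so they can be summarized concurrently
with ThreadPoolExecutor(max_workers=4) as ex:
    daily_averages = list(ex.map(summarize_day, days))

# The coarser aggregation must wait until every daily result exists
overall = mean(daily_averages)
print(daily_averages, overall)
```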

Let us load the various modules and names we use in this lesson.

In [1]:
#----- Concurrency facilities
from concurrent.futures import (
    ThreadPoolExecutor, ProcessPoolExecutor, TimeoutError, as_completed)
from multiprocessing import cpu_count
from threading import current_thread
from queue import Queue, SimpleQueue, Empty
#----- General utilities
from datetime import datetime, timedelta
import requests
from pprint import pprint
from time import sleep
from collections import namedtuple
from random import sample
#----- Some pretty display later
from ipywidgets import IntProgress, Layout, Label
from IPython.display import display
#----- Used in various examples
base_url = "http://localhost:5000"

## Executors

Executors are the main construct in `concurrent.futures`. They are similar to `multiprocessing.Pool`, which we look at in a later lesson. Once an executor has been instantiated, we can `submit` jobs, or even `map` tasks.

In the module are two executors: `ThreadPoolExecutor` and `ProcessPoolExecutor`. They have the same interface, but use different concurrency mechanisms.  The trade-offs of processes and threads are discussed in later lessons.
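Because both executors derive from the `Executor` base class, code can be written once against that interface and handed either kind. A minimal sketch (the helper `total_length` is hypothetical):

```python
from concurrent.futures import Executor, ThreadPoolExecutor

def total_length(executor: Executor, words):
    """Sum the lengths of `words`; works with any Executor subclass."""
    return sum(executor.map(len, words))

with ThreadPoolExecutor(max_workers=2) as ex:
    n_chars = total_length(ex, ["alpha", "beta", "gamma"])

print(n_chars)  # 14
```

Passing a `ProcessPoolExecutor()` instead should also work here, since the builtin `len` can be pickled; process pools additionally need the usual `if __name__ == "__main__":` guard in scripts on platforms that spawn worker processes.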

We need to define a task to perform, using a Python callable.  For this lesson, we use a server that reports historical cryptocurrency prices on different markets.  The server takes about a second to return a response to each query.

In [2]:
def check_price(exchange, symbol, date):
    url = f"{base_url}/price/{exchange}/{symbol}/{date}"
    resp = requests.get(url)
    return resp.json()
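If the local price server is not available, a stand-in along these lines could let you run the examples offline. `fake_check_price` is hypothetical: it only mimics the response shape shown in this lesson, sleeps to imitate network latency, and returns random rather than real prices.

```python
import random
from time import sleep

def fake_check_price(exchange, symbol, date, delay=0.7):
    """Hypothetical offline stand-in for check_price(); no server involved."""
    sleep(delay)  # imitate the server's response time
    rng = random.Random(f"{exchange}/{symbol}/{date}")  # same query, same answer
    open_ = rng.uniform(100, 10_000)
    close = open_ * rng.uniform(0.95, 1.05)
    return {
        "exchange": exchange, "symbol": symbol, "day": date,
        "open": round(open_, 2), "close": round(close, 2),
        "high": round(max(open_, close) * 1.01, 2),
        "low": round(min(open_, close) * 0.99, 2),
        "volume": round(rng.uniform(1_000, 100_000), 2),
    }
```

It can be substituted wherever the lesson calls the three-argument `check_price`; for the later single-argument version, `lambda data: fake_check_price(*data)` would serve with thread pools.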

In [3]:
%%time 
data = check_price('bitstamp', 'btc', '2020-04-01')
data['close']

CPU times: user 6.63 ms, sys: 0 ns, total: 6.63 ms
Wall time: 723 ms


6421.14

We can call the function with the given arguments by submitting it to an executor.  There is no advantage in doing so for a single function call, but it is a starting point.

In [4]:
%%time
# Default max_workers is min(32, os.cpu_count() + 4) on Python 3.8+
with ThreadPoolExecutor(max_workers=10) as ex:
    future = ex.submit(check_price, 'bitstamp', 'btc', '2020-04-01')
    data = future.result()
    
print(f"Price: ${data['close']}")

Price: $6421.14
CPU times: user 10.3 ms, sys: 0 ns, total: 10.3 ms
Wall time: 719 ms


Or with identical interface, use a process:

In [5]:
%%time
# Default max_workers is just number of cores for processes
with ProcessPoolExecutor(max_workers=cpu_count()) as ex:
    future = ex.submit(check_price, 'bitstamp', 'btc', '2020-04-01')
    pprint(future.result())
    print()

{'close': 6421.14,
 'day': '2020-04-01',
 'exchange': 'bitstamp',
 'high': 6527.24,
 'low': 6337.42,
 'open': 6408.95,
 'symbol': 'btc',
 'volume': 6342.29832651}

CPU times: user 44.3 ms, sys: 33.2 ms, total: 77.5 ms
Wall time: 838 ms


## Futures

In the examples above, the `submit` method immediately returns a `Future` object. These objects are an abstraction of a task that is being processed. 

They have multiple useful methods; the most important is `.result(timeout=None)`. 

The `timeout` argument limits how many seconds we wait for a result. If no result is produced in that time, a `TimeoutError` is raised.

In [6]:
with ThreadPoolExecutor() as ex:
    future = ex.submit(check_price, 'bitstamp', 'btc', '2020-04-01')
    try:
        data = future.result(timeout=0.5)
    except TimeoutError as err:
        pprint(err)

TimeoutError()
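A timeout only limits how long the caller waits; it does not cancel the task. This sketch uses a hypothetical `slow_task` in place of the price server to show that the same future can still be waited on after a `TimeoutError`.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from time import sleep

def slow_task(seconds):
    """Hypothetical stand-in for a slow request."""
    sleep(seconds)
    return f"finished after {seconds}s"

with ThreadPoolExecutor() as ex:
    future = ex.submit(slow_task, 1.0)
    try:
        future.result(timeout=0.2)   # stop waiting after 0.2 seconds...
    except TimeoutError:
        print("Timed out; task still running?", future.running())
    # ...but the task was never cancelled, so we can simply wait again
    late_result = future.result()

print(late_result)
```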


Another important method of a `Future` is `.done()`, which reports without blocking whether the task has finished.  When we submit multiple tasks, each may become "done" after a different duration.

In [7]:
with ThreadPoolExecutor() as ex:
    future1 = ex.submit(check_price, 'bitstamp', 'btc', '2020-04-01')
    future2 = ex.submit(check_price, 'kraken', 'btc', '2020-04-01')
    print("Just submitted: done?", future1.done())
    sleep(3)
    print("Slept a while: done?", future1.done())
    print(f"Bitstamp price: ${future1.result()['close']}")
    print(f"Kraken price: ${future2.result()['close']}")
    print("Waited on result: done?", future1.done())

Just submitted: done? False
Slept a while: done? True
Bitstamp price: $6421.14
Kraken price: $6401.9
Waited on result: done? True


## Executor .map()

While you can perfectly well call `.submit()` manually to create many futures, very often it is easier and clearer to create an entire family of implicit futures for the different data you wish to process concurrently.  

Like the built-in `map()`, `.map()` takes one iterable for each positional parameter of the worker function.  When each *data value* is already a record, as it is here, it is often simpler to pass a single iterable of tuples (or dictionaries) and destructure each one within the worker function.

In [8]:
def check_price(data):
    exchange, symbol, date = data
    url = f"{base_url}/price/{exchange}/{symbol}/{date}"
    resp = requests.get(url)
    return resp.json()

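`.map()` submits every task up front and returns an iterator.  Iterating over it is as if you called `.result()` on each (unnamed) future in turn, so results arrive in the same order as the inputs, regardless of which task finishes first.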

In [9]:
%%time
datasets = []
for exchange in ['bitfinex', 'bitstamp', 'kraken']:
    for date in ['2020-04-01', '2020-04-02', '2020-04-03', '2020-04-04']:
        for coin in ['btc', 'eth', 'ltc']:
            datasets.append((exchange, coin, date))

with ThreadPoolExecutor(max_workers=100) as ex:
    results = ex.map(check_price, datasets)
    prices = [price['close'] for price in results]
    print(f"{len(prices)} records retrieved")
    print(prices[:7] + ["..."])

36 records retrieved
[6409.8, 133, 39.076, 6630.8, 135.49, 39.356, 6803.9, '...']
CPU times: user 139 ms, sys: 18.3 ms, total: 157 ms
Wall time: 817 ms
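Although `.map()` is often fed a single iterable of tuples, it also accepts one iterable per parameter, just like the built-in `map()`. A sketch using a hypothetical three-parameter `describe` worker, with artificial delays so that the last input finishes first:

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def describe(exchange, symbol, delay):
    """Hypothetical three-parameter worker; `delay` simulates a slow request."""
    sleep(delay)
    return f"{exchange}:{symbol}"

venues = ["bitfinex", "bitstamp", "kraken"]
coins = ["btc", "eth", "ltc"]
delays = [0.3, 0.2, 0.1]   # the last task finishes first

with ThreadPoolExecutor(max_workers=3) as ex:
    # One iterable per parameter, zipped together like the built-in map()
    labels = list(ex.map(describe, venues, coins, delays))

print(labels)  # ['bitfinex:btc', 'bitstamp:eth', 'kraken:ltc'] (input order)
```

With the lesson's `datasets` list, the three-argument `check_price` from the start of the lesson could likewise be called as `ex.map(check_price, *zip(*datasets))`, where `zip(*datasets)` transposes the list of tuples into one tuple per parameter.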


### Futures as_completed()

The `.map()` method is very concise for mapping one function to multiple data sets it should process.  Here, we chose to refactor the function to unpack a single *data* tuple.  

A more flexible approach is the function `as_completed()`, which yields futures in the order they finish rather than the order they were submitted.  Each step of the loop blocks until the next result becomes available, but the threads or processes generating the remaining results keep running while your code handles the one that is ready.

In [20]:
%%time
with ThreadPoolExecutor(max_workers=100) as ex:
    # Map each future to its query (futures could also use different functions/arguments)
    futures = {ex.submit(check_price, data): data for data in datasets}
    records = []
    for future in as_completed(futures):
        records.append(future.result())
        
print(f"{len(records)} records retrieved")

36 records retrieved
CPU times: user 125 ms, sys: 61.5 ms, total: 186 ms
Wall time: 874 ms
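Because `as_completed()` yields futures in completion order, a common idiom is to keep a dictionary from each future back to its input. A sketch with a hypothetical `slow_square` worker whose larger inputs tend to finish sooner:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

def slow_square(n):
    """Hypothetical worker: larger n sleeps less, so it tends to finish first."""
    sleep(0.05 * (5 - n))
    return n * n

with ThreadPoolExecutor(max_workers=5) as ex:
    # Remember which input produced each future
    futures = {ex.submit(slow_square, n): n for n in range(5)}
    completed = []
    for future in as_completed(futures):
        n = futures[future]                  # look up the original input
        completed.append((n, future.result()))

print(completed)  # typically largest n first, but order depends on timing
```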


## Producer/Consumer Pattern

What we did so far assumed that we knew all the data for our overall processing before launching the workers.  That will not always be the case; in particular, some workers may generate the data for other workers to process.

The trick to allowing this setup is the use of **queues** (or a similar data structure) that allow safe concurrent access.  A queue allows one worker to push data into a collection, and another worker to pop data, without risking one overwriting the other or other data integrity problems.

We will extend the example of querying prices from exchanges, but with the addition of a **producer** that generates requests at the same time as other **consumers** are processing them.

The architecture of this example is a bit more detailed, to resemble real programs you will write.  We will launch three types of workers as part of this overall system.

* A **monitor** that will simply show the evolving queues
* A **producer** that will feed requests into the TODO queue
* Multiple **consumers** that will act on requests, and add to the RESULTS queue

Let us first create the queues that the tasks will work with.

In [11]:
Q_todo, Q_results, Q_info = Queue(), SimpleQueue(), SimpleQueue()

While it might be more robust to put the initial introspection of the server into a concurrent task, here we simplify slightly and perform this small task in a non-concurrent way first.  The server can report what exchanges and symbols are available for querying.

In [12]:
# Find exchanges and symbols from the server
exchanges = requests.get(f"{base_url}/exchanges").json()
symbols = requests.get(f"{base_url}/symbols").json()

# Choose the dates of interest ourselves
dates, start = [], datetime(2020, 3, 1) 
for i in range(31):
    date = start + timedelta(days=i)
    dates.append(date.strftime('%Y-%m-%d'))

nreqs = len(exchanges) * len(dates) * len(symbols)
print(f"{nreqs} total queries will be performed")

1023 total queries will be performed


Next let us define the monitor that reports progress. This uses some magic with IPython widgets that are not the subject of this lesson; do not worry about those.

In [21]:
def monitor(Q_todo, Q_results, Q_info):
    # Create the visual monitor widgets
    todo = IntProgress(value=0, min=0, max=nreqs, step=1, description='TODO', 
                orientation='horizontal', bar_style='info', layout=Layout(width='50%'))
    done = IntProgress(value=0, min=0, max=nreqs, step=1, description='DONE', 
                orientation='horizontal', bar_style='success', layout=Layout(width='50%'))
    info = Label(value='STARTING...')
    display(todo); display(done); display(info)
    
    while True:
        todo.value = Q_todo.qsize()
        done.value = Q_results.qsize()
        try:
            info.value = f"{Q_info.get(timeout=3)}"
        except Empty:
            break

We need to create a producer function that will feed queries into the TODO queue.  In this case, the data involved is a small tuple of query elements; in other cases, the data itself might be substantial (such as a numeric array or a large text).  This producer artificially limits the rate at which it adds to the queue just to simulate real-world applications.

In [14]:
def producer(Q_todo, Q_info, exchanges, dates, symbols):
    Query = namedtuple('Query', 'exchange symbol date')
    for exchange in exchanges:
        for date in dates:
            for symbol in symbols:
                query = Query(exchange, symbol, date)
                Q_todo.put(query)
                Q_info.put(f"ADDING {query}")
                sleep(0.015)  # Artificial small delay

Finally, the last piece of our task scaffolding is the worker function.  This worker consumes unprocessed queries from the TODO queue, operates on them, and puts each result in the RESULTS queue.  The TODO queue is a regular `Queue` rather than a `SimpleQueue` because a worker marks each query as processed at the end of its work, using the `.task_done()` method (not available in `SimpleQueue`).

In [15]:
def worker(Q_todo, Q_results, Q_info):
    thread = current_thread().name
    sleep(0.5)  # Let TODO queue get populated first
    while True:
        try:
            # get_nowait() avoids a race: with .empty() then .get(), another
            # worker could take the last item in between, leaving this one blocked
            query = Q_todo.get_nowait()
        except Empty:
            break
        Q_info.put(f"PROCESSING {query}")
        result = check_price(query)
        Q_results.put(result)
        Q_todo.task_done()

    Q_info.put(f"EXITING {thread} (TODO queue empty)")
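The worker above stops as soon as it finds the TODO queue empty, which relies on the producer staying ahead of the consumers (hence the initial `sleep(0.5)`). A common alternative, sketched here with the hypothetical names `sentinel_producer` and `sentinel_consumer` and toy data, is for the producer to put one sentinel object per consumer once it has finished; consumers block on `.get()` and exit only when they receive a sentinel. Calling `.join()` on the queue after the producer returns then waits until every item has been matched by a `.task_done()` call.

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from time import sleep

STOP = object()  # hypothetical sentinel meaning "no more work is coming"

def sentinel_producer(q_todo, items, n_consumers):
    for item in items:
        q_todo.put(item)
        sleep(0.001)             # a slow producer no longer matters
    for _ in range(n_consumers):
        q_todo.put(STOP)         # one sentinel per consumer

def sentinel_consumer(q_todo, q_results):
    while True:
        item = q_todo.get()      # blocks until an item or a sentinel arrives
        if item is STOP:
            q_todo.task_done()
            break
        q_results.put(item * item)   # stand-in for real processing
        q_todo.task_done()

n_consumers = 4
q_todo, q_results = Queue(), Queue()

with ThreadPoolExecutor(max_workers=n_consumers + 1) as ex:
    for _ in range(n_consumers):
        ex.submit(sentinel_consumer, q_todo, q_results)
    # Wait for the producer to finish putting items and sentinels...
    ex.submit(sentinel_producer, q_todo, range(100), n_consumers).result()
    # ...then wait until every put() has a matching task_done()
    q_todo.join()

squares = sorted(q_results.get() for _ in range(q_results.qsize()))
print(len(squares), squares[:5])
```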

Putting it together, let us launch our concurrent processing using many threads.

In [26]:
%%time
Q_todo, Q_results, Q_info = Queue(), SimpleQueue(), SimpleQueue()

with ThreadPoolExecutor(max_workers=100) as ex:
    futures = []
    futures.append(ex.submit(producer, Q_todo, Q_info, exchanges, dates, symbols))
    futures.append(ex.submit(monitor, Q_todo, Q_results, Q_info))
    for _ in range(32):
        futures.append(ex.submit(worker, Q_todo, Q_results, Q_info))

IntProgress(value=0, bar_style='info', description='TODO', layout=Layout(width='50%'), max=1023)

IntProgress(value=0, bar_style='success', description='DONE', layout=Layout(width='50%'), max=1023)

Label(value='STARTING...')

CPU times: user 11.9 s, sys: 1.58 s, total: 13.5 s
Wall time: 28.8 s


With processing complete across the concurrent workers, all the results have accumulated in the RESULTS queue.

In [17]:
print("All futures done?", all([f.done() for f in futures]))
print("Size of TODO:", Q_todo.qsize())
print("Size of RESULTS:", Q_results.qsize())
print("Size of INFO:", Q_info.qsize())

All futures done? True
Size of TODO: 0
Size of RESULTS: 1023
Size of INFO: 0


From the queue, we might want to pull the results into a random-access data structure such as a list.  For a thousand or so small records, as here, this is certainly convenient.

In [18]:
results = []
for _ in range(Q_results.qsize()):
    result = Q_results.get()
    results.append(result)
    Q_results.put(result)   # Put the result back at the end

In [19]:
sample(results, k=2)

[{'exchange': 'okex',
  'symbol': 'eth',
  'open': 135.91,
  'high': 145,
  'low': 132.49,
  'close': 138.42,
  'volume': 796791.072558,
  'day': '2020-03-25'},
 {'exchange': 'okex',
  'symbol': 'btc',
  'open': 5028,
  'high': 5527.9,
  'low': 4923.7,
  'close': 5314.1,
  'volume': 95054.82617103,
  'day': '2020-03-18'}]

## Summary

The `concurrent.futures` module is the most abstract, highest-level concurrency module in the Python standard library. It **should be** your default option when writing concurrent code.  

Only when you need more advanced capabilities will you need to use the `threading` or `multiprocessing` modules directly.