# Chapter 6. Implementing Concurrency

So far, the focus has been on measuring and improving the performance of programs by reducing the number of operations performed by the CPU, through clever algorithms and more efficient machine code.

This chapter turns to programs where most of the time is spent waiting for resources that are much slower than the CPU, such as persistent storage and network resources.

Asynchronous programming is a programming paradigm that helps to deal with slow and unpredictable resources (such as users) and is widely used to build responsive services and user interfaces.

## 6.1 Asynchronous programming

Asynchronous programming is a way of dealing with slow and unpredictable resources. Rather than waiting idle for resources to become available, asynchronous programs are able to handle multiple resources concurrently and efficiently. Programming in an asynchronous way can be challenging because it is necessary to deal with external requests that can arrive in any order, may take a variable amount of time, or may fail unpredictably.

### 6.1.1 Waiting for I/O

A modern computer employs different kinds of memory to store data and perform operations. In general, a computer combines expensive memory that operates at high speed with cheaper, more abundant memory that operates at lower speed and is used to store larger amounts of data.

**Registers**: At the top of the memory hierarchy are the CPU registers. These are integrated in the CPU and hold the data that machine instructions operate on directly. Accessing data in a register generally takes one clock cycle. This means that if the CPU operates at 3 GHz, the time it takes to access one element in a CPU register is on the order of 0.3 nanoseconds.

**Cache**: At the layer just below the registers, you can find the CPU cache, which is composed of multiple levels and is integrated in the processor. The cache operates at a slightly slower speed than the registers but within the same order of magnitude.

**RAM**: The next item in the hierarchy is the main memory (RAM), which holds much more data but is slower than the cache. Fetching an item from memory can take a few hundred clock cycles.

**Storage**: At the bottom layer, you can find persistent storage, such as rotating disks (HDD) and Solid State Drives (SSD). These devices hold the most data and are orders of magnitude slower than the main memory. An HDD may take a few milliseconds to seek and retrieve an item, while an SSD is substantially faster and takes only a fraction of a millisecond.

To put the relative speed of each memory type into perspective, imagine a CPU whose clock cycle lasts about one second. A register access would be equivalent to picking up a pen from the table. A cache access would be equivalent to picking up a book from the shelf. Moving down the hierarchy, a RAM access would be equivalent to loading up the laundry (about twenty times slower than the cache). When we move to persistent storage, things are quite a bit different. Retrieving an element from an SSD would be equivalent to taking a four-day trip, while retrieving an element from an HDD can take up to six months! The times can stretch even further if we move on to access resources over the network.
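The scaling behind this analogy can be reproduced with a few lines of arithmetic. The sketch below assumes a 3 GHz clock and uses illustrative latencies (0.1 ms for an SSD read, 5 ms for an HDD seek). These figures are assumptions chosen to match the orders of magnitude quoted above, not measurements, and `cycles_for` is a hypothetical helper name.

```python
CLOCK_HZ = 3e9  # assumed clock speed: 3 GHz, so one cycle lasts about 0.33 ns

def cycles_for(latency_seconds, clock_hz=CLOCK_HZ):
    """Return how many clock cycles elapse during the given latency."""
    return latency_seconds * clock_hz

# If one cycle is stretched to one second, N cycles feel like N seconds.
SECONDS_PER_DAY = 86400
ssd_days = cycles_for(1e-4) / SECONDS_PER_DAY  # assumed SSD latency: 0.1 ms
hdd_days = cycles_for(5e-3) / SECONDS_PER_DAY  # assumed HDD latency: 5 ms

print("SSD: about {:.1f} days".format(ssd_days))
print("HDD: about {:.0f} days".format(hdd_days))
```

Under these assumptions, the SSD read lands at a few days and the HDD seek at several months, which is in line with the analogy.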

From the preceding example, it should be clear that accessing data from storage and other I/O devices is much slower than the CPU; therefore, it is very important to handle those resources so that the CPU is never stuck waiting idly. This can be accomplished by carefully designing software capable of managing multiple, ongoing requests at the same time.

### 6.1.2 Concurrency

Concurrency is a way to implement a system that is able to deal with multiple requests at the same time. The idea is that we can move on and start handling other resources while we wait for a resource to become available. Concurrency works by splitting a task into smaller subtasks that can be executed out of order so that multiple tasks can be partially advanced without waiting for the previous tasks to finish.

As a first example, we will describe how to implement concurrent access to a slow network resource. Let's say we have a web service that takes the square of a number, and the time between our request and the response will be approximately one second. We can implement the network_request function that takes a number and returns a dictionary that contains information about the success of the operation and the result. We can simulate such services using the time.sleep function, as follows:

In [3]:
import time
def network_request(number):
    time.sleep(1.0)
    return {"success": True, "result": number ** 2}

def fetch_square(number):
    response = network_request(number)
    if response["success"]:
        print("Result is: {}".format(response["result"]))
        
fetch_square(2)
# Output:
# Result is: 4

Result is: 4


In [4]:
fetch_square(2)
fetch_square(3)
fetch_square(4)

Result is: 4
Result is: 9
Result is: 16


The previous code will take three seconds to run, but it's not the best we can do. Waiting for the previous result to finish is unnecessary, as we can technically submit multiple requests at once and wait for them in parallel.
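To make the potential gain concrete, here is a minimal sketch that submits the three simulated requests at once, using one thread per request. Threads are only one possible mechanism (the following sections develop callbacks, futures, and event loops instead). The `delay` parameter and the `fetch_squares_concurrently` helper are hypothetical additions for illustration; the delay is shortened so the sketch runs quickly.

```python
import threading
import time

def network_request(number, delay=1.0):
    # Simulated slow service; `delay` is an illustrative parameter
    time.sleep(delay)
    return {"success": True, "result": number ** 2}

def fetch_squares_concurrently(numbers, delay=1.0):
    """Issue all requests at once, each in its own thread, and collect results."""
    results = {}

    def worker(n):
        response = network_request(n, delay)
        if response["success"]:
            results[n] = response["result"]

    threads = [threading.Thread(target=worker, args=(n,)) for n in numbers]
    for t in threads:
        t.start()   # all requests are now in flight
    for t in threads:
        t.join()    # wait for every request to finish
    return results

start = time.perf_counter()
squares = fetch_squares_concurrently([2, 3, 4], delay=0.2)
elapsed = time.perf_counter() - start
print(squares, "in {:.2f} s".format(elapsed))
```

Because the three waits overlap, the total time is close to a single delay rather than the sum of the three.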

### 6.1.3 Callbacks

Imagine that you are at a restaurant and you've had a few drinks. It's raining outside, and you'd rather not take the bus; therefore, you request a taxi and ask them to call when they're outside so that you can come out, and you don't have to wait in the rain. What you did in this case is request a taxi (that is, the slow resource) but instead of waiting outside until the taxi arrives, you provide your number and instructions (callback) so that you can come outside when they're ready and go home.

In [5]:
def wait_and_print(msg):
    time.sleep(1.0)
    print(msg)
    
import threading

def wait_and_print_async(msg):
    def callback():
        print(msg)
        
    timer = threading.Timer(1.0, callback)
    timer.start()

An important feature of the wait_and_print_async function is that none of the statements are blocking the execution flow of the program.

This technique of registering callbacks for execution in response to certain events is commonly called the Hollywood principle. This is because, after an audition for a role in Hollywood, you may be told "Don't call us, we'll call you", meaning that they won't tell you immediately whether they chose you for the role, but they'll call you in case they do.

In [6]:
# Synchronous
wait_and_print("First call")
wait_and_print("Second call")
print("After call")

First call
Second call
After call


In [7]:
# Async
wait_and_print_async("First call async")
wait_and_print_async("Second call async")
print("After submission")

After submission


The synchronous version behaves in a very familiar way. The code waits for a second, prints First call, waits for another second, and then prints the Second call and After call messages. In the asynchronous version, wait_and_print_async submits (rather than executes) those calls and moves on immediately. You can see this mechanism in action by noting that the "After submission" message is printed immediately.
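The ordering described here can be checked programmatically. The following sketch records events in a list instead of printing them, and uses a `threading.Event` so that the main thread can wait for the callback before inspecting the list. The `record_async` helper and the shortened delay are assumptions made for this illustration.

```python
import threading
import time

events = []                        # records the order in which things happen
callback_ran = threading.Event()   # lets the main thread wait for the callback

def record_async(msg, delay):
    """Schedule msg to be recorded after `delay` seconds, without blocking."""
    def callback():
        events.append(msg)
        callback_ran.set()
    threading.Timer(delay, callback).start()

start = time.perf_counter()
record_async("callback ran", 0.2)
submit_time = time.perf_counter() - start  # time spent inside the call itself
events.append("after submission")

callback_ran.wait()                # block only to end the demo cleanly
print(events)
print("submission took {:.4f} s".format(submit_time))
```

The submission returns almost instantly, and the "after submission" entry is recorded before the callback fires.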

In [8]:
def network_request_async(number, on_done):
    def timer_done():
        on_done({"success": True,
                 "result": number ** 2})
    timer = threading.Timer(1.0, timer_done)
    timer.start() 
    
def on_done(result):
    print(result)
    
network_request_async(2, on_done)

In [9]:
network_request_async(2, on_done)
network_request_async(3, on_done)
network_request_async(4, on_done)
print("After submission")

After submission


In [10]:
def fetch_square(number):
    def on_done(response):
        if response["success"]:
            print("Result is: {}".format(response["result"]))
            
    network_request_async(number, on_done)

### 6.1.4 Futures

A future is an abstraction that helps us keep track of the resources we have requested and are waiting to become available.

In [35]:
# asyncio is part of the Python standard library (3.4+), so no pip install is needed


In [36]:
import asyncio
fut = asyncio.Future()
fut

<Future pending>

In [37]:
fut.set_result("Hello")

In [38]:
fut.result()

'Hello'

In [40]:
fut = asyncio.Future()
fut.add_done_callback(lambda future: print(future.result(), flush=True))
fut.set_result("Hello")

Hello


In [41]:
from concurrent.futures import Future

def network_request_async(number):
    future = Future()
    result = {"success": True, "result": number ** 2}
    timer = threading.Timer(1.0, lambda: future.set_result(result))
    timer.start()
    return future

fut = network_request_async(2)

In [42]:
def fetch_square(number):
    fut = network_request_async(number)
    
    def on_done_future(future):
        response = future.result()
        if response["success"]:
            print("Result is: {}".format(response["result"]))
    fut.add_done_callback(on_done_future)

Futures are a different and slightly more convenient way of working with callbacks. Futures are also advantageous because they can keep track of the resource status, cancel (unschedule) scheduled tasks, and handle exceptions more naturally.
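As a rough illustration of the status tracking, cancellation, and exception handling mentioned above, the sketch below creates three `concurrent.futures.Future` objects by hand (as the earlier example does) and inspects them. The `describe` helper is a hypothetical name introduced only for this example.

```python
from concurrent.futures import Future

def describe(future):
    """Summarize the outcome of a completed or cancelled future."""
    if future.cancelled():
        return "cancelled"
    error = future.exception()      # None when the future succeeded
    if error is not None:
        return "failed: {}".format(error)
    return "ok: {}".format(future.result())

succeeded = Future()
succeeded.set_result(4)

failed = Future()
failed.set_exception(ValueError("bad input"))

cancelled = Future()
cancelled.cancel()                  # a pending future can be cancelled

outcomes = [describe(f) for f in (succeeded, failed, cancelled)]
print(outcomes)
```

Note that `cancelled()` is checked first, since asking a cancelled future for its exception or result raises `CancelledError`.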

### 6.1.5 Event loops

The idea behind an event loop is to continuously monitor the status of the various resources (for example, network connections and database queries) and trigger the execution of callbacks when events take place (for example, when a resource is ready or when a timer expires).

Why not just stick to threading? Event loops are sometimes preferred because no unit of execution ever runs at the same time as another, which can simplify dealing with shared variables, data structures, and resources.

In [43]:
class Timer:
    def __init__(self, timeout):
        self.timeout = timeout
        self.start = time.time()
    def done(self):
        return time.time() - self.start > self.timeout

In [44]:
timer = Timer(1.0)

while True:
    if timer.done():
        print("Timer is done!")
        break

Timer is done!


Waiting for events to happen by continuously polling in a loop is commonly termed busy-waiting.

In [46]:
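class Timer:
    def __init__(self, timeout):
        self.timeout = timeout
        self.start = time.time()

    def done(self):
        return time.time() - self.start > self.timeout

    def on_timer_done(self, callback):
        self.callback = callback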

Note that on_timer_done merely stores a reference to the callback. The entity that monitors the event and executes the callback is the loop. This concept is demonstrated as follows:

In [None]:
timer = Timer(1.0)
timer.on_timer_done(lambda: print("Timer is done!"))

while True:
    if timer.done():
        timer.callback()
        break

In [None]:
timers = []

timer1 = Timer(1.0)
timer1.on_timer_done(lambda: print("First timer is done!"))

timer2 = Timer(2.0)
timer2.on_timer_done(lambda: print("Second timer is done!"))

timers.append(timer1)
timers.append(timer2)

while True:
    for timer in list(timers):  # iterate over a copy so that removal is safe
        if timer.done():
            timer.callback()
            timers.remove(timer)
    # If no more timers are left, we exit the loop
    if len(timers) == 0:
        break

The main restriction of an event loop is that, since the flow of execution is managed by a continuously running loop, it must never use blocking calls. If any blocking statement (such as time.sleep) runs inside the loop, event monitoring and callback dispatching stop until the blocking call is done.

To avoid this, rather than using a blocking call, such as time.sleep, we let the event loop detect and execute the callback when the resource is ready. By not blocking the execution flow, the event loop is free to monitor multiple resources in a concurrent way.
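The timer loops above can be generalized into a small reusable class. The sketch below is a toy, not how asyncio is implemented: `TinyLoop` and its `call_later` and `run` methods are hypothetical names, and the loop still polls the clock in the same busy-waiting style as the earlier examples. Timers are kept in a heap so that the next one due is always at the front.

```python
import heapq
import itertools
import time

class TinyLoop:
    """Toy event loop that only knows about timers."""

    def __init__(self):
        self._timers = []                   # heap of (deadline, order, callback)
        self._order = itertools.count()     # tie-breaker for equal deadlines

    def call_later(self, delay, callback):
        deadline = time.monotonic() + delay
        heapq.heappush(self._timers, (deadline, next(self._order), callback))

    def run(self):
        # Keep polling until every registered timer has fired
        while self._timers:
            deadline, _, callback = self._timers[0]
            if time.monotonic() >= deadline:
                heapq.heappop(self._timers)
                callback()                  # callbacks must not block

fired = []
loop = TinyLoop()
loop.call_later(0.2, lambda: fired.append("second"))
loop.call_later(0.1, lambda: fired.append("first"))
loop.run()
print(fired)
```

The callbacks run in deadline order regardless of the order in which they were registered.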

## 6.2 The asyncio framework

In [None]:
import asyncio 

loop = asyncio.get_event_loop()

def callback():
    print('Hello, asyncio')
    loop.stop()

loop.call_later(1.0, callback)
loop.run_forever()    
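On recent Python versions (3.7 and later), the same idea is often written with `asyncio.run`, which creates the loop, runs a coroutine to completion, and closes the loop afterwards. The following is a minimal sketch of that style, not a replacement for the cell above; the `main` coroutine and the shortened delay are assumptions for illustration.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    # Ask the loop to complete the future after 0.1 seconds
    loop.call_later(0.1, done.set_result, "Hello, asyncio")
    return await done       # suspend here until the timer fires

message = asyncio.run(main())
print(message)
```

Here the future plays the role that `loop.stop()` plays in the callback version: it signals that the work is finished.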

### 6.2.1 Coroutines

One of the main problems with callbacks is that they require you to break the program execution into small functions that will be invoked when a certain event takes place. As we saw in the earlier sections, callbacks can quickly become cumbersome.

Coroutines are another, perhaps more natural, way to break up the program execution into chunks. They allow the programmer to write code that resembles synchronous code but will execute asynchronously. You may think of a coroutine as a function that can be stopped and resumed. A basic example of coroutines is generators.

Generators can be defined in Python using the yield statement inside a function.

In [3]:
def range_generator(n):
    i = 0
    while i < n:
        print('Generating value {}'.format(i))
        yield i
        i += 1
        
generator = range_generator(3)
generator

<generator object range_generator at 0x1120f9b50>

In [4]:
next(generator)

Generating value 0


0

You can think of a yield statement as a breakpoint where we can stop and resume execution (while also maintaining the internal state of the generator). This ability of stopping and resuming execution can be leveraged by the event loop to allow for concurrency.
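To see how stopping and resuming enables concurrency, consider this small sketch of a round-robin scheduler that advances several generators one step at a time. It is a toy stand-in for an event loop; `countdown` and `run_round_robin` are hypothetical names used only here.

```python
from collections import deque

def countdown(name, n, log):
    """A task that records one entry per step and yields control in between."""
    while n > 0:
        log.append("{}: {}".format(name, n))
        yield          # pause here; the scheduler decides when to resume
        n -= 1

def run_round_robin(tasks):
    """Advance each task by one step in turn until all are exhausted."""
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)
        except StopIteration:
            continue   # task finished, do not reschedule it
        queue.append(task)

log = []
run_round_robin([countdown("A", 2, log), countdown("B", 3, log)])
print(log)
# → ['A: 2', 'B: 3', 'A: 1', 'B: 2', 'B: 1']
```

The two tasks make progress in an interleaved fashion even though only one of them runs at any given moment.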

In the Python world, a generator that can also receive values is called a generator-based coroutine.

In [5]:
def parrot():
    while True:
        message = yield
        print("Parrot says: {}".format(message))
        
generator = parrot()
generator.send(None)
generator.send("Hello")
generator.send("World")

Parrot says: Hello
Parrot says: World
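A generator-based coroutine can also send a value back each time it receives one. As a small sketch (the `running_average` name is made up for this example), the following coroutine keeps state between calls and yields the updated average after every `send`:

```python
def running_average():
    total = 0.0
    count = 0
    average = None
    while True:
        # Hand back the current average and wait for the next value
        value = yield average
        total += value
        count += 1
        average = total / count

averager = running_average()
next(averager)                 # advance to the first yield, like send(None)
first = averager.send(10)
second = averager.send(20)
print(first, second)
# → 10.0 15.0
```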


In [6]:
async def hello():
    print('Hello, async!')

coro = hello()
coro

<coroutine object hello at 0x112149200>

In [None]:
import asyncio
loop = asyncio.get_event_loop()
loop.run_until_complete(coro)

In [None]:
async def wait_and_print(msg):
    await asyncio.sleep(1)
    print("Message: ", msg)
    
loop.run_until_complete(wait_and_print("Hello"))

In [12]:
async def network_request(number):
    await asyncio.sleep(1.0)
    return {"success": True, "result": number ** 2}

In [13]:
async def fetch_square(number):
    response = await network_request(number)
    if response["success"]:
        print("Result is: {}".format(response["result"]))

In [None]:
loop.run_until_complete(fetch_square(2))
loop.run_until_complete(fetch_square(3))
loop.run_until_complete(fetch_square(4))

In [16]:
asyncio.ensure_future(fetch_square(2))
asyncio.ensure_future(fetch_square(3))
asyncio.ensure_future(fetch_square(4))


<Task pending coro=<fetch_square() running at <ipython-input-13-044634bf1e4c>:1>>

In [None]:
loop.run_forever()
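The contrast between running the coroutines one after another and scheduling them together can be sketched with `asyncio.gather`, which waits for several awaitables at once. This is a self-contained variant of the cells above under a few assumptions: `fetch_square` returns the value instead of printing it, a hypothetical `delay` parameter shortens the wait, and `asyncio.run` (Python 3.7+) drives the loop.

```python
import asyncio
import time

async def network_request(number, delay=1.0):
    await asyncio.sleep(delay)   # non-blocking wait
    return {"success": True, "result": number ** 2}

async def fetch_square(number, delay=1.0):
    response = await network_request(number, delay)
    if response["success"]:
        return response["result"]

async def main(delay):
    start = time.perf_counter()
    # All three coroutines are scheduled together and wait concurrently
    results = await asyncio.gather(
        *(fetch_square(n, delay) for n in (2, 3, 4)))
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main(0.2))
print(results, "in {:.2f} s".format(elapsed))
```

The results come back in the order the coroutines were passed in, and the elapsed time is close to one delay rather than three.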

### 6.2.2 Converting blocking code into non-blocking code

An effective strategy for dealing with blocking code is to run it in a separate thread. Threads are implemented at the Operating System (OS) level and allow parallel execution of blocking code. For this purpose, Python provides the Executor interface designed to run tasks in a separate thread and to monitor their progress using futures.

In [20]:
import time  # needed by wait_and_return; without it the task raises NameError
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=3)

def wait_and_return(msg):
    time.sleep(1)
    return msg

executor.submit(wait_and_return, "Hello, executor")

<Future at 0x112200d50 state=running>

In [21]:
fut = loop.run_in_executor(executor, wait_and_return, "Hello, asyncio executor")

In [None]:
loop.run_until_complete(fut)

In [None]:
import requests

async def fetch_urls(urls):
    responses = []
    for url in urls:
        # Each request runs in the thread pool, but awaiting it here means
        # the URLs are still fetched one after another
        response = await loop.run_in_executor(executor, requests.get, url)
        responses.append(response)
    return responses

loop.run_until_complete(fetch_urls(['http://www.google.com',
                                    'http://www.example.com',
                                    'http://www.facebook.com']))

In [None]:
def fetch_urls(urls):
    # gather submits every request up front and waits for all of them together
    return asyncio.gather(*[loop.run_in_executor(executor, requests.get, url)
                            for url in urls])
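The same pattern can be tried without network access. In the sketch below, a hypothetical `blocking_square` function stands in for `requests.get`, and `asyncio.run` with `get_running_loop` (Python 3.7+) replaces the module-level `loop`; these substitutions are assumptions made to keep the example self-contained.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_square(number, delay):
    # Stand-in for a blocking call such as requests.get
    time.sleep(delay)
    return number ** 2

async def main(numbers, delay):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Each blocking call runs in a worker thread and is exposed
        # to the event loop as an awaitable future
        futures = [loop.run_in_executor(executor, blocking_square, n, delay)
                   for n in numbers]
        return await asyncio.gather(*futures)

start = time.perf_counter()
squares = asyncio.run(main([2, 3, 4], 0.2))
elapsed = time.perf_counter() - start
print(squares, "in {:.2f} s".format(elapsed))
```

With three workers and three tasks, the blocking calls overlap, so the total time stays close to a single delay.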

## 6.3 Reactive programming

Reactive manifesto:
1. **Responsive**: The system responds immediately to the user.
2. **Elastic**: The system is capable of handling different levels of load and is able to adapt to accommodate increasing demands.
3. **Resilient**: The system deals with failure gracefully. This is achieved through modularity and by avoiding a single point of failure.
4. **Message driven**: The system should not block and should take advantage of events and messages. A message-driven application helps achieve all the previous requirements.

### 6.3.1 Observables

In [None]:
# The examples in this section use the RxPY 1.x API (from rx import Observable)
!pip3 install "rx<3"

In [None]:
from rx import Observable
obs = Observable.from_iterable(range(4))

In [None]:
obs.subscribe(print)

Observables are ordered collections of items just like lists or, more generally, iterators. The term observable comes from the combination of observer and iterable. An observer is an object that reacts to changes of the variable it observes, while an iterable is an object that is capable of producing and keeping track of an iterator.

In [33]:
collection = [1, 2, 3, 4, 5]
iterator = iter(collection)
print("Next")
print(next(iterator))
print(next(iterator))
print("For loop")
for i in iterator:
    print(i)

Next
1
2
For loop
3
4
5


You can see how, every time we call next or we iterate, the iterator produces a value and advances. In a sense, we are pulling results from the iterator.

Iterators sound a lot like generators; however, they are more general. In Python, generators are returned by functions that use yield expressions. As we saw, generators support next; therefore, they are a special class of iterators.
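The pull model of iterators can be contrasted with a push model, in which the producer calls the consumer whenever an item is available. The following plain-Python sketch illustrates the idea only; `push_items` is a hypothetical helper and not part of any reactive library.

```python
def push_items(iterable, on_next, on_completed):
    """Push every item to the consumer, then signal that no data is left."""
    for item in iterable:
        on_next(item)       # the producer decides when the consumer runs
    on_completed()

received = []
push_items(range(4),
           on_next=received.append,
           on_completed=lambda: received.append("done"))
print(received)
# → [0, 1, 2, 3, 'done']
```

The consumer never calls next; it only registers what should happen when data arrives and when the stream ends.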

In [None]:
obs = Observable.from_iterable(range(4))
obs.subscribe(on_next=lambda x: print("Next item: {}".format(x)),
              on_completed=lambda: print("No more data"))

In [None]:
obs = Observable.from_iterable(range(100000))
obs2 = obs.take(4)
obs2.subscribe(print)

### 6.3.2 Useful operators

In [None]:
(Observable.from_iterable(range(4))
.map(lambda x: x**2)
.subscribe(print))

In [None]:
obs = (Observable.from_iterable(range(4))
.group_by(lambda x: x % 2))

In [None]:
obs.subscribe(print)

In [None]:
obs.subscribe(lambda x: print("group key: ", x.key))

In [None]:
obs.take(1).subscribe(lambda x: x.subscribe(print))

In [None]:
obs.merge_all().subscribe(print)

In [None]:
def make_replay(a):
    result = a.replay(None)
    result.connect()
    return result
obs.map(make_replay).concat_all().subscribe(print)

### 6.3.3 Hot and cold observables

In [None]:
obs = Observable.interval(1000)
obs.take(4).subscribe(print)

In [None]:
import time
start = time.time()
obs = Observable.interval(1000).map(lambda a: (a, time.time() - start))

In [None]:
time.sleep(2)
obs.take(4).subscribe(print)

In [None]:
start = time.time()
obs = Observable.interval(1000).map(lambda a: (a, time.time() - start))

In [None]:
time.sleep(2)
obs.take(4).subscribe(lambda x: print("First subscriber: {}".format(x)))
time.sleep(0.5)
obs.take(4).subscribe(lambda x: print("Second subscriber: {}".format(x)))

In [None]:
start = time.time()
obs = Observable.interval(1000).map(lambda a: (a, time.time() - start)).publish()
obs.take(4).subscribe(lambda x: print("First subscriber: {}".format(x)))
obs.connect() # Data production starts here
time.sleep(2)
obs.take(4).subscribe(lambda x: print("Second subscriber: {}".format(x)))

In [None]:
import time
start = time.time()
obs = Observable.interval(1000).map(lambda a: (a, time.time() - start)).replay(None)
obs.take(4).subscribe(lambda x: print("First subscriber: {}".format(x)))
obs.connect()
time.sleep(2)
obs.take(4).subscribe(lambda x: print("Second subscriber: {}".format(x)))

In [None]:
from rx.subjects import Subject  # RxPY 1.x import path

s = Subject()
s.subscribe(lambda a: print("Subject emitted value: {}".format(a)))
s.on_next(1)
# Subject emitted value: 1
s.on_next(2)
# Subject emitted value: 2
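To get a feel for what a subject does, here is a plain-Python sketch of an object that forwards every pushed value to its current subscribers. `TinySubject` is a hypothetical stand-in that mimics only the `subscribe` and `on_next` calls used above; it is not how the library implements subjects.

```python
class TinySubject:
    """Toy object that is both a target for values and a source for observers."""

    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        self._observers.append(on_next)

    def on_next(self, value):
        # Forward the value to everyone subscribed at this moment
        for observer in list(self._observers):
            observer(value)

seen = []
s = TinySubject()
s.on_next(0)                 # nobody is subscribed yet, so this value is lost
s.subscribe(seen.append)
s.on_next(1)
s.on_next(2)
print(seen)
# → [1, 2]
```

Values pushed before a subscription are never seen by that subscriber, which is the behavior associated with hot sources.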

### 6.3.4 Building a CPU monitor

In [40]:
import psutil
psutil.cpu_percent()

0.0

In [None]:
from rx import Observable
cpu_data = (Observable
.interval(100) # Every 100 milliseconds
.map(lambda x: psutil.cpu_percent())
.publish())
cpu_data.connect() # Start producing data

In [None]:
cpu_data.take(4).subscribe(print)

In [None]:
import numpy as np
from matplotlib import pyplot as plt
def monitor_cpu(npoints):
    lines, = plt.plot([], [])
    plt.xlim(0, npoints)
    plt.ylim(0, 100) # 0 to 100 percent
    cpu_data_window = cpu_data.buffer_with_count(npoints, 1)
    def update_plot(cpu_readings):
        lines.set_xdata(np.arange(npoints))
        lines.set_ydata(np.array(cpu_readings))
        plt.draw()
    # These calls belong inside monitor_cpu, where cpu_data_window is in scope
    cpu_data_window.subscribe(update_plot)
    plt.show()

In [None]:
alertpoints = 4
high_cpu = (cpu_data
.buffer_with_count(alertpoints, 1)
.map(lambda readings: all(r > 20 for r in readings)))
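The alert logic can be tried without a live data source. The sketch below reproduces the idea of overlapping windows in plain Python with a `deque`, applied to made-up readings. `sliding_windows` is a hypothetical helper, and as a simplification it only yields windows that are completely full.

```python
from collections import deque

def sliding_windows(readings, size):
    """Yield overlapping windows of `size` consecutive readings."""
    window = deque(maxlen=size)   # the oldest reading drops out automatically
    for reading in readings:
        window.append(reading)
        if len(window) == size:
            yield list(window)

alertpoints = 4
readings = [10, 25, 30, 40, 50, 15, 60]   # made-up CPU percentages

# A window is flagged only if every reading in it exceeds 20 percent
flags = [all(r > 20 for r in window)
         for window in sliding_windows(readings, alertpoints)]
print(flags)
# → [False, True, False, False]
```

Requiring every reading in the window to exceed the threshold means that a single spike does not raise the alert, while a single dip clears it.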

In [None]:
label = plt.text(1, 1, "normal")
def update_warning(is_high):
    if is_high:
        label.set_text("high")
    else:
        label.set_text("normal")
high_cpu.subscribe(update_warning)

## 6.4 Summary