# 15) Data in Time - Processes and Concurrency <a class="tocSkip">

### Programs and processes

When you run an individual program, your operating system creates a single process. It uses system resources and data structures in the operating system's kernel. A process is isolated from other processes: it cannot see what other processes are doing or interfere with them.

You can access process data from your own programs. The standard library's os module provides a common way of accessing some system information. For instance, we can obtain the process ID with os.getpid(), the current working directory with os.getcwd(), and, on Unix-like systems, the user ID and group ID with os.getuid() and os.getgid().
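As a minimal sketch of the calls named above, the snippet below collects the values into a dictionary. The `info` dictionary is just an illustrative name, and the `hasattr` check is an assumption added here so that the snippet also runs on platforms without os.getuid() and os.getgid().

```python
import os

# Collect a few facts about the current process using only the os module.
info = {
    "pid": os.getpid(),   # process ID assigned by the operating system
    "cwd": os.getcwd(),   # current working directory
}

# os.getuid() and os.getgid() exist only on Unix-like systems,
# so check for them before calling.
if hasattr(os, "getuid"):
    info["uid"] = os.getuid()
    info["gid"] = os.getgid()

for key, value in info.items():
    print(f"{key}: {value}")
```

The printed values differ from machine to machine and from run to run, since the operating system assigns a new process ID each time.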

We can run a Python function as a separate process, or even create multiple independent processes with the multiprocessing module:

In [15]:
import multiprocessing
import os
import psutil
import asyncio

In [9]:
def function(name):
    print('Process number {}: {}'.format(os.getpid(), name))

In [10]:
for n in range(5):
    # args must be a tuple, hence the trailing comma
    p = multiprocessing.Process(target = function, args = ("I'm function number {}".format(n),))
    p.start()

If you created one or more processes and want to terminate one for some reason, use its terminate() method. The standard os package provides a lot of details on your system, and lets you control some of it if you run your Python script as a privileged user. Besides file and directory functions, it has informational functions such as os.uname() (Unix-like systems only) and os.cpu_count(). The third-party package psutil also provides system and process information:

In [11]:
# Cumulative CPU time per mode for each CPU (of which we have 16)

psutil.cpu_times(percpu=True)

[scputimes(user=1507.265625, system=3347.546875, idle=263258.984375, interrupt=1052.578125, dpc=738.625),
 scputimes(user=1933.125, system=973.015625, idle=265207.4375, interrupt=49.84375, dpc=16.421875),
 scputimes(user=8369.078125, system=6302.890625, idle=253441.609375, interrupt=82.046875, dpc=6.140625),
 scputimes(user=1436.1875, system=1399.90625, idle=265277.484375, interrupt=27.0, dpc=2.96875),
 scputimes(user=1772.5, system=982.765625, idle=265358.3125, interrupt=36.8125, dpc=3.890625),
 scputimes(user=1785.375, system=959.15625, idle=265369.046875, interrupt=40.921875, dpc=3.34375),
 scputimes(user=1553.3125, system=2623.21875, idle=263937.046875, interrupt=30.84375, dpc=4.78125),
 scputimes(user=1551.625, system=875.5625, idle=265686.390625, interrupt=32.984375, dpc=3.265625),
 scputimes(user=1568.609375, system=802.625, idle=265742.34375, interrupt=30.375, dpc=3.28125),
 scputimes(user=1563.3125, system=902.765625, idle=265647.5, interrupt=30.1875, dpc=3.453125),
 scputimes

In [12]:
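# How busy the CPUs are overall, measured over a 1-second interval
# (the first positional argument of cpu_percent is interval, not percpu)

psutil.cpu_percent(interval=1)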

2.0

In [13]:
# How busy the CPUs are per CPU

psutil.cpu_percent(percpu = True)

[2.5,
 0.5,
 4.6,
 0.5,
 0.5,
 0.6,
 1.2,
 0.5,
 0.4,
 0.6,
 0.7,
 4.8,
 0.6,
 0.6,
 4.3,
 1.4]

We can also run commands from the shell using the third-party invoke package. One use of invoke is to make functions available as command-line arguments. We simply decorate a function with @task, imported from invoke. The function requires a first argument that is used internally by invoke, but that argument can be given any name. These tasks can take arguments, and you can invoke multiple tasks at one time from the command line (similar to using && in shell scripts).

### Concurrency

In computers, if you are waiting for something, it is usually for one of two reasons:

    I/O bound
    
    This is by far the most common. Computer CPUs are incredibly fast - hundreds of times faster than computer memory and many thousands of times faster than disks or networks.
    
    CPU bound
    
    The CPU keeps busy. This happens with number crunching tasks such as scientific or graphic calculations.

Two more terms are related to concurrency:

    Synchronous - One thing follows the other.
    Asynchronous - Tasks are independent.

On a single machine, if you want to perform multiple tasks as fast as possible, you want to make them independent. The trick is getting them all to work with one another. Any shared control or state means that there will be bottlenecks. An even bigger trick is dealing with failures, because concurrent computing is harder than regular computing. The first way to manage tasks is queues:

#### Queues

A queue is like a list: things are added at one end and taken away from the other. Queues transport messages, which can be any kind of information. In this case, we are interested in queues for distributed task management, also known as work queues, job queues or task queues. Consider washing the dishes: washing, drying and putting away dishes may take a long time for an individual person if you individually wash, dry and store each dish in turn. This can be sped up by batch washing, batch drying and batch storing all the dishes together. Even quicker is to have multiple people: give each dish in the sink to an available washer, who washes it and hands it off to the first available dryer, who dries and hands it to a storer. This can be synchronous (workers wait for a dish to handle) or asynchronous (dishes are stacked between workers with different paces).

For a single machine, the standard library's multiprocessing module contains a Queue class (and a JoinableQueue variant). Let us simulate a washer and a dryer connected by an intermediate dish_queue. To keep things simple, both steps run one after the other in a single process:

In [14]:
# Simulating a washer and dryer chain process 

dishes = ['1', '2', '3', '4', '5']
dish_queue = multiprocessing.JoinableQueue()

for dish in dishes:
    print('Washing', dish, 'dish')
    dish_queue.put(dish)
    
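# Dry exactly as many dishes as were washed; get() blocks until an item arrives,
# which is more reliable than polling empty() on a multiprocessing queue
for _ in dishes:
    dish = dish_queue.get()
    print('Drying', dish, 'dish')
    dish_queue.task_done()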
    

Washing 1 dish
Washing 2 dish
Washing 3 dish
Washing 4 dish
Washing 5 dish
Drying 1 dish
Drying 2 dish
Drying 3 dish
Drying 4 dish
Drying 5 dish


#### Threads

A thread runs within a process and has access to everything in that process. The standard library's threading module has an API similar to multiprocessing but uses threads instead of processes. One difference between multiprocessing and threading is that threading does not have a terminate() function: there is no easy way to terminate a running thread. Like manual memory management in languages such as C and C++, threading can cause bugs that are hard to find, let alone fix. To use threads, all code in the program must be thread-safe. This includes not letting threads share any global variables, so that they can run independently without breaking anything.

Threads can be useful and safe when global data is not involved. In particular, threads are useful for saving time while waiting for some I/O operation to complete. In these cases, they do not have to fight over data, because each has completely separate variables. In the standard CPython implementation, threads do not speed up CPU-bound tasks, because the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time. For Python, the recommendations are as follows:

    - Use threads for I/O bound problems.
    - Use processes, networking or events for CPU bound problems.
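The first recommendation can be illustrated with a small sketch. It assumes a hypothetical `dry()` function in which `time.sleep()` stands in for an I/O wait, and it uses the standard library's `concurrent.futures.ThreadPoolExecutor` (a thread pool built on threading) rather than raw Thread objects. Each call works only with its own local variables, so no global data is shared between threads.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def dry(dish):
    """Hypothetical I/O-bound step: the sleep stands in for waiting on I/O."""
    time.sleep(0.2)
    return f"dried dish {dish}"

dishes = ["1", "2", "3", "4"]

start = time.perf_counter()
# Four worker threads wait at the same time instead of one after another.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(dry, dishes))  # map() keeps the input order
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f} s")
```

Run sequentially, the four waits would add up to roughly 0.8 seconds. With four threads they overlap, so the total should be close to the duration of a single wait. The same approach would not help if `dry()` spent its time computing instead of waiting.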

#### Green threads and gevent

Developers traditionally avoid slow spots in programs by running them in separate threads or processes. One alternative is event-based programming. An event-based program runs a central event loop, doles out any tasks and repeats the loop. The gevent library is event-based and lets you write normal imperative code and converts pieces to coroutines. These are like generators that communicate with one another and keep track of where they are.

This module makes use of greenlets (also known sometimes as a green thread or a microthread). The difference from a normal thread is that it does not block. If something occurred that would have blocked a normal thread, gevent switches control to one of the other greenlets. There are potential dangers when using gevent. As with any event-based system, each chunk of code that you execute should be relatively quick. Although it is non-blocking, code that does a lot of work is still slow.

#### Asyncio

In version 3.4, Python added a standard asynchronous module called asyncio. Python 3.5 then added the keywords async and await. These implement some new concepts:

    - Coroutines are functions that pause at various points.
    - An event loop that schedules and runs coroutines.
    
These let us write asynchronous code that looks something like the normal synchronous code that we are used to. The event loop provides cooperative multitasking, in which coroutines indicate when they are able to start and stop. They run in a single thread.

You define a coroutine by putting async before its initial def. You call a coroutine by:

    - Putting await before it, which adds the coroutine to an existing event loop. You can do this only within another coroutine.
    - Or by using asyncio.run(), which explicitly starts an event loop.
    - Or by using asyncio.create_task() or asyncio.ensure_future().
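The three ways of calling a coroutine can be sketched together in one short script. The coroutine names `greet()` and `main()` are hypothetical and chosen only for illustration. The script is meant to be run as a plain Python program; `asyncio.run()` raises an error when an event loop is already running, as it is inside Jupyter.

```python
import asyncio

async def greet(name, delay):
    """Hypothetical coroutine: pause for a moment, then return a greeting."""
    await asyncio.sleep(delay)
    return f"hello, {name}"

async def main():
    # 1) await: run a coroutine to completion inside the current event loop.
    first = await greet("washer", 0.1)
    # 2) create_task: schedule a coroutine on the loop, then await its result.
    task = asyncio.create_task(greet("dryer", 0.1))
    second = await task
    return [first, second]

# 3) asyncio.run: start an event loop and run the top-level coroutine.
results = asyncio.run(main())
print(results)
```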

Below is an example where we create two tasks and await them. We use the asyncio.sleep() function to simulate a process taking time:

In [21]:
# Creating two asynchronous tasks (one that takes 5 seconds, one that takes 2 seconds)

async def task(start, seconds, end):
    print(start)
    await asyncio.sleep(seconds)
    print(end)
    
async def workers():
    task_1 = asyncio.create_task(task('Task 1 - Beginning', 5, 'Task 1 - Done'))
    task_2 = asyncio.create_task(task('Task 2 - Beginning', 2, 'Task 2 - Done'))
    await task_1
    await task_2

In [23]:
# Run the two tasks (Outside Jupyter we would use asyncio.run(workers()))

await workers()

Task 1 - Beginning
Task 2 - Beginning
Task 2 - Done
Task 1 - Done


We see that both tasks start without delay because they were scheduled as separate tasks. Task 2 finishes after 2 seconds and task 1 finishes 3 seconds after that, so the whole run takes about 5 seconds instead of the 7 seconds a synchronous approach would need. We have essentially saved 2 seconds.

With more moving parts, there are more possibilities for our assembly lines to be disrupted. Common techniques for potential problems include:

    Fire and forget
    Just pass things on and do not worry about the consequences, even if no one is there.
    
    Request-reply
    The washer receives an acknowledgment from the dryer, and the dryer from the storer, for each dish in the pipeline.
    
    Back pressure
    This technique directs a fast worker to take it easy if someone downstream cannot keep up.
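Back pressure can be sketched with a bounded queue. The example below is one possible illustration, not the only way to do it: it assumes hypothetical `washer()` and `dryer()` coroutines and an `asyncio.Queue` with `maxsize=2`, so that the fast washer is suspended whenever two dishes are already waiting for the slower dryer. A `None` value serves as an end-of-work marker.

```python
import asyncio

async def washer(dish_queue, dishes, sizes):
    for dish in dishes:
        # put() suspends the washer while the queue is full: back pressure.
        await dish_queue.put(dish)
        sizes.append(dish_queue.qsize())
    await dish_queue.put(None)  # sentinel: no more dishes

async def dryer(dish_queue, dried):
    while True:
        dish = await dish_queue.get()
        if dish is None:
            break
        await asyncio.sleep(0.01)  # the dryer is slower than the washer
        dried.append(dish)

async def main():
    dish_queue = asyncio.Queue(maxsize=2)  # at most two dishes may pile up
    sizes, dried = [], []
    await asyncio.gather(
        washer(dish_queue, ["1", "2", "3", "4", "5"], sizes),
        dryer(dish_queue, dried),
    )
    return sizes, dried

sizes, dried = asyncio.run(main())
print("largest backlog:", max(sizes))
print("dried:", dried)
```

The recorded queue sizes never exceed the chosen limit, which is the point of the technique: the pile between workers stays bounded no matter how fast the washer is.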

In real systems, you need to be careful that workers are keeping up with demand. You might add new tasks to a pending list, while some worker process removes the latest message and adds it to a working list. When the message is done, it is removed from the working list and added to a completed list. This lets you know which tasks have failed or are taking too long. Some Python-based queue packages that add this extra level of management include:

    celery - can execute distributed tasks synchronously or asynchronously, using the methods above: multiprocessing, gevent, and others.
    
    rq - a Python library for job queues.