# 05 - Thread pools, Map/Reduce, Data races
## February 18th

We have seen the use of Fork/Join pools and map/reduce algorithms in the lecture. The tutorial gave a little additional depth on race conditions. In today's practical, we will revisit these concepts and look at how they are implemented in Python.

We will again be using Python's `threading` module. It provides all the tools that are needed to implement thread pools and Map/Reduce-style concurrent computation. Moreover, we will analyse how we can use the process synchronisation tools that we have learned about in the last two sessions to avoid race conditions when processing data in parallel. 

## Learning outcomes
1. Using `ThreadPool` in Python
2. Implementing and running a simple Map/Reduce algorithm
3. Understanding and resolving race conditions in Python

## 3.1 Thread pools 
We'll start off by using thread pools in Python. The relevant classes are contained in `multiprocessing.pool`, a sub-module of the `multiprocessing` module.

In [70]:
import multiprocessing.pool as mpp

Let's instantiate a `ThreadPool` with 4 threads:

In [71]:
tp_4 = mpp.ThreadPool(4)

Next we distribute a couple of tasks across the `ThreadPool` and have them executed concurrently. We start off simple, so let's implement a friendly function `say_hi` that is going to be executed in the threads. It simply prints a greeting together with the name of the thread that is executing it. As seen previously, we include a tiny random wait time before the print statement to simulate what might happen on a busy system, i.e. when threads get interleaved.

Please put your name in the `args` parameter to be greeted properly.

In [72]:
import random
import time
import threading

def say_hi(name):
    time.sleep(random.random() * 0.3) # wait for a little while to force interleaving
    print("Hi {} from thread {}".format(name, threading.current_thread().name))

Next we'll submit the function to the `ThreadPool` to be executed 10 times. We use the `apply_async` method for this purpose, calling it 10 times in a loop.

In [73]:
num_statements = 10

for i in range(num_statements):
    tp_4.apply_async(func=say_hi, args=('PM',))

Hi PM from thread Thread-14
Hi PM from thread Thread-11
Hi PM from thread Thread-14Hi PM from thread Thread-13

Hi PM from thread Thread-14
Hi PM from thread Thread-12
Hi PM from thread Thread-13
Hi PM from thread Thread-11
Hi PM from thread Thread-12Hi PM from thread Thread-14



The result is that all four threads say "Hi" a couple of times. Note that the thread that prints each message is always one of the four threads in the `ThreadPool`. Because the threads print concurrently, two messages occasionally end up on the same line.
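`apply_async` does not hand back the return value of the submitted function directly; it returns a result handle. The following is a minimal sketch of how return values could be collected from such handles. The worker function `square` and the pool `demo_pool` are hypothetical names introduced for illustration only; they are not part of the practical.

```python
import multiprocessing.pool as mpp

def square(x):
    # hypothetical worker function, used for illustration only
    return x * x

# a separate pool, so that this sketch does not interfere with tp_4
with mpp.ThreadPool(4) as demo_pool:
    # apply_async returns an AsyncResult handle immediately
    handles = [demo_pool.apply_async(square, args=(i,)) for i in range(5)]
    # get() blocks until the corresponding task has finished
    squares = [h.get() for h in handles]

print(squares)
```

Collecting the handles first and calling `get()` afterwards keeps the submissions non-blocking; only the collection step waits.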

<hr/>

## Exercise 1
The `apply_async` method works *asynchronously*, which means that it doesn't wait for the submitted function to return. The `ThreadPool` also has an `apply` method that blocks until the submitted function returns. What effect on concurrent execution do you expect?

Change the `apply_async` to `apply` and run the example again. Compare the time the cell takes to complete between the two versions. What do you observe?
<br/><hr/>

## 3.2 Map/Reduce
Many interesting problems in computing involve splitting work across many workers and combining their individual results afterwards. This is often called _Divide and Conquer_, or, more recently, _Map/Reduce_. The Map/Reduce pattern essentially involves two steps:

1. __Map:__ Assign functions and chunks of data to individual worker threads.
2. __Reduce:__ Combine the results from the worker threads. 

### 3.2.1 Map
Let's have a look at the `map` method of the `ThreadPool` class first. We'll use the `say_hi` function from above, but this time we'll use it to greet many different people instead of just one person.

In [11]:
names = ['Bradley', 'Sachin', 'Prijanka', 'Nicholas', 'Erica', 'Lyubomir', 'Christopher']

tp_4.map(say_hi, names)

Hi Nicholas from thread Thread-5
Hi Erica from thread Thread-5
Hi Bradley from thread Thread-6
Hi Sachin from thread Thread-4
Hi Christopher from thread Thread-6
Hi Prijanka from thread Thread-7
Hi Lyubomir from thread Thread-5


[None, None, None, None, None, None, None]

The `map` method takes a function and a list of arguments, and executes the function once for each argument. Since we use a `ThreadPool`, the function/argument pairs are distributed across the threads in the pool. As the output above shows, the distribution is not necessarily even: a thread that finishes early simply picks up the next piece of work.

Unlike with `apply_async`, we don't need a loop to distribute the function; the `map` method takes care of this for us. It also blocks until all results are available.

You may have noticed the line that reads `[None, None, None, None, None, None, None]`. This signifies that the `map` method returned a list of values. Each of the values is the return value of one of the calls to our `say_hi` function. Since this function doesn't return anything, each return value is `None`.
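To see `map` return something more useful than `None`, here is a small sketch with a worker that does return a value. The names `name_length`, `demo_names` and `demo_pool` are made up for this illustration. The results come back in the order of the input list, even though the workers finish at different times.

```python
import multiprocessing.pool as mpp
import random
import time

def name_length(name):
    # hypothetical worker: waits for a random time, then returns a value
    time.sleep(random.random() * 0.05)
    return len(name)

demo_names = ['Bradley', 'Sachin', 'Prijanka']

# a separate pool, so that this sketch does not interfere with tp_4
with mpp.ThreadPool(4) as demo_pool:
    lengths = demo_pool.map(name_length, demo_names)

# the i-th result belongs to the i-th name
print(lengths)
```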

And this leads us directly to... 

### 3.2.2 Reduce

The _reduce_ step is all about combining the results from the _mapped_ functions. Since our `say_hi` function was more of a toy that didn't compute or return anything, we're going to need another function that does a bit of actual work. 

Say we want to sum up all numbers in a large list. Using the map/reduce paradigm, we can simply split the list across multiple workers (Map) and sum up the partial results afterwards (Reduce).

First, let's create a long list of numbers and split it into chunks of equal length. If the list length is not a multiple of the chunk length, the last chunk is shorter.

In [75]:
num_items = 1005
long_list = list(range(1, num_items)) # list with the numbers from 1 to num_items - 1

split_length = 100 # the number of elements per chunk; the last chunk may be shorter

split_list = []
for i in range(0, len(long_list), split_length):
    split_list.append(long_list[i:i + split_length])


print("The split list has {} smaller lists, they have element counts {}.".format(len(split_list), [len(li) for li in split_list]))

The split list has 11 smaller lists, they have element counts [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 4].


Next, we write a function that sums all elements in a list.

In [76]:
# note: this definition shadows Python's built-in sum() for the rest of the notebook
def sum(the_list):
    the_sum = 0
    for item in the_list:
        the_sum += item
    return the_sum

Let's __map__ this function with the split list on the `ThreadPool`. This time we store the individual results in the `result` variable. 

In [78]:
result = tp_4.map(sum, split_list)

Let's look at the result:

In [79]:
print(result)

[5050, 15050, 25050, 35050, 45050, 55050, 65050, 75050, 85050, 95050, 4010]


Finally, we __reduce__ the list of individual results to produce the final result. To this end we use the `reduce` function from Python's `functools` module. It takes two parameters: a function that specifies how to combine __two__ results, and the list that is to be reduced.

In [80]:
from functools import reduce

def sum_two(x, y):
    return x + y

final_sum = reduce(sum_two, result)

print("The final sum from the multithreaded version is {}.".format(final_sum))


The final sum from the multithreaded version is 504510.


As a sanity check, let's see if we get the same result when we use our `sum` function to sum the whole list, without splitting it up:

In [81]:
final_sum_singlethread = sum(long_list)
print("The final sum from the singlethreaded version is {}.".format(final_sum_singlethread))

if (final_sum == final_sum_singlethread):
    print("Both results are the same.")
else:
    print("Oh no! The results differ.")

The final sum from the singlethreaded version is 504510.
Both results are the same.
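As an additional, independent check, the sum of the numbers from 1 to n has the well-known closed form n(n+1)/2. A short sketch (the variable names `n` and `expected` are introduced here for illustration):

```python
num_items = 1005
n = num_items - 1              # long_list holds the numbers 1..n
expected = n * (n + 1) // 2    # closed form for 1 + 2 + ... + n

print(expected)
```

For `num_items = 1005` this gives 504510, which matches the results above.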


As a side note, `reduce` is often used with a `lambda` expression. Python's `lambda` expressions allow you to define 'anonymous' functions in-place, that is, functions without a name, defined at the location in the code where they are actually used. Using `lambda`, our example looks like this:

In [22]:
from functools import reduce

#def sum_two(x, y):
#    return x + y

# note how the lambda definition replaces the previous sum_two function:
final_sum = reduce(lambda x,y: x+y, result)

print("The final sum from the multithreaded version is {}.".format(final_sum))


The final sum from the multithreaded version is 504510.
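To make the mechanics of `reduce` a little more concrete, the sketch below spells out the individual combination steps for the first three partial sums from above. It also shows the optional third argument of `reduce`, an initial value, which makes reducing an empty list well defined. The names `partial_sums`, `step_1`, `step_2` and `empty_total` are introduced for illustration only.

```python
from functools import reduce

partial_sums = [5050, 15050, 25050]  # the first three partial sums from above

# reduce(f, [a, b, c]) evaluates f(f(a, b), c)
step_1 = partial_sums[0] + partial_sums[1]
step_2 = step_1 + partial_sums[2]
assert reduce(lambda x, y: x + y, partial_sums) == step_2

# with an initial value as third argument, an empty list can be reduced too
empty_total = reduce(lambda x, y: x + y, [], 0)

print(step_2, empty_total)
```

Without the initial value, calling `reduce` on an empty list raises a `TypeError`.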


<hr/>

## Exercise 2 (a tricky one if you like a challenge)

Which three single characters need to be changed in the code above in order to compute the *product* of all numbers in the list?
<br/>
<hr/>

## 3.3 Race conditions

Race conditions are always a potential hazard when working with concurrent access to data. In the above Map/Reduce example, we avoided race conditions by splitting the data into distinct chunks that are then processed separately by concurrent workers. Only in the final reduce step is the data combined, and this happens in a non-concurrent fashion.

But sometimes we need to access data concurrently in order to achieve the desired result. 

Consider the following function. It takes a `global` variable that is shared across threads and counts it down.

In [60]:
def count_em_down(count):
    global number
    for i in range(count):
        number -= 1

Each thread shall count down a fraction of the `global` number, so that in the end the `global` number will be at zero. 

We'll use the `apply_async` method to invoke this function concurrently on a number of threads. Since `apply_async` doesn't block, we need to make sure that we wait until all computation is finished. This is achieved by the last `for`-loop, where we call `wait()` on the individual results from the threads.

(If you want to know how exactly this works, please look up the documentation of [Process Pools in the Python docs](https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing#module-multiprocessing.pool).)


In [61]:
num_threads = 10
count = 100000
number = num_threads * count
results = []
for i in range(num_threads):
    result = tp_4.apply_async(count_em_down, args=(count,))
    results.append(result)

for r in results:
    r.wait()

In [62]:
print("Number should be 0. Actually, it is {}.".format(number))

Number should be 0. Actually, it is 262352.


<hr/>

## Exercise 3

The number should be 0, but it is considerably higher than that. How can this happen? Think about a timing diagram that could explain this.
<br/>
<hr/>

## Exercise 4 (a coding exercise, finally!!!)
Fix the race condition by using `threading.Lock`. Hint: You should define the `Lock` outside the `count_em_down` function, but you should `acquire` and `release` the lock in the function.
<br/>
<hr/>


In [66]:
import threading as th 
count_sema = th.Lock()

In [67]:
def count_em_down1(count, sema):
    global number
    
    for i in range(count):
        sema.acquire()
        number -= 1
        sema.release()

In [68]:
num_threads = 10
count = 100000
number = num_threads * count
print("Number initially is {}.".format(number))
results = []
for i in range(num_threads):
    result = tp_4.apply_async(count_em_down1, args=(count,count_sema))
    results.append(result)

for r in results:
    r.wait()

Number initially is 1000000.


In [69]:
print("Number should be 0. Actually, it is {}.".format(number))

Number should be 0. Actually, it is 0.
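A `Lock` can also be used as a context manager. The `with` statement acquires the lock on entry and releases it on exit, even if the protected code raises an exception. The following is a sketch of the same idea under that approach; the names `demo_lock`, `demo_number`, `count_down_with_lock` and `demo_pool`, as well as the smaller task sizes, are assumptions chosen for this illustration.

```python
import threading
import multiprocessing.pool as mpp

demo_lock = threading.Lock()
demo_number = 0

def count_down_with_lock(count):
    # hypothetical variant of count_em_down1 that uses `with` instead of acquire/release
    global demo_number
    for _ in range(count):
        with demo_lock:  # acquired here, released when the block is left
            demo_number -= 1

num_tasks = 4
per_task = 10000
demo_number = num_tasks * per_task

# a separate pool, so that this sketch does not interfere with tp_4
with mpp.ThreadPool(4) as demo_pool:
    handles = [demo_pool.apply_async(count_down_with_lock, args=(per_task,))
               for _ in range(num_tasks)]
    for h in handles:
        h.wait()  # block until each task has finished

print(demo_number)
```

Since every decrement happens while the lock is held, the counter ends at 0.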
