# Parallel Computation

## Parallel computers
- Multiprocessor/multicore: several processors work on data stored in shared memory
- Cluster: several processor/memory units work together by exchanging data over a network
- Co-processor: a general-purpose processor delegates specific tasks to a special-purpose processor (GPU)


## Parallel Programming
- Decomposition of the complete task into independent subtasks and the data flow between them.
- Distribution of the subtasks over the processors minimizing the total execution time.
- For clusters: distribution of the data over the nodes minimizing the communication time.
- For multiprocessors: optimization of the memory access patterns minimizing waiting times.
- Synchronization of the individual processes.
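
The steps listed above can be sketched on a toy problem. This is only an illustration, assuming a sum of squares as the task; the helper names `split` and `partial_sum` are hypothetical, and threads stand in for whatever processors are available.

```python
from concurrent.futures import ThreadPoolExecutor

def split(data, n_chunks):
    """Cut data into n_chunks contiguous slices of nearly equal size."""
    size, extra = divmod(len(data), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        stop = start + size + (1 if i < extra else 0)
        chunks.append(data[start:stop])
        start = stop
    return chunks

def partial_sum(chunk):
    """Independent subtask: it only needs its own chunk of data."""
    return sum(x * x for x in chunk)

data = list(range(10))
chunks = split(data, 4)                       # decomposition into subtasks
with ThreadPoolExecutor(max_workers=4) as pool:
    partials = list(pool.map(partial_sum, chunks))  # distribution over workers
total = sum(partials)                         # combine once all subtasks are done
print(chunks)
print(partials, total)
```

Leaving the `with` block is the synchronization point: the partial results are only combined once every subtask has finished.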

## MapReduce

In [69]:
from time import sleep
def f(x):
    sleep(1)
    return x*x
L = list(range(8))
L

[0, 1, 2, 3, 4, 5, 6, 7]

In [70]:
%time sum([f(x) for x in L])

Wall time: 8.02 s


140

In [71]:
%time sum(map(f,L))

Wall time: 8.04 s


140
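
The two cells above already contain a map step (`f` applied to every element) and a reduce step (`sum`). A minimal sketch that makes both steps explicit, assuming a `square` function without the `sleep` call so that it runs instantly:

```python
from functools import reduce
from operator import add

def square(x):
    """Same computation as f, without the one-second pause."""
    return x * x

L = list(range(8))
mapped = map(square, L)         # map step: a lazy iterator over the squares
total = reduce(add, mapped, 0)  # reduce step: fold the squares into one value
print(total)
```

Only the map step is independent per element, which is why it is the natural candidate for parallel execution in the sections that follow.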

## Multiprocessing 

`multiprocessing` is a package that supports spawning processes.

We can use it to display the number of CPUs (logical cores) of your computer, which bounds how many processes can actually run at the same time.

In [72]:
from multiprocessing import cpu_count

cpu_count()

4
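
The number reported above counts the CPUs of the machine, not necessarily those the current process is allowed to use. A small sketch of the difference, assuming the standard `os` module; `os.sched_getaffinity` only exists on some Unix platforms, hence the `hasattr` check.

```python
import os

logical_cpus = os.cpu_count()  # CPUs in the machine, or None if unknown

if hasattr(os, "sched_getaffinity"):
    # CPUs this process may actually run on (available on Linux, for example)
    usable_cpus = len(os.sched_getaffinity(0))
else:
    usable_cpus = logical_cpus or 1

print(logical_cpus, usable_cpus)
```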

## Futures

The `concurrent.futures` module provides a high-level interface for asynchronously executing callables.

The asynchronous execution can be performed with:
- **threads**, using `ThreadPoolExecutor`,
- separate **processes**, using `ProcessPoolExecutor`.

Both implement the same interface, which is defined by the abstract `Executor` class.
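
Besides `map`, the `Executor` interface offers `submit`, which returns a `Future` object for each call. The sketch below assumes a hypothetical `slow_square` function whose pauses are chosen so that later inputs finish first; it contrasts `as_completed` (completion order) with `map` (input order).

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

def slow_square(x):
    """Hypothetical task: larger inputs sleep less and tend to finish first."""
    sleep(0.01 * (5 - x))
    return x * x

with ThreadPoolExecutor(max_workers=5) as pool:
    # submit returns immediately with a Future for each call
    futures = {pool.submit(slow_square, x): x for x in range(5)}
    # as_completed yields the futures as they finish, in no fixed order
    completed = [(futures[fut], fut.result()) for fut in as_completed(futures)]
    # map yields the results in the order of the inputs
    in_order = list(pool.map(slow_square, range(5)))

print(completed)
print(in_order)
```

The same code should work with a process pool, since both executors share the interface.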

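On Windows, the `ProcessPoolExecutor` from `concurrent.futures` cannot run functions defined interactively in a notebook: the worker processes are spawned as fresh interpreters and have to import the function from a module. Windows users can install
[loky](https://github.com/tomMoral/loky), which provides a `ProcessPoolExecutor` that also works with interactively defined functions.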

In [73]:
%%time
#from concurrent.futures import ProcessPoolExecutor # for Unix
from loky import ProcessPoolExecutor  # for Windows users

with ProcessPoolExecutor() as epool: # Create several Python processes (cpu_count)

    results = sum(epool.map(f, L))
    
print(results)

140
Wall time: 2.27 s


- `ProcessPoolExecutor` starts worker processes, by default up to one per CPU (logical core) available on the computer.
- `epool.map` divides the input list into chunks (one item per chunk by default, see the `chunksize` argument) and puts the tasks (function + chunk) on a queue.
- Each worker process takes a task (function + a chunk of data), runs `map(function, chunk)`, and sends back the list of results.
- `epool.map` on the main process returns an iterator that yields the results in the order of the inputs, waiting for each task when necessary.
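
The mechanism described above can be imitated in a few lines. This is a simplified sketch and not the implementation used by `concurrent.futures` or loky: threads stand in for the worker processes, and the names `chunked`, `worker` and `pool_map` are hypothetical. It does not handle exceptions raised by the function.

```python
import queue
import threading

def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]

def worker(tasks, results):
    """Take tasks from the queue until the None sentinel is received."""
    while True:
        task = tasks.get()
        if task is None:
            break
        index, func, chunk = task
        results.put((index, list(map(func, chunk))))

def pool_map(func, items, n_workers=2, chunksize=3):
    tasks, results = queue.Queue(), queue.Queue()
    chunks = chunked(items, chunksize)
    for index, chunk in enumerate(chunks):
        tasks.put((index, func, chunk))   # task = function + chunk
    for _ in range(n_workers):
        tasks.put(None)                   # one stop signal per worker
    threads = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()                          # wait until all tasks are handled
    ordered = sorted(results.get() for _ in chunks)  # restore the input order
    return [value for _, values in ordered for value in values]

print(pool_map(lambda x: x * x, list(range(8))))
```

Larger chunks mean fewer tasks on the queue, which matters with real processes because every task has to be serialized.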

In [74]:
%%time
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as epool:

    results = sum(epool.map(f, L))
    
print(results)

140
Wall time: 1.01 s


## Thread and Process: Differences

- A **process** is an instance of a running program.
- A **process** may contain one or more **threads**, but a **thread** cannot contain a **process**.
- A **process** has a self-contained execution environment, with its own memory space.
- An application running on your computer may be a set of cooperating **processes**.
- **Processes** do not share memory; communication between **processes** requires data serialization.

- A **thread** exists within a **process**; every **process** has at least one **thread**.
- Multiple **threads** in a **process** share resources, which makes communication between **threads** efficient.
- **Threads** can run concurrently on a multi-core system, with each core executing a separate **thread**.
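
A short sketch of the memory difference, under the assumption that `pickle` is a fair stand-in for the serialization that happens between processes: a thread modifies the very same list as the main thread, whereas data sent to another process arrives as an independent copy.

```python
import pickle
import threading

shared = []

def append_from_thread(value):
    """Runs in another thread but sees the same list object."""
    shared.append(value)

t = threading.Thread(target=append_from_thread, args=(42,))
t.start()
t.join()
print(shared)  # the change made by the thread is visible here

# What another process would receive: a serialized copy of the data
received = pickle.loads(pickle.dumps(shared))
received.append(99)  # modifying the copy leaves the original untouched
print(shared, received)
```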




## The Global Interpreter Lock (GIL)

- The CPython interpreter is not thread safe.
- A few critical internal data structures may only be accessed by one thread at a time. Access to them is protected by the GIL.
- Removing the GIL has long proved difficult, mainly because the C API for extension modules has to be maintained. Since Python 3.13, an optional free-threaded build without the GIL is available as an experimental feature (PEP 703).
- Multiprocessing avoids the GIL by having separate processes which each have an independent copy of the interpreter data structures.
- The price to pay: serialization of tasks, arguments, and results.
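
The effect of the GIL can be observed with a CPU-bound function. The sketch below assumes a hypothetical pure-Python loop `count_down`; on a standard CPython build the threaded version is usually no faster than the sequential one, whereas the `sleep`-based examples above do benefit from threads because sleeping releases the GIL. Timings depend on the machine, so none are quoted here.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter

def count_down(n):
    """CPU-bound work in pure Python: holds the GIL while it runs."""
    while n > 0:
        n -= 1
    return n

N, TASKS = 200_000, 4

start = perf_counter()
sequential = [count_down(N) for _ in range(TASKS)]
t_seq = perf_counter() - start

start = perf_counter()
with ThreadPoolExecutor(max_workers=TASKS) as pool:
    threaded = list(pool.map(count_down, [N] * TASKS))
t_thr = perf_counter() - start

print(f"sequential: {t_seq:.3f} s, threads: {t_thr:.3f} s")
```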

## Weighted mean and Variance

### Exercise 6.1

Use `ThreadPoolExecutor` to parallelize the functions written in notebook 05.

# Wordcount


## Parallel map


- Let's improve the `mapper` function by printing the name of the current process inside the function.

*Example*

In [75]:
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor
from loky import ProcessPoolExecutor 
def process_name(n):
    " prints out the current process name "
    print(mp.current_process().name)

with ProcessPoolExecutor(max_workers=4) as e:
    _ = e.map(process_name, range(mp.cpu_count()))
process_name(4)

MainProcess
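
Only `MainProcess` shows up above, since text printed inside the worker processes does not reach the notebook. One possible workaround is to return the name instead of printing it. The sketch below uses `ThreadPoolExecutor` and thread names so that it runs anywhere; the function `worker_name` and the `worker` prefix are illustrative choices. Replacing the executor with a process pool and the thread name with `mp.current_process().name` would be the analogous variant.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def worker_name(n):
    """Return the input together with the name of the thread that handled it."""
    return n, threading.current_thread().name

with ThreadPoolExecutor(max_workers=4, thread_name_prefix="worker") as e:
    names = list(e.map(worker_name, range(4)))

for n, name in names:
    print(n, name)
```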


### Exercise 6.2

- Modify the mapper function by adding this print.

In [76]:
from collections import defaultdict
import multiprocessing as mp

def mapper(filename):
    """Read the file `filename` and return a sorted list of (word, 1) tuples."""
    with open(filename, 'r') as f:
        text = f.read()
    word_list = sorted(text.lower().replace('.', '').split())
    print(mp.current_process().name) # Nothing is displayed with ProcessPoolExecutor
    return [(word, 1) for word in word_list]

def partitioner(mapper_res):
    """Group the (word, 1) pairs returned by mapper into a sorted list of
    (word, [1, 1, 1, ...]) pairs, with one 1 per occurrence of the word."""
    word_dict = defaultdict(list)
    for word, occ in mapper_res:
        word_dict[word].append(occ)
    return sorted(list(word_dict.items()))

def reducer(partition_elmt):
    """Read a tuple (word, [1, 1, 1, ..., 1]), sum the occurrences of the word
    and return the tuple (word, occurrences)."""
    return (partition_elmt[0], len(partition_elmt[1]))
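
To see what each step produces, here is a small trace on an in-memory string. It is a sketch: `map_text` is a hypothetical variant of `mapper` that takes the text directly instead of a file name, while `partitioner` and `reducer` are repeated from above so that the block runs on its own.

```python
from collections import defaultdict

def map_text(text):
    """Like mapper, but takes the text itself rather than a file name."""
    words = sorted(text.lower().replace('.', '').split())
    return [(word, 1) for word in words]

def partitioner(mapper_res):
    word_dict = defaultdict(list)
    for word, occ in mapper_res:
        word_dict[word].append(occ)
    return sorted(word_dict.items())

def reducer(partition_elmt):
    return (partition_elmt[0], len(partition_elmt[1]))

pairs = map_text("Dolor sit amet. Sit dolor sit.")
partitions = partitioner(pairs)
counts = [reducer(p) for p in partitions]

print(pairs)
print(partitions)  # [('amet', [1]), ('dolor', [1, 1]), ('sit', [1, 1, 1])]
print(counts)      # [('amet', 1), ('dolor', 2), ('sit', 3)]
```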

## Parallel reduce

- For a parallel reduce operation, the data must first be grouped by key in a container. The `partitioner` function defined above returns this container.

### Exercise 6.3

Write a parallel program that uses the three functions above using `ProcessPoolExecutor`. It reads all the "sample\*.txt" files. Map and reduce steps are parallel.


In [77]:
from lorem import text
for i in range(8):
    with open("sample{0:02d}.txt".format(i), "w") as f:
        f.write(text())
        
import glob
files = glob.glob('sample0*.txt')
files        

['sample00.txt',
 'sample01.txt',
 'sample02.txt',
 'sample03.txt',
 'sample04.txt',
 'sample05.txt',
 'sample06.txt',
 'sample07.txt']

In [78]:
from functools import reduce
from itertools import chain
from loky import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor

def wordcount(file_list):
    """Take a list of files and return the (word, occurrences) counts over
    all files together, sorted by decreasing number of occurrences."""
    with ProcessPoolExecutor() as e:
        mapped = e.map(mapper, file_list)
        res = e.map(reducer, partitioner(chain(*mapped)))
    return sorted(res, key=lambda t:t[1], reverse=True)

wordcount(files)

[('labore', 68),
 ('tempora', 67),
 ('consectetur', 66),
 ('neque', 66),
 ('magnam', 65),
 ('est', 64),
 ('porro', 63),
 ('voluptatem', 63),
 ('dolor', 62),
 ('amet', 61),
 ('dolorem', 61),
 ('ipsum', 61),
 ('modi', 61),
 ('adipisci', 60),
 ('aliquam', 60),
 ('quaerat', 60),
 ('numquam', 59),
 ('sed', 59),
 ('dolore', 58),
 ('eius', 57),
 ('quisquam', 57),
 ('ut', 55),
 ('etincidunt', 53),
 ('quiquia', 52),
 ('non', 51),
 ('sit', 47),
 ('velit', 42)]
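
A sequential cross-check can help to validate a parallel word count. The sketch below is not part of the exercise: it assumes `collections.Counter` as the reference and a hypothetical `count_words` function, and it writes two tiny temporary files so that it is self-contained.

```python
import os
import tempfile
from collections import Counter

def count_words(file_list):
    """Sequential reference: count the words of all files with a Counter."""
    counts = Counter()
    for filename in file_list:
        with open(filename, 'r') as f:
            counts.update(f.read().lower().replace('.', '').split())
    return counts.most_common()  # sorted by decreasing count

with tempfile.TemporaryDirectory() as tmp:
    paths = []
    for i, content in enumerate(["Dolor sit amet.", "Sit dolor sit."]):
        path = os.path.join(tmp, "demo{0:02d}.txt".format(i))
        with open(path, "w") as f:
            f.write(content)
        paths.append(path)
    demo_counts = count_words(paths)

print(demo_counts)  # [('sit', 3), ('dolor', 2), ('amet', 1)]
```

Words with equal counts may come out in a different order than with `wordcount`, so comparing the two results as dictionaries is the safer check.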

## Increase volume of data

*Because of the proxy, the download code in this section cannot be run on the workstations.*

### Getting the data

- [The Latin Library](http://www.thelatinlibrary.com/) contains a huge collection of freely accessible Latin texts. We collect the links on the Latin Library's homepage, ignoring those that are not associated with a particular author.

In [79]:
from bs4 import BeautifulSoup
from urllib.request import urlopen  # explicit import instead of a wildcard

base_url = "http://www.thelatinlibrary.com/"
home_content = urlopen(base_url)

soup = BeautifulSoup(home_content, "lxml")
author_page_links = soup.find_all("a")
author_pages = [ap["href"] for ap in author_page_links[:49]]  # keep the first 49 links only

### Generate html links

- Create a list of all links pointing to Latin texts. The Latin Library follows a convention that makes the corresponding links easy to find: each of them contains the name of the text's author.

In [80]:
ap_content = list()
for ap in author_pages:
    ap_content.append(urlopen(base_url + ap))

book_links = list()
for path, content in zip(author_pages, ap_content):
    author_name = path.split(".")[0]
    ap_soup = BeautifulSoup(content, "lxml")
    book_links += ([link for link in ap_soup.find_all("a", {"href": True}) if author_name in link["href"]])
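
The filtering rule can be tried offline on a small HTML fragment. This sketch swaps BeautifulSoup for the standard library `html.parser` so that it has no dependencies; the class `LinkCollector` is hypothetical, and the HTML fragment is sample data made up for the illustration.

```python
from html.parser import HTMLParser

class LinkCollector(HTMLParser):
    """Collect the href attribute of every <a> tag that has one."""
    def __init__(self):
        super().__init__()
        self.hrefs = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.hrefs.append(href)

# Made-up fragment imitating an author page
author_page = "cicero.html"
html = ('<a href="cicero/cael.shtml">Pro Caelio</a> '
        '<a href="index.html">Home</a> '
        '<a href="cicero/off1.shtml">De Officiis I</a> '
        '<a name="top">Top</a>')

author_name = author_page.split(".")[0]  # same rule as in the cell above
collector = LinkCollector()
collector.feed(html)
book_hrefs = [h for h in collector.hrefs if author_name in h]
print(book_hrefs)  # ['cicero/cael.shtml', 'cicero/off1.shtml']
```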

### Download webpages content

In [81]:
from urllib.error import HTTPError

num_pages = 100

for i, bl in enumerate(book_links[:num_pages]):
    print("Getting content " + str(i + 1) + " of " + str(num_pages), end="\r", flush=True)
    try:
        content = urlopen(base_url + bl["href"]).read()
        with open("book" + str(i) + ".dat","wb") as f:
            f.write(content)
    except HTTPError:
        print("Unable to retrieve " + bl["href"] + ".")

Unable to retrieve cicero/cael.shtml.
Getting content 100 of 100
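
Downloading is I/O-bound, so it is a natural candidate for threads. The following is only a sketch under stated assumptions: `fetch`, `fetch_all` and `fake_fetcher` are hypothetical helpers, the timeout value is arbitrary, and the demonstration uses the fake fetcher so that no network access is needed.

```python
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen

def fetch(url, timeout=10):
    """Return the page content as bytes, or None if it cannot be retrieved."""
    try:
        with urlopen(url, timeout=timeout) as response:
            return response.read()
    except OSError:  # URLError and HTTPError are subclasses of OSError
        return None

def fetch_all(urls, fetcher=fetch, max_workers=8):
    """Download all urls concurrently and return a dict url -> content."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return dict(zip(urls, pool.map(fetcher, urls)))

def fake_fetcher(url):
    """Offline stand-in for fetch, used only for this demonstration."""
    return None if url.endswith("missing.html") else url.encode()

pages = fetch_all(["a.html", "missing.html"], fetcher=fake_fetcher)
print(pages)
```

With real URLs, calling `fetch_all(urls)` would use `fetch`; pages that fail are reported as `None` rather than stopping the whole download.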

### Extract data files

- The content of the pages is already stored in files named book-*.txt.
- You can extract these files from the archive by running the cell below.


In [82]:
import os  # helpers for directory and file paths
import tarfile  # read and write tar archives

def extract_data():
    datadir = os.path.join('..', 'data', 'latinbooks')
    if not os.path.exists(datadir):
        print("Extracting data...")
        tar_path = os.path.join('..', 'data', 'latinbooks.tgz')
        with tarfile.open(tar_path, mode='r:gz') as books:
            books.extractall('../data')

extract_data()  # this function call extracts the text files into ../data/latinbooks

### Read data files

In [83]:
from glob import glob
files = glob('../data/latinbooks/*')
texts = list()
for file in files:
    with open(file,'rb') as f:
        text = f.read()
    texts.append(text)

### Extract the text from html and split the text at periods to convert it into sentences.

In [84]:
%%time 
from bs4 import BeautifulSoup

sentences = list()

for i, text in enumerate(texts):
    print("Document " + str(i + 1) + " of " + str(len(texts)), end="\r", flush=True)
    textSoup = BeautifulSoup(text, "lxml")
    paragraphs = textSoup.find_all("p", attrs={"class":None})
    prepared = ("".join([p.text.strip().lower() for p in paragraphs[1:-1]]))
    for t in prepared.split("."):
        part = "".join([c for c in t if c.isalpha() or c.isspace()])
        sentences.append(part.strip())

print(sentences[1262])

adfirmant etiam aliqui terrarum halitu densiore crassatum aera emittendis corporis spiraminibus resistentem necare non nullos qua causa animalia praeter homines cetera iugiter prona homero auctore et experimentis deinceps multis cum talis incesserit labes ante novimus interire
Wall time: 1.97 s


In [85]:
len(sentences)

31767
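
The cleaning rule is easier to follow on a short string. In this sketch, `split_sentences` is a hypothetical name for the inner loop of the cell above, and the input is assumed to be lowercased already.

```python
def split_sentences(prepared):
    """Split at periods and keep only letters and whitespace in each part."""
    sentences = []
    for t in prepared.split("."):
        part = "".join(c for c in t if c.isalpha() or c.isspace())
        sentences.append(part.strip())
    return sentences

demo = split_sentences(
    "gallia est omnis divisa, in partes tres. quarum unam incolunt belgae."
)
print(demo)
# ['gallia est omnis divisa in partes tres', 'quarum unam incolunt belgae', '']
```

A text that ends with a period produces an empty string as its last element, so empty entries are included in the sentence count.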

### Exercise 6.4

Parallelize this last process using `concurrent.futures`.

In [86]:
%%time
from loky import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from itertools import chain

def mapper(text):
    textSoup = BeautifulSoup(text, "lxml")
    paragraphs = textSoup.find_all("p", attrs={"class":None})
    prepared = ("".join([p.text.strip().lower() for p in paragraphs[1:-1]]))
    sentences = []
    for t in prepared.split("."):
        part = "".join([c for c in t if c.isalpha() or c.isspace()])
        sentences.append(part.strip())
    return sentences

with ThreadPoolExecutor(max_workers=4) as executor:
    mapped = executor.map(mapper, texts)
    
sentences2 = list(chain.from_iterable(mapped))  # flatten the per-document lists

Wall time: 2.14 s


In [87]:
print(sentences2[1262])
len(sentences2)

adfirmant etiam aliqui terrarum halitu densiore crassatum aera emittendis corporis spiraminibus resistentem necare non nullos qua causa animalia praeter homines cetera iugiter prona homero auctore et experimentis deinceps multis cum talis incesserit labes ante novimus interire


31767

## References

- [Using Conditional Random Fields and Python for Latin word segmentation](https://medium.com/@felixmohr/using-python-and-conditional-random-fields-for-latin-word-segmentation-416ca7a9e513)

In [95]:
# A test to better understand how it works

from time import sleep
import random
import threading

values = list(range(10))

def f(x):
    freeze_time = random.randint(2,4)
    print("(" + threading.current_thread().name + ") Waiting " + str(freeze_time) + " seconds for compute the square of " + str(x))
    sleep(freeze_time)
    print("(" + threading.current_thread().name + ") Done")
    return x*x


In [89]:
%%time
res = [f(x) for x in values]
print(res)

(MainThread) Waiting 3 seconds for compute the square of 0
(MainThread) Done
(MainThread) Waiting 4 seconds for compute the square of 1
(MainThread) Done
(MainThread) Waiting 4 seconds for compute the square of 2
(MainThread) Done
(MainThread) Waiting 2 seconds for compute the square of 3
(MainThread) Done
(MainThread) Waiting 2 seconds for compute the square of 4
(MainThread) Done
(MainThread) Waiting 4 seconds for compute the square of 5
(MainThread) Done
(MainThread) Waiting 3 seconds for compute the square of 6
(MainThread) Done
(MainThread) Waiting 3 seconds for compute the square of 7
(MainThread) Done
(MainThread) Waiting 3 seconds for compute the square of 8
(MainThread) Done
(MainThread) Waiting 3 seconds for compute the square of 9
(MainThread) Done
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Wall time: 31 s


In [90]:
%%time
res = [i for i in map(f, values)]
print(res)

(MainThread) Waiting 4 seconds for compute the square of 0
(MainThread) Done
(MainThread) Waiting 2 seconds for compute the square of 1
(MainThread) Done
(MainThread) Waiting 4 seconds for compute the square of 2
(MainThread) Done
(MainThread) Waiting 4 seconds for compute the square of 3
(MainThread) Done
(MainThread) Waiting 2 seconds for compute the square of 4
(MainThread) Done
(MainThread) Waiting 4 seconds for compute the square of 5
(MainThread) Done
(MainThread) Waiting 4 seconds for compute the square of 6
(MainThread) Done
(MainThread) Waiting 4 seconds for compute the square of 7
(MainThread) Done
(MainThread) Waiting 3 seconds for compute the square of 8
(MainThread) Done
(MainThread) Waiting 4 seconds for compute the square of 9
(MainThread) Done
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Wall time: 35.1 s


In [91]:
%%time

# Before re-running this cell, re-run the cell that defines f: it also resets values, which is overwritten below!

import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as executor:  # the larger max_workers is, the faster it runs
    values = executor.map(f, values)
res = [value for value in values]
print(res)

(Thread-186) Waiting 3 seconds for compute the square of 0
(Thread-187) Waiting 3 seconds for compute the square of 1
(Thread-188) Waiting 4 seconds for compute the square of 2
(Thread-189) Waiting 2 seconds for compute the square of 3
(Thread-189) Done
(Thread-189) Waiting 4 seconds for compute the square of 4
(Thread-186) Done(Thread-187) Done
(Thread-186) Waiting 2 seconds for compute the square of 5

(Thread-187) Waiting 4 seconds for compute the square of 6
(Thread-188) Done
(Thread-188) Waiting 4 seconds for compute the square of 7
(Thread-186) Done
(Thread-186) Waiting 3 seconds for compute the square of 8
(Thread-189) Done
(Thread-189) Waiting 4 seconds for compute the square of 9
(Thread-187) Done
(Thread-188) Done
(Thread-186) Done
(Thread-189) Done
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Wall time: 10 s


In [97]:
%%time

# Before re-running this cell, re-run the cell that defines f: it also resets values, which is overwritten below!

import multiprocessing as mp
from loky import ProcessPoolExecutor

with ProcessPoolExecutor(max_workers=15) as executor:  # oddly enough, more processes than cpu_count() can be used
    values = executor.map(f, values)
res = [value for value in values]
print(res)

[0, 1, 16, 81, 256, 625, 1296, 2401, 4096, 6561]
Wall time: 4.99 s

Note that the result above contains fourth powers rather than squares. This is presumably because `values` still held the squares from an earlier run when the cell was executed, which is why the cell defining `f` and `values` has to be re-run first.
