# Foundations of Data Science (GDW) 2023



# Exercise II: Parallel Computing

Parallel processing is an important paradigm for scaling computationally intensive calculations. In this exercise, you will practice two paradigms in Python:
- MapReduce
- Job pooling

## Part 1: MapReduce

Let us start with an example:

Suppose you run a kitchen and have access to several ingredients for making a cheeseburger:

- Lettuce
- Onions
- Pickles
- Tomatoes
- Cheese
- Patty
- Buns
- Bottles of sauce

### Task 1.1
Assuming you can employ as many workers as there are ingredients, design a MapReduce task that parallelizes making a burger, and describe the steps below:

 --->
 
 --->
 
 --->

### Prerequisites

Now, we will take a look at another example, this time with code.

Before running this example, you may need to install the Python package `mrjob`, e.g. by calling

`pip install mrjob`

*Note: You do not need a Hadoop installation for this exercise, as we are using mrjob's simulated local mode.*

Running `mrjob` from JupyterLab is a bit tricky, as you will find out below. We split the job description and the job execution into different cells. In the first cell, the first line stores the cell's contents in a Python file (using the cell magic `%%file <filename>`); in the next cell, we import this module (line 1).

Since the Python kernel caches imported modules, changes to the first cell would not be picked up by another `import` once the second cell has been executed. Therefore, we use `importlib.reload` (lines 2 and 3 of the second cell) to force a reload of the module written by the first cell.
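The caching behaviour described above can be reproduced outside Jupyter. The following is a minimal sketch that writes a module file, overwrites it (as re-running a `%%file` cell would), and compares a plain re-import with `importlib.reload`. The module name `reload_demo` is hypothetical and only used for this demonstration; bytecode writing is switched off so that the edited source is always recompiled.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Do not write .pyc files, so that reload() recompiles the edited source
sys.dont_write_bytecode = True

# Hypothetical module name, used only for this demonstration
MODULE_NAME = "reload_demo"
tmp_dir = tempfile.mkdtemp()
sys.path.insert(0, tmp_dir)
module_path = Path(tmp_dir) / f"{MODULE_NAME}.py"

# First version of the module, as a %%file cell would write it
module_path.write_text("VALUE = 1\n")
importlib.invalidate_caches()
demo = importlib.import_module(MODULE_NAME)
first_value = demo.VALUE

# Overwrite the file, as re-running the %%file cell would do
module_path.write_text("VALUE = 2\n")

# Importing again returns the module object cached in sys.modules
demo = importlib.import_module(MODULE_NAME)
cached_value = demo.VALUE

# reload() re-executes the module source and updates the cached module object
demo = importlib.reload(demo)
reloaded_value = demo.VALUE

print(first_value, cached_value, reloaded_value)  # 1 1 2
```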
    
In the following, you will see how to count the characters and lines in a text file using MapReduce (you will add a word count yourselves afterwards).
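Before using `mrjob`, it can help to see the data flow of such a job in plain Python. The sketch below only mimics the three phases (map, shuffle, reduce) for the character and line count; it is not how `mrjob` works internally, and the helper names `mapper`, `reducer` and `run_local` are hypothetical.

```python
from collections import defaultdict

def mapper(line):
    # Emit one (key, value) pair for the characters and one for the line itself
    yield "chars", len(line)
    yield "lines", 1

def reducer(key, values):
    # Combine all values that were emitted under the same key
    return key, sum(values)

def run_local(lines):
    # Map phase: every line is processed independently
    mapped = (pair for line in lines for pair in mapper(line))
    # Shuffle phase: group the emitted values by key
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)
    # Reduce phase: one reducer call per key
    return dict(reducer(key, values) for key, values in groups.items())

text = ["The quick brown fox", "jumps over", "the lazy dog"]
print(run_local(text))  # {'chars': 41, 'lines': 3}
```

In a real MapReduce run, the map calls and the per-key reduce calls are distributed across workers; the shuffle is the step that brings all values for one key to the same reducer.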

You can use your own text file or download the provided file `big.txt` from Moodle.

In [None]:
%%file mrcharcount.py

from mrjob.job import MRJob

class MRWordCount(MRJob):
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "lines", 1
                 
    def reducer(self, key, values):
        s = sum(values)
        yield key, s

if __name__ == '__main__':
    MRWordCount.run()

After executing the previous cell, you will notice that a file `mrcharcount.py` has appeared in your notebook folder. You can now import it and use any functions provided by the newly created module:

In [None]:
import mrcharcount
from importlib import reload 
reload(mrcharcount)

file = 'big.txt'
mr_job = mrcharcount.MRWordCount(args=[file])
print(f"Counting occurrences in {file}...")
with mr_job.make_runner() as runner:
    runner.run()
    for line in mr_job.parse_output(runner.cat_output()):
        print(f"{line[0]}: {line[1]}")

The warning about missing configuration files can be ignored here, as mrjob simply falls back on auto-configuration.

### Task 1.2
Modify the listing above such that it also returns the number of words. 

Hint: You can split a string `s` into its words by calling `s.split()`.
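A short illustration of why the argument-free `split()` is the convenient choice here: it splits on any run of whitespace and drops empty strings, whereas `split(" ")` splits on single spaces only. The sample line is made up for this example.

```python
line = "  It was the best of times,\tit was the worst of times.\n"

# Without arguments: split on runs of whitespace (spaces, tabs, newlines), no empty strings
words = line.split()
print(len(words))  # 12

# With a single space as separator: leading spaces yield empty strings,
# and the tab and newline stay attached to neighbouring words
tokens = line.split(" ")
print(len(tokens))  # 13
```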

## Part 2: Job Pooling
Another commonly used method for running jobs in parallel is a worker pool: a set of separate processes spawned from your main process. The coordinator (the main process) has to collect the results, so some synchronization is needed to wait for the jobs to finish.
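A minimal sketch of this coordinator/worker synchronization with `multiprocessing.Pool.apply_async`: submitting a job returns immediately with an `AsyncResult` handle, and calling `.get()` on it blocks until that job has finished (raising `multiprocessing.TimeoutError` if the optional timeout expires first). The function name `square` and the pool size of 2 are illustrative choices.

```python
from multiprocessing import Pool

def square(x):
    # Work executed inside a worker process
    return x * x

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        # Submit all jobs without waiting; each call returns an AsyncResult handle
        pending = [pool.apply_async(square, (x,)) for x in range(5)]
        # Synchronization point: get() blocks until the corresponding job is done
        results = [job.get(timeout=10) for job in pending]
    print(results)  # [0, 1, 4, 9, 16]
```

The `if __name__ == '__main__':` guard matters because, depending on the platform's start method, worker processes may re-import the main module.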

The following example shows how to spawn a child process on your machine with Python's `multiprocessing` package.

In [None]:
%%file joblist.py

from multiprocessing import Process
import os

def info(title):
    # provides information on current system process 
    print(title)
    print('module name:', __name__)
    print('parent process id:', os.getppid())
    print('own process id:', os.getpid())

def f(name):
    info('Information on function f')
    print('hello', name)
    print()

if __name__ == '__main__':
    info('Information on function main')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

You can run the .py file directly from your notebook with:

In [None]:
%run -i 'joblist.py'

Next, we want to compute a large number of squares with the help of the `multiprocessing` library.

In [None]:
%%file workerpool_simple.py

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    # Compute square of input value
    return x*x

mem_size = 1400000000
a=range(mem_size)

num_cpu=os.cpu_count()

if __name__ == '__main__':
    print(f"Number of CPU cores: {num_cpu}")
    # start a worker process for each of your cpu cores
    with Pool(processes=num_cpu) as pool:

        # Distribute evaluation of a function to workers  
        result=pool.map(f, a)
        
        # Display results
        print(f"All squares: \n {result}") # outputs a looooong list of square numbers

*Note:* you may have to reduce the `mem_size` variable if you get errors such as `MemoryError`.
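To see why the default value can exhaust memory, here is a rough, CPython-specific estimate of the size of the list that `pool.map` returns. It counts one pointer per list slot plus one int object per square (using the largest square as an upper bound) and ignores further overhead, such as pickling data between processes or building the string that `print` outputs.

```python
import struct
import sys

mem_size = 1_400_000_000  # value used in workerpool_simple.py

# Each list slot stores a pointer to an int object
pointer_bytes = struct.calcsize("P")
# Size of the largest square as a CPython int object (upper bound for all squares)
int_bytes = sys.getsizeof((mem_size - 1) ** 2)

# Rough upper bound for the result list alone
estimate_gb = mem_size * (pointer_bytes + int_bytes) / 1e9
print(f"result list: up to ~{estimate_gb:.0f} GB")
```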

In [None]:
%run -i 'workerpool_simple.py'

### Task 2.1
Modify the script above, which computes square numbers in parallel, so that it counts the workload of each individual process.

In [None]:
# write your code here

Test your file in the same way as the example above.

In [None]:
# write your code here