# Multiprocessing tutorial 5

 - Author: Elwin van 't Wout
 - Affiliation: Pontificia Universidad Católica de Chile
 - Course: IMT3870
 - Date: 12-8-2024

The library `joblib` provides functionality for parallel computing. In this notebook, let us look into shared variables.

In [None]:
from joblib import Parallel, delayed

In the *multiprocessing* model, each process has its own data space with private variables. Hence, no variables can be shared between different tasks. The `joblib` library does allow for the use of global variables in each process, but they should not be changed by the different processes.

## Reading a global variable

Let us create a global variable with a specific value and a function that reads it.

In [None]:
MY_GLOBAL_VAR = 1.4

In [None]:
def return_global_var():
    return MY_GLOBAL_VAR

Let us perform this task with multiple processes. That is, each process returns the global variable.

In [None]:
n_workers = 2

In [None]:
tasks = [delayed(return_global_var)() for i in range(n_workers)]

In [None]:
with Parallel(n_jobs=n_workers, batch_size=1, verbose=10, backend='loky') as parallel_pool:
    parallel_results = parallel_pool(tasks)

In [None]:
print(parallel_results)

Even though each process has an independent data space, the variables created earlier in the notebook can also be used. However, this does not mean that the variable is actually shared in the sense that both processes can access the same memory where the variable is stored. The `joblib` library made a copy of the global variable in each process. Hence, any change a process makes to its copy is not visible to the other processes or to the main process.

## Writing into a global variable

Let us try to overwrite a global variable with different values in each process.

In [None]:
def change_global_var(n):
    MY_GLOBAL_VAR = n
    return MY_GLOBAL_VAR

In [None]:
tasks = [delayed(change_global_var)(i) for i in range(n_workers)]

In [None]:
with Parallel(n_jobs=n_workers, batch_size=1, verbose=10, backend='loky') as parallel_pool:
    parallel_results = parallel_pool(tasks)

In [None]:
print(parallel_results)
print(MY_GLOBAL_VAR)

The results above show that in each process, a local variable named `MY_GLOBAL_VAR` was created and returned to the main process. The global variable with the same name `MY_GLOBAL_VAR` was left unchanged. Notice that this is the expected behaviour of any Python function, not just of functions run through `joblib`.
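The same behaviour can be reproduced without any parallelism. The following minimal sketch repeats the function from this section and calls it serially in the main process; the name `serial_results` is introduced here only for illustration.

```python
MY_GLOBAL_VAR = 1.4  # same value as earlier in the notebook

def change_global_var(n):
    # Assigning to the name inside the function creates a new local
    # variable; the global variable of the same name is not touched.
    MY_GLOBAL_VAR = n
    return MY_GLOBAL_VAR

# Call the function serially, once per would-be worker
serial_results = [change_global_var(i) for i in range(2)]

print(serial_results)  # [0, 1]
print(MY_GLOBAL_VAR)   # 1.4
```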

## Changing a global variable

Let us try to add a value to the global variable.

In [None]:
def add_to_global_var(n):
    MY_GLOBAL_VAR += n
    return MY_GLOBAL_VAR

In [None]:
tasks = [delayed(add_to_global_var)(i) for i in range(n_workers)]

In [None]:
with Parallel(n_jobs=n_workers, batch_size=1, verbose=10, backend='loky') as parallel_pool:
    parallel_results = parallel_pool(tasks)

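The call fails: the error raised inside the worker process is passed on to the main process by `joblib`. Here, the function tries to read and then write into the same variable `MY_GLOBAL_VAR`. The previous examples showed that either reading or writing is possible, but adding to a global variable fails. The reason is that any assignment to a name inside a function makes Python treat that name as a local variable throughout the function body. The statement `MY_GLOBAL_VAR += n` must first read this local variable, which has no value yet, so Python raises an `UnboundLocalError`.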
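The failure is a property of Python itself and can be reproduced in a single process. The sketch below first triggers the error serially and then shows the `global` declaration that makes the update work within one process. The function name `add_to_global_var_declared` is hypothetical. Note that inside a `joblib` worker, such a declaration would presumably only alter the worker's own copy of the variable, not the one in the main process.

```python
MY_GLOBAL_VAR = 1.4  # same value as earlier in the notebook

def add_to_global_var(n):
    # The assignment makes MY_GLOBAL_VAR local to this function,
    # so reading it for the addition fails.
    MY_GLOBAL_VAR += n
    return MY_GLOBAL_VAR

error_name = None
try:
    add_to_global_var(1)
except UnboundLocalError as error:
    error_name = type(error).__name__
print(error_name)  # UnboundLocalError

def add_to_global_var_declared(n):
    # Hypothetical variant: declare the name as global so that the
    # augmented assignment reads and writes the module-level variable.
    global MY_GLOBAL_VAR
    MY_GLOBAL_VAR += n
    return MY_GLOBAL_VAR

result = add_to_global_var_declared(1)
print(result == MY_GLOBAL_VAR)  # True: the global variable was updated
```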

## Reading databases

In data science, it is common to have a dataset that needs to be used by all processes. However, each process has its own data space. There are different ways to handle this situation. The easiest approach is to handle the dataset as a global variable. This is sufficient if the processes only need to read the dataset and never modify it.

In [None]:
import pandas as pd
import numpy as np

In [None]:
my_global_df = pd.DataFrame(data=np.arange(100), columns=["my_data"])

In [None]:
my_global_df

In [None]:
def sum_data(start, end):
    my_local_data = my_global_df["my_data"][start:end]
    return np.sum(my_local_data)

In [None]:
chunk_size = my_global_df.shape[0] // n_workers
tasks = [delayed(sum_data)(i*chunk_size, (i+1)*chunk_size) for i in range(n_workers)]

In [None]:
with Parallel(n_jobs=n_workers, batch_size=1, verbose=10, backend='loky') as parallel_pool:
    parallel_results = parallel_pool(tasks)

In [None]:
print(parallel_results)

The first process indeed summed all elements in the first half of the database, and the second process summed the second half. Although this works, both processes have a copy of the entire database, which is inefficient.
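The chunk boundaries used above assume that the number of rows is divisible by `n_workers`; otherwise, the integer division would leave the last rows unassigned. As a minimal sketch, the hypothetical helper `chunk_bounds` below distributes the remainder over the first workers. It also checks the expected partial sums of the values 0 to 99 in plain Python.

```python
def chunk_bounds(n_rows, n_workers):
    """Return (start, end) index pairs that cover range(n_rows) without gaps."""
    base, remainder = divmod(n_rows, n_workers)
    bounds = []
    start = 0
    for worker in range(n_workers):
        # The first `remainder` workers each take one extra row
        end = start + base + (1 if worker < remainder else 0)
        bounds.append((start, end))
        start = end
    return bounds

print(chunk_bounds(100, 2))  # [(0, 50), (50, 100)]
print(chunk_bounds(10, 3))   # [(0, 4), (4, 7), (7, 10)]

# Serial check of the partial sums for the data 0, 1, ..., 99
partial_sums = [sum(range(start, end)) for start, end in chunk_bounds(100, 2)]
print(partial_sums)  # [1225, 3725]
```

The resulting pairs could be passed to `sum_data` in place of `i*chunk_size` and `(i+1)*chunk_size`.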

One way of distributing a database over different processes is by reading the necessary parts of the database in each process. For example, one worker reads the first half and the other worker the second half of a database from disk.

In [None]:
my_global_df.to_excel("my_database.xlsx", index=False)

In [None]:
def read_and_sum_data(start, end):
    my_local_df = pd.read_excel("my_database.xlsx", header=0, skiprows=range(1,start+1), nrows=end-start)
    return my_local_df.shape, np.sum(my_local_df["my_data"])

In [None]:
chunk_size = my_global_df.shape[0] // n_workers
tasks = [delayed(read_and_sum_data)(i*chunk_size, (i+1)*chunk_size) for i in range(n_workers)]

In [None]:
with Parallel(n_jobs=n_workers, batch_size=1, verbose=10, backend='loky') as parallel_pool:
    parallel_results = parallel_pool(tasks)

In [None]:
print(parallel_results)

The result shows that the local data frames are half the size of the Excel file. Furthermore, the summations are correct.
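The row selection in `read_and_sum_data` relies on the header staying at file row 0 while `skiprows=range(1, start+1)` drops the first `start` data rows and `nrows` limits how many rows are read. The sketch below emulates this selection in plain Python on a list of lines, without calling `pandas`; the helper `select_rows` and the list `file_lines` are hypothetical stand-ins for the Excel file.

```python
def select_rows(lines, start, end):
    """Emulate header=0, skiprows=range(1, start+1), nrows=end-start."""
    skip = set(range(1, start + 1))  # file rows to drop; row 0 is the header
    kept = [line for index, line in enumerate(lines) if index not in skip]
    header, body = kept[0], kept[1:]
    return header, body[:end - start]  # keep at most end-start data rows

# Stand-in for the file: one header line followed by the values 0..99
file_lines = ["my_data"] + [str(value) for value in range(100)]

header, rows = select_rows(file_lines, 50, 100)
values = [int(row) for row in rows]
print(header, len(values), sum(values))  # my_data 50 3725
```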