# Multiprocessing

**Source:** *Python and HDF5* by Andrew Collette, O'Reilly 2013.

<img src="./img/MP.png" width=600 />

On Unix/Linux, the `multiprocessing` module uses the `fork()` system call to spawn new processes. Child processes inherit state information such as open file handles and *the state of the HDF5 library* from the parent process. If you play your cards right, this is not an issue. If you are careless, it will cause a lot of grief and heartburn.
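The inheritance described above can be seen with a small standard-library sketch. It assumes a Unix-like system where the `fork` start method is available; `STATE`, `report`, and `child_view` are hypothetical names, and the dictionary is only a stand-in for library state such as a table of open files.

```python
import multiprocessing as mp

# Hypothetical stand-in for library-level state (e.g. a table of open files).
STATE = {"open_files": []}

def report(conn):
    # Runs in the child process: send back what the child sees.
    conn.send(list(STATE["open_files"]))
    conn.close()

def child_view():
    # Assumption: the "fork" start method exists (Unix-like systems only).
    ctx = mp.get_context("fork")
    parent_end, child_end = ctx.Pipe()
    proc = ctx.Process(target=report, args=(child_end,))
    proc.start()
    seen = parent_end.recv()
    proc.join()
    return seen

# The parent changes its state *before* forking ...
STATE["open_files"].append("coords.hdf5")

# ... and the child starts life with a copy of that state.
inherited = child_view()
print(inherited)
```

The child never opened anything itself, yet it reports the parent's entry. This is why the examples that follow close the HDF5 file before the pool is created.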

## Three Variations on Divide-and-Conquer

1. The parent process handles all HDF5 I/O and the child processes perform only computational tasks.
2. Child processes write separate intermediate files and the parent merges them.
3. **[HDF5 1.10 only]** The parent process maintains a Virtual Dataset (VDS) and the child processes maintain their own "shards."

In [1]:
import numpy as np, h5py

Create a file and a dataset, `/coords`, with 1,000 rows and 2 columns.

In [2]:
with h5py.File('coords.hdf5', 'w') as f:
    dset = f.create_dataset('coords', (1000, 2), dtype='f4')
    dset[...] = np.random.random((1000,2))

**Task:** For each 2D point (row), calculate its distance from the origin.

In [3]:
from multiprocessing import Pool

In [4]:
def distance(arr):
    return np.sqrt(np.sum(arr**2))  # arr is a shape-(2,) array

Load data and close the input file:

In [5]:
with h5py.File('coords.hdf5', 'r') as f:
    data = f['coords'][...]

Create a 4-process pool:

In [6]:
p = Pool(4)

Carry out parallel computation, `MAP`-style:

In [7]:
result = np.array(p.map(distance, data))
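For a task this small, the same distances can also be computed in one vectorized NumPy expression, which makes a convenient cross-check on the `p.map` result. The sketch below assumes stand-in random data rather than the contents of `coords.hdf5`, and it applies `distance` serially in place of the pool.

```python
import numpy as np

def distance(arr):
    return np.sqrt(np.sum(arr**2))  # arr is a shape-(2,) array

# Stand-in for the data loaded from coords.hdf5 (assumption: same shape and dtype)
rng = np.random.default_rng(0)
data = rng.random((1000, 2)).astype('f4')

# What the pool computes, done serially here: one call per row
per_row = np.array([distance(row) for row in data])

# Vectorized equivalent: sum the squares along each row, then take the root
vectorized = np.sqrt(np.sum(data**2, axis=1))

print(np.allclose(per_row, vectorized))
```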

Write the result into a new dataset in the file:

In [12]:
with h5py.File('coords.hdf5', 'r+') as f:  # 'r+': open the existing file for reading and writing
    f['distances'] = result

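RuntimeError: Unable to create link (Name already exists)

This is the error h5py reports when `distances` already exists in the file, for example when the cell is run a second time.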
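h5py does not silently rebind a name that is already in use. One common way to make a write cell safe to re-run is to delete the old object first. The sketch below is a minimal illustration under assumptions: `result` is stand-in data, and the file lives in memory (the `core` driver with `backing_store=False`) so nothing is left on disk.

```python
import numpy as np
import h5py

result = np.arange(5, dtype='f4')  # stand-in for the computed distances

# In-memory file: the name 'scratch.hdf5' is never written to disk
with h5py.File('scratch.hdf5', 'w', driver='core', backing_store=False) as f:
    f['distances'] = result          # first write succeeds

    # Re-running the write: remove the existing object before assigning again
    if 'distances' in f:
        del f['distances']
    f['distances'] = result * 2

    stored = f['distances'][...]

print(stored)
```

Deleting a dataset unlinks it but does not necessarily shrink the file, so this idiom suits small results better than large ones.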

### Sharding = Poor Man's MapReduce

**"MAP Phase"** We define a function that reads a 200-row block of coordinates, computes the distances, and writes the results to a process-specific file.

In [21]:
def distance_block(idx):  # idx - the slice offset
    with h5py.File('coords.hdf5','r') as f:
        data = f['coords'][idx:idx+200]

    result = np.sqrt(np.sum(data**2, axis=1))

    with h5py.File('result_index_%d.hdf5'%idx, 'w') as f:
        f['result'] = result

Create a 4-process pool and carry out the computations.

In [22]:
p = Pool(4)

In [23]:
p.map(distance_block, range(0, 1000, 200))

[None, None, None, None, None]
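The output is one `None` per task, because `distance_block` returns nothing, and the five offsets from `range(0, 1000, 200)` give five blocks. When the row count is not a multiple of the block size, the last block has to be shorter. The helper below, `block_slices`, is a hypothetical name introduced only to illustrate that bookkeeping.

```python
def block_slices(n_rows, block):
    """Yield one slice per block; the last block may be shorter."""
    for start in range(0, n_rows, block):
        yield slice(start, min(start + block, n_rows))

# The blocking used in this example: 1,000 rows in blocks of 200
slices = list(block_slices(1000, 200))
print(len(slices), slices[0], slices[-1])

# A block size that does not divide the row count evenly
uneven = list(block_slices(1000, 300))
print(uneven[-1])
```

NumPy and h5py slicing already clip at the end of a dataset when reading, so the explicit `min` matters mostly when the slice bounds are reused to size or write the output.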

**"REDUCE Phase"** The parent "picks up the pieces" and merges them into a single dataset.

In [27]:
import os

In [29]:
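with h5py.File('coords.hdf5','r+') as f:  # 'r+' keeps /coords; 'w' would truncate the file
    # require_dataset reuses 'distances' if an earlier cell already created it
    dset = f.require_dataset('distances', (1000,), dtype='f4')

    # Loop over our 200-element "chunks" and merge the data into coords.hdf5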
    for idx in range(0, 1000, 200):

        filename = 'result_index_%d.hdf5'%idx
        with h5py.File(filename, 'r') as f2:
            data = f2['result'][...]

        dset[idx:idx+200] = data
        os.unlink(filename)  # delete the shards, which are no longer needed

## Advanced Topics for Discussion

- What's new in HDF5 1.10
  + Virtual Datasets (VDS)
  + Single Writer Multiple Reader (SWMR)
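As a starting point for the VDS discussion, here is a hedged sketch of how the shard files from the example above could be presented as one dataset without copying them. It assumes an h5py release with the high-level VDS interface (`VirtualLayout`, `VirtualSource`, `create_virtual_dataset`) built against HDF5 1.10 or later. The file name `distances_vds.hdf5` and the constant shard contents are placeholders, and the shards are written serially here rather than by child processes.

```python
import os
import tempfile

import numpy as np
import h5py

n_rows, block = 1000, 200

with tempfile.TemporaryDirectory() as workdir:
    # Stand-in shards: each block is filled with its own offset value
    for idx in range(0, n_rows, block):
        path = os.path.join(workdir, 'result_index_%d.hdf5' % idx)
        with h5py.File(path, 'w') as f:
            f['result'] = np.full(block, idx, dtype='f4')

    # Describe where each shard sits inside the virtual dataset
    layout = h5py.VirtualLayout(shape=(n_rows,), dtype='f4')
    for idx in range(0, n_rows, block):
        path = os.path.join(workdir, 'result_index_%d.hdf5' % idx)
        layout[idx:idx + block] = h5py.VirtualSource(path, 'result', shape=(block,))

    # The parent file stores only the mapping, not the data
    vds_path = os.path.join(workdir, 'distances_vds.hdf5')
    with h5py.File(vds_path, 'w', libver='latest') as f:
        f.create_virtual_dataset('distances', layout, fillvalue=-1)

    # Reading the virtual dataset pulls the values from the shards
    with h5py.File(vds_path, 'r') as f:
        merged = f['distances'][...]

print(merged.shape)
```

Unlike the REDUCE step above, the shards must stay on disk: the virtual dataset only points at them.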