Parallel Map
====
Parsing JSON turns out to be slow, so we parallelize it with the [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) module, spreading the work across multiple processes.

### Objectives

*  Find the code bottleneck
*  Use `concurrent.futures` to `map` a function across many inputs in parallel


## Sequential Execution

In [None]:
%load_ext snakeviz

In [None]:
from glob import glob
import json
import pandas as pd
import os
from time import sleep

In [None]:
filenames = sorted(glob(os.path.join('..', 'data', 'json', '*.json')))  # ../data/json/*.json
filenames[:5]

In [None]:
%%snakeviz

for fn in filenames:
    print(fn)
    with open(fn) as f:
        data = [json.loads(line) for line in f]
        
    df = pd.DataFrame(data)
    
    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, '/data')
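`%%snakeviz` visualizes a profile collected with Python's built-in `cProfile`. Outside a notebook, a rough text-only equivalent uses `cProfile` and `pstats` directly. The sketch below profiles the parsing step on synthetic JSON lines; the `parse_lines` and `profile_report` helpers and the generated records are hypothetical stand-ins for the files above. In the resulting report, the `loads` entry shows how much of the cumulative time goes to JSON decoding.

```python
import cProfile
import io
import json
import pstats


def parse_lines(lines):
    # Hypothetical stand-in for the loop body: decode one JSON record per line
    return [json.loads(line) for line in lines]


def profile_report(n_lines=10_000, top=10):
    # Synthetic newline-delimited JSON, standing in for ../data/json/*.json
    lines = [json.dumps({"id": i, "name": f"user{i}", "amount": i * 1.5})
             for i in range(n_lines)]

    profiler = cProfile.Profile()
    profiler.runcall(parse_lines, lines)  # profile only this call

    # Render the `top` most expensive functions by cumulative time as text
    buf = io.StringIO()
    stats = pstats.Stats(profiler, stream=buf)
    stats.sort_stats("cumulative").print_stats(top)
    return buf.getvalue()


if __name__ == "__main__":
    print(profile_report())
```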

Parallel Execution
--------------------

We can process each file independently and in parallel.  To accomplish this we'll transform the body of our for loop into a function and then use the [concurrent.futures.ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#executor-objects) to apply that function across all of the filenames in parallel using multiple processes.

### Before

Whenever we have code like the following:

```python
results = [f(x) for x in L]
# or (in Python 3, map is lazy, so wrap it in list)
results = list(map(f, L))
```

### After

We can instead write it as the following:

```python
from concurrent.futures import ProcessPoolExecutor
e = ProcessPoolExecutor()

results = list(e.map(f, L))
```
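Outside a notebook, the same pattern is usually written as a script that uses the executor as a context manager, so the worker processes are shut down when the block exits. The `if __name__ == "__main__":` guard matters on platforms that start workers with the *spawn* method (Windows, and macOS by default), because each worker re-imports the main module. In this sketch, `f` is a hypothetical placeholder; it must be defined at module level so it can be pickled and sent to the workers.

```python
from concurrent.futures import ProcessPoolExecutor


def f(x):
    # Hypothetical placeholder for real per-item work
    return x * x


def run(L):
    # `with` shuts the pool down once all results have been collected
    with ProcessPoolExecutor() as e:
        # Executor.map yields results in the same order as the inputs
        return list(e.map(f, L))


if __name__ == "__main__":
    L = range(10)
    print(run(L) == [f(x) for x in L])  # parallel matches sequential: True
```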

### Example

In [None]:
%%time

### Sequential code

import time

results = []

for i in range(8):
    time.sleep(1)
    results.append(i + 1)

In [None]:
#!pip install loky  # Windows users will need to install loky

In [None]:
%%time

### Parallel code

from concurrent.futures import ProcessPoolExecutor
# from loky import ProcessPoolExecutor  # for Windows users

e = ProcessPoolExecutor()

def slowinc(x):
    sleep(1)
    return x + 1

In [None]:
# Notice what kind of object e.map returns
e.map(slowinc, range(8))

In [None]:
%%time

# Work is submitted to the pool immediately; iterating waits for each result in order
for x in e.map(slowinc, range(8)):
    print(x)

# In most cases we can simply convert to a list
# results = list(e.map(slowinc, range(8)))
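As a plain script, the sequential vs. parallel comparison can be sketched with `time.perf_counter`. Here the sleep is shortened to 0.2 s and `max_workers=4` is an arbitrary choice (both assumptions of this sketch), so exact timings vary by machine; the parallel run should take roughly two sleeps plus process start-up rather than eight sleeps.

```python
import time
from concurrent.futures import ProcessPoolExecutor


def slowinc(x, delay=0.2):
    # Simulated slow work, like the notebook's slowinc but with a shorter delay
    time.sleep(delay)
    return x + 1


def compare(n=8, workers=4):
    start = time.perf_counter()
    sequential = [slowinc(i) for i in range(n)]
    t_seq = time.perf_counter() - start

    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=workers) as e:
        parallel = list(e.map(slowinc, range(n)))
    t_par = time.perf_counter() - start  # includes pool start-up and shutdown

    return sequential, parallel, t_seq, t_par


if __name__ == "__main__":
    seq, par, t_seq, t_par = compare()
    print(par)  # [1, 2, 3, 4, 5, 6, 7, 8]
    print(f"sequential {t_seq:.2f}s, parallel {t_par:.2f}s")
```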

Exercise: Convert JSON data to HDF5 in parallel
------------------------------------------------
### Using `concurrent.futures.Executor.map`

In [None]:
import json

In [None]:
%%time

### Sequential code

for fn in filenames:
    with open(fn) as f:
        data = [json.loads(line) for line in f]
        
    df = pd.DataFrame(data)
    
    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, '/data')

In [None]:
%%time
# try replacing %%time with %%snakeviz when everything's working
# to get a profile

### Parallel code

# TODO

In [None]:
%load solutions/map-1.py

Try visualizing your parallel version with `%%snakeviz`. Where does it look like it's spending most of its time?

## Conclusion

*  Used `snakeviz` to profile code
*  Used `concurrent.futures.ProcessPoolExecutor` for simple parallelism across many files
    *  Gained some speedup (but not as much as expected)
    *  Lost the ability to diagnose performance within the parallel code
*  Expressing each task as a function call lets us use tools like `map` for parallelism

----

Further reading
======

## Parallelism isn't everything

We get a moderate increase in performance when using multiple processes.  However, parallelism isn't the only way to accelerate this computation.  Recall that the bulk of the cost comes from the `json.loads` function.  A quick internet search on "fast json parsing in python" yields the [ujson](https://pypi.python.org/pypi/ujson) library as the top hit.

Knowing about and importing the optimized `ujson` library is just as effective as multi-core execution.
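Swapping parsers only changes the `loads` call. The sketch below times the standard library against `ujson` on synthetic lines and falls back to stdlib `json` alone if `ujson` is not installed; the `make_lines` and `time_parser` helpers and the generated records are hypothetical, and the size of the gap depends on your data and library versions.

```python
import json
import timeit

try:
    import ujson  # optional third-party parser: pip install ujson
except ImportError:
    ujson = None


def make_lines(n=5_000):
    # Synthetic newline-delimited JSON records
    return [json.dumps({"id": i, "name": f"user{i}", "tags": ["a", "b"]})
            for i in range(n)]


def time_parser(loads, lines, repeat=3):
    # Best-of-`repeat` wall time to parse every line once
    return min(timeit.repeat(lambda: [loads(line) for line in lines],
                             number=1, repeat=repeat))


if __name__ == "__main__":
    lines = make_lines()
    print(f"json : {time_parser(json.loads, lines):.3f}s")
    if ujson is not None:
        # Both parsers should produce the same Python objects for these records
        assert [ujson.loads(s) for s in lines] == [json.loads(s) for s in lines]
        print(f"ujson: {time_parser(ujson.loads, lines):.3f}s")
```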

In [None]:
import ujson

In [None]:
%%time
filenames = sorted(glob(os.path.join('..', 'data', 'json', '*.json')))

for fn in filenames:
    with open(fn) as f:
        data = [ujson.loads(line) for line in f]
        
    df = pd.DataFrame(data)
    
    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, '/data')

History: multiprocessing.Pool
--------------------------------

Historically, people have done multiprocessing computations with the `multiprocessing.Pool` object, which behaves much the same way.

However, today most library designers are coordinating around the `concurrent.futures` interface, so it's wise to move over.

In [None]:
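%%time

from multiprocessing import Pool

def load_parse_store(fn):
    # Loop body from the sequential version: parse JSON lines, write HDF5
    with open(fn) as f:
        data = [json.loads(line) for line in f]
    pd.DataFrame(data).to_hdf(fn[:-5] + '.h5', '/data')

with Pool() as p:
    p.map(load_parse_store, filenames)  # Pool.map returns a list directly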
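A side-by-side sketch of the two interfaces on the same function, where `double` is a hypothetical stand-in for real work. One visible difference: `Pool.map` returns a list, while `Executor.map` returns an iterator that yields results in input order.

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool


def double(x):
    # Hypothetical stand-in for per-file work such as load_parse_store
    return 2 * x


def both(items):
    with Pool() as p:
        pool_results = p.map(double, items)  # already a list

    with ProcessPoolExecutor() as e:
        # Executor.map returns an iterator; consume it into a list
        executor_results = list(e.map(double, items))

    return pool_results, executor_results


if __name__ == "__main__":
    a, b = both(range(5))
    print(a == b)  # True
```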