Parallel Map on Files
------------------------

For each file in a set, we parse its JSON contents, load the data into a Pandas DataFrame, and write the result to a new file in a more efficient format, HDF5.

Profiling shows that parsing JSON is the slow step, so we use the [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) module to spread the work across multiple processes.

### Objectives

*  Profile code to find the bottleneck
*  Use `concurrent.futures` to `map` a function across many inputs in parallel


### Requirements

*  Pandas
*  concurrent.futures (standard in Python 3, `pip install futures` in Python 2)
*  snakeviz (for profile visualization, `pip install snakeviz`)


    pip install snakeviz
    pip install futures
    

## Before we start

We need to get some data to work with.
We are going to generate some [fake stock data](https://github.com/mrocklin/fakestockdata) by adding a bunch of points between real stock data points. This will take a few minutes the first time we run it.

In [1]:
%run prep.py

Finished aa
Finished bwa
Finished aig
Finished ebay
Finished cost
Finished d
Finished amgn
Finished csco
Finished abc
Finished emr
Finished aapl
Finished amzn
Finished esrx
Finished goog
Finished nyx
Finished gas
Finished met
Finished hpq
Finished jbl
Finished ge
Finished hp
Finished jpm
Finished hal
Finished ibm
Finished zmh
Finished pcg
Finished usb
Finished vrsn
Finished yhoo
Finished JSON: /Users/j35/git/OverviewSciPy2016/parallel_python/data/minute/aa
Finished JSON: /Users/j35/git/OverviewSciPy2016/parallel_python/data/minute/ebay
Finished JSON: /Users/j35/git/OverviewSciPy2016/parallel_python/data/minute/csco
Finished JSON: /Users/j35/git/OverviewSciPy2016/parallel_python/data/minute/abc
Finished JSON: /Users/j35/git/OverviewSciPy2016/parallel_python/data/minute/aapl
Finished JSON: /Users/j35/git/OverviewSciPy2016/parallel_python/data/minute/d
Finished JSON: /Users/j35/git/OverviewSciPy2016/parallel_python/data/minute/aig
Finished JSON: /Users/j35/git/OverviewSciPy2016/parallel_p

## Sequential Execution

In [3]:
%load_ext snakeviz

In [4]:
from glob import glob
import json
import pandas as pd
import os

In [14]:
filenames = sorted(glob(os.path.join('data', 'json', '*.json')))  # data/json/*.json
print("Will load {:d} json files".format(len(filenames)))      # https://pyformat.info/
filenames[:5]

Will load 29 json files


['data/json/aa.json',
 'data/json/aapl.json',
 'data/json/abc.json',
 'data/json/aig.json',
 'data/json/amgn.json']

#### Load JSON data into a Pandas DataFrame, then save it in HDF5 format

In [8]:
%%snakeviz

for fn in filenames:
    print(fn)
    with open(fn) as f:
        data = [json.loads(line) for line in f]
        
    df = pd.DataFrame(data)
    
    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, key='/data')  # pass `key` by keyword; positional use is deprecated in newer pandas

data/json/aa.json
data/json/aapl.json
data/json/abc.json
data/json/aig.json
data/json/amgn.json
data/json/amzn.json
data/json/bwa.json
data/json/cost.json
data/json/csco.json
data/json/d.json
data/json/ebay.json
data/json/emr.json
data/json/esrx.json
data/json/gas.json
data/json/ge.json
data/json/goog.json
data/json/hal.json
data/json/hp.json
data/json/hpq.json
data/json/ibm.json
data/json/jbl.json
data/json/jpm.json
data/json/met.json
data/json/nyx.json
data/json/pcg.json
data/json/usb.json
data/json/vrsn.json
data/json/yhoo.json
data/json/zmh.json
 
*** Profile stats marshalled to file '/var/folders/t8/307ghqxx5djb28nl56mkmx7j90c0xy/T/tmppi4001tf'. 
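
Outside a notebook, a similar bottleneck hunt can be done with the standard library's `cProfile` and `pstats` modules. The sketch below is one possible approach, not the method used above: `parse_lines` and `profile_call` are hypothetical helper names, and the records are synthetic stand-ins for the stock data.

```python
import cProfile
import io
import json
import pstats


def parse_lines(lines):
    """Parse an iterable of JSON-encoded lines into a list of dicts."""
    return [json.loads(line) for line in lines]


def profile_call(func, *args):
    """Run func(*args) under cProfile and return (result, text report)."""
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args)
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    # Sort by cumulative time so the most expensive call chains come first.
    stats.sort_stats("cumulative").print_stats(10)
    return result, buffer.getvalue()


# Synthetic records (made up for illustration, not the real stock data).
sample = [json.dumps({"close": i * 0.5, "volume": i}) for i in range(1000)]
records, report = profile_call(parse_lines, sample)
print(report)
```

The report lists how much time was spent in each function, which is the same information `snakeviz` draws graphically.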


Parallel Execution
--------------------

We can process each file independently and in parallel.  To accomplish this we'll transform the body of our for loop into a function and then use the [concurrent.futures.ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#executor-objects) to apply that function across all of the filenames in parallel using multiple processes.

### Before

Whenever we have code like the following:

```python
results = []
for x in L:
    results.append(f(x))
```

or the following:

```python
results = [f(x) for x in L]
```

or the following:

```python
results = list(map(f, L))
```

### After

We can instead write it as the following:

```python
from concurrent.futures import ProcessPoolExecutor
e = ProcessPoolExecutor()

results = list(e.map(f, L))
```
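
One way to package the pattern above is a small helper that creates the pool in a `with` block, so the worker processes are shut down once the results are collected. This is a minimal sketch: `square`, `parallel_map`, and the `executor_cls` parameter are illustrative names, not part of `concurrent.futures`.

```python
from concurrent.futures import ProcessPoolExecutor


def square(x):
    """Toy stand-in for an expensive, independent task."""
    return x * x


def parallel_map(func, items, executor_cls=ProcessPoolExecutor, max_workers=None):
    """Apply func to every item using a pool that is shut down on exit.

    executor_cls is a hypothetical knob added for this sketch: any
    concurrent.futures.Executor subclass can be passed in.
    """
    with executor_cls(max_workers=max_workers) as executor:
        # Executor.map yields results in input order, like the builtin map.
        return list(executor.map(func, items))


if __name__ == "__main__":
    # The guard keeps worker processes from re-running this block when the
    # module is re-imported (as happens with the "spawn" start method).
    print(parallel_map(square, range(8)))
```

Because `ThreadPoolExecutor` shares the `Executor` interface, passing `executor_cls=ThreadPoolExecutor` runs the same code with threads, which can be handy for quick checks.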

### Example

In [15]:
%%time

### Sequential code

from time import sleep

results = []

for i in range(8):
    sleep(1)
    results.append(i + 1)

CPU times: user 698 µs, sys: 960 µs, total: 1.66 ms
Wall time: 8.02 s


In [16]:
%%time

### Parallel code

from concurrent.futures import ProcessPoolExecutor
e = ProcessPoolExecutor()

def slowinc(x):
    sleep(1)
    return x + 1

results = list(e.map(slowinc, range(8)))

CPU times: user 14.7 ms, sys: 33.1 ms, total: 47.8 ms
Wall time: 1.04 s
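
`%%time` is an IPython magic. In a plain script, a rough equivalent of the comparison above can be sketched with `time.perf_counter`. The helper names (`timed`, `run_sequential`, `run_parallel`) and the shortened 0.05 s delay are assumptions made for this sketch.

```python
import time
from concurrent.futures import ProcessPoolExecutor


def slow_increment(x, delay=0.05):
    """Pretend to do expensive work, then return x + 1."""
    time.sleep(delay)
    return x + 1


def timed(func, *args, **kwargs):
    """Return (result, elapsed wall-clock seconds) for func(*args, **kwargs)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - start


def run_sequential(items):
    """Process the items one after another in the current process."""
    return [slow_increment(x) for x in items]


def run_parallel(items, executor_cls=ProcessPoolExecutor):
    """Process the items on a pool; executor_cls is a hypothetical knob."""
    with executor_cls() as executor:
        return list(executor.map(slow_increment, items))


if __name__ == "__main__":
    seq, seq_seconds = timed(run_sequential, range(8))
    par, par_seconds = timed(run_parallel, range(8))
    print(f"sequential: {seq_seconds:.2f} s, parallel: {par_seconds:.2f} s")
```

The parallel timing includes the cost of starting the worker processes, so with very short tasks like these the speedup may be smaller than the worker count suggests.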


### Exercise:  Convert JSON data to HDF5 in parallel using `concurrent.futures.Executor.map`

In [17]:
import json

In [18]:
%%time

### Sequential code

for fn in filenames:
    with open(fn) as f:
        data = [json.loads(line) for line in f]
        
    df = pd.DataFrame(data)
    
    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, key='/data')  # pass `key` by keyword; positional use is deprecated in newer pandas

CPU times: user 22.6 s, sys: 1.04 s, total: 23.7 s
Wall time: 23.8 s


In [20]:
# Parallel code

from concurrent.futures import ProcessPoolExecutor
e = ProcessPoolExecutor()

def load_parse_store(fn):
    with open(fn) as f:
        data = [json.loads(line) for line in f]

    df = pd.DataFrame(data)

    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, key='/data')  # pass `key` by keyword; positional use is deprecated in newer pandas

list(e.map(load_parse_store, filenames))

[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]

Try visualizing your parallel version with `%%snakeviz`. Where does it look like it's spending all its time?
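
A profile of the parent process typically shows it waiting for results, because the real work happens in the workers. One possible workaround, sketched below under the assumption that each task can afford the profiling overhead, is to run `cProfile` inside each task and send a text report back with the result. `profiled_call`, `map_with_profiles`, and `sum_of_squares` are hypothetical names; `sum_of_squares` stands in for `load_parse_store`.

```python
import cProfile
import io
import pstats
from concurrent.futures import ProcessPoolExecutor
from functools import partial


def profiled_call(func, arg, top=5):
    """Run func(arg) under cProfile inside the worker.

    Returns (result, report), where report is the text of up to `top`
    entries sorted by cumulative time.
    """
    profiler = cProfile.Profile()
    result = profiler.runcall(func, arg)
    buffer = io.StringIO()
    pstats.Stats(profiler, stream=buffer).sort_stats("cumulative").print_stats(top)
    return result, buffer.getvalue()


def sum_of_squares(n):
    """Toy CPU-bound task used only for this illustration."""
    return sum(i * i for i in range(n))


def map_with_profiles(executor, func, items):
    """Map func over items on executor, collecting one profile per task."""
    return list(executor.map(partial(profiled_call, func), items))


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        pairs = map_with_profiles(executor, sum_of_squares, [10_000, 20_000])
    for result, report in pairs:
        print(result)
        print(report)
```

Each report is a plain string, so it travels back from the worker like any other result.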

History: multiprocessing.Pool
--------------------------------

Previously, people did multi-process computations with the `multiprocessing.Pool` object, which behaves more or less identically.

However, today most library designers are coordinating around the `concurrent.futures` interface, so it's wise to move over.

In [21]:
%%time 

from multiprocessing import Pool
p = Pool()

list(p.map(load_parse_store, filenames))

CPU times: user 18 ms, sys: 33.5 ms, total: 51.5 ms
Wall time: 4.2 s
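
For comparison, the `multiprocessing.Pool` version can also be written with a `with` block so the pool is cleaned up when the work is done. This is a sketch with illustrative names (`cube`, `pool_map`, `pool_cls`); it is not the cell that produced the timing above.

```python
from multiprocessing import Pool


def cube(x):
    """Toy stand-in for an independent task."""
    return x ** 3


def pool_map(func, items, pool_cls=Pool):
    """Apply func to items with a pool that is cleaned up on exit.

    pool_cls is a hypothetical parameter for this sketch; anything with the
    multiprocessing.Pool interface works, e.g. multiprocessing.pool.ThreadPool.
    """
    with pool_cls() as pool:
        # Pool.map blocks until every result is ready and already returns
        # a list in input order, so no list(...) wrapper is needed.
        return pool.map(func, items)


if __name__ == "__main__":
    print(pool_map(cube, range(5)))
```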


Conclusion
-----------

*  Used `snakeviz` to profile code
*  Used `concurrent.futures.ProcessPoolExecutor` for simple parallelism across many files
    *  Gained some speed boost (but not as much as expected)
    *  Lost ability to diagnose performance within parallel code
*  Describing each task as a function call makes it easy to use tools like `map` for parallelism
*  Making your tasks fast is often at least as important as parallelizing your tasks.