Parallel Collections
---------------------

Systems like Spark and Dask include "big data" collections with a small set of high-level primitives like `map`, `filter`, `groupby`, and `join`.  With these common patterns we can often handle computations that are more complex than a plain `map` but still have regular structure.

In this section we repeat the submit example using the PySpark and Dask.bag APIs, both of which provide parallel operations on linear collections of arbitrary objects.


### Objectives

*  Use high-level `pyspark` and `dask.bag` to parallelize common non-map patterns

### Requirements

*  PySpark

*Note: PySpark requires a little additional setup. Usually, the following environment variables need to be set (using `/usr/libexec/java_home` on OS X or similar on Linux)*:

```
JAVA_HOME
JAVA_JRE
```

*Also, VPNs can interfere with PySpark, so you may need to disable yours if you are running PySpark locally.*

## Application

We again start with the following sequential code:

```python
series = {}
for fn in filenames:   # Simple map over filenames
    series[fn] = pd.read_hdf(fn)['x']

results = {}

for a in filenames:    # Doubly nested loop over the same collection
    for b in filenames:  
        if a != b:     # Filter out bad elements
            results[a, b] = series[a].corr(series[b])  # Apply function

((a, b), corr) = max(results.items(), key=lambda kv: kv[1])  # Reduction
```

### Spark/Dask.bag methods

We can construct most of the above computation with the following Spark/Dask.bag methods:

*  `collection.map(function)`: apply function to each element in collection
*  `collection.cartesian(collection)`: create a new collection with every pair of inputs
*  `collection.filter(predicate)`: keep only elements of the collection that match the predicate function
*  `collection.max()`: Compute maximum element

We use these briefly in isolated exercises and then combine them to rewrite the previous computation from the `submit` section.
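
To make the semantics of these four methods concrete, here is a minimal sketch of their plain-Python analogues on a small list, using only built-ins and `itertools`. It only illustrates what each operation returns on a single machine; it is not how Spark or Dask implement them, and the variable names are chosen for illustration.

```python
from itertools import product

data = list(range(5))

# map: apply a function to each element
squares = list(map(lambda x: x ** 2, data))

# cartesian: every ordered pair drawn from the two collections
pairs = list(product(data, data))

# filter: keep only the elements for which the predicate is true
evens = list(filter(lambda x: x % 2 == 0, data))

# max: reduce the collection to its largest element
largest = max(data)

print(squares)     # [0, 1, 4, 9, 16]
print(len(pairs))  # 25
print(evens)       # [0, 2, 4]
print(largest)     # 4
```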

### PySpark: Example API

In [1]:
from pyspark import SparkContext
sc = SparkContext('local[4]')

In [2]:
rdd = sc.parallelize(range(5))  # create collection
rdd

PythonRDD[1] at RDD at PythonRDD.scala:48

In [3]:
rdd.collect()  # Gather results back to local process

[0, 1, 2, 3, 4]

### `map`, `filter`, and `cartesian`

In [4]:
# Square each element

rdd.map(lambda x: x ** 2)

PythonRDD[2] at RDD at PythonRDD.scala:48

In [5]:
# Square each element and collect results

rdd.map(lambda x: x ** 2).collect()

[0, 1, 4, 9, 16]
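
The two cells above differ only in the call to `.collect()`: without it we get back a description of the computation rather than its results. As a loose analogy (not Spark's actual mechanism), Python's built-in `map` is also lazy, and the sketch below records when the function really runs. The `square` helper and the `calls` list are illustrative names only.

```python
calls = []

def square(x):
    calls.append(x)   # record that the function actually ran
    return x ** 2

lazy = map(square, range(5))   # builds a lazy iterator; nothing is computed yet
calls_before = list(calls)     # snapshot: still empty at this point

result = list(lazy)            # consuming the iterator triggers the work

print(calls_before)  # []
print(result)        # [0, 1, 4, 9, 16]
```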

In [6]:
# Select only the even elements

rdd.filter(lambda x: x % 2 == 0).collect()

[0, 2, 4]

In [7]:
# Cartesian product of each pair of elements in two sequences (or the same sequence in this case)

rdd.cartesian(rdd).collect()

[(0, 0),
 (0, 1),
 (0, 2),
 (0, 3),
 (0, 4),
 (1, 0),
 (1, 1),
 (1, 2),
 (1, 3),
 (1, 4),
 (2, 0),
 (2, 1),
 (2, 2),
 (2, 3),
 (2, 4),
 (3, 0),
 (4, 0),
 (3, 1),
 (4, 1),
 (3, 2),
 (4, 2),
 (3, 3),
 (3, 4),
 (4, 3),
 (4, 4)]

In [8]:
# Chain operations to construct more complex computations

(rdd.map(lambda x: x ** 2)
    .cartesian(rdd)
    .filter(lambda tup: tup[0] % 2 == 0)
    .collect())

[(0, 0),
 (0, 1),
 (0, 2),
 (0, 3),
 (0, 4),
 (4, 0),
 (4, 1),
 (4, 2),
 (4, 3),
 (4, 4),
 (16, 0),
 (16, 1),
 (16, 2),
 (16, 3),
 (16, 4)]

### Exercise: Parallelize pairwise correlations with PySpark

To make this a bit easier we're just going to compute the maximum correlation and not try to keep track of the stocks that yielded this maximal result.
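
If the names were needed, one possible approach is to carry each name alongside its data through the whole pipeline and reduce with a key function, as the sequential code does. The sketch below shows that shape in plain Python. The toy data, the series names, and the hand-written `pearson` helper (a stand-in for `Series.corr`) are all hypothetical and are not the real stock files.

```python
from itertools import product
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences (stand-in for Series.corr)."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sqrt(sum((x - mx) ** 2 for x in xs))
    sy = sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)

# Hypothetical toy data: name -> values
series = {
    'a': [1.0, 2.0, 3.0, 4.0],
    'b': [2.0, 4.0, 6.0, 8.5],
    'c': [4.0, 3.0, 2.0, 1.0],
}

named = list(series.items())             # (name, values) pairs
pairs = product(named, named)            # cartesian product of the collection with itself
# filter: compare the names rather than the data
distinct = filter(lambda ab: ab[0][0] != ab[1][0], pairs)
# map: keep the pair of names next to the computed correlation
scored = map(lambda ab: ((ab[0][0], ab[1][0]), pearson(ab[0][1], ab[1][1])),
             distinct)

# reduction: the key function compares correlations, the names ride along
(a, b), corr = max(scored, key=lambda kv: kv[1])
print(a, b, corr)
```

Filtering on names also avoids comparing every element of two series just to decide whether they are the same one.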

In [10]:
from glob import glob
import os
import pandas as pd

filenames = sorted(glob(os.path.join('..', 'data', 'json', '*.h5')))  # ../data/json/*.h5
filenames[:5]

['../data/json/afl.h5',
 '../data/json/aig.h5',
 '../data/json/al.h5',
 '../data/json/avy.h5',
 '../data/json/bwa.h5']

In [11]:
%%time

### Sequential Code

series = []
for fn in filenames:   # Simple map over filenames
    series.append(pd.read_hdf(fn)['close'])

results = []

for a in series:    # Doubly nested loop over the same collection
    for b in series:  
        if not (a == b).all():     # Filter out comparisons of the same series 
            results.append(a.corr(b))  # Apply function

result = max(results)

CPU times: user 1.64 s, sys: 144 ms, total: 1.79 s
Wall time: 1.79 s


In [27]:
%%time

### Parallel code

rdd = sc.parallelize(filenames).map(lambda fn: pd.read_hdf(fn)["close"])
corr = (rdd
     .cartesian(rdd)
     .filter(lambda x: (x[0] != x[1]).any())
     .map(lambda x: x[0].corr(x[1]))
     .max()
)

result = corr

CPU times: user 16 ms, sys: 4 ms, total: 20 ms
Wall time: 7.07 s


In [28]:
result

0.94646920143454671

In [None]:
# %load solutions/collections-1.py
### Parallel code

rdd = sc.parallelize(filenames)
series = rdd.map(lambda fn: pd.read_hdf(fn)['close'])

corr = (series.cartesian(series)
              .filter(lambda ab: not (ab[0] == ab[1]).all())
              .map(lambda ab: ab[0].corr(ab[1]))
              .max())

result = corr


In [None]:
result

### Dask.bag
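
Before the exercise, here is a short sketch of the same isolated operations shown earlier for PySpark, written against `dask.bag`. It assumes a Dask version that accepts the `scheduler=` keyword in `compute`; the variable names are illustrative. Note that Dask.bag spells the pairing operation `product` rather than `cartesian`.

```python
import dask.bag as db

b = db.from_sequence(range(5))          # create collection

squares = b.map(lambda x: x ** 2)       # lazy: nothing runs yet
evens = b.filter(lambda x: x % 2 == 0)  # keep only the even elements
pairs = b.product(b)                    # Dask.bag's counterpart to cartesian
largest = b.max()                       # lazy reduction

# compute() triggers execution; the synchronous scheduler runs in this process
squares_result = squares.compute(scheduler='synchronous')
evens_result = evens.compute(scheduler='synchronous')
pairs_result = pairs.compute(scheduler='synchronous')
largest_result = largest.compute(scheduler='synchronous')

print(squares_result)     # [0, 1, 4, 9, 16]
print(evens_result)       # [0, 2, 4]
print(len(pairs_result))  # 25
print(largest_result)     # 4
```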

In [None]:
%load solutions/collections-2.py

In [None]:
%%time

import dask

result = corr.compute(scheduler='synchronous')  # newer Dask releases replace the old get= keyword with scheduler=

### Conclusion

*  Higher-level collections include functions for common patterns
*  Move data into a collection, construct a lazy computation, and trigger it at the end
*  We used PySpark (`cartesian + map`) and Dask.bag (`product + map`) to replace the doubly nested for loop