Pierre Navaro - [Institut de Recherche Mathématique de Rennes](https://irmar.univ-rennes1.fr) - [CNRS](http://www.cnrs.fr/)

[![nbviewer](https://img.shields.io/badge/render-nbviewer-orange.svg)](http://nbviewer.jupyter.org/github/pnavaro/big-data/blob/master/07.PySpark.ipynb)


# PySpark

Spark can manage "big data" collections with a small set of high-level primitives like `map`, `filter`, `groupby`, and `join`.  With these common patterns we can often handle computations that are more complex than map, but are still structured.

PySpark uses Py4J, which enables Python programs to dynamically access Java objects.

![PySpark Internals](http://i.imgur.com/YlI8AqEl.png)

- Apache Spark is a fast and general-purpose cluster computing system. 
- It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. 
- It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

## Simple example

In [1]:
from pyspark import SparkContext
import os
os.environ["PYSPARK_PYTHON"]="python3"
sc = SparkContext('local[2]') # Create a local spark cluster with 2 workers

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=PySparkShell, master=local[*]) created by <module> at /usr/local/lib/python3.6/site-packages/IPython/utils/py3compat.py:186 

In [3]:
# If you get an error, run this cell with the command below uncommented
# and fix the path to python in the cell above
#sc.stop()

We now have a Spark context `sc` backed by a tiny local Spark cluster with 2 worker threads (this works just fine on a multicore machine).

In [2]:
print(sc) # sc plays a role similar to a process pool executor

<SparkContext master=local[*] appName=PySparkShell>


In [3]:
rdd = sc.parallelize(range(5))  # create collection
rdd

PythonRDD[1] at RDD at PythonRDD.scala:48

In [4]:
rdd.collect()  # Gather results back to local process

[0, 1, 2, 3, 4]

In [7]:
rdd.map(lambda x: x ** 2) # Square each element

PythonRDD[2] at RDD at PythonRDD.scala:48
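
The output above is an RDD description rather than a list because Spark transformations such as `map` are lazy: nothing is computed until an action such as `collect` or `reduce` runs. As a rough analogy only (plain Python, no Spark involved), the built-in `map` behaves in a similar way. The `square` helper and the `calls` list below are illustrative names, not part of the notebook:

```python
calls = []  # records every value the function is actually applied to

def square(x):
    calls.append(x)
    return x ** 2

lazy = map(square, range(5))   # builds an iterator, computes nothing yet
calls_before = list(calls)     # snapshot: still empty at this point

result = list(lazy)            # consuming the iterator triggers the work
print(calls_before, result)
```

Here `list(lazy)` plays the role that `collect()` plays for an RDD.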

In [8]:
rdd.map(lambda x: x ** 2).collect() # Square each element and collect results

[0, 1, 4, 9, 16]

In [9]:
# Map-Reduce operation 
from operator import add
rdd.map(lambda x: x ** 2).reduce(add)

30

In [10]:
# Select only the even elements
rdd.filter(lambda x: x % 2 == 0).collect()

[0, 2, 4]

In [11]:
# Cartesian product of each pair of elements in two sequences 
# (or the same sequence in this case)
rdd.cartesian(rdd).collect()

[(0, 0),
 (0, 1),
 (1, 0),
 (1, 1),
 (0, 2),
 (0, 3),
 (0, 4),
 (1, 2),
 (1, 3),
 (1, 4),
 (2, 0),
 (2, 1),
 (3, 0),
 (4, 0),
 (3, 1),
 (4, 1),
 (2, 2),
 (2, 3),
 (2, 4),
 (3, 2),
 (4, 2),
 (3, 3),
 (3, 4),
 (4, 3),
 (4, 4)]

In [12]:
# Chain operations to construct more complex computations
(rdd.map(lambda x: x ** 2)
    .cartesian(rdd)
    .filter(lambda tup: tup[0] % 2 == 0)
    .collect())

[(0, 0),
 (0, 1),
 (0, 2),
 (0, 3),
 (0, 4),
 (4, 0),
 (4, 1),
 (16, 0),
 (16, 1),
 (4, 2),
 (4, 3),
 (4, 4),
 (16, 2),
 (16, 3),
 (16, 4)]
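
For comparison, a plain-Python sketch of the same chain (no Spark; `itertools.product` stands in for `cartesian`) yields the same 15 pairs. The order differs from the Spark output, which is gathered partition by partition, so any comparison between the two should ignore ordering:

```python
from itertools import product

data = range(5)
squares = map(lambda x: x ** 2, data)              # like rdd.map(...)
pairs = product(squares, data)                     # like .cartesian(rdd)
local = [tup for tup in pairs if tup[0] % 2 == 0]  # like .filter(...).collect()

print(len(local))
```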

In [13]:
# Stop the local spark cluster
sc.stop()

## Pi computation example

In [14]:
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PythonPi").getOrCreate()

partitions = 4
n = 1000000 * partitions

def f(_):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

count = spark.sparkContext.parallelize(range(1, n+1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))

spark.stop()

Pi is roughly 3.142872
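
The estimate rests on a simple area ratio. Each point $(x, y)$ is drawn uniformly from the square $[-1, 1]^2$, so the probability that it falls inside the unit disk is the ratio of the two areas:

$$
P(x^2 + y^2 \le 1) = \frac{\pi \cdot 1^2}{2 \cdot 2} = \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx 4 \, \frac{\mathrm{count}}{n}
$$

Since `count` is a sum of $n$ independent Bernoulli draws with $p = \pi/4$, the standard error of the estimate is $4\sqrt{p(1-p)/n}$, about $8 \times 10^{-4}$ for $n = 4 \times 10^6$, which is consistent with the value printed above.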


### Exercise 7.1

Using the same method as in the Pi computation example, compute the integral
$$
I = \int_0^1 \exp(-x^2) dx
$$
You can check your result with numpy:

In [15]:
# numpy approximates the integral numerically with the trapezoidal rule,
# using discrete samples of the function
import numpy as np
x = np.linspace(0,1,1000)
np.trapz(np.exp(-x*x),x)

0.74682407137637408

In [16]:
# scipy evaluates the integral by numerical quadrature,
# calling the function at discrete points
import numpy as np
from scipy.integrate import quad
quad(lambda x: np.exp(-x*x), 0, 1)
# note: quad returns a tuple (integral estimate, absolute error estimate)

(0.7468241328124271, 8.291413475940725e-15)
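
One possible approach to the exercise, sketched here in plain Python rather than Spark: since $0 < \exp(-x^2) \le 1$ on $[0, 1]$, the integral equals the fraction of uniform points in the unit square that fall under the curve. The names `under_curve`, `rng` and `n` are illustrative choices, and the sample size is arbitrary. In PySpark, the `map` and `reduce` steps would presumably go through `parallelize(...).map(...).reduce(add)` as in the Pi example.

```python
from functools import reduce
from math import exp
from operator import add
from random import Random

rng = Random(0)   # fixed seed so the sketch is reproducible
n = 200_000       # number of sample points (arbitrary choice)

def under_curve(_):
    # Draw a point in the unit square; return 1 if it lies under y = exp(-x^2)
    x, y = rng.random(), rng.random()
    return 1 if y <= exp(-x * x) else 0

count = reduce(add, map(under_curve, range(n)))
estimate = count / n
print("I is roughly %f" % estimate)
```

The estimate should land close to the value of about 0.7468 returned by the numpy and scipy checks, with an error that shrinks like $1/\sqrt{n}$.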

### Exercise 7.2

We again start with the following sequential code:

```python
series = {}
for fn in filenames:   # Simple map over filenames
    series[fn] = pd.read_hdf(fn)['x']

results = {}

for a in filenames:    # Doubly nested loop over the same collection
    for b in filenames:  
        if a != b:     # Filter out bad elements
            results[a, b] = series[a].corr(series[b])  # Apply function

((a, b), corr) = max(results.items(), key=lambda kv: kv[1])  # Reduction
```

Parallelize the pairwise correlations with PySpark.

To make this a bit easier we're just going to compute the maximum correlation and not try to keep track of the stocks that yielded this maximal result.

### Spark methods

We can construct most of the above computation with the following Spark methods:

*  `collection.map(function)`: apply function to each element in collection
*  `collection.cartesian(collection)`: Create new collection with every pair of inputs
*  `collection.filter(predicate)`: Keep only elements of collection that match the predicate function
*  `collection.max()`: Compute maximum element
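
As a rough illustration of how these four steps fit together, here is a plain-Python sketch on made-up data (no Spark, no HDF5 files). `itertools.product`, `filter`, `map` and `max` stand in for the Spark methods, and the `pearson` helper and the `toy` series are hypothetical:

```python
from itertools import product
from math import sqrt

def pearson(a, b):
    # Pearson correlation coefficient of two equal-length sequences
    n = len(a)
    ma, mb = sum(a) / n, sum(b) / n
    cov = sum((x - ma) * (y - mb) for x, y in zip(a, b))
    va = sum((x - ma) ** 2 for x in a)
    vb = sum((y - mb) ** 2 for y in b)
    return cov / sqrt(va * vb)

# Made-up series keyed by name (stand-ins for the stock files)
toy = {"a": [1, 2, 3, 4], "b": [2, 4, 6, 9], "c": [4, 3, 2, 1]}
names = sorted(toy)

pairs = product(names, names)                          # cartesian
distinct = filter(lambda ab: ab[0] != ab[1], pairs)    # filter
corrs = map(lambda ab: pearson(toy[ab[0]], toy[ab[1]]), distinct)  # map
best = max(corrs)                                      # max
print(best)
```

Filtering on names rather than on values is a design choice of this sketch: it avoids comparing whole series element by element just to detect a self-pair.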

We use these briefly in isolated exercises and then combine them to rewrite the previous computation from the `submit` section.

In [17]:
from glob import glob
import os
import pandas as pd
import ujson as json # or json

def convert_to_json(d):
    """ Convert all csv files of directory d into json format """
    filenames = sorted(glob(os.path.join(d, '*')))[-365:]
    outfn = d.replace('minute', 'json') + '.json'
    if os.path.exists(outfn):
        return
    with open(outfn, 'w') as f:
        for fn in filenames:
            df = pd.read_csv(fn)
            for rec in df.to_dict(orient='records'):
                json.dump(rec, f)
                f.write('\n')
    print("Finished JSON: %s" % d.split(os.path.sep)[-1])

here = os.getcwd()
js = os.path.join(here, 'data', 'json')
if not os.path.exists(js):
    os.mkdir(js)

directories = sorted(glob(os.path.join(here, 'data', 'minute', '*')))
for d in directories:
    convert_to_json(d)

filenames = sorted(glob(os.path.join('data', 'json', '*.json')))

for fn in filenames:
    with open(fn) as f:
        data = [json.loads(line) for line in f]
        
    df = pd.DataFrame(data)
    
    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, '/data')
    print("Finished : %s" % out_filename.split(os.path.sep)[-1])

filenames = sorted(glob(os.path.join('data', 'json', '*.h5')))  # data/json/*.h5
filenames

Finished : hal.h5
Finished : hp.h5
Finished : hpq.h5
Finished : ibm.h5
Finished : jbl.h5
Finished : jpm.h5
Finished : luv.h5
Finished : pcg.h5
Finished : usb.h5


['data/json/hal.h5',
 'data/json/hp.h5',
 'data/json/hpq.h5',
 'data/json/ibm.h5',
 'data/json/jbl.h5',
 'data/json/jpm.h5',
 'data/json/luv.h5',
 'data/json/pcg.h5',
 'data/json/usb.h5']

In [18]:
%%time

### Sequential Code

series = []
for fn in filenames:   # Simple map over filenames
    series.append(pd.read_hdf(fn)['close'])

results = []

for a in series:    # Doubly nested loop over the same collection
    for b in series:  
        if not (a == b).all():     # Filter out comparisons of the same series 
            results.append(a.corr(b))  # Apply function

result = max(results)

CPU times: user 843 ms, sys: 163 ms, total: 1.01 s
Wall time: 1.01 s


In [19]:
%%time
sc = SparkContext('local[2]')
rdd = sc.parallelize(filenames)

series = rdd.map(lambda fn: pd.read_hdf(fn)['close'])

corr = (series.cartesian(series)
              .filter(lambda ab: not (ab[0] == ab[1]).all())
              .map(lambda ab: ab[0].corr(ab[1]))
              .max())

result = corr
sc.stop()

CPU times: user 15.8 ms, sys: 11.1 ms, total: 26.9 ms
Wall time: 6.66 s


In [20]:
result

0.94600093394804452

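The Spark version is slower here (about 6.7 s of wall time versus about 1 s for the sequential code) because setting up the context and creating the workers takes time, data must be communicated between processes, and each correlation is too small a task to amortize that overhead.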