In [1]:
import numpy as np

Spark on a local machine using 4 worker threads
====

Started with
```bash
EXECUTOR_MEMORY=512m MASTER=local[4] pyspark
```

If you have a Spark cluster, just set
```bash
MASTER=spark://IP:PORT
```
Everything else works the same way.

#### Check that the SparkContext object is available.

In [2]:
# `sc` is the SparkContext created by the pyspark shell started above.
sc

<pyspark.context.SparkContext at 0x10fa9a5d0>

### Example 1

Adapted from the Scala version in Chapter 2 ("Introduction to Data Analysis with Scala and Spark") of *Advanced Analytics with Spark* (O'Reilly, 2015).

In [3]:
import os

if not os.path.exists('documentation'):
    ! curl -o documentation https://archive.ics.uci.edu/ml/machine-learning-databases/00210/documentation
if not os.path.exists('donation.zip'):
    ! curl -o donation.zip https://archive.ics.uci.edu/ml/machine-learning-databases/00210/donation.zip
! unzip -n -q donation.zip
! unzip -n -q 'block_*.zip'
if not os.path.exists('linkage'):
    ! mkdir linkage
! mv block_*.csv linkage
! rm block_*.zip


10 archives were successfully processed.


In [4]:
# Check that documentation, donation.zip and the linkage/ directory are present.
ls

EMR.pem          Spark.ipynb      donation.zip     [31mword_count.py[m[m*
EMR2.pem         [34mbooks[m[m/           frequencies.csv
MapReduce.ipynb  [31mdocumentation[m[m*   [34mlinkage[m[m/


#### Info about the data set

In [5]:
# Show the data set's description file downloaded by the cell above.
! cat documentation

1. Title: Record Linkage Comparison Patterns 

2. Source Information
   -- Underlying records: Epidemiologisches Krebsregister NRW
      (http://www.krebsregister.nrw.de)
   -- Creation of comparison patterns and gold standard classification:
      Institute for Medical Biostatistics, Epidemiology and Informatics (IMBEI),
      University Medical Center of Johannes Gutenberg University, Mainz, Germany
      (http://www.imbei.uni-mainz.de) 
   -- Donor: Murat Sariyar, Andreas Borg (IMBEI)    
   -- Date: September 2008
 
3. Past Usage:
    1. Irene Schmidtmann, Gael Hammer, Murat Sariyar, Aslihan Gerhold-Ay:
       Evaluation des Krebsregisters NRW Schwerpunkt Record Linkage. Technical
       Report, IMBEI 2009. 
       http://www.krebsregister.nrw.de/fileadmin/user_upload/dokumente/Evaluation/EKR_NRW_Evaluation_Abschlussbericht_2009-06-11.pdf
       -- Describes the external evaluation of the registry's record linkage
          procedures.
       -- The comparison pa

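### If we are running Spark on Hadoop, we need to copy the files to HDFS

The download cell above moved the CSV files into the local `linkage/` directory, so upload them from there. Note that `hadoop fs -rm` does not accept combined flags such as `-rf`; pass `-r -f` separately.

```bash
! hadoop fs -rm -r -f linkage
! hadoop fs -mkdir -p linkage
! hadoop fs -put linkage/block_*.csv linkage
```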

In [6]:
# Read the files in the linkage directory; each RDD element is one text line
# of the CSV files (see the output of take() below).
rdd = sc.textFile('linkage')

#### Actions trigger execution and return a non-RDD result

In [7]:
# first() is an action: it runs a job and returns the first line (the CSV header).
rdd.first()

u'"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"'

In [8]:
# take(n) is also an action; it returns the first n lines as a Python list.
rdd.take(10)

[u'"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"',
 u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE']

In [9]:
def is_header(line):
    """Return True if `line` is the CSV header row.

    Data rows consist of numeric ids/scores, '?' and TRUE/FALSE, so the
    column name "id_1" only ever occurs in the header line.
    """
    header_marker = "id_1"
    return line.find(header_marker) != -1

#### Transformations return a new RDD and are lazy

In [10]:
# filter() is a transformation: it only records the step, nothing is read yet.
vals = rdd.filter(lambda line: not is_header(line))
vals

PythonRDD[4] at RDD at PythonRDD.scala:42

#### Now it is evaluated

In [11]:
# take() is an action, so the pending filter runs now; the header row is gone.
vals.take(10)

[u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE',
 u'46626,47940,1,?,1,?,1,1,1,1,1,TRUE']

#### Each time we access vals, it is *reconstructed* from the original sources

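Spark records the lineage of each RDD (a DAG of the transformations used to build it), so that lost partitions can be recomputed from the original sources — hence *resilient distributed datasets*. The flip side is that, unless we persist it, an RDD is recomputed from scratch by every action that uses it, which is wasteful for data we reuse.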

In [12]:
# vals is not persisted, so this action recomputes it from the text files again
vals.first()

u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE'

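#### Spark allows us to persist RDDs that we will be reusing

`cache()` is itself lazy: it only marks the RDD for storage. The data is kept in memory the first time an action computes it, and subsequent actions read it from there.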

In [13]:
import pyspark  # NOTE(review): not used by this cell; cache() is a method of the RDD

# Mark vals for in-memory storage. This is lazy: nothing is stored until the
# next action computes vals.
vals.cache()

PythonRDD[4] at RDD at PythonRDD.scala:42

In [14]:
# First action after cache(): whatever partitions this take() needs are computed
# once more and, as a side effect, stored in memory for later actions to reuse.
vals.take(10)

[u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE',
 u'46626,47940,1,?,1,?,1,1,1,1,1,TRUE']

In [15]:
# Repeat: partitions cached by the previous action are now read from memory.
vals.take(10)

[u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE',
 u'46626,47940,1,?,1,?,1,1,1,1,1,TRUE']

#### Parse lines and work on them 

In [16]:
def parse(line):
    """Parse one data row of the linkage CSV.

    Returns a list ``[id_1, id_2, scores, is_match]``:
      * ``id_1``, ``id_2`` -- the first two fields as ints;
      * ``scores`` -- fields 2..10 as floats, with the missing marker '?'
        mapped to ``np.nan``;
      * ``is_match`` -- True exactly when field 11 is the string 'TRUE'.
    """
    fields = line.strip().split(',')
    first_id, second_id = [int(field) for field in fields[:2]]

    def to_score(raw):
        # '?' marks a comparison value that could not be computed
        return np.nan if raw == '?' else float(raw)

    scores = [to_score(raw) for raw in fields[2:11]]
    is_match = fields[11] == 'TRUE'
    return [first_id, second_id, scores, is_match]

In [17]:
# Parse every data row into [id_1, id_2, scores, is_match]; map() is lazy too.
mds = vals.map(parse)

In [None]:
# mds is reused by every analysis cell below, so keep it in memory once computed.
mds.cache()

PythonRDD[9] at RDD at PythonRDD.scala:42

In [None]:
# countByValue() is an action: it returns a {is_match: count} mapping to the driver.
match_counts = mds.map(lambda record: record[-1]).countByValue()

In [None]:
# Show how many record pairs fall in each class. A single-argument print(...)
# produces the same "label count" text on both Python 2 and Python 3 kernels,
# whereas the `print a, b` statement is a SyntaxError on Python 3.
for label, count in match_counts.items():
    print('{} {}'.format(label, count))

#### Summary statistics

In [None]:
# Summary of the first score (cmp_fname_c1). parse() turns '?' into NaN and the
# NaNs are not removed here, so expect them to contaminate the statistics.
mds.map(lambda x: x[2][0]).stats()

In [None]:
# Same summary of cmp_fname_c1 with the missing (NaN) scores dropped first.
(mds
 .map(lambda record: record[2][0])
 .filter(lambda score: np.isfinite(score))
 .stats())

#### Takes too long on a laptop — not executed here

from pyspark.statcounter import StatCounter

N_SCORES = 9  # parse() keeps fields 2..10 as the comparison scores

def add_scores(counters, scores):
    """Fold one record's scores into the per-column counters, skipping NaN ('?')."""
    for counter, value in zip(counters, scores):
        if np.isfinite(value):
            counter.merge(value)
    return counters

def merge_counters(left, right):
    """Combine the per-column counters of two partitions, column by column."""
    for mine, theirs in zip(left, right):
        mine.mergeStats(theirs)
    return left

stats = mds.map(lambda x: x[2]).aggregate(  # one pass over mds for all columns
    [StatCounter() for _ in range(N_SCORES)], add_scores, merge_counters)

for stat in stats:
    print(stat)

#### Logistic regression (classifying record pairs as match / non-match)

In [None]:
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

def parsePoint(md):
    """Build a LabeledPoint from a parsed record [id_1, id_2, scores, is_match].

    The match flag (md[-1]) is the label and the score list (md[2]) the features.
    """
    return LabeledPoint(md[-1], md[2])

# Keep only records whose scores are all present (no NaN from '?').
# NOTE(review): in the sample rows shown earlier, cmp_fname_c2 / cmp_lname_c2
# are usually '?', so this filter likely discards most records -- confirm the
# remaining training set is representative.
data = mds.filter(lambda x: np.all(np.isfinite(x[2]))).map(parsePoint)
# `data` feeds training and the two counting passes below; cache it so it is
# not rebuilt from mds each time.
data.cache()
model = LogisticRegressionWithSGD.train(data)

labelsAndPreds = data.map(lambda p: (p.label, model.predict(p.features)))
# Index into the (label, prediction) pair: tuple-parameter lambdas such as
# `lambda (v, p): ...` are a SyntaxError on Python 3.
n_errors = labelsAndPreds.filter(lambda vp: vp[0] != vp[1]).count()
trainErr = n_errors / float(data.count())
# NOTE(review): training error alone can look good when one class dominates;
# compare it with the class proportions in match_counts.
print("Training Error = " + str(trainErr))