# 11.2.2 PySpark example

This is a simple PySpark script that you can run locally or otherwise, provided that you have a working Spark, PySpark and Jupyter configuration. See [Install notes](install_notes.txt) for details.

## Generic setup of a Spark Context

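This is an essential component of every Spark program. Here we initialise it with the `"local"` master, which runs Spark on this machine with a single worker thread; a master such as `"local[*]"` would use all available cores for parallel computation.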

In [None]:
from pyspark import SparkContext
sc = SparkContext("local", "first app")

In [None]:
## The version is a property of the SparkContext instance, not of the class
print(sc.version)

In [None]:
%matplotlib inline 

import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = [10, 10]

## Spark and output

Spark assumes that you've done your homework and made sure that the output location is clear; saving to a path that already exists fails with an error.

This is often not the case when you are experimenting. You will therefore want to include provision for removing content that gets in the way.

Note that you cannot do this easily from within Spark. It is best done from the OS or from Python before you write any output through the SparkContext.

### A useful function to clear the output path

In [None]:
from shutil import rmtree
import os
def clearaway(path):
    ## Remove the directory tree at `path` if it exists; do nothing otherwise
    if os.path.exists(path):
        rmtree(path)
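
A quick way to check the helper's behaviour without touching Spark is to run it against a temporary directory. The sketch below redefines the function so that it is self-contained (the argument is called `path` here, an illustrative choice that avoids shadowing the built-in `dir`), and the scratch directory and file names are hypothetical.

```python
import os
import tempfile
from shutil import rmtree


def clearaway(path):
    """Remove the directory tree at `path` if it exists; otherwise do nothing."""
    if os.path.exists(path):
        rmtree(path)


# Hypothetical scratch location standing in for a Spark output folder
base = tempfile.mkdtemp()
out_dir = os.path.join(base, "output", "folder")
os.makedirs(out_dir)
with open(os.path.join(out_dir, "part-00000"), "w") as fh:
    fh.write("stale output\n")

clearaway(out_dir)
existed_after_clear = os.path.exists(out_dir)

# Calling it again on a path that no longer exists is a harmless no-op
clearaway(out_dir)

# Tidy up the scratch location
rmtree(base)
```

Because the function checks for existence first, it is safe to call at the top of every experiment, whether or not a previous run left output behind.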

## 11.2.2.1 parallelize input and output


This is a "first example" to show how input and output work in PySpark. Equivalent to `2.1-SparkInputOutput.py`.

In [None]:
## Distribute data over a spark context
samples = sc.parallelize([
    ("abonsanto@fakemail.com", "Alberto", "Bonsanto"),
    ("mbonsanto@fakemail.com", "Miguel", "Bonsanto"),
    ("stranger@fakemail.com", "Stranger", "Weirdo"),
    ("dbonsanto@fakemail.com", "Dakota", "Bonsanto")
])

## Collect the data (a sequential operation)
print(samples.collect())

clearaway("output/folder")
## Save the data (a parallel operation)
samples.saveAsTextFile("output/folder/here.txt")
## Load the data (a parallel operation)
read_rdd = sc.textFile("output/folder/here.txt")
## Collect the data again
print(read_rdd.collect())
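
One consequence of this round trip is worth spelling out: `saveAsTextFile` writes each record as its string form, so `textFile` returns lines of text rather than the original tuples. The plain-Python sketch below imitates that round trip without Spark (the in-memory `lines` list stands in for the saved part files, which is an assumption made for illustration) and shows one possible way to recover the tuples with `ast.literal_eval`.

```python
import ast

samples = [
    ("abonsanto@fakemail.com", "Alberto", "Bonsanto"),
    ("mbonsanto@fakemail.com", "Miguel", "Bonsanto"),
]

# What ends up on disk: one line per record, holding the record's string form
lines = [str(record) for record in samples]

# What a text read gives back: plain strings, not tuples
first_type = type(lines[0])

# One way to rebuild the original tuples from their string form
restored = [ast.literal_eval(line) for line in lines]
print(restored)
```

In PySpark the same parsing step could be applied with `read_rdd.map(ast.literal_eval)`, although for anything beyond a toy example a structured format is usually a better choice than parsing printed tuples.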

## 11.2.2.2 Exploiting RDD builtin functions

RDDs have many useful built-in functions. `count` is one that is implemented efficiently in parallel.

This is implemented in `2.2-SparkCount.py`.

In [None]:
words = sc.parallelize([
    "scala",
    "java",
    "hadoop",
    "spark",
    "akka",
    "spark vs hadoop",
    "pyspark",
    "pyspark and spark"
])
## Count is parallelized
counts = words.count()
print("Number of elements in RDD -> %i" % (counts))
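
To see why counting parallelises well, note that each partition can be counted independently and only the small per-partition totals need to be combined. The following plain-Python sketch mimics that idea; `split_into_partitions` is a hypothetical helper written for this illustration and is not part of PySpark, which chooses its own partitioning.

```python
def split_into_partitions(items, num_partitions):
    """Hypothetical helper: deal items round-robin into num_partitions lists."""
    return [items[i::num_partitions] for i in range(num_partitions)]


words = ["scala", "java", "hadoop", "spark", "akka",
         "spark vs hadoop", "pyspark", "pyspark and spark"]

partitions = split_into_partitions(words, 3)

# Each partition is counted on its own (the step that workers can do in parallel)
partial_counts = [len(part) for part in partitions]

# Only the small partial totals are combined at the end
total = sum(partial_counts)
print("Number of elements -> %i" % total)
```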

## 11.2.2.3 Accessing information about the Spark environment

PySpark offers many ways to inspect its environment. `StorageLevel` is one: it describes how an RDD is stored (in memory, on disk, or both) and how many replicas of each partition are kept. You should investigate the other information available, e.g. [pyspark for beginners](https://www.tutorialspoint.com/pyspark/pyspark_storagelevel.htm).

This is implemented in `2.3-SparkStorageLevel.py`.

In [None]:
from pyspark import StorageLevel

# Parallelize an RDD
rdd1 = sc.parallelize([1,2])
## Set its storage level to "memory and disk, 2x replicated"
rdd1.persist(StorageLevel.MEMORY_AND_DISK_2)
## Report the storage level
print(rdd1.getStorageLevel())

## 11.2.2.4 A simple filter

Filtering is one of the most useful operations on distributed data, because it reduces a large dataset to a manageable amount before anything is returned to the driver. This is achieved with `filter`, as in the following.

This is implemented in `2.4-ReadAndFilter.py`.

In [None]:
logFile = "data/books/5720.txt"
## Cache the RDD because it is scanned twice below
logData = sc.textFile(logFile).cache()
## Count the lines that contain the letter 'a' and the letter 'b'
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

## 11.2.2.5 Machine Learning from within PySpark

For data science, the real joy of spark is distributed machine learning.

The main library is called `mllib` and it contains many useful functions; see e.g. [Mllib for beginners](https://www.tutorialspoint.com/pyspark/pyspark_mllib.htm).

This is implemented in `2.5-MLlibRecommender.py`.

In [None]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

clearaway("output/tmp/myCollaborativeFilter") 

## Guard the driver code so that it runs only in the main program and not when this file is imported elsewhere.
if __name__ == "__main__":
   data = sc.textFile("data/test_data.csv")
   ## Map each CSV line "user,product,rating" into a Rating object
   ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
   
   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)
   
   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))
   
   # Save and load model
   model.save(sc, "output/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "output/tmp/myCollaborativeFilter")
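
The evaluation step joins observed and predicted ratings on the `(user, product)` key and averages the squared differences. A plain-Python sketch of the same calculation, using made-up numbers purely for illustration, may make the key/value layout easier to follow; it does not involve Spark or the ALS model.

```python
# Made-up observed ratings and predictions, keyed by (user, product)
observed = {(1, 10): 4.0, (1, 11): 2.0, (2, 10): 5.0}
predicted = {(1, 10): 3.5, (1, 11): 2.5, (2, 10): 4.0}

# Equivalent of the RDD join: keep the keys present on both sides,
# pairing each observed value with its prediction
joined = {key: (observed[key], predicted[key])
          for key in observed if key in predicted}

# Mean of the squared differences over the joined pairs
squared_errors = [(obs - pred) ** 2 for obs, pred in joined.values()]
mse = sum(squared_errors) / len(squared_errors)
print("Mean Squared Error = " + str(mse))
```

Note that this error is measured on the same data used for training, as in the cell above, so it is an optimistic estimate of how the model would perform on unseen ratings.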


## 11.2.2.6 MapReduce Example to count word frequencies

The wonderful thing about Spark is that it will parallelise serial content, and retain any parallelisation already present in distributed content. So if we provide it with data stored on HDFS, the data stay distributed, with no additional effort.

We can create a parallelised interface from a regular file structure just by reading many files at once.

Here we use MapReduce from PySpark, taking advantage of this structure.

This is implemented in `2.6-MapReduceWordcount`.

In [None]:
import re
## Clear the same path that saveAsTextFile writes to below
clearaway("output/pyspark_wc")

#text_file = sc.textFile("hdfs/books/*.txt")
text_file = sc.textFile("data/books/*.txt")
def linesplit(line):
    ## Strip punctuation, then split the line into words on single spaces
    line = re.sub(r'[^\w\s]', '', line)
    return line.split(" ")

counts = text_file.flatMap(linesplit) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)

counts.saveAsTextFile("output/pyspark_wc")


In [None]:
counts.collect()
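
The three steps of the pipeline (`flatMap`, `map`, `reduceByKey`) can be traced on a couple of lines in plain Python. This sketch uses made-up input text and is not how Spark executes the job; it only shows what each stage produces. It also shows that splitting on a single space leaves an empty-string token wherever two spaces appear in a row.

```python
import re
from collections import defaultdict


def linesplit(line):
    """Strip punctuation, then split on single spaces (as in the cell above)."""
    line = re.sub(r'[^\w\s]', '', line)
    return line.split(" ")


lines = ["the cat, the hat", "a  cat"]  # note the double space in the second line

# flatMap: every line becomes several words, flattened into one list
words = [word for line in lines for word in linesplit(line)]

# map: each word becomes a (word, 1) pair
pairs = [(word, 1) for word in words]

# reduceByKey: values that share a key are combined with a + b
totals = defaultdict(int)
for word, one in pairs:
    totals[word] = totals[word] + one
word_counts = dict(totals)
print(word_counts)
```

If the empty tokens are unwanted, calling `line.split()` with no argument splits on any run of whitespace and never returns empty strings.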

## 11.2.2.7 A working machine learning pipeline


In [None]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt

## See https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa

## Interactive session

spark = SparkSession.builder.appName('ml-bank').getOrCreate()
df = spark.read.csv('data/bank.csv', header = True, inferSchema = True)
df.printSchema()

## Converting to pandas
pd.DataFrame(df.take(5), columns=df.columns).transpose()


In [None]:
## Using the Spark DataFrame describe
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features).describe().toPandas().transpose()

In [None]:
## Two ways to sample 1000 random points
## The recommended way, though you have to specify two limits: the sampling fraction (here 0.5) and, if needed, the maximum number of rows (here 1000)
## This way creates a Spark DataFrame
numeric_data1=df.sample(False, 0.5, seed=0).limit(1000)
numeric_data = numeric_data1.toPandas()
numeric_data

In [None]:
## This way does a "collect", which can be slow in larger datasets, but is more natural
numeric_data2=df.rdd.takeSample(False, 1000, seed=0)
numeric_data2
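
The difference between the two approaches is that a fraction-based sample keeps each row independently with the given probability, so its size varies from run to run, whereas a fixed-size sample always returns exactly the requested number of rows. The plain-Python sketch below imitates both on made-up data; it illustrates the idea only and does not reproduce Spark's sampling implementation.

```python
import random

rows = list(range(10000))  # made-up stand-in for the rows of a DataFrame
rng = random.Random(0)

# Fraction-based sampling: keep each row with probability 0.5,
# then cap the result at 1000 rows (the role played by limit(1000))
kept = [row for row in rows if rng.random() < 0.5]
fraction_sample = kept[:1000]

# Fixed-size sampling without replacement: always exactly 1000 rows
fixed_sample = rng.sample(rows, 1000)

print(len(kept), len(fraction_sample), len(fixed_sample))
```

The cap is what makes the first approach return at most 1000 rows; if the fraction is too small for the dataset, fewer rows come back.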

Scatter matrices are a good go-to plot to understand the structure of the data.

In [None]:
ax=pd.plotting.scatter_matrix(numeric_data, diagonal='kde', marker='+')

In [None]:
####################
## Preparing the data
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
stages = []

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], 
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol = 'deposit', outputCol = 'label')
stages += [label_stringIdx]

numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
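
To make the two encoding stages concrete, here is a plain-Python sketch of what they compute for a single column. It assumes Spark's default settings (`stringOrderType="frequencyDesc"` for `StringIndexer`, so the most frequent label gets index 0, and `dropLast=True` for `OneHotEncoder`, so the last category becomes an all-zero vector). The column values are made up, the helper `one_hot` is hypothetical, and dense lists are used for readability where Spark would produce sparse vectors.

```python
from collections import Counter

column = ["married", "single", "married", "divorced", "married", "single"]

# String indexing: the most frequent label gets index 0, ties broken alphabetically
freq = Counter(column)
ordered = sorted(freq, key=lambda label: (-freq[label], label))
label_to_index = {label: i for i, label in enumerate(ordered)}
indexed = [label_to_index[value] for value in column]


def one_hot(index, num_categories, drop_last=True):
    """Hypothetical helper: one-hot list; the last category is all zeros if dropped."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec


encoded = [one_hot(i, len(ordered)) for i in indexed]
print(label_to_index)
print(encoded)
```

Dropping the last category avoids a redundant column: once all other indicators are zero, the remaining category is implied.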


In [None]:
## Getting rid of useless columns
df = df.select('age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan', 'contact', 'duration', 'campaign', 'pdays', 'previous', 'poutcome', 'deposit')
cols = df.columns
df.printSchema()

In [None]:

#######################
## Making a "data processing pipeline"

from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)
df.printSchema()


In [None]:

#######################
## Look at a couple of rows
pd.DataFrame(df.take(5), columns=df.columns).transpose()


In [None]:

#######################
## Do a standard training/ testing split
train, test = df.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))


In [None]:
#######################
## Fit a logistic regression
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)
predictions = lrModel.transform(test)

In [None]:
#######################
## Plot the results
import numpy as np

beta = np.sort(lrModel.coefficients)

plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.savefig('output/bank_betas.png')
plt.show()
plt.close()

In [None]:

######################
## ROC curves

trainingSummary = lrModel.summary

predpandas=predictions.select(['label','probability']).toPandas()
predpandas['probability']=[x[1] for x in predpandas['probability']]
from sklearn.metrics import roc_curve
testrocarray = roc_curve(predpandas['label'], predpandas['probability'])
testroc = pd.DataFrame.from_records(testrocarray).transpose()
testroc.columns=['FPR','TPR','Thresh']

trainroc = trainingSummary.roc.toPandas()

plt.plot(trainroc['FPR'],trainroc['TPR'],label='Training')
plt.plot(testroc['FPR'],testroc['TPR'],label='Test')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend()
plt.savefig('output/banks_roc.png')
plt.show()
plt.close()

print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))
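
For reference, each point on an ROC curve is a (false positive rate, true positive rate) pair obtained by thresholding the predicted probabilities, with the false positive rate on the x-axis, and the area under the curve can be approximated with the trapezoid rule. The sketch below computes both in plain Python on made-up labels and scores; `roc_points` and `auc` are hypothetical helpers written for illustration, not the scikit-learn or Spark implementations.

```python
def roc_points(labels, scores):
    """Return (FPR, TPR) pairs, one per distinct threshold, starting at (0, 0)."""
    positives = sum(1 for y in labels if y == 1)
    negatives = len(labels) - positives
    points = [(0.0, 0.0)]
    for threshold in sorted(set(scores), reverse=True):
        # Everything scored at or above the threshold is predicted positive
        tp = sum(1 for y, s in zip(labels, scores) if s >= threshold and y == 1)
        fp = sum(1 for y, s in zip(labels, scores) if s >= threshold and y == 0)
        points.append((fp / negatives, tp / positives))
    return points


def auc(points):
    """Trapezoid-rule area under a curve given as (x, y) points sorted by x."""
    area = 0.0
    for (x0, y0), (x1, y1) in zip(points, points[1:]):
        area += (x1 - x0) * (y0 + y1) / 2.0
    return area


# Made-up labels and predicted probabilities of the positive class
labels = [1, 1, 0, 1, 0, 0]
scores = [0.9, 0.8, 0.7, 0.6, 0.4, 0.2]

points = roc_points(labels, scores)
area = auc(points)
print(points)
print("Area under ROC: " + str(area))
```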


## 11.2.2.8 Broadcast

You might want to share the same information with all worker nodes, e.g. hyperparameter values. To do this you can use a **Broadcast**, which ships a read-only copy of the variable to every worker node.

This is implemented in `Supplement/pys_broadcast.py` as it is so trivially explored.

In [None]:
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print("Stored data -> %s" % (data))
elem = words_new.value[2]
print("Printing a particular element of the broadcast value -> %s" % (elem))

## Supplementary materials

For completeness, additional materials are placed in `Supplement` and include:

* pys_filter.py: A simple example of filtering
* pys_map.py: A simple example of Map
* pys_collect.py: A simple example of a Collect
