# MLlib - machine learning in Spark
We are going to use the [PySpark API (quick start)](https://spark.apache.org/docs/2.3.1/quick-start.html) and do machine learning through MLlib.

* pyspark - https://spark.apache.org/docs/latest/api/python/index.html
* MLlib - https://spark.apache.org/docs/latest/ml-guide.html
* S3 (w/boto) - https://boto3.amazonaws.com/v1/documentation/api/latest/index.html

## In this Notebook
1. create a context
2. work with S3 and Parquet to make our DataFrame
3. work with MLlib on the DataFrame

In [1]:
import pyspark
from os import listdir
from os.path import isfile, join
import boto3
import pandas as pd
from sagemaker import get_execution_role
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType

# Initialize the Spark environment (takes ~1 min)

In [2]:
conf = pyspark.SparkConf().setAppName('odl').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
sqlc = pyspark.sql.SQLContext(sc)
sc

In [3]:
sqlc

<pyspark.sql.context.SQLContext at 0x7fd73087feb8>

## Connect to S3
There are a few ways to connect to S3. Here pandas reads directly from an `s3://` path (pandas relies on the s3fs package for this), and boto3 uploads files:
* boto3 - https://boto3.amazonaws.com/v1/documentation/api/latest/index.html

### Read a CSV from S3 into a Spark DataFrame (via pandas)

In [4]:
role = get_execution_role()
bucket='odl-spark19spds6003-001'
data_key = 'sample_data/data.csv'
data_location = 's3://{}/{}'.format(bucket, data_key)
pd.read_csv(data_location)

|   | alpha | beta | gamma |
|---|-------|------|-------|
| 0 | 1     | 2    | 3     |
| 1 | 1     | 4    | 9     |
| 2 | 1     | 8    | 27    |


In [5]:
df = sqlc.createDataFrame(pd.read_csv(data_location))

In [6]:
df

DataFrame[alpha: bigint, beta: bigint, gamma: bigint]

### Write Parquet locally, then copy it to S3

In [None]:
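parquetPath = '/home/ec2-user/SageMaker/tmp-pqt'
# mode('overwrite') lets this cell be re-run; the default save mode errors if the path already exists
df.write.mode('overwrite').parquet(parquetPath)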

In [None]:
# prep list of files to transfer (Spark writes a directory of part files plus marker/checksum files)
files = [f for f in listdir(parquetPath) if isfile(join(parquetPath, f))]

s3 = boto3.resource('s3')
for f in files:
    # upload each local file under the sample_data/pqt/ prefix in the bucket
    s3.Bucket(bucket).upload_file(join(parquetPath, f), 'sample_data/pqt/' + f)
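The loop above uses `listdir` with `isfile`, so it only copies files at the top level of `parquetPath`. If the Parquet output were partitioned (for example with `df.write.partitionBy(...)`), the part files would sit in subdirectories and be skipped. Below is a minimal sketch that walks the whole directory and keeps each file's relative path in its S3 key. `parquet_upload_plan` is a hypothetical helper, not part of boto3 or Spark:

```python
import os


def parquet_upload_plan(local_dir, key_prefix):
    """Return sorted (local_path, s3_key) pairs for every file under local_dir.

    Subdirectories are included, so partitioned output such as
    year=2019/part-0000.parquet keeps its relative path in the key.
    """
    plan = []
    for root, _dirs, files in os.walk(local_dir):
        for name in files:
            local_path = os.path.join(root, name)
            # S3 keys always use '/', whatever the local OS separator is
            rel = os.path.relpath(local_path, local_dir).replace(os.sep, "/")
            plan.append((local_path, key_prefix.rstrip("/") + "/" + rel))
    # os.walk order is not guaranteed, so sort for a deterministic plan
    return sorted(plan)
```

Each pair could then be passed to `s3.Bucket(bucket).upload_file(local_path, key)`, as in the loop above.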


### Read the Parquet files back into a Spark DataFrame

In [8]:
df = sqlc.read.parquet(parquetPath)

In [9]:
df

DataFrame[alpha: bigint, beta: bigint, gamma: bigint]

# Now to start our ML pipeline
In class we use a small sample dataset so things run quickly. As an exercise, connect to a different dataset of your own. Please load it into the S3 bucket 'odl-spark19spds6003-001', in a 'folder' named after your UVA NetID.

## Student Exercise - Loading Data from S3 directly to dataframe
As part of the homework you need to set up a Spark DataFrame for your large dataset. I did not want to deprive you of the joy of figuring out how to do that, so I recommend Google. Some good search terms are: pyspark, dataframe, s3, doc. NB: It is OK to struggle with this part; that's where the learning is. It is also OK to ask a friend for help, but make sure you understand the code.
1. Make dataframe from csv file (s3://odl-spark19spds6003-001/checkouts-by-title-head.csv)
  * NB: this one is 166 MB; we will start small and then compare performance with the larger version
2. Make a Parquet file
3. Make a DataFrame from the Parquet file
4. Repeat with (s3://odl-spark19spds6003-001/checkouts-by-title.csv), a 6.6 GB file, and note the performance difference.
5. Make a DataFrame with the data you want to use for MLlib



* https://docs.aws.amazon.com/sagemaker/latest/dg/apache-spark-example1.html
* https://github.com/aws-samples/aws-sagemaker-ml-blog-predictive-campaigns/blob/master/notebooks/Retail%2BData%2BDiscovery%2Band%2BProcessing%2Bv1.0.ipynb

## Using MLlib

### Convention
* df = spark dataframe
* pddf = pandas dataframe

### Outline
1. make demo data frame
2. exploratory tools
3. feature engineering
4. train/test
5. vectorize (spark special sauce)
6. train
7. predict
8. eval

### Make a demo data frame

In [10]:
# setup a sample dataframe
bucket='odl-spark19spds6003-001'
data_key = 'Batting.csv'
data_location = 's3://{}/{}'.format(bucket, data_key)
pddf = pd.read_csv(data_location)

In [11]:
df = sqlc.createDataFrame(pddf) # NB: this will cause an error

TypeError: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'>

In [12]:
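# some text columns mix strings with NaN (a float), so Spark infers StringType and DoubleType
# for the same column; dropna() avoids this by dropping every row with any missing value
pddf = pddf.dropna()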
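`dropna()` also discards rows whose only gaps are in numeric columns, which Spark can hold as `NaN` doubles without complaint. Here is a hedged alternative sketch. It assumes the type conflict comes from non-numeric (text) columns that contain missing values. It finds those columns and fills them with a placeholder string before `createDataFrame`. The helper names and the empty-string placeholder are illustrative choices, not part of pandas or Spark:

```python
import pandas as pd


def text_columns_with_nan(pddf):
    """Non-numeric columns that contain missing values (NaN/None)."""
    return [c for c in pddf.columns
            if not pd.api.types.is_numeric_dtype(pddf[c]) and pddf[c].isna().any()]


def fill_text_nans(pddf, placeholder=""):
    """Fill missing values in text columns only; numeric NaN is left as-is."""
    cols = text_columns_with_nan(pddf)
    if not cols:
        return pddf.copy()
    return pddf.fillna({c: placeholder for c in cols})
```

`df = sqlc.createDataFrame(fill_text_nans(pddf))` would keep more rows. Whether an empty string is a sensible stand-in depends on the column. Any numeric `NaN` left behind would also flow into later steps such as `df.corr`, so you may still want to handle it.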

In [13]:
df = sqlc.createDataFrame(pddf)

## MLlib Basics
* exploratory tools


In [14]:
display(df)

DataFrame[playerID: string, yearID: bigint, stint: bigint, teamID: string, lgID: string, G: bigint, AB: bigint, R: bigint, H: bigint, 2B: bigint, 3B: bigint, HR: bigint, RBI: double, SB: double, CS: double, BB: bigint, SO: double, IBB: double, HBP: double, SH: double, SF: double, GIDP: double]

In [15]:
df.printSchema()

root
 |-- playerID: string (nullable = true)
 |-- yearID: long (nullable = true)
 |-- stint: long (nullable = true)
 |-- teamID: string (nullable = true)
 |-- lgID: string (nullable = true)
 |-- G: long (nullable = true)
 |-- AB: long (nullable = true)
 |-- R: long (nullable = true)
 |-- H: long (nullable = true)
 |-- 2B: long (nullable = true)
 |-- 3B: long (nullable = true)
 |-- HR: long (nullable = true)
 |-- RBI: double (nullable = true)
 |-- SB: double (nullable = true)
 |-- CS: double (nullable = true)
 |-- BB: long (nullable = true)
 |-- SO: double (nullable = true)
 |-- IBB: double (nullable = true)
 |-- HBP: double (nullable = true)
 |-- SH: double (nullable = true)
 |-- SF: double (nullable = true)
 |-- GIDP: double (nullable = true)



In [16]:
df.take(5)

[Row(playerID='colemjo02', yearID=1890, stint=1, teamID='PHI', lgID='NL', G=1, AB=0, R=0, H=0, 2B=0, 3B=0, HR=0, RBI=0.0, SB=0.0, CS=0.0, BB=0, SO=0.0, IBB=0.0, HBP=0.0, SH=0.0, SF=0.0, GIDP=0.0),
 Row(playerID='brynato01', yearID=1891, stint=1, teamID='BSN', lgID='NL', G=1, AB=0, R=0, H=0, 2B=0, 3B=0, HR=0, RBI=0.0, SB=0.0, CS=0.0, BB=0, SO=0.0, IBB=0.0, HBP=0.0, SH=0.0, SF=0.0, GIDP=0.0),
 Row(playerID='dunnian01', yearID=1891, stint=1, teamID='NY1', lgID='NL', G=1, AB=0, R=0, H=0, 2B=0, 3B=0, HR=0, RBI=0.0, SB=0.0, CS=0.0, BB=0, SO=0.0, IBB=0.0, HBP=0.0, SH=0.0, SF=0.0, GIDP=0.0),
 Row(playerID='sulliji01', yearID=1891, stint=1, teamID='BSN', lgID='NL', G=1, AB=0, R=0, H=0, 2B=0, 3B=0, HR=0, RBI=0.0, SB=0.0, CS=0.0, BB=0, SO=0.0, IBB=0.0, HBP=0.0, SH=0.0, SF=0.0, GIDP=0.0),
 Row(playerID='viaule01', yearID=1892, stint=1, teamID='CL4', lgID='NL', G=1, AB=0, R=0, H=0, 2B=0, 3B=0, HR=0, RBI=0.0, SB=0.0, CS=0.0, BB=0, SO=0.0, IBB=0.0, HBP=0.0, SH=0.0, SF=0.0, GIDP=0.0)]

In [17]:
print("Pearson's r(H,R) = {}".format(df.corr("H", "R")))
print("Pearson's r(HR,SO) = {}".format(df.corr("HR", "SO")))
print("Pearson's r(yearID,HR) = {}".format(df.corr("yearID", "HR")))

Pearson's r(H,R) = 0.9739356029782406
Pearson's r(HR,SO) = 0.8235057354838308
Pearson's r(yearID,HR) = 0.02899663350555121
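`df.corr(col1, col2)` computes Pearson's correlation coefficient, which is the only method it currently supports. For reference, with column means $\bar{x}$ and $\bar{y}$ over the $n$ rows:

$$
r_{xy} = \frac{\sum_{i=1}^{n}(x_i-\bar{x})(y_i-\bar{y})}{\sqrt{\sum_{i=1}^{n}(x_i-\bar{x})^2}\,\sqrt{\sum_{i=1}^{n}(y_i-\bar{y})^2}}
$$

Values near 1 mean the two columns rise together almost linearly. Values near 0, as for (yearID, HR), mean little linear relationship.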


But those first two correlations are probably driven largely by how many games each player played: more games means more chances for hits, runs, home runs, and strikeouts.

In [18]:
print("Pearson's r(G,H) = {}".format(df.corr("G", "H")))
print("Pearson's r(G,R) = {}".format(df.corr("G", "R")))

Pearson's r(G,H) = 0.9179755244596672
Pearson's r(G,R) = 0.8928026209617358


We have several ways to go from here. In this case we compute new per-game features: H, R, HR, and SO, each divided by G.
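The idea that games played drives both counts can be checked on synthetic data. This sketch uses only the standard library, and its rates and ranges are made up, not taken from Batting.csv. Each player's hit rate and run rate are drawn independently, so any correlation between raw hits and raw runs comes only from the shared number of games. Dividing by games, as the `withColumn` calls that follow do, removes that shared factor:

```python
import random


def pearson(xs, ys):
    """Pearson's r, the statistic df.corr() reports."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    sxy = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sxx = sum((x - mx) ** 2 for x in xs)
    syy = sum((y - my) ** 2 for y in ys)
    return sxy / (sxx * syy) ** 0.5


rng = random.Random(0)
games = [rng.randint(1, 162) for _ in range(2000)]
# hypothetical per-game rates, drawn independently of each other
hits = [g * rng.uniform(0.5, 1.5) for g in games]
runs = [g * rng.uniform(0.3, 0.8) for g in games]

raw_r = pearson(hits, runs)                       # inflated by shared games
per_game_r = pearson([h / g for h, g in zip(hits, games)],
                     [r / g for r, g in zip(runs, games)])
print(raw_r, per_game_r)
```

The raw correlation comes out strongly positive while the per-game one is close to 0. In the real data the per-game correlations stay fairly high (0.904 for HpG vs RpG below), which suggests a genuine relationship beyond games played.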

In [19]:
df = df.withColumn('HpG', df.H / df.G)
df.printSchema()

root
 |-- playerID: string (nullable = true)
 |-- yearID: long (nullable = true)
 |-- stint: long (nullable = true)
 |-- teamID: string (nullable = true)
 |-- lgID: string (nullable = true)
 |-- G: long (nullable = true)
 |-- AB: long (nullable = true)
 |-- R: long (nullable = true)
 |-- H: long (nullable = true)
 |-- 2B: long (nullable = true)
 |-- 3B: long (nullable = true)
 |-- HR: long (nullable = true)
 |-- RBI: double (nullable = true)
 |-- SB: double (nullable = true)
 |-- CS: double (nullable = true)
 |-- BB: long (nullable = true)
 |-- SO: double (nullable = true)
 |-- IBB: double (nullable = true)
 |-- HBP: double (nullable = true)
 |-- SH: double (nullable = true)
 |-- SF: double (nullable = true)
 |-- GIDP: double (nullable = true)
 |-- HpG: double (nullable = true)



In [20]:
df = df.withColumn('RpG', df.R / df.G)
df = df.withColumn('HRpG', df.HR / df.G)
df = df.withColumn('SOpG', df.SO / df.G)
df.printSchema()

root
 |-- playerID: string (nullable = true)
 |-- yearID: long (nullable = true)
 |-- stint: long (nullable = true)
 |-- teamID: string (nullable = true)
 |-- lgID: string (nullable = true)
 |-- G: long (nullable = true)
 |-- AB: long (nullable = true)
 |-- R: long (nullable = true)
 |-- H: long (nullable = true)
 |-- 2B: long (nullable = true)
 |-- 3B: long (nullable = true)
 |-- HR: long (nullable = true)
 |-- RBI: double (nullable = true)
 |-- SB: double (nullable = true)
 |-- CS: double (nullable = true)
 |-- BB: long (nullable = true)
 |-- SO: double (nullable = true)
 |-- IBB: double (nullable = true)
 |-- HBP: double (nullable = true)
 |-- SH: double (nullable = true)
 |-- SF: double (nullable = true)
 |-- GIDP: double (nullable = true)
 |-- HpG: double (nullable = true)
 |-- RpG: double (nullable = true)
 |-- HRpG: double (nullable = true)
 |-- SOpG: double (nullable = true)



In [21]:
print("Pearson's r(G,H) = {}".format(df.corr("G", "H")))
print("Pearson's r(G,HpG) = {}".format(df.corr("G", "HpG")))
print()
print("Pearson's r(G,R) = {}".format(df.corr("G", "R")))
print("Pearson's r(G,RpG) = {}".format(df.corr("G", "RpG")))
print()
print("Pearson's r(H,R) = {}".format(df.corr("H", "R")))
print("Pearson's r(HpG,RpG) = {}".format(df.corr("HpG", "RpG")))
print()
print("Pearson's r(HR,SO) = {}".format(df.corr("HR", "SO")))
print("Pearson's r(HRpG,SOpG) = {}".format(df.corr("HRpG", "SOpG")))

Pearson's r(G,H) = 0.9179755244596672
Pearson's r(G,HpG) = 0.7333576164062421

Pearson's r(G,R) = 0.8928026209617358
Pearson's r(G,RpG) = 0.702607341934959

Pearson's r(H,R) = 0.9739356029782406
Pearson's r(HpG,RpG) = 0.9041645921942068

Pearson's r(HR,SO) = 0.8235057354838308
Pearson's r(HRpG,SOpG) = 0.5032203403305449


### Select a feature and a label for analysis
* RBI = feature
* R = label

In [22]:
df = df.select("RBI","R")

## Finally, ML time
1. split data into train/test
2. **SPARK SPECIAL SAUCE**: vectorization with `pyspark.ml.linalg`
3. Train --> Predict --> Evaluate

### Make Training and Test sets

In [23]:
# create train/test sets
seed = 42
(testDF, trainingDF) = df.randomSplit((0.20, 0.80), seed=seed)
print('training set N = {}, test set N = {}'.format(trainingDF.count(), testDF.count()))

training set N = 52830, test set N = 13345
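`randomSplit` normalizes its weights and assigns rows at random, so the split sizes are only approximately 20/80. Here 13345 of 66175 rows, about 20.2%, landed in the test set. Below is a simplified single-machine sketch of that idea, assuming independent per-row draws against cumulative weight boundaries. Spark's real implementation samples per partition, so this is not its code, and Spark's result can also depend on how the data is partitioned. `random_split` is a hypothetical helper:

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, in the order of weights.

    Weights are normalized to sum to 1, so (0.2, 0.8) and (1, 4) behave
    the same; split sizes are therefore only approximately proportional.
    """
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)            # cumulative boundary for this split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()              # one uniform draw per row
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)    # guard against float rounding in the last bound
    return splits


test_rows, train_rows = random_split(range(10000), (0.20, 0.80), seed=42)
print(len(test_rows), len(train_rows))
```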


# VECTORIZATION - spark special sauce

In [30]:
from pyspark.ml.linalg import Vectors, VectorUDT # nb: imports normally go at the top; placed here for pedagogy

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs.html
* User-Defined Functions (aka UDF) is a feature of Spark SQL to define new Column-based functions that extend the vocabulary of Spark SQL’s DSL for transforming Datasets.

In [34]:
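# make a user-defined function (UDF) that wraps a scalar in a one-element dense vector
# (pyspark.ml.feature.VectorAssembler is the more common way to build a features column)
sqlc.udf.register("oneElementVec", lambda d: Vectors.dense([d]), VectorUDT())

# vectorize the data frames
trainingDF = trainingDF.selectExpr("R", "oneElementVec(RBI) as RBI")
testDF = testDF.selectExpr("R", "oneElementVec(RBI) as RBI")

# printing a DataFrame shows only its schema; call .show() to display rows
print(testDF.orderBy(testDF.R.desc()).limit(5))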

DataFrame[R: bigint, RBI: vector]


In [38]:
# LinearRegression reads the 'label' and 'features' columns by default, so rename to match
trainingDF = trainingDF.withColumnRenamed("R", "label").withColumnRenamed("RBI", "features")
testDF = testDF.withColumnRenamed("R", "label").withColumnRenamed("RBI", "features")

## ML time for real
1. Train
2. Predict
3. Evaluate

In [39]:
from pyspark.ml.regression import LinearRegression, LinearRegressionModel

lr = LinearRegression()
lrModel = lr.fit(trainingDF)
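With its default settings (`regParam=0.0`, `fitIntercept=True`), `LinearRegression` fits ordinary least squares. For a single feature such as RBI, that reduces to a closed form: the slope is the co-variation of x and y divided by the variation of x. The sketch below is pure Python for intuition only. Spark solves the same problem in a distributed way and exposes the result as `lrModel.intercept` and `lrModel.coefficients`. `fit_simple_ols` is a hypothetical helper:

```python
def fit_simple_ols(xs, ys):
    """Least-squares line y ~ intercept + slope * x for one feature."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    sxy = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sxx = sum((x - mx) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = my - slope * mx   # the fitted line passes through the means
    return intercept, slope


intercept, slope = fit_simple_ols([0, 1, 2, 3], [1, 3, 5, 7])
print(intercept, slope)
```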

In [40]:
type(lrModel)

pyspark.ml.regression.LinearRegressionModel

Next we transform the test set to get predictions. `transform` returns a new DataFrame, predictionsAndLabelsDF, which is testDF with a `prediction` column appended; testDF itself is unchanged.

In [45]:
predictionsAndLabelsDF = lrModel.transform(testDF)

print(predictionsAndLabelsDF.orderBy(predictionsAndLabelsDF.label.desc()).take(5))

[Row(label=146, features=DenseVector([72.0]), prediction=71.00487259904881), Row(label=146, features=DenseVector([81.0]), prediction=79.68985762483204), Row(label=143, features=DenseVector([130.0]), prediction=126.97477609854079), Row(label=142, features=DenseVector([128.0]), prediction=125.04477942614452), Row(label=140, features=DenseVector([72.0]), prediction=71.00487259904881)]
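As a sanity check on the fit, two of the printed rows are enough to recover the line, because with one feature every prediction is $\hat{y} = b + w\,x$. Using the rows with RBI = 72 and RBI = 81:

$$
w = \frac{79.68985762483204 - 71.00487259904881}{81 - 72} \approx 0.9650, \qquad b = 71.00487 - 72\,w \approx 1.525
$$

So the model is roughly $\hat{R} \approx 1.525 + 0.965 \cdot \text{RBI}$. It reproduces the third row: $1.525 + 0.965 \times 130 \approx 126.97$. In PySpark the same numbers are available directly as `lrModel.intercept` and `lrModel.coefficients`.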


## Model Evaluation

In [50]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator() # nb: this shadows Python's built-in eval(); a name like evaluator is safer
print(eval.explainParams())

labelCol: label column name. (default: label)
metricName: metric name in evaluation - one of:
                       rmse - root mean squared error (default)
                       mse - mean squared error
                       r2 - r^2 metric
                       mae - mean absolute error. (default: rmse)
predictionCol: prediction column name. (default: prediction)


In [51]:
type(eval)

pyspark.ml.evaluation.RegressionEvaluator

In [52]:
eval.setMetricName("rmse").evaluate(predictionsAndLabelsDF)

9.429655325124696

In [53]:
eval.setMetricName("r2").evaluate(predictionsAndLabelsDF)

0.8773328491508267
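`RegressionEvaluator` reduces the label and prediction columns to a single number. Below is a small pure-Python sketch of the four metrics listed by `explainParams()`, for intuition only, since Spark computes them in a distributed way. `rmse` is the square root of the mean squared residual. `r2` is one minus the ratio of the residual sum of squares to the total sum of squares. Read this way, an RMSE of about 9.4 means predictions are typically off by roughly 9 runs. An R² of about 0.877 means the RBI line explains roughly 88% of the variance in R on the test set. `regression_metrics` is a hypothetical helper:

```python
import math


def regression_metrics(labels, predictions):
    """RMSE, MSE, MAE and R^2 for paired labels and predictions."""
    n = len(labels)
    residuals = [y - p for y, p in zip(labels, predictions)]
    ss_res = sum(r * r for r in residuals)          # residual sum of squares
    mean_y = sum(labels) / n
    ss_tot = sum((y - mean_y) ** 2 for y in labels)  # total sum of squares
    mse = ss_res / n
    return {
        "rmse": math.sqrt(mse),
        "mse": mse,
        "mae": sum(abs(r) for r in residuals) / n,
        "r2": 1.0 - ss_res / ss_tot,
    }


print(regression_metrics([1, 2, 3, 4], [1, 2, 3, 5]))
```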