<a href="https://colab.research.google.com/github/cicyfan/spark-fundamentals/blob/master/spark_ml_demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spark ML

![image.png](attachment:image.png)

In [50]:
# Uncomment below to install and setup spark

# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://www-us.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
# !tar xf spark-2.4.6-bin-hadoop2.7.tgz
# !pip install -q findspark

# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

In [13]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Initialize Spark

In [20]:
# let's find where spark is installed
import findspark
findspark.init()

# intialize
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .getOrCreate()
        #.appName("ML")\
        #.getOrCreate()

spark

#### Import the packages we need

In [21]:
# spark types
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, LongType, NumericType

# spark ML packages
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline


#### Define the schema for our dataset

In [22]:
schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("workclass", StringType(), True),
    StructField("fnlwgt", IntegerType(), True),
    StructField("education", StringType(), True),
    StructField("education-num", IntegerType(), True),
    StructField("marital-status", StringType(), True),
    StructField("occupation", StringType(), True),
    StructField("relationship", StringType(), True),
    StructField("race", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("capital-gain", IntegerType(), True),
    StructField("capital-loss", IntegerType(), True),
    StructField("hours-per-week", IntegerType(), True),
    StructField("native-country", StringType(), True),
    StructField("salary", StringType(), True)
])



In [23]:
# load our data into a train set and a test set
train_df = spark.read.csv('/content/drive/My Drive/data/adult.data.txt', header=False, schema=schema)
test_df = spark.read.csv('/content/drive/My Drive/data/adult.text.txt', header=False, schema=schema)


In [24]:
# pretty print some rows
from pyspark.sql.functions import lit, col, asc, desc

train_df.orderBy(asc("age")).limit(5).toPandas()


Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary
0,,,,,,,,,,,,,,,
1,,,,,,,,,,,,,,,
2,,,,,,,,,,,,,,,
3,,,,,,,,,,,,,,,
4,,,,,,,,,,,,,,,


#### We can see our data was not read correctly... let's investigate

In [26]:
train_df = spark.read \
  .option("delimiter",",") \
  .option("inferSchema","true") \
  .csv('/content/drive/My Drive/data/adult.data.txt', header=False)
train_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: double (nullable = true)
 |-- _c11: double (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)



##### Instead of integer, spark 'infer' chose doubles. Let's find out why


In [27]:
train_df.take(1)

[Row(_c0=39, _c1=' State-gov', _c2=77516.0, _c3=' Bachelors', _c4=13.0, _c5=' Never-married', _c6=' Adm-clerical', _c7=' Not-in-family', _c8=' White', _c9=' Male', _c10=2174.0, _c11=0.0, _c12=40.0, _c13=' United-States', _c14=' <=50K')]

#### Turns out Sparks IntegerType cannot parse strings with leading blanks; at least in the version I am running.
We need to modify our ingestion method to correct for it. 
We could: 
    - ingest as double and convert to int
    - ingest as string, trim and convert to int
    
I will select the second approach


In [33]:
# seems IntegerType cannot handle leading spaces; let's try to trim first
from pyspark.sql.functions import ltrim, rtrim, col

# trim function, spark.sql compatible and registered
def trim(x):
  return ltrim(rtrim(x))
spark.udf.register("trim", trim, StringType())

newSchema = StructType([
    StructField("age", StringType(), True),
    StructField("workclass", StringType(), True),
    StructField("fnlwgt", StringType(), True),
    StructField("education", StringType(), True),
    StructField("education-num", StringType(), True),
    StructField("marital-status", StringType(), True),
    StructField("occupation", StringType(), True),
    StructField("relationship", StringType(), True),
    StructField("race", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("capital-gain", StringType(), True),
    StructField("capital-loss", StringType(), True),
    StructField("hours-per-week", StringType(), True),
    StructField("native-country", StringType(), True),
    StructField("salary", StringType(), True)
])


# load our data 
train_df = spark.read.csv('/content/drive/My Drive/data/adult.data.txt', header=False, schema=newSchema) 

# trim all whitespace from all columns
for c in train_df.columns:
    train_df = train_df.withColumn(c, trim(col(c)))

# convert to integers
train_df = train_df \
    .withColumn("age",trim(col("age")).cast(IntegerType())) \
    .withColumn("fnlwgt",trim(col("fnlwgt")).cast(IntegerType())) \
    .withColumn("education-num",trim(col("education-num")).cast(IntegerType())) \
    .withColumn("capital-gain",trim(col("capital-gain")).cast(IntegerType())) \
    .withColumn("capital-loss",trim(col("capital-loss")).cast(IntegerType())) \
    .withColumn("hours-per-week",trim(col("hours-per-week")).cast(IntegerType())) 

train_df.limit(5).toPandas()



Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,<=50K
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,<=50K
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,<=50K


#### Let's inspect our test set

In [34]:
# load our test data again and inspect 
test_df = spark.read.csv('/content/drive/My Drive/data/adult.data.txt', header=False, schema=newSchema) 
test_df.limit(5).toPandas()

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,<=50K
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,<=50K
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,<=50K


#### Our test dataset is `dirty`.
The first line has some strange line. So let's clean it. 
There is no easy way to do this in Spark, yet. 


In [31]:
# load and ignore the first line

# 1. convert to RDD
# 2. add an index column (this converts the row to a tuple (index, Row); see the API documentation)
# 3. filter the first row (index 0)
# 4. convert back to a Row value
# 5. convert to dataframe

test_df = spark.read.csv('/content/drive/My Drive/data/adult.text.txt', header=False, schema=newSchema) \
    .rdd \
    .zipWithIndex() \
    .filter(lambda tuple: tuple[1]>0) \
    .map(lambda tuple: tuple[0]) \
    .toDF()

test_df.limit(5).toPandas()



Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary
0,25,Private,226802,11th,7,Never-married,Machine-op-inspct,Own-child,Black,Male,0,0,40,United-States,<=50K.
1,38,Private,89814,HS-grad,9,Married-civ-spouse,Farming-fishing,Husband,White,Male,0,0,50,United-States,<=50K.
2,28,Local-gov,336951,Assoc-acdm,12,Married-civ-spouse,Protective-serv,Husband,White,Male,0,0,40,United-States,>50K.
3,44,Private,160323,Some-college,10,Married-civ-spouse,Machine-op-inspct,Husband,Black,Male,7688,0,40,United-States,>50K.
4,18,?,103497,Some-college,10,Never-married,?,Own-child,White,Female,0,0,30,United-States,<=50K.


In [35]:
# fix the strings and integer columns 

# create a list of the columns we want to convert to Integer
numericCols = ['age', 'fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']

for c in test_df.columns:
    test_df = test_df.withColumn(c, trim(col(c)))
    if c in numericCols:
        test_df = test_df.withColumn(c, col(c).cast(IntegerType()))
    


In [36]:
test_df.limit(5).toPandas()

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,<=50K
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,<=50K
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,<=50K


### Finally, Actual ML ahead !!


#### Let's convert string columns into numbers using StringIndexer and OneHotEncoder transformer, as discussed in class 

![image.png](attachment:image.png)

### Create the transformer objects we will use

#### CAREFUL - ALERT

When creating a categorical feature from string values, we **must** use a consistent vocabulary.
That means using the ***same*** vocabulary at all times: e.g. we create just one vocabulary using *StringIndexer* to use for all datasets. 

By creating a single string encoder list, we ensure a single mapping between strings and indices.
In our code below, we pass the `handleInvalid="skip"` option. See http://spark.apache.org/docs/latest/api/python/pyspark.ml.html?highlight=stringindexer#pyspark.ml.feature.StringIndexer.


In [37]:

# create a list of the coluns we want to index & hot-encode
categorical_variables = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country']

# use indexers to encode unique strings as numbers; this var holds a list: one indexer/column
indexers = [StringIndexer(inputCol=column, outputCol=column+"-index", handleInvalid="skip") for column in categorical_variables]

# create string encodings for trinaing data only
stringsIndexers = [indexer.fit(train_df) for indexer in indexers]

# one-hot encode the encoded strings to a vector representation
stringsEncoder = OneHotEncoderEstimator(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=["{0}-encoded".format(indexer.getOutputCol()) for indexer in indexers]
)

# create a VectorAssemble transformer to encode the resulting string indices 
stringsAssembler = VectorAssembler(
    inputCols=stringsEncoder.getOutputCols(),
    outputCol="categorical-features"
)


### vectorize all columns 

Note: we spent some effort in converting numeric columns to Integers. It is possible the learning algorithms would have preferred doubles.

I personally like to control every possible setting in my environment. By doing the conversion earlier, I am guaranteeing the schema I want...


In [38]:
# now add the numeric variables
numeric_variables = ['age', 'fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']

numericsAssembler = VectorAssembler(
    inputCols=['categorical-features', *numeric_variables],
    outputCol='features'
)

# add all the transform steps to a **pipeline**
#  NOTE: we use indexingModels as a step as we do not want re-create string indices based only on the 
#     strings we saw in training
pipeline = Pipeline(stages=stringsIndexers + [stringsEncoder, stringsAssembler, numericsAssembler])

# Apply the pipeline to our training dataset
train_df1 = pipeline.fit(train_df).transform(train_df)

train_df1.limit(5).toPandas()['features'][0]



SparseVector(100, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 94: 39.0, 95: 77516.0, 96: 13.0, 97: 2174.0, 99: 40.0})

### Select the `label` column
We want to learn `salary`


In [39]:
labelIndexer = StringIndexer(inputCol='salary', outputCol='label', handleInvalid="skip")

train_df2 = labelIndexer.fit(train_df1).transform(train_df1)
train_df2.limit(10).toPandas()['label']


0    0.0
1    0.0
2    0.0
3    0.0
4    0.0
5    0.0
6    0.0
7    1.0
8    1.0
9    1.0
Name: label, dtype: float64

### Build a model using Logistic Regression


In [40]:
lr = LogisticRegression(featuresCol='features', labelCol='label')
model = lr.fit(train_df2)

### Summary of the Model
Spark has a poor summary function for data and model...


In [41]:
from pyspark.sql.functions import avg

trainingSummary = model.summary
print("Accuracy: ", trainingSummary.accuracy)
print("Area under ROC: ", trainingSummary.areaUnderROC)

# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(model.coefficientMatrix))
print("Multinomial intercepts: " + str(model.interceptVector))


# average precision/recall curve
trainingSummary.pr.select(avg("recall"),avg("precision")).limit(1).toPandas()


Accuracy:  0.8532600350112097
Area under ROC:  0.9086742514762464
Multinomial coefficients: DenseMatrix([[-1.45563658e+00, -1.94342930e+00, -1.63628895e+00,
              -7.69412473e-01, -1.77233682e+00, -1.27788135e+00,
              -9.62737643e-01, -9.41003654e+00,  3.93642261e-01,
               4.38932678e-01,  3.17003597e-01,  3.74099211e-01,
               3.35474269e-01,  2.68738135e-01,  1.60594042e-02,
               4.81655233e-01,  6.08510531e-01,  5.77915339e-01,
               5.88780642e-01,  3.58128419e-01,  4.83256829e-01,
               1.13939557e+00,  1.14715374e+00, -5.93342409e-01,
              -3.29914313e+00, -2.81401798e+00, -2.94750778e+00,
              -2.67141784e+00, -2.82479023e+00, -7.83977102e-02,
              -5.22659083e-01,  1.95176332e-01, -5.86216396e-01,
              -3.11939855e-01, -1.41451916e+00, -8.74921363e-01,
              -1.98333370e+00, -7.05161008e-01, -1.27921985e+00,
              -1.59331152e+00,  6.28178740e-02, -1.98778255e-02

Unnamed: 0,avg(recall),avg(precision)
0,0.810092,0.512386


# Evaluate/Predict on our test set

In [42]:
# prepare the test data using the same steps as the training data (see above)
test_df1 = pipeline.fit(test_df).transform(test_df)
test_df1.limit(5).toPandas()['features'][0]

SparseVector(100, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 94: 39.0, 95: 77516.0, 96: 13.0, 97: 2174.0, 99: 40.0})

In [43]:
test_df1.limit(2).toPandas()

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary,workclass-index,education-index,marital-status-index,occupation-index,relationship-index,race-index,sex-index,native-country-index,workclass-index-encoded,occupation-index-encoded,marital-status-index-encoded,native-country-index-encoded,education-index-encoded,race-index-encoded,relationship-index-encoded,sex-index-encoded,categorical-features,features
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K,4.0,2.0,1.0,3.0,1.0,0.0,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0)",(1.0),"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ..."
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K,1.0,2.0,0.0,2.0,0.0,0.0,0.0,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)",(1.0),"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [44]:
# set the label column
test_df1 = labelIndexer.fit(test_df1).transform(test_df1)
test_df1.limit(5).toPandas()['label']


0    0.0
1    0.0
2    0.0
3    0.0
4    0.0
Name: label, dtype: float64

### Predict the labels

In [45]:

test_df1 = model.transform(test_df1)

In [46]:
test_df1.limit(5).toPandas()['prediction']

0    0.0
1    0.0
2    0.0
3    0.0
4    1.0
Name: prediction, dtype: float64

In [47]:
print("Training Summary")
print("")
print("Accuracy: ", model.summary.accuracy)
print("Area under ROC: ", model.summary.areaUnderROC)

# average precision/recall curve
model.summary.pr.select(avg("recall"),avg("precision")).limit(1).toPandas()


Training Summary

Accuracy:  0.8532600350112097
Area under ROC:  0.9086742514762464


Unnamed: 0,avg(recall),avg(precision)
0,0.810092,0.512386


In [48]:
# Evaluate on Training and Test Set
# adapted from from https://chih-ling-hsu.github.io/2018/09/17/spark-mllib

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

def evaluate(predictionAndLabels):

    log = {}

    # Show Validation Score (AUROC)
    evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
    log['AUROC'] = "%f" % evaluator.evaluate(predictionAndLabels)    
    print("Area under ROC = {}".format(log['AUROC']))

    # Show Validation Score (AUPR)
    evaluator = BinaryClassificationEvaluator(metricName='areaUnderPR')
    log['AUPR'] = "%f" % evaluator.evaluate(predictionAndLabels)
    print("Area under PR = {}".format(log['AUPR']))

    # Metrics
    evaluator = MulticlassClassificationEvaluator()
    
    # Overall statistics
    log['Accuracy'] = "%s" % evaluator.evaluate(test_df1, {evaluator.metricName: "accuracy"})
    log['Precision'] = "%s" % evaluator.evaluate(test_df1, {evaluator.metricName: "weightedPrecision"})
    log['Recall'] = "%s" % evaluator.evaluate(test_df1, {evaluator.metricName: "weightedRecall"})
    log['F1 Measure'] = "%s" % evaluator.evaluate(test_df1, {evaluator.metricName: "f1"})
    
    print("[Overall]\taccuracy = %s\tprecision = %s | recall = %s | F1 Measure = %s" % \
            (log['Accuracy'], log['Precision'], log['Recall'], log['F1 Measure']))

    return log

### Quality measures, save to external file

In [49]:
import json

log = evaluate(test_df1)
with open('test.json', 'w') as f:
    json.dump(log, f)
    

Area under ROC = 0.908770
Area under PR = 0.770974
[Overall]	accuracy = 0.8532600350112097	precision = 0.8470438966761418 | recall = 0.8532600350112097 | F1 Measure = 0.8480374135849396
