In [71]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import isnan, when, count, col

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import Imputer
from pyspark.ml.stat import Summarizer
from pyspark.ml import Pipeline
from pyspark.mllib.linalg import Vectors

import pandas as pd

### Exploring SPark MLlib Pipelines on an example of the 

In [2]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Spark Linear Regression") \
    .getOrCreate()

In [3]:
spark

We are going to work with the *Life Expectancy* Dataset. Let's load it and see:

Calling multiple actions like *show* when building a real ML Pipeline wouldn't be optimal, because we will need to go through the whole dataset again and again. But when exploring data - it's ok. 

In [4]:
df_raw = spark.read.load("Life Expectancy Data.csv", format="csv", sep=",", inferSchema="true", header="true")

In [56]:
df_raw.limit(5).toPandas().head()

Unnamed: 0,Country,Year,Status,Life expectancy,Adult Mortality,infant deaths,Alcohol,percentage expenditure,Hepatitis B,Measles,...,Polio,Total expenditure,Diphtheria,HIV/AIDS,GDP,Population,thinness 1-19 years,thinness 5-9 years,Income composition of resources,Schooling
0,Afghanistan,2015,Developing,65.0,263,62,0.01,71.279624,65,1154,...,6,8.16,65,0.1,584.25921,33736494.0,17.2,17.3,0.479,10.1
1,Afghanistan,2014,Developing,59.9,271,64,0.01,73.523582,62,492,...,58,8.18,62,0.1,612.696514,327582.0,17.5,17.5,0.476,10.0
2,Afghanistan,2013,Developing,59.9,268,66,0.01,73.219243,64,430,...,62,8.13,64,0.1,631.744976,31731688.0,17.7,17.7,0.47,9.9
3,Afghanistan,2012,Developing,59.5,272,69,0.01,78.184215,67,2787,...,67,8.52,67,0.1,669.959,3696958.0,17.9,18.0,0.463,9.8
4,Afghanistan,2011,Developing,59.2,275,71,0.01,7.097109,68,3013,...,68,7.87,68,0.1,63.537231,2978599.0,18.2,18.2,0.454,9.5


In [6]:
df_column_names_cleaned = df_raw
for sf in df_column_names_cleaned.schema.fields:
    df_column_names_cleaned = df_column_names_cleaned.withColumnRenamed(sf.name, sf.name.strip().lower().replace(' ', '_').replace('/', '_').replace('-', '_'))

In [9]:
# features without label
label_column = "life_expectancy"
list_columns = [sf.name for sf in df_column_names_cleaned.schema.fields if sf.name != label_column]
list_columns

['country',
 'year',
 'status',
 'adult_mortality',
 'infant_deaths',
 'alcohol',
 'percentage_expenditure',
 'hepatitis_b',
 'measles',
 'bmi',
 'under_five_deaths',
 'polio',
 'total_expenditure',
 'diphtheria',
 'hiv_aids',
 'gdp',
 'population',
 'thinness__1_19_years',
 'thinness_5_9_years',
 'income_composition_of_resources',
 'schooling']

### Feature Engineering

We will first perform Feature Engineering steps that can be applied on the whole dataset without the risk of data leakage. For example, we can securely drop columns with most null values. However, all the preprocessing steps that involve certain aggregation will be better to perform on data split into train and test sets.

In [10]:
# Delete rows where label is missing
df_nulls_removed = df_column_names_cleaned.where("{} is not null".format(label_column))

In [11]:
# numNonzeros

In [12]:
# df_nulls_removed.select([count(when(isnan(c), c)).alias(c) for c in df_nulls_removed.columns]).toPandas().head()

Unnamed: 0,country,year,status,life_expectancy,adult_mortality,infant_deaths,alcohol,percentage_expenditure,hepatitis_b,measles,...,polio,total_expenditure,diphtheria,hiv_aids,gdp,population,thinness__1_19_years,thinness_5_9_years,income_composition_of_resources,schooling
0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


Start preparing modules of the ML pipeline.

In [14]:
splits = df_nulls_removed.randomSplit([0.8, 0.2])
train_df = splits[0]
test_df = splits[1]

In [None]:
# Prepare 3 indexers to one hot encode categorical features

In [None]:
# Start write code df agnostic

In [16]:
cat_features = ['country', 'year', 'status']
cat_features_ind = ["{}_ind".format(cf) for cf in cat_features]
cat_features_vec = ["{}_vec".format(cf) for cf in cat_features]
# Note: following for comprehansion has time complexity of the product of lengths of lists
list_columns_no_cat = [lc for lc in list_columns if lc not in cat_features]
final_list_columns = list_columns_no_cat + cat_features_vec
final_list_columns

['adult_mortality',
 'infant_deaths',
 'alcohol',
 'percentage_expenditure',
 'hepatitis_b',
 'measles',
 'bmi',
 'under_five_deaths',
 'polio',
 'total_expenditure',
 'diphtheria',
 'hiv_aids',
 'gdp',
 'population',
 'thinness__1_19_years',
 'thinness_5_9_years',
 'income_composition_of_resources',
 'schooling',
 'country_vec',
 'year_vec',
 'status_vec']

In [82]:
features_unscaled_col = "features_unscaled"
features_col = "features"

In [83]:
indexers = [StringIndexer(inputCol=cat_features[i], outputCol=cat_features_ind[i]) for i in range(len(cat_features))]
encoder = OneHotEncoder(inputCols=cat_features_ind, outputCols=cat_features_vec)
assembler = VectorAssembler(inputCols=final_list_columns, outputCol=features_unscaled_col).setHandleInvalid("skip")
scaler = MinMaxScaler(inputCol=features_unscaled_col, outputCol=features_col)
lr = LinearRegression(featuresCol=features_col, labelCol=label_column)

In [84]:
stages = indexers + [encoder, assembler, scaler, lr]
stages

[StringIndexer_4efb1605e001,
 StringIndexer_8adf8a32c6ba,
 StringIndexer_d4553a5dd698,
 OneHotEncoder_f7191995360d,
 VectorAssembler_5941b2ba423f,
 MinMaxScaler_bda7871762a0,
 LinearRegression_84f8ba2a1b0d]

In [76]:
pipeline = Pipeline(stages=stages)

In [77]:
model = pipeline.fit(train_df)

In [86]:
print("First 5 coefficients: " + str(lr_model.coefficients[:5]))

First 5 coefficients: [ -0.67608467 101.50268169  -0.85175586  -2.44337375   0.46890406]


In [87]:
print("Intercept: " + str(lr_model.intercept))

Intercept: 71.00042604968357


In [43]:
lr_model = model.stages[-1]

In [46]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 1.567439
r2: 0.967992


In [78]:
prediction = model.transform(test_df)

In [47]:
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=label_column, metricName="r2")

In [50]:
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(prediction))

R Squared (R2) on test data = 0.961337


In [89]:
model.save("lr_model_with_preprocessing")