# Linear Regression in PySpark

# Dependencies
## Install pyspark, pandas

In [354]:
# !pip install pyspark

In [355]:
# !pip install pandas

## Import dependencies/packages such as pyspark

In [356]:
import pyspark

In [357]:
import pandas as pd

# Create Spark Session

In [358]:
from pyspark.sql import SparkSession

In [359]:
# Reuse the active session when one exists; otherwise start a new one.
# NOTE(review): the app name ends with a space — confirm that is intentional.
spark = (
    SparkSession.builder
    .appName('spark_project ')
    .getOrCreate()
)

## Change logging options, to suppress WARNings

In [360]:
# From here on only ERROR-level messages are logged, which hides the WARN noise.
spark.sparkContext.setLogLevel(logLevel="ERROR")

### Note: getOrCreate() is important.
Otherwise you have to manually restart the kernel every time and re-run the cells in the proper sequence.

# ALWAYS USE SPARK FUNCTIONS
TO TAKE ADVANTAGE OF SPARK'S EXECUTION SPEED. STAY AWAY FROM USER-DEFINED FUNCTIONS IF POSSIBLE.

In [361]:
# Bare last expression: the notebook renders the session object
# (per the note below, the rendered output includes a Spark UI link).
spark

## SparkUI hyperlink available

# Data Wrangling, ETL process
Get the data: the Extract step of the ETL process.

In [362]:
# Location of the auto-mpg CSV, kept in one named constant so it is easy to repoint.
# NOTE(review): this is an absolute path on the author's machine — adjust it for your environment.
AUTO_MPG_CSV_URI = "file:///D:/2_general_repo/1_public_repo/Spark project/auto-mpg.csv"

# header=True takes the column names from the first row; inferSchema=True asks Spark
# to detect column types instead of reading every column as string.
df_spark = spark.read.csv(AUTO_MPG_CSV_URI, inferSchema=True, header=True)

Note: without inferSchema, everything is type-string.
df_spark.describe()
DataFrame[summary: string, _c0: string, V1: string, V2: string, V3: string, V4: string, V5: string, V6: string, V7: string, V8: string, V9: string]

## Rename ALL columns
toDF(*new_col_names)
toDF needs a new name for every column; to rename only some (fewer than all) columns, use withColumnRenamed instead.

In [363]:
# Replacement column names; toDF applies them positionally, left to right.
new_col_names = [
    "sr_no", "mpg", "cyl", "dspl", "hp",
    "wt", "accl", "yr", "origin", "name",
]

Without assigning (capturing) the function's return value, the call only produces a view of the result; the original df is not modified.
Also, Spark DataFrames are built on RDDs, which are immutable data structures, so every transformation creates a brand-new data structure.

In [364]:
# toDF returns a new DataFrame, so the result is bound back to df_spark to keep the rename.
df_spark = df_spark.toDF(*new_col_names)

## Dimensions of dataset, shape

In [365]:
# Row count and column count, displayed together as a (rows, columns) pair.
df_spark.count(), len(df_spark.columns)

(225, 10)

## Show data, .show()
similar to pandas .head()

In [366]:
# Preview the first five rows. df_spark was already renamed when toDF's result was
# assigned back to it, so renaming again here would only add a redundant projection.
df_spark.show(5)

+-----+----+---+-----+---+----+----+---+------+--------------------+
|sr_no| mpg|cyl| dspl| hp|  wt|accl| yr|origin|                name|
+-----+----+---+-----+---+----+----+---+------+--------------------+
|    1|18.0|  8|307.0|130|3504|12.0| 70|     1|chevrolet chevell...|
|    2|15.0|  8|350.0|165|3693|11.5| 70|     1|   buick skylark 320|
|    3|18.0|  8|318.0|150|3436|11.0| 70|     1|  plymouth satellite|
|    4|16.0|  8|304.0|150|3433|12.0| 70|     1|       amc rebel sst|
|    5|17.0|  8|302.0|140|3449|10.5| 70|     1|         ford torino|
+-----+----+---+-----+---+----+----+---+------+--------------------+
only showing top 5 rows



## Get data types in DataFrame, .dtypes (an attribute, not a method call)

In [367]:
# dtypes is an attribute (no call) listing (column name, type) pairs.
# NOTE(review): in the output below hp is reported as string — presumably the column
# holds some non-numeric placeholder; confirm before using hp as a numeric predictor.
df_spark.dtypes

[('sr_no', 'int'),
 ('mpg', 'double'),
 ('cyl', 'int'),
 ('dspl', 'double'),
 ('hp', 'string'),
 ('wt', 'int'),
 ('accl', 'double'),
 ('yr', 'int'),
 ('origin', 'int'),
 ('name', 'string')]

## Null value handling

In [368]:
from pyspark.sql.functions import isnan, when, count, col

In [369]:
# Drop rows in which every column is null. na.drop returns a new DataFrame, so the
# result must be assigned back — otherwise df_spark is left unchanged.
df_spark = df_spark.na.drop(how="all")

DataFrame[sr_no: int, mpg: double, cyl: int, dspl: double, hp: string, wt: int, accl: double, yr: int, origin: int, name: string]

In [370]:
# One aggregate per column: when(...) yields a value only for null cells, and
# count() counts those values, giving the number of nulls in that column.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in df_spark.columns
]
df_spark.select(null_count_exprs).show()

+-----+---+---+----+---+---+----+---+------+----+
|sr_no|mpg|cyl|dspl| hp| wt|accl| yr|origin|name|
+-----+---+---+----+---+---+----+---+------+----+
|    0|  0|  0|   0|  0|  0|   0|  0|     0|   0|
+-----+---+---+----+---+---+----+---+------+----+



# PIPELINE

### Input Column list
Define column lists that will be transformed, or converted to categorical variable from original data_type

In [372]:
# Columns that the pipeline below runs through StringIndexer / IndexToString.
input_col_list = [
    'cyl',
    'yr',
]

In [373]:
from pyspark.ml.feature import IndexToString, StringIndexer, VectorAssembler

In [374]:
from pyspark.ml import Pipeline

## Initialize empty list of Stages in Pipeline

In [375]:
# Pipeline stages are appended to this list, in execution order, by the cells below.
stages_list = list()

## Append Stages, StringIndexer

## Append Stages, IndexToString

In [376]:
# The loop variable is deliberately not named `col`: that name is the
# pyspark.sql.functions.col import used by the null-count cell, and rebinding it
# to a string makes that cell fail when it is re-run.
for col_name in input_col_list:
    # StringIndexer maps each distinct value to a numeric index ordered by frequency
    # (the most frequent value gets index 0); handleInvalid='skip' drops rows whose
    # value cannot be indexed.
    stages_list.append(StringIndexer(inputCol=col_name, outputCol=col_name + '_str_ix', handleInvalid='skip'))
    # IndexToString maps the numeric index back to the original value as a string label.
    stages_list.append(IndexToString(inputCol=col_name + '_str_ix', outputCol=col_name + '_catg'))

Check partial pipeline output

In [377]:
# Build a pipeline from the stages collected so far.
pipeline = Pipeline(stages=stages_list)
# fit() trains the estimator stages and returns a fitted pipeline model.
pipeline_model = pipeline.fit(dataset=df_spark)
# transform() applies the fitted stages and returns a new DataFrame.
df_spark_updated = pipeline_model.transform(dataset=df_spark)
df_spark_updated.show(n=2)

+-----+----+---+-----+---+----+----+---+------+--------------------+----------+--------+---------+-------+
|sr_no| mpg|cyl| dspl| hp|  wt|accl| yr|origin|                name|cyl_str_ix|cyl_catg|yr_str_ix|yr_catg|
+-----+----+---+-----+---+----+----+---+------+--------------------+----------+--------+---------+-------+
|    1|18.0|  8|307.0|130|3504|12.0| 70|     1|chevrolet chevell...|       1.0|       8|      3.0|     70|
|    2|15.0|  8|350.0|165|3693|11.5| 70|     1|   buick skylark 320|       1.0|       8|      3.0|     70|
+-----+----+---+-----+---+----+----+---+------+--------------------+----------+--------+---------+-------+
only showing top 2 rows



## Append Stages, VectorAssembler

### Select features using column names list

In [378]:
# Numeric columns that are packed into the 'features' vector for the regression.
predictor_cols = [
    "dspl",
    "wt",
    "accl",
]

Transform DataFrame with vector-assembled features

In [379]:
# Combine the predictor columns into a single vector column named 'features'.
feature_assembler = VectorAssembler(inputCols=list(predictor_cols), outputCol='features')
stages_list.append(feature_assembler)

Note: VectorAssembler_object.transform(DataFrame) returns a new DataFrame

Note: outputCol in VectorAssembler is the name of column,
containing "vector" of input or independent features or predictors.
It is added as a new column to original dataframe.

## Form Pipeline

In [380]:
# Rebuild the pipeline now that the VectorAssembler stage has been appended.
pipeline = Pipeline(stages=stages_list)
# fit() trains the estimator stages and returns a fitted pipeline model.
pipeline_model = pipeline.fit(dataset=df_spark)
# transform() applies the fitted stages and returns a new DataFrame.
df_spark_updated = pipeline_model.transform(dataset=df_spark)
df_spark_updated.show(n=2)

+-----+----+---+-----+---+----+----+---+------+--------------------+----------+--------+---------+-------+-------------------+
|sr_no| mpg|cyl| dspl| hp|  wt|accl| yr|origin|                name|cyl_str_ix|cyl_catg|yr_str_ix|yr_catg|           features|
+-----+----+---+-----+---+----+----+---+------+--------------------+----------+--------+---------+-------+-------------------+
|    1|18.0|  8|307.0|130|3504|12.0| 70|     1|chevrolet chevell...|       1.0|       8|      3.0|     70|[307.0,3504.0,12.0]|
|    2|15.0|  8|350.0|165|3693|11.5| 70|     1|   buick skylark 320|       1.0|       8|      3.0|     70|[350.0,3693.0,11.5]|
+-----+----+---+-----+---+----+----+---+------+--------------------+----------+--------+---------+-------+-------------------+
only showing top 2 rows



In [381]:
from pyspark.ml.regression import LinearRegression

In [382]:
# Linear regression estimator: reads the assembled 'features' vector and predicts 'mpg'.
lm_model = LinearRegression(
    labelCol='mpg',
    featuresCol='features',
)

In [383]:
# Reuse the estimator configured as lm_model instead of constructing a second,
# identical LinearRegression whose settings could drift from the first.
stages_list.append(lm_model)

In [384]:
# Print the stage objects queued so far, in the order the pipeline will run them.
print(stages_list)

[StringIndexer_47f1ac54a4e5, IndexToString_359293c2d6ac, StringIndexer_f76956302d01, IndexToString_c02fcede1809, VectorAssembler_41211c03b1fd, LinearRegression_612a1e4f892a]


In [385]:
# Rebuild the pipeline; the last stage is now the linear regression.
pipeline = Pipeline(stages=stages_list)
# fit() trains every estimator stage, including the regression, on all of df_spark.
pipeline_model = pipeline.fit(dataset=df_spark)
# transform() applies the fitted stages; the result gains a 'prediction' column.
df_spark_updated = pipeline_model.transform(dataset=df_spark)
df_spark_updated.show(n=2)

+-----+----+---+-----+---+----+----+---+------+--------------------+----------+--------+---------+-------+-------------------+------------------+
|sr_no| mpg|cyl| dspl| hp|  wt|accl| yr|origin|                name|cyl_str_ix|cyl_catg|yr_str_ix|yr_catg|           features|        prediction|
+-----+----+---+-----+---+----+----+---+------+--------------------+----------+--------+---------+-------+-------------------+------------------+
|    1|18.0|  8|307.0|130|3504|12.0| 70|     1|chevrolet chevell...|       1.0|       8|      3.0|     70|[307.0,3504.0,12.0]|17.375689672172097|
|    2|15.0|  8|350.0|165|3693|11.5| 70|     1|   buick skylark 320|       1.0|       8|      3.0|     70|[350.0,3693.0,11.5]|15.903179717661235|
+-----+----+---+-----+---+----+----+---+------+--------------------+----------+--------+---------+-------+-------------------+------------------+
only showing top 2 rows



NOTE: Train-test split is done later. Above results are using 100% of data as training data and predicting using model on the same data.

## Drop extra columns

In [386]:
# Everything except the label (mpg), the feature vector and the prediction is dropped.
# "sr_no" (a running row number in the rows shown earlier) belongs in this list too:
# the output and schema below contain only mpg / features / prediction.
col_list_to_drop = (
    "sr_no",
    "cyl", "yr", "cyl_str_ix", "yr_str_ix",
    "name", "origin", "dspl", "hp", "wt", "accl",
    "cyl_catg", "yr_catg",
)
# drop returns a new DataFrame, so the result is bound back to the same name.
df_spark_updated = df_spark_updated.drop(*col_list_to_drop)
df_spark_updated.show(2)

+----+-------------------+------------------+
| mpg|           features|        prediction|
+----+-------------------+------------------+
|18.0|[307.0,3504.0,12.0]|17.375689672172097|
|15.0|[350.0,3693.0,11.5]|15.903179717661235|
+----+-------------------+------------------+
only showing top 2 rows



In [387]:
# Print column names, types and nullability of the trimmed DataFrame.
df_spark_updated.printSchema()

root
 |-- mpg: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = false)



Refer to the code above: the "features" column is the vector of predictor_cols = ["dspl", "wt", "accl"].

## Analysis

### Train-Test Split

In [323]:
# The full-data pipeline run above already added a 'prediction' column. Spark ML
# refuses to fit/transform when its output column already exists, so drop it first
# (drop is a no-op if the column is absent).
model_input = df_spark_updated.drop("prediction")

# 75% / 25% split; the fixed seed keeps the split repeatable for the same input.
train_data, test_data = model_input.randomSplit([0.75, 0.25], seed=2022)

# Fit the regression on the training split only. Every cell below reads this model
# (its training summary, coefficients, intercept) or uses it to score test_data.
model_lm_fit_train_data = lm_model.fit(train_data)

In [324]:
# Number of rows that landed in the training split.
train_data.count()

167

In [325]:
# Number of rows that landed in the test split.
test_data.count()

58

In [326]:
# Row count of df_spark_updated, for comparison with the two split sizes above.
df_spark_updated.count()

225

### Get RMSE
Root Mean Squared Error

In [337]:
# RMSE taken from the model's training summary, rounded to 3 decimals.
training_summary = model_lm_fit_train_data.summary
rmse_lm = round(training_summary.rootMeanSquaredError, 3)
print("RMSE= ", rmse_lm)

RMSE=  2.544


### Get MAE
Mean Absolute Error

In [338]:
# MAE taken from the model's training summary, rounded to 3 decimals.
training_summary = model_lm_fit_train_data.summary
mae_lm = round(training_summary.meanAbsoluteError, 3)
print("MAE= ", mae_lm)

MAE=  2.007


### Get R-squared

In [339]:
# R-squared taken from the model's training summary, rounded to 3 decimals.
training_summary = model_lm_fit_train_data.summary
r2_lm = round(training_summary.r2, 3)
print("R2= ", r2_lm)

R2=  0.81


### Get coefficients for fitted model

In [340]:
# Fitted coefficients of the model; each one is rounded to 3 decimals for printing.
model_coeff = model_lm_fit_train_data.coefficients
rounded_coeff = [round(weight, 3) for weight in model_coeff]
print("model_coeff= ", rounded_coeff)

model_coeff=  [-0.014, -0.004, -0.002]


Note: "features" is the vector of inputCols=['dspl', 'wt', 'accl'] (predictor_cols).
So there are 3 coefficients above, one for each of the 3 predictors, in that order.

### Get Intercept for fitted model

In [341]:
# Intercept of the fitted regression, rounded to 3 decimals.
raw_intercept = model_lm_fit_train_data.intercept
model_intercept = round(raw_intercept, 3)
print("model_intercept= ", model_intercept)

model_intercept=  36.422


### Make Predictions

In [342]:
# evaluate() scores the test split and returns a summary object (see the repr in
# the next cell); the predictions themselves are under its .predictions attribute.
yhat_model_predicts = model_lm_fit_train_data.evaluate(test_data)

In [343]:
# Bare expression: displays the repr of the summary object returned by evaluate().
yhat_model_predicts

<pyspark.ml.regression.LinearRegressionSummary at 0x26997530130>

In [344]:
# The summary's .predictions DataFrame holds label, features and predicted value.
test_predictions = yhat_model_predicts.predictions
test_predictions.show(n=3)

+----+-------------------+------------------+
| mpg|           features|        prediction|
+----+-------------------+------------------+
|10.0|[307.0,4376.0,15.0]|13.528052988819656|
|10.0|[360.0,4615.0,14.0]| 11.78815738494092|
|12.0|[350.0,4456.0,13.5]|12.604154192494484|
+----+-------------------+------------------+
only showing top 3 rows





In [345]:
# transform() returns the test rows with a 'prediction' column added — a plain
# DataFrame, unlike the summary object returned by evaluate().
yhat_model_predicts_2 = model_lm_fit_train_data.transform(test_data)

In [346]:
# First three scored test rows, for comparison with the evaluate() output above.
yhat_model_predicts_2.show(n=3)

+----+-------------------+------------------+
| mpg|           features|        prediction|
+----+-------------------+------------------+
|10.0|[307.0,4376.0,15.0]|13.528052988819656|
|10.0|[360.0,4615.0,14.0]| 11.78815738494092|
|12.0|[350.0,4456.0,13.5]|12.604154192494484|
+----+-------------------+------------------+
only showing top 3 rows



In [347]:
from pyspark.ml.evaluation import RegressionEvaluator

In [348]:
# Evaluator that compares the 'prediction' column against the 'mpg' label using R-squared.
lm_evaluator = RegressionEvaluator(
    labelCol="mpg",
    predictionCol="prediction",
    metricName="r2",
)

In [349]:
# Inspection only: shows the evaluator's class; nothing is assigned from this cell.
type( lm_evaluator)

pyspark.ml.evaluation.RegressionEvaluator

In [350]:
# R-squared of the predictions on the test split, rounded to 3 decimals.
r2_on_test = lm_evaluator.evaluate(yhat_model_predicts_2)
r2_lm_test = round(r2_on_test, 3)
print("R Squared (R2) on test data = %g" % r2_lm_test)

R Squared (R2) on test data = 0.854


In [351]:
# R-squared from the model's training summary, for comparison with the test value.
training_summary = model_lm_fit_train_data.summary
r2_lm_train = round(training_summary.r2, 3)
print("R Squared (R2) on TRAIN data = ", r2_lm_train)

R Squared (R2) on TRAIN data =  0.81


In [352]:
# RMSE on the test split: evaluate() returns a summary whose RMSE is rounded to 3 decimals.
test_summary = model_lm_fit_train_data.evaluate(test_data)
rmse_lm_test = round(test_summary.rootMeanSquaredError, 3)
print("RMSE on test data = ", rmse_lm_test)

RMSE on test data =  2.485


In [353]:
# RMSE from the model's training summary, for comparison with the test value.
training_summary = model_lm_fit_train_data.summary
rmse_lm_train = round(training_summary.rootMeanSquaredError, 3)
print("RMSE on TRAIN data = ", rmse_lm_train)

RMSE on TRAIN data =  2.544
