# Regression with MLlib and PySpark

Regression Algorithms iny Pyspark:

1. Linear regression
2. Decision tree regression
3. Random forest regression
4. Gradit-boosted tree regression

*This will be very shor notebook, witout any analysis.*

<div class="alert alert-block alert-warning"><b>Data</b></div>

*Creating our session*

In [1]:
import pyspark 
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Regression").getOrCreate()

*Mandatory imports.*

In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#New one - for multicolinearity checking
from pyspark.ml.stat import Correlation

**Import Dataset**

This is a dataset containing housing pricing data for California. Each row of data represents the median statistics for a block (eg. median income, median age of house, etc.). You could this data in a number of ways, but we will use it to predict the median house value. 

**About this dataset **
1. longitude: A measure of how far west a house is; a higher value is farther west
2. latitude: A measure of how far north a house is; a higher value is farther north
3. housingMedianAge: Median age of a house within a block; a lower number is a newer building
4. totalRooms: Total number of rooms within a block
5. totalBedrooms: Total number of bedrooms within a block
6. population: Total number of people residing within a block
7. households: Total number of households, a group of people residing within a home unit, for a block
8. medianIncome: Median income for households within a block of houses (measured in tens of thousands of US Dollars)
9. medianHouseValue: Median house value for households within a block (measured in US Dollars)
10. oceanProximity: Location of the house w.r.t ocean/sea

**Source:** https://www.kaggle.com/camnugent/california-housing-prices

In [3]:
path ="Datasets/"
df = spark.read.csv(path+'housing.csv',inferSchema=True,header=True)

In [4]:
df.limit(4).toPandas()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY


In [5]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



*Droping nan rows.*

In [6]:
df = df.na.drop()
df.count()

20433

<div class="alert alert-block alert-warning"><b>Data preparation</b></div>

*Firstly assinging our input and dependent.We will use only 4 columns here*

In [7]:
input_columns = ['total_bedrooms','population','households','median_income']
dependent_var = 'median_house_value'

*Renaming our depndent to a label.*

In [8]:
renamed = df.withColumnRenamed(dependent_var,'label')

*Just checking if all of my label rows are numberic and if not changing to float.*

In [9]:
if str(renamed.schema['label'].dataType) != 'IntegerType':
    renamed = renamed.withColumn("label", renamed["label"].cast(FloatType()))

*Now I am converting all string data types to a numeric one.*

In [11]:
numeric_inputs = []
string_inputs = []
for column in input_columns:
    if str(renamed.schema[column].dataType) == 'StringType':
        new_col_name = column+"_num"
        string_inputs.append(new_col_name)
    else:
        numeric_inputs.append(column)
        indexed = renamed
        
if len(string_inputs) != 0: 
    for column in input_columns:
        if str(renamed.schema[column].dataType) == 'StringType':
            indexer = StringIndexer(inputCol=column, outputCol=column+"_num") 
            indexed = indexer.fit(renamed).transform(renamed)
        else:
            indexed = renamed

In [12]:
numeric_inputs

['total_bedrooms', 'population', 'households', 'median_income']

In [13]:
string_inputs

[]

**Outliers - same old skew function**

In [14]:

d = {}

for col in numeric_inputs: 
    d[col] = indexed.approxQuantile(col,[0.01,0.99],0.25) 
for col in numeric_inputs:
    skew = indexed.agg(skewness(indexed[col])).collect() #check for skewness
    skew = skew[0][0]
    # This function will floor, cap and then log+1 (just in case there are 0 values)
    if skew > 1:
        indexed = indexed.withColumn(col, \
        log(when(df[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] ) +1).alias(col))
        print(col+" has been treated for positive (right) skewness. (skew =)",skew,")")
    elif skew < -1:
        indexed = indexed.withColumn(col, \
        exp(when(df[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] )).alias(col))
        print(col+" has been treated for negative (left) skewness. (skew =",skew,")")

total_bedrooms has been treated for positive (right) skewness. (skew =) 3.4592923587675024 )
population has been treated for positive (right) skewness. (skew =) 4.959652416875933 )
households has been treated for positive (right) skewness. (skew =) 3.4135995729616138 )
median_income has been treated for positive (right) skewness. (skew =) 1.6444361858367003 )


In [15]:
features_list = numeric_inputs + string_inputs

In [16]:
features_list

['total_bedrooms', 'population', 'households', 'median_income']

In [17]:
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
final_data = assembler.transform(indexed).select('features','label')

In [18]:
final_data.show(5)

+--------------------+--------+
|            features|   label|
+--------------------+--------+
|[4.86753445045558...|452600.0|
|[7.00940893270863...|358500.0|
|[5.25227342804663...|352100.0|
|[5.46383180502561...|341300.0|
|[5.63835466933374...|342200.0|
+--------------------+--------+
only showing top 5 rows



<div class="alert alert-block alert-warning"><b>Multicollinearity check</b></div>

*Just checking if some of our pairs are not correlated too much.*

In [19]:
from pyspark.ml.stat import Correlation
pearsonCorr = Correlation.corr(final_data, 'features', 'pearson').collect()[0][0]
array = pearsonCorr.toArray()

In [20]:
for item in array:
    print(item[0])
    print(" ")
    print(item[1])
    print(" ")
    print(item[2])

1.0
 
0.8975232138344982
 
0.9745926155657936
0.8975232138344982
 
1.0
 
0.932189718732677
0.9745926155657936
 
0.932189718732677
 
1.0
0.014403796002467354
 
0.0317896708855436
 
0.046944012890930975


We have some correlated data here. For linear regreesion is a good solution to delete some of them, for other estimators it is not required.

In [21]:
train,test = final_data.randomSplit([0.7,0.3])

<div class="alert alert-block alert-warning"><b>Linear regressor</b></div>

In [23]:
regressor = LinearRegression()

#Now train with cross val
paramGrid = (ParamGridBuilder() \
             .addGrid(regressor.maxIter, [10, 15]) \
             .addGrid(regressor.regParam, [0.1, 0.01]) \
             .build())

evaluator = RegressionEvaluator(metricName="rmse")

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=regressor,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2) # 3 is best practice

print('\033[1m' + "Linear Regression Model Summary WITH cross validation:"+ '\033[0m')
print(" ")
# Run cross validations
fitModel = crossval.fit(train)

# Extract Best model
LR_BestModel = fitModel.bestModel

# Get Model Summary Statistics
# ModelSummary = fitModel.bestModel.summary
ModelSummary = LR_BestModel.summary
print("Coefficient Standard Errors: ")
coeff_ste = ModelSummary.coefficientStandardErrors
result = spark.createDataFrame(zip(input_columns,coeff_ste), schema=['feature','coeff std error'])
print(result.orderBy(result["coeff std error"].desc()).show(truncate=False))
print(" ")
print("P Values: ") 
# Then zip with input_columns list and create a df
pvalues = ModelSummary.pValues
result = spark.createDataFrame(zip(input_columns,pvalues), schema=['feature','P-Value'])
print(result.orderBy(result["P-Value"].desc()).show(truncate=False))
print(" ")

# Use test set here so we can measure the accuracy of our model on new data
ModelPredictions = fitModel.transform(test)

# cvModel uses the best model found from the Cross Validation
# Evaluate best model
test_results = evaluator.evaluate(ModelPredictions)
print('RMSE:', test_results)

[1mLinear Regression Model Summary WITH cross validation:[0m
 
Coefficient Standard Errors: 
+--------------+------------------+
|feature       |coeff std error   |
+--------------+------------------+
|households    |5216.668975931509 |
|total_bedrooms|4280.169699530648 |
|population    |2588.08643911679  |
|median_income |1929.0096209205208|
+--------------+------------------+

None
 
P Values: 
+--------------+--------------------+
|feature       |P-Value             |
+--------------+--------------------+
|total_bedrooms|0.015851289930112644|
|population    |0.0                 |
|median_income |0.0                 |
|households    |0.0                 |
+--------------+--------------------+

None
 
RMSE: 81873.67532455365


<div class="alert alert-block alert-warning"><b>Decision Trees</b></div>

In [24]:
regressor = DecisionTreeRegressor()

# Build your parameter grid
paramGrid = (ParamGridBuilder() \
                         .addGrid(regressor.maxDepth, [2, 5, 10, 20, 30]) \
                         .addGrid(regressor.maxBins, [10, 20, 40]) \
                         .build())

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=regressor,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName="rmse"),
                          numFolds=2) # 3 is best practice

# Fit Model: Run cross-validation, and choose the best set of parameters.
fitModel = crossval.fit(train)

# Get Best Model
DT_BestModel = fitModel.bestModel

# FEATURE IMPORTANCES
# Estimate of the importance of each feature.
# Each feature’s importance is the average of its importance across all trees 
# in the ensemble The importance vector is normalized to sum to 1. 
print(" ")
print('\033[1m' + " Feature Importances"+ '\033[0m')
print("(Scores add up to 1)")
print("Lowest score is the least important")
print(" ")
DT_featureImportances = DT_BestModel.featureImportances.toArray()
# Convert from numpy array to list
imp_scores = []
for x in DT_featureImportances:
    imp_scores.append(float(x))
# Then zip with input_columns list and create a df
result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
print(result.orderBy(result["score"].desc()).show(truncate=False))

# Make predictions.
# PySpark will automatically use the best model when you call fitmodel
predictions = fitModel.transform(test)


# And then apply it your predictions dataframe
rmse = evaluator.evaluate(predictions)
print(rmse)

 
[1m Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
+--------------+--------------------+
|feature       |score               |
+--------------+--------------------+
|median_income |0.9412743117388724  |
|households    |0.025483948811760478|
|population    |0.025157915442438228|
|total_bedrooms|0.00808382400692876 |
+--------------+--------------------+

None
81443.11956252148


<div class="alert alert-block alert-warning"><b>Random Forest Regressor</b></div>

In [25]:
regressor = RandomForestRegressor()

# Add parameters of your choice here:
paramGrid = (ParamGridBuilder() \
                           .addGrid(regressor.maxDepth, [2, 5, 10])
                           .addGrid(regressor.maxBins, [5, 10, 20])
               .addGrid(regressor.numTrees, [5, 20])
             .build())

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=regressor,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2) # 3 is best practice
# Fit Model: Run cross-validation, and choose the best set of parameters.
fitModel = crossval.fit(train)

# Get Best Model
RF_BestModel = fitModel.bestModel

# FEATURE IMPORTANCES
# Estimate of the importance of each feature.
# Each feature’s importance is the average of its importance across all trees 
# in the ensemble The importance vector is normalized to sum to 1. 
print(" ")
print('\033[1m' + " Feature Importances"+ '\033[0m')
print("(Scores add up to 1)")
print("Lowest score is the least important")
print(" ")
RF_featureImportances = RF_BestModel.featureImportances.toArray()
# Convert from numpy array to list
imp_scores = []
for x in RF_featureImportances:
    imp_scores.append(float(x))
# Then zip with input_columns list and create a df
result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
print(result.orderBy(result["score"].desc()).show(truncate=False))

# Make predictions.
# PySpark will automatically use the best model when you call fitmodel
predictions = fitModel.transform(test)

# And then apply it your predictions dataframe
rmse = evaluator.evaluate(predictions)
print(rmse)

 
[1m Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
+--------------+--------------------+
|feature       |score               |
+--------------+--------------------+
|median_income |0.7625897722359528  |
|population    |0.1128317787702994  |
|households    |0.06492867479303441 |
|total_bedrooms|0.059649774200713365|
+--------------+--------------------+

None
76006.20702902295


<div class="alert alert-block alert-warning"><b>Gradient Boosted Regressor</b></div>

In [26]:
regressor = GBTRegressor()

# Add parameters of your choice here:
paramGrid = (ParamGridBuilder() \
                           .addGrid(regressor.maxDepth, [2, 5, 10])
                           .addGrid(regressor.maxBins, [5, 10, 20])
                           .build())

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=regressor,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2) # 3 is best practice
# Fit Model: Run cross-validation, and choose the best set of parameters.
fitModel = crossval.fit(train)

# Get Best Model
GBT_BestModel = fitModel.bestModel

# FEATURE IMPORTANCES
# Estimate of the importance of each feature.
# Each feature’s importance is the average of its importance across all trees 
# in the ensemble The importance vector is normalized to sum to 1. 
print(" ")
print('\033[1m' + " Feature Importances"+ '\033[0m')
print("(Scores add up to 1)")
print("Lowest score is the least important")
print(" ")
GBT_featureImportances = GBT_BestModel.featureImportances.toArray()
# Convert from numpy array to list
imp_scores = []
for x in GBT_featureImportances:
    imp_scores.append(float(x))
# Then zip with input_columns list and create a df
result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
print(result.orderBy(result["score"].desc()).show(truncate=False))

# Make predictions.
# PySpark will automatically use the best model when you call fitmodel
predictions = fitModel.transform(test)

# And then apply it your predictions dataframe
rmse = evaluator.evaluate(predictions)
print(rmse)

 
[1m Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
+--------------+-------------------+
|feature       |score              |
+--------------+-------------------+
|median_income |0.4432783006540673 |
|population    |0.26028531900622015|
|households    |0.21660222723099073|
|total_bedrooms|0.07983415310872183|
+--------------+-------------------+

None
76193.25100108575


In [28]:
predictions = RF_BestModel.transform(test)
predictions.show()

+--------------------+--------+------------------+
|            features|   label|        prediction|
+--------------------+--------+------------------+
|[1.38629436111989...|250000.0|227978.71826770186|
|[1.60943791243410...|500001.0| 377555.7430468508|
|[1.60943791243410...| 80000.0|178535.08350059908|
|[1.79175946922805...|118800.0|131510.85703766922|
|[1.79175946922805...|137500.0|         459775.85|
|[1.94591014905531...| 67500.0|131510.85703766922|
|[2.07944154167983...|500001.0| 377555.7430468508|
|[2.07944154167983...|162500.0|227978.71826770186|
|[2.19722457733621...| 71300.0|147682.34609985608|
|[2.19722457733621...|450000.0|131510.85703766922|
|[2.19722457733621...|150000.0| 142157.6140057303|
|[2.19722457733621...|225000.0| 377555.7430468508|
|[2.30258509299404...|375000.0|119101.78234203234|
|[2.39789527279837...| 42500.0|119101.78234203234|
|[2.39789527279837...|225000.0|119101.78234203234|
|[2.39789527279837...|137500.0|164527.62629639663|
|[2.48490664978800...| 67500.0|

<div class="alert alert-block alert-danger"><b>BeAware:</b>only fo educational purposes, quite a good reference</div>

***The End***