In [1]:
import warnings
warnings.filterwarnings('ignore')   # to avoid warnings

import pandas as pd
from scipy.stats import norm
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style="darkgrid")

# pySpark - ML - Regressions

In [2]:
import findspark
findspark.init("C:\spark\spark-3.0.1-bin-hadoop2.7")
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('./diamonds2.csv')
df

DataFrame[carat: double, cut: string, color: string, clarity: string, depth: double, table: double, price: int, x: double, y: double, z: double]

# Feature engineering

In [3]:
df.show(5)

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31|   Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows



In [4]:
categoricalCols = ['cut', 'color', 'clarity']
continuousCols = ['carat', 'depth', 'table', 'x', 'y', 'z']
labelCol = ['price']

# Categorical to numerical pyspark

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in list(set(df.columns)\
                                                    -set(['carat','depth','table', 'price', 'x', 'y', 'z' ])) ]


pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(df).transform(df)

df_r.show()

+-----+---------+-----+-------+-----+-----+-----+----+----+----+-------------+-----------+---------+
|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|clarity_index|color_index|cut_index|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+-------------+-----------+---------+
| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|          2.0|        1.0|      0.0|
| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|          0.0|        1.0|      1.0|
| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|          3.0|        1.0|      3.0|
| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|          1.0|        5.0|      1.0|
| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|          2.0|        6.0|      3.0|
| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|          4.0|        6.0|      2.0|
| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|          5.0|        5.0| 

In [6]:
# Dropping columns
columns_to_drop = ['cut', 'color', 'clarity']

df_droped = df.drop(*columns_to_drop)

In [7]:
df_droped.show()

+-----+-----+-----+-----+----+----+----+
|carat|depth|table|price|   x|   y|   z|
+-----+-----+-----+-----+----+----+----+
| 0.23| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31| 63.3| 58.0|  335|4.34|4.35|2.75|
| 0.24| 62.8| 57.0|  336|3.94|3.96|2.48|
| 0.24| 62.3| 57.0|  336|3.95|3.98|2.47|
| 0.26| 61.9| 55.0|  337|4.07|4.11|2.53|
| 0.22| 65.1| 61.0|  337|3.87|3.78|2.49|
| 0.23| 59.4| 61.0|  338| 4.0|4.05|2.39|
|  0.3| 64.0| 55.0|  339|4.25|4.28|2.73|
| 0.23| 62.8| 56.0|  340|3.93| 3.9|2.46|
| 0.22| 60.4| 61.0|  342|3.88|3.84|2.33|
| 0.31| 62.2| 54.0|  344|4.35|4.37|2.71|
|  0.2| 60.2| 62.0|  345|3.79|3.75|2.27|
| 0.32| 60.9| 58.0|  345|4.38|4.42|2.68|
|  0.3| 62.0| 54.0|  348|4.31|4.34|2.68|
|  0.3| 63.4| 54.0|  351|4.23|4.29| 2.7|
|  0.3| 63.8| 56.0|  351|4.23|4.26|2.71|
|  0.3| 62.7| 59.0|  351|4.21|4.27|2.66|
+-----+-----+-----+-----+----+----+----+
only showing top

In [8]:
df_droped.printSchema()

root
 |-- carat: double (nullable = true)
 |-- depth: double (nullable = true)
 |-- table: double (nullable = true)
 |-- price: integer (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



In [9]:
df_droped.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
carat,53940,0.7979397478679852,0.4740112444054196,0.2,5.01
depth,53940,61.74940489432624,1.4326213188336525,43.0,79.0
table,53940,57.45718390804603,2.2344905628213247,43.0,95.0
price,53940,3932.799721913237,3989.439738146397,326,18823
x,53940,5.731157211716609,1.1217607467924915,0.0,10.74
y,53940,5.734525954764462,1.1421346741235616,0.0,58.9
z,53940,3.5387337782723316,0.7056988469499883,0.0,31.8


# Correlation between features and label (price)

In [12]:
import six
for i in df_droped.columns:
    if not( isinstance(df_droped.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to price for ", i, df_droped.stat.corr('price',i))

Correlation to price for  carat 0.9215913011934687
Correlation to price for  depth -0.010647404584155439
Correlation to price for  table 0.12713390212172268
Correlation to price for  price 1.0
Correlation to price for  x 0.8844351610161171
Correlation to price for  y 0.8654208978641909
Correlation to price for  z 0.8612494438514451


In [13]:
df_droped.columns

['carat', 'depth', 'table', 'price', 'x', 'y', 'z']

# Vector assembler

In [14]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['carat', 'depth', 'table', 'x', 'y', 'z'], outputCol = 'features')
vdf_droped = vectorAssembler.transform(df_droped)
vdf_droped = vdf_droped.select(['features', 'price'])
vdf_droped.show(3)

+--------------------+-----+
|            features|price|
+--------------------+-----+
|[0.23,61.5,55.0,3...|  326|
|[0.21,59.8,61.0,3...|  326|
|[0.23,56.9,65.0,4...|  327|
+--------------------+-----+
only showing top 3 rows



# Split dataset (vectorized) into train and test

In [28]:
splits = vdf_droped.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

# Linear regression model

In [16]:
# training model
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='price', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [9737.017406872577,-169.02432337240174,-95.1897208418857,-49.28073820328152,-564.3207055830457,-346.06745754703496]
Intercept: 16813.323272545374


In [17]:
# evaluation
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 1500.600623
r2: 0.857431


In [18]:
train_df.describe().show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|             37839|
|   mean| 3921.613229736515|
| stddev|3974.2756974345984|
|    min|               326|
|    max|             18823|
+-------+------------------+



In [19]:
# Predictions on test_df
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","price","features").show(5)

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="price",metricName="r2")

print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+-------------------+-----+--------------------+
|         prediction|price|            features|
+-------------------+-----+--------------------+
|-354.47856273282014|  367|[0.2,59.7,62.0,3....|
|-345.06598784146263|  367|[0.2,59.8,62.0,3....|
|-404.84997765423395|  345|[0.2,60.2,62.0,3....|
| -181.6690701574771|  367|[0.2,61.5,57.0,3....|
| -463.9344833993928|  367|[0.2,61.7,60.0,3....|
+-------------------+-----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.854443


In [20]:
# Model Evaluation on test_df
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 1535.51


In [21]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 11
objectiveHistory: [0.5, 0.41595811977846303, 0.1772436032044164, 0.09628115640565678, 0.0952807389717537, 0.0874152102635148, 0.08098418374048308, 0.11338753634409142, 0.07158281635508934, 0.07149643161193148, 0.07138613274685973]
+------------------+
|         residuals|
+------------------+
|386.17456012872935|
|  85.6303089779467|
|   673.62131497265|
| 887.1735661596431|
|127.59513409366264|
| 540.2399890193483|
|  528.470101027302|
| 624.8295507552284|
| 687.1774591230969|
| 494.2560603229613|
| 502.4318197623361|
| 555.1058170936922|
| 566.7129507429709|
| 846.6477195579246|
|1377.1460140837698|
| 374.7666668661004|
| 236.0947092861643|
|-495.6231367507571|
|114.15930404382743|
| 662.0685870851667|
+------------------+
only showing top 20 rows



In [22]:
# Predictions on test_df
predictions = lr_model.transform(test_df)
predictions.select("prediction","price","features").show()

+-------------------+-----+--------------------+
|         prediction|price|            features|
+-------------------+-----+--------------------+
|-354.47856273282014|  367|[0.2,59.7,62.0,3....|
|-345.06598784146263|  367|[0.2,59.8,62.0,3....|
|-404.84997765423395|  345|[0.2,60.2,62.0,3....|
| -181.6690701574771|  367|[0.2,61.5,57.0,3....|
| -463.9344833993928|  367|[0.2,61.7,60.0,3....|
| -274.9492313846713|  367|[0.2,62.2,57.0,3....|
| -540.8050196713775|  367|[0.2,62.3,60.0,3....|
| -666.2678559660126|  367|[0.2,63.4,59.0,3....|
|  178.8980971217934|  386|[0.21,58.3,59.0,3...|
| -195.4777484612714|  386|[0.21,59.1,62.0,3...|
|-200.33844505727393|  404|[0.22,61.6,58.0,3...|
|  463.9631888693384|  395|[0.23,56.2,60.0,4...|
|  314.1802245495128|  498|[0.23,56.3,62.0,4...|
| 323.84069595617257|  550|[0.23,58.1,59.0,4...|
| -82.67018578036004|  468|[0.23,58.1,63.0,4...|
|   46.7555372045972|  530|[0.23,58.6,61.0,4...|
| -43.50824474649926|  373|[0.23,59.0,61.0,4...|
| -218.3662859313008

# Decision Tree Regression

In [23]:
# Decision Tree Regression 
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'price')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")

rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 1428.97


In [24]:
dt_model.featureImportances

SparseVector(6, {0: 0.7303, 1: 0.0027, 2: 0.0016, 3: 0.0005, 4: 0.2648})

In [25]:
df_droped.take(1)

[Row(carat=0.23, depth=61.5, table=55.0, price=326, x=3.95, y=3.98, z=2.43)]

# Gradient-boosted tree regression

In [26]:
# Gradient-boosted tree regression 
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'price', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'price', 'features').show(5)

+-----------------+-----+--------------------+
|       prediction|price|            features|
+-----------------+-----+--------------------+
|509.3355056342071|  367|[0.2,59.7,62.0,3....|
|509.3355056342071|  367|[0.2,59.8,62.0,3....|
|509.3355056342071|  345|[0.2,60.2,62.0,3....|
|610.1423440788802|  367|[0.2,61.5,57.0,3....|
|610.1423440788802|  367|[0.2,61.7,60.0,3....|
+-----------------+-----+--------------------+
only showing top 5 rows



In [27]:
gbt_evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 1400.35
