## Group 5 Project Regression
In this project, we are going to explore the relationship between features and price of dimonds. Our goal is to predict the price of diamonds using variables that describe the quality of diamonds such as carat, cut and clarity.

In [2]:
#Import Classes
import pandas as pd
import warnings # current version of seaborn generates a bunch of warnings that we'll ignore
warnings.filterwarnings("ignore")
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import SQLContext
from statsmodels.graphics.mosaicplot import mosaic
from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import IntegerType 
from pyspark.sql.types import DoubleType 
from pyspark.sql.functions import udf 
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import GBTRegressor
from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import col
import functools 
import numpy as np
import operator

#### 1. Read the Data

In [4]:
sqlContext = SQLContext(sc)
diamonds_df = sqlContext.sql("Select * from diamonds")
#remove the ID variable
diamonds_df = diamonds_df.drop("id")

#### 2. Data Exploration

In [6]:
#Get a glimpse of dataset -  columns etc
display(diamonds_df)

carat,cut,color,clarity,depth,table,price,x,y,z
0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
0.29,Premium,I,VS2,62.4,58.0,334,4.2,4.23,2.63
0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75
0.24,Very Good,J,VVS2,62.8,57.0,336,3.94,3.96,2.48
0.24,Very Good,I,VVS1,62.3,57.0,336,3.95,3.98,2.47
0.26,Very Good,H,SI1,61.9,55.0,337,4.07,4.11,2.53
0.22,Fair,E,VS2,65.1,61.0,337,3.87,3.78,2.49
0.23,Very Good,H,VS1,59.4,61.0,338,4.0,4.05,2.39


#### 3. Define Target variable and segregate into categorical and numerical variables

In [8]:
cat_var = [ t[0]  for t in diamonds_df.dtypes if t[1]=='string']
target = 'price'
num_var = list(set(diamonds_df.columns)-set(cat_var)-set([target]))

#### 4. Summarize numerical columns

In [10]:
#for the numeric fields find the min,max, std dev
display(diamonds_df[num_var].describe().toPandas())  
#0's indicate missing values

summary,x,depth,carat,table,y,z
count,53940.0,53940.0,53940.0,53940.0,53940.0,53940.0
mean,5.731157211716609,61.74940489432624,0.7979397478679852,57.45718390804603,5.734525954764462,3.538733778272332
stddev,1.1217607467924915,1.4326213188336523,0.4740112444054196,2.2344905628213247,1.1421346741235616,0.7056988469499883
min,0.0,43.0,0.2,43.0,0.0,0.0
max,10.74,79.0,5.01,95.0,58.9,31.8


In [11]:
# Replace 0's with NAs
get_null_label = udf(lambda x: None if x <= 0 else x, DoubleType())
diamonds_df = diamonds_df.withColumn("x", get_null_label(diamonds_df["x"]))
diamonds_df = diamonds_df.withColumn("y", get_null_label(diamonds_df["y"]))
diamonds_df = diamonds_df.withColumn("z", get_null_label(diamonds_df["z"]))

In [12]:
display(diamonds_df[num_var].describe().toPandas()) 

summary,x,depth,carat,table,y,z
count,53932.0,53940.0,53940.0,53940.0,53933.0,53920.0
mean,5.732007342579431,61.74940489432624,0.7979397478679852,57.45718390804603,5.735270242708454,3.5400463649853404
stddev,1.119669945410266,1.4326213188336523,0.4740112444054196,2.2344905628213247,1.14033861355638,0.7025303439301769
min,3.73,43.0,0.2,43.0,3.68,1.07
max,10.74,79.0,5.01,95.0,58.9,31.8


#### 5. Check the distribution of categorical variables 
####### Hypothesis 1. Colourless Diamonds (code D-F) generally cost more, is this evident from the dataset?

In [14]:
sns.catplot(x='color', y="price", kind="boxen",
            data=diamonds_df.toPandas().sort_values('color'));

In [15]:
sns.countplot(x='color',data=diamonds_df.toPandas())

####### Hypothesis 2. Do flawless diamonds cost more than those with significant inclusions?

In [17]:
sns.catplot(x='clarity', y="price", kind="boxen",
            data=diamonds_df.toPandas().sort_values('clarity'));

In [18]:
sns.countplot(x='clarity',data=diamonds_df.toPandas())

####### Hypothesis 3. Do ideal cuts fetch more price than fair ones?

In [20]:
sns.catplot(x='cut', y="price", kind="boxen",
            data=diamonds_df.toPandas().sort_values('cut'));

In [21]:
sns.countplot(x='cut',data=diamonds_df.toPandas())

###### Traditionally, colorless diamonds (nearer to D) are priced more. But from our dataset , we can see that median prices for faintly coloured diamonds(J) is more than that for colorless. This maybe due to the fact some edgier designs look better with coloured diamonds and can be a recent change in customer preference or there might be other variables which influence the price much more. Similarly IF (internally flawless) diamonds have lesser median price than I1(flawed) diamonds.Again it seems that clarity alone does not influence price. Ideal cuts have the least median price than other cuts in our dataset, indicating that cut alone does not govern price

#### Mosaic plot to check relationships between categorical variables.

In [24]:
fig, axes = plt.subplots(nrows=1, ncols=1,figsize=(30,15))
mosaic(diamonds_df.toPandas(),['cut','color','clarity'],ax=axes, title='Distribution of cut, color, clarity')
 # From the plot below, we can see that there are very less number of fair diamonds in our dataset and that might be the reason why a few of them have influenced the price so much. 

#### 6. Check the distribution and relationships of numerical variables

In [26]:
#We use scatter plot to check relationship and distribution of numerical variables
sns.pairplot(diamonds_df.toPandas(), kind="scatter")
plot.show()

###### Carat seems to have a linear relationship with Price, x and almost exponential with y and z. This seems correct as carat is calculated as weight/0.2 and x, y and z are essentially dimensions of the diamonds which will influence weight.

#### Check correlations

In [29]:
#Check correlations
plt.figure(figsize=(10, 10))
numeric_var = num_var+[target]
correlation = diamonds_df[numeric_var].toPandas().iloc[:, 0:].corr()
sns.heatmap(correlation, vmax=1, annot=True,square=True, cmap="YlGnBu")
#x,y,z, carat and price are hughly correlated. Some need to be removed for linear regression, to reduce multicollinearity

#### 7. Check the number of null variables in the dataset
###### There are few missing values

In [31]:
#Check for nulls for imputation
null_list = [diamonds_df.where(diamonds_df[x].isNull()).count() for x in diamonds_df.columns]
#Check number of null values
null_list
#x,y and z have some missing

#### 8. Create Custom Transformer for Imputation

In [33]:
# impute missing values

class NumericImputer(Transformer, HasInputCol, HasOutputCol):

    def __init__(self, inputCol=None, outputCol=None):
        super(NumericImputer, self).__init__()
        self.setParams(inputCol = inputCol , outputCol = outputCol)

        
    def setParams(self, inputCol=None, outputCol=None):
      return self._set(inputCol = inputCol, outputCol = outputCol)
        

    def _transform(self, dataset):

      out_col = self.getOutputCol()
      in_col = self.getInputCol()
      from pyspark.sql.functions import when  
      median_v = dataset.approxQuantile(in_col, [0.5], 0)[0]
      return dataset.withColumn(out_col, when(col(in_col).isNull(), median_v).otherwise(col(in_col)))

#### 9. Create Custom z-score standardizer

In [35]:
class Standardizer(Transformer, HasInputCol, HasOutputCol):

    def __init__(self, inputCol=None, outputCol=None):
        super(Standardizer, self).__init__()
        self.setParams(inputCol = inputCol , outputCol = outputCol)
       
    def setParams(self, inputCol=None, outputCol=None):
      return self._set(inputCol = inputCol, outputCol = outputCol)
        
    def _transform(self, dataset):
      from pyspark.sql.functions import stddev, mean, col
      out_col = self.getOutputCol()
      in_col = dataset[self.getInputCol()]
      mean, sttdev = dataset.select(mean(in_col), stddev(in_col)).first()
      return dataset.withColumn(out_col, (in_col - mean)/sttdev)   

#### 10. Create ordinal encoder using UDFs for our categorical variable

In [37]:
#For ordinal encoding of categorical variables
def get_cut_labelled(cut):
  if cut=='Ideal':
    return 4
  elif cut=='Premium':
    return 3
  elif cut=='Very Good':
    return 2
  elif cut=='Good':
    return 1
  else :
    return 0
  
def get_color_labelled(color):
  if color=='D':
    return 6
  elif color=='E':
    return 5
  elif color=='F':
    return 4
  elif color=='G':
    return 3
  elif color=='H':
    return 2
  elif color=='I':
    return 1
  else :
    return 0
  
def get_clarity_labelled(clarity):
  if clarity=='IF':
    return 7
  elif clarity=='VVS1':
    return 6
  elif clarity=='VVS2':
    return 5
  elif clarity=='VS1':
    return 4
  elif clarity=='VS2':
    return 3
  elif clarity=='SI1':
    return 2
  elif clarity=='SI2':
    return 1
  else :
    return 0 

In [38]:
# Custom Ordinal Encoder
get_cut_label = udf(get_cut_labelled, IntegerType())
get_color_label = udf(get_color_labelled, IntegerType())
get_clarity_label = udf(get_clarity_labelled, IntegerType())
new_diamonds_df = diamonds_df.withColumn("cut", get_cut_label(diamonds_df["cut"]))
new_diamonds_df = new_diamonds_df.withColumn("color", get_color_label(new_diamonds_df["color"]))
new_diamonds_df = new_diamonds_df.withColumn("clarity", get_clarity_label(new_diamonds_df["clarity"]))
new_diamonds_df.head(10)

#### 11. Divide the data into test(30%) and train (70%)

In [40]:
#divide data into test and train
train, test = new_diamonds_df.randomSplit([0.7, 0.3], seed = 2020)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

#### 13. Decision Tree Regressor

In [42]:
#Building stages for pipeline
input_cols=cat_var+num_var #all are numeric columns now
numericimputers = [NumericImputer(inputCol = column, outputCol = column) for column in num_var]
standardizers = [Standardizer(inputCol = column, outputCol = column) for column in input_cols] #keep only standardized columns in the dataset
assembler = VectorAssembler(inputCols= input_cols, outputCol="features")

In [43]:
#Desicion Tree regressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = target)
stages = []
stages = functools.reduce(operator.concat, [numericimputers,standardizers])
stages.append(assembler)
stages.append(dt)
#Run the spark pipeline
pipeline = Pipeline(stages=stages)
dt_model = pipeline.fit(train)
dt_predictions_test= dt_model.transform(test)
dt_predictions_train= dt_model.transform(train)
rmse_evaluator = RegressionEvaluator(
    labelCol=target, predictionCol="prediction", metricName="rmse")
r2_evaluator = RegressionEvaluator(
    labelCol=target, predictionCol="prediction", metricName="r2")
r2_test_dt =r2_evaluator.evaluate(dt_predictions_test)
rmse_train_dt = rmse_evaluator.evaluate(dt_predictions_train)
rmse_test_dt = rmse_evaluator.evaluate(dt_predictions_test)
#Check performance
print("Decision Tree Model :")
print("R-squared on test data = %g" % r2_test_dt)
print("Root Mean Squared Error (RMSE) on train data = %g" % rmse_train_dt)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse_test_dt)

In [44]:
#Feature Importance as per decision tree
dt_model.stages[-1].featureImportances 
#Depth , x, cut, carat,y, color are imp featured ranked in order as per the above model

###### As per the decision tree , 'table' is the most significant variable in determining the price followed by y, color, cut, z and clarity respectively

In [46]:
del stages[-1] #Delete the decison tree stage from pipeline to add a new regressor

#### 14. Gradient boosted Trees Regressor

In [48]:
#Gradient Boosted Tree regressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = target, maxIter=10)
stages.append(gbt)
#Run the spark pipeline
pipeline_gbt = Pipeline(stages=stages)
gbt_model = pipeline_gbt.fit(train)
gbt_predictions_test= gbt_model.transform(test)
gbt_predictions_train= gbt_model.transform(train)
rmse_train_gbt = rmse_evaluator.evaluate(gbt_predictions_train)
rmse_test_gbt = rmse_evaluator.evaluate(gbt_predictions_test)
r2_test_gbt =r2_evaluator.evaluate(gbt_predictions_test)
#Check performance
print("Gradient Boosted Tree Model :")
print("R-squared on test data = %g" % r2_test_gbt)
print("Root Mean Squared Error (RMSE) on train data = %g" % rmse_train_gbt)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse_test_gbt)

#### 15. Linear Regression Model

In [50]:
#For the linear regression model, removing the x,y and z model as they are highly correlated with carat. Amongst these 4 columns, carat shows the highest correlations with price.
input_cols_lr = list(set(input_cols) - set(['x'])-set(['y'])-set(['z']))
#standardize variables
standardizers_lr = [Standardizer(inputCol = column, outputCol = column) for column in input_cols_lr]
assembler_lr = VectorAssembler(inputCols= input_cols_lr, outputCol="features")
lr = LinearRegression(featuresCol = 'features', labelCol = target ,maxIter=10, regParam=0.3, elasticNetParam=0.8)
#Create stages for pipeline
stages_lr = []
stages_lr = functools.reduce(operator.concat, [standardizers_lr])
stages_lr.append(assembler_lr)
stages_lr.append(lr)
stages_lr

In [51]:
#divide data into test and train(for only the selected variables)
train, test = new_diamonds_df[input_cols_lr+[target]].randomSplit([0.7, 0.3], seed = 2020)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

In [52]:
pipeline_lr = Pipeline(stages=stages_lr)
lr_model = pipeline_lr.fit(train)
lr_predictions_test= lr_model.transform(test)
lr_predictions_train= lr_model.transform(train)
rmse_train_lr = rmse_evaluator.evaluate(lr_predictions_train)
rmse_test_lr = rmse_evaluator.evaluate(lr_predictions_test)
r2_test_lr =r2_evaluator.evaluate(lr_predictions_test)
#Check performance
print("Linear Regression Model :")
print("R-squared on test data = %g" % r2_test_lr)
print("Root Mean Squared Error (RMSE) on train data = %g" % rmse_train_lr)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse_test_lr)

In [53]:
#Coefficients for LR model
coefficients=pipeline_lr.fit(train).stages[-1].coefficients
coefficients
#coeff for depth,clarity,cut,carat,table,color respectively

In [54]:
#Intercept for LR model
intercept=pipeline_lr.fit(train).stages[-1].intercept
intercept

###### Residual vs Fitted for linear regression model

In [56]:
summary=pipeline_lr.fit(train).stages[-1].summary
# Residual vs. Fitted Plot
sns.set(style="whitegrid")
fig, ax = plt.subplots()
# Make an example dataset with y ~ x
rs = np.random.RandomState(7)
x = np.asarray(lr_predictions_train.select('prediction').toPandas())
x = x.reshape(x.shape[0])
y = np.asarray(summary.residuals.toPandas())
y = y.reshape(y.shape[0])
# Plot the residuals after fitting a linear model
sns.residplot(x, y, lowess=True, color="g")
ax.set_title('residual vs. fitted')
ax.set(xlabel='fitted', ylabel='residuals')
display(fig)