#Modeling Diamond Data using Linear Regression to Predict Diamond Price using PySpark

In [1]:
#importing a few necessary packages
!pip install plotly
!conda install -c conda-forge matplotlib==3.3.1

/bin/bash: conda: command not found


In [None]:
# Do not delete or change this cell

# grading import statements
%matplotlib inline
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
import os

# Define a function to determine if we are running on data bricks
# Return true if running in the data bricks environment, false otherwise
def is_databricks():
    # get the databricks runtime version
    db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")
    
    # if running on data bricks
    if db_env != None:
        return True
    else:
        return False

# Define a function to read the data file.  The full path data file name is constructed
# by checking runtime environment variables to determine if the runtime environment is 
# databricks, or a student's personal computer.  The full path file name is then
# constructed based on the runtime env.
# 
# Params
#   data_file_name: The base name of the data file to load
# 
# Returns the full path file name based on the runtime env
#
# Correct Usage Example (pass ONLY the full file name):
#   file_name_to_load = get_training_filename("sms_spam.csv") # correct - pass ONLY the full file name  
#   
# Incorrect Usage Example
#   file_name_to_load = get_training_filename("/sms_spam.csv") # incorrect - pass ONLY the full file name
#   file_name_to_load = get_training_filename("sms_spam.csv/") # incorrect - pass ONLY the full file name
#   file_name_to_load = get_training_filename("c:/users/will/data/sms_spam.csv") incorrect -pass ONLY the full file name
def get_training_filename(data_file_name):    
    # if running on data bricks
    if is_databricks():
        # build the full path file name assuming data brick env
        full_path_name = "dbfs:/FileStore/tables/%s" % data_file_name
    # else the data is assumed to be in the same dir as this notebook
    else:
        # Assume the student is running on their own computer and load the data
        # file from the same dir as this notebook
        full_path_name = data_file_name
    
    # return the full path file name to the caller
    return full_path_name

# Diamonds Data
If you have ever had an interest in diamonds then this homework is for you!  This homework assignment will use a diamonds dataset to explore spark pipelines, linear regression, feature transformation, model scoring, inference, and feature selection.

The diamonds.csv data set contains 10 columns:
- carat: Carat weight of the diamond
- cut: Describes cut quality of the diamond. Quality in increasing order Fair, Good, Very Good, Premium, Ideal
- color: Color of the diamond, with D being the best and J the worst
- clarity: How obvious inclusions are within the diamond:(in order from best to worst, FL = flawless, I3= level 3 inclusions) FL,IF, VVS1, etc.  See this web site for an exhaustive ranking of [clarity](https://4cs.gia.edu/en-us/diamond-clarity/?gclid=Cj0KCQjwnqH7BRDdARIsACTSAduMoc2KQbXkO94BxCfBNC5X8YyjAYcFpWThKQMW46cQj_3p0pZ0o84aAuagEALw_wcB).  The web site has a nice sliding scale you can drag to see the relationship between clarity grades.
- depth: depth % - The height of a diamond, measured from the culet to the table, divided by its average girdle diameter
- table: table% -  The width of the diamond's table expressed as a percentage of its average diameter
- price: The price of the diamond
- x: Length (mm)
- y: Width (mm)
- z: Height (mm)

# Reading in our data
Read the diamonds.csv file into a spark data frame named `diamonds_df`.

In [None]:
# Your code here
diamonds_df = spark.read.format("csv").option("header", "true").load(get_training_filename('diamonds.csv'))
diamonds_df = diamonds_df.drop('_c0')

In [None]:
# Grading Cell - do not modify
display(diamonds_df.toPandas().head())

carat,cut,color,clarity,depth,table,price,x,y,z
0.23,Ideal,E,SI2,61.5,55,326,3.95,3.98,2.43
0.21,Premium,E,SI1,59.8,61,326,3.89,3.84,2.31
0.23,Good,E,VS1,56.9,65,327,4.05,4.07,2.31
0.29,Premium,I,VS2,62.4,58,334,4.2,4.23,2.63
0.31,Good,J,SI2,63.3,58,335,4.34,4.35,2.75


# Statistically Summarizing the Data
Investigate the diamond data.  Create a pair plot on the real data columns which sumarizes the data. In addition to the pair plot, use a spark dataframe built in function to provide a statistical summary of the data.  Provide a written summary of the pair plot and statistical summary observations.

In [None]:
#spark dataframe built-in function for staistical summary of data
cols = ['carat','depth','table','price','x','y','z']
df = diamonds_df.select(cols)
df.describe().show()
#Wow! the min value for x, y and z is 0 which can't be possible! I will drop these rows 

In [None]:
#recasting the datatypes in our diamonds_df for compatability/descriptive statistics with our linear regression model
#recasting the datatypes of some of our columns to be compatible with linear regression model. Categorical variables such as color, cut and clarity can remain as strings given that they will be one hot encoded in the following steps
diamonds_df = diamonds_df.selectExpr("cast(carat as float) carat",
     "cast(depth as float) depth",
     "cast(table as float) table",
     "cast(x as float) x",
      "cast(y as float) y",
      "cast(z as float) z",
      "cast(price as float) price",
      "cast(cut as string) cut",
      "cast(color as string) color",
      "cast(clarity as string) clarity")                                              
diamonds_df.printSchema()
diamonds_df.show(truncate=False)

In [None]:
#dropping rows where any of the dimensional values correspond to 0. This must be an error resulting from the data collection process. 
diamonds_df=diamonds_df.where("x!=0")
diamonds_df=diamonds_df.where("z!=0")
diamonds_df.describe().show()

In [None]:
#correlation matrix
df = diamonds_df.toPandas()
df.corr()

Unnamed: 0,carat,depth,table,x,y,z,price
carat,1.0,0.028259,0.181646,0.977779,0.953991,0.961048,0.921592
depth,0.028259,1.0,-0.295733,-0.025017,-0.029069,0.095023,-0.010729
table,0.181646,-0.295733,1.0,0.196097,0.184493,0.152483,0.127245
x,0.977779,-0.025017,0.196097,1.0,0.974918,0.975435,0.887231
y,0.953991,-0.029069,0.184493,0.974918,1.0,0.956744,0.867864
z,0.961048,0.095023,0.152483,0.975435,0.956744,1.0,0.868206
price,0.921592,-0.010729,0.127245,0.887231,0.867864,0.868206,1.0


In [None]:
#import seaborn as sns
numeric_features = [i[0] for i in diamonds_df.dtypes if i[1] == 'float']
pplt_df = diamonds_df.select(numeric_features).toPandas()
#selecting the 'real' colums we would like to include in our subsetted dataframe for our pairplot
import plotly.express as px
import plotly.graph_objects as go

fig = go.Figure(data=go.Splom(
  dimensions = [dict(label = 'price',
                    values = pplt_df['price']),
                dict(label = 'carat',
                   values = pplt_df['carat']),
               dict(label = 'depth',
                   values=pplt_df['depth']),
               dict(label='table',
                   values=pplt_df['table']),
               dict(label='x',
                   values=pplt_df['x']),
               dict(label='y',
               values=pplt_df['y']),
                dict(label='z',
                    values=pplt_df['z'])],
                   showupperhalf=False,
                   marker=dict(line_color='green',line_width=0.24,size=3)))

# Add images
fig.update_layout(
title= 'Pairplot of Numeric Variables in Diamond Dataframe',
width=1200, height= 1100,
hovermode='closest',
template='plotly_dark',
paper_bgcolor='black',
title_font=dict(size=18))
fig.show()

In [None]:
#variables with numeric values
numeric_features = [i[0] for i in diamonds_df.dtypes if i[1] == 'float']
diamonds_df.select(numeric_features).show(5)

Your summary explanation here:  
Carat seems to have a strong positive correlation to diamond price. Carats simply measure the weight of a diamond so the heavier it is, the more rare it is and scarcity seems to drive price in this business. This is to say that carat has the strongest impact on diamond price. 
The same can be said for columns x,y and z which correspond to a diamond's length, width and height respectively. 
X, y and z would comprise the size of the diamond which is why it would be erroneous to assume that a heavier stone (high carat) is simply a larger stone. While these attributes all have strong positive correlations to one another, it's still important to keep the distinction in mind. 

Depth is its Height (Z column but in millimeters) measured from the Culet to the Table.
A small depth percentage generally means the stone will have a much darker appearence which could affect our categorical column: color. But at any rate price seems to show a lot of variation for diamonds of similar depths. 
Thus depth seems to be inversely related to price

Table (percentage) is the Width (y column) of the Diamond's Table divided by its diameter. A large table value will mean that light will be able to reflect off of it's angles making it more appealing. Therefore it appears to have a slightly negative correlation to price but diamonds with the same table percentage may still vary widely with price.

##### Grading Feedback Cell

# Converting Categorical data to Numeric via OHE
There are one or more columns of data in diamonds_df which are not in an appropriate format for performing linear regression.  Perform feature engineering on all columns which are not in a format which is ready for use in a linear regression model.  Create a new data frame named `diamonds_df_xformed` which contains the same number of columns and the same column names as diamonds_df.  Transform diamonds_df such that diamonds_df_xformed is ready to feed into a linear regression model for training.  diamonds_df_xformed Shall contain new feature engineered columns that can be directly used in linear regression.  Encapsulate your feature engineering / transformation code into a spark pipeline named feature_engineering_pipe.  Describe in words all columns on which you performed feature engineering.  Include the specific transformation algorighm used, and your reasoning for using the chosen algorithm.  We expect to see an explanation for each and every column on which you perform feature engineering.

In [None]:
display(diamonds_df)

carat,depth,table,x,y,z,price,cut,color,clarity
0.23,61.5,55.0,3.95,3.98,2.43,326.0,Ideal,E,SI2
0.21,59.8,61.0,3.89,3.84,2.31,326.0,Premium,E,SI1
0.23,56.9,65.0,4.05,4.07,2.31,327.0,Good,E,VS1
0.29,62.4,58.0,4.2,4.23,2.63,334.0,Premium,I,VS2
0.31,63.3,58.0,4.34,4.35,2.75,335.0,Good,J,SI2
0.24,62.8,57.0,3.94,3.96,2.48,336.0,Very Good,J,VVS2
0.24,62.3,57.0,3.95,3.98,2.47,336.0,Very Good,I,VVS1
0.26,61.9,55.0,4.07,4.11,2.53,337.0,Very Good,H,SI1
0.22,65.1,61.0,3.87,3.78,2.49,337.0,Fair,E,VS2
0.23,59.4,61.0,4.0,4.05,2.39,338.0,Very Good,H,VS1


In [None]:
# your code her
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexerModel, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import when, col
#ordering cut from worst to best
diamonds_df_xformed = diamonds_df
#using stringindexermodel to manually order our categorical variables according to their respective ranks
cu = StringIndexerModel.from_labels(['Fair','Good','Very Good','Premium','Ideal'], inputCol="cut",outputCol="ordered_cut")
co = StringIndexerModel.from_labels(['J','I','H','G','F','E','D'], inputCol="color",outputCol="ordered_color")
cl = StringIndexerModel.from_labels(['I1','SI2','SI1','VS2','VS1','VVS2','VVS1','IF'], inputCol ="clarity",outputCol = "ordered_clarity")
#creating our feature engineering pipeline that will add each of these attributes and then fit them on the rest of our dataframe
feature_engineering_pipe = Pipeline(stages=[cu,co,cl])
#model = feature_engineering_pipe.fit(diamonds_df_xformed)
#fit transforming this pipeline on data frame and dropping original string formatted categorical attributes
diamonds_df_xformed = feature_engineering_pipe.fit(diamonds_df_xformed).transform(diamonds_df_xformed)
diamonds_df_xformed=diamonds_df_xformed.drop('cut','color','clarity')
#renaming these variables so our new dataframe will have the same column names as the original
diamonds_df_xformed= diamonds_df_xformed.withColumnRenamed('ordered_cut','cut')
diamonds_df_xformed= diamonds_df_xformed.withColumnRenamed('ordered_color','color')
diamonds_df_xformed= diamonds_df_xformed.withColumnRenamed('ordered_clarity','clarity')


display(diamonds_df_xformed)

# df = df.withColumn("cut",
#        when(col("cut") == "Fair" , 0)
#       .when(col("cut") == "Good" , 1)
#       .when(col("cut") == "Ideal" , 2)
#       .when(col("cut") == "Premium" , 3)
#       .otherwise(4))
# display(df)
# #ordering color from worst to best
# df = df.withColumn("color",
#        when(col("color") == "J" , 0)
#       .when(col("color") == "I" , 1)
#       .when(col("color") == "H" , 2)
#       .when(col("color") == "G" , 3)
#       .when(col("color") == "F" , 4)
#       .when(col("color") == "E" , 5)
#       .otherwise(6))

# #ordering clarity from worst to best
# df = df.withColumn("clarity",
#        when(col("clarity") == "I1" , 0)
#       .when(col("clarity") == "SI2" , 1)
#       .when(col("clarity") == "SI1" , 2)
#       .when(col("clarity") == "VS2" , 3)
#       .when(col("clarity") == "VS1" , 4)
#       .when(col("clarity") == "VVS2" , 5)
#       .when(col("clarity")=="VVS1", 6)
#       .otherwise(7))
# display(df)

#columns= ['cut', 'clarity','color','carat','depth','table','x','y','z']

carat,depth,table,x,y,z,price,cut,color,clarity
0.23,61.5,55.0,3.95,3.98,2.43,326.0,4.0,5.0,1.0
0.21,59.8,61.0,3.89,3.84,2.31,326.0,3.0,5.0,2.0
0.23,56.9,65.0,4.05,4.07,2.31,327.0,1.0,5.0,4.0
0.29,62.4,58.0,4.2,4.23,2.63,334.0,3.0,1.0,3.0
0.31,63.3,58.0,4.34,4.35,2.75,335.0,1.0,0.0,1.0
0.24,62.8,57.0,3.94,3.96,2.48,336.0,2.0,0.0,5.0
0.24,62.3,57.0,3.95,3.98,2.47,336.0,2.0,1.0,6.0
0.26,61.9,55.0,4.07,4.11,2.53,337.0,2.0,2.0,2.0
0.22,65.1,61.0,3.87,3.78,2.49,337.0,0.0,5.0,3.0
0.23,59.4,61.0,4.0,4.05,2.39,338.0,2.0,2.0,4.0


In [None]:
#here we recast our datatypes for homogeneity in our final diamonds_df_xformed data frame
diamonds_df_xformed = diamonds_df_xformed.selectExpr("cast(carat as float) carat",
     "cast(depth as float) depth",
     "cast(table as float) table",
     "cast(x as float) x",
      "cast(y as float) y",
      "cast(z as float) z",
      "cast(price as float) price",
      "cast(cut as float) cut",
      "cast(color as float) color",
      "cast(clarity as float) clarity")

In [None]:
# Grading Cell do not modify
display(diamonds_df_xformed.toPandas().head())

carat,depth,table,x,y,z,price,cut,color,clarity
0.23,61.5,55.0,3.95,3.98,2.43,326.0,4.0,5.0,1.0
0.21,59.8,61.0,3.89,3.84,2.31,326.0,3.0,5.0,2.0
0.23,56.9,65.0,4.05,4.07,2.31,327.0,1.0,5.0,4.0
0.29,62.4,58.0,4.2,4.23,2.63,334.0,3.0,1.0,3.0
0.31,63.3,58.0,4.34,4.35,2.75,335.0,1.0,0.0,1.0


Your explanation here: the string indexer was used initially however, since it randomly assigns values to our categorical ordinal variables (cut color clarity) I decided to manually encode them in the order they should be represented. 
Premium Cut on Diamonds are the most Expensive, followed by Excellent / Very Good Cut suggesting that this variable is indeed ordinal. 
The Color of a Diamond refers to the Tone and Saturation of Color, or the Depth of Color in a stone.
The Color of a Diamond can Range from Colorless to a Yellow or a Faint Brownish Colored hue.
Colorless Diamonds are Rarer and more therefore, pricier than tainted diamonds suggesting that this isn't an inherently categorical nominal variable.
It seems that VS1 and VS2 affect the Diamond's Price equally having quite high Price margin.
A Premium cut diamond is generally more expensive than an ideal stone. A stone with a color of E tends to have less mineral impurities than a diamond labeled J therefore making E diamonds more expensive. The same argument could be made for the clarity of a stone.
clarity column (categorical nominal according to price but categorical ordinal in that VS2 has less mineral blemishes than an I3 for instance)

# Implementing our diamond pipeline for performing Linear Regression on the Cleaned Dataframe
Create a new pipeline named `diamond_pipe`.  diamond_pipe Shall contain a spark linear regression object (at a minimum) with default parameters.  Train and test diamond_pipe on the diamonds_df_xformed data frame using price as the target.  Use a spark evaluator object to score the linear regression model using mean squared error.  Print the train and test mean squared error results.

In [None]:
#convert categorical variables to binary represenatation by one-hot encoding
from pyspark.ml.feature import OneHotEncoder, VectorAssembler
#list of categorical variables in the dataframe
categorical_vars = ['cut','color','clarity']
OH_cols =  ['cut_encoded','color_encoded','clarity_encoded']
# Then we'll create a One hot encoding each of our categorical features and fitting them to the dataframe
encoder = OneHotEncoder(inputCols=categorical_vars, outputCols=OH_cols)
diamonds_df_xformed = encoder.fit(diamonds_df_xformed).transform(diamonds_df_xformed)
# #Then we'll do the same thing for our numeric variables and use these variables as input for our vector assembler. Finally we'll output a features column. 
#diamonds_df_xformed.show(5) 
# Next, we utilize a VectorAssembler that combines a list of columns into a single vector column to convert all the independent variables into a single features vector. This is the only input spark can handle for Machine Learning models in the case of Linear Regression. 
#list of numeric variables
numeric_vars = ['carat', 'depth', 'table', 'x', 'y', 'z']
#categorical variables + numeric variables = features.
ind_cols = ['cut','color','clarity']
assembler_input = ind_cols + OH_cols + numeric_vars
assembler = VectorAssembler(inputCols=assembler_input, outputCol="features")

In [None]:
from pyspark.ml import feature, regression, evaluation, Pipeline
#splitting our data so that 70% of it will be used for training our model, while the remaining 30% will be used to test our model
training_df, testing_df = diamonds_df_xformed.randomSplit([0.7, 0.3])
#creating our linear regressor as a step in one of our pipeline's stages. Price would correspond to the model's label 
lr = regression.LinearRegression(featuresCol='features', labelCol='price')
#creating our diamond_pipe and fitting it to the training_df
diamond_pipe = Pipeline(stages=[assembler, lr]).fit(training_df)
#transforming our training_df and viewing a scatter plot (with databricks plot option button) to see how well price and prediction align
pred_tr = diamond_pipe.transform(training_df)
display(pred_tr)

carat,depth,table,x,y,z,price,cut,color,clarity,cut_encoded,color_encoded,clarity_encoded,features,prediction
0.2,59.0,60.0,3.81,3.78,2.24,367.0,3.0,5.0,3.0,"List(0, 4, List(3), List(1.0))","List(0, 6, List(5), List(1.0))","List(0, 7, List(3), List(1.0))","List(0, 26, List(0, 1, 2, 6, 12, 16, 20, 21, 22, 23, 24, 25), List(3.0, 5.0, 3.0, 1.0, 1.0, 1.0, 0.20000000298023224, 59.0, 60.0, 3.809999942779541, 3.7799999713897705, 2.240000009536743))",62.42528030955873
0.2,59.7,62.0,3.84,3.8,2.28,367.0,3.0,5.0,3.0,"List(0, 4, List(3), List(1.0))","List(0, 6, List(5), List(1.0))","List(0, 7, List(3), List(1.0))","List(0, 26, List(0, 1, 2, 6, 12, 16, 20, 21, 22, 23, 24, 25), List(3.0, 5.0, 3.0, 1.0, 1.0, 1.0, 0.20000000298023224, 59.70000076293945, 62.0, 3.8399999141693115, 3.799999952316284, 2.2799999713897705))",-67.0224319140143
0.2,59.8,62.0,3.79,3.77,2.26,367.0,3.0,5.0,3.0,"List(0, 4, List(3), List(1.0))","List(0, 6, List(5), List(1.0))","List(0, 7, List(3), List(1.0))","List(0, 26, List(0, 1, 2, 6, 12, 16, 20, 21, 22, 23, 24, 25), List(3.0, 5.0, 3.0, 1.0, 1.0, 1.0, 0.20000000298023224, 59.79999923706055, 62.0, 3.7899999618530273, 3.7699999809265137, 2.259999990463257))",-14.737817210851972
0.2,60.2,62.0,3.79,3.75,2.27,345.0,3.0,5.0,1.0,"List(0, 4, List(3), List(1.0))","List(0, 6, List(5), List(1.0))","List(0, 7, List(1), List(1.0))","List(0, 26, List(0, 1, 2, 6, 12, 14, 20, 21, 22, 23, 24, 25), List(3.0, 5.0, 1.0, 1.0, 1.0, 1.0, 0.20000000298023224, 60.20000076293945, 62.0, 3.7899999618530273, 3.75, 2.2699999809265137))",-1617.6041603193644
0.2,61.1,59.0,3.81,3.78,2.32,367.0,3.0,5.0,3.0,"List(0, 4, List(3), List(1.0))","List(0, 6, List(5), List(1.0))","List(0, 7, List(3), List(1.0))","List(0, 26, List(0, 1, 2, 6, 12, 16, 20, 21, 22, 23, 24, 25), List(3.0, 5.0, 3.0, 1.0, 1.0, 1.0, 0.20000000298023224, 61.099998474121094, 59.0, 3.809999942779541, 3.7799999713897705, 2.319999933242798))",-51.0293683292748
0.2,61.7,60.0,3.77,3.72,2.31,367.0,3.0,6.0,3.0,"List(0, 4, List(3), List(1.0))","List(0, 6, List(), List())","List(0, 7, List(3), List(1.0))","List(0, 26, List(0, 1, 2, 6, 16, 20, 21, 22, 23, 24, 25), List(3.0, 6.0, 3.0, 1.0, 1.0, 0.20000000298023224, 61.70000076293945, 60.0, 3.7699999809265137, 3.7200000286102295, 2.309999942779541))",146.04678450891697
0.2,62.2,57.0,3.76,3.73,2.33,367.0,4.0,5.0,3.0,"List(0, 4, List(), List())","List(0, 6, List(5), List(1.0))","List(0, 7, List(3), List(1.0))","List(0, 26, List(0, 1, 2, 12, 16, 20, 21, 22, 23, 24, 25), List(4.0, 5.0, 3.0, 1.0, 1.0, 0.20000000298023224, 62.20000076293945, 57.0, 3.759999990463257, 3.7300000190734863, 2.3299999237060547))",42.36251540877038
0.2,62.3,60.0,3.73,3.68,2.31,367.0,3.0,6.0,3.0,"List(0, 4, List(3), List(1.0))","List(0, 6, List(), List())","List(0, 7, List(3), List(1.0))","List(0, 26, List(0, 1, 2, 6, 16, 20, 21, 22, 23, 24, 25), List(3.0, 6.0, 3.0, 1.0, 1.0, 0.20000000298023224, 62.29999923706055, 60.0, 3.7300000190734863, 3.680000066757202, 2.309999942779541))",152.44033903463878
0.2,62.6,59.0,3.73,3.71,2.33,367.0,3.0,4.0,3.0,"List(0, 4, List(3), List(1.0))","List(0, 6, List(4), List(1.0))","List(0, 7, List(3), List(1.0))","List(0, 26, List(0, 1, 2, 6, 11, 16, 20, 21, 22, 23, 24, 25), List(3.0, 4.0, 3.0, 1.0, 1.0, 1.0, 0.20000000298023224, 62.599998474121094, 59.0, 3.7300000190734863, 3.7100000381469727, 2.3299999237060547))",-112.66249263603368
0.2,63.4,59.0,3.74,3.71,2.36,367.0,2.0,5.0,3.0,"List(0, 4, List(2), List(1.0))","List(0, 6, List(5), List(1.0))","List(0, 7, List(3), List(1.0))","List(0, 26, List(0, 1, 2, 5, 12, 16, 20, 21, 22, 23, 24, 25), List(2.0, 5.0, 3.0, 1.0, 1.0, 1.0, 0.20000000298023224, 63.400001525878906, 59.0, 3.740000009536743, 3.7100000381469727, 2.359999895095825))",-171.9714151091248


In [None]:
display(pred_tr.select('price', 'prediction'))

price,prediction
367.0,62.42528030955873
367.0,-67.0224319140143
367.0,-14.737817210851972
345.0,-1617.6041603193644
367.0,-51.0293683292748
367.0,146.04678450891697
367.0,42.36251540877038
367.0,152.44033903463878
367.0,-112.66249263603368
367.0,-171.9714151091248


In [None]:
#Generating predictions for our testing data 
pred_test = diamond_pipe.transform(testing_df)
display(pred_test.select('price', 'prediction'))

price,prediction
367.0,140.8326914602485
367.0,246.38713447104877
386.0,247.3551367641376
386.0,235.64785308212777
394.0,-1523.8230384962271
404.0,326.65404743924137
404.0,32.715597369325224
470.0,-650.3272335736847
337.0,-955.5403428379304
327.0,255.7675300333167


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
#finding the mean squared error of our models
#class code
from pyspark.sql import functions as fn
mse = fn.mean((fn.col('price')-fn.col('prediction'))**2).alias('mse')
#storing mse for training
mse_tra = pred_tr.select(mse).collect()
#doing the same for testing 
mse_test = pred_test.select(mse).collect()
#outputting our final results
print("Mean Squared Error of Training Dataframe: ", round((mse_tra[0][0]),2))
print("Mean Squared Error of Testing Dataframe: ", round((mse_test[0][0]),2))

In [None]:
#for good measure, I have done the same for the correlation between price and prediction in our testing and training dataframe
corr_tra = pred_tr.select(fn.corr('price', 'prediction')).collect()
print("Correlation between price and prediction in Training Dataframe: ", round((corr_tra[0][0]),4))
corr_test = pred_test.select(fn.corr('price', 'prediction')).collect()
print("Correlation between price and prediction in Testing Dataframe: ", round((corr_test[0][0]),4))

# Using K-Folds Cross Validation to Score our Linear Regressor Model
Repeat the previous section only this time score the model using 3-fold cross validation using an empty parameter grid. Print the resulting score from 3 fold cross validation.  Briefly explain how 3 fold cross validation is different than the scoring technique than used in the last section.

In [None]:
# your code here
#q5_df = diamonds_df_xformed
training_df, testing_df = diamonds_df_xformed.randomSplit([0.7, 0.3])
from pyspark.ml.regression import LinearRegression as lr
from pyspark.ml import feature, regression, evaluation, Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# #creating our linear regression as a step in one of our pipeline's stages. Price would correspond to the models label in this case
lr = regression.LinearRegression(featuresCol='features', labelCol='price')
assembler = VectorAssembler(inputCols=assembler_input, outputCol="features")
#creating our diamond_pipe
diamond_pipe = Pipeline(stages=[assembler, lr])
#creating our linear regression as a step in one of our pipeline's stages. Price would correspond to the models label in this case
modelEvaluator = RegressionEvaluator(metricName='mse',labelCol='price',predictionCol='prediction')
param_grid = ParamGridBuilder().build()
crossval = CrossValidator(estimator=diamond_pipe,
                      estimatorParamMaps=param_grid,
                      evaluator= modelEvaluator, 
                      numFolds=3)
#new_df = new_df.drop('features')
cvModel = crossval.fit(training_df)
cv_pred = cvModel.transform(testing_df)
#cvModel.bestModel.summary
#bestModel.summary 

In [None]:
cv_best = cvModel.bestModel
#Obtaining the Mean Squared Error of our Cross Validated model 
cv_mse_tr = modelEvaluator.evaluate(cv_best.transform(training_df))
cv_mse_te = modelEvaluator.evaluate(cv_best.transform(testing_df))

print("Mean Squared Error of Cross Validated Model on Training Data: ", round((cv_mse_tr),2))
print("Mean Squared Error of Cross Validated Model on Testing Data: ", round((cv_mse_te),2))

Your explanation here:
3-fold cross validation is a form of model selection that splits our data into a set of non-overlapping randomly separated partitions or 'folds'. These folds are  are used as separate training and test datasets so in our case: 3 (training, test) dataset pairs are generated; each of which uses 2/3 of the diamonds data for training and 1/3 for testing. Each fold is used as the test set exactly once. Following this, the scores are averged and returned. This can help us in telling if our model is overfit meaning it may have memorized the training data. This is a more accurate representation for the true Mean Squared Error of our Linear Regressor. Our score is slightly higher than our training MSE in Q4 and slightly lower than the test MSE, indicating that we don't have too much of a discrepancy between these two models.

#Performing Inference Analysis to Determine the Order of the Most Important Variables for Predicting Diamond Price
In this question you will use inference to determine the most important predictor order.  Create a new pipeline named inference_pipe which encapsulates a standard scalar and a linear regression object (at a minimum).  After fitting the pipe, create a pandas data frame from the fitted regression model coefficients named `coefficients_df`.  coefficients_df Shall have 2 columns named `coefficient` and `value`.  The coefficient column shall contain the coefficient names and the value column shall contain the regression model coefficient absolute values.  Provide a written explanation of what the results mean.  Include a description of why we are using the standard scalar object, and how the standard scalar object allows us to perform the specified inference.

In [None]:
#your code here
cols = ['carat','depth','table','x','y','z','price','cut','color','clarity']
coefficients_starting_df = diamonds_df_xformed.select(cols)

In [None]:
from pyspark.ml.feature import StandardScaler
from pyspark.sql import functions as fn
from pyspark.ml.feature import VectorAssembler
#definining features that will be used by our vector assembler
features = ('carat','depth','table','x','y','z','cut','color','clarity') 
#splitting the training and testing data 
training_df, testing_df = coefficients_starting_df.randomSplit([0.7, 0.3])
assembler = VectorAssembler(inputCols=features,outputCol="features")
coefficients_starting_df = assembler.transform(training_df)
#running standard scaler on our eatures column
standardscaler=StandardScaler(withMean=True,withStd=True, inputCol = 'features', outputCol = 'scaled_features')
lr = regression.LinearRegression(featuresCol='scaled_features', labelCol='price')

#creating our inference pipeline using standard scallar
inference_pipe = Pipeline(stages=[assembler, standardscaler, lr])
scaled_model = inference_pipe.fit(training_df)
coef_df = scaled_model.transform(testing_df)
display(coef_df.select('scaled_features'))

scaled_features
"List(1, 9, List(), List(-1.2669923099594076, -1.9114464366569714, 1.1251049312267463, -1.7222852998211844, -1.702675442106566, -1.8429712956015223, 0.08729686954387739, 0.9373205397246063, -0.025487386802967037))"
"List(1, 9, List(), List(-1.2669923099594076, 0.31270571707095857, -0.20929961869264674, -1.7669668498742999, -1.7460939704035936, -1.7157637936244667, 0.9806198797101384, 0.9373205397246063, -0.025487386802967037))"
"List(1, 9, List(), List(-1.2669923099594076, 0.38220939474559595, 1.1251049312267463, -1.7937757799061693, -1.7895124987006215, -1.7440321273971457, 0.08729686954387739, 1.5253920266045637, -0.025487386802967037))"
"List(1, 9, List(), List(-1.245872144116543, -0.8688753631653182, 0.6803034145869487, -1.6686674397574457, -1.6592569138095383, -1.7157637936244667, 0.08729686954387739, 0.9373205397246063, -0.025487386802967037))"
"List(1, 9, List(), List(-1.2247519468021488, -1.7029327522377107, 2.0147079645063415, -1.6329219866565168, -1.6158381784766367, -1.7440321273971457, 0.08729686954387739, 1.5253920266045637, -0.025487386802967037))"
"List(1, 9, List(), List(-1.2247519468021488, -0.03481797409292585, 1.1251049312267463, -1.6418582966671398, -1.6418895024907272, -1.63095845532131, 0.08729686954387739, 0.3492490528446489, -0.6330732831673822))"
"List(1, 9, List(), List(-1.2036317494877546, -3.788074899221017, 2.0147079645063415, -1.4631320964546777, -1.4161127412744354, -1.7440321273971457, -1.6993491507886447, 0.9373205397246063, 1.7972703022902785))"
"List(1, 9, List(), List(-1.2036317494877546, -2.536990141310103, 2.459509481146139, -1.5167499565184164, -1.442164065288526, -1.6733612929654482, -0.8060261406223836, 0.3492490528446489, 1.1896844059258633))"
"List(1, 9, List(), List(-1.2036317494877546, -1.9809501143316088, 1.1251049312267463, -1.5256860534706027, -1.555052238860798, -1.6874954598517877, -0.8060261406223836, 1.5253920266045637, 1.1896844059258633))"
"List(1, 9, List(), List(-1.2036317494877546, -1.7029327522377107, 1.1251049312267463, -1.5524951965609086, -1.424796653969715, -1.6168242884349704, -0.8060261406223836, 0.9373205397246063, 1.1896844059258633))"


In [None]:
# printing the MSE of our scaled linear regressor on the testing data
coef_mse = coef_df.select(mse).collect()
print("Mean Squared Error of Linear Regression model after being scaled via Standard Scaler: ", round((coef_mse[0][0]),2))

In [None]:
#-1 corresponds to the features within our dataframe
coef = scaled_model.stages[-1].coefficients
coef_new = coef.tolist()
#putting our coefficients into a list
#mapping these values to be of float data type
coef_new = map(float,coef_new)
#creating an empty list that will store tuples corresponding to each feature and it's coefficient value
c_df = []
columns = ['carat','depth','table','x','y','z','cut','color','clarity']
#writing a for lopp that appends coefficients with their corresponding features
for col, coe in zip(columns, coef_new):
  c_df.append((col,coe))

  
#importing a few packages in order to define a schema for our pySpark df
from pyspark.sql.types import StructType, StructField, StringType, FloatType

#coefficient will hold string data types while values will be holding floats
schema =StructType([StructField('coefficient', StringType(), True),
          StructField('value', FloatType(), True) ])

#creating out final coefficient dataframe 
coefficients_df = spark.createDataFrame(c_df, schema)
coefficients_df = coefficients_df.toPandas()
coefficients_df['value'] = coefficients_df['value'].abs()
coefficients_df = coefficients_df.sort_values(by='value',ascending=False)
coefficients_df

Unnamed: 0,coefficient,value
0,carat,5243.505371
3,x,1080.706787
8,clarity,817.841003
7,color,550.851868
6,cut,140.775497
1,depth,118.887794
5,z,67.411713
2,table,57.913914
4,y,38.163742


In [None]:
# Grading cell do not modify
display(coefficients_df)

coefficient,value
carat,5243.5054
x,1080.7068
clarity,817.841
color,550.85187
cut,140.7755
depth,118.887794
z,67.41171
table,57.913914
y,38.163742


Your explanation here: Differences in the units across our diamond input variables (weight, mm, $, percent) make it more difficult to accurately model price predictions. Smaller input values for certain features such as carat can result in a model that learns tiny weight values while large inputs may have the opposite effect. Larger thetas or weights can lead to poor performance during training which means our model won't be able to generalize very well. This is where the StandardScaler object comes into play. This object will normalize the data by substracting from each input the mean and dividing by it's standard deviation. 

These coefficients confirm that carat does have the largest associated weight in predicting price. This is followed by length, clarity and color. Interestingly enough width and height didn't play a major role in predicting price but I suspect multicollinearity between some of these variables given that attributes such as carat can be sort-of derived from the dimensions. Depth and table are derived directly from some of our dimensions which is why I believe more feature engineering could be done to improve our model. Additionally, the idea of having lots of negative predictions is sort of off-putting in establishing the veracity of our model.

# Using MSE improvement to find the best predictor according to our Linear Regressor output
Partially implement the forward stepwise selection algorithm 6.2 in ISLR. The code from this question is a partial implementation because the function specified essentially completes a single iteration of algorithm 6.2 per call. Create a function named `find_best_pred` that takes 3 arguments: A list of starting column names for the initial model, a list of column names to test for improvement, and the target column name.  Use MSE as the improvement metric.  The return value is the name of the column that provides the largest MSE improvement.  If no improvement in MSE is detected, return the string 'Null'.  Starting with the starting columns, the find_best_pred function shall return the col name that produces the largest improvement in MSE.  Check for error conditions and return the 'Null' string if there is an error in the input data.  To make it more fun, we plan on running secret tests that include errors when we grade.  Assume that you can use the diamonds_df_xformed as a global variable. You are free to decompose this problem into as many sub functions as you want as long as the main function is named find_best_pred.

In [None]:
# # your code here
#defining our loss metric for evaluating our lr model. The default is set to rmse however we will manually tweak this to represent the mean squared error
eval_mse = RegressionEvaluator(metricName='mse',labelCol='price',predictionCol='prediction')
import pyspark.sql.functions as fn
from pyspark.ml import feature, regression
#empty list to store mse's for each attribute
mses = []
#creating a function that can automatically calculate the mean squared error of our linear regression model
def make_eval_pipe(starting_cols,target):
    assembler = feature.VectorAssembler(inputCols=starting_cols, outputCol='features')
    lr = regression.LinearRegression(featuresCol='features', labelCol=target)
    #creating our pipe
    pipe = Pipeline(stages=[assembler, lr]).fit(training_df)
    return eval_mse.evaluate(pipe.transform(training_df))
#defining a function that will iteratively find the best predictor of price out of our input attributes. We simply want to return the attributes that lead
#to the largest decrease in our loss metric: Mean Squared error
def find_best_pred(starting_cols,testing_cols,target):
  #inserting a try/except paradigm to account for the possility of improper argument entry for our main function. This is good practice in industry coding because it won't lead to a major runtime error where the entire program crashes. It will simply iterate through the script and if an error occurs we will jump to the except statement which can allow the end-user to know there is an issue with either their argument length or order.
  try:
    #this if statement will only execute if the starting column list isn't empty
    if len(starting_cols)!=0:
      #for further use, if you would like to run this function more than once, we need to empty the list before it can be appended and evaluated to determine the best       predictor. This will come into play in question 8. 
      mses.clear()
      #defining our training and testing dataframe. It will consist of a standard 70/30 split for training and testing respectively from our original  diamonds df
      #the seed is set to 0, otherwise the output would be different each time
      training_df, testing_df = diamonds_df_xformed.randomSplit([0.7, 0.3],0)
      #defining mse_1 to store the loss metric from our pipeline created earlier. This will later be appended into our mses 
      #list using list comprehension based on a decrease in MSE 
      #following the evaluating of a new predictor
      mse_1 = make_eval_pipe(starting_cols, target)
      #creating for loop to append each element in the test list to the start list to calculate the new start list mse
      for col in range(len(testing_cols)):
        #must redefine our training/testingdf because after each iteration of the for loop the features column. Since we are running each iteration
        #through the vector assembler the features column will have to be recreated 
        training_df, testing_df = diamonds_df_xformed.randomSplit([0.7, 0.3],0)
        starting_cols.append(testing_cols[col])
        #defining mse_1 to store the loss metric for the predictor we would like to compare
        mse_2 = make_eval_pipe(starting_cols,target)
        #store the difference of the new start list mse between the original startlist mse. We also append the column name into the start list 
        #we place a negative value in front of the calculation  because if mse_1 is larger than mse_2 then our desired output would be reversed
        mses.append([-(mse_2-mse_1), testing_cols[col]])
        #remove column name that was appended into the start list for next iteration of for loop (only test one column at a time)
        starting_cols.remove(testing_cols[col])
        #sorting these mse differences from highest to lowest in order to find the best predictor 
      mses.sort(reverse=True)
     # print(mses)
    # if the first difference is negative then our model did not show any improvement after trying this predictor
      if mses[0][0]<=0:
        print("No Model Improvement")
        return("Null")
      #If the starting list only contains one predictor then there must have been model improvement by default
      else:
        print("There has been Model Improvement!")
        return(mses[0][1])
    #If the starting list is empty then we only need to calculate each column's mse
    else:
       for col in range(len(testing_cols)):
        training_df, testing_df = diamonds_df_xformed.randomSplit([0.7, 0.3],0)
        starting_cols.append(testing_cols[col])
        mse = make_eval_pipe(starting_cols,target)
        mses.append( [mse,testing_cols[col]])
        starting_cols.remove(testing_cols[col])
        mses.sort()
      #  print(mses)
        return(mses[0][1])
  except: 
      print("Error in Data ")
      return ("Null")

In [None]:
# Grading cell - do not modify
print(find_best_pred([], ['carat', 'cut', 'color', 'clarity', 'depth', 'table', 'x', 'y', 'z'], 'price'))
print(find_best_pred(['carat', 'cut'], ['color', 'clarity', 'depth', 'table', 'x', 'y', 'z'], 'price'))

# Obtaining Stepwise Predictors for Diamonds Dataframe
Finish implementing the forward stepwise selection algorithm 6.2 in ISLR.  The last sectionimplemented a single iteration of ISLR algorithm 6.2.  The purpose of this question is to create a driver function that will iteratively call the find_best_pred function defined in the previous sectionto create the complete ordered list of best predictors.  Create a new function named `get_stepwise_pred_list` which iteratively calls find_best_pred to build an ordered list of the best stepwise features. Terminate building the list if find_best_pred returns Null or you run out of predictors to testing_cols - whichever comes first.  get_stepwise_pred_list Takes 2 arguments:  A list of all predictors and the dataset target column.   get_stepwise_pred_list Returns a list containing the ordered stepwise features selected by iteratively calling find_best_pred.

In [None]:
# your code here
#this empty list will store the column names of the best price predictors based on our get_stepwise_pred_list function
predicted_feats = []
#defining a function that iteratively calls the find_best_pred function to list the best stepwise features
def get_stepwise_pred_list(testing_cols, target):
  #creating a variable, colname that will store the return value from the function listed above
  colname = 0 
  #we want to find columns that are the best predictors of price in terms of our MSE loss metric, therefore the starting columns should be an empty list
  starting_cols = []
  #creating a while loop that builds the list if our find_best_pred function doesn't return null. The first condition ensures that we will only execute 
  #the function if we we have predictors to test on 
  while len(testing_cols)>0 and colname!='Null':
    #running our find_best_pred function on each of these columns to iteratively obtain the best stepwise features. 
    colname = find_best_pred(starting_cols, testing_cols, target)
    #appending these features to our global list of predicted stepwise attributes. Since we sorted our MSE differences in question 7,
    #this list will be ordered properly
    predicted_feats.append(colname)
    #appending that column that feature to our starting columns since we are running the loop multiple times to determine if the next attribute decreases the mse
    starting_cols.append(colname)
    #removing that column from our testing cols to effectively restart the process before returning the ordered list of predicted features. 
    testing_cols.remove(colname)
  return (predicted_feats)

In [None]:
get_stepwise_pred_list((['carat', 'cut', 'color', 'clarity', 'depth', 'table', 'x', 'y', 'z']), 'price')

In [None]:
# grading cell do not modify
print(get_stepwise_pred_list((['carat', 'cut', 'color', 'clarity', 'depth', 'table', 'x', 'y', 'z']), 'price'))

##### Grading Feedback Cell

# Implementing one final MSE callback function
The previous section creates a function that is specific to the diamonds data set.  The problem with the scheme is that it only works for one data set and it assumes a very specific model.  Being a great data scientist, you realize that the code would be significantly more valuable if it was more flexible.  The right thing to do is extend the functionality of the code to be able to use any pipeline and any data set.  Start by adding a data frame argument to get_stepwise_pred_list.  get_stepwise_pred_list Will then need to pass the data frame to the find_best_pred function.  This solves the problem of being able to operate with any data frame.

There is a bigger problem.  We need a way to build a custom model built specifically for the data frame. Currently, we are building a very specific custom model in the find_best_pred function that is customized for only one data frame - the diamond data frame.  We need to come up with a way to customize the pipeline for any data frame. One solution to this problem is to create a user defined callback function which builds the pipelines. The callback function will take as input arguments a list of feature names to train on, and return a ready to use spark pipeline.  Add the callback function to the get_stepwise_pred_list argument list.  Port the code that builds the pipe which was previously defined in the find_best_pred function to your new user defined callback function.  Now instead of building the pipe in find_best_pred, the find_best_pred function will call the callback function to build the pipe.  With this piece of the puzzle solved, we are a lot closer to having the level of flexibility you would need for a production environment.  Notwithstanding, there are still other problems to solve.  For example, we also need to be able to specify the loss function to use - MSE is not the best loss function for say a logistic regression pipe.  It would also be great if we could encapsulate all of this functionality into a python class.  But to get full credit for this problem, you only need the pipe function callback and to add the dataframe as an argument.  

Your are free to add arguments or change things some from the above description. The above is only a loose guide.  Experiment and have fun. 

Demonstrate your results by executing the updated code using the test from problem 8 above.

In [None]:
# your code here
#defining a call back function that will take a list of attribute names to train a linear regression model on and return a pipeline. This will reduce the redundancy of writing another find_best_pred function since we will be automating the pipe building process. A similar approach to this was taken in my case for section 7 with the make_eval_pipeline function. The only difference being that in q7 I returned the rmse loss metric from each case instead of the pipe function itself. 
def callback_func(starting_cols,target):
    assembler = feature.VectorAssembler(inputCols=starting_cols, outputCol='features')
    lr = regression.LinearRegression(featuresCol='features', labelCol=target)
    return Pipeline(stages=[assembler, lr])
mses = [] 
#this is simply reusing question seven's function except for the fact that we are able to input any dataframe we want as an input. This creates a much more flexible approach to the initial problen we were trying to solve
###############################################################
#slightly modified q7 code
###############################################################
def find_best_pred(df, starting_cols,testing_cols,target):
  if len(starting_cols)!=0:
    training_df, testing_df = df.randomSplit([0.7, 0.3],0)
    mses.clear()
    pipe = callback_func(starting_cols,target)
    ml_model = pipe.fit(training_df)
    mse_first =ml_model.transform(testing_df)
    mse_1 = mse_first.select(mse).collect()[0][0]
    for col in range(len(testing_cols)):
      training_df, testing_df = df.randomSplit([0.7, 0.3],0)
      starting_cols.append(testing_cols[col])
      pipe = callback_func(starting_cols,target)
      model = pipe.fit(training_df)
      mse_second = model.transform(testing_df)
      mse_2 = mse_second.select(mse).collect()[0][0]
      mses.append([-(mse_2-mse_1),testing_cols[col]])
      starting_cols.remove(testing_cols[col])
    mses.sort(reverse=True)
    #print(mselist)
    if mses[0][0]<0:
      return('Null')
    else:
      print("There has been model improvement ")
      return(mses[0][1])
  else:
    mses.clear()
    for col in range(len(testing_cols)):
      training_df, testing_df = df.randomSplit([0.7, 0.3],0)
      starting_cols.append(testing_cols[col])
      pipe = callback_func(starting_cols,target)
      ml_model = pipe.fit(training_df)
      mse_first =ml_model.transform(testing_df)
      mse_1 = mse_first.select(mse).collect()[0][0]
      mses.append([mse_1,testing_cols[col]])
      starting_cols.remove(testing_cols[col])
    mses.sort()
    return(mses[0][1])
########################################################################
#question 8 code
#this empty list will store the column names of the best price predictors based on our get_stepwise_pred_list function
predicted_feats = []
#defining a function that iteratively calls the find_best_pred function to list the best stepwise features except using df as an embellished argument. 
def get_stepwise_pred_list(df, testing_cols, target):
  #creating a variable, colname that will store the return value from the function listed above
  colname = 0  
  #we want to find columns that are the best predictors of price in terms of our MSE loss metric, therefore the starting columns should be an empty list
  starting_cols = []
  #creating a while loop that builds the list if our find_best_pred function doesn't return null. The first condition ensures that we will only execute 
  #the function if we we have predictors to test on 
  while  len(testing_cols)>0 and colname!='Null':
  #running our find_best_pred function on each of these columns to iteratively obtain the best stepwise features. 
    colname = find_best_pred(df, starting_cols, testing_cols, target)
    #appending these features to our list of predicted stepwise attributes. Since we sorted our MSE differences in question 7,
    #this list will be ordered properly 
    predicted_feats.append(colname)
    #appending that column that feature to our starting columns since we are running the loop multiple times to determine if the next attribute decreases the mse
    starting_cols.append(colname)
    #removing that column from our testing cols to effectively restart the process before returning the ordered list of predicted features. 
    testing_cols.remove(colname)
  return (predicted_feats)
######################################################################

In [None]:
print(get_stepwise_pred_list(diamonds_df_xformed,['carat','cut'],'price'))