In [1]:
import numpy as np
import pandas as pd
import os
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler, Imputer, VectorIndexer, Bucketizer
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, GeneralizedLinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
# Reuse the active SparkSession if one exists, otherwise create one; the bare
# `spark` expression renders the session summary as this cell's output.
spark = SparkSession.builder.getOrCreate()
spark

## Loading the data

In [3]:
# Load the training data from the DBFS FileStore (via the local /dbfs mount) into pandas.
# SalePrice is the regression target.
df_train = pd.read_csv('/dbfs/FileStore/tables/train.csv')

## EDA & Data cleaning

In [5]:
# Histogram of the raw (untransformed) sale price.
fig, ax = plt.subplots()
ax.hist(df_train['SalePrice'])
ax.set_xlabel('SalePrice')
display(fig.figure)



In [6]:
# Model the log of the price: add SalePrice_Log, plot its histogram and report its shape.
df_train['SalePrice_Log'] = np.log(df_train['SalePrice'])

fig, ax = plt.subplots()
ax.hist(df_train['SalePrice_Log'])
ax.set_xlabel('SalePrice_Log')
display(fig.figure)

# Shape statistics of the log-transformed target.
print("Skewness: %f" % df_train['SalePrice_Log'].agg('skew'))
print("Kurtosis: %f" % df_train['SalePrice_Log'].agg('kurt'))

In [7]:
# SalePrice_Log now carries the target, so the raw price column is removed.
df_train = df_train.drop(columns='SalePrice')

In [8]:
# Split the columns by dtype: every non-object column counts as numerical,
# and object columns count as categorical.
object_mask = df_train.dtypes == "object"
numerical_feats = df_train.columns[~object_mask]
categorical_feats = df_train.columns[object_mask]
print("Number of Numerical features: ", len(numerical_feats))
print("Number of Categorical features: ", len(categorical_feats))

In [9]:
# List the column names in each group, separated by a rule.
print(df_train.loc[:, numerical_feats].columns)
print("*" * 100)
print(df_train.loc[:, categorical_feats].columns)

In [10]:
# Missing-value count and fraction per column, most-missing first.
total = df_train.isna().sum().sort_values(ascending=False)
percent = df_train.isna().mean().sort_values(ascending=False)
missing_data = pd.concat([total, percent], axis=1, keys=['Total', 'Percent'])
missing_data.head(20)

Unnamed: 0,Total,Percent
PoolQC,1453,0.995205
MiscFeature,1406,0.963014
Alley,1369,0.937671
Fence,1179,0.807534
FireplaceQu,690,0.472603
LotFrontage,259,0.177397
GarageCond,81,0.055479
GarageType,81,0.055479
GarageYrBlt,81,0.055479
GarageFinish,81,0.055479


In [11]:
# Columns where NaN carries meaning (e.g. no pool). Replace it with the explicit
# category 'None' instead of imputing a value.
cols_fillna = ['PoolQC','MiscFeature','Alley','Fence','MasVnrType','FireplaceQu',
               'GarageQual','GarageCond','GarageFinish','GarageType', 'Electrical',
               'KitchenQual', 'SaleType', 'Functional', 'Exterior2nd', 'Exterior1st',
               'BsmtExposure','BsmtCond','BsmtQual','BsmtFinType1','BsmtFinType2',
               'MSZoning', 'Utilities']

# Assign the filled columns back in a single step. The previous form,
# `df_train[col].fillna(..., inplace=True)`, is chained assignment. Under pandas
# copy-on-write it only fills a temporary copy, and df_train keeps its NaNs.
df_train[cols_fillna] = df_train[cols_fillna].fillna('None')

In [12]:

# Re-check missingness after the categorical 'None' fill.
total = df_train.isna().sum().sort_values(ascending=False)
percent = (df_train.isna().sum() / df_train.isna().count()).sort_values(ascending=False)
missing_data = pd.concat({'Total': total, 'Percent': percent}, axis=1)
missing_data.head(20)

Unnamed: 0,Total,Percent
LotFrontage,259,0.177397
GarageYrBlt,81,0.055479
MasVnrArea,8,0.005479
SalePrice_Log,0,0.0
ExterCond,0,0.0
RoofStyle,0,0.0
RoofMatl,0,0.0
Exterior1st,0,0.0
Exterior2nd,0,0.0
MasVnrType,0,0.0


In [13]:
# Impute the remaining gaps (LotFrontage, GarageYrBlt, MasVnrArea) with column means.
# numeric_only=True is required: without it, DataFrame.mean() raises TypeError on the
# object columns in pandas >= 2.0 (older versions only dropped them with a warning).
df_train = df_train.fillna(df_train.mean(numeric_only=True))


In [14]:
# Confirm that no missing values remain (top 5 columns by missing count).
total = df_train.isna().sum().sort_values(ascending=False)
percent = df_train.isna().mean().sort_values(ascending=False)
missing_data = pd.concat({'Total': total, 'Percent': percent}, axis=1)
missing_data.head(5)

Unnamed: 0,Total,Percent
SalePrice_Log,0,0.0
Heating,0,0.0
RoofStyle,0,0.0
RoofMatl,0,0.0
Exterior1st,0,0.0


In [15]:
# Distribution of GrLivArea, plotted to judge its skewness.
fig, ax = plt.subplots()
ax = sns.distplot(df_train['GrLivArea'], ax=ax)
ax.set_xlabel('GrLivArea')
display(fig.figure)


In [16]:
# Skewness and kurtosis of the raw GrLivArea column.
print("Skewness: %f" % df_train['GrLivArea'].agg('skew'))
print("Kurtosis: %f" % df_train['GrLivArea'].agg('kurt'))

In [17]:
# Distribution of LotArea, plotted to judge its skewness.
fig, ax = plt.subplots()
ax = sns.distplot(df_train['LotArea'], ax=ax)
ax.set_xlabel('LotArea')
display(fig.figure)


In [18]:
# Skewness and kurtosis of the raw LotArea column.
print("Skewness: %f" % df_train['LotArea'].agg('skew'))
print("Kurtosis: %f" % df_train['LotArea'].agg('kurt'))

In [19]:
# Replace the two area columns with their natural logs. pop() removes the raw
# column, and the log column is appended at the end.
# `df` is bound to the same frame as df_train because later cells still refer to `df`.
df = df_train
df['GrLivArea_Log'] = np.log(df.pop('GrLivArea'))
df['LotArea_Log'] = np.log(df.pop('LotArea'))

fig, ax = plt.subplots()
ax = sns.distplot(df_train['GrLivArea_Log'], ax=ax)
ax.set_xlabel('GrLivArea_Log')
display(fig.figure)

fig, ax = plt.subplots()
ax = sns.distplot(df_train['LotArea_Log'], ax=ax)
ax.set_xlabel('LotArea_Log')
display(fig.figure)

# Refresh the numeric column list now that the log columns replaced the raw ones.
numerical_feats = df_train.columns[df_train.dtypes != "object"]
   

In [20]:
# Show one row to confirm that GrLivArea_Log/LotArea_Log replaced the raw area columns.
df_train.head(1)

Unnamed: 0,Id,MSSubClass,MSZoning,LotFrontage,Street,Alley,LotShape,LandContour,Utilities,LotConfig,LandSlope,Neighborhood,Condition1,Condition2,BldgType,HouseStyle,OverallQual,OverallCond,YearBuilt,YearRemodAdd,RoofStyle,RoofMatl,Exterior1st,Exterior2nd,MasVnrType,MasVnrArea,ExterQual,ExterCond,Foundation,BsmtQual,BsmtCond,BsmtExposure,BsmtFinType1,BsmtFinSF1,BsmtFinType2,BsmtFinSF2,BsmtUnfSF,TotalBsmtSF,Heating,HeatingQC,...,Electrical,1stFlrSF,2ndFlrSF,LowQualFinSF,BsmtFullBath,BsmtHalfBath,FullBath,HalfBath,BedroomAbvGr,KitchenAbvGr,KitchenQual,TotRmsAbvGrd,Functional,Fireplaces,FireplaceQu,GarageType,GarageYrBlt,GarageFinish,GarageCars,GarageArea,GarageQual,GarageCond,PavedDrive,WoodDeckSF,OpenPorchSF,EnclosedPorch,3SsnPorch,ScreenPorch,PoolArea,PoolQC,Fence,MiscFeature,MiscVal,MoSold,YrSold,SaleType,SaleCondition,SalePrice_Log,GrLivArea_Log,LotArea_Log
0,1,60,RL,65.0,Pave,,Reg,Lvl,AllPub,Inside,Gtl,CollgCr,Norm,Norm,1Fam,2Story,7,5,2003,2003,Gable,CompShg,VinylSd,VinylSd,BrkFace,196.0,Gd,TA,PConc,Gd,TA,No,GLQ,706,Unf,0,150,856,GasA,Ex,...,SBrkr,856,854,0,1,0,2,1,3,1,Gd,8,Typ,0,,Attchd,2003.0,RFn,2,548,TA,TA,Y,0,61,0,0,0,0,,,,0,2,2008,WD,Normal,12.247694,7.444249,9.041922


In [21]:
# Shape statistics of the log-transformed area columns.
grlivarea_log = df_train['GrLivArea_Log']
lotarea_log = df_train['LotArea_Log']
print("GrLivArea_Log Skewness: %f" % grlivarea_log.skew())
print("GrLivArea_Log Kurtosis: %f" % grlivarea_log.kurt())
print("LotArea_Log Skewness: %f" % lotarea_log.skew())
print("LotArea_Log Kurtosis: %f" % lotarea_log.kurt())

In [22]:
# Plain-list snapshot of the numeric column names (after the log transform);
# not referenced again in the cells shown.
li_num_feats = list(numerical_feats)


In [23]:
# Outliers: top overall quality (10) but a log price below 12.3.
df_train = df_train.drop(index=df_train.index[(df_train['OverallQual'] == 10) & (df_train['SalePrice_Log'] < 12.3)])

In [24]:
# Outliers: very large living area (log > 8.3) but a log price below 12.5.
df_train = df_train.drop(index=df_train.index[(df_train['GrLivArea_Log'] > 8.3) & (df_train['SalePrice_Log'] < 12.5)])

In [25]:
# Rank the numeric features by absolute Pearson correlation with the log price,
# then split them at the min_val_corr threshold.
target = 'SalePrice_Log'
min_val_corr = 0.4  # |r| above this counts as a "strong" numeric feature

# Select the numeric columns explicitly. DataFrame.corr() raises on object
# columns in pandas >= 2.0; older versions dropped them silently, which gives the same result.
corr = df_train.select_dtypes(include='number').corr()
corr_abs = corr.abs()

# One entry per numeric column, including the target itself (|r| = 1).
nr_num_cols = len(corr_abs)
ser_corr = corr_abs.nlargest(nr_num_cols, target)[target]

cols_abv_corr_limit = list(ser_corr[ser_corr.values > min_val_corr].index)
cols_bel_corr_limit = list(ser_corr[ser_corr.values <= min_val_corr].index)

In [26]:
# Print the full ranking, then the feature names on each side of the threshold.
rule = "*" * 30
for line in (ser_corr,
             rule,
             "List of numerical features with r above min_val_corr :",
             cols_abv_corr_limit,
             rule,
             "List of numerical features with r below min_val_corr :",
             cols_bel_corr_limit):
    print(line)

In [27]:
# Lower-triangle heatmap of the Pearson correlation matrix `corr` computed above.
# The Spearman (rank) matrix is kept for comparison. It is not plotted here.
corr_spear = df_train.select_dtypes(include='number').corr(method='spearman')

mask = np.zeros_like(corr, dtype=bool)
mask[np.triu_indices_from(mask)] = True  # hide the redundant upper triangle

# Create the figure at its final size and draw on *its* axes. Previously
# plt.figure() was called after plt.subplots(), so the heatmap went to a second
# figure while display() rendered the first, empty one.
fig, ax = plt.subplots(figsize=(15, 10))
ax = sns.heatmap(corr,
                 vmax=.5,
                 mask=mask,
                 fmt='.2f',
                 linewidths=.2, cmap="YlGnBu",
                 ax=ax)

display(fig.figure)

In [28]:
# Annotated correlation matrix of the target plus its 10 most positively
# correlated features (the target is first, with r = 1).
k = 11
cols = corr.nlargest(k, 'SalePrice_Log')['SalePrice_Log'].index
print(cols)
cm = np.corrcoef(df_train[cols].to_numpy().T)

fig, ax = plt.subplots(figsize=(14, 12))
sns.heatmap(cm,
            vmax=.8,
            linewidths=0.01,
            linecolor="white",
            square=True,
            annot=True,
            annot_kws={'size': 12},
            cmap='viridis',
            xticklabels=cols.values,
            yticklabels=cols.values,
            ax=ax)

display(fig.figure)

In [29]:
# Box plot of the log price for each categorical feature on a 15x3 grid.
li_cat_feats = list(categorical_feats)
nr_rows = 15
nr_cols = 3

fig, axs = plt.subplots(nr_rows, nr_cols, figsize=(nr_cols * 4, nr_rows * 3))

# axs.flat walks the grid row by row, which is the same order as r * nr_cols + c.
# zip stops at the shorter sequence, so spare grid cells stay empty.
for cat_feat, cell_ax in zip(li_cat_feats, axs.flat):
    sns.boxplot(x=cat_feat, y=target, data=df_train, ax=cell_ax)

plt.tight_layout()
display(fig.figure)

The boxplots show that 'MSZoning', 'Neighborhood', 'Condition2', 'MasVnrType', 'ExterQual', 'BsmtQual', 'CentralAir', 'Electrical', 'KitchenQual' and 'SaleType' are strongly associated with SalePrice_Log.
Only these categorical features will be considered in the model.

In [31]:
# Categorical features kept for modelling (strong association with the target in the boxplots).
catg_strong_corr = [
    'MSZoning', 'Neighborhood', 'Condition2', 'MasVnrType', 'ExterQual',
    'BsmtQual', 'CentralAir', 'Electrical', 'KitchenQual', 'SaleType',
]

# Categorical features judged weak; these are dropped further down.
catg_weak_corr = [
    'Street', 'Alley', 'LotShape', 'LandContour', 'Utilities', 'LotConfig',
    'LandSlope', 'Condition1', 'BldgType', 'HouseStyle', 'RoofStyle',
    'RoofMatl', 'Exterior1st', 'Exterior2nd', 'ExterCond', 'Foundation',
    'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinType2', 'Heating',
    'HeatingQC', 'Functional', 'FireplaceQu', 'GarageType', 'GarageFinish',
    'GarageQual', 'GarageCond', 'PavedDrive', 'PoolQC', 'Fence', 'MiscFeature',
    'SaleCondition',
]

In [32]:
# Number of numeric columns (target included) whose |r| with the target exceeds
# min_val_corr; used as the size of the heatmap in the next cell.
nr_feats = len(cols_abv_corr_limit)


In [33]:
# Annotated correlation heatmap of the nr_feats columns most correlated with the target.
corr_2 = df_train.select_dtypes(include='number').corr()  # numeric only (pandas >= 2.0 raises otherwise)
corr_abs = corr_2.abs()
cols = corr_abs.nlargest(nr_feats, 'SalePrice_Log')['SalePrice_Log'].index

# Use df_train, not the leaked loop variable `df`. At this point `df` still
# refers to the frame from before the outlier rows were dropped, so the matrix
# was computed on different rows than the ones used to choose `cols`.
cm = np.corrcoef(df_train[cols].values.T)

fig, ax = plt.subplots()
ax = sns.heatmap(cm, linewidths=1.5, annot=True, square=True,
                 fmt='.2f', annot_kws={'size': 10},
                 yticklabels=cols.values, xticklabels=cols.values,
                 ax=ax)

display(fig.figure)

In [34]:

# Discard the Id key, the numeric features at or below the correlation threshold,
# and the weak categorical features.
to_drop_num = cols_bel_corr_limit
to_drop_catg = catg_weak_corr
cols_to_drop = ['Id'] + to_drop_num + to_drop_catg

df = df_train  # same alias the original loop left behind; a later cell reads `df`
df.drop(columns=cols_to_drop, inplace=True)

In [35]:
# Strong categoricals whose per-category target means are printed next
# (Neighborhood is left out of that printout).
catg_list = [name for name in catg_strong_corr if name != 'Neighborhood']

In [36]:
# Mean log price per category level, for each feature in catg_list.
for catg in catg_list:
    g = df_train.groupby(catg)[target].agg('mean')
    print(g)

In [37]:
# Category levels that share an ordinal code in the encoding cell below.
# Levels not listed fall into that feature's default group.

# MSZoning -> 2 / 3
msz_catg2, msz_catg3 = ['RM', 'RH'], ['RL', 'FV']

# Neighborhood -> 2 / 3
nbhd_catg2 = [
    'Blmngtn', 'ClearCr', 'CollgCr', 'Crawfor', 'Gilbert',
    'NWAmes', 'Somerst', 'Timber', 'Veenker',
]
nbhd_catg3 = ['NoRidge', 'NridgHt', 'StoneBr']

# Condition2 -> 2 / 3
cond2_catg2, cond2_catg3 = ['Norm', 'RRAe'], ['PosA', 'PosN']

# SaleType -> 1 / 3 / 4 (default group is 2)
SlTy_catg1, SlTy_catg3, SlTy_catg4 = ['Oth'], ['CWD'], ['New', 'Con']

In [38]:
# Ordinal re-encoding of the strong categoricals into *_num columns.
# Each map table lists only the non-default codes. Any level missing from the
# table (including 'None') gets the default passed to fillna. The groupings
# presumably follow the per-category means printed above. NOTE(review): confirm.
for df in [df_train]:
    df['MSZ_num'] = (df['MSZoning']
                     .map({**dict.fromkeys(msz_catg2, 2), **dict.fromkeys(msz_catg3, 3)})
                     .fillna(1)
                     .astype('int64'))
    df['NbHd_num'] = (df['Neighborhood']
                      .map({**dict.fromkeys(nbhd_catg2, 2), **dict.fromkeys(nbhd_catg3, 3)})
                      .fillna(1)
                      .astype('int64'))
    df['Cond2_num'] = (df['Condition2']
                       .map({**dict.fromkeys(cond2_catg2, 2), **dict.fromkeys(cond2_catg3, 3)})
                       .fillna(1)
                       .astype('int64'))
    df['Mas_num'] = df['MasVnrType'].map({'Stone': 2}).fillna(1).astype('int64')
    df['ExtQ_num'] = df['ExterQual'].map({'TA': 2, 'Gd': 3, 'Ex': 4}).fillna(1).astype('int64')
    df['BsQ_num'] = df['BsmtQual'].map({'Gd': 2, 'Ex': 3}).fillna(1).astype('int64')
    df['CA_num'] = df['CentralAir'].map({'Y': 1}).fillna(0).astype('int64')
    df['Elc_num'] = df['Electrical'].map({'SBrkr': 2}).fillna(1).astype('int64')
    df['KiQ_num'] = df['KitchenQual'].map({'TA': 2, 'Gd': 3, 'Ex': 4}).fillna(1).astype('int64')
    df['SlTy_num'] = (df['SaleType']
                      .map({**dict.fromkeys(SlTy_catg1, 1),
                            **dict.fromkeys(SlTy_catg3, 3),
                            **dict.fromkeys(SlTy_catg4, 4)})
                      .fillna(2)
                      .astype('int64'))

In [39]:
# Names of the ordinal-encoded columns created in the previous cell
# (not referenced again in the cells shown).
new_col_num = ['MSZ_num', 'NbHd_num', 'Cond2_num', 'Mas_num', 'ExtQ_num', 'BsQ_num', 'CA_num', 'Elc_num', 'KiQ_num', 'SlTy_num']

In [40]:
# Drop the raw categoricals that now have *_num encodings, then drop every
# numeric column whose |r| with the target is at or below min_val_corr.
# 'MSZoning' is included: it was encoded as MSZ_num like the others but was
# previously left behind as a stray object column.
catg_cols_to_drop = ['MSZoning', 'Neighborhood', 'Condition2', 'MasVnrType', 'ExterQual',
                     'BsmtQual', 'CentralAir', 'Electrical', 'KitchenQual', 'SaleType']

corr1 = df_train.select_dtypes(include='number').corr()  # numeric only (pandas >= 2.0 raises otherwise)
corr_abs_1 = corr1.abs()

# nlargest needs the number of correlation entries (one per numeric column).
# len(df_train) is the row count and only worked because it happened to be larger.
nr_all_cols = len(corr_abs_1)
ser_corr_1 = corr_abs_1.nlargest(nr_all_cols, target)[target]

print(ser_corr_1)
cols_bel_corr_limit_1 = list(ser_corr_1[ser_corr_1.values <= min_val_corr].index)

# The loop form is kept so `df` stays bound to df_train for later cells.
for df in [df_train]:
    df.drop(catg_cols_to_drop, inplace=True, axis=1)
    df.drop(cols_bel_corr_limit_1, inplace=True, axis=1)

In [41]:
# Re-rank the remaining numeric columns by |r| with the target after the drops.
corr2 = df_train.select_dtypes(include='number').corr()  # numeric only (pandas >= 2.0 raises otherwise)
corr_abs_2 = corr2.abs()

# Number of correlation entries (columns), not rows.
nr_all_cols = len(corr_abs_2)
ser_corr_2 = corr_abs_2.nlargest(nr_all_cols, target)[target]

print(ser_corr_2)

In [42]:
# Final |r| ranking; `corr`, `corr_abs` and `nr_all_cols` are reused by later cells.
corr = df_train.select_dtypes(include='number').corr()  # numeric only (pandas >= 2.0 raises otherwise)
corr_abs = corr.abs()

# Number of correlation entries (columns), not rows.
nr_all_cols = len(corr_abs)
print(corr_abs.nlargest(nr_all_cols, target)[target])

In [43]:
# Annotated correlation heatmap of all remaining columns, ordered by |r| with the target.
nr_feats = len(df_train.columns)
corr = df_train.select_dtypes(include='number').corr()  # numeric only (pandas >= 2.0 raises otherwise)
corr_abs = corr.abs()
cols = corr_abs.nlargest(nr_feats, 'SalePrice_Log')['SalePrice_Log'].index

# Read from df_train directly rather than relying on the leaked loop variable `df`.
cm = np.corrcoef(df_train[cols].values.T)

fig, ax = plt.subplots()
ax = sns.heatmap(cm, linewidths=1.5, annot=True, square=True,
                 fmt='.2f', annot_kws={'size': 10},
                 yticklabels=cols.values, xticklabels=cols.values,
                 ax=ax)

display(fig.figure)

In [44]:
# Candidate model columns in |r| order. When drop_similar is 1, remove four
# features that presumably duplicate other kept ones (e.g. GarageArea vs
# GarageCars). NOTE(review): confirm against the heatmap.
cols = list(corr_abs.nlargest(nr_all_cols, target)[target].index)
drop_similar = 1

if drop_similar == 1:
    cols = [c for c in cols if c not in ('GarageArea', '1stFlrSF', 'TotRmsAbvGrd', 'GarageYrBlt')]

In [45]:
# Final column list (target first), shown for reference.
cols = [*cols]
print(cols)

In [46]:
# Model frame: the target plus the selected features, in correlation order.
df_train_ml = df_train[list(cols)]

# Feature names only (target excluded); these are the assembler inputs below.
feats = [c for c in cols if c != 'SalePrice_Log']

print(feats)
df_train_ml.head(5)

Unnamed: 0,SalePrice_Log,OverallQual,GrLivArea_Log,NbHd_num,ExtQ_num,GarageCars,KiQ_num,BsQ_num,TotalBsmtSF,FullBath,YearBuilt,YearRemodAdd,Fireplaces,MasVnrArea,MSZ_num,LotArea_Log
0,12.247694,7,7.444249,2,3,2,3,2,856,2,2003,2003,0,196.0,3,9.041922
1,12.109011,6,7.140453,2,2,2,2,2,1262,2,1976,1976,1,0.0,3,9.169518
2,12.317167,7,7.487734,2,3,2,3,2,920,2,2001,2002,1,162.0,3,9.328123
3,11.849398,7,7.448334,2,2,3,3,1,756,1,1915,1970,1,0.0,3,9.164296
4,12.429216,8,7.695303,3,3,3,3,2,1145,2,2000,2000,1,350.0,3,9.565214


In [47]:
# Move the model frame into Spark and hold out ~20% of rows for validation.
# randomSplit draws a fresh random seed when none is given, so the split and
# every RMSE reported below changed on each run. A fixed seed makes them reproducible.
data = spark.createDataFrame(df_train_ml)

train, valid = data.randomSplit([0.8, 0.2], seed=42)


In [48]:
# Peek at five rows of the Spark training split, collected to pandas for display.
train.limit(5).toPandas()

Unnamed: 0,SalePrice_Log,OverallQual,GrLivArea_Log,NbHd_num,ExtQ_num,GarageCars,KiQ_num,BsQ_num,TotalBsmtSF,FullBath,YearBuilt,YearRemodAdd,Fireplaces,MasVnrArea,MSZ_num,LotArea_Log
0,11.134589,4,6.253829,1,2,1,1,1,520,1,1927,1950,0,0.0,2,8.752107
1,11.277203,6,6.999422,1,2,0,2,1,572,1,1924,1950,0,0.0,3,9.169518
2,11.289782,4,6.616065,1,2,1,2,1,747,1,1945,1950,0,0.0,2,8.435549
3,11.314475,4,7.049255,1,2,0,1,1,0,2,1955,1955,0,0.0,3,8.706159
4,11.344507,6,6.625392,1,2,0,2,1,520,1,1935,1982,0,0.0,2,8.821732


## Model Architecture:

Linear Regression

In [51]:


# Pack the selected feature columns into one vector column for Spark ML.
assembler_input = list(feats)

feature_vector = VectorAssembler(
    inputCols=assembler_input,
    outputCol='features',
    handleInvalid='keep',
)

# Elastic-net linear regression on the log price.
# NOTE(review): regParam=0.3 with a mostly-L1 penalty is strong for this target.
# The validation predictions shown below cluster between ~11.9 and ~12.1, so this is worth tuning.
LR = LinearRegression(
    featuresCol='features',
    labelCol='SalePrice_Log',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)



Linear Regression Pipeline:

In [53]:
# Assemble the features, then fit the linear model on the training split.
ml_pipeline = Pipeline(stages=[feature_vector,LR])
model = ml_pipeline.fit(train)

In [54]:
# Score the validation split and show the first 20 rows with their predictions.
predicted_data = model.transform(valid)
predicted_data.limit(20).toPandas()

Unnamed: 0,SalePrice_Log,OverallQual,GrLivArea_Log,NbHd_num,ExtQ_num,GarageCars,KiQ_num,BsQ_num,TotalBsmtSF,FullBath,YearBuilt,YearRemodAdd,Fireplaces,MasVnrArea,MSZ_num,LotArea_Log,features,prediction
0,10.596635,4,7.183112,1,2,1,2,1,649,1,1920,1950,0,0.0,1,9.047821,"[4.0, 7.183111701743281, 1.0, 2.0, 1.0, 2.0, 1...",11.910267
1,11.326596,5,6.727432,1,2,1,2,1,458,1,1920,1950,0,0.0,3,9.270965,"[5.0, 6.727431724850855, 1.0, 2.0, 1.0, 2.0, 1...",11.949549
2,11.695247,5,7.200425,1,2,1,2,1,0,1,1957,2006,1,98.0,3,9.111624,"[5.0, 7.200424892944957, 1.0, 2.0, 1.0, 2.0, 1...",11.966753
3,11.724806,4,6.897705,2,2,0,2,2,990,1,1994,1995,0,0.0,3,8.995909,"[4.0, 6.897704943128636, 2.0, 2.0, 0.0, 2.0, 2...",11.899886
4,11.76718,4,7.110696,1,2,0,2,1,950,1,1959,1959,0,0.0,3,9.139918,"[4.0, 7.110696122978827, 1.0, 2.0, 0.0, 2.0, 1...",11.907633
5,11.77529,5,7.21524,1,2,2,2,1,384,1,1955,1955,1,0.0,3,8.872627,"[5.0, 7.215239978730097, 1.0, 2.0, 2.0, 2.0, 1...",11.967292
6,11.811547,5,6.802395,1,2,2,3,1,900,1,1951,2000,0,0.0,3,8.881836,"[5.0, 6.802394763324311, 1.0, 2.0, 2.0, 3.0, 1...",11.952275
7,11.845103,7,7.010312,1,2,1,3,1,637,1,1930,1950,1,0.0,2,8.915835,"[7.0, 7.010311867307229, 1.0, 2.0, 1.0, 3.0, 1...",12.071551
8,11.856515,7,6.984716,1,2,0,3,2,1080,1,2004,2005,0,120.0,3,8.922658,"[7.0, 6.984716320118266, 1.0, 2.0, 0.0, 3.0, 2...",12.07062
9,11.911702,6,6.911747,1,2,2,2,1,1004,1,1970,1970,1,180.0,3,9.327323,"[6.0, 6.911747300251674, 1.0, 2.0, 2.0, 2.0, 1...",12.012109


Prediction

In [56]:
# RMSE of the linear model on the held-out split, in log-price units.
# RegressionEvaluator is already imported in the first cell, so it is not re-imported here.
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="SalePrice_Log", metricName="rmse")
# The data scored here is the `valid` split, not a separate test set.
print("RMSE on validation data = %g" % lr_evaluator.evaluate(predicted_data))

In [57]:
# Undo the log transform so actual and predicted prices are on the original scale.
bhaves_old = predicted_data.toPandas()
bhaves_new = pd.DataFrame({
    'Given': np.exp(bhaves_old['SalePrice_Log']),
    'Predicted': np.exp(bhaves_old['prediction']),
})
bhaves_new.head(20)

Unnamed: 0,Given,Predicted
0,40000.0,148786.433381
1,83000.0,154747.295293
2,120000.0,157432.713093
3,123600.0,147249.808521
4,128950.0,148395.037633
5,130000.0,157517.574218
6,134800.0,155169.822907
7,139400.0,174826.815054
8,141000.0,174664.124279
9,149000.0,164737.649215


In [58]:
# Actual vs. predicted sale price for each validation row (linear model).
fig, ax = plt.subplots()
# Label each line: ax.legend() with no labelled artists draws an empty legend and warns.
ax.plot(bhaves_new['Given'], label='Given')
ax.plot(bhaves_new['Predicted'], label='Predicted')
ax.set_xlabel('Validation row')
ax.set_ylabel('SalePrice')
ax.legend()
display(fig.figure)

Decision Tree Regression

In [60]:

# Decision-tree stages: assemble features, then let VectorIndexer treat features
# with few distinct values (maxCategories left at its default) as categorical.
assembler_input = list(feats)

feature_vector = VectorAssembler(
    inputCols=assembler_input,
    outputCol='features',
    handleInvalid='keep',
)

vect_indexer = VectorIndexer(
    inputCol='features',
    outputCol='features_indexed',
    handleInvalid='keep',
)

dt = DecisionTreeRegressor(featuresCol="features_indexed", labelCol='SalePrice_Log')


In [61]:
# Fit assembler -> indexer -> tree on the training split.
dt_pipeline = Pipeline(stages=[feature_vector,vect_indexer,dt])
dt_model = dt_pipeline.fit(train)

In [62]:
# Score the validation split with the decision tree and report RMSE (log-price units).
predicted_data = dt_model.transform(valid)

# RegressionEvaluator is already imported in the first cell, so it is not re-imported here.
evaluator = RegressionEvaluator(predictionCol="prediction",
                                labelCol="SalePrice_Log", metricName="rmse")
print("RMSE on validation data = %g" % evaluator.evaluate(predicted_data))

In [63]:
# The fitted tree model is the last pipeline stage; display() renders its node table.
treeModel = dt_model.stages[-1]
# summary only
display(treeModel)

treeNode
"{""index"":31,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[0.0,1.0,2.0,3.0,4.0,5.0],""feature"":0,""overflow"":false}"
"{""index"":15,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[0.0,1.0,2.0,3.0],""feature"":0,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":6.682105456756155,""categories"":null,""feature"":1,""overflow"":false}"
"{""index"":3,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[0.0],""feature"":13,""overflow"":false}"
"{""index"":1,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[3.0],""feature"":0,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":10.460242108190519,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":10.471949809110479,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":5,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[0.0],""feature"":0,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":10.578979797857352,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":11.230829317475528,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"


In [64]:
# Random-forest stages: the same assembler and indexer as the tree, rebuilt here.
assembler_input = list(feats)

feature_vector = VectorAssembler(
    inputCols=assembler_input,
    outputCol='features',
    handleInvalid='keep',
)

vect_indexer = VectorIndexer(
    inputCol='features',
    outputCol='features_indexed',
    handleInvalid='keep',
)

rf = RandomForestRegressor(featuresCol="features_indexed", labelCol='SalePrice_Log')


In [65]:
# Fit assembler -> indexer -> random forest (default hyperparameters) on the training split.
rf_pipeline = Pipeline(stages=[feature_vector,vect_indexer,rf])
rf_model = rf_pipeline.fit(train)

In [66]:
# Score the validation split with the forest and report RMSE (log-price units).
predicted_data = rf_model.transform(valid)

# RegressionEvaluator is already imported in the first cell. `evaluator` is
# reused by the cross-validation cell below.
evaluator = RegressionEvaluator(predictionCol="prediction",
                                labelCol="SalePrice_Log", metricName="rmse")
# The data scored here is the `valid` split, not a separate test set.
print("RMSE on validation data = %g" % evaluator.evaluate(predicted_data))

In [67]:
# Random-forest actual vs. predicted prices on the original (exponentiated) scale.
bhaves_old = predicted_data.toPandas()
bhaves_new = pd.DataFrame({
    'Given': np.exp(bhaves_old['SalePrice_Log']),
    'Predicted': np.exp(bhaves_old['prediction']),
})


In [68]:
# Actual vs. predicted sale price for each validation row (random forest).
fig, ax = plt.subplots()
# Label each line: ax.legend() with no labelled artists draws an empty legend and warns.
ax.plot(bhaves_new['Given'], label='Given')
ax.plot(bhaves_new['Predicted'], label='Predicted')
ax.set_xlabel('Validation row')
ax.set_ylabel('SalePrice')
ax.legend()
display(fig.figure)

In [69]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# 3-fold grid search over forest size and depth on the training split,
# scored with the RMSE evaluator from the previous cell and then checked on `valid`.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [2, 6, 10])
             .addGrid(rf.maxDepth, [6, 10])
             .build())

cv = CrossValidator(
    estimator=rf_pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
)

cvModel = cv.fit(train)
cvModelDf = cvModel.transform(valid)

print('RMSE of tuned RandomForestRegressor: ', evaluator.evaluate(cvModelDf))


In [70]:
# Tuned-forest actual vs. predicted prices on the original (exponentiated) scale.
bhaves_old = cvModelDf.toPandas()
tuned_df = pd.DataFrame({
    'Given': np.exp(bhaves_old['SalePrice_Log']),
    'Predicted': np.exp(bhaves_old['prediction']),
})
tuned_df.head(20)

Unnamed: 0,Given,Predicted
0,40000.0,104449.605397
1,83000.0,103298.848996
2,120000.0,131041.5251
3,123600.0,128577.274051
4,128950.0,118542.233919
5,130000.0,143433.632375
6,134800.0,125268.19106
7,139400.0,130380.510488
8,141000.0,169388.863725
9,149000.0,151401.907093


In [71]:
# Actual vs. predicted sale price for each validation row (cross-validated forest).
fig, ax = plt.subplots()
# Label each line: ax.legend() with no labelled artists draws an empty legend and warns.
ax.plot(tuned_df['Given'], label='Given')
ax.plot(tuned_df['Predicted'], label='Predicted')
ax.set_xlabel('Validation row')
ax.set_ylabel('SalePrice')
ax.legend()
display(fig.figure)