### 1. Initialize the Spark environment

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=2f2fef10c806d35ff037345d0cd466b99e30410dec12433211121a56156ccf4c
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
# Basic Library
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import random
import os
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import mean_absolute_error
import warnings
warnings.filterwarnings('ignore')
from scipy import stats
from scipy.stats import norm, skew 

# Spark Library
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext, Window
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import mean, col, split,regexp_extract, when, lit,max,min,isnan,count, desc,var_samp,avg, udf
from pyspark.ml.feature import StringIndexer, VectorAssembler,StandardScaler
# StringIndexer: mapping of string column of label to an ML column of label indice
# VectorAssembler: merge multiple column vector to single column
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.stat import Correlation
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Local Spark session with two worker threads; all DataFrames below are created from it.
spark_session = (
    SparkSession.builder
    .master("local[2]")
    .appName("HousingRegression")
    .getOrCreate()
)

In [None]:
# SparkContext underlying the session (needed by the legacy SQLContext below).
spark_context = spark_session.sparkContext

In [None]:
# Legacy SQLContext wrapper around the context.
# NOTE(review): SQLContext has been superseded by SparkSession since Spark 2.0, and
# `spark_sql_context` is not referenced in the cells shown here. Consider removing it.
spark_sql_context = SQLContext(spark_context)

### 2. Loading data & EDA

In [None]:
 from google.colab import drive
 drive.mount("/content/drive")
 %cd "drive/MyDrive/ColabNotebooks/comp4651_proj"

Mounted at /content/drive
/content/drive/MyDrive/ColabNotebooks/comp4651_proj


Use pandas to inspect the missing data and create visualizations (e.g. the correlation table below).

In [None]:
# Read both CSVs from the working directory selected by the %cd above.
pd_train = pd.read_csv("train.csv")
pd_test = pd.read_csv("test.csv")
# Training columns that contain at least one missing value, in column order.
na_cols = [name for name in pd_train.columns if pd_train[name].isna().any()]

Building the correlation matrix

The (Pearson) correlation coefficient measures the strength and direction of the linear relationship between two variables.

The values in a correlation matrix range from -1 to 1. A value of 1 indicates a perfect positive correlation, a value of -1 indicates a perfect negative correlation, and a value near 0 indicates no linear relationship.

In [None]:
# Correlation over numeric columns only. pd_train still has string columns, and from
# pandas 2.0 onward corr() raises on them instead of silently skipping them.
corr = pd_train.corr(numeric_only=True)

In [None]:
# Correlation of every numeric column with SalePrice, strongest positive first,
# rendered as a colour-graded table.
(
    corr[["SalePrice"]]
    .sort_values(by="SalePrice", ascending=False)
    .style.background_gradient(cmap="viridis", axis=None)
)

Unnamed: 0,SalePrice
SalePrice,1.0
OverallQual,0.790982
GrLivArea,0.708624
GarageCars,0.640409
GarageArea,0.623431
TotalBsmtSF,0.613581
1stFlrSF,0.605852
FullBath,0.560664
TotRmsAbvGrd,0.533723
YearBuilt,0.522897


### Creating Spark DataFrames

In [None]:
# Spark copies of the pandas frames.
test_df = spark_session.createDataFrame(pd_test)
train_df = spark_session.createDataFrame(pd_train)

### Handling missing data
Determine the proportion of missing values in each column of the training set.

In [None]:
# Absolute and relative number of missing values per column, most-missing first.
total = pd_train.isnull().sum().sort_values(ascending=False)
percent = (pd_train.isnull().sum() / len(pd_train)).sort_values(ascending=False)

missing = pd.concat([total, percent], axis=1, keys=["Total", "Perc_missing"])
missing.head(15)

Unnamed: 0,Total,Perc_missing
PoolQC,1453,0.995205
MiscFeature,1406,0.963014
Alley,1369,0.937671
Fence,1179,0.807534
FireplaceQu,690,0.472603
LotFrontage,259,0.177397
GarageYrBlt,81,0.055479
GarageCond,81,0.055479
GarageType,81,0.055479
GarageFinish,81,0.055479


In [None]:
# Fraction of missing entries per column, where an entry counts as missing if it is NaN
# *or* null. when(..., 1) is used rather than when(..., c) because count() skips nulls:
# returning the column value itself would never count a null entry.
perc_df = train_df.select([
    (F.count(F.when(F.isnan(c) | F.col(c).isNull(), 1)) / F.count("Id")).alias(c)
    for c in train_df.columns
])
perc_df.show()

+---+----------+--------+------------------+-------+------+------------------+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+--------------------+--------------------+---------+---------+----------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+----------+---------+-----------+-------+---------+----------+--------------------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+------------------+-------------------+-------------------+-------------------+----------+----------+-------------------+-------------------+----------+----------+-----------+-------------+---------+-----------+--------+------------------+------------------+-----------------+-------+------+-----

In [None]:
# Columns whose missing fraction exceeds 15% (candidates for dropping).
drop_list = [
    name
    for name, frac in perc_df.first().asDict().items()
    if frac > 0.15
]
print(drop_list)

['LotFrontage', 'Alley', 'FireplaceQu', 'PoolQC', 'Fence', 'MiscFeature']


In [None]:
# Remove the mostly-empty columns found above, then preview the first 20 rows.
train_df = train_df.drop(*drop_list)
train_df.show(20)

+---+----------+--------+-------+------+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+-------+------+------+--------+-------------+---------+
| Id|MSSubClass|MSZoning|LotArea|Street|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition2|BldgType|HouseStyle|OverallQual|OverallCond|YearBuilt|YearRemodAdd|Roof

### Changing Ordinal Categorical Data into Numerical Values

In [None]:
# Ordinal encodings applied by encodeRating().
# column -> (level -> integer code, fallback)
#   fallback is the code for any value not in the mapping (e.g. a missing value);
#   None means such values become null.
_QUAL_4 = {"Ex": 4, "Gd": 3, "TA": 2, "Fa": 1, "Po": 0}
_QUAL_5 = {"Ex": 5, "Gd": 4, "TA": 3, "Fa": 2, "Po": 1}
_QUAL_4_NO_PO = {"Ex": 4, "Gd": 3, "TA": 2, "Fa": 1}
_BSMT_FIN_TYPE = {"GLQ": 6, "ALQ": 5, "BLQ": 4, "Rec": 3, "LwQ": 2, "Unf": 1}

ORDINAL_ENCODINGS = {
    "LotShape": ({"Reg": 0, "IR1": 1, "IR2": 2, "IR3": 3}, None),
    "Utilities": ({"AllPub": 3, "NoSewr": 2, "NoSeWa": 1, "ELO": 0}, None),
    "LandSlope": ({"Gtl": 0, "Mod": 1, "Sev": 2}, None),
    "ExterQual": (_QUAL_4, None),
    "ExterCond": (_QUAL_4, None),
    "BsmtQual": (_QUAL_5, 0),
    "BsmtCond": (_QUAL_5, 0),
    "BsmtExposure": ({"Gd": 4, "Av": 3, "Mn": 2, "No": 1}, 0),
    "BsmtFinType1": (_BSMT_FIN_TYPE, 0),
    "BsmtFinType2": (_BSMT_FIN_TYPE, 0),
    "HeatingQC": (_QUAL_4_NO_PO, 0),
    "CentralAir": ({"N": 0, "Y": 1}, None),
    "Electrical": ({"SBrkr": 4, "FuseA": 3, "FuseF": 2, "FuseP": 1}, 0),
    "KitchenQual": (_QUAL_4_NO_PO, 0),
    # Eight distinct levels, so eight distinct codes: Typ (typical) ranks above Min1.
    "Functional": (
        {"Typ": 7, "Min1": 6, "Min2": 5, "Mod": 4,
         "Maj1": 3, "Maj2": 2, "Sev": 1, "Sal": 0},
        None,
    ),
    "GarageFinish": ({"Fin": 3, "RFn": 2, "Unf": 1}, 0),
    "GarageQual": (_QUAL_5, 0),
    "GarageCond": (_QUAL_5, 0),
    "PavedDrive": ({"Y": 2, "P": 1, "N": 0}, None),
}


def _ordinal_expr(column, mapping, fallback):
    """Build a CASE WHEN expression that maps the string levels of `column` to ints."""
    # F.col (not the bare `col` import) keeps this working even if `col` gets rebound
    # in the notebook namespace.
    expr = None
    for level, code in mapping.items():
        cond = F.col(column) == level
        expr = F.when(cond, code) if expr is None else expr.when(cond, code)
    return expr if fallback is None else expr.otherwise(fallback)


def encodeRating(df):
    """Encode ordinal rating columns as integers and tidy a few other columns.

    Each column in ORDINAL_ENCODINGS is replaced in place (its position is kept)
    by its integer code. GarageYrBlt, GarageArea, GarageCars and MasVnrArea
    have missing values filled with 0. MSSubClass is cast to string so it is
    treated as a nominal category.

    Args:
        df: Spark DataFrame containing the raw string rating columns.

    Returns:
        The transformed DataFrame.

    NOTE: not idempotent. Applying it to an already-encoded frame matches no
    level, so every rating becomes null or its fallback.
    """
    for column, (mapping, fallback) in ORDINAL_ENCODINGS.items():
        df = df.withColumn(column, _ordinal_expr(column, mapping, fallback))
    df = df.fillna(0, ['GarageYrBlt', 'GarageArea', 'GarageCars', 'MasVnrArea'])
    df = df.withColumn('MSSubClass', df['MSSubClass'].cast('string'))

    return df

def getAvg(df, feat):
    """Return the mean SalePrice for each level of column `feat`.

    The result has one row per level, with columns `feat` and Spark's default
    aggregate name ``avg(SalePrice)``.
    """
    per_level = df.groupBy(feat)
    return per_level.avg("SalePrice")

def encodeTarget(df, feat, ref_df=None, target="SalePrice"):
    """Target-encode categorical column `feat` with the mean of `target` per level.

    Adds a column ``<feat>_avg``. For each row it holds the mean of `target`
    over the rows of `ref_df` (default: `df` itself) that share the row's `feat`
    value. The join is on `feat`, so `feat` becomes the first column.

    Args:
        df: Spark DataFrame that receives the encoded column.
        feat: name of the categorical column to encode.
        ref_df: DataFrame the per-level means are computed from. Defaults to `df`.
            Pass the training frame here when encoding a validation/test frame,
            so the encoding is learned only from training labels.
        target: label column whose mean is used. Defaults to "SalePrice".

    Returns:
        `df` with an extra ``<feat>_avg`` column. A left join is used, so rows
        whose `feat` level is null or absent from `ref_df` are kept (with a null
        ``<feat>_avg``) instead of being silently dropped.

    NOTE(review): encoding the full training set before the train/val split lets
    the validation rows' own labels leak into this feature, which inflates
    validation scores.
    """
    source = df if ref_df is None else ref_df
    level_means = source.groupBy(feat).agg(F.avg(target).alias(feat + "_avg"))
    return df.join(level_means, on=feat, how="left")

In [None]:
# NOTE: not idempotent. Re-running this cell re-encodes already-encoded columns.
train_df = encodeRating(train_df)
train_df.limit(5).show()
# Per-column count of NaN or null values remaining after encoding. show() prints the
# table itself and returns None, so it is not wrapped in print().
train_df.select([
    F.count(F.when(F.isnan(c) | F.col(c).isNull(), 1)).alias(c)
    for c in train_df.columns
]).show()


+---+----------+--------+-------+------+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+-------+------+------+--------+-------------+---------+
| Id|MSSubClass|MSZoning|LotArea|Street|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition2|BldgType|HouseStyle|OverallQual|OverallCond|YearBuilt|YearRemodAdd|Roof

In [None]:
# Nominal (unordered) categorical columns. Every other column, including Id and
# SalePrice, lands in numerical_feat.
cat_feat = [
    'MSSubClass', 'MSZoning', 'Street', 'LandContour', 'LotConfig',
    'Neighborhood', 'Condition1', 'Condition2', 'BldgType', 'HouseStyle',
    'RoofStyle', 'RoofMatl', 'Exterior1st', 'Exterior2nd', 'MasVnrType',
    'Foundation', 'Heating', 'GarageType', 'SaleType', 'SaleCondition',
]
numerical_feat = [name for name in train_df.columns if name not in cat_feat]

In [None]:
# Column list of the raw pandas training frame (still includes the columns dropped in Spark).
pd_train.columns

Index(['Id', 'MSSubClass', 'MSZoning', 'LotFrontage', 'LotArea', 'Street',
       'Alley', 'LotShape', 'LandContour', 'Utilities', 'LotConfig',
       'LandSlope', 'Neighborhood', 'Condition1', 'Condition2', 'BldgType',
       'HouseStyle', 'OverallQual', 'OverallCond', 'YearBuilt', 'YearRemodAdd',
       'RoofStyle', 'RoofMatl', 'Exterior1st', 'Exterior2nd', 'MasVnrType',
       'MasVnrArea', 'ExterQual', 'ExterCond', 'Foundation', 'BsmtQual',
       'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinSF1',
       'BsmtFinType2', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', 'Heating',
       'HeatingQC', 'CentralAir', 'Electrical', '1stFlrSF', '2ndFlrSF',
       'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath', 'FullBath',
       'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 'KitchenQual',
       'TotRmsAbvGrd', 'Functional', 'Fireplaces', 'FireplaceQu', 'GarageType',
       'GarageYrBlt', 'GarageFinish', 'GarageCars', 'GarageArea', 'GarageQual',
       'GarageCond', 'PavedDrive

In [None]:
# Schema after dropping sparse columns and ordinal encoding (MSSubClass is now a string).
train_df.printSchema()

root
 |-- Id: long (nullable = true)
 |-- MSSubClass: string (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotArea: long (nullable = true)
 |-- Street: string (nullable = true)
 |-- LotShape: integer (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: integer (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: integer (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: long (nullable = true)
 |-- OverallCond: long (nullable = true)
 |-- YearBuilt: long (nullable = true)
 |-- YearRemodAdd: long (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |-- Exterior2nd: string (nullable = true)
 |-- MasVnrType: string (nullable = true)
 |-- MasVnrArea

In [None]:
# import scipy.stats as stats

# asd=train_df.toPandas()
# catlen=[]
# for col in train_string_columns:
#     catlen.append(asd[col].unique().size)
# plt.figure(figsize=(18,16))

# for i,col in enumerate(train_string_columns):
#     unique_majors = asd[col].unique()
#     fig,ax = plt.subplots(nrows=1,ncols=catlen[i])
#     for j,major in enumerate(unique_majors):
#         stats.probplot(asd[asd[col] == major]['SalePrice'], dist="norm", plot=ax[j])
#         plt.title(major)

### Encode Categorical Data

In [None]:
# String columns to pass to the StringIndexer (categorical feature encoding).
# The comprehension variable must not be called `col`: a top-level `for col, ...` loop
# would rebind the imported pyspark.sql.functions.col and break later col(...) calls.
train_string_columns = [name for name, dtype in train_df.dtypes if dtype == 'string']
print(train_string_columns)

['MSSubClass', 'MSZoning', 'Street', 'LandContour', 'LotConfig', 'Neighborhood', 'Condition1', 'Condition2', 'BldgType', 'HouseStyle', 'RoofStyle', 'RoofMatl', 'Exterior1st', 'Exterior2nd', 'MasVnrType', 'Foundation', 'Heating', 'GarageType', 'SaleType', 'SaleCondition']


In [None]:
# Number of distinct levels in each string column.
train_df.agg(*[F.countDistinct(F.col(name)).alias(name) for name in train_string_columns]).show()


+----------+--------+------+-----------+---------+------------+----------+----------+--------+----------+---------+--------+-----------+-----------+----------+----------+-------+----------+--------+-------------+
|MSSubClass|MSZoning|Street|LandContour|LotConfig|Neighborhood|Condition1|Condition2|BldgType|HouseStyle|RoofStyle|RoofMatl|Exterior1st|Exterior2nd|MasVnrType|Foundation|Heating|GarageType|SaleType|SaleCondition|
+----------+--------+------+-----------+---------+------------+----------+----------+--------+----------+---------+--------+-----------+-----------+----------+----------+-------+----------+--------+-------------+
|        15|       5|     2|          4|        5|          25|         9|         8|       5|         8|        6|       8|         15|         16|         5|         6|      6|         7|       9|            6|
+----------+--------+------+-----------+---------+------------+----------+----------+--------+----------+---------+--------+-----------+-----------+

In [None]:
# Replace the three highest-cardinality categoricals' levels with their mean SalePrice.
# NOTE(review): this runs before the train/val split, so validation labels leak into these
# features. The cell is also not idempotent: re-running it adds duplicate *_avg columns.
for encoded_feat in ['Neighborhood', 'Exterior1st', 'Exterior2nd']:
    train_df = encodeTarget(train_df, encoded_feat)


In [None]:
# Preview: the join keys from target encoding now lead the column list.
train_df.show(n=5)

+-----------+-----------+------------+---+----------+--------+-------+------+--------+-----------+---------+---------+---------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+-------+------+------+--------+-------------+---------+------------------+------------------+------------------+
|Exterior2nd|Exterior1st|Neighborhood| Id|MSSubClass|MSZoning|LotArea|Street|LotShape|LandContour|Utilities|LotConfig|LandSlope|Condition1|C

In [None]:
# One StringIndexer per remaining string column. The three target-encoded columns are
# skipped. Stages are left unfitted: pipeline.fit() fits them in a single place, so the
# data is not scanned once here and again by the pipeline.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + '_index', handleInvalid='keep')
    for column in train_string_columns
    if column not in ['Neighborhood', 'Exterior1st', 'Exterior2nd']
]

pipeline = Pipeline(stages=indexers)


In [None]:
# Keep the fitted PipelineModel so the *same* label->index mapping can be applied to
# validation/test data later (fitting new indexers on test would assign different indices).
indexer_model = pipeline.fit(train_df)
train_indexed = indexer_model.transform(train_df)

In [None]:
# NOTE(review): this re-displays train_df, not the freshly indexed train_indexed. Confirm intent.
train_df.show(n=5)

+-----------+-----------+------------+---+----------+--------+-------+------+--------+-----------+---------+---------+---------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+-------+------+------+--------+-------------+---------+------------------+------------------+------------------+
|Exterior2nd|Exterior1st|Neighborhood| Id|MSSubClass|MSZoning|LotArea|Street|LotShape|LandContour|Utilities|LotConfig|LandSlope|Condition1|C

In [None]:
# All columns after target encoding and string indexing (*_index columns are appended at the end).
print(train_indexed.columns)

['Exterior2nd', 'Exterior1st', 'Neighborhood', 'Id', 'MSSubClass', 'MSZoning', 'LotArea', 'Street', 'LotShape', 'LandContour', 'Utilities', 'LotConfig', 'LandSlope', 'Condition1', 'Condition2', 'BldgType', 'HouseStyle', 'OverallQual', 'OverallCond', 'YearBuilt', 'YearRemodAdd', 'RoofStyle', 'RoofMatl', 'MasVnrType', 'MasVnrArea', 'ExterQual', 'ExterCond', 'Foundation', 'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinSF1', 'BsmtFinType2', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', 'Heating', 'HeatingQC', 'CentralAir', 'Electrical', '1stFlrSF', '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath', 'FullBath', 'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 'KitchenQual', 'TotRmsAbvGrd', 'Functional', 'Fireplaces', 'GarageType', 'GarageYrBlt', 'GarageFinish', 'GarageCars', 'GarageArea', 'GarageQual', 'GarageCond', 'PavedDrive', 'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', '3SsnPorch', 'ScreenPorch', 'PoolArea', 'MiscVal', 'MoSold', 'YrSold', 'SaleType', 'Sal

In [None]:
# Model inputs plus the label: numeric/ordinal columns (minus the Id key), followed by every
# derived column, which is recognised by the "_" in "<name>_avg" / "<name>_index".
# NOTE(review): any raw column whose name contains "_" would also be picked up here.
train_feat = [name for name in numerical_feat if name != "Id"]
train_feat += [name for name in train_indexed.columns if "_" in name]

In [None]:
# Feature names plus 'SalePrice' (the label is excluded again when the feature vector is assembled).
train_feat

['LotArea',
 'LotShape',
 'Utilities',
 'LandSlope',
 'OverallQual',
 'OverallCond',
 'YearBuilt',
 'YearRemodAdd',
 'MasVnrArea',
 'ExterQual',
 'ExterCond',
 'BsmtQual',
 'BsmtCond',
 'BsmtExposure',
 'BsmtFinType1',
 'BsmtFinSF1',
 'BsmtFinType2',
 'BsmtFinSF2',
 'BsmtUnfSF',
 'TotalBsmtSF',
 'HeatingQC',
 'CentralAir',
 'Electrical',
 '1stFlrSF',
 '2ndFlrSF',
 'LowQualFinSF',
 'GrLivArea',
 'BsmtFullBath',
 'BsmtHalfBath',
 'FullBath',
 'HalfBath',
 'BedroomAbvGr',
 'KitchenAbvGr',
 'KitchenQual',
 'TotRmsAbvGrd',
 'Functional',
 'Fireplaces',
 'GarageYrBlt',
 'GarageFinish',
 'GarageCars',
 'GarageArea',
 'GarageQual',
 'GarageCond',
 'PavedDrive',
 'WoodDeckSF',
 'OpenPorchSF',
 'EnclosedPorch',
 '3SsnPorch',
 'ScreenPorch',
 'PoolArea',
 'MiscVal',
 'MoSold',
 'YrSold',
 'SalePrice',
 'Neighborhood_avg',
 'Exterior1st_avg',
 'Exterior2nd_avg',
 'MSSubClass_index',
 'MSZoning_index',
 'Street_index',
 'LandContour_index',
 'LotConfig_index',
 'Condition1_index',
 'Condition2_inde

In [None]:
# test_string_columns = []

# for col, dtype in test_df.dtypes:
#     if dtype == 'string':
#         test_string_columns.append(col)

In [None]:
# indexers2 = [StringIndexer(inputCol=column, outputCol=column+'_index', handleInvalid='keep').fit(test_df) for column in test_string_columns]

# pipeline2 = Pipeline(stages=indexers2)
# test_indexed = pipeline2.fit(test_df).transform(test_df)

In [None]:
# print(len(test_indexed.columns))

In [None]:
# def get_dtype(df,colname):
#     return [dtype for name, dtype in df.dtypes if name == colname][0]

# num_cols_train = []
# for col in train_indexed.columns:
#     if get_dtype(train_indexed,col) != 'string':
#         num_cols_train.append(str(col))
        
# num_cols_test = []
# for col in test_indexed.columns:
#     if get_dtype(test_indexed,col) != 'string':
#         num_cols_test.append(str(col))

# Restrict to the model inputs and label, in train_feat order.
train_indexed = train_indexed.select(*train_feat)
# test_indexed = test_indexed.select(num_cols_test)

In [None]:
# Sanity check: the selected frame must contain exactly the train_feat columns.
feature_mismatch = set(train_feat).symmetric_difference(train_indexed.columns)
print(feature_mismatch)
assert not feature_mismatch, f"unexpected or missing columns: {feature_mismatch}"
# print(len(test_indexed.columns))

set()


In [None]:
# pd_train['New'] = pd_train['OverallQual'] * pd_train['GarageArea'] * pd_train['GrLivArea']
# pd_test['New'] = pd_test['OverallQual'] * pd_test['GarageArea'] * pd_test['GrLivArea']

# As some of the contestants have noticed, this results in a spike in model performance later

### 3. Model building (MLlib)

In [None]:
# Pack every column except the SalePrice label into a single "features" vector.
# handleInvalid='keep' keeps NaN/null entries instead of raising an error.
vectorAssembler = VectorAssembler(
    inputCols=[name for name in train_indexed.columns if name != "SalePrice"],
    outputCol='features',
    handleInvalid='keep',
)

train_vector = vectorAssembler.transform(train_indexed)

In [None]:
# Preview the assembled rows.
train_vector.show(n=5)

+-------+--------+---------+---------+-----------+-----------+---------+------------+----------+---------+---------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+-------+------+------+---------+------------------+------------------+------------------+----------------+--------------+------------+-----------------+---------------+----------------+----------------+--------------+----------------+---------------+--------------+----------------+----------------+-------------+----------------+--------------+-------------------+--------------------+
|LotArea|LotShape|Utilities|LandSlope|OverallQual|OverallC

In [None]:
# vectorAssembler2 = VectorAssembler(inputCols = test_indexed.columns, outputCol = 'features').setHandleInvalid("keep")

# test_vector = vectorAssembler2.transform(test_indexed)

In [None]:
# from pyspark.sql.functions import lit

# test_vector = test_vector.withColumn("SalePrice", lit(0))

In [None]:
# Train/validation split (70/30). The fixed seed makes the split, and every metric
# reported below, reproducible across kernel restarts. With seed=None, PySpark draws a
# fresh random seed on every call.
splits = train_vector.randomSplit([0.7, 0.3], seed=42)
train, val = splits

In [None]:
# Full feature set: elastic-net linear regression on the complete "features" vector.
lr = LinearRegression(
    featuresCol="features",
    labelCol="SalePrice",
    maxIter=10,
    regParam=0.8,
    elasticNetParam=0.1,
)
lr_model = lr.fit(train)

# Fit quality on the training rows.
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

# Quality on the held-out validation rows.
lr_predictions = lr_model.transform(val)
lr_predictions.select("prediction", "SalePrice", "features").show(5)

lr_evaluator = RegressionEvaluator(
    predictionCol="prediction", labelCol="SalePrice", metricName="r2"
)
print("R Squared (R2) on val data = %g" % lr_evaluator.evaluate(lr_predictions))

RMSE: 27562.542930
r2: 0.884852
+-----------------+---------+--------------------+
|       prediction|SalePrice|            features|
+-----------------+---------+--------------------+
|122667.6713575105|   124000|[1300.0,0.0,3.0,0...|
|51932.50635172444|    80000|(73,[0,2,4,5,6,7,...|
| 63095.4498482903|    75500|(73,[0,2,4,5,6,7,...|
|73030.74820283498|    86000|(73,[0,2,4,5,6,7,...|
| 87274.0862231201|   112000|[1680.0,0.0,3.0,0...|
+-----------------+---------+--------------------+
only showing top 5 rows

R Squared (R2) on val data = 0.67892


In [None]:
# The 13 numeric columns most positively correlated with SalePrice (position 0 is
# SalePrice itself, so it is skipped), plus a few encoded categorical features.
# NOTE(review): ranking by signed correlation ignores strongly negative predictors, and the
# correlations are computed on all of pd_train, validation rows included.
top_feat = (
    corr[['SalePrice']]
    .sort_values(by='SalePrice', ascending=False)
    .index.to_list()[1:14]
    + ["Neighborhood_avg", "Foundation_index", "GarageType_index",
       "MSSubClass_index", "MasVnrType_index"]
)

In [None]:
# Assemble only the hand-picked top_feat columns into a separate "features_top" vector.
vectorAssembler_top = VectorAssembler(
    inputCols=top_feat,
    outputCol='features_top',
    handleInvalid='keep',
)

train_vector_top = vectorAssembler_top.transform(train_indexed)

In [None]:
# Preview a single assembled row.
train_vector_top.show(n=1)

+-------+--------+---------+---------+-----------+-----------+---------+------------+----------+---------+---------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+-------+------+------+---------+------------------+------------------+------------------+----------------+--------------+------------+-----------------+---------------+----------------+----------------+--------------+----------------+---------------+--------------+----------------+----------------+-------------+----------------+--------------+-------------------+--------------------+
|LotArea|LotShape|Utilities|LandSlope|OverallQual|OverallC

In [None]:
# Train/validation split (70/30) for the reduced-feature model, seeded for reproducibility.
# NOTE(review): this re-splits a different DataFrame, so the rows may not match the
# full-feature split. Splitting once and assembling both vectors would make the comparison exact.
splits_top = train_vector_top.randomSplit([0.7, 0.3], seed=42)
train_top, val_top = splits_top

In [None]:
# Simple baseline (linreg) on the reduced "features_top" vector, using the same
# hyperparameters as the full-feature model.
lr = LinearRegression(
    featuresCol="features_top",
    labelCol="SalePrice",
    maxIter=10,
    regParam=0.8,
    elasticNetParam=0.1,
)
lr_model = lr.fit(train_top)

# Fit quality on the training rows.
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

# Quality on the held-out validation rows.
lr_predictions = lr_model.transform(val_top)
lr_predictions.select("prediction", "SalePrice", "features_top").show(5)

lr_evaluator = RegressionEvaluator(
    predictionCol="prediction", labelCol="SalePrice", metricName="r2"
)
print("R Squared (R2) on val data = %g" % lr_evaluator.evaluate(lr_predictions))

RMSE: 30368.248107
r2: 0.834202
+------------------+---------+--------------------+
|        prediction|SalePrice|        features_top|
+------------------+---------+--------------------+
| 92104.47035764693|   112000|[5.0,1302.0,1.0,2...|
| 108228.3070478416|    89500|[6.0,987.0,1.0,26...|
|117990.91180360178|   118000|[6.0,1218.0,1.0,2...|
| 117294.2318531957|   118000|[6.0,987.0,1.0,26...|
| 118428.1946652662|   113000|[6.0,1218.0,1.0,2...|
+------------------+---------+--------------------+
only showing top 5 rows

R Squared (R2) on val data = 0.769067


In [None]:
# Re-display of train_feat (same list as shown earlier).
train_feat

['LotArea',
 'LotShape',
 'Utilities',
 'LandSlope',
 'OverallQual',
 'OverallCond',
 'YearBuilt',
 'YearRemodAdd',
 'MasVnrArea',
 'ExterQual',
 'ExterCond',
 'BsmtQual',
 'BsmtCond',
 'BsmtExposure',
 'BsmtFinType1',
 'BsmtFinSF1',
 'BsmtFinType2',
 'BsmtFinSF2',
 'BsmtUnfSF',
 'TotalBsmtSF',
 'HeatingQC',
 'CentralAir',
 'Electrical',
 '1stFlrSF',
 '2ndFlrSF',
 'LowQualFinSF',
 'GrLivArea',
 'BsmtFullBath',
 'BsmtHalfBath',
 'FullBath',
 'HalfBath',
 'BedroomAbvGr',
 'KitchenAbvGr',
 'KitchenQual',
 'TotRmsAbvGrd',
 'Functional',
 'Fireplaces',
 'GarageYrBlt',
 'GarageFinish',
 'GarageCars',
 'GarageArea',
 'GarageQual',
 'GarageCond',
 'PavedDrive',
 'WoodDeckSF',
 'OpenPorchSF',
 'EnclosedPorch',
 '3SsnPorch',
 'ScreenPorch',
 'PoolArea',
 'MiscVal',
 'MoSold',
 'YrSold',
 'SalePrice',
 'Neighborhood_avg',
 'Exterior1st_avg',
 'Exterior2nd_avg',
 'MSSubClass_index',
 'MSZoning_index',
 'Street_index',
 'LandContour_index',
 'LotConfig_index',
 'Condition1_index',
 'Condition2_inde

In [None]:
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted trees on the full "features" vector, 20 boosting iterations.
gbt = GBTRegressor(featuresCol='features', labelCol="SalePrice", maxIter=20)
gbt_model = gbt.fit(train)




In [None]:
# Validation R2 of the GBT model (the evaluator reads the default "prediction" column).
evaluator = RegressionEvaluator(labelCol='SalePrice', metricName="r2")
transform_df = gbt_model.transform(val)
print("R2: ", evaluator.evaluate(transform_df))

R2:  0.6611250799446691


In [None]:
from itertools import chain

# (vector index, feature name) pairs from the VectorAssembler metadata, ordered by index,
# so importance vectors can be mapped back to column names.
attrs = sorted(
    (attr["idx"], attr["name"])
    for attr in chain.from_iterable(
        train.schema["features"].metadata["ml_attr"]["attrs"].values()
    )
)

In [None]:
# Non-zero GBT feature importances keyed by feature name, sorted ascending.
imp = {
    name: float(gbt_model.featureImportances[idx])
    for idx, name in attrs
    if gbt_model.featureImportances[idx]
}
sortedimp = dict(sorted(imp.items(), key=lambda pair: pair[1]))
sortedimp

{'FullBath': 0.0001382729135104368,
 'BsmtFinType2': 0.00029944291763050925,
 'GarageQual': 0.00029948385168035045,
 'KitchenAbvGr': 0.00042159979266007466,
 'BsmtFinSF2': 0.00046212120480987703,
 'LowQualFinSF': 0.0005030758283358119,
 'LandSlope': 0.0005086627824389398,
 'Foundation_index': 0.0005415424956469852,
 'Heating_index': 0.0006461191426258781,
 'PoolArea': 0.0008142556118810565,
 'BldgType_index': 0.0010150271134096072,
 'Condition2_index': 0.0011466177421893132,
 'HeatingQC': 0.0014373091423709107,
 'MSZoning_index': 0.001503849313372143,
 'GarageFinish': 0.001659045093000042,
 'Fireplaces': 0.0017372043713623473,
 'HalfBath': 0.001824535957721411,
 'YrSold': 0.0019691133319437973,
 'Exterior2nd_avg': 0.0022078786272905987,
 '3SsnPorch': 0.002391956497649659,
 'Electrical': 0.002573686459303904,
 'ExterCond': 0.0029409224223236117,
 'RoofMatl_index': 0.003210069493787147,
 'EnclosedPorch': 0.003280590800646734,
 'BedroomAbvGr': 0.003429270761034196,
 'GarageType_index': 0.

### Random forest

In [None]:
# A more complex model with RF.
from pyspark.ml.regression import RandomForestRegressor

# An explicit seed fixes the bootstrap samples and per-node feature subsets. PySpark's
# default seed is derived from a Python string hash, which changes between sessions.
rf = RandomForestRegressor(
    featuresCol='features',
    labelCol='SalePrice',
    maxDepth=20,
    minInstancesPerNode=2,
    bootstrap=True,
    seed=42,
)
rf_model = rf.fit(train)

rf_predictions = rf_model.transform(val)
rf_predictions.select("prediction", "SalePrice", "features").show(5)

rf_evaluator = RegressionEvaluator(
    predictionCol="prediction", labelCol="SalePrice", metricName="r2"
)
print("R Squared (R2) on val data = %g" % rf_evaluator.evaluate(rf_predictions))

+------------------+---------+--------------------+
|        prediction|SalePrice|            features|
+------------------+---------+--------------------+
|144776.51666666666|   124000|[1300.0,0.0,3.0,0...|
|          90543.05|    80000|(73,[0,2,4,5,6,7,...|
| 86006.24999999999|    75500|(73,[0,2,4,5,6,7,...|
| 98100.41666666666|    86000|(73,[0,2,4,5,6,7,...|
|109720.70952380952|   112000|[1680.0,0.0,3.0,0...|
+------------------+---------+--------------------+
only showing top 5 rows

R Squared (R2) on val data = 0.821869


In [None]:
from itertools import chain

# Same (index, name) table as before. The RF was trained on the same `train` frame,
# so its feature-vector layout is identical.
attrs = sorted(
    (attr["idx"], attr["name"])
    for attr in chain.from_iterable(
        train.schema["features"].metadata["ml_attr"]["attrs"].values()
    )
)

In [None]:
# Non-zero random-forest feature importances keyed by feature name, sorted ascending.
# The zero-filter must read the RF importances: filtering on gbt_model would drop features
# the GBT ignored but the forest used (and vice versa). Fetch the vector once instead of
# twice per feature.
rf_importances = rf_model.featureImportances
imp = {
    name: float(rf_importances[idx])
    for idx, name in attrs
    if rf_importances[idx]
}
sortedimp = dict(sorted(imp.items(), key=lambda item: item[1]))
sortedimp

{'Condition2_index': 1.9469942710846256e-06,
 'LowQualFinSF': 6.38470079646328e-05,
 'RoofMatl_index': 0.00012719258036852015,
 'Electrical': 0.0001293458108786041,
 'Functional': 0.00018975235631349255,
 '3SsnPorch': 0.00022377453694333253,
 'BldgType_index': 0.0002559659112542749,
 'PoolArea': 0.0002648739075044934,
 'ScreenPorch': 0.00031921096969570714,
 'BsmtFinType2': 0.0003513536697686329,
 'KitchenAbvGr': 0.000367421297093308,
 'EnclosedPorch': 0.0004278039042383659,
 'Heating_index': 0.0004363016057028966,
 'Foundation_index': 0.0004897444998573427,
 'Condition1_index': 0.0005534244776489693,
 'ExterCond': 0.0005634447129286786,
 'HeatingQC': 0.0005854598937407083,
 'BsmtFinSF2': 0.0006101041546998557,
 'BsmtCond': 0.0006913883747856872,
 'GarageQual': 0.0007415915410636255,
 'HouseStyle_index': 0.0010251402838377614,
 'LandSlope': 0.0010328357135781366,
 'MasVnrType_index': 0.0012993539089542988,
 'YrSold': 0.0013575905041198718,
 'BsmtFullBath': 0.0016145948972585537,
 'MSZo