# Load Data

In [None]:
# Raw Kaggle CSV files uploaded to DBFS.
trainpath = "dbfs:/FileStore/house-prices-advanced-regression-techniques/train.csv"
testpath = "dbfs:/FileStore/house-prices-advanced-regression-techniques/test.csv"

# Both files are read with the same options: header row, inferred column types,
# and support for records spanning several lines.
csv_reader = spark.read.options(header=True, inferSchema=True, multiLine=True)
raw_traindf = csv_reader.csv(trainpath)
raw_testdf = csv_reader.csv(testpath)

display(raw_traindf)
display(raw_testdf)
raw_traindf.printSchema()

## Adjust Data Type

In [None]:
from pyspark.sql.functions import *

# Explicit target types for training columns whose inferred type is unsuitable.
# MSSubClass becomes a string so it is treated as categorical later (string columns
# are one-hot encoded). The integer casts turn non-numeric entries (presumably the
# literal "NA") into nulls that are imputed further down.
train_casts = {
    "MSSubClass": "string",
    "LotFrontage": "integer",
    "MasVnrArea": "integer",
    "GarageYrBlt": "integer",
    "SalePrice": "double",
}
for column_name, target_type in train_casts.items():
    raw_traindf = raw_traindf.withColumn(column_name, col(column_name).cast(target_type))

raw_traindf.printSchema()

In [None]:
from pyspark.sql.functions import *

# The test set needs more integer casts than the training set. Presumably these
# columns contain non-numeric entries only in the test file; the later null count
# shows several of them with nulls after casting.
test_integer_cols = [
    "LotFrontage",
    "MasVnrArea",
    "GarageYrBlt",
    "BsmtFullBath",
    "BsmtHalfBath",
    "GarageCars",
    "GarageArea",
    "TotalBsmtSF",
    "BsmtUnfSF",
    "BsmtFinSF2",
    "BsmtFinSF1",
]

# MSSubClass is categorical, matching the training-set cast.
raw_testdf = raw_testdf.withColumn("MSSubClass", col("MSSubClass").cast("string"))
for column_name in test_integer_cols:
    raw_testdf = raw_testdf.withColumn(column_name, col(column_name).cast("integer"))

raw_testdf.printSchema()

# Feature Engineering

## 1. Label Encoding

In [None]:
# poolqc
from pyspark.sql.types import IntegerType
dicts = {"NA":0, "Fa":1, "TA":2, "Gd":3, "Ex":4}
sc.broadcast(dicts)

def newVals(x):
    return dicts[x]

callnewColsUdf = udf(newVals, IntegerType())
raw_traindf = raw_traindf.withColumn('PoolQCOrdered', callnewColsUdf(col('PoolQC')))
raw_testdf = raw_testdf.withColumn('PoolQCOrdered', callnewColsUdf(col('PoolQC')))
display(raw_testdf)

# FireplaceQu	
dicts = {"NA":0, "Po":1, "Fa":2, "TA":3, "Gd":4, "Ex":5}
sc.broadcast(dicts)
callnewColsUdf = udf(newVals, IntegerType())
raw_traindf = raw_traindf.withColumn('FireplaceQuOrdered', callnewColsUdf(col('FireplaceQu')))
raw_testdf = raw_testdf.withColumn('FireplaceQuOrdered', callnewColsUdf(col('FireplaceQu')))

In [None]:
# GarageQual / GarageCond: both use the Po..Ex quality scale; "NA" ranks 0.
quality_scale = {"NA": 0, "Po": 1, "Fa": 2, "TA": 3, "Gd": 4, "Ex": 5}
# The mapping is bound as a default argument, so the UDF does not depend on the
# mutable global `dicts`. An unmapped value (including null) raises KeyError inside the UDF.
quality_udf = udf(lambda value, mapping=quality_scale: mapping[value], IntegerType())
for source_col in ("GarageQual", "GarageCond"):
    raw_traindf = raw_traindf.withColumn(source_col + "Ordered", quality_udf(col(source_col)))
    raw_testdf = raw_testdf.withColumn(source_col + "Ordered", quality_udf(col(source_col)))

In [None]:
# BsmtExposure: No < Mn < Av < Gd, with "NA" ranked 0.
exposure_scale = {"NA": 0, "No": 1, "Mn": 2, "Av": 3, "Gd": 4}
# Each UDF carries its own mapping (bound as a default argument) rather than
# reading the mutable global `dicts`. Unmapped values raise KeyError inside the UDF.
exposure_udf = udf(lambda value, mapping=exposure_scale: mapping[value], IntegerType())
raw_traindf = raw_traindf.withColumn('BsmtExposureOrdered', exposure_udf(col('BsmtExposure')))
raw_testdf = raw_testdf.withColumn('BsmtExposureOrdered', exposure_udf(col('BsmtExposure')))

# BsmtCond / BsmtQual: shared Po..Ex quality scale.
quality_scale = {"NA": 0, "Po": 1, "Fa": 2, "TA": 3, "Gd": 4, "Ex": 5}
quality_udf = udf(lambda value, mapping=quality_scale: mapping[value], IntegerType())
for source_col in ("BsmtCond", "BsmtQual"):
    raw_traindf = raw_traindf.withColumn(source_col + "Ordered", quality_udf(col(source_col)))
    raw_testdf = raw_testdf.withColumn(source_col + "Ordered", quality_udf(col(source_col)))

In [None]:
# KitchenQual, LandContour and Functional each get their own ordinal scale; "NA" ranks 0.
# NOTE(review): unlike the other quality scales, the KitchenQual scale has no "Po"
# level, so a "Po" value would raise KeyError. Confirm none occur.
ordinal_specs = [
    ("KitchenQual", {"NA": 0, "Fa": 1, "TA": 2, "Gd": 3, "Ex": 4}),
    ("LandContour", {"NA": 0, "Bnk": 1, "Lvl": 2, "Low": 3, "HLS": 4}),
    ("Functional", {"NA": 0, "Sev": 1, "Maj2": 2, "Maj1": 3, "Mod": 4, "Min2": 5, "Min1": 6, "Typ": 7}),
]
for source_col, scale in ordinal_specs:
    # `scale=scale` binds this iteration's mapping into the UDF explicitly, instead
    # of going through the mutable global `dicts`.
    encode = udf(lambda value, scale=scale: scale[value], IntegerType())
    raw_traindf = raw_traindf.withColumn(source_col + "Ordered", encode(col(source_col)))
    raw_testdf = raw_testdf.withColumn(source_col + "Ordered", encode(col(source_col)))

## 2. Add extra columns

### MSZoning_avg & std

In [None]:
# Target-encode MSZoning with the per-zone mean and sample standard deviation of
# the training SalePrice. The global statistics are kept to fill test rows whose
# zone has no training match.
# NOTE(review): the per-zone statistics include each training row's own SalePrice
# (in-sample target encoding), which can leak the label into the features.
# Consider out-of-fold encoding.
overall_stats = raw_traindf.select(
    mean('SalePrice').alias('global_avg'),
    stddev('SalePrice').alias('global_stddev'),  # stddev == sample std (stddev_samp)
).first()
global_avg = overall_stats['global_avg']
global_stddev = overall_stats['global_stddev']

# One aggregation and one join per DataFrame instead of two of each.
zone_mapping = raw_traindf.groupby('MSZoning').agg(
    mean('SalePrice').alias('MSZoning_avg'),
    stddev('SalePrice').alias('MSZoning_stddev'),
)
raw_traindf = raw_traindf.join(zone_mapping, on='MSZoning', how='left')
raw_testdf = raw_testdf.join(zone_mapping, on='MSZoning', how='left')

In [None]:
# Test zones with no training match get the global training statistics.
# GarageFinish gaps get the same "NA" placeholder the raw CSV uses.
raw_testdf = raw_testdf.fillna({
    'MSZoning_avg': global_avg,
    'MSZoning_stddev': global_stddev,
    'GarageFinish': 'NA',
})
raw_testdf.printSchema()

# Remaining null/NaN count per test column.
null_count_exprs = [count(when(isnan(name) | col(name).isNull(), name)).alias(name) for name in raw_testdf.columns]
display(raw_testdf.select(null_count_exprs))

In [None]:
# Sanity check: every test row of a zone should carry that zone's training statistic.
for stat_col in ('MSZoning_avg', 'MSZoning_stddev'):
    raw_testdf.groupby('MSZoning').avg(stat_col).show()

+--------+------------------+
|MSZoning| avg(MSZoning_avg)|
+--------+------------------+
| C (all)|           74528.0|
|      NA|180921.19589041095|
|      RH|        131558.375|
|      FV|214014.06153846157|
|      RL|191004.99478713618|
|      RM|126316.83027522986|
+--------+------------------+

+--------+--------------------+
|MSZoning|avg(MSZoning_stddev)|
+--------+--------------------+
| C (all)|   33791.09203128343|
|      NA|   79442.50288288663|
|      RH|   35714.11843491777|
|      FV|   52369.66206744745|
|      RL|   80766.34131930319|
|      RM|   48521.68983254903|
+--------+--------------------+



### Neighbour_avg

In [None]:
# Mean training SalePrice per neighbourhood, cheapest first.
neighbourhood_means = raw_traindf.groupBy('Neighborhood').avg('SalePrice')
neighbourhood_means.sort('avg(SalePrice)').show()

+------------+------------------+
|Neighborhood|    avg(SalePrice)|
+------------+------------------+
|     MeadowV|  98576.4705882353|
|      IDOTRR|100123.78378378379|
|      BrDale|         104493.75|
|     BrkSide|124834.05172413793|
|     Edwards|          128219.7|
|     OldTown|128225.30088495575|
|      Sawyer|136793.13513513515|
|     Blueste|          137500.0|
|       SWISU|         142591.36|
|     NPkVill|142694.44444444444|
|       NAmes|         145847.08|
|     Mitchel| 156270.1224489796|
|     SawyerW| 186555.7966101695|
|      NWAmes| 189050.0684931507|
|     Gilbert|192854.50632911394|
|     Blmngtn|194870.88235294117|
|     CollgCr|197965.77333333335|
|     Crawfor|210624.72549019608|
|     ClearCr|212565.42857142858|
|     Somerst|225379.83720930232|
+------------+------------------+
only showing top 20 rows



In [None]:
# Cross-check the Spark means against pandas medians per neighbourhood.
df = raw_traindf.toPandas()
neighbourhood_prices = df.groupby('Neighborhood')['SalePrice']
neighbourhood_prices.median().sort_values()

Out[12]: Neighborhood
MeadowV     88000.0
IDOTRR     103000.0
BrDale     106000.0
OldTown    119000.0
Edwards    121750.0
BrkSide    124300.0
Sawyer     135000.0
Blueste    137500.0
SWISU      139500.0
NAmes      140000.0
NPkVill    146000.0
Mitchel    153500.0
SawyerW    179900.0
Gilbert    181000.0
NWAmes     182900.0
Blmngtn    191000.0
CollgCr    197200.0
ClearCr    200250.0
Crawfor    200624.0
Veenker    218000.0
Somerst    225500.0
Timber     228475.0
StoneBr    278000.0
NoRidge    301500.0
NridgHt    315000.0
Name: SalePrice, dtype: float64

In [None]:
# Target-encode Neighborhood with the mean training SalePrice of each neighbourhood.
# Unlike MSZoning/OverallQual there is no global fallback fill; the later null
# count shows no nulls in Neighbour_avg for the test set.
neighbour_mapping = (
    raw_traindf.groupBy('Neighborhood')
    .agg({'SalePrice': 'avg'})
    .withColumnRenamed('avg(SalePrice)', 'Neighbour_avg')
)
for target in ('train', 'test'):
    if target == 'train':
        raw_traindf = raw_traindf.join(neighbour_mapping, on='Neighborhood', how='left')
    else:
        raw_testdf = raw_testdf.join(neighbour_mapping, on='Neighborhood', how='left')

### OverallQual_avg & std

In [None]:
# Target-encode OverallQual with the per-level mean and sample standard deviation
# of the training SalePrice. The global statistics are kept to fill test rows whose
# level has no training match.
# NOTE(review): in-sample target encoding; each training row's own SalePrice
# contributes to its feature value, which can leak the label.
overall_stats = raw_traindf.select(
    mean('SalePrice').alias('global_avg'),
    stddev('SalePrice').alias('global_stddev'),  # stddev == sample std (stddev_samp)
).first()
global_avg = overall_stats['global_avg']
global_stddev = overall_stats['global_stddev']

# One aggregation and one join per DataFrame instead of two of each.
quality_mapping = raw_traindf.groupby('OverallQual').agg(
    mean('SalePrice').alias('OverallQual_avg'),
    stddev('SalePrice').alias('OverallQual_stddev'),
)
raw_traindf = raw_traindf.join(quality_mapping, on='OverallQual', how='left')
raw_testdf = raw_testdf.join(quality_mapping, on='OverallQual', how='left')

In [None]:
# Test quality levels with no training match get the global training statistics.
raw_testdf = raw_testdf.fillna({'OverallQual_avg': global_avg, 'OverallQual_stddev': global_stddev})
raw_testdf.printSchema()

# Remaining null/NaN count per test column.
null_count_exprs = [count(when(isnan(name) | col(name).isNull(), name)).alias(name) for name in raw_testdf.columns]
display(raw_testdf.select(null_count_exprs))

## 3. Fill in Missing Values

### categorical variables

In [None]:
# train data
raw_traindf = raw_traindf.filter(raw_traindf.Electrical != 'NA')

MasVnrType_mode = raw_traindf.groupby("MasVnrType").count().orderBy("count", ascending=False).first()[0]
raw_traindf = raw_traindf.replace('NA', MasVnrType_mode, subset = ['MasVnrType'])

In [None]:
# test data
MasVnrType_mode = raw_traindf.groupby("MasVnrType").count().orderBy("count", ascending=False).first()[0]
MSZoning_mode = raw_traindf.groupby("MSZoning").count().orderBy("count", ascending=False).first()[0]
Exterior1st_mode = raw_traindf.groupby("Exterior1st").count().orderBy("count", ascending=False).first()[0]
Exterior2nd_mode = raw_traindf.groupby("Exterior2nd").count().orderBy("count", ascending=False).first()[0]
#KitchenQual_mode = raw_traindf.groupby("KitchenQual").count().orderBy("count", ascending=False).first()[0]
SaleType_mode = raw_traindf.groupby("SaleType").count().orderBy("count", ascending=False).first()[0]
Utilities_mode = raw_traindf.groupby("Utilities").count().orderBy("count", ascending=False).first()[0]
#Functional_mode = raw_traindf.groupby("Functional").count().orderBy("count", ascending=False).first()[0]
#Functional，kitchenQual之前已被label encoding，所以这里不用管了

print(MasVnrType_mode, MSZoning_mode, Exterior1st_mode, Exterior2nd_mode, SaleType_mode,Utilities_mode)

raw_testdf = raw_testdf.replace('NA', MasVnrType_mode, subset = ['MasVnrType'])\
.replace('NA', MSZoning_mode, subset = ['MSZoning'])\
.replace('NA', Exterior1st_mode, subset = ['Exterior1st'])\
.replace('NA', Exterior2nd_mode, subset = ['Exterior2nd'])\
.replace('NA', SaleType_mode, subset = ['SaleType'])\
.replace('NA', Utilities_mode, subset = ['Utilities'])

None RL VinylSd VinylSd WD AllPub


### numerical variables

In [None]:
# Every integer-typed training column except the Id key; these are cast to double next.
integercols = [name for name, dtype in raw_traindf.dtypes if dtype == 'int']
integercols.remove('Id')  # Id stays an integer key (raises ValueError if Id is not int)

integercols

In [None]:
def int_to_double(df, integercols):
    """Return a copy of ``df`` with every column in ``integercols`` cast to double.

    All casts happen in a single ``select`` instead of one ``withColumn`` per
    column. Each ``withColumn`` call adds a projection to the logical plan, and
    the PySpark docs warn that doing this in a loop can build very large plans.
    Column names and order are preserved; unlisted columns pass through unchanged.

    Args:
        df: Spark DataFrame to convert.
        integercols: names of the columns to cast. Each must exist in ``df``;
            ``df[name]`` is used to resolve them, as before.

    Returns:
        A new DataFrame. ``df`` itself is not modified.
    """
    casts = {name: df[name].cast("double").alias(name) for name in integercols}
    return df.select([casts[name] if name in casts else df[name] for name in df.columns])


raw_traindf = int_to_double(raw_traindf, integercols)
raw_testdf = int_to_double(raw_testdf, integercols)

raw_traindf.printSchema()
raw_testdf.printSchema()

#### train data

In [None]:
# Missing MasVnrType was replaced with the training mode ("None" in the printed modes),
# so a missing veneer area is treated as zero.
raw_traindf = raw_traindf.fillna(0, subset=['MasVnrArea'])

In [None]:
from pyspark.sql.functions import when

# Training columns to median-impute. MasVnrArea was already zero-filled above.
imputeCols = ["LotFrontage", "GarageYrBlt"]

# 1.0 / 0.0 missing-value indicator for each imputed column.
# NOTE(review): these *_na columns are dropped again before modelling, so they are currently unused.
for column_name in imputeCols:
    is_missing = when(col(column_name).isNull(), 1.0).otherwise(0.0)
    raw_traindf = raw_traindf.withColumn(column_name + "_na", is_missing)

In [None]:
from pyspark.ml.feature import Imputer

# Overwrite nulls in imputeCols with the training median (other strategies: mean, mode).
imputer = Imputer(inputCols=imputeCols, outputCols=imputeCols, strategy="median")
train_medians = imputer.fit(raw_traindf)
raw_traindf = train_medians.transform(raw_traindf)

In [None]:
# Remove the helper indicator columns; drop() ignores names that do not exist (e.g. MasVnrArea_na).
indicator_cols = ["LotFrontage_na", "GarageYrBlt_na", "MasVnrArea_na"]
raw_traindf = raw_traindf.drop(*indicator_cols)

In [None]:
# Summary statistics for every training column after imputation.
train_summary = raw_traindf.describe()
display(train_summary)

summary,OverallQual,Neighborhood,MSZoning,Id,MSSubClass,LotFrontage,LotArea,Street,Alley,LotShape,LandContour,Utilities,LotConfig,LandSlope,Condition1,Condition2,BldgType,HouseStyle,OverallCond,YearBuilt,YearRemodAdd,RoofStyle,RoofMatl,Exterior1st,Exterior2nd,MasVnrType,MasVnrArea,ExterQual,ExterCond,Foundation,BsmtQual,BsmtCond,BsmtExposure,BsmtFinType1,BsmtFinSF1,BsmtFinType2,BsmtFinSF2,BsmtUnfSF,TotalBsmtSF,Heating,HeatingQC,CentralAir,Electrical,1stFlrSF,2ndFlrSF,LowQualFinSF,GrLivArea,BsmtFullBath,BsmtHalfBath,FullBath,HalfBath,BedroomAbvGr,KitchenAbvGr,KitchenQual,TotRmsAbvGrd,Functional,Fireplaces,FireplaceQu,GarageType,GarageYrBlt,GarageFinish,GarageCars,GarageArea,GarageQual,GarageCond,PavedDrive,WoodDeckSF,OpenPorchSF,EnclosedPorch,3SsnPorch,ScreenPorch,PoolArea,PoolQC,Fence,MiscFeature,MiscVal,MoSold,YrSold,SaleType,SaleCondition,SalePrice,PoolQCOrdered,FireplaceQuOrdered,GarageQualOrdered,GarageCondOrdered,BsmtExposureOrdered,BsmtCondOrdered,BsmtQualOrdered,KitchenQualOrdered,LandContourOrdered,FunctionalOrdered,MSZoning_avg,MSZoning_stddev,Neighbour_avg,OverallQual_avg,OverallQual_stddev
count,1459.0,1459,1459,1459.0,1459.0,1459.0,1459.0,1459,1459,1459,1459,1459,1459,1459,1459,1459,1459,1459,1459.0,1459.0,1459.0,1459,1459,1459,1459,1459,1459.0,1459,1459,1459,1459,1459,1459,1459,1459.0,1459,1459.0,1459.0,1459.0,1459,1459,1459,1459,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459,1459.0,1459,1459.0,1459,1459,1459.0,1459,1459.0,1459.0,1459,1459,1459,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459,1459,1459,1459.0,1459.0,1459.0,1459,1459,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0,1459.0
mean,6.100068540095956,,,730.0548320767649,56.88142563399589,69.8615490061686,10517.363947909527,,,,,,,,,,,,5.575736806031529,1971.244002741604,1984.8505825908155,,,,,,103.1877998629198,,,,,,,,443.943797121316,,46.58122001370802,567.3660041124058,1057.8910212474298,,,,,1162.9067854694997,346.79163810829334,5.848526387936943,1515.54694996573,0.4256339958875942,0.0575736806031528,1.5647703906785468,0.3824537354352296,2.866346812885538,1.0466072652501714,,6.517477724468814,,0.6134338588074023,,,1978.5695681973957,,1.7669636737491432,473.0301576422207,,,,94.24057573680604,46.69225496915696,21.969156956819734,3.4119259766963674,15.07128169979438,2.7607950651130917,,,,43.518848526387934,6.322823851953393,2007.815627141878,,,180930.3947909527,0.0130226182316655,1.82659355723098,2.810143934201508,2.808773132282385,1.630568882796436,2.934886908841672,3.4886908841672377,2.5113091158327623,2.050034270047978,6.84167237834133,180914.28444496496,73867.29660750608,180879.16281880453,180953.68242110463,40751.7882033072
stddev,1.383170944196091,,,421.4111814162707,42.31074610988266,22.03507667749981,9984.666267348734,,,,,,,,,,,,1.113078838773851,30.199555032468936,20.64434323030652,,,,,,180.7731576553752,,,,,,,,456.1064174668382,,161.36997748451475,441.9923921610649,438.5009753688544,,,,,386.5720894363108,436.6106368240681,48.63951215718252,525.6509267562226,0.5189689105781217,0.2388297526996548,0.5509868924130839,0.5027979435469846,0.8160502538523828,0.2204103697949821,,1.6259015264401502,,0.6446874256530689,,,23.99370757743297,,0.747546351781757,213.8696041860412,,,,125.38167944484168,66.26747160709357,61.13739957611376,29.32724675545161,55.7751375764842,40.19101781769199,,,,496.2918257341941,2.704331480356036,1.3285417248448086,,,79468.96402523815,0.2041284632999939,1.81086648540676,0.7231285243099302,0.7199145010520035,1.067629653986267,0.5523452034978963,0.8766766806875986,0.663864171030508,0.4500781050084788,0.6679136072067545,26061.79050486736,13460.575823423562,58676.78232388679,65721.86755867035,19001.98850930559
min,1.0,Blmngtn,C (all),1.0,120.0,21.0,1300.0,Grvl,Grvl,IR1,Bnk,AllPub,Corner,Gtl,Artery,Artery,1Fam,1.5Fin,1.0,1872.0,1950.0,Flat,ClyTile,AsbShng,AsbShng,BrkCmn,0.0,Ex,Ex,BrkTil,Ex,Fa,Av,ALQ,0.0,ALQ,0.0,0.0,0.0,Floor,Ex,N,FuseA,334.0,0.0,0.0,334.0,0.0,0.0,0.0,0.0,0.0,0.0,Ex,2.0,Maj1,0.0,Ex,2Types,1900.0,Fin,0.0,0.0,Ex,Ex,N,0.0,0.0,0.0,0.0,0.0,0.0,Ex,GdPrv,Gar2,0.0,1.0,2006.0,COD,Abnorml,34900.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,1.0,74528.0,33791.092031283435,98576.4705882353,50150.0,14254.200796022673
max,10.0,Veenker,RM,1460.0,90.0,313.0,215245.0,Pave,Pave,Reg,Lvl,NoSeWa,Inside,Sev,RRNn,RRNn,TwnhsE,SLvl,9.0,2010.0,2010.0,Shed,WdShngl,WdShing,Wd Shng,Stone,1600.0,TA,TA,Wood,TA,TA,No,Unf,5644.0,Unf,1474.0,2336.0,6110.0,Wall,TA,Y,SBrkr,4692.0,2065.0,572.0,5642.0,3.0,2.0,3.0,2.0,8.0,3.0,TA,14.0,Typ,3.0,TA,,2010.0,Unf,4.0,1418.0,TA,TA,Y,857.0,547.0,552.0,508.0,480.0,738.0,,,TenC,15500.0,12.0,2010.0,WD,Partial,755000.0,4.0,5.0,5.0,5.0,4.0,4.0,5.0,4.0,4.0,7.0,214014.06153846157,80766.3413193025,335295.31707317074,438588.3888888889,159785.49105786977


#### test data

In [None]:
# Null/NaN count per test column, to see which columns still need filling.
null_count_exprs = [count(when(isnan(name) | col(name).isNull(), name)).alias(name) for name in raw_testdf.columns]
display(raw_testdf.select(null_count_exprs))

OverallQual,Neighborhood,MSZoning,Id,MSSubClass,LotFrontage,LotArea,Street,Alley,LotShape,LandContour,Utilities,LotConfig,LandSlope,Condition1,Condition2,BldgType,HouseStyle,OverallCond,YearBuilt,YearRemodAdd,RoofStyle,RoofMatl,Exterior1st,Exterior2nd,MasVnrType,MasVnrArea,ExterQual,ExterCond,Foundation,BsmtQual,BsmtCond,BsmtExposure,BsmtFinType1,BsmtFinSF1,BsmtFinType2,BsmtFinSF2,BsmtUnfSF,TotalBsmtSF,Heating,HeatingQC,CentralAir,Electrical,1stFlrSF,2ndFlrSF,LowQualFinSF,GrLivArea,BsmtFullBath,BsmtHalfBath,FullBath,HalfBath,BedroomAbvGr,KitchenAbvGr,KitchenQual,TotRmsAbvGrd,Functional,Fireplaces,FireplaceQu,GarageType,GarageYrBlt,GarageFinish,GarageCars,GarageArea,GarageQual,GarageCond,PavedDrive,WoodDeckSF,OpenPorchSF,EnclosedPorch,3SsnPorch,ScreenPorch,PoolArea,PoolQC,Fence,MiscFeature,MiscVal,MoSold,YrSold,SaleType,SaleCondition,PoolQCOrdered,FireplaceQuOrdered,GarageQualOrdered,GarageCondOrdered,BsmtExposureOrdered,BsmtCondOrdered,BsmtQualOrdered,KitchenQualOrdered,LandContourOrdered,FunctionalOrdered,MSZoning_avg,MSZoning_stddev,Neighbour_avg,OverallQual_avg,OverallQual_stddev
0,0,0,0,0,227,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,15,0,0,0,0,0,0,0,1,0,1,1,1,0,0,0,0,0,0,0,0,2,2,0,0,0,0,0,0,0,0,0,0,78,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [None]:
# Missing basement measurements (and veneer area) in the test set are read as
# "none present", so they become 0.
zero_fill_cols = ["BsmtFinSF1", "BsmtUnfSF", 'TotalBsmtSF', 'BsmtHalfBath', 'BsmtFullBath', 'MasVnrArea', "BsmtFinSF2"]
raw_testdf = raw_testdf.fillna(0, subset=zero_fill_cols)

In [None]:
from pyspark.sql.functions import when

# Test columns that take the training median. Basement and veneer gaps were zero-filled above.
imputeCols = ["LotFrontage", "GarageYrBlt", "GarageArea", "GarageCars"]

# 1.0 / 0.0 missing-value indicator for each imputed column.
# NOTE(review): these *_na columns are dropped again before modelling, so they are currently unused.
for column_name in imputeCols:
    is_missing = when(col(column_name).isNull(), 1.0).otherwise(0.0)
    raw_testdf = raw_testdf.withColumn(column_name + "_na", is_missing)

In [None]:
from pyspark.ml.feature import Imputer

# Medians are learned from the *training* data and applied to the test data, so no
# test statistics enter the imputation (other strategies: mean, mode).
imputer = Imputer(inputCols=imputeCols, outputCols=imputeCols, strategy="median")
train_medians = imputer.fit(raw_traindf)
raw_testdf = train_medians.transform(raw_testdf)

In [None]:
# Remove every helper *_na column; drop() ignores names that do not exist.
na_indicator_cols = [
    "LotFrontage_na", "GarageYrBlt_na", "MasVnrArea_na", "BsmtFinSF2_na", "BsmtUnfSF_na",
    "BsmtFinSF1_na", "TotalBsmtSF_na", "BsmtFullBath_na", "BsmtHalfBath_na",
    "GarageArea_na", "GarageCars_na",
]
raw_testdf = raw_testdf.drop(*na_indicator_cols)

In [None]:
# Null/NaN count per test column after imputation; every count should now be 0.
null_count_exprs = [count(when(isnan(name) | col(name).isNull(), name)).alias(name) for name in raw_testdf.columns]
display(raw_testdf.select(null_count_exprs))

OverallQual,Neighborhood,MSZoning,Id,MSSubClass,LotFrontage,LotArea,Street,Alley,LotShape,LandContour,Utilities,LotConfig,LandSlope,Condition1,Condition2,BldgType,HouseStyle,OverallCond,YearBuilt,YearRemodAdd,RoofStyle,RoofMatl,Exterior1st,Exterior2nd,MasVnrType,MasVnrArea,ExterQual,ExterCond,Foundation,BsmtQual,BsmtCond,BsmtExposure,BsmtFinType1,BsmtFinSF1,BsmtFinType2,BsmtFinSF2,BsmtUnfSF,TotalBsmtSF,Heating,HeatingQC,CentralAir,Electrical,1stFlrSF,2ndFlrSF,LowQualFinSF,GrLivArea,BsmtFullBath,BsmtHalfBath,FullBath,HalfBath,BedroomAbvGr,KitchenAbvGr,KitchenQual,TotRmsAbvGrd,Functional,Fireplaces,FireplaceQu,GarageType,GarageYrBlt,GarageFinish,GarageCars,GarageArea,GarageQual,GarageCond,PavedDrive,WoodDeckSF,OpenPorchSF,EnclosedPorch,3SsnPorch,ScreenPorch,PoolArea,PoolQC,Fence,MiscFeature,MiscVal,MoSold,YrSold,SaleType,SaleCondition,PoolQCOrdered,FireplaceQuOrdered,GarageQualOrdered,GarageCondOrdered,BsmtExposureOrdered,BsmtCondOrdered,BsmtQualOrdered,KitchenQualOrdered,LandContourOrdered,FunctionalOrdered,MSZoning_avg,MSZoning_stddev,Neighbour_avg,OverallQual_avg,OverallQual_stddev
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


# Model

In [None]:
# should be the same for all models
from pyspark.sql.functions import col, log

raw_traindf = raw_traindf.withColumn("LogSalePrice", log(col("SalePrice")))
display(raw_traindf)
display(raw_testdf)

In [None]:
#should be the same for all models
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
 
# Find out the categorical columns as input columns and specify the names of output columns
categorical_cols = [field for (field, data_type) in raw_traindf.dtypes if data_type == "string"]
index_output_cols = [x + "Index" for x in categorical_cols]
ohe_output_cols = [x + "OHE" for x in categorical_cols]

# Find out the numeric columns and add them into features if it is not the LogSalePrice and Id label
string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="keep") # when the value is unseen, keep it
ohe_encoder = OneHotEncoder(inputCols=index_output_cols, outputCols=ohe_output_cols)
numeric_cols = [field for (field, data_type) in raw_traindf.dtypes if ((data_type == "double") & (field not in ["SalePrice", "LogSalePrice", "Id"]))]
assembler_inputs = ohe_output_cols + numeric_cols
vector_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
print(assembler_inputs)

['NeighborhoodOHE', 'MSZoningOHE', 'MSSubClassOHE', 'StreetOHE', 'AlleyOHE', 'LotShapeOHE', 'LandContourOHE', 'UtilitiesOHE', 'LotConfigOHE', 'LandSlopeOHE', 'Condition1OHE', 'Condition2OHE', 'BldgTypeOHE', 'HouseStyleOHE', 'RoofStyleOHE', 'RoofMatlOHE', 'Exterior1stOHE', 'Exterior2ndOHE', 'MasVnrTypeOHE', 'ExterQualOHE', 'ExterCondOHE', 'FoundationOHE', 'BsmtQualOHE', 'BsmtCondOHE', 'BsmtExposureOHE', 'BsmtFinType1OHE', 'BsmtFinType2OHE', 'HeatingOHE', 'HeatingQCOHE', 'CentralAirOHE', 'ElectricalOHE', 'KitchenQualOHE', 'FunctionalOHE', 'FireplaceQuOHE', 'GarageTypeOHE', 'GarageFinishOHE', 'GarageQualOHE', 'GarageCondOHE', 'PavedDriveOHE', 'PoolQCOHE', 'FenceOHE', 'MiscFeatureOHE', 'SaleTypeOHE', 'SaleConditionOHE', 'OverallQual', 'LotFrontage', 'LotArea', 'OverallCond', 'YearBuilt', 'YearRemodAdd', 'MasVnrArea', 'BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', '1stFlrSF', '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath', 'FullBath', 'HalfBath', 'BedroomAb

## Linear Model

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression

# Regularised linear regression on log price, using the normal-equation solver.
lr = LinearRegression(solver="normal", labelCol="LogSalePrice")

# Selection metric: RMSE of the log-price prediction.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="LogSalePrice", predictionCol="prediction")

# 4 x 3 = 12 candidates; elasticNetParam = 1 is pure L1 (lasso).
search_space = [
    (lr.elasticNetParam, [0.1, 0.5, 1, 0.8]),
    (lr.regParam, [0.1, 0.12, 0.08]),
]
grid_builder = ParamGridBuilder()
for param, values in search_space:
    grid_builder.addGrid(param, values)
paramGrid = grid_builder.build()

cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
    parallelism=8,
)

# Encoding and assembly run before the CV stage, so the grid search sees "features".
pipeline = Pipeline(stages=[string_indexer, ohe_encoder, vector_assembler, cv])
pipelineModel = pipeline.fit(raw_traindf)

MLlib will automatically track trials in MLflow. After your tuning fit() call has completed, view the MLflow UI to see logged runs.


In [None]:
from pyspark.sql.functions import *
from pyspark.sql.functions import exp

# Score the test set with the tuned linear pipeline; exp() undoes the log-price label.
predictions = pipelineModel.transform(raw_testdf)
exp_predictions = predictions.withColumn("SalePrice", exp(col("prediction")))

# Submission layout: integer Id and SalePrice, sorted by Id.
df_results_lr = exp_predictions.select(col("Id").cast("Int").alias("Id"), "SalePrice")
lr_finalresult = df_results_lr.orderBy("Id")
display(lr_finalresult)

Id,SalePrice
1461,118985.61859373304
1462,150744.13157640398
1463,173699.8196055325
1464,193879.08132466656
1465,202746.20320544235
1466,171502.05695814756
1467,178939.9436069785
1468,166701.84345570998
1469,192613.49655057624
1470,122187.06390003792


After submitting to the Kaggle website, the tuned lasso model scored an average of 0.1376.

## Tree Model

### 1. XGBoost

In [None]:
from sparkdl.xgboost import XgboostRegressor

# Single-worker XGBoost on log price.
# NOTE(review): missing=0.0 makes XGBoost treat genuine zero values as missing too.
# This was presumably chosen because the assembled feature vectors can be sparse; confirm.
xgb_regressor = XgboostRegressor(labelCol="LogSalePrice", missing=0.0, num_workers=1)

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# 1 x 2 x 2 x 2 x 2 x 2 = 32 candidates, each scored with 5-fold CV.
search_space = [
    (xgb_regressor.max_depth, [3]),
    (xgb_regressor.n_estimators, [700, 800]),
    (xgb_regressor.reg_alpha, [0.8, 0.7]),
    (xgb_regressor.reg_lambda, [0.1, 1]),
    (xgb_regressor.gamma, [0.1, 0]),
    (xgb_regressor.learning_rate, [0.06, 0.07]),
]
grid_builder = ParamGridBuilder()
for param, values in search_space:
    grid_builder.addGrid(param, values)
paramGrid = grid_builder.build()

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol=xgb_regressor.getLabelCol(),
    predictionCol=xgb_regressor.getPredictionCol(),
)

cross_validator = CrossValidator(estimator=xgb_regressor, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, seed=6)

pipeline = Pipeline(stages=[string_indexer, ohe_encoder, vector_assembler, cross_validator])
pipeline_model = pipeline.fit(raw_traindf)

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.functions import exp

# Score the test set with the tuned XGBoost pipeline; exp() undoes the log-price label.
predictions = pipeline_model.transform(raw_testdf)
exp_predictions = predictions.withColumn("SalePrice", exp(col("prediction")))

# Submission layout: integer Id and SalePrice, sorted by Id.
df_results_xgboost = exp_predictions.select(col("Id").cast("Int").alias("Id"), "SalePrice")
xgboost_finalresult = df_results_xgboost.orderBy("Id")
display(xgboost_finalresult)

Id,SalePrice
1461,122968.28306065527
1462,161259.5310125469
1463,185296.06893467784
1464,198254.6484565094
1465,181119.03606293187
1466,169031.09687869932
1467,180827.70478765265
1468,169260.47965501068
1469,178821.07545897627
1470,122106.55515098263


### 2. GBT

In [None]:
from pyspark.ml.regression import GBTRegressor

# Spark gradient-boosted trees on log price; hyper-parameters are tuned in the next cell.
GBT_regressor = GBTRegressor().setLabelCol("LogSalePrice")

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# 2 x 2 x 2 = 8 candidates, each scored with 5-fold CV.
search_space = [
    (GBT_regressor.maxDepth, [3, 5]),
    (GBT_regressor.maxBins, [36, 38]),
    (GBT_regressor.maxIter, [20, 25]),
]
grid_builder = ParamGridBuilder()
for param, values in search_space:
    grid_builder.addGrid(param, values)
paramGrid = grid_builder.build()

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol=GBT_regressor.getLabelCol(),
    predictionCol=GBT_regressor.getPredictionCol(),
)

cross_validator = CrossValidator(estimator=GBT_regressor, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, seed=6)

pipeline = Pipeline(stages=[string_indexer, ohe_encoder, vector_assembler, cross_validator])
pipeline_model = pipeline.fit(raw_traindf)

In [None]:
from pyspark.sql.functions import exp
from pyspark.sql.functions import *

# Score the test set with the tuned GBT pipeline; exp() undoes the log-price label.
predictions = pipeline_model.transform(raw_testdf)
exp_predictions = predictions.withColumn("SalePrice", exp(col("prediction")))

# Submission layout: integer Id and SalePrice, sorted by Id.
df_results_gbt = exp_predictions.select(col("Id").cast("Int").alias("Id"), "SalePrice")
gbt_finalresult = df_results_gbt.orderBy("Id")
display(gbt_finalresult)

Id,SalePrice
1461,123359.88912172208
1462,161395.99963218725
1463,184864.5111800076
1464,198722.77127447736
1465,182538.17760762505
1466,171145.78809865832
1467,180398.81188725465
1468,168526.80909572152
1469,182334.0936263377
1470,121830.41583358828


### 3. RF

In [None]:
from pyspark.ml.regression import RandomForestRegressor

# Random forest on log price. The constructor's maxBins=40 is overridden by the
# grid value (44) for every candidate.
rf = RandomForestRegressor(labelCol="LogSalePrice", seed=42, maxBins=40)

# Selection metric: RMSE of the log-price prediction.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="LogSalePrice", predictionCol="prediction")

# 2 x 1 x 1 = 2 candidates. maxBins must be at least the number of values in each
# categorical feature (the author's note gives >= 26 for this data).
search_space = [
    (rf.maxDepth, [10, 15]),
    (rf.numTrees, [700]),
    (rf.maxBins, [44]),
]
grid_builder = ParamGridBuilder()
for param, values in search_space:
    grid_builder.addGrid(param, values)
paramGrid = grid_builder.build()

cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
    parallelism=8,
)

# NOTE(review): this rebinds `pipelineModel`, which the linear-model prediction cell
# also reads. Re-running that cell after this one would score the random forest.
pipeline = Pipeline(stages=[string_indexer, ohe_encoder, vector_assembler, cv])
pipelineModel = pipeline.fit(raw_traindf)

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.functions import exp

# Score the test set with the tuned random-forest pipeline; exp() undoes the log-price label.
predictions = pipelineModel.transform(raw_testdf)
exp_predictions = predictions.withColumn("SalePrice", exp(col("prediction")))

# Submission layout: integer Id and SalePrice, sorted by Id.
df_results_rf = exp_predictions.select(col("Id").cast("Int").alias("Id"), "SalePrice")
rf_finalresult = df_results_rf.orderBy("Id")
display(rf_finalresult)

## Blended Model

In [None]:
# Equal-weight blend of the four models' test predictions (on the price scale).
# Distinct *_pred names avoid rebinding `lr` / `rf`, which earlier cells use for the
# estimators, and avoid a variable named `xgboost`.
xgb_pred = xgboost_finalresult.withColumnRenamed('SalePrice', 'p_xgboost')
rf_pred = rf_finalresult.withColumnRenamed('SalePrice', 'p_rf')
lr_pred = lr_finalresult.withColumnRenamed('SalePrice', 'p_lr')
gbt_pred = gbt_finalresult.withColumnRenamed('SalePrice', 'p_gbt')

blended = (
    xgb_pred.join(rf_pred, on='Id', how='inner')
    .join(lr_pred, on='Id', how='inner')
    .join(gbt_pred, on='Id', how='inner')
)
blend_price = (col('p_xgboost') + col('p_rf') + col('p_lr') + col('p_gbt')) / 4

# Joins do not preserve order, so sort by Id again to match the submission layout.
result_df = blended.withColumn('SalePrice', blend_price).select('Id', 'SalePrice').orderBy('Id')
display(result_df)

# Comparison of model result

* After comparing the best cross-validated RMSE of each model, we found that XGBoost gives the smallest RMSE.
* We therefore chose the XGBoost predictions as our final result.

In [None]:
# XGBoost had the smallest cross-validated RMSE (see the comparison notes), so its
# test predictions are used as the final submission.
finalresult_df = xgboost_finalresult

# Final result (please download this CSV)

In [None]:
# After comparison, XGBoost performs best; show its (Id, SalePrice) predictions
# so they can be downloaded as the submission CSV.
display(finalresult_df)