# House Price Regression Analysis
### by Charlie LaBarge

In [51]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql.functions import col, avg

## Import training data

In [52]:
# Load the raw training data; the first CSV line holds the column names.
train_df = spark.read.option("header", True).csv("train.csv")

## Basic description of the dataset

In [53]:
train_df.printSchema()
# print(...) with a single argument prints the same text under Python 2 and 3;
# the bare print statement used previously is a SyntaxError under Python 3.
print("Training set length: " + str(train_df.count()))

root
 |-- Id: string (nullable = true)
 |-- MSSubClass: string (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotFrontage: string (nullable = true)
 |-- LotArea: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- Alley: string (nullable = true)
 |-- LotShape: string (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: string (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: string (nullable = true)
 |-- OverallCond: string (nullable = true)
 |-- YearBuilt: string (nullable = true)
 |-- YearRemodAdd: string (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |-- Exteri

## Feature shaping + pre-processing

### Helper functions for feature shaping

In [54]:
def convertColsToDbl(dataframe, col_names):
    """Cast each named column to double, replacing it with "<name>_asDbl".

    The cast columns are appended after the remaining columns, in col_names
    order, and the original columns are dropped. Strings that cannot be parsed
    as a double become null.

    All casts are added in a single select() instead of one withColumn() per
    column. Each withColumn() adds a projection to the plan, and PySpark
    documents that doing this in a loop can produce very large plans.

    :param dataframe: Spark DataFrame containing every column in col_names
    :param col_names: names of the columns to cast
    :returns: a new DataFrame with the cast columns
    """
    if not col_names:
        return dataframe

    casted = [dataframe[name].cast("double").alias(name + "_asDbl") for name in col_names]
    # "*" keeps the existing columns; the originals are then removed in one drop.
    return dataframe.select("*", *casted).drop(*col_names)

In [55]:
def getStringColumns(dataframe):
    """Return the names of the string-typed columns of dataframe, in column order.

    Column types come from ``dataframe.dtypes``, which pairs each column name
    with its simple type name ('string', 'double', ...). Comparing
    ``str(DataType)`` against 'StringType' is avoided because that rendering is
    not guaranteed to stay the same across PySpark versions.

    :param dataframe: Spark DataFrame to inspect
    :returns: list of column names whose type is string
    """
    return [name for name, dtype in dataframe.dtypes if dtype == 'string']

In [56]:
def stringIndexDf(dataframe, strcols):
    """Replace each column in strcols with a StringIndexer-encoded "<col>_index" column.

    A separate StringIndexer is fit for every column on the DataFrame as it
    stands at that point. The fitted model adds "<col>_index", and the original
    string column is then dropped. The index columns are the input that
    oneHotDf expects.

    :param dataframe: Spark DataFrame containing the columns in strcols
    :param strcols: names of the string columns to index
    :returns: a new DataFrame with "<col>_index" in place of each original column
    """
    indexed = dataframe
    for name in strcols:
        fitted = StringIndexer(inputCol=name, outputCol=name + "_index").fit(indexed)
        indexed = fitted.transform(indexed).drop(name)
    return indexed

In [57]:
def oneHotDf(dataframe, strcols):
    """One-hot encode the "<col>_index" column of every column name in strcols.

    Each "<col>_index" column, as produced by stringIndexDf, is replaced by a
    "<col>_onehot" vector column. dropLast=False keeps one slot per category
    instead of dropping the last one.

    :param dataframe: Spark DataFrame holding a "<col>_index" column per entry of strcols
    :param strcols: the original string column names
    :returns: a new DataFrame with "<col>_onehot" in place of each "<col>_index"
    """
    for column in strcols:
        encoder = OneHotEncoder(inputCol=column + "_index", outputCol=column + "_onehot",
                                dropLast=False)
        # In Spark 2.x OneHotEncoder is a Transformer (no fit). In Spark 3.x it
        # is an Estimator and must be fit before it can transform.
        if hasattr(encoder, "fit"):
            encoder = encoder.fit(dataframe)
        dataframe = encoder.transform(dataframe).drop(column + "_index")

    return dataframe

### Dealing with null values

In [58]:
def fillNullVals(dataframe):
    """Fill nulls in each column according to the column's type.

    - string columns: nulls become the literal "Unknown"
    - double columns: nulls become the column's mean over its non-null values
    - any other type: left untouched

    All means are computed in one aggregation (a single Spark job) instead of
    one job per column. A double column with no non-null values has no mean.
    Such a column is left as-is rather than passing None to fillna, which would
    raise.

    Column types are read from ``dataframe.dtypes`` (simple names such as
    'string' and 'double'). Comparing ``str(DataType)`` is avoided because its
    text is not guaranteed to be stable across PySpark versions.

    :param dataframe: Spark DataFrame to clean
    :returns: a new DataFrame with the nulls filled
    """
    string_cols = [name for name, dtype in dataframe.dtypes if dtype == 'string']
    double_cols = [name for name, dtype in dataframe.dtypes if dtype == 'double']

    if string_cols:
        dataframe = dataframe.fillna("Unknown", subset=string_cols)

    if double_cols:
        # avg() already ignores nulls, so there is no need to drop them first.
        means = dataframe.agg(*[avg(col(name)).alias(name) for name in double_cols]).first()
        fill_values = {name: means[name] for name in double_cols if means[name] is not None}
        if fill_values:
            dataframe = dataframe.fillna(fill_values)

    return dataframe

### Declare the numeric columns to cast to double, and cast them

In [59]:
# Columns that are read from the CSV as strings but hold numbers. convertColsToDbl
# casts each one to double and replaces it with "<name>_asDbl", so the names
# must match the CSV header.
# 'WoodDeckSF' was previously spelled 'WoodDeckSf'. Like the other *SF names, the
# column uses an upper-case "SF", and the misspelling only resolved when Spark
# matched column names case-insensitively.
# NOTE(review): BsmtFinSF1 is not listed although BsmtFinSF2 is. If train.csv has
# that column (the truncated schema above does not show it), it stays a string
# and is one-hot encoded. Confirm this is intended.
doubleCols = ['LotFrontage', 'LotArea', 'OverallQual', 'OverallCond', 'YearBuilt', 'YearRemodAdd',
              'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', '1stFlrSF', '2ndFlrSF', 'LowQualFinSF',
              'GrLivArea', 'GarageYrBlt', 'GarageArea', 'MiscVal', 'YrSold',
              'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', '3SsnPorch', 'ScreenPorch', 'PoolArea',
              'MasVnrArea']

In [60]:
# Cast every column listed in doubleCols to double (renamed "<name>_asDbl").
train_df = convertColsToDbl(dataframe=train_df, col_names=doubleCols)

### Deal with null values

In [61]:
# Strings -> "Unknown", doubles -> column mean.
train_df = fillNullVals(dataframe=train_df)

### Index and one-hot encode string columns

In [62]:
# save the ids to a different df in case we need them, then drop them
ids_df = train_df.select("Id")
train_df = train_df.drop("Id")

# save the price column as the label
train_df = train_df.withColumn("label", train_df["SalePrice"].cast("Double")).drop("SalePrice")

In [64]:
# extract list of string columns
string_cols = getStringColumns(train_df)

# index the string columns, then one hot encode the indexed string columns 
train_df = stringIndexDf(train_df, string_cols)
train_df = oneHotDf(train_df, string_cols)

In [65]:
# remove label column from columns to be assembled
columns_less_label = list(train_df.columns)
columns_less_label.remove('label')

feature_pipeline = Pipeline(stages= [VectorAssembler(inputCols=columns_less_label, outputCol="features")])

train_df_transformed = feature_pipeline.fit(train_df).transform(train_df)

## Model 1: Random Forest Regressor

In [66]:
# Random forest with deep trees; all other parameters keep their defaults.
RF_MAX_DEPTH = 30
rf_model = RandomForestRegressor(maxDepth=RF_MAX_DEPTH)

In [67]:
# sanity check before doing actual cross-validation of model
rf_fitted_on_all = rf_model.fit(train_df_transformed)
rf_fitted_on_all.transform(train_df_transformed).select("label", "prediction").show(5)

+--------+----------+
|   label|prediction|
+--------+----------+
|208500.0|  213172.5|
|181500.0|  176000.0|
|223500.0|  223100.0|
|140000.0| 143120.25|
|250000.0|  267990.0|
+--------+----------+
only showing top 5 rows



### 3-fold cross validation on RF model

In [68]:
# No hyper-parameters are searched. The empty grid yields a single param map, so
# this only estimates rf_model's out-of-sample error with 3 folds.
paramGrid = ParamGridBuilder().build()
rmse_evaluator = RegressionEvaluator()
crossValidator = CrossValidator(
    estimator=rf_model,
    estimatorParamMaps=paramGrid,
    evaluator=rmse_evaluator,
    numFolds=3
)

rf_validated = crossValidator.fit(train_df_transformed)

#### Cross-validated RMSE for the model (mean over the 3 folds)

In [69]:
# Metric averaged over the 3 folds, one entry per param map. The grid is empty,
# so there is a single value. It is the RMSE, RegressionEvaluator's default metric.
rf_validated.avgMetrics

[30774.58762688168]

# Part 2: Different Feature Shaping Strategies and Impact on Performance

In [146]:
from pyspark.ml.feature import Bucketizer, MinMaxScaler, PCA
from pyspark.sql.functions import min, max, udf

## Setup: reload the training data in its raw, unshaped form

In [152]:
# Reload the raw CSV (header row = column names) so part 2 starts from scratch.
train_df = spark.read.format("csv").option("header", True).load("train.csv")

In [153]:
# Cast the numeric columns to double, then fill the nulls.
train_df = fillNullVals(convertColsToDbl(train_df, doubleCols))

# Keep the ids aside in case they are needed later, then remove them from the
# columns that will be assembled into features.
ids_df = train_df.select("Id")
train_df = train_df.drop("Id")

# The sale price, cast to double, becomes the regression label.
sale_price = train_df["SalePrice"].cast("Double")
train_df = train_df.withColumn("label", sale_price).drop("SalePrice")

## Helper functions for feature transformations

In [154]:
def computeBinSplits(minVal, maxVal, numBins=10):
    """Return Bucketizer splits for numBins equal-width bins over [minVal, maxVal].

    The splits are [-inf, minVal, minVal + w, ..., minVal + (numBins - 1) * w, +inf]
    with w = (maxVal - minVal) / numBins. Values inside the observed range land in
    buckets 1..numBins, and the open-ended outer edges catch anything outside it.
    Repeated edges are collapsed because Bucketizer requires strictly increasing
    splits. This happens for a constant column, where w == 0.

    :raises ValueError: if numBins < 1
    """
    if numBins < 1:
        raise ValueError("numBins must be at least 1, got %r" % (numBins,))
    bucketSize = float(maxVal - minVal) / numBins
    edges = sorted(set(float(minVal + i * bucketSize) for i in range(numBins)))
    return [-float("inf")] + edges + [float("inf")]


def binFeatures(dataframe, columnsToBin, numBins=10):
    """Add an equal-width bucket column "<col>_bucketed" for every column in columnsToBin.

    The bin edges come from each column's observed min and max (see
    computeBinSplits). The original columns are kept.

    :param dataframe: Spark DataFrame with numeric columns to bin
    :param columnsToBin: names of the columns to bin
    :param numBins: number of equal-width bins across [min, max]
    :returns: a new DataFrame with the added bucket columns
    :raises ValueError: if a column contains only nulls (there is no range to bin)
    """
    # Local alias, so this works without relying on the notebook-wide import that
    # shadows the builtin min/max.
    from pyspark.sql import functions as F

    for column in columnsToBin:
        # One aggregation job for both bounds instead of two.
        bounds = dataframe.agg(F.min(F.col(column)).alias("lo"),
                               F.max(F.col(column)).alias("hi")).first()
        if bounds["lo"] is None:
            raise ValueError("cannot bin column %r: it contains only nulls" % (column,))

        bucketizer = Bucketizer(splits=computeBinSplits(bounds["lo"], bounds["hi"], numBins),
                                inputCol=column, outputCol=column + "_bucketed")
        dataframe = bucketizer.transform(dataframe)

    return dataframe

In [155]:
def normalizeFeatures(dataframe, featuresToNormalize):
    """Min-max scale each listed column and store the result under the same name.

    MinMaxScaler only accepts vector input. Each column is therefore wrapped in
    a one-element vector ("<col>_vec") and rescaled with MinMaxScaler (default
    output range [0, 1]) into "<col>_normed". That result is then renamed back
    to the original column name.

    Both intermediate columns are removed. Previously "<col>_normed" was left
    behind, so every normalized feature reached a later VectorAssembler twice.

    :param dataframe: Spark DataFrame holding the numeric columns
    :param featuresToNormalize: names of the columns to scale
    :returns: a new DataFrame where each listed column is a scaled one-element vector
    """
    for column in featuresToNormalize:
        vec_col = column + "_vec"
        normed_col = column + "_normed"

        dataframe = VectorAssembler(inputCols=[column], outputCol=vec_col).transform(dataframe)
        scaler_model = MinMaxScaler(inputCol=vec_col, outputCol=normed_col).fit(dataframe)
        dataframe = (scaler_model.transform(dataframe)
                     .drop(column)
                     .drop(vec_col)
                     .withColumnRenamed(normed_col, column))

    return dataframe

In [156]:
def addPCACols(dataframe, numDimensions=5):
    """Append the top numDimensions principal components to the "features" vector.

    A PCA model is fit on the "features" column and writes "pcaFeatures". The
    "features" column is then replaced by the concatenation
    [original features, pcaFeatures]. The "pcaFeatures" column is kept.

    Previously this called VectorAssembler.fit. VectorAssembler is a
    Transformer with no fit method, so that raised. The function also never
    returned the DataFrame, so callers received None.

    :param dataframe: Spark DataFrame with a vector column "features"
    :param numDimensions: number of principal components (PCA k)
    :returns: the transformed DataFrame
    """
    pca_model = PCA(k=numDimensions, inputCol="features", outputCol="pcaFeatures").fit(dataframe)
    dataframe = pca_model.transform(dataframe)

    assembler = VectorAssembler(inputCols=["features", "pcaFeatures"],
                                outputCol="features_final")
    dataframe = assembler.transform(dataframe)

    return dataframe.drop("features").withColumnRenamed("features_final", "features")

In [133]:
# train_df_saved = train_df

In [148]:
# train_df = train_df_saved

## Grouping of categoricals + Combinations of categories

In [157]:
# Step 1: grouping of categoricals
residentialCategories = ['RH', 'RL', 'RP', 'RM']
zoningGrouping = udf(lambda x: 'Residential' if x in residentialCategories else 'Non-Residential')

oneFloorCategories = ['20','30', '40']
floorGrouping = udf(lambda x: 'OneFloor' if x in oneFloorCategories else 'NonOneFloor')

train_df = train_df.withColumn("ResidentialOrNot", zoningGrouping(train_df["MSZoning"]))
train_df = train_df.withColumn("OneFloor", floorGrouping(train_df["MSZoning"]))

In [159]:
# Step 2: combining variables
train_df = train_df.withColumn("QualityCombined", train_df["OverallQual_asDbl"]*train_df["OverallCond_asDbl"])
train_df = train_df.withColumn("YearAndMonthCombined", train_df["YrSold_asDbl"] + (train_df["MoSold"].cast("Double")/12.0))

## Binning of numerical variables

In [160]:
# Names the numeric columns received from convertColsToDbl.
doubleCols_new = [name + "_asDbl" for name in doubleCols]

# Step 3: bin features
train_df = binFeatures(train_df, doubleCols_new, numBins=10)

## Normalization of numerical variables

In [161]:
# Step 4: normalize features (each numeric column is min-max scaled under its own name)
train_df = normalizeFeatures(dataframe=train_df, featuresToNormalize=doubleCols_new)

## PCA on final feature vector

### Get final feature vector in order

In [162]:
# extract list of string columns
string_cols = getStringColumns(train_df)

# index the string columns, then one hot encode the indexed string columns 
train_df = stringIndexDf(train_df, string_cols)
train_df = oneHotDf(train_df, string_cols)

# remove label column from columns to be assembled
columns_less_label = list(train_df.columns)
columns_less_label.remove('label')

feature_pipeline = Pipeline(stages= [VectorAssembler(inputCols=columns_less_label, outputCol="features")])

train_df_transformed = feature_pipeline.fit(train_df).transform(train_df)

### Do the PCA

In [None]:
# Append 5 principal components to the assembled feature vector.
train_df_transformed = addPCACols(dataframe=train_df_transformed, numDimensions=5)

## Rerun of the RF model, now with the new feature transformations :)

In [None]:
# Same 3-fold evaluation of the same rf_model as in part 1, so the only
# difference between the two runs is the feature shaping.
paramGrid = ParamGridBuilder().build()
rmse_evaluator = RegressionEvaluator()
crossValidator = CrossValidator(
    estimator=rf_model,
    estimatorParamMaps=paramGrid,
    evaluator=rmse_evaluator,
    numFolds=3
)

rf_validated = crossValidator.fit(train_df_transformed)

In [None]:
# Metric averaged over the 3 folds with the shaped features (one entry: the grid
# is empty). Compare it with the part-1 value above.
rf_validated.avgMetrics