### Feature Engineering with PySpark

#### This notebook summarizes a course on feature engineering, which focuses on building a model to predict how much a house sells for.

#### The dataset is a sample of homes sold in the St. Paul, MN area over the course of 2017.

#### Using this sample, we provide a quick proof of concept of whether it is worth investing in more data for the 5.5 million homes sold in the US in 2017.

##### Note: Download the data at https://assets.datacamp.com/production/repositories/1704/datasets/d26c25f46746882d0a0f474cc6709c629f69872c/2017_StPaul_MN_Real_Estate.csv

### Part 0: Add PySpark to path and create a SparkSession 

In [1]:
# Add PySpark to sys.path at runtime
import findspark
findspark.init()

# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
# Create or get a SparkSession
spark = SparkSession.builder.getOrCreate()

### Part 1: Exploratory data analysis (EDA)
#### (1) column names and dtypes, statistics (describe()), number of rows and columns (5000, 74)
#### (2) correlations between Xi and Y, distribution and distplot(), and lmplot() (linear model)

In [2]:
# (1.1) Read in CSV as Spark DataFrame (first row as column names)
df = spark.read.csv('../../data/2017_StPaul_MN_Real_Estate.csv', 
                    header=True, inferSchema=True)

# (1.2) Correct date columns dtype
from pyspark.sql.functions import to_date

# convert LISTDATE to date
df = df.withColumn('LISTDATE', to_date('LISTDATE', 'MM/dd/yyyy HH:mm'))

# convert OFFMKTDATE to date
df = df.withColumn('OFFMKTDATE', to_date('OFFMKTDATE', 'MM/dd/yyyy HH:mm'))


### <font color=brown> Part 2: Feature engineering </font>

### (2.0) Categorize columns

In [3]:
# Columns to drop
cols_to_drop = ['UNITNUMBER','Class','STREETNUMBERNUMERIC','LOTSIZEDIMENSIONS','MLSID','streetaddress','STREETNAME','PostalCode','StateOrProvince','PricePerTSFT','MapLetter','RoomFloor1','RoomFloor2','RoomFloor3','RoomFloor4','RoomFloor5','RoomFloor6','RoomFloor7','RoomFloor8']

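# Columns to extract features from
# (the set difference below matches names case-sensitively; 'GARAGEDESCRIPTION' does not match
# the DataFrame's 'GarageDescription', which is why that column still shows up in feature_cols)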
cols_to_extract = ['FENCE','ROOF','PoolDescription','GARAGEDESCRIPTION','APPLIANCES','EXTERIOR','DiningRoomDescription','BASEMENT','BATHDESC','ZONING','CoolingDescription','ROOMFAMILYCHAR','roomtype']

# Categorical columns for one hot encoding
categorical_cols = ['CITY','LISTTYPE','SCHOOLDISTRICTNUMBER','PotentialShortSale','STYLE','AssumableMortgage','ASSESSMENTPENDING']

# Temporary columns (not yet suitable as features)
temperary_cols = ['longitude','latitude','NO','PDOM','RoomArea1','RoomArea2','RoomArea3','RoomArea4','RoomArea5','RoomArea6','RoomArea7','RoomArea8','LISTDATE','OFFMKTDATE','DAYSONMARKET','BACKONMARKETDATE']

# Label column (variable to be predicted)
label_col = ['SALESCLOSEPRICE']

# Feature columns (more to be added)
feature_cols = list(set(df.columns)-set(cols_to_drop)-set(cols_to_extract)-set(categorical_cols)-set(temperary_cols)-set(label_col))
print(feature_cols)  # 27

['PERCENT_OLDER_AGE_HOMES', 'ACRES', 'TAXWITHASSESSMENTS', 'BATHSTHREEQUARTER', 'PERCENT_SIMILAR_AGE_HOMES', 'GarageDescription', 'PERCENT_BIGGER_SIZE_HOMES', 'BATHQUARTER', 'YEARBUILT', 'LISTPRICE', 'LIVINGAREA', 'FOUNDATIONSIZE', 'SQFTABOVEGROUND', 'SQFTBELOWGROUND', 'BATHSTOTAL', 'OriginalListPrice', 'TAXES', 'ASSESSEDVALUATION', 'ASSOCIATIONFEE', 'FIREPLACES', 'PERCENT_SIMILAR_SIZE_HOMES', 'BEDROOMS', 'BATHSHALF', 'PERCENT_NEWER_AGE_HOMES', 'PERCENT_SMALLER_SIZE_HOMES', 'BATHSFULL', 'TAXYEAR']
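
The column bookkeeping above relies on Python set differences, which compare names case-sensitively even though Spark itself resolves column names case-insensitively by default. A minimal plain-Python sketch of that pitfall, using a hypothetical three-column list rather than the real DataFrame:

```python
# Hypothetical miniature of the column bookkeeping (not the real column list)
all_columns = ['GarageDescription', 'FENCE', 'LISTPRICE']
to_extract = ['GARAGEDESCRIPTION', 'FENCE']

# Plain set difference compares names case-sensitively
naive = sorted(set(all_columns) - set(to_extract))

# Case-insensitive variant: compare upper-cased names, keep the original spelling
upper_extract = {name.upper() for name in to_extract}
careful = [name for name in all_columns if name.upper() not in upper_extract]

print(naive)    # 'GarageDescription' survives the naive difference
print(careful)  # only 'LISTPRICE' remains
```

Normalizing the case on both sides is one possible way to keep such lists in sync; spelling the names exactly as they appear in `df.columns` works just as well.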


### (2.1) Drop unneeded columns

In [4]:
# Drop the list of unneeded columns (e.g. CLASS is constant)
df = df.drop(*cols_to_drop)  # 76 -> 57

### (2.2) Use text filter to remove rows

In [5]:
# (2.2.1) Use text filter to remove 'Not Disclosed' POTENTIALSHORTSALE
df = df.where(~df['PotentialShortSale'].like('Not Disclosed'))
#print(df.count())  # 5000 -> 4989

# (2.2.2) Use text filters to remove rows with an assumable mortgage, which is an unusual occurrence in real estate
df.select(['AssumableMortgage']).distinct().show()

# List of possible values containing 'yes'
yes_values = ['Yes w/ Qualifying', 'Yes w/No Qualifying']

# Filter the text values out of df but keep null values
text_filter = ~df['AssumableMortgage'].isin(yes_values) | df['AssumableMortgage'].isNull()
df = df.where(text_filter)

#print(df.count())  # 4989 -> 4965

+-------------------+
|  AssumableMortgage|
+-------------------+
|  Yes w/ Qualifying|
| Information Coming|
|               null|
|Yes w/No Qualifying|
|      Not Assumable|
+-------------------+



### (2.3) Check skewness, then find and remove outliers

In [6]:
# (2.3.1) Check the skewness (asymmetry relative to a normal distribution) of LISTPRICE and SALESCLOSEPRICE
"""For Random Forest Regression, there is no need to transform variables to be normally distributed."""
from pyspark.sql.functions import skewness

# Compute and print skewness of LISTPRICE
print(df.agg({'LISTPRICE': 'skewness'}).collect())  # 2.79; positive --> right-skewed; a log transform pulls right-skewed data closer to normal

# Compute and print skewness of SALESCLOSEPRICE
print(df.agg({'SALESCLOSEPRICE': 'skewness'}).collect())  # 2.62

[Row(skewness(LISTPRICE)=2.788039060845876)]
[Row(skewness(SALESCLOSEPRICE)=2.6213496232050693)]
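
To make the sign convention concrete, here is a minimal plain-Python sketch of skewness as the third standardized moment. The price list is hypothetical, and the helper `population_skewness` is written for illustration only; it is not claimed to reproduce Spark's aggregate digit for digit.

```python
import math

def population_skewness(values):
    """Third standardized moment: m3 / m2**1.5."""
    n = len(values)
    mean = sum(values) / n
    m2 = sum((v - mean) ** 2 for v in values) / n
    m3 = sum((v - mean) ** 3 for v in values) / n
    return m3 / m2 ** 1.5

# Hypothetical prices with a long right tail (a few expensive homes)
prices = [100_000, 110_000, 120_000, 130_000, 150_000,
          180_000, 220_000, 300_000, 450_000, 800_000]
log_prices = [math.log(p) for p in prices]

raw_skew = population_skewness(prices)
log_skew = population_skewness(log_prices)

print(raw_skew)   # positive: the tail is on the right
print(log_skew)   # smaller: the log scale shortens the right tail
```

A positive value means the long tail is on the high-price side, which is why a log transform is the usual first thing to try for prices.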


In [7]:
# (2.3.2) Filter out any outlier homes whose log_ScaledListPrice and log_ScaledClosePrice are significantly more or less than the average
from pyspark.sql.functions import log

def remove_outlier(df, cols):
    for col in cols:
        # calculate values used for filtering
        std_val = df.agg({col: 'stddev'}).collect()[0][0]
        mean_val = df.agg({col: 'mean'}).collect()[0][0]
        # create three std upper and lower bounds for data
        hi_bound = mean_val + (3*std_val)
        low_bound = mean_val - (3*std_val)
        # use where() to filter the DataFrame between values
        df = df.where((df[col]<hi_bound) & (df[col]>low_bound))
    return df

# Define logarithmic scaled list and close prices
df = df.withColumn('log_ScaledListPrice',log(df['LISTPRICE']))
df = df.withColumn('log_ScaledClosePrice', log(df['SALESCLOSEPRICE']))

# Drop outliers in log_ScaledListPrice and log_ScaledClosePrice (both prices are right-skewed; the log scale brings them closer to a normal distribution)
df = remove_outlier(df, ['log_ScaledListPrice', 'log_ScaledClosePrice'])

# Print count of remaining records
#print(df.count())  # 4965 -> 4924

# Drop the log_ScaledListPrice and log_ScaledClosePrice, as we only use them to find outliers
df = df.drop(*['log_ScaledListPrice','log_ScaledClosePrice'])
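
The three-standard-deviation rule used in `remove_outlier` can be sketched without Spark. This is a minimal illustration on hypothetical log-prices; it uses the sample standard deviation, which is also what Spark's `stddev` aggregate computes.

```python
import statistics

def within_three_sigma(values):
    """Keep values strictly inside mean +/- 3 sample standard deviations."""
    mean_val = statistics.mean(values)
    std_val = statistics.stdev(values)  # sample standard deviation
    low, high = mean_val - 3 * std_val, mean_val + 3 * std_val
    return [v for v in values if low < v < high]

# Hypothetical log-prices: 20 typical values plus one extreme value
log_prices = [12.0 + 0.01 * i for i in range(20)] + [20.0]
kept = within_three_sigma(log_prices)

print(len(log_prices), '->', len(kept))
```

Note that the bounds are computed from data that still contains the outliers, so with very small samples a single extreme value can inflate the standard deviation enough to hide itself.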

### (2.4) Get more data from other DataFrames

In [8]:
# (2.4.1) Load location DataFrame that contains WALKSCORE and BIKESCORE etc.
walk_df = spark.read.csv('../../data/location.csv', header=True, inferSchema=True)

# Cast data types
walk_df = walk_df.withColumn('longitude', walk_df['longitude'].cast('double'))
walk_df = walk_df.withColumn('latitude', walk_df['latitude'].cast('double'))

# Round precision (alias the import so it does not shadow Python's built-in round)
from pyspark.sql.functions import round as spark_round
df = df.withColumn('longitude', spark_round(df['longitude'], 5))
df = df.withColumn('latitude', spark_round(df['latitude'], 5))

# Create join condition
condition = ['longitude','latitude']

# Join the dataframes together
df = df.join(walk_df, on=condition, how='left')

# Add new columns to feature columns
feature_cols += ['walkscore','bikescore']  # 29

In [9]:
# (2.4.2) Add each city's median home value from the year before the listing
from pyspark.sql.functions import year

# Load price dataframe
price_df = spark.read.csv('../../data/price.csv', header=True, inferSchema=True)

# Create year column
df = df.withColumn('LIST_YEAR', year('LISTDATE'))

# Adjust year to match
df = df.withColumn('REPORT_YEAR', (df['LIST_YEAR'] - 1))

# Create join condition
condition = [df['CITY'] == price_df['City'], df['REPORT_YEAR'] == price_df['year']]

# Join the dataframes together
df = df.join(price_df, on=condition, how='left').drop(price_df.City)
#df = df.drop('City')

# Inspect that new columns are available
#df[['MedianHomeValue']].show()

# Add new column to feature columns  # 30
feature_cols += ['MedianHomeValue']

In [10]:
# (2.4.3) Add mortgage rates: the week of the list date, plus lags of 1, 2, 3 and 4 weeks before it
"""PART I: define lagged mortgage rates"""  
from pyspark.sql.functions import lag, datediff
from pyspark.sql.window import Window
  
# Load mortgage dataframe and cast data type
mort_df = spark.read.csv('../../data/mortgage.csv', header=True, inferSchema=True)
mort_df = mort_df.withColumn('DATE', to_date(mort_df['DATE']))

# Create window
w = Window().orderBy(mort_df['DATE'])

# Create lag columns of MORTGAGE30US (1 to 4 weeks back)
for i in range(4):
    mort_df = mort_df.withColumn('MORTGAGE30US-'+str(i+1)+'WK', lag('MORTGAGE30US', count=i+1).over(w))

# Calculate difference between date columns
#mort_df = mort_df.withColumn('Days_Between_Report', datediff('DATE', 'DATE-1'))
#mort_df.select('Days_Between_Report').distinct().show()

"""PART II: define year and week of year for df and mort_df"""
# Create week of year for df and mort_df
from pyspark.sql.functions import to_date, weekofyear

# Convert to date type
mort_df = mort_df.withColumn('DATE', to_date('DATE', 'MM/dd/yyyy HH:mm'))

# Get the year and week of the year
mort_df = mort_df.withColumn('LIST_YEAR', year('DATE'))
mort_df = mort_df.withColumn('LIST_WEEKOFYEAR', weekofyear('DATE'))

# Drop column DATE
mort_df = mort_df.drop('DATE')

# Get the week of the year
df = df.withColumn('LIST_WEEKOFYEAR', weekofyear('LISTDATE'))

"""PART III: join df and mort_df on year and week of year"""
# Join df and mort_df
condition = ['LIST_YEAR', 'LIST_WEEKOFYEAR']
df = df.join(mort_df, on=condition, how='left')

# Add new columns to feature columns
feature_cols += ['LIST_YEAR','MORTGAGE30US','MORTGAGE30US-1WK','MORTGAGE30US-2WK','MORTGAGE30US-3WK','MORTGAGE30US-4WK']  # 36
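
The lag columns built with `lag(...).over(w)` simply pair each weekly rate with the rates from the preceding weeks. A minimal plain-Python sketch of the same idea, with hypothetical rates and a hypothetical helper `add_lags`:

```python
def add_lags(rates, max_lag=4):
    """Attach the previous max_lag values to each weekly rate (None when unavailable)."""
    rows = []
    for i, rate in enumerate(rates):
        row = {'rate': rate}
        for k in range(1, max_lag + 1):
            row['rate-%dWK' % k] = rates[i - k] if i - k >= 0 else None
        rows.append(row)
    return rows

# Hypothetical weekly rates, oldest first
weekly_rates = [4.20, 4.19, 4.12, 4.09, 4.10, 4.15]
lagged = add_lags(weekly_rates)

print(lagged[0])  # earliest week: no history, so every lag is None
print(lagged[5])  # latest week: lags hold the four preceding rates
```

As in the window version, the earliest rows have no history, so their lag values are missing and need to be handled before modelling.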

### (2.5) Generate features by combining fields

In [11]:
# (2.5.1) Generate features by getting ratios between fields
# ASSESSED_TO_LIST
df = df.withColumn('ASSESSED_TO_LIST', (df['ASSESSEDVALUATION'] / df['LISTPRICE']))

# TAX_TO_LIST
df = df.withColumn('TAX_TO_LIST', (df['TAXES'] / df['LISTPRICE']))

# BED_TO_BATHS
df = df.withColumn('BED_TO_BATHS', (df['BEDROOMS'] / df['BATHSTOTAL']))

# (2.5.2) Generate feature by adding two fields
df = df.withColumn('SQFT_TOTAL', df['SQFTBELOWGROUND'] + df['SQFTABOVEGROUND'])

# Add new columns to feature columns
feature_cols += ['ASSESSED_TO_LIST','TAX_TO_LIST','BED_TO_BATHS','SQFT_TOTAL']  # 40

### (2.6) Convert string features to numeric ones

In [12]:
# Convert dimension description of rooms to numeric values
from pyspark.sql.functions import regexp_replace, split

# columns to be transformed
dimension_cols = ['RoomArea1', 'RoomArea2','RoomArea3','RoomArea4','RoomArea5','RoomArea6','RoomArea7','RoomArea8']
for col in dimension_cols:
    # replace 'X' with 'x', split on 'x', then multiply the 2 values
    df = df.withColumn(col, regexp_replace(col, 'X', 'x'))
    df = df.withColumn(col, split(df[col], 'x'))
    df = df.withColumn(col, df[col][0] * df[col][1])
    
# Add new columns to feature columns
feature_cols += dimension_cols  # 48
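
The same normalize, split, multiply logic can be sketched in plain Python. The helper `room_area` below is hypothetical and, unlike the Spark expression, spells out what happens to values that cannot be parsed:

```python
def room_area(dimension):
    """Turn a 'WIDTHxLENGTH' string into an area; return None when it cannot be parsed."""
    if dimension is None:
        return None
    parts = dimension.replace('X', 'x').split('x')
    if len(parts) != 2:
        return None
    try:
        return float(parts[0]) * float(parts[1])
    except ValueError:
        return None

print(room_area('12x14'))  # 168.0
print(room_area('10X11'))  # 110.0
print(room_area('n/a'))    # None
```

In the Spark version, rows that do not match the pattern end up as null rather than raising an error, so they are picked up later by the missing-value handling.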

### (2.7) Extract features from free-form text

In [13]:
# Split and explode, and pivot and join
from pyspark.sql.functions import explode, lit, coalesce, first

def extract_features(df, col):
    """
    Extract features from free-form text
    """
    global extracted_features  # module-level list that this function extends
    
    # convert string to list-like array
    df = df.withColumn(col+'_list', split(df[col], ', '))

    # explode the values into new records
    ex_df = df.withColumn('ex_'+col+'_list', explode(df[col+'_list']))

    # create a dummy column of constant value
    ex_df = ex_df.withColumn('constant_val', lit(1))

    # pivot the values 
    piv_df = ex_df.groupBy('NO').pivot('ex_'+col+'_list').agg(coalesce(first('constant_val')))

    # rename pivoted column names except 'NO' for join operations later
    for col_name in list(set(piv_df.columns)-set(['NO'])):
        piv_df = piv_df.withColumnRenamed(col_name, col+'_'+col_name)
    
    # add new columns to feature columns
    extracted_features += list(set(piv_df.columns)-set(['NO']))
    
    # drop col_list
    df = df.drop(col+'_list')
    
    # join the dataframes and fill null
    df = df.join(piv_df, on='NO', how='left')

    # columns to zero fill
    zfill_cols = piv_df.columns

    # zero fill the pivoted values
    df = df.fillna(0, subset=zfill_cols)
    
    return df

# Define a list of extracted feature names
extracted_features = []

#=====================================================#
# DUE TO LIMITED MEMORY, ONLY EXTRACT FIRST 2 COLUMNS #
#=====================================================#
# Extract features from selected columns
for col in cols_to_extract[0:2]:
    df = extract_features(df, col)
    
# Add new columns to feature columns
feature_cols += extracted_features
    
print(feature_cols)

['PERCENT_OLDER_AGE_HOMES', 'ACRES', 'TAXWITHASSESSMENTS', 'BATHSTHREEQUARTER', 'PERCENT_SIMILAR_AGE_HOMES', 'GarageDescription', 'PERCENT_BIGGER_SIZE_HOMES', 'BATHQUARTER', 'YEARBUILT', 'LISTPRICE', 'LIVINGAREA', 'FOUNDATIONSIZE', 'SQFTABOVEGROUND', 'SQFTBELOWGROUND', 'BATHSTOTAL', 'OriginalListPrice', 'TAXES', 'ASSESSEDVALUATION', 'ASSOCIATIONFEE', 'FIREPLACES', 'PERCENT_SIMILAR_SIZE_HOMES', 'BEDROOMS', 'BATHSHALF', 'PERCENT_NEWER_AGE_HOMES', 'PERCENT_SMALLER_SIZE_HOMES', 'BATHSFULL', 'TAXYEAR', 'walkscore', 'bikescore', 'MedianHomeValue', 'LIST_YEAR', 'MORTGAGE30US', 'MORTGAGE30US-1WK', 'MORTGAGE30US-2WK', 'MORTGAGE30US-3WK', 'MORTGAGE30US-4WK', 'ASSESSED_TO_LIST', 'TAX_TO_LIST', 'BED_TO_BATHS', 'SQFT_TOTAL', 'RoomArea1', 'RoomArea2', 'RoomArea3', 'RoomArea4', 'RoomArea5', 'RoomArea6', 'RoomArea7', 'RoomArea8', 'FENCE_Partial', 'FENCE_Rail', 'FENCE_Wire', 'FENCE_Privacy', 'FENCE_Chain Link', 'FENCE_Invisible', 'FENCE_Wood', 'FENCE_Full', 'FENCE_Electric', 'FENCE_Other', 'FENCE_None'
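
The split, explode, pivot and zero-fill sequence in `extract_features` amounts to turning a comma-separated text field into one 0/1 column per distinct value. A minimal plain-Python sketch of that result, with hypothetical records and a hypothetical helper `pivot_text_feature`:

```python
def pivot_text_feature(records, key, col):
    """Split a comma-separated text field and pivot its values into 0/1 columns."""
    # Collect every distinct value seen in the text field
    values = sorted({v for r in records if r[col] for v in r[col].split(', ')})
    out = []
    for r in records:
        present = set(r[col].split(', ')) if r[col] else set()
        row = {key: r[key]}
        for v in values:
            row[col + '_' + v] = 1 if v in present else 0
        out.append(row)
    return out

# Hypothetical listings keyed by NO, with a free-form FENCE description
homes = [
    {'NO': 1, 'FENCE': 'Wood, Privacy'},
    {'NO': 2, 'FENCE': 'Chain Link'},
    {'NO': 3, 'FENCE': None},
]
pivoted = pivot_text_feature(homes, 'NO', 'FENCE')
for row in pivoted:
    print(row)
```

Each distinct value becomes its own column, which is why the number of features grows quickly and why the notebook limits the extraction to two columns.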

### (2.8) Binarizing to flag whether a house is listed on a weekend

In [14]:
# Import needed functions
from pyspark.sql.functions import to_timestamp, dayofweek

# Convert to date type (already converted)
#df = df.withColumn('LISTDATE', to_date('LISTDATE', 'MM/dd/yyyy HH:mm'))

# Get the day of the week
df = df.withColumn('LIST_DAYOFWEEK', dayofweek('LISTDATE'))

# Import transformer
from pyspark.ml.feature import Binarizer

# Create the transformer (value > 5 --> 1.0); dayofweek() returns 1 = Sunday ... 7 = Saturday, so this flags Friday and Saturday
binarizer = Binarizer(threshold=5.0, inputCol='LIST_DAYOFWEEK', outputCol='Listed_On_Weekend')

# Apply the transformation to df
df = df.withColumn('LIST_DAYOFWEEK', df['LIST_DAYOFWEEK'].cast('double'))
df = binarizer.transform(df)  # 'LIST_DAYOFWEEK' must be double

# Add the new binary column to feature columns
feature_cols += ['Listed_On_Weekend']
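
It helps to check which days a threshold of 5.0 actually flags. The sketch below reproduces the two rules in plain Python: Spark's `dayofweek` numbering (1 = Sunday through 7 = Saturday) and the Binarizer rule that values strictly greater than the threshold become 1.0. The helper names are hypothetical.

```python
from datetime import date

def spark_style_dayofweek(d):
    """Map a date to 1 = Sunday ... 7 = Saturday."""
    return d.isoweekday() % 7 + 1

def binarize(value, threshold):
    """Values strictly greater than the threshold become 1.0, all others 0.0."""
    return 1.0 if value > threshold else 0.0

# Friday through Monday of one week in December 2017
days = [date(2017, 12, 8), date(2017, 12, 9), date(2017, 12, 10), date(2017, 12, 11)]
flags = [binarize(spark_style_dayofweek(d), 5.0) for d in days]

print(flags)  # Friday and Saturday are flagged, Sunday and Monday are not
```

With this numbering Sunday maps to 1 and is therefore not flagged, so the resulting column is better read as "listed on Friday or Saturday" than as a strict weekend indicator.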

### (2.9) Bucketing of BEDROOMS into 6 buckets

In [15]:
from pyspark.ml.feature import Bucketizer

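# Create the bucket splits and bucketizer (7 split points define 6 buckets)
# The output name differs from the input by more than letter case, because Spark resolves column names case-insensitively by default
splits = [0, 1, 2, 3, 4, 5, float('Inf')]
buck = Bucketizer(splits=splits, inputCol='BEDROOMS', outputCol='BEDROOMS_BUCKET')

# Apply the transformation to df
df = buck.transform(df)

# Replace the original BEDROOMS column with its bucketed version
df = df.drop('BEDROOMS')
df = df.withColumnRenamed('BEDROOMS_BUCKET', 'BEDROOMS')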
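
The Bucketizer rule is that n+1 split points define n buckets, each covering `[lower, upper)`, with the last bucket also including its upper bound. A minimal plain-Python sketch of that rule, using a hypothetical helper `bucketize` built on `bisect`:

```python
from bisect import bisect_right

def bucketize(value, splits):
    """Return the index of the [splits[i], splits[i+1]) bucket containing value."""
    if value < splits[0] or value > splits[-1]:
        raise ValueError('value outside the split range')
    # The last bucket also includes its upper bound
    return float(min(bisect_right(splits, value) - 1, len(splits) - 2))

splits = [0, 1, 2, 3, 4, 5, float('inf')]
buckets = [bucketize(b, splits) for b in [0, 3, 5, 8]]

print(len(splits) - 1)  # number of buckets
print(buckets)          # 5 and 8 bedrooms land in the same top bucket
```

Everything from five bedrooms upward shares the top bucket, which keeps rare large homes from forming their own sparse categories.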

### (2.10) One-hot encoding

In [16]:
# For categorical values, we can map the text values to numbers, then use one-hot encoding to transform them into numeric vectors
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.pipeline import Pipeline

# (a) Create list of StringIndexers using list comprehension
indexers = [StringIndexer(inputCol=col, outputCol=col+'_IDX').setHandleInvalid("keep") for col in categorical_cols]

# Create pipeline of indexers
indexer_pipeline = Pipeline(stages=indexers)

# Fit and Transform the pipeline to the original data
df = indexer_pipeline.fit(df).transform(df)

# Clean up redundant columns no longer needed
df = df.drop(*categorical_cols)

# Inspect data transformations
#print(df.dtypes)

# (b) One hot encode indexed values
encoders = [OneHotEncoder(inputCol=col+'_IDX', outputCol=col+'_Vec') for col in categorical_cols]

# Create pipeline of encoders
encoder_pipeline = Pipeline(stages=encoders)

# Fit and Transform the pipeline to the data
df = encoder_pipeline.fit(df).transform(df)

# Add new columns to feature columns
feature_cols += [col+'_Vec' for col in categorical_cols]

print(df.columns)
df.select(['CITY_IDX','CITY_Vec']).show()

['NO', 'LIST_YEAR', 'LIST_WEEKOFYEAR', 'longitude', 'latitude', 'SALESCLOSEPRICE', 'LISTDATE', 'LISTPRICE', 'OriginalListPrice', 'FOUNDATIONSIZE', 'FENCE', 'DAYSONMARKET', 'OFFMKTDATE', 'FIREPLACES', 'RoomArea4', 'roomtype', 'ROOF', 'PoolDescription', 'PDOM', 'GarageDescription', 'SQFTABOVEGROUND', 'TAXES', 'RoomArea1', 'TAXWITHASSESSMENTS', 'TAXYEAR', 'LIVINGAREA', 'YEARBUILT', 'ZONING', 'ACRES', 'CoolingDescription', 'APPLIANCES', 'BACKONMARKETDATE', 'ROOMFAMILYCHAR', 'RoomArea3', 'EXTERIOR', 'RoomArea2', 'DiningRoomDescription', 'BASEMENT', 'BATHSFULL', 'BATHSHALF', 'BATHQUARTER', 'BATHSTHREEQUARTER', 'BATHSTOTAL', 'BATHDESC', 'RoomArea5', 'RoomArea6', 'RoomArea7', 'RoomArea8', 'SQFTBELOWGROUND', 'ASSOCIATIONFEE', 'ASSESSEDVALUATION', 'PERCENT_OLDER_AGE_HOMES', 'PERCENT_SIMILAR_AGE_HOMES', 'PERCENT_NEWER_AGE_HOMES', 'PERCENT_BIGGER_SIZE_HOMES', 'PERCENT_SIMILAR_SIZE_HOMES', 'PERCENT_SMALLER_SIZE_HOMES', 'walkscore', 'bikescore', 'transitscore', 'REPORT_YEAR', 'MedianHomeValue', 'Yea
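
The two-step encoding can be sketched in plain Python. StringIndexer assigns index 0 to the most frequent label by default, and OneHotEncoder drops the last category by default so that it is represented by an all-zero vector. The city labels, the helper names, and the alphabetical tie-breaking below are assumptions of this sketch; the notebook's `handleInvalid="keep"` additionally reserves one extra index for unseen labels, which is left out here.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Ties are broken alphabetically here; that tie rule is an assumption of this sketch
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index, size, drop_last=True):
    """Dense 0/1 vector; with drop_last the final category is encoded as all zeros."""
    length = size - 1 if drop_last else size
    return [1.0 if i == index else 0.0 for i in range(length)]

# Hypothetical city labels
cities = ['St. Paul', 'Woodbury', 'St. Paul', 'Oakdale', 'St. Paul', 'Woodbury']
index = fit_string_index(cities)
vectors = {city: one_hot(i, len(index)) for city, i in index.items()}

print(index)
print(vectors)
```

Spark stores these vectors in sparse form, which is why the `CITY_Vec` column prints as a size plus the position of the single non-zero entry.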

### (2.11) Drop columns with few observations

In [17]:
# (2.11.1) Automate dropping columns if they are missing data beyond a specific threshold
def column_dropper(df, threshold):
    # Takes a dataframe and threshold for missing values. Returns a dataframe.
    total_records = df.count()
    for col in df.columns:
        # Calculate the percentage of missing values
        missing = df.where(df[col].isNull()).count()
        missing_percent = missing / total_records
        # Drop column if percent of missing is more than threshold
        if missing_percent > threshold:
            df = df.drop(col)
    return df

# Drop columns that are more than 60% missing
df = column_dropper(df, 0.6)
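
The threshold rule in `column_dropper` can be sketched on a plain dictionary of columns. The table and the helper `drop_sparse_columns` are hypothetical:

```python
def drop_sparse_columns(table, threshold):
    """Drop columns whose share of None values exceeds threshold."""
    kept = {}
    for name, column in table.items():
        missing_share = sum(v is None for v in column) / len(column)
        if missing_share <= threshold:
            kept[name] = column
    return kept

# Hypothetical table stored as column name -> list of values
table = {
    'LISTPRICE': [250000, 310000, 199000, 420000, 275000],
    'BACKONMARKETDATE': [None, None, None, None, '2017-06-01'],  # 80% missing
    'ASSOCIATIONFEE': [0, None, 150, None, 75],                  # 40% missing
}
cleaned = drop_sparse_columns(table, 0.6)

print(list(cleaned))
```

The Spark version runs one `count()` job per column, so on wide DataFrames it can be noticeably slow; computing all null counts in a single aggregation is a common alternative.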

In [18]:
# (2.11.2) Drop binary columns with fewer than 30 positive observations (a common rule-of-thumb minimum sample size)
# Get the names of binary columns from extracted features
binary_cols = extracted_features

# Collect the columns that fall below the observation threshold
obs_threshold = 30
cols_to_remove = list()
for col in binary_cols:
    # Count the number of 1 values in the binary column
    obs_count = df.agg({col: 'sum'}).collect()[0][0]
    # If less than our observation threshold, remove
    if obs_count < obs_threshold:
        cols_to_remove.append(col)
    
# Drop columns
df = df.drop(*cols_to_remove)

### (2.12) Naively handling missing values

In [19]:
# Replace missing values (missing values in categoricals are filled with 0 already)
#df = df.fillna(-1, subset=df.columns)
df = df.fillna(-1)

### <font color=brown> Part 3: Split data into train and test sets </font>

In [20]:
# (3.0) PySpark ML algorithms require all of the features to be provided in a single column of type vector.
from pyspark.ml.feature import VectorAssembler

# Define the columns to be converted to vectors (slimmed down features)
feature_cols = ['SQFT_TOTAL','TAXES','LIVINGAREA','SQFTABOVEGROUND','BATHSTOTAL','YEARBUILT','FIREPLACES','BATHSHALF','walkscore']

# Create the vector assembler transformer
vec = VectorAssembler(inputCols=feature_cols, outputCol='features')

# Apply the vector transformer to data ('features' column will be added)
df = vec.transform(df)

# Select only the feature vectors and the dependent variable
ml_ready_df = df.select(['SALESCLOSEPRICE','features'])
ml_ready_df.show()

+---------------+--------------------+
|SALESCLOSEPRICE|            features|
+---------------+--------------------+
|         799000|[5586.0,7570.0,55...|
|         244000|[1651.0,2682.0,16...|
|         255000|[1776.0,3424.0,17...|
|         250000|[1567.0,2672.0,15...|
|         231700|[1750.0,2283.0,17...|
|          81400|[724.0,1012.0,724...|
|         120000|[920.0,1358.0,920...|
|         135000|[1096.0,1612.0,10...|
|         145000|[884.0,1635.0,884...|
|         150730|[1550.0,1431.0,15...|
|         160000|[900.0,2127.0,900...|
|         160000|[864.0,1891.0,864...|
|         180000|[2171.0,1155.0,21...|
|         200000|[944.0,2264.0,944...|
|         206925|[1620.0,1482.0,16...|
|         190000|[1146.0,2388.0,11...|
|         214000|[1972.0,2170.0,19...|
|         245000|[1452.0,2190.0,14...|
|         279000|[1825.0,3354.0,18...|
|         547750|[2916.0,8178.0,29...|
+---------------+--------------------+
only showing top 20 rows



In [21]:
# (3.1) Train on all available data except for the last 45 days, which you want to use for a test set.
from datetime import timedelta
from pyspark.sql.functions import datediff, to_date, lit

def train_test_split_date(df, split_col, test_days=45):
    """Calculate the date that separates the training and test sets"""
    # Find the last date in the dataset
    max_date = df.agg({split_col: 'max'}).collect()[0][0]
    # Subtract an integer number of days from the last date in the dataset
    split_date = max_date - timedelta(days=test_days)
    return split_date

# Find the date to use in splitting test and train
split_date = train_test_split_date(df, 'OFFMKTDATE')

# Create Sequential Test and Training Sets
train_df = df.where(df['OFFMKTDATE'] < split_date)
test_df = df.where(df['OFFMKTDATE'] >= split_date).where(df['LISTDATE'] <= split_date)
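
The sequential split can be sketched with plain `datetime` objects. The rows and the helper `split_by_date` are hypothetical; the filtering rules mirror the two `where` clauses above.

```python
from datetime import date, timedelta

def split_by_date(rows, test_days=45):
    """Split rows into train/test around a date test_days before the last off-market date."""
    max_off = max(r['off_market'] for r in rows)
    split_date = max_off - timedelta(days=test_days)
    train = [r for r in rows if r['off_market'] < split_date]
    # Test rows went off market on or after the split date but were already listed by then
    test = [r for r in rows
            if r['off_market'] >= split_date and r['listed'] <= split_date]
    return split_date, train, test

# Hypothetical listings
rows = [
    {'listed': date(2017, 6, 1), 'off_market': date(2017, 7, 15)},
    {'listed': date(2017, 10, 24), 'off_market': date(2017, 12, 19)},
    {'listed': date(2017, 12, 7), 'off_market': date(2018, 1, 9)},
]
split_date, train, test = split_by_date(rows)

print(split_date, len(train), len(test))
```

The third listing falls into neither set: it was listed after the split date, so at prediction time the model could not have known about it yet.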

In [22]:
# (3.2) Adjust time feature DAYSONMARKET
# Recalculate DAYSONMARKET from what we know on our split date
test_df = test_df.withColumn('DAYSONMARKET', datediff(lit(split_date), 'LISTDATE'))

# Review the difference
test_df[['LISTDATE', 'OFFMKTDATE', 'DAYSONMARKET']].show()

+----------+----------+------------+
|  LISTDATE|OFFMKTDATE|DAYSONMARKET|
+----------+----------+------------+
|2017-12-07|2017-12-18|           3|
|2017-10-24|2017-12-19|          47|
|2017-10-05|2017-12-19|          66|
|2017-11-17|2017-12-13|          23|
|2017-07-27|2017-12-13|         136|
|2017-08-02|2017-12-13|         130|
|2017-11-13|2017-12-18|          27|
|2017-11-28|2018-01-02|          12|
|2017-12-10|2017-12-14|           0|
|2017-09-25|2017-12-14|          76|
|2017-07-27|2017-12-20|         136|
|2017-11-29|2017-12-18|          11|
|2017-08-14|2018-01-09|         118|
|2017-11-22|2018-01-08|          18|
|2017-11-17|2017-12-22|          23|
|2017-11-27|2017-12-27|          13|
|2017-10-23|2017-12-12|          48|
|2017-07-05|2017-12-19|         158|
|2017-10-06|2017-12-14|          65|
|2017-12-04|2017-12-17|           6|
+----------+----------+------------+
only showing top 20 rows
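
The recalculated DAYSONMARKET is just the number of days between the list date and the split date. In the sketch below the split date of 2017-12-10 is inferred from the table above (a listing dated 2017-12-10 shows 0 days); it is not printed anywhere in the notebook.

```python
from datetime import date

# Split date inferred from the output table above (an assumption of this sketch)
split_date = date(2017, 12, 10)

def days_on_market_at_split(list_date):
    """Days a listing had been on the market as of the split date."""
    return (split_date - list_date).days

print(days_on_market_at_split(date(2017, 10, 24)))  # 47, as in the table
print(days_on_market_at_split(date(2017, 7, 27)))   # 136, as in the table
```

Capping the feature at the split date avoids leaking the eventual off-market date, which would not be known when the prediction is made.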



### <font color=brown> Part 4: Train model, make predictions, and evaluate model performance </font>

In [23]:
# (4.1.1) Train a Random Forest Regressor (RFR) model
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor

# initialize a RFR model with columns to utilize
rfr = RandomForestRegressor(featuresCol="features",
                            labelCol="SALESCLOSEPRICE",
                            predictionCol="Prediction_Price",
                            seed=42)

# train the RFR model
rfr_model = rfr.fit(train_df)

# make predictions
rfr_predictions = rfr_model.transform(test_df)

In [24]:
# (4.1.2) Train a Gradient Boosted Trees (GBT) model
gbt = GBTRegressor(featuresCol='features',
                   labelCol='SALESCLOSEPRICE',
                   predictionCol="Prediction_Price",
                   seed=42)

# train the GBT model
gbt_model = gbt.fit(train_df)

# make predictions
gbt_predictions = gbt_model.transform(test_df)

In [25]:
# (4.2) Inspect results
rfr_predictions.select('Prediction_Price','SALESCLOSEPRICE').show()
gbt_predictions.select('Prediction_Price','SALESCLOSEPRICE').show()

+------------------+---------------+
|  Prediction_Price|SALESCLOSEPRICE|
+------------------+---------------+
| 477932.5759685775|         415000|
|151340.02363332474|          85000|
| 587096.6799408385|         625000|
|188066.02407784035|         170000|
|213134.80352795072|         140000|
|178352.33411807456|         162500|
|187511.62608683787|         205000|
|249954.92224734588|         275000|
|157001.55870608546|         109900|
|181616.74266121833|         166140|
| 223712.8217376111|         214900|
|164231.34927270835|         146000|
|145066.02885670474|          78900|
|181746.24445993835|         240000|
|382794.44830720796|         385000|
| 485965.2284785349|         625000|
| 200371.6124723446|         230000|
| 298326.4769805313|         259000|
|415643.60868714296|         398000|
|149100.89447796842|         133000|
+------------------+---------------+
only showing top 20 rows

+------------------+---------------+
|  Prediction_Price|SALESCLOSEPRICE|
+-----------

In [26]:
# (4.3) Evaluate a model
from pyspark.ml.evaluation import RegressionEvaluator

# select columns to compute test error
evaluator = RegressionEvaluator(labelCol='SALESCLOSEPRICE',
                                predictionCol='Prediction_Price')

# dictionary of model predictions to loop over
models = {'Gradient Boosted Trees': gbt_predictions, 'Random Forest Regression': rfr_predictions}

for key, preds in models.items():
    # create evaluation metrics
    rmse = evaluator.evaluate(preds, {evaluator.metricName: 'rmse'})
    r2 = evaluator.evaluate(preds, {evaluator.metricName: 'r2'})
    # print Model Metrics
    print(key + ' RMSE: ' + str(rmse))
    print(key + ' R^2: ' + str(r2))

Gradient Boosted Trees RMSE: 73592.23159304303
Gradient Boosted Trees R^2: 0.6555645898757021
Random Forest Regression RMSE: 60045.794225661586
Random Forest Regression R^2: 0.770697370293886
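
For reference, the two metrics follow their textbook definitions: RMSE is the square root of the mean squared error, and R^2 is one minus the ratio of residual to total sum of squares. A minimal plain-Python sketch on hypothetical values (the function names are illustrative, not the evaluator's API):

```python
import math

def rmse(actual, predicted):
    """Root mean squared error."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(actual))

def r2(actual, predicted):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_actual = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return 1 - ss_res / ss_tot

# Hypothetical sale prices (in thousands) and predictions
actual = [100.0, 200.0, 300.0]
predicted = [110.0, 190.0, 310.0]

print(rmse(actual, predicted))  # 10.0
print(r2(actual, predicted))    # 0.985
```

RMSE is in the units of the label (dollars here), so the Random Forest's lower RMSE and higher R^2 in the output above point the same way.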


### <font color=brown> Part 5: Interpreting, saving and loading </font>

In [27]:
# (5.1) Interpreting a model
import pandas as pd

for model in [rfr_model, gbt_model]:
    # convert feature importances to a pandas column
    fi_df = pd.DataFrame(model.featureImportances.toArray(),
                         columns=['importance'])
    
    # convert list of features names to pandas column 
    fi_df['feature'] = pd.Series(feature_cols)
    
    # sort the data based on feature importance
    fi_df.sort_values(by=['importance'], ascending=False, inplace=True)

    # interpret results
    print(fi_df.head())
    
    

   importance          feature
0    0.312659       SQFT_TOTAL
1    0.274874            TAXES
2    0.219070       LIVINGAREA
3    0.070935  SQFTABOVEGROUND
5    0.049717        YEARBUILT
   importance          feature
1    0.263247            TAXES
0    0.193094       SQFT_TOTAL
8    0.178850        walkscore
5    0.166326        YEARBUILT
3    0.087873  SQFTABOVEGROUND


In [None]:
# (5.2) Saving and loading models
# save model
rfr_model.save('rfr_real_estate_model')
gbt_model.save('gbt_real_estate_model')

# load model
from pyspark.ml.regression import RandomForestRegressionModel, GBTRegressionModel

rfr_model2 = RandomForestRegressionModel.load('rfr_real_estate_model')
gbt_model2 = GBTRegressionModel.load('gbt_real_estate_model')