In [1]:
#####################################Import relevant libraries and packages###########################################################################

In [2]:
## spark imports
import random
random.seed(42)
import numpy as np
import pandas as pd

from pyspark import SparkContext
from pyspark.sql import SQLContext

from pyspark.sql import Row, SparkSession, Window
from pyspark.sql.functions import *
import pyspark.sql.functions as func
from pyspark.sql.functions import col, split, mean, count, isnan, when, udf, abs, sqrt, max, round
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.ml.feature import *
from pyspark.ml.feature import CountVectorizer, VectorIndexer, VectorAssembler,StringIndexer, OneHotEncoder, VectorSlicer
from pyspark.ml import Pipeline, feature
from pyspark.ml.regression import DecisionTreeRegressor, LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark import keyword_only
from pyspark.ml.base import Estimator
from pyspark.ml.param import Params, Param, TypeConverters
from pyspark.ml.param.shared import HasOutputCol

In [3]:
#############################################User Defined Functions###########################################################################

In [4]:
# Convert a scaled value from 0 - 100 into a percentage value from 0 to 1
def calc_percentage(col):
  return(col/100)

In [5]:
# Divide any 2 columns in the dataframe as required
def div_two_cols(numerator, denominator):
  return(numerator/denominator)

In [6]:
# Categorize Debit Risk score into High, Medium or Low Risk as per thresholds specified by TransUnion
def categorize_debt_risk(col_name):
  if col_name < 707:
    return ("High Risk")
  elif col_name >= 769:
    return ("Low Risk")
  else:
    return("Medium Risk")

In [7]:
# Calculate proportional values in a column where the value fo the current index is divided by the sum of the total column
def determine_proportion(df,col_name):
  max_value = find_max_value_in_col(df,col_name)
  return(col_name/max_value)

In [8]:
#Check if data in cell, if no data present return 0 else 1
def true_false_categorization(text):
  if text != None:
    return (1)
  else:
    return(0)

In [9]:
# Recode cells with true or false values to be 1 or 0
def binary_encoding(text):
  if 'f' in text:
    return (0)
  else:
    return (1)


In [10]:
# Replace a characther in a string 
def replace_char(df,column_name,orig_char, new_char):
  return(df.withColumn(column_name, regexp_replace(column_name,orig_char,new_char)))

In [11]:
# Convert data in column to an array so as to use with Count Vectorizer
def convert_col_to_array(df,new_col,orig_col, split_char):
  return(df.withColumn(new_col,split(col(orig_col),split_char)))

In [12]:
def initialize_Count_Vectorizer(input_Col, output_Col):
  return(CountVectorizer(inputCol = input_Col, outputCol = output_Col))

In [13]:
def convert_dtype(df, colname,new_dtype):
  return(df.withColumn(colname, df[colname].cast(new_dtype)))

In [14]:
def categorize_ranks(rank):
  max_rank =  137
  if rank > 0 * max_rank and rank <=0.25 * max_rank:
    return("No-Low")
  elif rank > 0.25*max_rank and rank <=0.5 *max_rank:
    return("Low-Med")
  elif rank >0.5 * max_rank and rank <= 0.75*max_rank:
    return ("Med-High")
  else:
    return ("High")

In [15]:

def clean_cancellation_policy(text):
  if 'strict' in text:
    return('strict')
  else:
    return(text)

In [16]:
def cat_property_type(text):
  if 'Apartment' in text or 'House' in text or 'Condominium' in text:
    return(text)
  else:
    return('Other')

In [17]:
def smape(A, F):
    return 100/len(A) * np.sum(2 * np.abs(F - A) / (np.abs(A) + np.abs(F)))

In [18]:
def find_max_value_in_col(df,col_name):
  return(df.agg({col_name: "max"}).collect()[0][0])

In [19]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
  # Takes in feature importance from a model and map it to column names
  
  list_extract = []
  for i in dataset.schema[featuresCol].metadata['ml_attr']['attrs']:
    list_extract = list_extract + dataset.schema[featuresCol].metadata['ml_attr']['attrs'][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return (varlist.sort_values('score', ascending = False))

In [20]:
######################################Import relevant data###########################################################################################

In [21]:
# Import AirBnB dataset
sqlContext = SQLContext(sc)
df = sqlContext.sql('SELECT * FROM listings_new_csv')

#Drop rows with data quality issues - Hide if issues no longer exist in data import
#df = df.filter(col('property_type').isin(['Apartment', 'House', 'Condominium', 'Townhouse', 'Guest Suite', 'Bungalow', 'Loft', 'Serviced apartment', 'Bed and breakfast', 'Guesthouse', 'Villa', 'Boutique hotel', 'Hostel' ])) 

# Specify list of columns that need to be deleted as they won't impact the price to be predicted
columns_to_drop = ['id','listing_url', 'scrape_id', 'last_scraped','name', 'summary', 'space', 'description', 'experiences_offered',
                  'neighborhood_overview', 'notes', 'transit', 'access', 'interaction', 'thumbnail_url','medium_url', 'picture_url',
                  'xl_picture_url', 'host_id', 'host_url', 'host_name', 'host_since', 'host_location', 'host_about', 'host_response_time',
                  'host_response_rate', 'host_acceptance_rate', 'host_is_superhost', 'host_thumbnail_url', 'host_picture_url', 'host_listings_count',
                  'host_total_listings_count','host_verifications', 'host_has_profile_pic', 'host_identity_verified','street', 'neighbourhood_group_cleansed',
                  'city','state','market', 'smart_location','country_code','country','latitude', 'longitude', 'is_location_exact','weekly_price',
                  'monthly_price','minimum_nights', 'maximum_nights', 'minimum_minimum_nights', 'maximum_minimum_nights', 'minimum_maximum_nights',
                  'maximum_maximum_nights', 'calendar_updated', 'has_availability','availability_30','availability_60','availability_90', 
                  'availability_365','calendar_last_scraped', 'number_of_reviews', 'number_of_reviews_ltm', 'first_review', 'last_review',
                  'review_scores_rating','review_scores_accuracy','review_scores_cleanliness', 'review_scores_checkin', 'review_scores_communication',
                  'review_Scores_location', 'review_Scores_value', 'license', 'jurisdiction_names',  'requires_guest_profile',
                  'requires_guest_phone_verification','calculated_host_listings_count', 'calculated_host_listings_count_entire_homes','calculated_host_listings_count_private_rooms',
                  'calculated_host_listings_count_shared_rooms','reviews_per_month', 'host_neighbourhood', 'neighbourhood', 'zipcode','requires_license',
                   'is_business_travel_ready', 'require_guest_profile_picture', 'require_guest_phone_verification', 'extra_people', 'minimum_nights_avg_ntm','maximum_nights_avg_ntm', 'reviews_per_month']

#Drop columns listed above
data = df.drop(*columns_to_drop)


In [22]:
#Import Neighbourhood Dataset ie Wellbeing dataset with education and econominc indicators of neighbourhood population
neighbourhood_df = sqlContext.sql('SELECT * FROM wellbeing_toronto_csv').drop('Combined_Indicators', 'Neighbourhood_ID', 'Combined Indicators', 'Neighbourhood Id', 'Neighbourhood Equity Score')

#Determine rank of neighbourhood given column data
neighbourhood_df = neighbourhood_df.withColumn('Volume of TTC stops rank', dense_rank().over(Window.orderBy('TTC Stops'))) #Determine rank of neighbourhoods based on volume of TTC stops
neighbourhood_df = neighbourhood_df.withColumn('Volume of Health Providers rank', dense_rank().over(Window.orderBy('Health Providers'))) #Determine rank of neighbourhoods based on volume of Health Providers
neighbourhood_df = neighbourhood_df.withColumn('Home price rank', dense_rank().over(Window.orderBy('Home Prices\r'))) #Determine rank of neighbourhoods based on average home prices of neighbourhoods

# Define User Defined Functions on columns
udf_div_two_cols= udf(div_two_cols)
udf_perc_calc = udf(calc_percentage)
udf_cat_debt_risk = udf(categorize_debt_risk)

#Apply User Defined Functions
neighbourhood_df = neighbourhood_df.withColumn('Percentage_Social_Housing', round(udf_div_two_cols('Social Housing Units', 'Total Private Dwellings'),3)) #Calc % of social housing in neighbourhood
neighbourhood_df = neighbourhood_df.withColumn('Walk Score', round(udf_perc_calc('Walk Score'),2)) # Convert walk score to fraction
neighbourhood_df = neighbourhood_df.withColumn('Debt Risk Score', udf_cat_debt_risk('Debt Risk Score')) # Categorize Debt Risk score as specified by TransUnion

#Find proportion of row entry over total data in column eg: find prop a neighbourhood contributes to overall column category
neighbourhood_df = neighbourhood_df.withColumn('Total Population', round(func.col('Total Population')/func.sum('Total Population').over(Window.partitionBy()),3)) # Calc prop of population of neighbourhood over total population
neighbourhood_df = neighbourhood_df.withColumn('Total Area', round(func.col('Total Area')/func.sum('Total Area').over(Window.partitionBy()),3))  # Calc prop of area of neighbourhood over total area

#Drop extraneous columns
columns_to_drop = ['TTC Stops', 'Health Providers', 'Home Prices\r','Social Housing Units', 'Total Private Dwellings']
neighbourhood_df = neighbourhood_df.drop(*columns_to_drop)
neighbourhood_df= neighbourhood_df.na.fill(0)
# join the listings with the wellbeing neighbourhood data by neighbourhood name
airbnb_df = data.join(neighbourhood_df, data.neighbourhood_cleansed == neighbourhood_df.Neighbourhood).drop('Neighbourhood', 'Neighbourhood_ID')




In [23]:
#Import MCI data set and find average number of MCI events occurring in the neighbourhood and obtain ranks (1 - lowest occurances, max - highest occurances)
mci_df = sqlContext.sql("SELECT mci_2014_to_2018_csv.occurrenceyear, mci_2014_to_2018_csv.MCI, LEFT(mci_2014_to_2018_csv.Neighbourhood,POSITION(' (' IN mci_2014_to_2018_csv.Neighbourhood)-1) AS cleaned_Neighbourhood FROM mci_2014_to_2018_csv WHERE mci_2014_to_2018_csv.occurrenceyear >=2014")
mci_df = mci_df.groupBy(['cleaned_Neighbourhood','occurrenceyear']).agg(count('occurrenceyear')).orderBy(['cleaned_Neighbourhood','occurrenceyear'], ascending = True)
mci_df = mci_df.groupby(['cleaned_Neighbourhood']).agg(mean('count(occurrenceyear)')).orderBy('avg(count(occurrenceyear))', ascending = False)
mci_df = mci_df.withColumn('Crime Rate rank',dense_rank().over(Window.orderBy('avg(count(occurrenceyear))')))
mci_df = mci_df.withColumn('cleaned_Neighbourhood', regexp_replace('cleaned_Neighbourhood','_','-'))

# join the listings with the MCI data by neighbourhood name
airbnb_df = airbnb_df.join(mci_df, airbnb_df.neighbourhood_cleansed == mci_df.cleaned_Neighbourhood).drop('cleaned_Neighbourhood','avg(count(occurrenceyear))')

In [24]:
####################################################Feature Engineering###########################################################################

In [25]:
# Drop Property Types and Bed Types with limited data entries
#airbnb_df = airbnb_df.filter(col('property_type').isin(['Apartment', 'House', 'Condominium' ])) 
#airbnb_df = airbnb_df.filter(col('room_type').isin(['Entire home/apt']))
#airbnb_df = airbnb_df.filter(col('bed_type').isin(['Real Bed']))
airbnb_df = airbnb_df.dropna(subset='price')
airbnb_df = airbnb_df.dropna(subset='security_deposit')
airbnb_df = airbnb_df.dropna(subset='cleaning_fee')
#Fill remainining dataframe null values with 0 as no additional information would be possible with further feature engineering
airbnb_df = airbnb_df.na.fill(0)

charReplace = udf(lambda x: x.replace('$',''))
charReplace2 = udf(lambda x: x.replace(',', ''))

airbnb_df = airbnb_df.withColumn('price', charReplace('price'))
airbnb_df = airbnb_df.withColumn('price', airbnb_df.price.cast('double'))
airbnb_df = airbnb_df.filter(airbnb_df.price <=300)

airbnb_df = airbnb_df.withColumn('security_deposit', charReplace('security_deposit'))
airbnb_df = airbnb_df.withColumn('security_deposit', charReplace2('security_deposit'))
#airbnb_df = airbnb_df.withColumn('security_deposit', airbnb_df.security_deposit.cast('double'))

airbnb_df = airbnb_df.withColumn('cleaning_fee', charReplace('cleaning_fee'))
airbnb_df = airbnb_df.withColumn('cleaning_fee', charReplace2('cleaning_fee'))
#airbnb_df = airbnb_df.withColumn('cleaning_fee', airbnb_df.cleaning_fee.cast('double'))

In [26]:
# Reclassify Cancellation Policy:
udf_cancellation_pol = udf(clean_cancellation_policy)
airbnb_df = airbnb_df.withColumn('cancellation_policy', udf_cancellation_pol('cancellation_policy'))

#Reclassify Property Types:
udf_cat_prop = udf(cat_property_type)
airbnb_df = airbnb_df.withColumn('property_type', udf_cat_prop('property_type'))

In [27]:
# Convert True False values into boolean values
udf_binary_encoding = udf(binary_encoding)
airbnb_df = airbnb_df.withColumn('Instant_Bookable',udf_binary_encoding('instant_bookable'))

In [28]:
# String Hashing on Amenities
tokenizer = RegexTokenizer(inputCol='amenities', outputCol='amenitieslist', pattern="\\W+")
tokenized = tokenizer.transform(airbnb_df)

hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='amenitiesFeatures', numFeatures=40)
airbnb_df = hashingTF.transform(tokenized).drop('amenities', 'amenitieslist')

In [29]:

udf_recode_true_false = udf(true_false_categorization)
airbnb_df = airbnb_df.withColumn('house_rules', udf_recode_true_false('house_rules')) #Are there any house rules?
airbnb_df = airbnb_df.withColumn('security_deposit', udf_recode_true_false('security_deposit')) # Is a security deposit required

In [30]:
#Drop NAs from Property Type and Bed Type ie these must exist in the listing to be considered
list = ['property_type', 'bed_type']
airbnb_df = airbnb_df.dropna(subset=list)

In [31]:
udf_rank_categorization = udf(categorize_ranks)

#List of columns to rank
Rank_List = ['Crime Rate rank', 'Volume of TTC stops rank', 'Volume of Health Providers rank', 'Home price rank']

#Generate ranking categories for each element in list
for element in Rank_List:
  airbnb_df = airbnb_df.withColumn(element, udf_rank_categorization(element)) #Develop rank categories ie convert element ranks into 4 categories (Rare, Low, Medium and High)
#  print(element)

#airbnb_df = airbnb_df.withColumn('Crime Rate rank', udf_rank_categorization('Crime Rate rank'))
#airbnb_df = airbnb_df.withColumn('Volume of TTC stops rank', udf_rank_categorization('Volume of TTC stops rank'))
#airbnb_df = airbnb_df.withColumn('Volume of Health Providers rank', udf_rank_categorization('Volume of Health Providers rank'))
#airbnb_df = airbnb_df.withColumn('Home price rank', udf_rank_categorization('Home price rank'))


In [32]:
display(airbnb_df)

house_rules,neighbourhood_cleansed,property_type,room_type,accommodates,bathrooms,bedrooms,beds,bed_type,square_feet,price,security_deposit,cleaning_fee,guests_included,Instant_Bookable,cancellation_policy,Debt Risk Score,Walk Score,Total Area,Total Population,Volume of TTC stops rank,Volume of Health Providers rank,Home price rank,Percentage_Social_Housing,Crime Rate rank,amenitiesFeatures
1,Annex,House,Private room,1,1.5,1,1,Pull-out Sofa,120,67.0,1,27.0,1,0,moderate,Medium Risk,0.94,0.004,0.011,Low-Med,Med-High,High,0.05,High,"List(0, 40, List(2, 8, 12, 13, 16, 17, 36, 37), List(1.0, 1.0, 1.0, 3.0, 1.0, 1.0, 1.0, 2.0))"
1,Briar Hill-Belgravia,House,Private room,3,1.0,1,1,Real Bed,0,73.0,1,0.0,2,1,strict,Medium Risk,0.81,0.003,0.005,No-Low,No-Low,Low-Med,0.057,Med-High,"List(0, 40, List(2, 3, 5, 7, 8, 9, 10, 11, 12, 13, 14, 16, 18, 19, 21, 23, 24, 25, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 38, 39), List(6.0, 6.0, 5.0, 3.0, 4.0, 1.0, 1.0, 1.0, 1.0, 3.0, 3.0, 2.0, 2.0, 1.0, 1.0, 4.0, 5.0, 1.0, 1.0, 2.0, 3.0, 3.0, 3.0, 1.0, 5.0, 2.0, 1.0, 3.0, 3.0, 3.0))"
1,South Parkdale,House,Entire home/apt,5,1.0,2,2,Real Bed,0,150.0,1,125.0,2,0,strict,High Risk,0.83,0.004,0.008,No-Low,No-Low,Med-High,0.146,Med-High,"List(0, 40, List(2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 18, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 36, 38, 39), List(3.0, 6.0, 1.0, 3.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 1.0, 1.0, 3.0, 2.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 4.0, 2.0, 1.0, 2.0, 2.0))"
1,Wexford/Maryvale,Other,Entire home/apt,4,1.0,2,2,Real Bed,100,110.0,1,0.0,1,1,moderate,Medium Risk,0.67,0.016,0.01,Med-High,Low-Med,Low-Med,0.058,High,"List(0, 40, List(1, 2, 3, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 18, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 36, 37, 38, 39), List(2.0, 2.0, 4.0, 1.0, 2.0, 4.0, 1.0, 2.0, 1.0, 2.0, 5.0, 2.0, 2.0, 1.0, 2.0, 1.0, 1.0, 1.0, 4.0, 4.0, 1.0, 1.0, 2.0, 2.0, 5.0, 2.0, 3.0, 2.0, 5.0, 3.0, 4.0, 3.0, 1.0, 3.0))"
1,Rosedale-Moore Park,Apartment,Entire home/apt,2,1.0,1,1,Real Bed,0,119.0,1,75.0,2,0,strict,Low Risk,0.84,0.007,0.008,Low-Med,No-Low,High,0.057,High,"List(0, 40, List(2, 3, 5, 7, 8, 9, 10, 11, 14, 16, 17, 18, 19, 21, 23, 24, 26, 28, 29, 30, 31, 32, 33, 34, 35, 36, 38), List(3.0, 2.0, 3.0, 2.0, 1.0, 2.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 2.0, 1.0, 2.0, 2.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 2.0))"
0,Rosedale-Moore Park,Apartment,Private room,2,1.0,1,1,Real Bed,900,110.0,1,60.0,1,0,strict,Low Risk,0.84,0.007,0.008,Low-Med,No-Low,High,0.057,High,"List(0, 40, List(1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 17, 18, 21, 22, 23, 24, 25, 26, 28, 29, 31, 32, 33, 34, 36, 38), List(1.0, 3.0, 4.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 1.0, 1.0, 2.0, 1.0, 1.0, 2.0, 2.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0))"
1,Church-Yonge Corridor,Condominium,Entire home/apt,3,1.0,0,1,Real Bed,0,110.0,1,82.0,2,0,strict,Medium Risk,0.98,0.002,0.011,Low-Med,Low-Med,Low-Med,0.2,High,"List(0, 40, List(2, 3, 4, 5, 7, 8, 9, 10, 11, 13, 14, 18, 19, 21, 22, 23, 24, 25, 26, 28, 29, 30, 31, 32, 33, 34, 36, 38), List(3.0, 2.0, 1.0, 2.0, 2.0, 1.0, 2.0, 2.0, 1.0, 1.0, 2.0, 2.0, 1.0, 1.0, 1.0, 2.0, 2.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 1.0, 2.0, 2.0, 2.0, 2.0))"
0,Rosedale-Moore Park,Apartment,Private room,2,1.0,1,1,Real Bed,900,105.0,1,50.0,1,0,strict,Low Risk,0.84,0.007,0.008,Low-Med,No-Low,High,0.057,High,"List(0, 40, List(1, 2, 4, 5, 8, 9, 10, 14, 18, 21, 22, 24, 25, 26, 28, 29, 31, 32, 33, 34, 36), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 2.0))"
1,Rosedale-Moore Park,Apartment,Private room,2,1.0,1,1,Real Bed,0,90.0,1,70.0,1,0,strict,Low Risk,0.84,0.007,0.008,Low-Med,No-Low,High,0.057,High,"List(0, 40, List(1, 2, 4, 5, 8, 9, 10, 14, 18, 21, 22, 24, 25, 26, 28, 29, 31, 32, 33, 34, 36), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 2.0))"
1,Church-Yonge Corridor,Apartment,Entire home/apt,3,1.0,1,1,Real Bed,0,145.0,1,80.0,2,0,strict,Medium Risk,0.98,0.002,0.011,Low-Med,Low-Med,Low-Med,0.2,High,"List(0, 40, List(2, 3, 4, 5, 7, 8, 9, 10, 11, 14, 18, 19, 21, 22, 24, 25, 26, 29, 31, 32, 33, 34, 36, 38, 39), List(1.0, 2.0, 1.0, 2.0, 2.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 1.0, 1.0, 1.0, 2.0, 2.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 1.0, 1.0))"


In [33]:
#One Hot Encoding

OHE_List= ['property_type', 'room_type', 'bed_type', 'neighbourhood_cleansed', 'cancellation_policy', 'Crime Rate rank', 'Volume of TTC stops rank', 'Volume of Health Providers rank', 'Home price rank', 'Debt Risk Score']

# Generate OHE for elements in the list above
for element in OHE_List:
  print(element)
#Remove Spaces from categorical fields to prevent issues with encoding
  airbnb_df = replace_char(airbnb_df, element,' ', '_')
  
  #Convert String to Array so that OHE can run (requires an array as an input)
  airbnb_df = convert_col_to_array(airbnb_df, element + '_array', element,' ')
  
  #Initialize Count vectorizer
  elementVectorizer = initialize_Count_Vectorizer(element + '_array',element + '_OHE')
  
  #Fit a vectorizer model
  elementVectorizer_model = elementVectorizer.fit(airbnb_df)
  
  #Transform Data
  airbnb_df = elementVectorizer_model.transform(airbnb_df)
  
  #Specify Extraneous Cols to drop 
  columns_to_drop = [element, element + '_array']
  
  #Drop Extraneous Cols listed above
  airbnb_df = airbnb_df.drop(*columns_to_drop)

In [34]:
#Identify list of columns who's string type must be a number ie double or integer instead of string
list_stringtypes = ['house_rules','accommodates', 'bathrooms','bedrooms','beds', 'price','security_deposit','cleaning_fee', 'guests_included','Instant_Bookable']

for element in list_stringtypes:
  airbnb_df = convert_dtype(airbnb_df,element, DoubleType())

In [36]:
# Specify Input Columns that will become features in our final models
input_columns = ['accommodates','bathrooms', 'security_deposit','cleaning_fee', 'amenitiesFeatures',
                 'bedrooms','beds','property_type_OHE', 'room_type_OHE', 'bed_type_OHE', 'guests_included', 'Walk Score',
                 'Percentage_Social_Housing',
                 'neighbourhood_cleansed_OHE','cancellation_policy_OHE','Crime Rate rank_OHE', 'Volume of TTC stops rank_OHE',
                'Volume of Health Providers rank_OHE', 'Home price rank_OHE']

In [37]:
#####################################################MODEL IMPLEMENTATION - PREPWORK#####################################################
# The following command converts cleaned dataframe into features and label using the feature assembler; followed by feature indexing and
# splitting of the dataset into Train and Test sets to train and test the specified models

In [38]:
#Build Features using the feature assembler
feature_assembler = VectorAssembler(inputCols = input_columns, outputCol = "features")
output = feature_assembler.transform(airbnb_df) # Transform dataframe into features
final_airbnb = output.select("features", "price") # Generate final output two cols: one with features and the other with the target ie price

#Rename Price Column to Label to use with ML models in PySpark
final_airbnb = final_airbnb.select(col("features"),col("price").alias("label"))

#Build Feature Indexer:
feature_Indexer = VectorIndexer(inputCol = 'features', outputCol = 'indexedFeatures', maxCategories = 4).fit(final_airbnb)

#Split data into training and testing sets (30% held out for testing)
(trainingData,testData) =  final_airbnb.randomSplit([0.7,0.3])

In [39]:
#####################################################MODEL IMPLEMENTATION - INITIAL MODELS PRE HYPERPARAMETER TUNING#########################
# The following few commands run each of the 4 selected models prior to Hyper parameter tuning. The 4 models used for this project are as follows:
# 1. Linear Regression
# 2. Decision Tree Regression
# 3. Random Forest Regression
# 4. Gradient Boosting Regression
# 5. Basic Neural Network - Sequential Model

In [40]:
# 1. Build Linear Regression Regressor
lin_reg_reg = LinearRegression(featuresCol = 'features', labelCol = 'label', maxIter = 10, regParam = 0.3, elasticNetParam = 0.8)

# 2. Build Decision Tree Regressor
dec_tree_reg = DecisionTreeRegressor(featuresCol = 'features', impurity = 'variance', maxDepth = 5, maxBins = 200)

# 3. Build Random Forest Regressor
rand_for_reg = RandomForestRegressor()

# 4. Build Gradient Boosting Regressor
GBT_reg = GBTRegressor(featuresCol = 'indexedFeatures')

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")

# Build list of associated regressors:
reg_list = [lin_reg_reg, dec_tree_reg, rand_for_reg, GBT_reg]

In [41]:
# Iterate through Models and obtain results
model_num = 1
for regressor in reg_list:
  #Build pipeline with feature indexer and regressor
  pipeline = Pipeline(stages = [feature_Indexer, regressor])
  
  #Train Model while running indexer
  model = pipeline.fit(trainingData)
  
  #Make Predictions on test Data
  predictions = model.transform(testData)

  #Evaluate Predictions using aforementioned Evaluator
  rmse = evaluator.evaluate(predictions)
  A = np.array(predictions.select('label').collect())
  F = np.array(predictions.select('prediction').collect())
  
  #Print RMSE for the current regressor along with the regression model 
  print("========================================================Model " + str(model_num) + "============================================================")
  print("Root Mean Squared Error (RMSE) on test data for regression model is $ %g" % rmse)
  print("Symmetric Mean Absolute Percentage Error (SMAPE) on the test data for the regression model is " + str(smape(A,F)) + " %")
  print("===========================================================================================================================")
  print("                                                                                                                           ")
  
  model_num = model_num + 1
print('Completed!')

In [42]:
# Iterate through Models and obtain results
model_num = 2
reg_list = [dec_tree_reg, rand_for_reg, GBT_reg]
for regressor in reg_list:
  #Build pipeline with feature indexer and regressor
  pipeline = Pipeline(stages = [feature_Indexer, regressor])
  
  #Train Model while running indexer
  model = pipeline.fit(trainingData)
  
  #Make Predictions on test Data
  predictions = model.transform(testData)
 
  varlist = ExtractFeatureImp(model.stages[-1].featureImportances, predictions, "features")
  varidx = [x for x in varlist['idx'][0:10]]
  
  slicer = VectorSlicer(inputCol = 'features', outputCol = 'new features', indices = varidx)
  new_df = slicer.transform(predictions)
  new_df = new_df.drop('rawPrediction', 'probability', 'prediction')
  
  if model_num == 2:
    regressor_2 = DecisionTreeRegressor(featuresCol = 'new features', labelCol = 'label', impurity = 'variance', maxDepth = 5, maxBins = 200)
  elif model_num == 3:
    regressor_2 = RandomForestRegressor(featuresCol = 'new features', labelCol = 'label')
  elif model_num == 4:
    regressor_2 = GBTRegressor(featuresCol = 'new features', labelCol = 'label')
  
  mod2 = regressor_2.fit(new_df)
  new_predictions = mod2.transform(new_df)

  #Evaluate Predictions using aforementioned Evaluator
  rmse = evaluator.evaluate(new_predictions)
  A = np.array(new_predictions.select('label').collect())
  F = np.array(new_predictions.select('prediction').collect())
  
  #Print RMSE for the current regressor along with the regression model 
  print("========================================================Model " + str(model_num) + "============================================================")
  print("Root Mean Squared Error (RMSE) on test data for regression model is $ %g" % rmse)
  print("Symmetric Mean Absolute Percentage Error (SMAPE) on the test data for the regression model is " + str(smape(A,F)) + " %")
  print("===========================================================================================================================")
  print("                                                                                                                           ")
  
  model_num = model_num + 1
print('Completed!')

In [43]:
#####################################################MODEL IMPLEMENTATION - HYPERPARAMETER TUNING###################################
# The following few commands run for each of the 4 selected models using Hyper parameter tuning (Via Grid Search). The 4 models used for this project are as follows:
# 1. Linear Regression
# 2. Decision Tree Regression
# 3. Random Forest Regression
# 4. Gradient Boosting Regression
# 5. Basic Neural Network - Sequential Model

#WARNING: Each of the following models takes at least 20 - 30 minutes to run at minimum. All models take ~ 2 hours to run in conjunction

In [44]:
# Iterate through Models and obtain results
model_num = 1
k_folds = 10
reg_list = [lin_reg_reg, dec_tree_reg, rand_for_reg, GBT_reg]
for regressor in reg_list:
  # We define an evaluation metric.  This tells CrossValidator how well we are doing by comparing the true labels with predictions.
  evaluator = RegressionEvaluator(metricName='rmse', labelCol=regressor.getLabelCol(), predictionCol=regressor.getPredictionCol())
  
  if model_num == 1: #Linear Regression HyperParameters
    paramGrid = ParamGridBuilder() \
    .addGrid(lin_reg_reg.regParam, [0.01, 0.1, 0.5, 1]) \
    .addGrid(lin_reg_reg.elasticNetParam, [0.1, 0.4, 1]) \
    .build()
  elif model_num == 2: # Decision Tree HyperParameters
    paramGrid = ParamGridBuilder()\
  .addGrid(dec_tree_reg.maxDepth, [2, 5, 10])\
  .addGrid(dec_tree_reg.maxBins, [32, 100, 200])\
  .build()
  elif model_num == 3: #Random Forest HyperParameters
    paramGrid = ParamGridBuilder()\
  .addGrid(rand_for_reg.numTrees, [5,15,30,50])\
  .addGrid(rand_for_reg.maxDepth, [5,10,15])\
  .build()
  else: #GBT HyperParameters
    paramGrid = ParamGridBuilder()\
  .addGrid(GBT_reg.maxDepth, [2, 5])\
  .addGrid(GBT_reg.maxIter, [10, 100])\
  .build()
    
  # Declare the CrossValidator, which runs model tuning for us.
  cv = CrossValidator(estimator=regressor, evaluator=evaluator, estimatorParamMaps= paramGrid,numFolds = k_folds)

  #Chain Indexer and tree in a pipeline
  pipeline = Pipeline(stages=[feature_Indexer, cv])

  # Train Model and run indexer
  model = pipeline.fit(trainingData)
  
  predictions = pipelineModel.transform(testData)

  #Evaluate Predictions using aforementioned Evaluator
  rmse = evaluator.evaluate(predictions)
  A = np.array(predictions.select('label').collect())
  F = np.array(predictions.select('prediction').collect())
  
  #Print RMSE for the current regressor along with the regression model 
  print("========================================================Model " + str(model_num) + "============================================================")
  print("Root Mean Squared Error (RMSE) on test data for regression model is $ %g" % rmse)
  print("Symmetric Mean Absolute Percentage Error (SMAPE) on the test data for the regression model is " + str(smape(A,F)) + " %")
  print("===========================================================================================================================")
  print("                                                                                                                           ")
  model_num = model_num + 1
print('Completed!')