Work as a group, try to improve the performance (measured by rmse and r2) of the airbnb rental price prediction model.  As a reference: current rmse: 215, r2: 0.20
####Ways to improve the model:
1. remove outliers, especailly for the target label
2. find better ways to deal with missing values
3. add/delete/modify features, create additional features based on existing features
4. conduct hyper-parameters tuning and cross-validation
5. try different models/algorithms.
6. use more data or anything else you find helpful

### please document your steps clearly, and discuss your best rmse and r2 score. 

### Due Date: Tuesday 11/16 at midnight

### Deliverables: submit the completed notebook with results/visualization in html format.

[Spark Machine Learning Reference](https://spark.apache.org/docs/latest/ml-guide.html)

[Multiple Imputation with lightgbm](https://towardsdatascience.com/multiple-imputation-with-random-forests-in-python-dec83c0ac55b)

[Oversampling Inbalance Data](https://towardsdatascience.com/5-smote-techniques-for-oversampling-your-imbalance-data-b8155bdbe2b5)

## Data cleaning/pre-processing

## Refinement: Remove price above 1000, and use log price (using log value of the price did not imporve the model)

In [0]:
from pyspark.sql.functions import col, translate, when
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import col, exp, log

filePath = "/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb.csv"
 
rawDF = spark.read.csv(filePath, header="true", inferSchema="true", multiLine="true", escape='"')

columnsToKeep = [
  "host_is_superhost",
  "cancellation_policy",
  "instant_bookable",
  "host_total_listings_count",
  "neighbourhood_cleansed",
  "latitude",
  "longitude",
  "property_type",
  "room_type",
  "accommodates",
  "bathrooms",
  "bedrooms",
  "beds",
  "bed_type",
  "minimum_nights",
  "number_of_reviews",
  "review_scores_rating",
  "review_scores_accuracy",
  "review_scores_cleanliness",
  "review_scores_checkin",
  "review_scores_communication",
  "review_scores_location",
  "review_scores_value",
  "price"]
 
baseDF = rawDF.select(columnsToKeep)

# fix data types
 
fixedPriceDF = baseDF.withColumn("price", translate(col("price"), "$,", "").cast("double"))

# remove nulls from host_is_superhost
noNullsDF = fixedPriceDF.na.drop(subset=["host_is_superhost"])

# cast integer to Double

integerColumns = [x.name for x in baseDF.schema.fields if x.dataType == IntegerType()]

doublesDF = noNullsDF

for c in integerColumns:
  doublesDF = doublesDF.withColumn(c, col(c).cast("double"))
  
# select the columns to imputer
 
imputeCols = [
  "bedrooms",
  "bathrooms",
  "beds", 
  "review_scores_rating",
  "review_scores_accuracy",
  "review_scores_cleanliness",
  "review_scores_checkin",
  "review_scores_communication",
  "review_scores_location",
  "review_scores_value"
]

imputer = Imputer(strategy="median", inputCols=imputeCols, outputCols=imputeCols)
 
imputedDF = imputer.fit(doublesDF).transform(doublesDF)

# deal with outliers

#only keep rows with a price greater than 0 and less than 1000 and add a new column to store log value of the price

# posPricesDF = imputedDF.filter(col("price") > 0)

posPricesDF = imputedDF.filter(col("price") > 0).filter(col("price")<=1000).withColumn('log_price', log(col('price')))

# Filter out those records where the minimum_nights is greater then 365:
cleanDF = posPricesDF.filter(col("minimum_nights") <= 365)

# save cleaned data for future analysis
outputPath = "/tmp/sf-airbnb/sf-airbnb-clean.parquet"
 
cleanDF.write.mode("overwrite").parquet(outputPath)

### Refine the dataset

In [0]:
airbnb=spark.read.parquet('/tmp/sf-airbnb/sf-airbnb-clean.parquet')

In [0]:
display(airbnb.summary())

summary,host_is_superhost,cancellation_policy,instant_bookable,host_total_listings_count,neighbourhood_cleansed,latitude,longitude,property_type,room_type,accommodates,bathrooms,bedrooms,beds,bed_type,minimum_nights,number_of_reviews,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,price,log_price
count,7066,7066,7066,7066.0,7066,7066.0,7066.0,7066,7066,7066.0,7066.0,7066.0,7066.0,7066,7066.0,7066.0,7066.0,7066.0,7066.0,7066.0,7066.0,7066.0,7066.0,7066.0,7066.0
mean,,,,53.033257854514574,,37.76562541183129,-122.43056714690084,,,3.163317294084348,1.3183555052363432,1.325219360317011,1.7456835550523635,,15.849985847721484,43.93801302009624,96.0161335975092,9.819841494480611,9.697565808095105,9.895556184545711,9.87248797056326,9.717095952448345,9.52490801018964,192.6599207472403,5.045931494089355
stddev,,,,178.29629481606926,,0.0225034256970666,0.0268501745182546,,,1.8559417347933795,0.7848629228337817,0.9064248077244172,1.1454697592454994,,22.1856446544379,72.83848368998738,6.2920676060141805,0.6030892660050703,0.7063254926827135,0.450040568147729,0.5226790995582293,0.6617092547749441,0.7518899226596337,145.2113952067284,0.6422000059640569
min,f,flexible,f,0.0,Bayview,37.70743,-122.51306,Aparthotel,Entire home/apt,1.0,0.0,0.0,0.0,Airbed,1.0,0.0,20.0,2.0,2.0,2.0,2.0,2.0,2.0,10.0,2.302585092994046
25%,,,,1.0,,37.75096,-122.44302,,,2.0,1.0,1.0,1.0,,2.0,1.0,95.0,10.0,10.0,10.0,10.0,10.0,9.0,100.0,4.605170185988092
50%,,,,2.0,,37.76724,-122.42551,,,2.0,1.0,1.0,1.0,,4.0,12.0,98.0,10.0,10.0,10.0,10.0,10.0,10.0,150.0,5.010635294096256
75%,,,,8.0,,37.78435,-122.41107,,,4.0,1.5,2.0,2.0,,30.0,55.0,99.0,10.0,10.0,10.0,10.0,10.0,10.0,230.0,5.438079308923196
max,t,super_strict_60,t,1199.0,Western Addition,37.81031,-122.36979,Villa,Shared room,16.0,14.0,14.0,14.0,Real Bed,365.0,677.0,100.0,10.0,10.0,10.0,10.0,10.0,10.0,1000.0,6.907755278982137


## Regression Analysis (use log scale for price produced the worse model)

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator
 
# load the cleaned data
filePath =  "/tmp/sf-airbnb/sf-airbnb-clean.parquet"
airbnbDF = spark.read.parquet(filePath)
 
# split into train and test dataset
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)
 
# Feature engineering
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
 
indexOutputCols = [x + "Index" for x in categoricalCols]
 
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")

oheOutputCols = [x + "OHE" for x in categoricalCols]

oheEncoder = OneHotEncoder(inputCols=indexOutputCols, 
                           outputCols=oheOutputCols)
 
numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price") & (field != "log_price"))]

assemblerInputs = oheOutputCols + numericCols

vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
 
# initiate linear regression

#lr = LinearRegression(labelCol="log_price", predictionCol="log_pred", featuresCol="features", standardization=True)

lr = LinearRegression(labelCol="price", featuresCol="features", standardization=False)
 
# define hyper-parameter. Please be aware each algorithm has different hyper-parameters to tune.
paramGrid = (ParamGridBuilder()
            .addGrid(lr.regParam, [0.01, 1, 2.0])
            .addGrid(lr.elasticNetParam, [0.01,0.1, 0.5, 1.0])
            .build())
 
#define evaluator
evaluator = RegressionEvaluator(labelCol="price", 
                                predictionCol="prediction", 
                                metricName="rmse")
# cross-validation
cv = CrossValidator( estimator=lr, 
                    evaluator=evaluator, 
                    estimatorParamMaps=paramGrid, 
                    numFolds=3, 
                    parallelism=10, 
                    seed=42)
 
#create pipeline
pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, cv])
 
pipelineModel_lr = pipeline.fit(trainDF)

In [0]:
# display category columns and numerical columns to make sure that they are correct
print("category columns: ", categoricalCols)

print("numerical columns ", numericCols)

In [0]:
# evaluate the model
from pyspark.sql.functions import col, exp

predDF = pipelineModel_lr.transform(testDF)
 
#expDF = predDF.withColumn("prediction", exp(col("log_pred")))
 
regressionEvaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction")

#rmse = regressionEvaluator.setMetricName("rmse").evaluate(expDF)
#r2 = regressionEvaluator.setMetricName("r2").evaluate(expDF)

rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF)
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)

print(f"RMSE is {rmse}")
print(f"R2 is {r2}")

In [0]:
def lr_feature_names(df):
  featureIndex=df.schema["features"].metadata["ml_attr"]["attrs"]
 
  feature_names=[]
  # print numeric feature
  for x in range(len(df.schema["features"].metadata["ml_attr"]["attrs"]['numeric'])):
    try:
      feature_names.append(featureIndex["numeric"][x]['name'])
    except:
      continue
 # print binary feature   
  for x in range(len(df.schema["features"].metadata["ml_attr"]["attrs"]['binary'])):
    try:
       feature_names.append(featureIndex["binary"][x]['name'])
    except:
      continue
  return feature_names

In [0]:
# feature importance
import pandas as pd

lrModel = pipelineModel_lr.stages[-1]

coefficients =lrModel.bestModel.coefficients
 
feature_names=lr_feature_names(predDF)
 
weightsDF = pd.DataFrame(zip(feature_names, coefficients), columns=['feature', 'coefficients'])

# get absoluate value of weight
weightsDF['abs_coefficients']=weightsDF['coefficients'].abs()
 
#weightsDF
 
display(weightsDF.sort_values('abs_coefficients', ascending=False).head(20))

feature,coefficients,abs_coefficients
property_typeOHE_Cottage,1250.1281326886717,1250.1281326886717
property_typeOHE_House,159.69881916113562,159.69881916113562
property_typeOHE_Loft,127.47314490385122,127.47314490385122
neighbourhood_cleansedOHE_Visitacion Valley,-107.21968970542956,107.21968970542956
property_typeOHE_Serviced apartment,89.96224078849195,89.96224078849195
property_typeOHE_Boutique hotel,-81.18951630842359,81.18951630842359
property_typeOHE_Hostel,67.96297851388876,67.96297851388876
neighbourhood_cleansedOHE_North Beach,-59.28413118379439,59.28413118379439
cancellation_policyOHE_strict,58.32213689285541,58.32213689285541
neighbourhood_cleansedOHE_Lakeshore,-51.483340198176485,51.483340198176485


## Random Forest (put everything together)

In [0]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator
 
# load the cleaned data
filePath = "/tmp/sf-airbnb/sf-airbnb-clean.parquet"
airbnbDF = spark.read.parquet(filePath)
 
# split into train and test dataset
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)
 
# Feature engineering
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
 
indexOutputCols = [x + "Index" for x in categoricalCols]
 
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")
 
numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price") & (field != "log_price"))]

assemblerInputs = indexOutputCols + numericCols

vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
 
# iniatiate random forest

#rf = RandomForestRegressor(labelCol="log_price", predictionCol="log_pred", maxBins=40, seed=42)

rf = RandomForestRegressor(labelCol="price", maxBins=40, seed=42)
 
# define hyper-parameter. Please be aware each algorithm has different hyper-parameter to tune.
paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [4, 6, 8])
            .addGrid(rf.maxBins, [40, 50])
            .addGrid(rf.numTrees, [50, 100])
            .build())
 
#define evaluator
evaluator = RegressionEvaluator(labelCol="price", 
                                predictionCol="prediction", 
                                metricName="rmse")
# cross-validation
cv = CrossValidator(estimator=rf, 
                    evaluator=evaluator, 
                    estimatorParamMaps=paramGrid, 
                    numFolds=3, 
                    parallelism=10, 
                    seed=42)
#create pipeline 
 
pipeline = Pipeline(stages=[stringIndexer, vecAssembler, cv])

# fit the model 
pipelineModel_rf = pipeline.fit(trainDF)

### Evalaute the model (convert log scale back to original value to evlaute the result)

In [0]:
from pyspark.sql.functions import col, exp

predDF = pipelineModel_rf.transform(testDF)
 
#expDF = predDF.withColumn("prediction", exp(col("log_pred")))
 
regressionEvaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction")

rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF)
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)

#rmse = regressionEvaluator.setMetricName("rmse").evaluate(expDF)
#r2 = regressionEvaluator.setMetricName("r2").evaluate(expDF)

print(f"RMSE is {rmse}")
print(f"R2 is {r2}")

### Get the feature importance

In [0]:
import pandas as pd
cvModel = pipelineModel_rf.stages[-1]
featureImp = pd.DataFrame(
  list(zip(vecAssembler.getInputCols(), cvModel.bestModel.featureImportances)),
  columns=["feature", "importance"])
display(featureImp.sort_values(by="importance", ascending=False).head(10))

feature,importance
bedrooms,0.2595675248154019
accommodates,0.148227744153993
neighbourhood_cleansedIndex,0.1178614756562193
bathrooms,0.0894326607563371
beds,0.0846870514716321
minimum_nights,0.0523424148120084
room_typeIndex,0.0516698370518479
property_typeIndex,0.0456032502099468
latitude,0.0367490691520235
host_total_listings_count,0.0246094942673526


In [0]:
# check the average price of bedrooms

from pyspark.sql.functions import avg
display(airbnbDF.groupBy('bedrooms').avg('price').orderBy('bedrooms'))

bedrooms,avg(price)
0.0,143.93266832917706
1.0,139.8397129186603
2.0,263.1511627906977
3.0,379.0656814449918
4.0,482.6666666666667
5.0,584.0588235294117
6.0,573.0
14.0,69.0


In [0]:
from pyspark.sql.functions import avg
display(airbnbDF.groupBy('accommodates').avg('price').orderBy('accommodates'))

accommodates,avg(price)
1.0,89.74800637958533
2.0,140.1679699718486
3.0,164.1206896551724
4.0,225.9013793103448
5.0,278.64375
6.0,359.64181818181817
7.0,350.56043956043953
8.0,448.5527950310559
9.0,357.5263157894737
10.0,514.025


In [0]:
from pyspark.sql.functions import avg, desc
display(airbnbDF.groupBy('neighbourhood_cleansed').agg(avg('price').alias('avgPrice')).orderBy(desc('avgPrice')))

neighbourhood_cleansed,avgPrice
Golden Gate Park,315.0
Pacific Heights,282.5263157894737
Russian Hill,279.38666666666666
Marina,241.728813559322
Twin Peaks,233.83870967741936
Castro/Upper Market,232.94306930693068
Presidio Heights,227.12
Noe Valley,226.3694267515924
North Beach,223.45637583892616
Western Addition,209.74616695059623


In [0]:
airbnbDF.columns

## Evaluate model hyper-prameters

In [0]:
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))

## Build a general model for tree based algorithm

In [0]:
def model_testing(filePath, label, predictionCol, model, paramGrid, log=False):    
    from pyspark.ml.feature import StringIndexer, VectorAssembler
    from pyspark.ml.regression import RandomForestRegressor
    from pyspark.ml.regression import GBTRegressor
    from pyspark.ml import Pipeline
    from pyspark.ml.tuning import ParamGridBuilder
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.tuning import CrossValidator
    from pyspark.sql.functions import col, exp

    # load the cleaned data
    airbnbDF = spark.read.parquet(filePath)

    # split into train and test dataset
    (trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)

    # Feature engineering
    categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]

    indexOutputCols = [x + "Index" for x in categoricalCols]

    stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")

    numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price") & (field != "log_price"))]

    assemblerInputs = indexOutputCols + numericCols

    vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

    # iniatiate random forest
    
    # define hyper-parameters

    #define evaluator
    evaluator = RegressionEvaluator(labelCol=label, 
                                    predictionCol=predictionCol, 
                                    metricName="rmse")
    # cross-validation
    cv = CrossValidator(estimator=model, 
                        evaluator=evaluator, 
                        estimatorParamMaps=paramGrid, 
                        numFolds=3, 
                        parallelism=10, 
                        seed=42)
    #create pipeline 

    pipeline = Pipeline(stages=[stringIndexer, vecAssembler, cv])

    # fit the model 
    pipelineModel = pipeline.fit(trainDF)

    
    predDF = pipelineModel.transform(testDF)
        
    if log==True:
      predDF = predDF.withColumn("prediction", exp(col(predictionCol)))
      label="price"
      predictionCol="prediction"
     
    regressionEvaluator = RegressionEvaluator(labelCol=label, predictionCol=predictionCol)
    
    rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF)
    r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)

    print(f"RMSE is {rmse}")
    print(f"R2 is {r2}")
    
    return predDF

In [0]:
# random forest without original price
filePath = "/tmp/sf-airbnb/sf-airbnb-clean.parquet"
label="price"
predictionCol="prediction"
rf = RandomForestRegressor(labelCol=label, predictionCol=predictionCol, maxBins=40, seed=42)
paramGrid = (ParamGridBuilder()
                .addGrid(rf.maxDepth, [4, 6, 8])
                .addGrid(rf.maxBins, [40, 50])
                .addGrid(rf.numTrees, [50, 100])
                .build())
predDF=model_testing(filePath, label, predictionCol, rf, paramGrid )

In [0]:
# random forest with log price
filePath = "/tmp/sf-airbnb/sf-airbnb-clean.parquet"
label="log_price"
predictionCol="log_prediction"
rf = RandomForestRegressor(labelCol=label, predictionCol=predictionCol, maxBins=40, seed=42)
paramGrid = (ParamGridBuilder()
                .addGrid(rf.maxDepth, [4, 6, 8])
                .addGrid(rf.maxBins, [40, 50])
                .addGrid(rf.numTrees, [50, 100])
                .build())
predDF=model_testing(filePath, label, predictionCol, rf, paramGrid, log=True)

In [0]:
# Gradient-Boosted Trees with original price
from pyspark.ml.regression import GBTRegressor

filePath = "/tmp/sf-airbnb/sf-airbnb-clean.parquet"
label="price"
predictionCol="prediction"
gbt = GBTRegressor(labelCol=label, predictionCol=predictionCol, featuresCol="features", maxBins=40, seed=42)
paramGrid=(ParamGridBuilder()
             .addGrid(gbt.maxBins, [35, 40, 50])
             .addGrid(gbt.maxDepth, [5, 8])
             .addGrid(gbt.stepSize, [0.01, 0.1, 0.2])
             .build())

predDF=model_testing(filePath, label, predictionCol, gbt, paramGrid)

In [0]:
# Gradient-Boosted Trees with log price
from pyspark.ml.regression import GBTRegressor

filePath = "/tmp/sf-airbnb/sf-airbnb-clean.parquet"
label="log_price"
predictionCol="log_prediction"
gbt = GBTRegressor(labelCol=label, predictionCol=predictionCol, featuresCol="features", maxBins=40, seed=42)
paramGrid=(ParamGridBuilder()
             .addGrid(gbt.maxBins, [35, 40, 50])
             .addGrid(gbt.maxDepth, [5, 8])
             .addGrid(gbt.stepSize, [0.01, 0.1, 0.2])
             .build())

predDF=model_testing(filePath, label, predictionCol, gbt, paramGrid, log=True)