<a href="http://www.calstatela.edu/centers/hipic"><img align="left" src="https://avatars2.githubusercontent.com/u/4156894?v=3&s=100">
</a>
<img align="right" alt="California State University, Los Angeles" src="http://www.calstatela.edu/sites/default/files/groups/California%20State%20University%2C%20Los%20Angeles/master_logo_full_color_horizontal_centered.svg" style="width: 360px;"/>

#    CIS5560 Term Project Tutorial

------
#### Authors: Samyuktha Muralidharan, Sanjana Boddireddy, Savita Yadav, Farnood Rahbar Far

#### Instructor: [Jongwook Woo](https://www.linkedin.com/in/jongwook-woo-7081a85)

#### Date: 05/23/2021

## Objective
**Airbnb** is an online marketplace that connects people who want to rent out their homes with people who are looking for accommodations in that locale. One challenge that Airbnb hosts face is determining the **optimal rent price/night**. The amount a host can charge on a nightly basis is closely linked to the dynamics of the marketplace. The objective of this tutorial is to build a machine learning model that predicts the optimal price of a property from the features of its listing. We use a **Regression model** for price prediction, and the algorithm used here is **Random Forest Regression**. We also evaluate the model's performance to determine how well it predicts the Airbnb listing price.

## Import Spark SQL and Spark ML Libraries
Import the Spark SQL and ML libraries listed below. This is necessary to access the functions available in those libraries.

In [0]:
# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler,StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

## To run the code in PySpark CLI
Set the following to True:
```
PYSPARK_CLI = True
```
Generate a .py (Python) file from Databricks: File > Export > Source File

Run it on the Hadoop/Spark cluster (the file name contains spaces, so it must be quoted):
```
$ spark-submit "Random Forest Regression.py"
```

In [0]:
PYSPARK_CLI = False
if PYSPARK_CLI:
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)

## Read the csv file from DBFS (Databricks File System)
The file **'airbnb_sample.csv'** contains Airbnb listings and their features. The label column is **'Price'**, which indicates the price per night of the Airbnb property. Locate the data file, specify its type, and read the file as a PySpark dataframe.

The sampled file is available at **https://www.kaggle.com/samyukthamurali/airbnb-ratings-dataset?select=airbnb_sample.csv**. You can download it from this URL and upload it to DBFS.

In [0]:
# File location and type
file_location = "/FileStore/tables/airbnb_sample.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
# Load the csv file as a pyspark dataframe
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df.show(5)



## Create a temporary view of the dataframe 'df'

In [0]:
# Create a view or table
temp_table_name = "airbnb_sample_csv"
df.createOrReplaceTempView(temp_table_name)

In [0]:
if PYSPARK_CLI:
    csv = spark.read.csv('airbnb_sample.csv', inferSchema=True, header=True)
else:
    csv = spark.sql("SELECT * FROM airbnb_sample_csv")

csv.show(5)

## Selecting features
In the following step, we select the features that are useful for price prediction. We also select the **'Price'** column, which is the label the model will predict.

In [0]:
# Select features and label
data=csv.select("Host Listings Count",	"Host Total Listings Count",	"Neighborhood",	"Latitude","Longitude",	"Property Type",	"Room Type","Bed Type",	"Accomodates",	"Bathrooms",	"Bedrooms","Monthly Price","Cleaning Fee",	"Guests Included","Extra People","Minimum Nights","Review Scores Rating",	"Review Scores Accuracy",	"Review Scores Cleanliness",	"Review Scores Checkin","Review Scores Communication",	"Review Scores Location",	"Review Scores Value","Sentiment",col("Price").alias("label"))

data.show(5)

## Data Cleaning
Data cleaning is critical to the success of a machine learning model.

**Detecting and Removing Outliers:** We determine the **5th** percentile and **95th** percentile values of the label and of selected numeric features. We then filter the dataframe to keep only the rows whose values lie between these two bounds for every one of those columns.

In [0]:
# approxQuantile() to determine the 5th and 95th percentile values
outliers = data.stat.approxQuantile(['label',"Host Listings Count","Host Total Listings Count","Accomodates","Bathrooms","Bedrooms","Monthly Price", "Cleaning Fee","Guests Included","Extra People","Minimum Nights"],[0.05,0.95],0.0)
print(outliers)

import pyspark.sql.functions as f

# Filtering the dataframe by removing the outliers
data = data.filter((f.col('label') >= outliers[0][0]) & (f.col('label') <= outliers[0][1]))
data = data.filter((f.col('Host Listings Count') >= outliers[1][0]) & (f.col('Host Listings Count') <= outliers[1][1]))
data = data.filter((f.col('Host Total Listings Count') >= outliers[2][0]) & (f.col('Host Total Listings Count') <= outliers[2][1]))
data = data.filter((f.col('Accomodates') >= outliers[3][0]) & (f.col('Accomodates') <= outliers[3][1]))
data = data.filter((f.col('Bathrooms') >= outliers[4][0]) & (f.col('Bathrooms') <= outliers[4][1]))
data = data.filter((f.col('Bedrooms') >= outliers[5][0]) & (f.col('Bedrooms') <= outliers[5][1]))
data = data.filter((f.col('Monthly Price') >= outliers[6][0]) & (f.col('Monthly Price') <= outliers[6][1]))
data = data.filter((f.col('Cleaning Fee') >= outliers[7][0]) & (f.col('Cleaning Fee') <= outliers[7][1]))
data = data.filter((f.col('Guests Included') >= outliers[8][0]) & (f.col('Guests Included') <= outliers[8][1]))
data = data.filter((f.col('Extra People') >= outliers[9][0]) & (f.col('Extra People') <= outliers[9][1]))
data = data.filter((f.col('Minimum Nights') >= outliers[10][0]) & (f.col('Minimum Nights') <= outliers[10][1]))
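
The percentile filter above can be illustrated on a plain Python list. This is a minimal sketch, not the Spark implementation: the helper names `percentile` and `keep_within` are hypothetical, the price values are made up, and the simple nearest-rank rule used here may pick a slightly different cut-off than `approxQuantile` does.

```python
import math

def percentile(values, q):
    """Nearest-rank percentile (an assumed, simple definition)."""
    ordered = sorted(values)
    rank = max(1, math.ceil(q * len(ordered)))
    return ordered[rank - 1]

def keep_within(values, low_q=0.05, high_q=0.95):
    """Keep only the values between the low and high percentile bounds."""
    low, high = percentile(values, low_q), percentile(values, high_q)
    return [v for v in values if low <= v <= high]

# Made-up nightly prices: one very low and one very high listing
prices = [5] + list(range(50, 73)) + [900]
trimmed = keep_within(prices)
print(percentile(prices, 0.05), percentile(prices, 0.95), len(trimmed))
```

Both extreme values fall outside the bounds and are dropped, which is the effect each `filter` call has on its column.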

## Data Cleaning
**Handling Missing Values:** We fill the missing values of numeric columns with **'0'** and of string columns with **'NA'**.

In [0]:
# Replacing missing values with '0' and 'NA' for numeric columns and string columns respectively
data_clean = data.na.fill(value=0).na.fill("NA")
data_clean.show(5)
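
The two chained `na.fill` calls work because a numeric fill value only affects numeric columns and a string fill value only affects string columns. A minimal plain-Python sketch of that rule follows; the `schema` dictionary, the `fill_missing` helper, and the sample row are assumptions made for illustration.

```python
# Hypothetical column types for three of the selected columns
schema = {"Bedrooms": "double", "Cleaning Fee": "double", "Bed Type": "string"}

def fill_missing(row, schema):
    """Replace None with 0 in numeric columns and 'NA' in string columns."""
    filled = {}
    for column, value in row.items():
        if value is None:
            value = "NA" if schema[column] == "string" else 0
        filled[column] = value
    return filled

row = {"Bedrooms": None, "Cleaning Fee": 25.0, "Bed Type": None}
print(fill_missing(row, schema))
```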

## Correlation
Correlation measures how one variable changes in relation to another and gives us an idea of the strength of the relationship between the two. Here we compute the correlation of the label **'Price'** with each numeric feature, which indicates the **dependence** between the label and that feature. We can then iteratively remove weakly correlated features and check whether model performance improves.

In [0]:
df_Corr=data_clean.select("Host Listings Count","Host Total Listings Count","Neighborhood","Latitude","Longitude","Property Type","Room Type","Bed Type","Accomodates","Bathrooms","Bedrooms","Monthly Price","Cleaning Fee","Guests Included","Extra People","Minimum Nights","Review Scores Rating","Review Scores Accuracy","Review Scores Cleanliness","Review Scores Checkin","Review Scores Communication","Review Scores Location","Review Scores Value","Sentiment","label")

# Determining correlation using DataFrameStatFunctions.corr
# String columns are skipped because corr() only accepts numeric columns
for name, dtype in df_Corr.dtypes:
    if dtype != "string":
        print("Correlation to PRICE for", name, df_Corr.stat.corr("label", name))
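
`stat.corr` computes the Pearson correlation coefficient. As a worked example of what that number means, here is a minimal plain-Python sketch; the `pearson` helper and the sample values are made up for illustration.

```python
import math

def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of the spreads."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

bedrooms = [1, 2, 3, 4]          # made-up feature values
price = [50, 100, 150, 200]      # made-up label values
print(pearson(bedrooms, price))        # perfectly linear, increasing
print(pearson(bedrooms, price[::-1]))  # perfectly linear, decreasing
```

Values near 1 or -1 indicate a strong linear relationship with the label, and values near 0 indicate a weak one.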

## Feature Transformation
Convert the string-type columns **'Property Type'**, **'Room Type'** and **'Bed Type'** into numeric indices using **StringIndexer**.

In [0]:
# Converting the String type columns into indices 
data_clean = StringIndexer(inputCol='Property Type', outputCol='PropertyType_index').fit(data_clean).transform(data_clean)
data_clean = StringIndexer(inputCol='Room Type', outputCol='RoomType_index').fit(data_clean).transform(data_clean)
data_clean = StringIndexer(inputCol='Bed Type', outputCol='BedType_index').fit(data_clean).transform(data_clean)

data_clean.show(5)
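
By default, StringIndexer orders labels by descending frequency, so the most frequent value receives index 0.0. The plain-Python sketch below mimics that default ordering; the `fit_string_index` helper and the sample values are assumptions for illustration, and ties are broken alphabetically here.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index, most frequent first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Made-up 'Room Type' values
room_types = ["Entire home/apt", "Private room", "Entire home/apt",
              "Shared room", "Entire home/apt", "Private room"]
mapping = fit_string_index(room_types)
print(mapping)
print([mapping[v] for v in room_types])
```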

## Split the Data
It is common practice when building supervised machine learning models to split the source data, using some of it to train the model and reserving the rest to test the trained model. Here we split the data into training and test sets in the ratio **70:30**.

In [0]:
# Split the data for random forest regression
splits = data_clean.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")

print ("Training Rows:", train.count(), " Testing Rows:", test.count())
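
Because the split is random, the row counts printed above are only approximately 70% and 30%, and they change from run to run unless a seed is fixed (`randomSplit` accepts an optional `seed` argument for this). The plain-Python sketch below illustrates the idea; the `random_split` helper is hypothetical and is not how Spark samples internally.

```python
import random

def random_split(rows, train_fraction=0.7, seed=42):
    """Assign each row to train or test independently, with a fixed seed."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(100))  # stand-in for 100 listings
train_rows, test_rows = random_split(rows)
print("Training Rows:", len(train_rows), " Testing Rows:", len(test_rows))
```

Running the function twice with the same seed yields the same split, which makes results easier to compare between runs.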

## Define the Pipeline
Define a pipeline that builds the feature vector and trains a regression model. It consists of the following stages:
1. A **VectorAssembler** that combines the indexed categorical features into a single vector.
2. A **VectorIndexer** that creates indices for the vector of categorical features.
3. A **VectorAssembler** that creates a vector of continuous numeric features.
4. A **MinMaxScaler** that normalizes the continuous numeric features.
5. A **VectorAssembler** that combines the categorical and continuous features into one vector.
6. A **Random Forest Regressor** that trains the regression model.
7. A **Pipeline** that chains the stages above in order.

In [0]:
# Combine Categorical features into a single vector
catVect = VectorAssembler(inputCols =['PropertyType_index', 'RoomType_index','BedType_index'], outputCol="catFeatures")

# Create indices for the vector of categorical features
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures").setHandleInvalid("skip")

#Create a vector of the numeric features
numVect = VectorAssembler(inputCols = ["Host Listings Count","Host Total Listings Count","Latitude","Longitude","Accomodates","Bathrooms","Bedrooms","Monthly Price","Cleaning Fee","Guests Included","Extra People","Minimum Nights","Review Scores Rating","Review Scores Accuracy","Review Scores Cleanliness","Review Scores Checkin","Review Scores Communication","Review Scores Location","Review Scores Value","Sentiment"], outputCol="numFeatures")

# Scale the numeric features
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")

#Create a vector of categorical and numeric features
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"],  outputCol="features")

# Random Forest Regression model
rf = RandomForestRegressor(labelCol="label", featuresCol="features",maxBins=40)

# Process the pipeline with the transformations
pipeline = Pipeline(stages=[catVect,catIdx,numVect, minMax,featVect, rf])
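
To make the normalization stage concrete, here is a minimal plain-Python sketch of min-max scaling for a single feature, using the default target range [0, 1]. The `min_max_scale` helper and the sample fees are made up for illustration; the Spark stage applies the same rescaling to each position of the feature vector.

```python
def min_max_scale(values, new_min=0.0, new_max=1.0):
    """Rescale values linearly so the smallest maps to new_min and the largest to new_max."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # A constant feature carries no spread; map it to the middle of the range
        return [0.5 * (new_max + new_min) for _ in values]
    return [(v - lo) / (hi - lo) * (new_max - new_min) + new_min for v in values]

cleaning_fee = [0.0, 25.0, 50.0, 100.0]  # made-up values
print(min_max_scale(cleaning_fee))
```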


### Train a Regression model using Parameter Tuning
Use the **CrossValidator** class to evaluate each combination of parameters defined with **ParamGridBuilder** against multiple folds of the training data, with each fold serving in turn as the validation set, in order to find the best-performing parameters. Here the number of folds is set to **5**.

In [0]:
# Defining the parameter grid
paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth,[2,3,20])
            .addGrid(rf.minInfoGain,[0.0, 0.7])
            .build())

# Number of folds
K = 5
cv = CrossValidator(estimator=pipeline, evaluator=RegressionEvaluator(), estimatorParamMaps=paramGrid, numFolds=K)
#tvs = TrainValidationSplit(estimator=pipeline, evaluator=RegressionEvaluator(), estimatorParamMaps=paramGrid, trainRatio=0.8)

# Train the model
model = cv.fit(train)
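
Cross-validation multiplies the training cost, which is worth estimating before calling `fit`. The short sketch below counts the pipeline fits implied by the grid and fold count used above; the variable names are chosen for illustration.

```python
from itertools import product

max_depth_values = [2, 3, 20]
min_info_gain_values = [0.0, 0.7]
num_folds = 5

# Every maxDepth value is paired with every minInfoGain value
param_combinations = list(product(max_depth_values, min_info_gain_values))
fits_during_search = len(param_combinations) * num_folds
# After the search, the best combination is refit once on the full training set
total_fits = fits_during_search + 1

print(len(param_combinations), fits_during_search, total_fits)
```

With 6 parameter combinations and 5 folds, the pipeline is fit 30 times during the search and once more for the final model.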


### Test the Pipeline Model
The model produced by the pipeline is a transformer that will apply all of the stages in the pipeline to a specified DataFrame and apply the trained model to generate predictions. In this case, we will transform the **test** DataFrame using the pipeline to generate label predictions.

In [0]:
# Transform the test data and generate predictions by applying the trained model
prediction = model.transform(test)
predicted = prediction.select("normFeatures", "prediction", "trueLabel")
predicted.show(5)

## Evaluate the model
The metrics used for evaluation are **Root Mean Square Error (RMSE)** and the **Coefficient of Determination (r2)**. RMSE is measured in the same units as the predicted and actual values, so in this case it indicates the typical difference in dollars between predicted and actual prices, with larger errors weighted more heavily. r2 is the proportion of the variance in the actual prices that the predictions explain; values closer to 1 indicate a better fit. The **RegressionEvaluator** class is used to compute both **RMSE** and **r2**.

In [0]:
# Evaluator to determine rmse
evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(prediction)

# Evaluator to determine r2
evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(prediction)

print("Root Mean Square Error (RMSE):", rmse)
print("Coefficient of Determination (r2):", r2)
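
As a worked example of the two metrics, the plain-Python sketch below computes them by hand on four made-up prices. The helper names `rmse_of` and `r2_of` are hypothetical, and the numbers are not results from the Airbnb model.

```python
import math

def rmse_of(actual, predicted):
    """Square root of the mean squared difference between actual and predicted."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(actual))

def r2_of(actual, predicted):
    """1 minus (residual sum of squares / total sum of squares)."""
    mean_actual = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return 1 - ss_res / ss_tot

true_price = [100.0, 150.0, 200.0, 250.0]       # made-up actual prices
predicted_price = [110.0, 140.0, 210.0, 240.0]  # made-up predictions
print("RMSE:", rmse_of(true_price, predicted_price))
print("r2:", r2_of(true_price, predicted_price))
```

Every prediction here is off by 10 dollars, so the RMSE is 10, and the r2 of about 0.968 shows that the predictions still capture most of the variation in price.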
