<a href="https://colab.research.google.com/github/dalgual/aidatasci/blob/main/airbnbPrice_XGBoost_emr_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


------
#### Authors: Samyuktha Muralidharan

#### Instructor: [Jongwook Woo](https://www.linkedin.com/in/jongwook-woo-7081a85)

#### Date: 05/23/2021
#### Updated: 12/22/2021 for AWS, Savita Yadav & Jwoo


### References
1. https://github.com/rapidsai/spark-examples/blob/master/examples/notebooks/python/mortgage-gpu.ipynb
1. https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-rapids.html
1. https://docs.databricks.com/_static/notebooks/xgboost-regression.html
1. https://docs.databricks.com/_static/notebooks/xgboost-pyspark.html
1. https://towardsdatascience.com/pyspark-and-xgboost-integration-tested-on-the-kaggle-titanic-dataset-4e75a568bdb


## Objective
**Airbnb** is an online marketplace that connects people who want to rent out their homes with people looking for accommodations in that locale.
The aim is to predict the price of an Airbnb listing from its host, property, and review attributes. We use a **regression model** for price prediction, and the algorithm used here is the **XGBoost regressor** (`XGBoostRegressor`) on Spark. The `price` column is the label. The Review Scores Rating column is binarized into high rated (80 or above) or low rated and used as one of the input features.


## Import Spark SQL and Spark ML Libraries
Import the Spark SQL and ML libraries listed below. This is necessary to access the functions available in those libraries.


In [None]:
%pyspark

# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.storagelevel import StorageLevel

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler,StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

from ml.dmlc.xgboost4j.scala.spark import XGBoostClassificationModel, XGBoostClassifier
#from ml.dmlc.xgboost4j.scala.spark.rapids import GpuDataReader


## Read CSV file from DBFS (Databricks File System)
1. After the `<filename>.csv` file is added to the data in the left frame, create a table using the UI option "Upload File".
2. Click "Preview Table" to view the table, and select the option "First line is header", since `<filename>.csv` has a header as its first row.
3. Change the data types of the columns.
4. Click the "Create Table" button.

The sampled file is available at https://www.kaggle.com/samyukthamurali/airbnb-ratings-dataset?select=airbnb_sample.csv. You can download the sample file from there and upload it to DBFS.

In [None]:
%pyspark
# time() must be imported before its first use
from time import time

start = time()

In [None]:
%pyspark

# from ingest.Connectors import Connectors
from pyspark.sql import SQLContext

IS_AWS = False #True #
IS_CPU = True # False #True #  
# airbnb-listings.csv: separated by ;
#   airbnb_sample.csv and airbnb_US.csv: separated by ,
IS_FULL_DATA = True #False #  

AWS_BUCKET_NAME = "hipicdatasets"
MOUNT_NAME = "airbnb"  # mounted name
  
#aws_bucket_name = "cis5560"
mount_name = "airbnb"


In [None]:
%pyspark

# File location and type: file_path = 'gs://hdp-240/airbnb/airbnb_US.csv'
if (IS_AWS == True):
    file_location = "s3://bigdai-pub/airbnb_listings.csv" # 1.93 GB
    #file_location = "s3://hipicdatasets/airbnb-listings.csv" # 1.92 GB
    #file_location = "s3://hipicdatasets/airbnb_sample.csv" # 32MB
    # file_location = "s3://hipicdatasets/airbnb_US.csv" # 370.5 MB
else: 
    #GCP:
    #file_location = "gs://hdp-240/airbnb/airbnb_US.csv"
    #file_location = "gs://hdp-240/airbnb/airbnb_sample.csv"
    # HDFS
    file_location = "/user/hadoop/airbnb/airbnb_listings.csv"


file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
if (IS_FULL_DATA == True):
    delimiter = ";" #","
else:
    delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
# Load the csv file as a pyspark dataframe
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)




In [None]:
%pyspark
end = time()
phrase="Data Reading Time"
print('{} takes {} seconds'.format(phrase, (end - start))) #round(end - start, 2)))


In [None]:
%pyspark
df.show(5)


## Create a temporary view of the dataframe 'df'

In [None]:
%pyspark

# Create a view or table
temp_table_name = "airbnb_sample_csv"
#df.createOrReplaceTempView(temp_table_name)


## Create a dataframe from the table, using Spark SQL

In [None]:
%pyspark

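# The temp-view query is disabled, so use the loaded DataFrame directly;
# otherwise `csv` is undefined in the cells below.
#csv = spark.sql("SELECT * FROM airbnb_sample_csv")
csv = df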
#csv = df.select("Review Scores Rating", "Host Listings Count", "Host Total Listings Count", "Calculated host listings count", "Security Deposit", "Cleaning Fee" , "Host Response Time","Host Response Rate","Host Acceptance Rate","Property Type","Room Type","Price","Weekly Price","Monthly Price","Maximum Nights","Review Scores Accuracy","Review Scores Cleanliness","Review Scores Checkin","Review Scores Communication","Review Scores Location","Review Scores Value","Cancellation Policy","Bedrooms","Bathrooms","Beds","Extra People","Minimum Nights")

#csv.show(5)

In [None]:
%pyspark
phrase= "data engineering time: "
start = time()

In [None]:
%pyspark

# Binarize the rating: 1 for scores of 80 or above, 0 otherwise
csv = csv.withColumn("review_scores_rating", when(col("review_scores_rating") >= 80,1).otherwise(0))

# Cast the numeric columns to integers
int_columns = [
    "host_response_rate", "host_listings_count", "host_total_listings_count",
    "price", "weekly_price", "monthly_price",
    "maximum_nights", "review_scores_accuracy", "review_scores_cleanliness",
    "review_scores_checkin", "review_scores_communication", "review_scores_location",
    "review_scores_value", "calculated_host_listings_count", "bedrooms",
    "bathrooms", "beds", "security_deposit",
    "host_acceptance_rate", "cleaning_fee", "extra_people", "minimum_nights",
]
for column_name in int_columns:
    csv = csv.withColumn(column_name, col(column_name).cast(IntegerType()))

csv.show(5)



## Selecting Columns
In the following step, we select the columns that are useful for price prediction. The `price` column is cast to an integer and aliased as `label`.

In [None]:
%pyspark

csv = csv.filter(col("minimum_nights")<= 365)

data = csv.select("review_scores_rating", "host_listings_count", "host_total_listings_count", "calculated_host_listings_count", "security_deposit", "cleaning_fee" , "host_response_time","host_response_rate","host_acceptance_rate","property_type","room_type","bed_type", "weekly_price","monthly_price","maximum_nights","review_scores_accuracy","review_scores_cleanliness","review_scores_checkin","review_scores_communication","review_scores_location","review_scores_value","cancellation_policy","bedrooms","bathrooms","beds","extra_people","minimum_nights",col("price").cast("Int").alias("label"))

data.show(5)

#display(data.describe())


## Data Cleaning
**Handling Missing Values:** Filling the missing values of numeric columns with **'0'** and string columns with **'NA'**
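
As a rough illustration of this fill rule, the plain-Python sketch below applies it to two hypothetical rows. The rows, the column sets, and the `fill_missing` helper are assumptions made for illustration only; the notebook itself relies on `DataFrame.na.fill`.

```python
# Illustrative only: hypothetical rows and helper, not the notebook's Spark code.
rows = [
    {"bedrooms": 2, "room_type": "Private room"},
    {"bedrooms": None, "room_type": None},
]

# Column kinds are declared up front because a None value carries no type.
numeric_columns = {"bedrooms"}
string_columns = {"room_type"}


def fill_missing(row):
    """Return a copy of the row with None replaced by 0 or 'NA'."""
    filled = dict(row)
    for name, value in row.items():
        if value is not None:
            continue
        if name in numeric_columns:
            filled[name] = 0
        elif name in string_columns:
            filled[name] = "NA"
    return filled


cleaned = [fill_missing(r) for r in rows]
print(cleaned)
```

Rows without missing values pass through unchanged, so the fill only affects the gaps.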

In [None]:
%pyspark

# Filter Property Type not in the correct list
property_list = ["Apartment","House","Bed & Breakfast","Condominium","Loft", "Townhouse","Other","Villa", "Guesthouse", "Bungalow", "Dorm", "Boat", "Cabin", "Chalet", "Boutique hotel", "Serviced apartment", "Hostel", "Camper/RV", "Timeshare", "Guest suite", "Tent", "Vacation home", "Castle", "Treehouse", "In-law", "Earth House", "Hut", "Yurt", "Entire Floor", "Tipi", "Nature lodge", "Cave", "Lighthouse", "Casa particular", "Train", "Island", "Igloo", "Parking Space", "Pension (Korea)", "Ryokan (Japan)", "Car", "Heritage hotel (India)", "Plane", "Van"
]

data = data.filter(data.property_type.isin(property_list))


In [None]:
%pyspark
data.persist(StorageLevel.DISK_ONLY_2)

In [None]:
%pyspark

outliers = data.stat.approxQuantile(["bathrooms","bedrooms","monthly_price","extra_people","minimum_nights","label"], [0.05,0.95],0.0)
 
print(outliers)
 
#Filtering the dataframe by removing the outliers
#  data1 = data.filter(col("Host Listings Count") >= outliers(0)(0) && col("Host Listings Count")  <= outliers(0)(1))
#  data2 = data.filter(col("Host Total Listings Count") >= outliers(1)(0) && col("Host Total Listings Count")  <= outliers(1)(1))
# data3 = data.filter(data["Accommodates"] >= outliers[0][0] and data["Accommodates"]  <= outliers[0][1])
data4 = data.filter((data["bathrooms"] >= outliers[0][0]) & (data["bathrooms"]  <= outliers[0][1]))
data5 = data4.filter((data4["bedrooms"] >= outliers[1][0]) & (data4["bedrooms"]  <= outliers[1][1]))
data6 = data5.filter((data5["monthly_price"] >= outliers[2][0]) & (data5["monthly_price"]  <= outliers[2][1]))
# data7 = data6.filter(data6["Cleaning Fee"] >= outliers[4][0] and data6["Cleaning Fee"]  <= outliers[4][1])
#data8 = data7.filter(data7["Guests Included"] >= outliers[5][0]) and data7["Guests Included"]  <= outliers[5][1])
data9 = data6.filter((data6["extra_people"] >= outliers[3][0]) & (data6["extra_people"]  <= outliers[3][1]))
data10 = data9.filter((data9["minimum_nights"] >= outliers[4][0]) & (data9["minimum_nights"]  <= outliers[4][1]))

final_data = data10.filter((data10["label"] >= outliers[5][0]) & (data10["label"]  <= outliers[5][1]))
  
final_data.show(30)
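
The cell above keeps only rows that fall between the 5th and 95th percentiles of each listed column. The sketch below shows the same idea on a single hypothetical list of prices in plain Python. The `nearest_rank_quantile` helper and the sample values are assumptions for illustration, and Spark's `approxQuantile` uses its own algorithm, so its cut-off values may differ slightly. The snippet is meant for a separate plain Python session, since the notebook's wildcard import from `pyspark.sql.functions` shadows builtins such as `max`.

```python
import math


def nearest_rank_quantile(values, p):
    """Return the smallest value whose rank covers a fraction p of the data."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered)))
    return ordered[rank - 1]


# Hypothetical prices with one very low and one very high value.
prices = [1, 20, 35, 40, 45, 50, 55, 60, 65, 70, 80,
          90, 95, 100, 110, 120, 130, 150, 180, 250, 5000]

low = nearest_rank_quantile(prices, 0.05)
high = nearest_rank_quantile(prices, 0.95)

# Keep only the values inside the [low, high] band, as the filters above do.
trimmed = [v for v in prices if low <= v <= high]
print(low, high, len(trimmed))
```

Because each filter in the notebook is applied to the output of the previous one, a row is dropped as soon as any one of the listed columns falls outside its band.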



In [None]:
%pyspark

#data_clean = data.na.fill(value=0).na.fill("")
data_clean = final_data.na.fill(value=0).na.fill("NA")
data_clean.show(5)



## Convert the string type columns into indices using StringIndexer
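
To make the indexing concrete, here is a small plain-Python sketch of what `StringIndexer` does with its default `frequencyDesc` ordering: the most frequent label gets index 0, and ties are broken alphabetically. The helper names and the sample values are hypothetical. With `handleInvalid="keep"`, labels not seen during fitting are assigned one extra index equal to the number of known labels, whereas `"skip"` drops those rows.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_keep(index, value):
    """Mimic handleInvalid='keep': unseen labels share one extra index."""
    return index.get(value, float(len(index)))


# Hypothetical room_type values.
room_types = ["Entire home/apt", "Private room", "Entire home/apt",
              "Shared room", "Private room", "Entire home/apt"]

index = fit_string_index(room_types)
print(index)
print(transform_keep(index, "Tent"))
```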

In [None]:
%pyspark

# jwoo: add .setHandleInvalid("skip"): or "keep" for null value
data_clean = StringIndexer(inputCol='host_response_time', outputCol='host_response_time_index').setHandleInvalid("skip").fit(data_clean).transform(data_clean)

data_clean = StringIndexer(inputCol='cancellation_policy', outputCol='cancellation_policy_index').setHandleInvalid("skip").fit(data_clean).transform(data_clean)

data_clean = StringIndexer(inputCol='property_type', outputCol='property_type_index').setHandleInvalid("keep").fit(data_clean).transform(data_clean)
data_clean= StringIndexer(inputCol='room_type', outputCol='room_type_index').setHandleInvalid("keep").fit(data_clean).transform(data_clean)
#data_clean = StringIndexer(inputCol='Bed Type', outputCol='BedType_index').setHandleInvalid("keep").fit(data_clean).transform(data_clean)
#jwoo
data_clean = StringIndexer(inputCol='bed_type', outputCol='bed_type_index').setHandleInvalid("keep").fit(data_clean).transform(data_clean)
#data_clean = StringIndexer(inputCol='Host Listings Count', outputCol='Host_Listings_Count_index').setHandleInvalid("keep").fit(data_clean).transform(data_clean)

data_clean = StringIndexer(inputCol="review_scores_rating", outputCol='review_scores_rating_index').fit(data_clean).transform(data_clean)

data_clean.show(5)



## Split the data
In the next step we split the data in a train and test set. We have split the data in the ratio of **70 to 30**.
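
A minimal sketch of how such a random split behaves, in plain Python: each row is assigned independently, so the resulting sizes are only approximately 70% and 30%. The `random_split` helper, the row values, and the seed are assumptions for illustration, not the notebook's code.

```python
import random


def random_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train_part, test_part = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train_part.append(row)
        else:
            test_part.append(row)
    return train_part, test_part


rows = list(range(1000))  # stand-in for DataFrame rows
train_rows, test_rows = random_split(rows, 0.7, seed=42)
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the split repeatable. In PySpark, `randomSplit` also accepts an optional `seed` argument, which can help when comparing timing or accuracy across runs.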

In [None]:
%pyspark
#jwoo: drop the unnecessary columns, since index values were generated from them
final_df = data_clean.drop('host_response_time', "property_type", "room_type", "bed_type", "cancellation_policy", "review_scores_rating")
final_df.show(5)

In [None]:
%pyspark
final_df.printSchema()

In [None]:
%pyspark

# Split the data
splits = final_df.randomSplit([0.7, 0.3])

# for decision tree classifier
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")

print ("Training Rows:", train.count(), " Testing Rows:", test.count())

In [None]:
%pyspark
train.show(5)

In [None]:
%pyspark
labelColName = "label"

In [None]:
%pyspark
# features = [ x for x in train.columns if ((x != labelColName) and (x != 'host_response_time') and (x != 'cancellation_policy') and (x != 'property_type') and (x != "room_type") and (x != "review_scores_rating"))]
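# Exclude the label so the target price is not passed to the model as a feature
features = [x for x in train.columns if x != labelColName]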

print(features)


## Create XGBoostClassifier

In [None]:
%pyspark

params = { 
    'eta': 0.1,
    'gamma': 0.1,
    'missing': 0.0,
    #'treeMethod': 'gpu_hist',
    'maxDepth': 10, 
    'maxLeaves': 256,
    'growPolicy': 'depthwise',
    'objective': 'reg:squarederror',
    'minChildWeight': 30.0,
    'lambda_': 1.0,
    'scalePosWeight': 2.0,
    'subsample': 1.0,
    'nthread': 1,
    'numRound': 100,
    'numWorkers': 1,
}

#classifier = XGBoostClassifier(**params).setLabelCol(labelColName).setFeaturesCols(features)


In [None]:
%pyspark

print("Value : %s" %  params)

In [None]:
%pyspark

# the nthread configuration (2) must be no larger than spark.task.cpus (1)
if (IS_CPU == True):
    params2 = {"treeMethod": "hist", "numWorkers": 1, "nthread": 1} # "numWorkers": 1, 63sec, "numWorkers": 2, 99sec
else:
    params2 = {"treeMethod": "gpu_hist", "numWorkers": 1, "nthread": 1} # "numWorkers": 1, 61sec
params.update(params2) 
xgbParamFinal = params

print("Value : %s" %  xgbParamFinal)

In [None]:
%pyspark

from ml.dmlc.xgboost4j.scala.spark import XGBoostRegressionModel, XGBoostRegressor

xgBoost = XGBoostRegressor(**xgbParamFinal).setLabelCol(labelColName).setFeaturesCols(features)
# xgBoost = XGBoostClassifier(**params).setLabelCol(labelColName).setFeaturesCols("features")



## Define the Pipeline
Define a pipeline that creates a feature vector and trains a regression model
1. A **VectorAssembler** that combines categorical features into a single vector.
2. A **Vector Indexer** that creates indices for a vector of categorical features.
3. A **VectorAssembler** that creates a vector of continuous numeric features.
4. A **MinMaxScaler** to normalize the continuous numeric features.
5. A **VectorAssembler** that creates a vector of categorical and continuous features.
6. An **XGBoostRegressor** that trains the regression model.
7. **Process pipeline** with the series of transformations above.
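
The normalization in step 4 can be sketched in plain Python as follows. `MinMaxScaler` rescales each feature to the range [0, 1] by default using `(x - min) / (max - min)`, and a constant feature is mapped to the midpoint of the range. The `min_max_scale` helper and the sample values are hypothetical, and the snippet is meant for a separate plain Python session, since the notebook's wildcard import from `pyspark.sql.functions` shadows `min` and `max`.

```python
def min_max_scale(column, lower=0.0, upper=1.0):
    """Rescale a list of numbers linearly into [lower, upper]."""
    lo, hi = min(column), max(column)
    if hi == lo:
        # A constant feature carries no spread; use the midpoint of the range.
        return [0.5 * (upper + lower)] * len(column)
    return [(x - lo) / (hi - lo) * (upper - lower) + lower for x in column]


bedrooms = [1, 2, 3, 5]  # hypothetical feature values
scaled = min_max_scale(bedrooms)
print(scaled)
```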

In [None]:
%pyspark

catVect = VectorAssembler(inputCols = ["host_response_time_index", "cancellation_policy_index", "property_type_index", "room_type_index", 'review_scores_rating_index'], outputCol="catFeatures")

catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures").setHandleInvalid("skip") 

#numVect = VectorAssembler(inputCols = ["Host Response Rate","Host Listings Count","Host Total Listings Count","Price","Weekly Price","Monthly Price","Maximum Nights","Review Scores Accuracy","Review Scores Cleanliness","Review Scores Checkin","Review Scores Communication","Review Scores Location","Review Scores Value","Calculated host listings count","Bedrooms","Bathrooms","Beds","Security Deposit","Cleaning Fee","Extra People","Minimum Nights"], outputCol="numFeatures")

# remove  "Price",
#numVect = VectorAssembler(inputCols = ["Host Response Rate","Host Acceptance Rate","Weekly Price","Monthly Price","Maximum Nights","Review Scores Accuracy","Review Scores Cleanliness","Review Scores Checkin","Review Scores Communication","Review Scores Location","Review Scores Value","Bedrooms","Bathrooms","Beds","Extra People","Minimum Nights"], outputCol="numFeatures")
numVect = VectorAssembler(inputCols = ["host_listings_count", "host_total_listings_count","bathrooms","bedrooms","monthly_price","minimum_nights","review_scores_accuracy","review_scores_cleanliness","review_scores_checkin","review_scores_communication","review_scores_location","review_scores_value"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")

featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"],  outputCol="features")

#dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
#xgBoost = XGBoostClassifier(labelCol="label", featuresCol="features")

In [None]:
%pyspark

pipeline = Pipeline(stages=[catVect,catIdx,numVect, minMax,featVect, xgBoost])


In [None]:
%pyspark
end=time()
phrase = 'data engineering time'
print('{} takes {} seconds'.format(phrase, (end - start))) #round(end - start, 2)))


### Train a Regression model using Parameter Tuning
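Use the **TrainValidationSplit** class to evaluate each combination of parameters defined with **ParamGridBuilder**, in order to find the best-performing parameters and therefore the best model for the data. Unlike **CrossValidator**, which evaluates each combination against multiple folds of the data, TrainValidationSplit evaluates each combination once on a single split into training and validation datasets; here 80% of the rows are used for training (`trainRatio=0.8`). A **CrossValidator** with the number of folds set to **2** is kept below as a commented-out alternative.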
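
The parameter grid built in the next cell is the Cartesian product of the listed values: three values of `maxDepth` times two values of `eta` gives six candidate settings, and each one is fitted and scored. The plain-Python sketch below enumerates the same combinations; the dictionary layout is an illustrative assumption, not the structure `ParamGridBuilder` returns.

```python
from itertools import product

# Same values as the grid in the notebook.
grid = {"maxDepth": [2, 3, 9], "eta": [0.3, 0.7]}

names = list(grid)
candidates = [dict(zip(names, combo))
              for combo in product(*(grid[name] for name in names))]

for candidate in candidates:
    print(candidate)
print(len(candidates), "candidate settings")
```

Every value added to the grid multiplies the number of models to train, which is worth keeping in mind when comparing the training times reported at the end of the notebook.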

In [None]:
%pyspark

paramGrid = (ParamGridBuilder()
              .addGrid(xgBoost.maxDepth, [2, 3, 9])
              #.addGrid(xgBoost.maxBins, [1055,2000])
              .addGrid(xgBoost.eta, [0.3, 0.7])
              .build())

In [None]:
%pyspark

#cv = CrossValidator(estimator=pipeline, evaluator= BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, numFolds=2)

cv = TrainValidationSplit(estimator=pipeline, evaluator=RegressionEvaluator(),  estimatorParamMaps=paramGrid, trainRatio=0.8)

In [None]:
%pyspark

from time import time
from decimal import Decimal

def with_benchmark(phrase, action):
    start = time()
    result = action()
    end = time()
    print('{} takes {} seconds'.format(phrase, (end - start))) #round(end - start, 2)))
    return result    

In [None]:
%pyspark

model = with_benchmark('training', lambda: cv.fit(train))
#model =cv.fit(train)




### Test the Pipeline Model
The model produced by the pipeline is a transformer that will apply all of the stages in the pipeline to a specified DataFrame and apply the trained model to generate predictions. In this case, we will transform the **test** DataFrame using the pipeline to generate label predictions.

### Save and Reload the Model

In [None]:
%pyspark

'''if IS_AWS == False:
    model.write().overwrite().save('/data/new-model-path')
    loaded_model = XGBoostClassificationModel().load('/data/new-model-path')'''


## Transformation and Show Result Sample

In [None]:
%pyspark



def transform():
    prediction = model.transform(test).cache()
    prediction.foreachPartition(lambda _: None)
    return prediction
    
prediction = with_benchmark('transform', transform)
predicted = prediction.select("features", "prediction", "trueLabel")


In [None]:
%pyspark
predicted.show(5)

In [None]:
%pyspark

evaluator = RegressionEvaluator()\
  .setLabelCol("trueLabel")\
  .setPredictionCol("prediction")\
  .setMetricName("rmse")
 
evaluator1 = RegressionEvaluator()\
  .setLabelCol("trueLabel")\
  .setPredictionCol("prediction")\
  .setMetricName("r2")

rmse = evaluator.evaluate(predicted)
r2 = evaluator1.evaluate(predicted)




In [None]:
%pyspark

print ("RMSE = ", rmse, " R2 = ", r2)
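
For reference, the two metrics reported above can be written out in plain Python. RMSE is the square root of the mean squared error, and R2 is one minus the ratio of the residual sum of squares to the total sum of squares. The function names and the sample prices are hypothetical, and the snippet is meant for a separate plain Python session, since the notebook's wildcard import from `pyspark.sql.functions` shadows `sum`.

```python
import math


def root_mean_squared_error(y_true, y_pred):
    """Square root of the average squared difference."""
    squared = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared) / len(squared))


def r_squared(y_true, y_pred):
    """1 - (residual sum of squares / total sum of squares)."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


# Hypothetical true and predicted prices.
true_prices = [100, 150, 200, 250]
predicted_prices = [110, 140, 210, 240]

print(root_mean_squared_error(true_prices, predicted_prices))
print(r_squared(true_prices, predicted_prices))
```

RMSE is expressed in the same unit as the label (price), while R2 is unitless, which is why the results tables report both.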


### airbnb_listings.csv (1.93 GB), EMR with 3 nodes: g4dn.2xlarge
### GPU (S3)
```
| Data Size | Computing (sec) | Data Reading (sec)         | Data Eng (sec)             | Train Time (sec)           | Test Time (sec)                | RMSE  | R2     |
| 1.93 GB   |                 | 12.82, 10.82, 11.02, 9.72  | 24.10, 25.80, 25.49, 15.84 | 17.57, 18.90, 19.73, 15.21 | 0.9430, 0.7229, 0.9535, 0.8147 | 33.71 | 0.7300 |
| 1.92 GB   | 154             |                            |                            | 45.1                       |                                | 32.29 | 0.7524 |
```
### GPU (HDFS)
```
| Data Size | Data Reading (sec) | Data Eng (sec) | Train Time (sec) | Test Time (sec) | RMSE         | R2             |
| 1.93 GB   | 5.72, 5.12         | 18.23, 21.336  | 15.53, 15.23     | 0.9955, 0.8092  | 33.35, 32.98 | 0.7300, 0.7368 |
```

### CPU (S3)
``` 
| Data Size | Computing (sec) | Data Reading (sec)            | Data Eng (sec)                    | Train Time (sec)                  | Test Time (sec)                 | RMSE  | R2     |
| 1.93 GB   |                 | 9.92, 10.72, 9.62, 9.52, 9.52 | 23.30, 24.60, 25.70, 26.80, 22.45 | 26.70, 19.75, 21.29, 22.35, 17.18 | 0.9384,, 0.9958, 0.9809, 0.9949 | 33.63 | 0.7280 |
| 1.92 GB   | 171             |                               |                                   | 38.8                              |                                 | 31.90 | 0.7503 |
``` 

### CPU (HDFS)
``` 
| Data Size | Data Reading (sec) | Data Eng (sec) | Train Time (sec) | Test Time (sec) | RMSE         | R2             |
| 1.93 GB   | 5.12, 5.12         | 18.93, 17.93   | 17.63, 18.28     | 0.8181, 0.8390  | 33.52, 33.51 | 0.7308, 0.7331 |
``` 

In [None]:
%sh

nvidia-smi


### References
1. https://public.opendatasoft.com/explore/dataset/airbnb-listings/export/?disjunctive.host_verifications&disjunctive.amenities&disjunctive.features

