## Objective
**Airbnb** is an online marketplace that connects people who want to rent out their homes with people looking for accommodation in that area.
The aim is to understand how Airbnb hosts can make simple changes to their properties to boost customer satisfaction. We use a **classification model** for rating prediction, trained with the **Support Vector Machine classifier** (`LinearSVC`). The experiment classifies reviews as high rated or low rated using the feature Review Scores Rating.

## Import Spark SQL and Spark ML Libraries
Import the Spark SQL and Spark ML libraries listed below. This is necessary to access the functions those libraries provide.

In [0]:
# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler,StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import LinearSVC

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

## To run the code in PySpark CLI
Set the following to True:
```
PYSPARK_CLI = True
```
Generate a .py (Python) file from Databricks: File > Export > Source File

Run it on the Hadoop/Spark cluster:
```
$ spark-submit LogisticRegression.py
```

In [0]:
PYSPARK_CLI = False
if PYSPARK_CLI:
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)

## Read the CSV file from DBFS (Databricks File System)
1. After the <filename>.csv file is added to the data in the left frame, create a table using the UI option "Upload File".
2. Click "Preview Table" to view the table and, because <filename>.csv has a header as its first row, select the option "First line is header".
3. Change the data type of the columns
4. Click on the create table button.
  
The sampled file is available at https://www.kaggle.com/samyukthamurali/airbnb-ratings-dataset?select=airbnb_sample.csv. Download it from there and upload it to DBFS.

In [0]:
# File location and type
file_location = "/FileStore/tables/airbnb_sample.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

#display(df)

## Create a temporary view of the dataframe 'df'

In [0]:
# Create a view or table
temp_table_name = "airbnb_sample_csv"
df.createOrReplaceTempView(temp_table_name)

## Create a dataframe from the table, using Spark SQL

In [0]:
if PYSPARK_CLI:
    csv = spark.read.csv('airbnb_sample', inferSchema=True, header=True)
else:
    csv = spark.sql("SELECT * FROM airbnb_sample_csv")
    
csv.show(5)

In [0]:

# Binarize the rating: 1 (high rated) if the score is at least 80, otherwise 0 (low rated)
csv = csv.withColumn("Review Scores Rating", when(col("Review Scores Rating") >= 80,1).otherwise(0))
csv.show(5)
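
The labelling rule above can be sketched in plain Python. This is only an illustration of the thresholding logic, not the Spark code path; the function name `to_label` and the sample scores are made up. One point worth noting is that a missing score fails the comparison and therefore ends up with label 0, just as a null does in the `when(...).otherwise(0)` expression.

```python
def to_label(score, threshold=80):
    """Return 1 for a high rating, 0 for a low or missing rating."""
    # A missing score (None) never satisfies the comparison, so it falls
    # through to 0, mirroring how a null is handled by when(...).otherwise(0).
    if score is not None and score >= threshold:
        return 1
    return 0


# Hypothetical review scores, including one missing value
scores = [95, 80, 79, None]
labels = [to_label(s) for s in scores]
print(labels)
```

If missing ratings should not count as low ratings, those rows would have to be filtered out before this step.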


## Selecting features
In the following step, we select the features that are useful for rating prediction. Listings with a minimum stay of more than 365 nights are filtered out first.

In [0]:
csv = csv.filter(col("Minimum Nights")<= 365)

data = csv.select("Host Response Time","Host Response Rate","Host Acceptance Rate","Host Neighborhood","Host Listings Count","Host Total Listings Count","Property Type","Room Type","Price","Weekly Price","Monthly Price","Maximum Nights","Review Scores Accuracy","Review Scores Cleanliness","Review Scores Checkin","Review Scores Communication","Review Scores Location","Review Scores Value","Cancellation Policy","Calculated host listings count","Neighborhood Cleansed","Neighborhood Group Cleansed","Bedrooms","Bathrooms","Beds","Security Deposit","Cleaning Fee","Extra People","Minimum Nights","Calendar Updated","Amenities", col("Review Scores Rating").alias("label"))


data.show(5)

#display(data.describe())

## Data Cleaning
**Handling Missing Values:** Missing values in numeric columns are filled with **0** and missing values in string columns with an empty string.

In [0]:
data_clean = data.na.fill(value=0).na.fill("")
data_clean.show(20)
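
As a rough illustration of what the two chained `na.fill` calls do, the sketch below fills missing values by column type in plain Python. The helper `fill_missing`, the two-column `schema`, and the sample row are hypothetical and stand in for the DataFrame schema.

```python
def fill_missing(row, schema):
    """Fill None values: 0 for numeric columns, empty string for string columns."""
    filled = {}
    for name, value in row.items():
        if value is None:
            filled[name] = 0 if schema[name] == "numeric" else ""
        else:
            filled[name] = value
    return filled


# Hypothetical two-column schema and a row with both values missing
schema = {"Price": "numeric", "Room Type": "string"}
row = {"Price": None, "Room Type": None}
print(fill_missing(row, schema))
```

Filling with 0 is a simple choice, but it makes a missing price indistinguishable from a price of zero, which may be worth keeping in mind when interpreting the model.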


## Convert the string type columns into indices using StringIndexer

In [0]:
data_clean = StringIndexer(inputCol='Host Response Time', outputCol='Host_Response_index').fit(data_clean).transform(data_clean)
data_clean = StringIndexer(inputCol='Room Type', outputCol='RoomType_index').fit(data_clean).transform(data_clean)
data_clean = StringIndexer(inputCol='Property Type', outputCol='PropertyType_index').fit(data_clean).transform(data_clean)
data_clean = StringIndexer(inputCol='Cancellation Policy', outputCol='Cancellation_index').fit(data_clean).transform(data_clean)
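
To make the indexing step concrete, here is a small plain-Python sketch of frequency-based label indexing, which is how `StringIndexer` orders labels by default (most frequent label gets index 0). It is not Spark's implementation; the function name `fit_string_index`, the alphabetical tie-break, and the sample values are assumptions for illustration.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct string to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically (assumption)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Illustrative room type values, not taken from the dataset
room_types = [
    "Entire home/apt", "Private room", "Entire home/apt",
    "Shared room", "Private room", "Entire home/apt",
]
mapping = fit_string_index(room_types)
indexed = [mapping[v] for v in room_types]
print(mapping)
print(indexed)
```

The resulting indices carry no numeric meaning on their own, which is why the pipeline later passes them through a VectorIndexer so they are treated as categories.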


## Split the data
In the next step we split the data into a training set and a test set, in the ratio of **70 to 30**.

In [0]:
# Split the data
splits = data_clean.randomSplit([0.7, 0.3])

# for Support Vector Machines classifier 
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")

print ("Training Rows:", train.count(), " Testing Rows:", test.count())
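
`randomSplit` assigns rows at random, so the resulting sizes are only approximately 70/30 and differ between runs unless its optional `seed` argument is set. The plain-Python sketch below illustrates the idea with a per-row random draw; the function `random_split` and the seed value 42 are hypothetical, and this is not how Spark implements the split internally.

```python
import random


def random_split(rows, train_fraction=0.7, seed=42):
    """Send each row to train or test based on a seeded random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(1000))
train_rows, test_rows = random_split(rows)
print("Training Rows:", len(train_rows), " Testing Rows:", len(test_rows))
```

Fixing the seed makes the experiment repeatable, which helps when comparing metrics across runs.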

## Define the Pipeline
Define a pipeline that creates a feature vector and trains a classification model:
1. A **VectorAssembler** that combines the indexed categorical features into a single vector.
2. A **VectorIndexer** that creates indices for a vector of categorical features.
3. A **VectorAssembler** that creates a vector of continuous numeric features.
4. A **MinMaxScaler** to normalize the continuous numeric features.
5. A **VectorAssembler** that creates a vector of categorical and continuous features.
6. A **Support Vector Machine classifier** (`LinearSVC`) that trains the classification model.
7. A **Pipeline** that chains the series of transformations above.
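
The normalization step in the list above rescales each numeric feature to the range [0, 1]. A minimal plain-Python sketch of that formula for a single column follows; the function name `min_max_scale` and the sample prices are hypothetical. When every value in a column is identical, the sketch returns the midpoint of the output range, which is the behavior documented for Spark's `MinMaxScaler`.

```python
def min_max_scale(values, out_min=0.0, out_max=1.0):
    """Rescale values linearly so the smallest maps to out_min and the largest to out_max."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant column: every value maps to the midpoint of the output range
        return [0.5 * (out_max + out_min)] * len(values)
    return [(v - lo) / (hi - lo) * (out_max - out_min) + out_min for v in values]


# Hypothetical nightly prices
prices = [50.0, 100.0, 250.0]
print(min_max_scale(prices))
```

Scaling matters here because features such as price and number of beds live on very different ranges, and a linear SVM is sensitive to feature magnitude.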

In [0]:
catVect = VectorAssembler(inputCols = ["Host_Response_index","RoomType_index", "PropertyType_index", "Cancellation_index"], outputCol="catFeatures")

catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures").setHandleInvalid("skip") 

numVect = VectorAssembler(inputCols = ["Host Response Rate","Host Listings Count","Host Total Listings Count","Price","Weekly Price","Monthly Price","Maximum Nights","Review Scores Accuracy","Review Scores Cleanliness","Review Scores Checkin","Review Scores Communication","Review Scores Location","Review Scores Value","Calculated host listings count","Bedrooms","Bathrooms","Beds","Security Deposit","Cleaning Fee","Extra People","Minimum Nights"], outputCol="numFeatures")

minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")

featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"],  outputCol="features")

svc = LinearSVC(labelCol="label", featuresCol="features")

pipeline = Pipeline(stages=[catVect,catIdx,numVect, minMax,featVect, svc])
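
For intuition about the final stage, the sketch below shows how a linear SVM scores a feature vector and how the hinge loss, which Spark documents as the objective `LinearSVC` optimizes, penalizes a prediction. It is a plain-Python illustration with made-up weights and features, not the training procedure itself; the function names are hypothetical.

```python
def decision_value(weights, bias, features):
    """Raw score w.x + b; its sign determines the predicted class."""
    return sum(w * x for w, x in zip(weights, features)) + bias


def predict(weights, bias, features, threshold=0.0):
    """Predict 1 when the raw score exceeds the threshold, else 0."""
    return 1 if decision_value(weights, bias, features) > threshold else 0


def hinge_loss(weights, bias, features, label):
    """Hinge loss for one example; labels 0/1 are mapped to -1/+1."""
    y = 1.0 if label == 1 else -1.0
    return max(0.0, 1.0 - y * decision_value(weights, bias, features))


# Made-up model parameters and a single two-feature example
weights, bias = [2.0, -1.0], 0.5
features = [1.0, 0.5]
print(decision_value(weights, bias, features))
print(predict(weights, bias, features))
print(hinge_loss(weights, bias, features, label=1))
print(hinge_loss(weights, bias, features, label=0))
```

An example on the correct side of the margin contributes zero loss, while a misclassified one is penalized in proportion to how far it sits on the wrong side. The `regParam` values tuned later control how strongly large weights are discouraged.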


### Train a Classification model using Parameter Tuning
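Parameter tuning evaluates each combination of parameters defined with a **ParamGridBuilder** in order to find the best-performing model for the data. The **CrossValidator** class does this against multiple folds of the data split into training and validation datasets. The code below instead uses **TrainValidationSplit**, which evaluates each combination once on a single split, with **80%** of the training data used for fitting and the remaining 20% for validation; the CrossValidator call is left commented out as an alternative.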

In [0]:

paramGrid = ParamGridBuilder() \
            .addGrid(svc.maxIter, [5, 10, 50]) \
            .addGrid(svc.regParam, [0.01, 0.3, 0.5]) \
            .addGrid(svc.aggregationDepth, [2,30]) \
            .build()
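
The grid above is the Cartesian product of the listed values, so its size grows multiplicatively with each added parameter. The plain-Python sketch below enumerates the same combinations with `itertools.product` to show how many candidate models get trained; the dictionary layout is only for illustration and is not the structure `ParamGridBuilder` returns.

```python
from itertools import product

# Same candidate values as in the ParamGridBuilder call
grid = {
    "maxIter": [5, 10, 50],
    "regParam": [0.01, 0.3, 0.5],
    "aggregationDepth": [2, 30],
}

names = list(grid)
combinations = [
    dict(zip(names, values))
    for values in product(*(grid[name] for name in names))
]

print(len(combinations))
print(combinations[0])
```

With 3 x 3 x 2 = 18 combinations, each extra fold or validation split multiplies the training cost accordingly, which is one reason to keep the grid small.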

In [0]:
#val = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=5)
val = TrainValidationSplit(estimator=pipeline, evaluator=BinaryClassificationEvaluator(),  estimatorParamMaps=paramGrid, trainRatio=0.8)


In [0]:
model = val.fit(train)


### Test the Pipeline Model
The model produced by the pipeline is a transformer that will apply all of the stages in the pipeline to a specified DataFrame and apply the trained model to generate predictions. In this case, we will transform the **test** DataFrame using the pipeline to generate label predictions.

In [0]:
prediction = model.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
#predicted = prediction.select("prediction", "trueLabel")
predicted.show(30)

In [0]:
# Confusion-matrix counts on the test set
tp = float(predicted.filter("prediction == 1.0 AND trueLabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND trueLabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND trueLabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND trueLabel == 1").count())
# Guard against division by zero when a class is never predicted or never present
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", tp / (tp + fp) if (tp + fp) > 0 else 0.0),
 ("Recall", tp / (tp + fn) if (tp + fn) > 0 else 0.0)],["metric", "value"])
metrics.show()
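
The same confusion-matrix arithmetic can be checked on a tiny hand-made example. The sketch below is plain Python with made-up predictions and labels; the helper names `confusion_counts` and `safe_ratio` are hypothetical, and returning 0.0 for an undefined ratio is a convention chosen here, not something the original notebook prescribes.

```python
def confusion_counts(predictions, labels):
    """Count true/false positives and negatives for binary 0/1 labels."""
    pairs = list(zip(predictions, labels))
    tp = sum(1 for p, y in pairs if p == 1 and y == 1)
    fp = sum(1 for p, y in pairs if p == 1 and y == 0)
    tn = sum(1 for p, y in pairs if p == 0 and y == 0)
    fn = sum(1 for p, y in pairs if p == 0 and y == 1)
    return tp, fp, tn, fn


def safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# Made-up predictions and true labels
predictions = [1, 1, 0, 1, 0, 0]
labels = [1, 0, 0, 1, 1, 0]

tp, fp, tn, fn = confusion_counts(predictions, labels)
precision = safe_ratio(tp, tp + fp)
recall = safe_ratio(tp, tp + fn)
print(tp, fp, tn, fn, precision, recall)
```

Precision answers how many listings predicted as high rated really are, while recall answers how many of the truly high-rated listings were found.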


### Retrieve the Area Under Curve
There are several evaluation metrics for a classification model. Of these, the AUC (area under the ROC curve) measures how well the model distinguishes between classes: the higher the AUC, the better the model is at predicting 0s as 0s and 1s as 1s. Here, a higher AUC means the model distinguishes better between high and low ratings. You can use the **BinaryClassificationEvaluator** class to retrieve the AUC value.

In [0]:

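# Use the continuous SVM scores (rawPrediction) rather than the hard 0/1 predictions,
# so the ROC curve is built from more than a single threshold
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(prediction)
print ("AUC = ", auc)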
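
AUC can be read as the probability that a randomly chosen high-rated example receives a higher score than a randomly chosen low-rated one, with ties counted as one half. The plain-Python sketch below computes it from that pairwise definition; it is not Spark's implementation, and the function name `roc_auc`, the scores, and the 0.5 cut-off are made up for illustration. It also shows that collapsing scores into hard 0/1 predictions before computing AUC can change the value.

```python
def roc_auc(scores, labels):
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a correct ranking
    return wins / (len(pos) * len(neg))


# Made-up true labels and continuous model scores
labels = [1, 1, 0, 0]
scores = [0.9, 0.4, 0.6, 0.1]
# The same scores collapsed into hard predictions at an arbitrary 0.5 cut-off
hard = [1.0 if s >= 0.5 else 0.0 for s in scores]

auc_scores = roc_auc(scores, labels)
auc_hard = roc_auc(hard, labels)
print(auc_scores, auc_hard)
```

The pairwise loop is quadratic and only suitable for small examples; it is meant to explain the metric, not to replace the evaluator.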