# Scalable Analytics Case Study (Databricks Notebook) #

In this notebook, you will carry out the module exercises. 

The solution must be coded and executed in the cells just below the statements of the exercises.

Once you have finished, you can export the notebook in DBC (Databricks archive) format.


In [3]:
print(sc.version)

The exercises consist of adding new functionality, or running new code, in the notebook that contains all the theory covered in the module.

## Importing the data ##

In [6]:
dbutils.fs.cp("/FileStore/tables/Hotel_Reviews.csv", "file:///databricks/driver/Hotel_Reviews.csv")

In [7]:
def score_to_string(score):
  if score < 5:
    return "Bad"
  elif score < 7:
    return "Normal"
  elif score < 9:
    return "Good"
  elif score < 10: 
    return "Excellent"
  else:
    return "Perfect"
  
def score_to_evaluation(score_string):
  score_dict = {
    "Bad": 0,
    "Normal": 1,
    "Good": 2,
    "Excellent": 3,
    "Perfect": 4
  }
  return score_dict.get(score_string, None)

## DataFrames in Spark: SparkSQL. ##

In [9]:
df_spark_sql = spark.read.format("csv")\
         .option("header", "true")\
         .option("inferSchema", "true")\
         .load("/FileStore/tables/Hotel_Reviews.csv")

In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

score_string_udf = udf(score_to_string, StringType())
score_evaluation_udf = udf(score_to_evaluation, IntegerType())

In [11]:
df_spark_sql = df_spark_sql.withColumn('score_string',score_string_udf(df_spark_sql["Average_Score"]))
df_spark_sql = df_spark_sql.withColumn('score_evaluation',score_evaluation_udf(df_spark_sql["score_string"]))

In [12]:
def day_to_int(day):
  return int(day.replace(" days", "").replace(" day", ""))
day_to_int_udf = udf(day_to_int, IntegerType())
df_spark_sql = df_spark_sql.withColumn("days_since_review", day_to_int_udf(df_spark_sql["days_since_review"]))
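
The parsing logic of "day_to_int" can be checked locally on plain strings before it is wrapped in a UDF. The sample values below are assumptions about how the raw column looks, not values read from the dataset:

```python
def day_to_int(day):
    # Remove " days" before " day" so that the plural form leaves no trailing "s".
    return int(day.replace(" days", "").replace(" day", ""))

# Hypothetical raw values of the "days_since_review" column.
samples = ["0 days", "1 day", "10 days", "730 days"]
parsed = [day_to_int(s) for s in samples]
print(parsed)
```

The order of the two "replace" calls matters: stripping " day" first would turn "10 days" into "10s", which "int" cannot parse.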

### Exercise 1: Create a loop that shows all the columns of the DataFrame, along with their types. You can also print the schema of the DataFrame. ###

I start with a loop over "df_spark_sql.dtypes" that prints each element "i". Each element is a tuple holding the name of a column and its type.

Next I call "df_spark_sql.printSchema()" to print the schema of the DataFrame.

In [14]:

print("----data types----")
for i in df_spark_sql.dtypes:
  print(i)

print("\n"*3)
print("----Schema----")
df_spark_sql.printSchema()


### Exercise 2: Sample 10 unique hotel name values. Sort them alphanumerically in ascending order (numbers 0-9 first, then A-Z). ###

I start by selecting the column 'Hotel_Name' and keeping only its distinct values. I then limit the result to 10 rows and, finally, sort those 10 rows.

In [16]:
df_spark_sql.select('Hotel_Name').distinct().limit(10).orderBy('Hotel_Name').show()
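
The order of "limit" and "orderBy" matters here. Limiting first sorts whichever 10 distinct names Spark happens to return, whereas sorting first would return the 10 alphabetically smallest names. A plain-Python analogy, using hypothetical hotel names rather than the real data and a limit of 3, sketches the difference:

```python
# Hypothetical hotel names; the list order stands in for the arbitrary
# order in which Spark may return distinct rows.
names = ["The Savoy", "11 Cadogan Gardens", "Hotel Arena", "41",
         "Apex Temple Court Hotel", "Hotel Arena", "citizenM London"]

# dict.fromkeys drops duplicates while keeping first-seen order.
distinct_names = list(dict.fromkeys(names))

# distinct -> limit(3) -> orderBy: sort an arbitrary sample of 3 names.
limit_then_sort = sorted(distinct_names[:3])

# distinct -> orderBy -> limit(3): the 3 smallest names overall.
sort_then_limit = sorted(distinct_names)[:3]

print(limit_then_sort)
print(sort_then_limit)
```

Since the exercise asks for a sample of 10 names that is then sorted, limiting before sorting matches the statement. In this Python analogy, names starting with digits sort before names starting with letters.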

### Exercise 3: Transform the *lat* and *lng* columns to the Float type. ###

I start with the "lat" column. I use the "withColumn" function, which normally adds a new column. However, since the name given to the "new" column matches a column that already exists in the dataframe, the function replaces the existing column.

The value of the column is the existing column, 'df_spark_sql["lat"]', cast to the "Float" type.

I apply the same procedure to the "lng" column.


In [18]:
df_spark_sql = df_spark_sql.withColumn("lat", df_spark_sql["lat"].cast("Float"))
df_spark_sql = df_spark_sql.withColumn("lng", df_spark_sql["lng"].cast("Float"))

In [19]:
splits = df_spark_sql.randomSplit([0.67, 0.33])
df_spark_sql_train = splits[0].dropna()
df_spark_sql_test = splits[1].dropna()
print(df_spark_sql_train.count())
print(df_spark_sql_test.count())

### Exercise 4: How many hotels have a 'Perfect' score? And 'Good'? And 'Normal' and 'Good' combined? (Use the Train dataset) ###

I start the exercise by filtering the rows whose "score_string" column equals "Perfect". There are no such rows, and I confirm this with the "distinct" command: no entry appears as "Perfect".

Then I apply the same filter to the rows labelled "Good", and finally to the rows labelled either "Normal" or "Good". Each "count" returns the number of matching rows in the Train dataset.

In [21]:
print("'Perfect' score")
print(df_spark_sql_train.filter('score_string="Perfect"').count()) # there are none
df_spark_sql_train.select('score_string').distinct().show() # checked with "distinct": there is indeed no "Perfect" rating
print("\n","'Good' score")
print(df_spark_sql_train.filter('score_string="Good"').count())
print("\n","'Good'+'Normal' score")
print(df_spark_sql_train.filter('(score_string = "Normal") or (score_string = "Good")').count())
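
The absence of "Perfect" follows from the thresholds in "score_to_string": only an average of 10 or more maps to "Perfect", so the empty result suggests that no hotel in the data reaches that average. A quick local check of the boundaries, on hypothetical scores and without Spark:

```python
def score_to_string(score):
    # Same thresholds as the function defined at the top of the notebook.
    if score < 5:
        return "Bad"
    elif score < 7:
        return "Normal"
    elif score < 9:
        return "Good"
    elif score < 10:
        return "Excellent"
    else:
        return "Perfect"

# Hypothetical average scores placed on and around each boundary.
boundary_scores = [4.9, 5.0, 6.9, 7.0, 8.9, 9.0, 9.9, 10.0]
labels = {s: score_to_string(s) for s in boundary_scores}
for score, label in labels.items():
    print(score, label)
```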
     

### Exercise 5: Obtain the hotels with the highest average score, discarding all those with a score above Good. (Use the Train dataset) ###

For this exercise I start by removing the scores that are better than "Good" ("Perfect" and "Excellent").
Then I select the columns that I want to display ('Hotel_Name' and "Average_Score").
Next I group the data by "Hotel_Name" and calculate the average of "Average_Score" for each hotel.
I continue with "orderBy" to sort the hotels in descending order of that average, so that the hotels with the highest score come first.
Finally, I limit the result to 10 rows to get the 10 hotels with the highest average score, excluding the hotels with a score above "Good".

In [23]:
df_spark_sql_train.filter('(score_string <> "Perfect") and (score_string <> "Excellent")')\
  .select('Hotel_Name', "Average_Score")\
  .groupBy('Hotel_Name')\
  .avg("Average_Score")\
  .orderBy("avg(Average_Score)", ascending=False)\
  .limit(10)\
  .show()
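
The same filter, group, average, sort and limit steps can be sketched in plain Python to make the logic easier to follow. The rows below are hypothetical, not taken from the dataset, and the limit is 2 instead of 10:

```python
from collections import defaultdict

# Hypothetical (Hotel_Name, Average_Score, score_string) rows.
rows = [
    ("Hotel A", 8.7, "Good"),
    ("Hotel A", 8.7, "Good"),
    ("Hotel B", 9.2, "Excellent"),
    ("Hotel C", 6.5, "Normal"),
    ("Hotel D", 8.9, "Good"),
]

# filter: discard every score above "Good".
kept = [r for r in rows if r[2] not in ("Perfect", "Excellent")]

# groupBy + avg: collect the scores per hotel, then average them.
scores_by_hotel = defaultdict(list)
for hotel, score, _ in kept:
    scores_by_hotel[hotel].append(score)
averages = {h: sum(v) / len(v) for h, v in scores_by_hotel.items()}

# orderBy(descending) + limit(2).
top = sorted(averages.items(), key=lambda kv: kv[1], reverse=True)[:2]
print(top)
```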

# Machine Learning with Apache Spark: Spark MLLib and Spark ML #

## Supervised Classification: Decision Trees ##

### Exercise 6.1: Look again at all the columns of the dataframe, to identify those that are categorical. ###
I apply the same code that I used in the first exercise, now on the dataframe "df_spark_sql_train", and I see that there are indeed categorical variables.
To carry out the machine learning steps, these variables must be either eliminated or transformed into numerical variables.

In [27]:
for i in df_spark_sql_train.dtypes:
  print(i)

### Exercise 6.2: Delete the variables 'Hotel_Address', 'Hotel_Name', 'Tags', 'Positive_Review', 'Negative_Review' and 'score_string' from the df_spark_sql_train and df_spark_sql_test dataframes. Call them: df_DT_train and df_DT_test. ###

I eliminate these categorical variables. The remaining string columns, "Review_Date" and "Reviewer_Nationality", will be transformed into numerical variables instead.

In [29]:
df_DT_train = df_spark_sql_train.drop("Hotel_Address").drop("Hotel_Name")\
  .drop("Tags").drop("Positive_Review").drop("Negative_Review").drop("score_string")
df_DT_test = df_spark_sql_test.drop("Hotel_Address").drop("Hotel_Name")\
  .drop("Tags").drop("Positive_Review").drop("Negative_Review").drop("score_string")

for i in df_DT_train.dtypes:
  print(i)

### Exercise 7: For each remaining column that is String ('Review_Date' and 'Reviewer_Nationality'), apply a StringIndexer(), returning the same column as a result, but with its name ending in _index. Overwrite both dataframes. ###

In this exercise I convert the strings into numbers. I do this by applying "StringIndexer" to each string column, indicating the column that I want to convert ("inputCol") and the name of the resulting column ("outputCol").
With "fit" I choose the dataframe from which the mapping between strings and indices is learned. With "transform" I apply that mapping to the data, which generates the output column.

I apply this procedure to both the train and test dataframes for the two variables mentioned in this exercise ('Review_Date' and 'Reviewer_Nationality').

In [31]:

from pyspark.ml.feature import StringIndexer

df_DT_train = StringIndexer(inputCol="Review_Date", outputCol="Review_Date_index").fit(df_DT_train).transform(df_DT_train)
df_DT_train = StringIndexer(inputCol='Reviewer_Nationality', outputCol="Reviewer_Nationality_index").fit(df_DT_train).transform(df_DT_train)
df_DT_test = StringIndexer(inputCol="Review_Date", outputCol="Review_Date_index").fit(df_DT_test).transform(df_DT_test)
df_DT_test = StringIndexer(inputCol='Reviewer_Nationality', outputCol="Reviewer_Nationality_index").fit(df_DT_test).transform(df_DT_test)


for i in df_DT_train.dtypes:
  print(i)
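
One caveat with this cell: each "StringIndexer" is fitted separately on Train and on Test, so the same string may receive a different index in each dataframe. By default "StringIndexer" assigns indices by descending label frequency, which the plain-Python sketch below imitates on hypothetical nationalities ("fit_indexer" is an illustrative helper, not a Spark function):

```python
from collections import Counter

def fit_indexer(values):
    """Map each label to an index, most frequent label first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical column values for the two splits.
train_values = ["UK", "UK", "UK", "USA", "USA", "Spain"]
test_values = ["USA", "USA", "Spain", "UK"]

train_mapping = fit_indexer(train_values)
test_mapping = fit_indexer(test_values)

print(train_mapping)  # fitted on Train
print(test_mapping)   # fitted on Test: "UK" and "USA" get different indices

# Reusing the mapping fitted on Train keeps the encoding consistent.
test_encoded = [train_mapping[v] for v in test_values]
print(test_encoded)
```

In Spark, the equivalent would be to call "fit" once on df_DT_train and use the resulting model to "transform" both dataframes, which is what the Pipeline in the later exercises does.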



### Exercise 8: Apply VectorAssembler() on the columns that are neither the previous two nor the 'score_evaluation' column, returning a column called 'features'. Call the result "DT_vector_assembler". ###

In this exercise I create the "VectorAssembler", which combines the values of the selected columns of the dataframe into a single vector column.
This step is necessary because Spark ML expects the features in this form.

Since the columns "Review_Date_index" and "Reviewer_Nationality_index" have been created, I use "drop" to exclude the original string columns
"Review_Date" and "Reviewer_Nationality" from the vector.
Also, since "score_evaluation" is the column to be predicted, it is not included either.

Finally, I indicate that the output (the vector column to be created) must be named "features".

In [33]:
from pyspark.ml.feature import VectorAssembler
DT_vector_assembler = VectorAssembler(\
  inputCols=df_DT_train.drop("Review_Date").drop("Reviewer_Nationality").drop("score_evaluation").columns,\
  outputCol="features")


### Exercise 9: Apply the transformer on both dataframes. ###
Now that the "VectorAssembler" has been created, I apply it with "transform", passing it the data to be turned into vectors: first df_DT_train and then df_DT_test.

In [35]:
df_DT_train = DT_vector_assembler.transform(df_DT_train)
df_DT_test = DT_vector_assembler.transform(df_DT_test)

### Exercise 10: Initialize the decision tree model, train it and apply it to the test data. ###
* Model: DecisionTreeClassifier:
  * Label: score_evaluation.
  * Features: features.
  * maxBins: 1000
  * maxDepth: 1
  
The RDD-based Spark MLlib API expects RDDs, and here I am working with DataFrames, so I use the DataFrame-based Spark ML library instead.

I start by importing the necessary class.

Next I create the model, specifying the label column ("score_evaluation"), the features column, the maximum depth of the tree and the maximum number of bins.
I then train the model by calling "fit" on the training dataframe.

I finish Exercise 10 by applying the trained model to the data reserved for testing.

In [37]:
from pyspark.ml.classification import DecisionTreeClassifier

model = DecisionTreeClassifier(labelCol="score_evaluation", featuresCol="features",maxDepth=1, maxBins=1000).fit(df_DT_train)

prediction = model.transform(df_DT_test)
print(prediction)

### Exercise 11: Evaluate the model applying a multiclass classifier. Calculate the 'accuracy' metric, and get the complementary to calculate the error. ###
* Evaluator: MulticlassClassificationEvaluator
  * Label: score_evaluation.
  * Prediction: prediction.
  * MetricName: accuracy.

I start this exercise by importing the class I need. Then I create the evaluator, indicating that the true labels are in "score_evaluation", that the predictions are in "prediction", and that I am interested in the "accuracy" metric.

Next, I pass the "prediction" dataframe obtained in the previous exercise to the evaluator to calculate the "accuracy".

I finish the exercise by computing "1 - accuracy" to obtain the error.

In [39]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="score_evaluation", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(prediction)
print(accuracy)
print(" Error = %g " % (1.0 - accuracy))
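
For reference, the "accuracy" metric is the fraction of rows whose prediction equals the label, and the error is its complement. A minimal sketch in plain Python, on hypothetical (label, prediction) pairs rather than the real output of the model:

```python
# Hypothetical (label, prediction) pairs standing in for the
# "score_evaluation" and "prediction" columns.
pairs = [(2, 2), (2, 2), (3, 2), (1, 2), (3, 3), (2, 3), (3, 3), (2, 2)]

# accuracy = correct predictions / total predictions
correct = sum(1 for label, predicted in pairs if label == predicted)
accuracy = correct / len(pairs)
error = 1.0 - accuracy

print(accuracy)
print(" Error = %g " % error)
```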

## Spark ML: Pipelines ##

### Pipelines: Decision Trees ###
Following the same idea as with KMeans, the flow for the decision tree is designed here. The preprocessing changes seen above must first be applied to the initial DataFrame to prepare it.

### Exercise 12: Remove the variables 'Hotel_Address', 'Hotel_Name', 'Tags', 'Positive_Review', 'Negative_Review' and 'score_string' from the df_spark_sql_train and df_spark_sql_test dataframes. Call them: df_DT_train and df_DT_test. ###

In this exercise I apply the same procedure as in Exercise 6.2.

In [43]:
df_DT_train = df_spark_sql_train.drop("Hotel_Address").drop("Hotel_Name")\
  .drop("Tags").drop("Positive_Review").drop("Negative_Review").drop("score_string")
df_DT_test = df_spark_sql_test.drop("Hotel_Address").drop("Hotel_Name")\
  .drop("Tags").drop("Positive_Review").drop("Negative_Review").drop("score_string")

Next, the flow for this model is designed, which will be:

**StringIndexer --> VectorAssembler --> Decision Tree (Initialization) --> Decision Tree (Training) --> Trained Decision Tree model**

### Exercise 13: Collect a list with all the StringIndexers to apply, and call it DT_string_indexers ###
Instead of overwriting the dataframe each time, create a list and add every StringIndexer() to it with the 'append' method.

I start by building the pieces of the Pipeline.
For every column whose type is "string", the code creates a "StringIndexer" and appends it to the list "DT_string_indexers", so that the pipeline can be assembled correctly later.

In [46]:
DT_string_indexers = []
for dtype in df_DT_train.dtypes:
  if dtype[1] == "string":
    DT_string_indexers.append(StringIndexer(inputCol=dtype[0], outputCol=dtype[0]+"_index"))
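
The selection rule in this loop can be checked without Spark. "dtypes" is a list of (name, type) tuples, so the sketch below applies the same test to a hypothetical excerpt of that list and shows which input and output column names the indexers would receive:

```python
# Hypothetical excerpt of df_DT_train.dtypes: a list of (name, type) tuples.
dtypes = [
    ("Average_Score", "double"),
    ("Review_Date", "string"),
    ("Reviewer_Nationality", "string"),
    ("days_since_review", "int"),
]

# Same selection rule as the loop above: one (inputCol, outputCol) pair
# per string column.
indexer_columns = [(name, name + "_index") for name, col_type in dtypes
                   if col_type == "string"]
print(indexer_columns)
```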

### Exercise 14: Save the application of the same VectorAssembler() of exercise 8 in the variable 'DT_vector_assembler'. ###
As in the previous exercise, to assemble the pipeline I create a "VectorAssembler" and save it in the variable "DT_vector_assembler".

In [48]:
DT_vector_assembler = VectorAssembler(\
  inputCols=df_DT_train.drop("Review_Date").drop("Reviewer_Nationality").drop("score_evaluation").columns,\
  outputCol="features")
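
Because df_DT_train was rebuilt in Exercise 12, it has no "_index" columns at this point, so the column list above contains only the original non-string columns. If the indexed columns are meant to be part of "features", as they were in Exercise 8, their names have to be added explicitly. A sketch of that list construction in plain Python, on a hypothetical excerpt of the column names:

```python
# Hypothetical excerpt of df_DT_train.columns after Exercise 12.
columns = ["Average_Score", "Review_Date", "Reviewer_Nationality",
           "lat", "lng", "score_evaluation"]

string_columns = ["Review_Date", "Reviewer_Nationality"]
label_column = "score_evaluation"

# Drop the raw string columns and the label...
feature_columns = [c for c in columns
                   if c not in string_columns and c != label_column]
# ...then add the columns that the StringIndexer stages will create.
feature_columns += [c + "_index" for c in string_columns]

print(feature_columns)
```

The resulting list could then be passed as "inputCols" to "VectorAssembler". In that case the "handleInvalid" parameter of "StringIndexer" may need to be set, since by default the fitted indexer raises an error on labels that appear only in the Test data.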

### Exercise 15: Create a list with the name DT_pipeline_stages, and add the list of StringIndexers and the VectorAssembler (in this order) ###

In this exercise I simply collect the objects created in the last two exercises in the list "DT_pipeline_stages": first the StringIndexers, then the VectorAssembler.

In [50]:
DT_pipeline_stages = [str_indexer for str_indexer in DT_string_indexers]
DT_pipeline_stages.append(DT_vector_assembler)

### Exercise 16: Initialize the decision tree model (same specifications as in ex. 10), and add it to the list of steps 'DT_pipeline_stages' ###

The model is also added to the pipeline. This time it is not trained when it is created (as it was in exercise 10), since training happens in the last step, when the pipeline is fitted.

In [52]:
model = DecisionTreeClassifier(labelCol="score_evaluation", featuresCol="features",maxDepth=1, maxBins=1000)
DT_pipeline_stages.append(model)

### Exercise 17: Design the Pipeline and apply it to the Train data, calling it 'DT_pipeline_model' ###
In this exercise I pass the list of stages to the "Pipeline" constructor. This creates the pipeline object, which is fitted to the Train data in the next exercise.

In [54]:
from pyspark.ml import Pipeline
DT_pipeline = Pipeline(stages=DT_pipeline_stages)
print(DT_pipeline)
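
To make the fit and transform flow of a pipeline concrete, here is a toy imitation in plain Python. All class and function names ("ToyIndexer", "ToyIndexerModel", "ToyPipeline", "apply_stages") are hypothetical and only mimic the idea; they are not the Spark ML implementation:

```python
class ToyIndexer:
    """Toy estimator: learns a label -> index mapping from the rows it is fitted on."""
    def __init__(self, column):
        self.column = column

    def fit(self, rows):
        labels = sorted({row[self.column] for row in rows})
        return ToyIndexerModel(self.column, {label: i for i, label in enumerate(labels)})


class ToyIndexerModel:
    """Toy transformer: adds "<column>_index" using a mapping learned earlier."""
    def __init__(self, column, mapping):
        self.column = column
        self.mapping = mapping

    def transform(self, rows):
        return [{**row, self.column + "_index": self.mapping[row[self.column]]}
                for row in rows]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted first; plain transformers are used as they are.
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)  # the next stage sees the transformed rows
            fitted.append(model)
        return fitted


def apply_stages(fitted, rows):
    """Run already-fitted stages in order, as a fitted pipeline does on new data."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


# Hypothetical rows.
train = [{"country": "UK"}, {"country": "Spain"}, {"country": "UK"}]
test = [{"country": "Spain"}]

fitted_stages = ToyPipeline([ToyIndexer("country")]).fit(train)
test_out = apply_stages(fitted_stages, test)
print(test_out)
```

The point of the sketch is that everything learned during "fit" comes from the training rows only, and the test rows are merely transformed with what was learned.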

### Exercise 18: Apply the resulting model to the test data and evaluate it, as was done in exercise 11 ###

Finally, I fit the pipeline on the training data. It runs every stage placed in it, in order, ending with the training of the model.

I finish the exercise by calculating the "accuracy" of the model on the test dataframe.

In [56]:
DT_pipeline_model = DT_pipeline.fit(df_DT_train)

prediction = DT_pipeline_model.transform(df_DT_test)
evaluator = MulticlassClassificationEvaluator(
    labelCol="score_evaluation", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(prediction)
print(accuracy)
print(" Error = %g " % (1.0 - accuracy))