# Caso práctico de Analítica Escalable (Ejercicios) #

En este notebook, se van a realizar los ejercicios del módulo. En lugar de tener contenido teórico y descripciones, se dejarán únicamente las celdas de código necesarias para su ejecución.

Para completar los ejercicios, hay que codificar y ejecutar la solución en las celdas que se encuentran justo debajo de los enunciados de los ejercicios.

Una vez se haya terminado, en el menú de la izquiera, a la hora de seleccionar el notebook, si se le hace click a la flecha que se encuentra en la derecha, se puede exportar al notebook. Hay que exportarlo en formato DBC (Databricks Notebook) como en HTML.

In [3]:
print sc.version

Los ejercicios consistirán en añadir nuevas funcionalidades, o ejecutar nuevo código, sobre el Notebook que contiene toda la teoría vista en el módulo. Por ello, gran parte del código que se encuentra dentro del notebook de contenido teórico se encontrará aquí de nuevo, pero se pedirá nuevo código.

## Importando los datos ##

In [6]:
dbutils.fs.cp("/FileStore/tables/Hotel_Reviews.csv", "file:///databricks/driver/Hotel_Reviews.csv")

In [7]:
def score_to_string(score):
  if score < 5:
    return "Bad"
  elif score < 7:
    return "Normal"
  elif score < 9:
    return "Good"
  elif score < 10: 
    return "Excellent"
  else:
    return "Perfect"
  
def score_to_evaluation(score_string):
  score_dict = {
    "Bad": 0,
    "Normal": 1,
    "Good": 2,
    "Excellent": 3,
    "Perfect": 4
  }
  return score_dict.get(score_string, None)

## DataFrames en Spark: SparkSQL. ##

In [9]:
df_spark_sql = spark.read.format("csv")\
         .option("header", "true")\
         .option("inferSchema", "true")\
         .load("file:///databricks/driver/Hotel_Reviews.csv")

In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

score_string_udf = udf(score_to_string, StringType())
score_evaluation_udf = udf(score_to_evaluation, IntegerType())

In [11]:
df_spark_sql = df_spark_sql.withColumn('score_string',score_string_udf(df_spark_sql["Average_Score"]))
df_spark_sql = df_spark_sql.withColumn('score_evaluation',score_evaluation_udf(df_spark_sql["score_string"]))

In [12]:
def day_to_int(day):
  return int(day.replace(" days", "").replace(" day", ""))
day_to_int_udf = udf(day_to_int, IntegerType())
df_spark_sql = df_spark_sql.withColumn("days_since_review", day_to_int_udf(df_spark_sql["days_since_review"]))

### Ejercicio 1: Crear un bucle que muestre todas las columnas del DataFrame, junto con sus tipos. También puedes pintar el esquema del Dataframe. ###

In [14]:
# Escribir aquí el ejericicio
# 
print("Primero imprimimos el esquema:")
print(df_spark_sql.printSchema())
print("Ahora hacemos el bucle que muestra las columnas y tipos del dataFrame")
for name, dtype in df_spark_sql.dtypes:
  print(name, " --> ",dtype)

### Ejercicio 2: Realizar un muestreo de 10 valores únicos de nombres de hoteles. Ordénalos alfanuméricamente de forma ascendente (primero los números 0-9, después A-Z). ###

In [16]:
# Escribir aquí el ejercicio
df_spark_sql.select('Hotel_Name').distinct().limit(10).orderBy("Hotel_Name").show()

### Ejercicio 3: Transforma las columnas *lat* y *lng* al tipo Float.

In [18]:
# Transformación de la columna sin utilizar UDF.
from pyspark.sql.types import FloatType

df_new_aux = df_spark_sql.withColumn("lat", df_spark_sql["lat"].cast(FloatType()) )
df_new_sql = df_new_aux.withColumn("lng", df_spark_sql["lng"].cast(FloatType()) )
df_spark_sql = df_new_sql
print ('  ----  Comprobamos que ha cambiado ------ ')
print(df_spark_sql.printSchema())

In [19]:
splits = df_spark_sql.randomSplit([0.67, 0.33])
df_spark_sql_train = splits[0].dropna()
df_spark_sql_test = splits[1].dropna()
print(df_spark_sql_train.count())
print(df_spark_sql_test.count())

### Ejercicio 4: ¿Cuántos hoteles tienen una puntuación de 'Perfect'? ¿Y 'Good'? ¿Y 'Normal' junto a 'Good'? (Utilizar el dataset de Train)

In [21]:
# Escribir aquí el ejercicio
print ('  ----  Cantidad de Perfect ------ ')
df_spark_sql_train.select('score_string').filter("score_string = 'Perfect'").groupBy('score_string').count().show()
print ('  ----  Cantidad de Good ------ ')
df_spark_sql_train.select('score_string').filter("score_string = 'Good'").groupBy('score_string').count().show()
print ('  ----  Cantidad de Normal y Good ------ ')
df_spark_sql_train.select('score_string').filter("score_string = 'Normal' or score_string = 'Good'").groupBy('score_string').count().show()

### Ejercicio 5: Obtener los hoteles con mayor puntuación media, descartando todos los que tengan una puntuación por encima de Good. (Utilizar el dataset de Train) ###

In [23]:
# Escribir aquí el ejercicio
from pyspark.sql.functions import desc
# df_spark_sql_train.limit(1000).show()
print ('  ----  10 Hoteles no Excelent con mayor puntuacion media ------ ')
df_Puntuacion = df_spark_sql_train.filter("score_string <> 'Excellent'").groupBy('Hotel_Name').avg('Reviewer_Score')
df_Puntuacion.orderBy(desc("avg(Reviewer_Score)")).limit(10).show()

# Machine Learning en Apache Spark: Spark MLLib y Spark ML #

## Clasificación Supervisada: Árboles de decisión ##

### Ejercicio 6.1: Volver a observar todas las columnas del dataframe, para identificar las que sean categóricas. ###

In [27]:
# Escribir aquí el ejercicio
print("   ---------- Primero imprimimos el esquema: -----------")
print(df_spark_sql.printSchema())
print("   ---------- Despues imprimimos algunos datos: -----------")
df_spark_sql.select('*').limit(100).show()

### Ejercicio 6.2: Eliminar, de los dataframes df_spark_sql_train y df_spark_sql test, las variables 'Hotel_Address', 'Hotel_Name', 'Tags', 'Positive Review', 'Negative_Review' y 'score_string'. Llamarlos: df_DT_train y df_DT_test. ###

In [29]:
# Escribir aquí el ejercicio
df_DT_train = df_spark_sql_train.drop('Hotel_Address', 'Hotel_Name', 'Tags', 'Positive_Review', 'Negative_Review', 'score_string')
print("   ---------- Comprobamos que se han eliminado las columnas de df_DT_train -----------")
print(df_DT_train.printSchema())
df_DT_test = df_spark_sql_test.drop('Hotel_Address', 'Hotel_Name', 'Tags', 'Positive_Review', 'Negative_Review', 'score_string')
print("   ---------- Comprobamos que se han eliminado las columnas de df_DT_test -----------")
print(df_DT_test.printSchema())

### Ejercicio 7: Para cada columa restante que sea String ('Review_Date' y 'Review_Nationality'), aplicar un StringIndexer(), devolviendo como resultado la misma columna, pero con su nombre acabando en _index. Sobreescribir ambos dataframes.  ###

In [31]:
# Escribir aquí el ejercicio
from pyspark.ml.feature import StringIndexer

print("   ---------- Definimos un indexer para cada columna -----------")
indexerDate = StringIndexer(inputCol="Review_Date", outputCol="Review_Date_index")
indexerNationality = StringIndexer(inputCol="Reviewer_Nationality", outputCol="Reviewer_Nationality_index")

print("   ---------- Ejecutamos ambos indexers sobre ambos dataframes -----------")
df_DT_train = indexerDate.fit(df_DT_train).transform(df_DT_train)
df_DT_train = indexerNationality.fit(df_DT_train).transform(df_DT_train)
df_DT_test = indexerDate.fit(df_DT_test).transform(df_DT_test)
df_DT_test = indexerNationality.fit(df_DT_test).transform(df_DT_test)
print("   ---------- Comprobamos que se han creado las columnas de df_DT_train -----------")
df_DT_train.select('Review_Date','Review_Date_Index','Reviewer_Nationality','Reviewer_Nationality_Index').limit(100).show()
print("   ---------- Comprobamos que se han creado las columnas de df_DT_test -----------")
df_DT_test.select('Review_Date','Review_Date_Index','Reviewer_Nationality','Reviewer_Nationality_Index').limit(100).show()


### Ejercicio 8: Aplicar VectorAssembler() sobre las columnas que no son ni las dos anteriores, ni la columna 'score_evaluation', devolviendo una columna llamada 'features'. Llamar al resultado DT_vector_assembler. ###

In [33]:
# Escribir aquí el ejercicio
from pyspark.ml.feature import VectorAssembler

print("   ---------- Imprimimos el esquema para ver cuales son las columnas a utilizar -----------")
print(df_DT_train.printSchema())

DT_vector_assembler = VectorAssembler(
    inputCols=["Additional_Number_of_Scoring", "Average_Score", "Review_Total_Negative_Word_Counts", "Total_Number_of_Reviews", "Review_Total_Positive_Word_Counts", "Total_Number_of_Reviews_Reviewer_Has_Given", "Reviewer_Score", "days_since_review", "Review_Date_index", "Reviewer_Nationality_index"],
    outputCol="features")

### Ejercicio 9: Aplicar el transformador sobre ambos dataframes. ###

In [35]:
# Escribir aquí el ejercicio
df_DT_train_assembled = DT_vector_assembler.transform(df_DT_train)
df_DT_test_assembled = DT_vector_assembler.transform(df_DT_test)

print("   ---------- Vemos el dataframe obtenido -----------")
df_DT_train_assembled.select("*").show(10)

### Ejercicio 10: Inicializar el modelo de árbol de decisión, entrenarlo y aplicarlo sobre los datos de test. ###
* Modelo: DecisionTreeClassifier:
  * Label: score_evaluation.
  * Features: features.
  * maxBins: 1000
  * maxDepth: 1

In [37]:
# Escribir aquí el ejercicio
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

print("   ---------- Creamos el arbol de decision para la columna score_evaluation y entrenamos el modelo con los datos TRAIN -----------")
dt = DecisionTreeClassifier(labelCol="score_evaluation", featuresCol="features", maxBins=1000, maxDepth=1)
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(df_DT_train_assembled)
print("   ---------- Aplicamos el modelo a los datos de TEST para obtener las previsiones y poder posteriormente evaluar el modelo -----------")
predictions = model.transform(df_DT_test_assembled)

print("   ---------- Vemos resultados obtenidos -----------")
predictions.select("score_evaluation", "features", "rawPrediction", "probability", "prediction").show(100)


### Ejercicio 11: Evaluar el modelo aplicándole un clasificador multiclase. Calcular la métrica 'accuracy', y conseguir el complementario para calcular el error. ###
* Evaluador: MulticlassClassificationEvaluator
  * Label: score_evaluation.
  * Prediction: prediction.
  * MetricName: accuracy.

In [39]:
# Escribir aquí el ejercicio
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="score_evaluation", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("   ---------- Vemos error obtenido -----------")
print("Error en el test = %g " % (1.0 - accuracy))


## Spark ML: Pipelines ##

### Pipelines: Árboles de Decisión ###
Con el mismo concepto que con el KMeans, se va a diseñar el flujo para los árboles de decisión. Primero hay que aplicar los cambios de preprocesamiento vistos anteriormente al DataFrame inicial para preparalo.

### Ejercicio 12: Eliminar, de los dataframes df_spark_sql_train y df_spark_sql test, las variables 'Hotel_Address', 'Hotel_Name', 'Tags', 'Positive Review', 'Negative_Review' y 'score_string'. Llamarlos: df_DT_train y df_DT_test. ###

In [43]:
# Escribir aquí el ejercicio
# Hecho en un punto anterior
# df_DT_train = df_spark_sql_train.drop('Hotel_Address', 'Hotel_Name', 'Tags', 'Positive_Review', 'Negative_Review', 'score_string')
print("   ---------- Comprobamos que se han eliminado las columnas de df_DT_train -----------")
print(df_DT_train.printSchema())
print("   ---------- Comprobamos que se han eliminado las columnas de df_DT_test -----------")
print(df_DT_test.printSchema())

Después se diseña el flujo para este modelo, el cual será:

** StringIndexer --> VectorAssembler --> Decission Tree (Inicialización) --> Decission Tree (Entrenamiento) --> Modelo Decission Tree entrenado **

### Ejercicio 13: Recoger una lista con todos los StringIndexer a aplicar, y llamarla DT_string_indexers ###
 En lugar de sobreescribir cada vez el dataframe, crear una lista, y con el método 'append', se irán añadiendo todos los StringIndexers().

In [46]:
# Escribir aquí el ejercicio

print("   ---------- Definimos un indexer para cada columna tipo string -----------")
DT_string_indexers = []

for name, dtype in df_DT_train.dtypes:
  if dtype == "string":
    print("   ---------- Definimos indexer para la columna: ", name)
    DT_string_indexers.append(StringIndexer(inputCol=name, outputCol=name+"_index2"))
df_DT_train.printSchema() 

### Ejercicio 14: Guardar en la variable 'DT_vector_assembler' la aplicación del mismo VectorAssembler() del ejercicio 8. ###

In [48]:
# Escribir aquí el ejercicio
DT_vector_assembler = VectorAssembler(
    inputCols=["Additional_Number_of_Scoring", "Average_Score", "Review_Total_Negative_Word_Counts", "Total_Number_of_Reviews", "Review_Total_Positive_Word_Counts", "Total_Number_of_Reviews_Reviewer_Has_Given", "Reviewer_Score", "days_since_review", "Review_Date_index2", "Reviewer_Nationality_index2"],
    outputCol="features")

### Ejercicio 15: Crear una lista con el mombre de DT_pipeline_stages, y añadirle la lista de StringIndexers y el VectorAssembler (en este orden) ###

In [50]:
# Escribir aquí el ejercicio
DT_pipeline_stages = []
# Añadir la lista de String Indexers
DT_pipeline_stages = [str_indexer for str_indexer in DT_string_indexers]
DT_pipeline_stages.append(DT_vector_assembler)


### Ejercicio 16: Inicializar el modelo de árbol de decisión (mismas especificaciones que en el ej. 10), y añadirlo a la lista de pasos 'DT_pipeline_stages' ###

In [52]:
# Escribir aquí el ejercicio
print("   ---------- Creamos el arbol de decision para la columna score_evaluation y entrenamos el modelo con los datos TRAIN -----------")
dt_pipeline = DecisionTreeClassifier(labelCol="score_evaluation", featuresCol="features", maxBins=1000, maxDepth=1)
DT_pipeline_stages.append(dt_pipeline)


### Ejercicio 17: Diseñar el Pipeline y aplicarlo sobre los datos de Train, llamándolo 'DT_pipeline_model' ###

In [54]:
# Escribir aquí el ejercicio

# Definimos el pipeline a partir de los pasos anteriores
pipeline_list = Pipeline(stages=DT_pipeline_stages)

# Aplicamos el pipeline sobre los datos TRAIN:
DT_pipeline_model = pipeline_list.fit(df_DT_train)


### Ejercicio 18: Aplicar el modelo resultante sobre los datos de test y evaluarlo al igual que se hizo en el ej. 11 ###

In [56]:
# Escribir aquí el ejercicio
DT_predictions = DT_pipeline_model.transform(df_DT_test)
DT_predictions.select("score_evaluation", "features", "rawPrediction", "probability", "prediction").show(100)

print(predictions.dtypes)
print(DT_predictions.dtypes)

print(type(predictions))
print(type(DT_predictions))

dtModel = DT_pipeline_model.stages[len(DT_pipeline_model.stages)-1]
evaluator2 = MulticlassClassificationEvaluator(
    labelCol="score_evaluation", predictionCol="prediction", metricName="accuracy")
# No entiendo que esta expresion de error...
accuracy2 = evaluator2.evaluate(DT_predictions)
print("   ---------- Vemos error obtenido -----------")
print("Error en el test = %g " % (1.0 - accuracy2))