# Integración de datos con *Apache Spark*
De manera general, la integración de datos se refiere a realizar consultas para las cuales los datos están divididos en distintas tablas, o incluso almacenados en distintas máquinas. En la vida real, se presentan casos complejos en los que los esquemas de las tablas no son exactamente iguales, por lo que hay que hacer un trabajo previo de correspondencia entre los atributos. En este notebook trabajaremos con un ejemplo simple de integración de datos usando el dataset [MovieLens](https://grouplens.org/datasets/movielens/)

## Configuración del ambiente en Google Colaboratory

In [None]:
# Download Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Next, we will install Apache Spark 3.0.1 with Hadoop 2.7 from here.
!wget https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
# Now, we just need to unzip that folder.
!tar xf spark-3.3.2-bin-hadoop3.tgz

# Setting JVM and Spark path variables
import os 
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

# Installing required packages
!pip install pyspark==3.3.2
!pip install findspark

In [None]:
# Importamos las librerias necesarias
import numpy as np
import pandas as pd
import datetime as dt
import matplotlib.pyplot as plt

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as fct

In [None]:
# Descargamos el dataset
!wget -q https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
!unzip ml-latest-small.zip
%cd ml-latest-small

In [None]:
!echo "Lista de archivos en el dataset: "
!ls

In [None]:
!cat README.txt

In [None]:
# Iniciamos la sesión de Spark
ss = (SparkSession
      .builder
      .appName("data_integration")
      .getOrCreate())

## Lectura de los datos


In [None]:
# Información sobre las películas
!head movies.csv

In [None]:
movies_data = ss.read.csv('/content/ml-latest-small/movies.csv', sep=',',
                          header=True, quote='"', 
                          schema='movieId INT, title STRING, genres STRING')
movies_data.printSchema()
movies_data.head(5)

In [None]:
# Información sobre las valoraciones de los usuarios
!head ratings.csv

In [None]:
ratings_data = ss.read.csv('/content/ml-latest-small/ratings.csv', sep=',',
                           header=True, quote='"',
                           schema='userId INT, movieId INT, rating DOUBLE, timestamp INT')
ratings_data.printSchema()
ratings_data.head(5)

In [None]:
# Información sobre los tags creados por el usuario
!head tags.csv

In [None]:
tags_data = ss.read.csv('/content/ml-latest-small/tags.csv', sep=',',
                        header=True, quote='"',
                        schema=<SU CODIGO>)
tags_data.printSchema()
tags_data.head(5)

### Conversión de las etiquetas de tiempo

In [None]:
# Obtener el año a partir de la etiqueta de tiempo (timestamp)
get_year = fct.udf(lambda x: dt.datetime.fromtimestamp(x/1000.0).year)
# Almacenar el año en una nueva columna del DataFrame
ratings_data = ratings_data.withColumn("year", get_year(ratings_data.timestamp))
tags_data = tags_data.withColumn("year", get_year(tags_data.timestamp))
# Primeras filas de los DataFrame de valoraciones y etiquetas
ratings_data.show(5)
tags_data.show(5)

### Información básica de los datos

In [None]:
res = (movies_data
       .select('movieId')
       .dropDuplicates()
       .count())
print(f'{res} películas')

In [None]:
res = (ratings_data
       .count())
print(f'{res} valoraciones')

In [None]:
res = (ratings_data
       .select('userId')
       .dropDuplicates()
       .count())
print(f'{res} usuarios con valoraciones para al menos una película')

In [None]:
print('Distribución de las valoraciones:')
(ratings_data
 .groupby('rating')
 .count()
 .sort(fct.desc('rating'))
 .show())

In [None]:
print('Datos originales: ')
(movies_data
 .show(1))
print('Modificación sobre la columna género: ')
movies_data_genre = (movies_data
                     .withColumn("genres_array", fct.split("genres", "\|")) # Generar una lista con los diferentes géneros
                     .withColumn("genre", fct.explode("genres_array")) # Generar un "nuevo" registro por cada género de una película
                     .select("movieId", "title", "genre"))
movies_data_genre.show(5)
print('Géneros únicos de películas: ')
(movies_data
 .withColumn("genres_array", fct.split("genres", "\|"))
 .withColumn("genre", fct.explode("genres_array"))
 .select("genre")
 .dropDuplicates()
 .sort(fct.asc("genre"))
 .show())

## Combinaciones [[ref]](https://support.microsoft.com/es-es/office/combinar-tablas-y-consultas-3f5838bd-24a0-4832-9bc1-07061a1478f6)

### Combinaciones internas (Inner Joins)

Una combinación interna es una en la que solo se incluyen datos de una tabla si hay datos correspondientes en la tabla relacionada y viceversa.

In [None]:
movies_data_inner = movies_data.join(ratings_data, ["movieId"], "inner")
print(f'{movies_data_inner.count()} filas con una unión interna')
movies_data_inner.show()

### Combinaciones externas (Outer Joins)

Una combinación externa es como una combinación interna, pero agrega las filas restantes de una de las tablas. Las combinaciones externas son direccionales: una combinación externa izquierda incluye todos los registros de la tabla izquierda (la primera tabla de la combinación) y una combinación externa derecha incluye todos los registros de la tabla derecha, la segunda tabla de la combinación. 

In [None]:
# LEFT OUTER
movies_data_outer_l = movies_data.join(ratings_data, ["movieId"], "left")
print(f'{movies_data_outer_l.count()} filas con una unión externa (izq)')
movies_data_outer_l.show()

In [None]:
# RIGHT OUTER
movies_data_outer_r = movies_data.join(ratings_data, ["movieId"], "right")
print(f'{movies_data_outer_r.count()} filas con una unión externa (der)')
movies_data_outer_r.show()

### Combinación externa completa (Full Join)

En algunos sistemas, una combinación externa completa incluye todas las filas de ambas tablas, con filas combinadas cuando se correspondan.

In [None]:
movies_data_full = movies_data.join(ratings_data, ["movieId"], "full")
print(f'{movies_data_full.count()} filas con una unión completa')
movies_data_full.show()

## Extracción de información

### Valoraciones promedio por película

In [None]:
rating_by_movie =(ratings_data
                  .groupby("movieId")
                  .agg(fct.count("rating").alias("# reviews"), fct.avg("rating").alias("avg rating"))
                  .join(movies_data, ["movieID"], "inner")
                  .select(["title", "avg rating", "# reviews"]))
rating_by_movie.orderBy(["# reviews"], ascending=False).show()

In [None]:
(rating_by_movie
 .orderBy(["avg rating"], ascending=False)
 .show())

In [None]:
(rating_by_movie
 .where(rating_by_movie["# reviews"]>10)
 .orderBy(["avg rating"])
 .show())

### Valoraciones promedio por género

In [None]:
(rating_by_movie
 .join(movies_data_genre, ["title"], "inner")
 .groupby("genre")
 .agg(fct.count("genre").alias("# movies"), fct.avg("avg rating").alias("avg rating"), fct.sum("# reviews").alias("# reviews"))
 .orderBy(["avg rating"], ascending=False)
 .show())

### Valoraciones promedio por usuario
<!--
rating_by_user =(ratings_data
                  .groupby("userId")
                  .agg(fct.count("rating").alias("# reviews"), fct.avg("rating").alias("avg rating"))
                  .select(["userId", "avg rating", "# reviews"]))
rating_by_user.orderBy(["# reviews"], ascending=False).show()
-->

In [None]:
rating_by_user =(ratings_data
                  .groupby(<SU CODIGO>)
                  .agg(fct.count(<SU CODIGO>).alias("# reviews"), fct.avg(<SU CODIGO>).alias("avg rating"))
                  .select(["userId", "avg rating", "# reviews"]))
rating_by_user.orderBy(["# reviews"], ascending=False).show()

In [None]:
(rating_by_user
 .where(rating_by_user["# reviews"]>10)
 .orderBy(["avg rating"], ascending=False)
 .show())

In [None]:
(rating_by_user
 .where(rating_by_user["# reviews"]>10)
 .orderBy(["avg rating"])
 .show())

### Valoraciones promedio por año
<!--
reg_exp = "(\d{4})"
movies_data_year = (movies_data
                    .withColumn("movie year", fct.regexp_extract("title", reg_exp, 0)) # Generar una nueva columna con el año a partir del título
                    .select("movieId", "title", "movie year"))
movies_data_year = (movies_data_year
                    .where(movies_data_year["movie year"]>1950)
                    .where(movies_data_year["movie year"]<2023))
movies_data_year.show(5)
-->

In [None]:
reg_exp = "(\d{4})" # Expresión regular para las fechas (4 dígitos)
movies_data_year = (movies_data
                    .withColumn(<SU CODIGO>, fct.regexp_extract(<SU CODIGO>, <SU CODIGO>, 0)) # Generar una nueva columna con el año a partir del título
                    .select("movieId", "title", "movie year"))
movies_data_year = (movies_data_year
                    .where(<SU CODIGO>) # Dados errores en los datos, seleccionar solo las
                    .where(<SU CODIGO>))# películas entre 1950 y 2023
movies_data_year.show(5)

In [None]:
rating_by_year = (rating_by_movie
                  .join(movies_data_year, ["title"], "inner")
                  .groupby("movie year")
                  .agg(fct.count("movie year").alias("# movies"), fct.avg("avg rating").alias("avg rating"), fct.sum("# reviews").alias("# reviews")))
rating_by_year.show(5)

In [None]:
(rating_by_year
 .where(rating_by_year["# reviews"]>100)
 .orderBy(["avg rating"], ascending = False)
 .show(5))

In [None]:
(rating_by_year
 .orderBy(["# movies"], ascending = False)
 .show(5))

### Mejor película por año

In [None]:
best_movie_by_year = (rating_by_movie
                      .join(movies_data_year, ["title"], "inner")
                      .where(rating_by_movie["# reviews"] > 10)
                      .orderBy(["avg rating"], ascending=False)
                      .groupby("movie year")
                      .agg(fct.first("title").alias("title"), fct.first("avg rating").alias("avg rating")))
best_movie_by_year.orderBy(["movie year"], ascending=False).show()

In [None]:
best_movie_by_year.orderBy(["movie year"]).show()

### Mejor película para cada usuario

In [None]:
best_movie_by_user = (ratings_data
                      .join(movies_data, ["movieId"], "inner")
                      .orderBy(["rating"], ascending=False)
                      .groupby("userId")
                      .agg(fct.first("title").alias("title"), fct.first("rating").alias("rating")))
best_movie_by_user.show() # Con la precisión de que obtenemos solo un resultado por usuario cuando puede haber varias películas con valoración 5.0

In [None]:
(ratings_data
 .join(movies_data_year, ["movieId"], "inner")
 .where("userId = 1 AND rating = 5.0")
 .select(["title", "movie year"])
 .show())

## Ejercicios
### ¿Cuál es la peor película por cada año?

### ¿Cuál es el género favorito de cada usuario?
Género más frecuente de las películas que el usuario valoró con 5.0.

<details>
<summary>
  <font size=3 color=darkgreen><b>Pistas</b></font>
</summary>
<p>
<ul>
    <li>Los géneros de las películas están en la tabla <i>movies_data_genre</i></li>
    <li>Para obtener un conteo de cada género por cada usuario se pueden agrupar las dos variables <i>.groupby(["userId", "genre"])</i></li>
    <li>Después se puede usar la secuencia de pasos en los ejemplos anteriores con <i>.fct.first("genre")</i></li>
</ul>
</p>
</details>

### ¿Cuál es el género menos preferido por el usuario?
Género más frecuente de las 5 películas a las que el usuario les dio una valoración menor a 2.