# Spark on Tour
## Ejemplo de procesamiento de datos en streaming para generar un dashboard en NRT

En este notebook vamos a ver un ejemplo completo de como se podría utilizar la API de streaming estructurado de Spark para procesar un stream de eventos de puntuación en vivo, en el tiempo real, y generar como salida un conjunto de estadísticas, o valores agregados, con los que poder construir un dashboard de visualización y monitorización en tiempo real.

Particularmente vamos a simular una plataforma de vídeo bajo demanda en la que los usuarios están viendo pelítculas y puntuándolas. Tomaremos los eventos de puntuación que van entrando en streaming, y genrar, en tiempo real, estadísticas de visualización agredas por género, de forma que podamos monitorizar qué películas son las más populates en este momento.

### Importamos librerías, definimos esquemas e inicializamos la sesión Spark.

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

from IPython.display import clear_output
import plotly.express as px

In [2]:
ratingSchema = StructType([
    StructField("user", IntegerType()),
    StructField("movie", IntegerType()),
    StructField("rating", FloatType())
])

movieSchema = StructType([
    StructField("movie", IntegerType()),
    StructField("title", StringType()),
    StructField("genres", StringType())
])

In [3]:
def foreach_batch_function(df, epoch_id):
    mostPopularMovies = df.limit(10).toPandas()
    clear_output()
    print(mostPopularMovies)

In [4]:
#setup spark session
sparkSession = (SparkSession.builder
                .appName("Movie ratings streaming")
                .master("local[*]")
                .config("spark.scheduler.mode", "FAIR")
                .getOrCreate())
sparkSession.sparkContext.setLogLevel("ERROR")

### Leemos el dataset de películas

In [5]:
movies = sparkSession.read.csv("/tmp/movielens/movies.csv", schema=movieSchema, header=True)
movies.show()

+-----+--------------------+--------------------+
|movie|               title|              genres|
+-----+--------------------+--------------------+
|    1|    Toy Story (1995)|Adventure|Animati...|
|    2|      Jumanji (1995)|Adventure|Childre...|
|    3|Grumpier Old Men ...|      Comedy|Romance|
|    4|Waiting to Exhale...|Comedy|Drama|Romance|
|    5|Father of the Bri...|              Comedy|
|    6|         Heat (1995)|Action|Crime|Thri...|
|    7|      Sabrina (1995)|      Comedy|Romance|
|    8| Tom and Huck (1995)|  Adventure|Children|
|    9| Sudden Death (1995)|              Action|
|   10|    GoldenEye (1995)|Action|Adventure|...|
|   11|American Presiden...|Comedy|Drama|Romance|
|   12|Dracula: Dead and...|       Comedy|Horror|
|   13|        Balto (1995)|Adventure|Animati...|
|   14|        Nixon (1995)|               Drama|
|   15|Cutthroat Island ...|Action|Adventure|...|
|   16|       Casino (1995)|         Crime|Drama|
|   17|Sense and Sensibi...|       Drama|Romance|


### Inicializamos la carga del stream de puntuaciones desde Apache Kafka

In [6]:
dataset = (sparkSession
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:29092")
        .option("subscribe", "ratings")
        .load())
dataset = dataset.selectExpr("CAST(value AS STRING)")
dataset = dataset.select(f.from_json(f.col("value"), ratingSchema).alias("data")).select("data.*")

### Agrupamos por película y sumamos visualizaciones y media de puntuación

In [7]:
dataset = dataset.select("movie", "rating") \
                .groupBy("movie") \
                .agg(f.count("rating").alias("num_ratings"), f.avg("rating").alias("avg_rating"))

### Mezclamos con el dataset de películas para obtener el título

In [8]:
dataset = dataset.join(movies, dataset["movie"] == movies["movie"], "left_outer") \
    .drop(movies["movie"]) \
    .drop("genres")

### Ordenamos la salida por número de votaciones (visualizaciones)

In [9]:
dataset = dataset.select("movie", "title", "avg_rating", "num_ratings") \
    .sort(f.desc("num_ratings"))

### Ejecutamos el procesamiento en streaming

In [10]:
query = dataset \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .foreachBatch(foreach_batch_function) \
    .start()

In [11]:
query.explain()
query.awaitTermination()

   movie                                              title  avg_rating  \
0   2628   Star Wars: Episode I - The Phantom Menace (1999)    3.285714   
1   1580                   Men in Black (a.k.a. MIB) (1997)    3.666667   
2   1721                                     Titanic (1997)    3.833333   
3    296                                Pulp Fiction (1994)    4.333333   
4     34                                        Babe (1995)    4.333333   
5   1210  Star Wars: Episode VI - Return of the Jedi (1983)    3.500000   
6    380                                   True Lies (1994)    3.333333   
7    356                                Forrest Gump (1994)    3.833333   
8   1923                There's Something About Mary (1998)    4.333333   
9   1784                          As Good as It Gets (1997)    3.166667   

   num_ratings  
0            7  
1            6  
2            6  
3            6  
4            6  
5            6  
6            6  
7            6  
8            6  
9   

KeyboardInterrupt: 