# ![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)

# Procesamiento por lotes (batch) vs. interactivo (streaming)
## UOC - Máster en Data Science
### Alumno: **Fernando Antonio Barbeiro Campos** - fbarbeiro@uoc.edu

Apache Spark incluyó en su versión 2.0 la primera versión de una nueva API para el procesamiento en flujo de "nivel superior", en inglés la denominó "Structured Streaming". En esta PEC veremos como usar dicha API sobre Spark DataFrame para construir aplicaciones de "Flujo Estructurado". Nuestro objetivo será calcular métricas en tiempo real, como conteos, o promedios en tiempo real dentro de ventanas (p.ej. _moving average_) en una secuencia de acciones con marca de tiempo (p.ej. acciones Abrir y Cerrar en nuestros datos de muestra).

** Esta PEC cubrirá: **
* *Parte 1: Conocimiento del dominio*
* *Parte 2: Procesamiento por lotes* (3 puntos sobre 10)
* *Parte 3: Procesamiento interactivo* (7 puntos sobre 10)

#### Parte 1. Datos de esta PEC

Podemos encontrar algunos ejemplos de datos en flujo en los archivos ubicados en ```/databricks-datasets/structured-stream/events/```. Estos datos son los que vamos a usar para construir las diferentes métricas. Veamos que contiene este directorio ejecutando la siguiente celda.

In [3]:
%fs ls /databricks-datasets/structured-streaming/events/

path,name,size
dbfs:/databricks-datasets/structured-streaming/events/file-0.json,file-0.json,72530
dbfs:/databricks-datasets/structured-streaming/events/file-1.json,file-1.json,72961
dbfs:/databricks-datasets/structured-streaming/events/file-10.json,file-10.json,73025
dbfs:/databricks-datasets/structured-streaming/events/file-11.json,file-11.json,72999
dbfs:/databricks-datasets/structured-streaming/events/file-12.json,file-12.json,72987
dbfs:/databricks-datasets/structured-streaming/events/file-13.json,file-13.json,73006
dbfs:/databricks-datasets/structured-streaming/events/file-14.json,file-14.json,73003
dbfs:/databricks-datasets/structured-streaming/events/file-15.json,file-15.json,73007
dbfs:/databricks-datasets/structured-streaming/events/file-16.json,file-16.json,72978
dbfs:/databricks-datasets/structured-streaming/events/file-17.json,file-17.json,73008


Hay aproximadamente unos 50 archivos JSON. Veamos que contiene uno de ellos, por ejemplo el archivo ```file-0.json```

In [5]:
%fs head /databricks-datasets/structured-streaming/events/file-0.json

Cada linea del archivo contiene un registro JSON con dos campos: ```tiempo``` y ```acción```. Tratemos de analizar estos archivos primero como si fueran ficheros en lote y luego de forma interactiva.

#### Parte 2. Procesamiento por lotes

El primer paso habitual para intentar procesar los datos es consultar los mismos de forma estática. Definamos para ello un DataFrame basado en el formato de los archivos y guardemos dicho DataFrame en formato de tabla.

En esta PEC no introduciremos aún como funcionan los tipos en pySpark. Esto lo haremos durante las siguientes PEC. Igualmente, para entender que estamos haciendo en la siguiente celda podemos consultar la lista completa de tipos se encuetra en el módulo [pyspark.sql.types](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#module-pyspark.sql.types). Para nuestros datos, usaremos los tipos [TimestampType()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.types.TimestampType) y [StringType()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.types.StringType).

In [8]:
from pyspark.sql.types import *

inputPath = "/databricks-datasets/structured-streaming/events/"

# Dado que ya hemos analizado un poco los datos y conocemos su formato, definiremos el esquema para acelerar el procesamiento (no es necesario que Spark intente inferir su esquema)
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

# Static DataFrame que representa datos en los archivos JSON
staticInputDF = (
  spark
    .read
    .schema(jsonSchema)
    .json(inputPath)
)

# Esta instruccion se ocupa de almacenar el DataFrame como una tabla de SparkQL, así podremos accederla usando lenguaje SQL
staticInputDF.createOrReplaceTempView("static_input")

display(staticInputDF)

time,action
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:29.000+0000,Open
2016-07-28T04:19:31.000+0000,Close
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:32.000+0000,Close
2016-07-28T04:19:33.000+0000,Close
2016-07-28T04:19:35.000+0000,Close
2016-07-28T04:19:36.000+0000,Open


Antes de empezar a trabajar con estos datos, reduciremos su granularidad temporal a nivel de minuto para cada tipo de acción. Además, generaremos una vista para poder usar consultas SQL y así poder calcular nuestras métricas de forma sencilla.

Para realizar este proceso, ejecutaremos la siguiente celda.

In [10]:
from pyspark.sql.functions import *      # para poder usar la funcion window()

staticCountsDF = (
  staticInputDF
    .groupBy(
       staticInputDF.action, 
       window(staticInputDF.time, "1 minute"))    
    .count()
)
staticCountsDF.cache()

# Registrar el DataFrame como una tabla llamada 'static_counts'
staticCountsDF.createOrReplaceTempView("static_counts")

display(staticCountsDF)

action,window,count
Close,"List(2016-07-28T04:20:00.000+0000, 2016-07-28T04:21:00.000+0000)",14
Close,"List(2016-07-27T12:35:00.000+0000, 2016-07-27T12:36:00.000+0000)",16
Open,"List(2016-07-27T13:08:00.000+0000, 2016-07-27T13:09:00.000+0000)",10
Close,"List(2016-07-27T19:00:00.000+0000, 2016-07-27T19:01:00.000+0000)",15
Close,"List(2016-07-26T17:16:00.000+0000, 2016-07-26T17:17:00.000+0000)",30
Open,"List(2016-07-26T17:31:00.000+0000, 2016-07-26T17:32:00.000+0000)",14
Close,"List(2016-07-26T22:06:00.000+0000, 2016-07-26T22:07:00.000+0000)",8
Close,"List(2016-07-26T14:42:00.000+0000, 2016-07-26T14:43:00.000+0000)",19
Close,"List(2016-07-26T08:44:00.000+0000, 2016-07-26T08:45:00.000+0000)",17
Close,"List(2016-07-26T09:03:00.000+0000, 2016-07-26T09:04:00.000+0000)",12


#### Ejercicio 2(a)

Modifica la granularidad del dataframe ```staticCountsDF``` a nivel de hora y repite el mismo conteo. Para superar el test, la columna de salida del dataFrame ha de conservar el mismo nombre de columna que en el dataFrame ```staticCountsDF```.

*Hint*: Puedes usar la opción ```.withColumnRenamed("original_name", "desired_name")``` de la operación groupBy() para cambiar el nombre de las columnas del dataFrame.

In [12]:
staticCountsHourlyDF = (
  staticInputDF
    .groupBy(
       staticInputDF.action, 
       window(staticInputDF.time, "1 hour"))    
    .count()
)

# Registrar el DataFrame como una tabla llamada 'static_mean'
staticCountsHourlyDF.createOrReplaceTempView("static_counts_hourly")

display(staticCountsHourlyDF)

action,window,count
Close,"List(2016-07-26T13:00:00.000+0000, 2016-07-26T14:00:00.000+0000)",1028
Open,"List(2016-07-26T18:00:00.000+0000, 2016-07-26T19:00:00.000+0000)",1004
Close,"List(2016-07-27T02:00:00.000+0000, 2016-07-27T03:00:00.000+0000)",971
Open,"List(2016-07-27T04:00:00.000+0000, 2016-07-27T05:00:00.000+0000)",995
Open,"List(2016-07-27T05:00:00.000+0000, 2016-07-27T06:00:00.000+0000)",986
Open,"List(2016-07-26T05:00:00.000+0000, 2016-07-26T06:00:00.000+0000)",1000
Open,"List(2016-07-26T11:00:00.000+0000, 2016-07-26T12:00:00.000+0000)",991
Close,"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",1011
Close,"List(2016-07-27T05:00:00.000+0000, 2016-07-27T06:00:00.000+0000)",987
Open,"List(2016-07-26T10:00:00.000+0000, 2016-07-26T11:00:00.000+0000)",1007


In [13]:
#Test
from databricks_test_helper import *

Test.assertEquals(spark.sql("select max(count) from static_counts_hourly").rdd.flatMap(list).first(), 1036, "Incorrect couting by hour")
Test.assertEquals(spark.sql("select min(count) from static_counts_hourly").rdd.flatMap(list).first(), 11, "Incorrect couting by hour")

### Ejercicio 2(b)

Ahora que hemos registrado la vista ```static_counts_hourly``` usando el dataframe ```static_counts_hourly```, calcula usando una consulta SQL el número de acciones totales de cada tipo (```Open```, ```Close```).

**IMPORTANTE**: Recuerda usar ```as``` para renombrar la columna con la suma como ```total_count```.

In [15]:
sum_static_counts_hourly = spark.sql("select action, sum(count) as total_count from static_counts_hourly group by action")

sum_static_counts_hourly.show()

In [16]:
Test.assertEquals(sum_static_counts_hourly.take(1)[0].asDict()['total_count'], 50000, "Incorrect total counting")

### Ejercicio 2(c)

Ahora vamos a complicar un poco el ejercicio. Cuenta el numero de acciones totales por minuto y tipo.

**IMPORTANTE**: Recuerda usar el dataframe ```static_counts```

In [18]:
window_static_counts_minute = spark.sql("select action, count from static_counts order by window, action")

window_static_counts_minute.show()

In [19]:
Test.assertEquals(window_static_counts_minute.take(3)[2].asDict()['count'], 11, "Incorrect counting")

Test.assertEquals(window_static_counts_minute.count(),6122,"Incorrect number of minutes")

### Ejercicio 2(d)

Ahora que ya somos capaces de contar de varias formas y con diferentes granularidades, vamos a calcular algun estadístico muy simple, como por ejemplo la media. 

Usando un código parecido al del _ejercicio 2(a)_, calcula el promedio de acciones por minuto para cada hora, independientemente si son acciones ```Open``` o ```Close```. 

**IMPORTANTE**: Recuerda renombrar la columna donde calculas la media como ```average```.

In [21]:
staticAvgPerMinuteDF = (
  staticCountsDF
    .groupBy(staticCountsDF.window)
    .agg(avg('count').alias('average'))
)

StaticAverageHourlyDF = staticAvgPerMinuteDF.groupBy(window(staticAvgPerMinuteDF.window.start, "1 hour")).agg(avg('average').alias('average'))

# Registrar el DataFrame como una tabla llamada 'static_mean'
StaticAverageHourlyDF.createOrReplaceTempView("static_mean")

display(StaticAverageHourlyDF)

window,average
"List(2016-07-26T07:00:00.000+0000, 2016-07-26T08:00:00.000+0000)",16.641666666666666
"List(2016-07-28T03:00:00.000+0000, 2016-07-28T04:00:00.000+0000)",16.575
"List(2016-07-28T02:00:00.000+0000, 2016-07-28T03:00:00.000+0000)",16.725
"List(2016-07-26T02:00:00.000+0000, 2016-07-26T03:00:00.000+0000)",7.733333333333333
"List(2016-07-26T03:00:00.000+0000, 2016-07-26T04:00:00.000+0000)",11.508333333333333
"List(2016-07-26T19:00:00.000+0000, 2016-07-26T20:00:00.000+0000)",16.375
"List(2016-07-27T11:00:00.000+0000, 2016-07-27T12:00:00.000+0000)",16.7
"List(2016-07-26T16:00:00.000+0000, 2016-07-26T17:00:00.000+0000)",16.466666666666665
"List(2016-07-26T13:00:00.000+0000, 2016-07-26T14:00:00.000+0000)",16.95
"List(2016-07-27T19:00:00.000+0000, 2016-07-27T20:00:00.000+0000)",16.466666666666665


In [22]:
Test.assertEquals(StaticAverageHourlyDF.take(2)[1].asDict()['average'], 16.575, "Incorrect averaging")
Test.assertEquals(StaticAverageHourlyDF.take(3)[2].asDict()['average'], 16.725, "Incorrect averaging")

Test.assertEquals(StaticAverageHourlyDF.count(),53,"Incorrect number of minutes")

### Ejercicio 2(e)

Para concluir nuestros cálculos en batch, determina la hora en la que se han producido un mayor número de acciones promedio por minuto.

In [24]:
max_static_averages_hourly =  spark.sql("select * from (select row_number() over (ORDER BY average DESC) as rownum, window, average from static_mean) as foo where rownum <= 1")
max_static_averages_hourly.show()

¿Qué hora ha sido la que ha tenido una mayor actividad promedio por minuto?

Mirando el resultado del exercicio arriba, queda claro que la hora donde hubo un promedio por minuto mayor ha sido entre **20:00 - 21:00 del dia 26 de Julio de 2016**. Para confirmar los datos, simplemente he confirmado en el `display(StaticAverageHourlyDF)` donde hay la posibilidad de hacer sorting por average y el resultado ha sido lo mismo que definido aquí.

#### Parte 3: Procesamiento interactivo

Ahora que hemos analizado los datos de forma estática, vamos a cambiar el análisis a una consulta que se actualice continuamente a medida que llegan nuevos datos. Como solo tenemos un conjunto estático de archivos, vamos a emular un flujo leyendo un archivo a la vez, en el orden cronológico en que fueron creados. La consulta que tenemos que escribir es prácticamente la misma que la anterior.

In [28]:
from pyspark.sql.functions import *

# Parecido a la definicion staticInputDF anterior, solo hemos cambiado `readStream` en lugar de `read`
streamingInputDF = (
  spark
    .readStream                       
    .schema(jsonSchema)               # Instanciamos el esquema de datos en formato JSON
    .option("maxFilesPerTrigger", 1)  # Trataremos los archivos como si fueran una secuencia, seleccionando un archivo a la vez
    .json(inputPath)
)

# Misma consulta que en el caso staticInputDF
streamingCountsDF = (                 
  streamingInputDF
    .groupBy(
      streamingInputDF.action, 
      window(streamingInputDF.time, "1 minute"))
    .count()
)

Vamos a comprobar que realmente disponemos de un stream de datos

In [30]:
streamingCountsDF.isStreaming

Ahora vamos a establecer la configuración en el cluster del flujo de datos.

In [32]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # mantenemos pequeno el tamaño de los shuffle

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table
    .queryName("counts")     # counts = nombre de la tabla in-memory
    .outputMode("complete")  # complete = todos los contadores deben guardarse en la tabla
    .start()
)

query2 = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table
    .queryName("counts2")     # counts = nombre de la tabla in-memory
    .outputMode("complete")  # complete = todos los contadores deben guardarse en la tabla
    .start()
)

### Ejercicio 3(a)

Por simplicidad en esta parte podemos usar la siguiente notación:

`%sql` que es una sentencia que solo funciona en los notebooks de Databricks. Esta ```magic function``` ejecuta `sqlContext.sql()` y pasa los resultados a la función `display()`. Estas dos sentencias son equivalentes:

`%sql select * from counts order by window`

`display(sqlContext.sql("select * from counts  order by window"))`

**Nota:** Como el comando display se ejecuta en el navegador no en el cluster, está limitado a solo mostrar las 1000 primeras filas, para contar cuantas filas se han leído del stream podéis ejecutar el siguiente código:

`%sql select count(*) from counts`

In [34]:
%sql select count(*) from counts

count(1)
782


In [35]:
%sql select count(*) from counts2

count(1)
1140


En la celda de arriba, además, puedes cambiar la forma de visualizar los datos, ex. En forma de tabla, histograma, linea, etc.

Visualiza en forma de histograma los resultados y re-ejecuta unas cuantas veces la celda. Podrás observar que conforme van llegando nuevos datos, la gráfica se va actualizando.

### Ejercicio 3(b)

Vamos a crear un sistema de alerta muy sencillo que nos indique cuando, en un minuto, hay una diferencia mayor de 20 acciones entre los contadores de las acciones ```Open``` y ```Close```.

** NOTA:** Recuerda re-ejecuta las celdas de la parte 3 para re-iniciar el flujo de datos. Sólo hay tres minutos en todo el dataset donde la condición descrita anteriormente se cumple.

In [38]:
from time import sleep

latest_already_shown = '1970-01-01 00:00:00.000+0000'
for i in range(10):
  diff = spark.sql("select sc.action as action1, sc.window, sc.count as countAction1, sc2.action as action2, sc2.count as countAction2 from counts sc inner join counts2 sc2 on sc.window = sc2.window and sc.action <> sc2.action  where sc.window.start > '" + latest_already_shown + "' having (sc.count - sc2.count) > 20 or (sc2.count - sc.count) > 20 order by window desc")
  contador = diff.count()
  
  if contador > 0:
    print 'Iteración ' + str(i) + ': ' +  str(contador) + ' nuevos registros encontrados con diferencia mayor de 20 acciones entre los contadores Open y Close:'
    diff.show()
    start = diff.select('window.start').first()
    latest_already_shown = start['start'].strftime("%Y-%m-%d %H:%M:%S")
    
  sleep(5)

### Ejercicio 3(c)


Ahora vamos a calcular la [media móvil simple](https://en.wikipedia.org/wiki/Moving_average) del número de acciones de los últimos 30 minutos. Tienes los detalles de como realizar este cálculo [aquí](https://en.wikipedia.org/wiki/Moving_average#Simple_moving_average).

In [40]:
from time import sleep

for i in range(10):
  movingAvg = spark.sql("select *, avg(count) OVER (PARTITION BY action ORDER BY window.start desc range between interval 30 minutes preceding and current row) as MovingAvg from counts")
  print 'Iteración ' + str(i) + ':'
  movingAvg.show()
  sleep(5)

### Ejercicio 3(d)

Ahora vamos a calcular la [varianza](https://es.wikipedia.org/wiki/Varianza) en el número de acciones. Tienes los detalles de como calcular la varianza online [aquí](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm). Para esto, recupera usando una sentencia sql, la suma de las acciones del último minuto y ves actualizando el resultado de la varianza.

In [42]:
%sql select variance(count) from (select * from counts where sc.window.start > '" + latest_already_shown + "' order by window desc) as ordered

var_samp(CAST(count AS DOUBLE))
18.443862131112745


In [44]:
from time import sleep

for i in range(50):
  variance = spark.sql("select variance(count) as varianza from (select * from counts order by window desc) as ordered")
  
  print 'Iteración ' + str(i) + ': '
  variance.show()
  sleep(5)

### Ejercicio 3(d)

Responde las siguientes preguntas:

- ¿Qué es una arquitectura lambda? ¿Spark cumple con esta definición?
- ¿El código que has utilizado en la parte de streaming es reusable para procesado batch? ¿y viceversa?

- *¿Qué es una arquitectura lambda? ¿Spark cumple con esta definición?*
En un resumen bastante efectivo, es una arquitectura de procesamiento de datos diseñada para manejar cantidades masivas de datos (es decir, "Big Data") mediante el uso de métodos de procesamiento por lotes (batches) y procesamiento en serie (stream). Como hemos visto en los ejercicios hasta el momento, Spark cumple con la definición.

Añadiendo un poco más de información, la arquitectura lambda intenta equilibrar la latencia, el rendimiento, la escala y la tolerancia a fallos mediante el procesamiento por lotes para proporcionar vistas completas y precisas de los datos por lotes, mientras que al mismo tiempo se utiliza el procesamiento de flujo en tiempo real para proporcionar vistas de los datos en línea. Las dos salidas de vista se pueden unir antes de la presentación.<br />

<hr />


- *¿El código que has utilizado en la parte de streaming es reusable para procesado batch? ¿y viceversa?*

Si consideramos los pequeños cambios necesarios para aplicar la lectura como `readStream` o simplemente`read` para procesado en batch, las mismas partes de codigo de extración de información pueden ser utilizadas en ambos, es decir, con pocas o zero adaptaciones, podemos ser capaces de aplicar las mismas operaciones con casi zero esfuerzo.