## Primer ejemplo con flujos de datos estructurados

#### Structured Streaming
Apache Spark incluye una API de procesamiento de flujos de datos de alto nivel, llamada [Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html). 

- Structured Streaming es un motor de procesamiento de flujos de datos escalable y tolerante a fallos construida sobre el motor de Spark SQL.
- Una de las principales características es: podemos expresar las operaciones sobre los flujos de datos de la misma manera que expresaría un cálculo por lotes en datos estáticos.

En 'Structured streaming' un flujo de datos es tratado como una tabla que constantemente se está actualizando.

<img src="https://docs.databricks.com/_images/gsasg-spark-streaming-workflow.png" alt="image"/>

Cada elemento de datos que llega a la transmisión es como una nueva fila que se agrega a la tabla de entrada.

<img src="https://docs.databricks.com/_images/gsasg-spark-streaming-model.png" alt="image"/>

El objetivo de esta libreta es construir un ejemplo básico usando Structured Streaming. La meta es calcular métricas en tiempo real, como conteos continuos y conteos en ventanas en un flujo de acciones con marca de tiempo (por ejemplo, Abrir, Cerrar, etc.)

En esta libreta aprenderemos:
- Cargar una muestra de datos
- Inicializar un flujo de datos
- Iniciar un proceso de flujos de datos
- Hacer una consulta al flujo de datos

### Carga de datos

In [0]:
%fs ls /databricks-datasets/structured-streaming/events/

path,name,size
dbfs:/databricks-datasets/structured-streaming/events/file-0.json,file-0.json,72530
dbfs:/databricks-datasets/structured-streaming/events/file-1.json,file-1.json,72961
dbfs:/databricks-datasets/structured-streaming/events/file-10.json,file-10.json,73025
dbfs:/databricks-datasets/structured-streaming/events/file-11.json,file-11.json,72999
dbfs:/databricks-datasets/structured-streaming/events/file-12.json,file-12.json,72987
dbfs:/databricks-datasets/structured-streaming/events/file-13.json,file-13.json,73006
dbfs:/databricks-datasets/structured-streaming/events/file-14.json,file-14.json,73003
dbfs:/databricks-datasets/structured-streaming/events/file-15.json,file-15.json,73007
dbfs:/databricks-datasets/structured-streaming/events/file-16.json,file-16.json,72978
dbfs:/databricks-datasets/structured-streaming/events/file-17.json,file-17.json,73008


Cada archivo json contiene un conjunto de líneas con dos atributos por línea: acción y tiempo.

In [0]:
%fs head /databricks-datasets/structured-streaming/events/file-0.json

### Batch/Interactive Processing

Como podemos observar tenemos datos estáticos, los cuales van a utilizar para emular un flujo de datos. Para lograrlo vamos a leer un archivo a la vez (simulando un paso de tiempo).

In [0]:
from pyspark.sql.types import *

inputPath = "/databricks-datasets/structured-streaming/events/"

# Dado que ya conocemos que los datos están en json, se define un esquema para acelerar el procesamiento.
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

# Creamos un dataframe estático para representar los archivos json
staticInputDF = (spark.read.schema(jsonSchema).json(inputPath))

#Visualizamos los datos
display(staticInputDF)

time,action
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:29.000+0000,Open
2016-07-28T04:19:31.000+0000,Close
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:32.000+0000,Close
2016-07-28T04:19:33.000+0000,Close
2016-07-28T04:19:35.000+0000,Close
2016-07-28T04:19:36.000+0000,Open


Iniciamos calculando el número de veces que aparece la acción de "open" y "close" en una ventana de una hora. Para hacerlo, agrupamos por la columna acción y sobre una hora para la columna de tiempo.

In [0]:
from pyspark.sql.functions import *      # for window() function

staticCountsDF = (staticInputDF.groupBy(staticInputDF.action, window(staticInputDF.time, "1 hour")).count())
staticCountsDF.cache()

# Guardamos el dataframe obtenido como una tabla
staticCountsDF.createOrReplaceTempView("static_counts")

Ahora, podemos directamente usar SQL para consultar la tabla. Podemos generar una línea de tiempo usando la tabla anterior

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from static_counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993


### Stream Processing

Ahora que hemos analizado los datos de forma interactiva, vamos a convertir esto en una consulta de transmisión que se actualiza continuamente a medida que llegan los datos. 
Dado que solo tenemos un conjunto estático de archivos:
- Vamos a emular una secuencia de ellos leyendo un archivo a la vez, en el orden cronológico en que fueron creados. 
- La consulta que tenemos que escribir es más o menos la misma que la consulta interactiva anterior.

In [0]:
from pyspark.sql.functions import *

# Similar to definition of staticInputDF above, just using `readStream` instead of `read`
streamingInputDF = (spark.readStream                       
                    .schema(jsonSchema)               # Configuramos el esquema para los datos json
                    .option("maxFilesPerTrigger", 1)  # Tratamos una secuencia de archivos como un stream (tomamos un archivo a la vez)
                    .json(inputPath))

In [0]:
# Misma consulta como el dataframe estático
streamingCountsDF = (streamingInputDF.groupBy(streamingInputDF.action, window(streamingInputDF.time, "1 hour")).count())

#Ahora tenemos un dataframe de tipo flujo de datos
streamingCountsDF.isStreaming

Out[16]: True

Ahora podemos iniciar de la transmisión de los flujos de datos. En nuestro caso, queremos consultar de forma interactiva los recuentos (las mismas consultas que arriba), por lo que configuraremos el conjunto completo de recuentos de 1 hora para que estén en una tabla en memoria.

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # configuramos un número pequeño para ordenamiento

query = (streamingCountsDF.writeStream
        .format("memory")        # memory = almacenamos la tabla en memoria
        .queryName("counts")     # counts = nombre de la tabla en memoria
        .outputMode("complete")  # complete = todas las cuentas deben estar en la tabla
        .start())

La consulta (query) se mantiene con una consulta en streaming que está corriendo (in the background). Esta consulta está continuamente tomando un archivo y actualiza el conteo.En las gŕaficas dinámicas podemos observar el número de archivos que está siento procesados.

Una vez que algunos archivos han sido procesados, podemos hacer consultas de manera interactiva en la tabla que está en memoria.

In [0]:
from time import sleep
sleep(5)  # esperamos unos segundos a que el cómputo inicie

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993


In [0]:
#query.stop()