# Structured Streaming using Python DataFrames API

Apache Spark 2.0 adds the first version of a new higher-level stream processing API, Structured Streaming. In this notebook we are going to take a quick look at how to use DataFrame API to build Structured Streaming applications. We want to compute real-time metrics like running counts and windowed counts on a stream of timestamped actions (e.g. Open, Close, etc).

To run this notebook, import it and attach it to a **Spark 2.x** cluster.

## Sample Data

In [0]:
%fs ls /databricks-datasets/structured-streaming/events/

path,name,size
dbfs:/databricks-datasets/structured-streaming/events/file-0.json,file-0.json,72530
dbfs:/databricks-datasets/structured-streaming/events/file-1.json,file-1.json,72961
dbfs:/databricks-datasets/structured-streaming/events/file-10.json,file-10.json,73025
dbfs:/databricks-datasets/structured-streaming/events/file-11.json,file-11.json,72999
dbfs:/databricks-datasets/structured-streaming/events/file-12.json,file-12.json,72987
dbfs:/databricks-datasets/structured-streaming/events/file-13.json,file-13.json,73006
dbfs:/databricks-datasets/structured-streaming/events/file-14.json,file-14.json,73003
dbfs:/databricks-datasets/structured-streaming/events/file-15.json,file-15.json,73007
dbfs:/databricks-datasets/structured-streaming/events/file-16.json,file-16.json,72978
dbfs:/databricks-datasets/structured-streaming/events/file-17.json,file-17.json,73008


Hay alrededor de 50 archivos JSON en el directorio. Veamos qu√© contiene cada archivo JSON.

In [0]:
%fs head /databricks-datasets/structured-streaming/events/file-0.json

Cada l√≠nea del archivo contiene un registro JSON con dos campos: tiempo y acci√≥n. Vamos a analizar estos archivos de forma interactiva.

## Batch/Interactive Processing
El primer paso es consultar los datos de forma interactiva. Para hacerlo, se define un DataFrame est√°tico.

In [0]:
from pyspark.sql.types import *

inputPath = "/databricks-datasets/structured-streaming/events/"

#Como ya conocemos el formato de los datos, se define el esquema para acelerar el procesamiento
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

# Se define el DataFrame est√°tico que toma los datos y el esquema definido previamente. Sirve para representar el contenido de los archivos JSON
staticInputDF = (spark.read.schema(jsonSchema).json(inputPath))
display(staticInputDF)

time,action
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:29.000+0000,Open
2016-07-28T04:19:31.000+0000,Close
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:32.000+0000,Close
2016-07-28T04:19:33.000+0000,Close
2016-07-28T04:19:35.000+0000,Close
2016-07-28T04:19:36.000+0000,Open


Ahora vamos a calcular el n√∫mero de acciones de "abrir" y "cerrar" en una ventana de tiempo 1 hora. Para hacer esto, se agrupar√° por la columna de acci√≥n y se indicar√° explicitamente la ventana de 1 hora sobre la columna de tiempo.

In [0]:
from pyspark.sql.functions import *      # for window() function

staticCountsDF = (staticInputDF.groupBy(staticInputDF.action,window(staticInputDF.time, "1 hour")).count())
staticCountsDF.cache()

# El DataFrame se crea como una tabla
staticCountsDF.createOrReplaceTempView("static_counts")

Ahora ya podemos usar SQL directamente para consultar la tabla. Por ejemplo, aqu√≠ est√°n los recuentos totales de todas las horas.

In [0]:
%sql select action, sum(count) as total_count from static_counts group by action

action,total_count
Close,50000
Open,50000


A continuaci√≥n generamos una l√≠nea de tiempo de recuentos de eventos

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from static_counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993


Observa los dos extremos de la gr√°fica. 
Las acciones de cierre se generan de tal manera que son posteriores a las acciones de apertura correspondientes, por lo que hay m√°s "aperturas" al principio y m√°s "cierres" al final.

## Stream Processing: emulando una transmisi√≥n contin√∫a de datos

Dado que ya analizados los datos, ahora vamos a convertir en una consulta de transmisi√≥n que se actualiza a medida que llegan los datos.

¬øC√≥mo?
Dado que solo tenemos un conjunto est√°tico de archivos, se va a emular un flujo de ellos leyendo un archivo a la vez, en el orden cronol√≥gico en que fueron creados.

In [0]:
from pyspark.sql.functions import *

#Recordemos la funci√≥n creada anteriormente
#staticInputDF = (spark.read.schema(jsonSchema).json(inputPath))

# Ahora en lugar de usar `read` se usar√° `readStream`.
streamingInputDF = (spark.readStream.schema(jsonSchema)
                    .option("maxFilesPerTrigger", 1)  # Trata una secuencia de archivos como un flujo de datos
                                                      #tomando un archivo a la vez
                    .json(inputPath))

#Recordemos la consulta anterior
#staticCountsDF = (staticInputDF.groupBy(staticInputDF.action,window(staticInputDF.time, "1 hour")).count())

#Generamos la misma consulta
streamingCountsDF = (streamingInputDF.groupBy(streamingInputDF.action, window(streamingInputDF.time, "1 hour"))
                      .count())

# Ahora definimos que `streamingCountsDF`es un DataFrame para flujo de datos, y retorna `True`
streamingCountsDF.isStreaming  #streaming Dataframe 

¬øQu√© sigue?
Vamos a consultar los datos de ¬°manera interactiva!

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (streamingCountsDF.writeStream
                          .format("memory")#almacena la tabla en memoria
                          .queryName("counts")#nombre de la tabla en memoria
                          .outputMode("complete")#todas las cuentas deben estar en la tabla
                          .start())

`query` es un identificador de la consulta de transmisi√≥n que se ejecuta en segundo plano. 
Esta consulta extrae los archivos continuamente y actualiza los recuentos en ventana.

La barra de progreso muestra que la consulta est√° activa. Adem√°s, si expande los `conteos` se observar√° la cantidad de archivos que ya han procesado.

In [0]:
from time import sleep
sleep(5)  # esperemos un poco para iniciar el conteo

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,176
Open,Jul-26 05:00,289


Observemos la l√≠nea de tiempo de recuentos en ventana (similar a la est√°tica anterior) acumul√°ndose. 

Si seguimos ejecutando esta consulta interactiva repetidamente, se observar√° los √∫ltimos recuentos actualizados que la consulta de transmisi√≥n est√° actualizando en segundo plano.

In [0]:
sleep(5)  # wait a bit more for more data to be computed

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993


In [0]:
sleep(5)  # wait a bit more for more data to be computed

Analicemos el n√∫mero de aperturas y cierre

In [0]:
%sql select action, sum(count) as total_count from counts group by action order by action

action,total_count
Close,6488
Open,7512


In [0]:
#query.stop()