# Spark Streaming - Kafka JSON

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

0)Abrimos una terminal. Arrancamos zookeeper y kafka en dos pestañas por separado.
**$ zookeeper-server-start.sh config/zookeeper.properties
$ kafka-server-start.sh config/server.properties**

Creamos el topic *json_topic* en una nueva terminal.
**$ kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic json_topic --create --partitions 3 --replication-factor 1**

A continuación, creamos el producer.
**$ kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic json_topic**

1)El primer paso es crear una sesión de Spark.

In [2]:

spark = SparkSession\
        .builder\
        .appName("PEC2StructuredKafkaWordCount")\
        .getOrCreate()

2)A continuación, definimos un input stream. 
  En las opciones especificamos el nombre del topic **json_topic**, el cual se ha creado previamente en otra ventana.
  También especificamos el host:port del productor con **kafka.bootstrap.servers**
  Con la opción value.deserializer deserializamos los valores. Los valores son siempre deserializados como un array
  de bytes con ByteArrayDeserializer.
  Con la opción **auto.offset.reset** especificamos donde comenzar. **Structured Streaming** es capaz de 
  gestionar cuales offsets son consumidos en lugar de basarse en el consumidor de kafka.
  Asegurándose que no se pierden datos cuando un nuevo topic es subscrito. Con **earliest** aplica desde el principio cuando una nueva query comienza.
  Con la función printSchema mostramos el esquema del **stream dataframe**

In [3]:

inputStream = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", "localhost:9092")\
        .option("auto.offset.reset","earliest") \
        .option("value.deserializer", "StringDeserializer")\
        .option("subscribe", "json_topic")\
        .option('includeTimestamp', 'true') \
        .load()

inputStream.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



3)Definimos el esquema json, el cual vamos a utilizar para el procesamiento de los json que vamos a mandar a través del consumidor. Los registros leidos desde el topic de Kafka tiene estructura JSON. Por lo que necesitamos convertir nuestro valor de cadena del topic de Kafka, el cual tiene un tipo binario en una estructura significativa, pasándolo a Dataframe. En este caso seleccionamos **value** y **timestamp**, el primero contendrá los valores de los campos de la estructura que se van pasando a traves del kafka stream. Por otro lado, también pasamos timestamp del stream dataframe.

In [4]:
schema = StructType() \
        .add("nombre", StringType()) \
        .add("edad", IntegerType()) \
        .add("peso", FloatType()) 
        
    
initial = inputStream.selectExpr("CAST(value AS STRING)","timestamp as timestamp").toDF("value","timestamp")
initial.printSchema()

root
 |-- value: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



4)La siguiente línea de comandos es para convertir **value** en su representación JSON. Con esta expresión, la cadena de entrada está siendo deserializada en su valor JSON. El último select **.select("parsed_value.*","time")** de la expresión se utiliza para seleccionar el contenido embebido de la estructura anidada que contiene todos los campos JSON.
Utilizamos la función **printSchema** para ver como queda el esquema que tenemos.

In [5]:
aggregation = initial.select(from_json(col("value"), schema).alias("parsed_value"),col("timestamp").alias("time"))\
    .select("parsed_value.*","time")
    
aggregation.printSchema()

root
 |-- nombre: string (nullable = true)
 |-- edad: integer (nullable = true)
 |-- peso: float (nullable = true)
 |-- time: timestamp (nullable = true)



5)A continuación, definimos un tamaño de ventana y una marca de agua. La ventana tiene 3 parámetros.
- timeColumn: en el que hemos indicado un tamaño de ventana, basándonos en el timestamp de kafka.
- windowDuration: el tamaño de ventana que usamos es de 10 minutos.
- slideDuration: el deslizamiento de ventana que indicamos es de 5 minutos.

Utilizamos la función **GroupBy** para agrupar los datos en base a la ventana de tiempo y el nombre.

In [6]:
 windowedCounts = aggregation \
    .withWatermark("time", "10 minutes") \
    .groupBy(
        window(col("time"), "10 minutes", "5 minutes"),
        aggregation.nombre) 
display(windowedCounts)

<pyspark.sql.group.GroupedData at 0x7f3ab8192240>

6)En la siguiente línea, ejecutamos una operación de aggregation.Recordemos que en el paso anterior ejecutamos una función de agrupacion, por lo que tenemos un Dataframe con las columnas agrupadas. Con la operación agg añadimos columnas adicionales, las cuales contienen operaciones como promedios, o calculo de máximo o mínimo.

In [7]:
aggregatedDF = windowedCounts.agg(avg("edad").alias("media_edad"), avg("peso").alias("media_peso"),\
                                  max("peso").alias("peso_max"),min("peso").alias("peso_min"),\
                                  max("edad").alias("edad_max"),min("edad").alias("edad_min"))


6)El paso final, es escribir los datos agregados en un **sink** o fregadero. En nuestro caso, el fregadero usado es la consola, pero se podría haber utilizado Hive, u otro tipo de formato. El outputMode elegido es **complete**, de esta manera todos los resultados de la tabla son enviados al fregadero después de cada procesamiento.  

In [None]:
query = aggregatedDF.writeStream \
        .outputMode("complete") \
        .format("console") \
        .option('truncate', 'false') \
        .start()

query.awaitTermination()