# PRÁCTICA SYSTEMAS DISTRIBUIDOS III <a class="anchor" id="back">
<font color='#fb7813'>**KAFKA STREAMING + SPARK**</font>

**Descripción:** este notebook tiene como objetivo leer los datos proporcionados en CSV mediante un productor que envíe los datos a una cola Kafka de nombre *test* con un retardovariable entre muestras insertadas de entre 0.6 y 1.3 segundos. Posteriormente, usando la API de *Spark Structured Streaming* con un intervalo de actualizacion de micro-batches (triggers) de 5 segundos se pide realizar 3 consultas.

**Datos:** Los datos contienen información de un experimento para intentar predecir los tiempos de ocupación de una estancia, en función de los valores tomados por variables ambientales.los datos proporcionados están en formato CSV. El archivo occupancy_data.csv contiene 8.143 entradas, una para cada muestra recogida, a intervalos aproximadamente regulares de 1 minuto entre muestras. 

**Miembros del equipo:** Verónica Gómez, Carlos Grande y Pablo Olmos

**GitHub URL:** https://github.com/charlstown/SparkStructureStreaming

## Índice
* [0. Instrucciones](#readme)
* [1. Contexto y entorno de Spark](#context)
* [2. Lectura de datos en batch](#batch)
* [3. Conexión con Kafka](#kafka)
* [4. Definición del Stream](#stream)
* [5. Queries](#queries)
---

## Carga de librerías

In [1]:
from time import sleep
import sys, os
from collections import OrderedDict

# spark libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, TimestampType, DoubleType,\
    IntegerType, StringType, ArrayType, BooleanType
import pyspark.sql.functions as fn
from pyspark.sql import Window

## Carga de scripts

In [2]:
import sys
sys.path.append('../02_scripts')
import myKafka

---

## 0. Instrucciones <a class="anchor" id="readme">
[<font color='#fb7813'>**\>\> Volver arriba**</font>](#back)

Para la correcta ejecución de este notebook se necesitan los siguientes elementos en funcionamiento.

1. Ejecución de Zookeeper mediante el siguiente comando sobre el directorio de Kafka.

    ```bin/zookeeper-server-start.sh config/zookeeper.properties```


2. Ejecución de Kafka Server mediante el siguiente comando sobre el mismo directorio.

    ```bin/kafka-server-start.sh config/server.properties```


3. Ejecución del script Kafka producer desde la consola en Python, se ha usado el mismo script proporcionado para la práctica bajo una cola con el *topic* de **test**.
    
    ```python kafka_producer.py 0.6 1.3 test ../01_data/occupancy_data.csv```

4. Lanzar y ejecutar el Jupyter Notebook

Por otro lado es importante recordar que la estructura de carpetas debe ser igual que la del repositorio proporcionado en el enlace y contener al menos los siguientes archivos.

**01_data:**
- occupancy_data.csv

**02_scripts:**
- \__init\__.py
- myKafka.py

**03_notebooks:**
- Practica_SD3_SparkSS.ipynb

## 1. Contexto y entorno de Spark <a class="anchor" id="context">
[<font color='#fb7813'>**\>\> Volver arriba**</font>](#back)

In [3]:
packages = "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5"
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages {0} pyspark-shell".format(packages)
)

print("PYSPARK_SUBMIT_ARGS = ",os.environ["PYSPARK_SUBMIT_ARGS"],"\n")
print("JAVA_HOME = ", os.environ["JAVA_HOME"])

PYSPARK_SUBMIT_ARGS =  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 pyspark-shell 

JAVA_HOME =  /usr/lib/jvm/java-1.8.0-openjdk-amd64


In [4]:
# Create SparkContext object (low level)
spark = (SparkSession
    .builder
    .appName("PracticaSD3")
    .getOrCreate())

spark

## 2. Lectura de datos en Batch <a class="anchor" id="batch">
[<font color='#fb7813'>**\>\> Volver arriba**</font>](#back)

### 2.1 Creación del esquema y muestra de los datos

In [5]:
custom_schema = StructType([
    StructField("row", IntegerType(), True),
    StructField("date", TimestampType(), True), 
    StructField("Temperature", DoubleType(), True),
    StructField("Humidity", DoubleType(), True),
    StructField("Light", DoubleType(), True),
    StructField("CO2", DoubleType(), True),
    StructField("HumidityRatio", DoubleType(), True),
    StructField("Occupancy", IntegerType(), True) 
                           ])

In [6]:
batch = spark.read.option("header", True).csv('../01_data/occupancy_data.csv', 
                                                  schema=custom_schema)

In [7]:
batch.show(5)

+---+-------------------+-----------+--------+-----+------+-------------------+---------+
|row|               date|Temperature|Humidity|Light|   CO2|      HumidityRatio|Occupancy|
+---+-------------------+-----------+--------+-----+------+-------------------+---------+
|  1|2015-02-04 17:51:00|      23.18|  27.272|426.0|721.25|0.00479298817650529|        1|
|  2|2015-02-04 17:51:59|      23.15| 27.2675|429.5| 714.0|0.00478344094931065|        1|
|  3|2015-02-04 17:53:00|      23.15|  27.245|426.0| 713.5|0.00477946352442199|        1|
|  4|2015-02-04 17:54:00|      23.15|    27.2|426.0|708.25|0.00477150882608175|        1|
|  5|2015-02-04 17:55:00|       23.1|    27.2|426.0| 704.5|0.00475699293331518|        1|
+---+-------------------+-----------+--------+-----+------+-------------------+---------+
only showing top 5 rows



### 2.2 Comprobación datos nulos

In [8]:
columns = batch.columns
batch.select([fn.count(fn.when(fn.col(colmn).isNull(), colmn))
              .alias(colmn) for colmn in columns]).show()


+---+----+-----------+--------+-----+---+-------------+---------+
|row|date|Temperature|Humidity|Light|CO2|HumidityRatio|Occupancy|
+---+----+-----------+--------+-----+---+-------------+---------+
|  0|   0|          0|       0|    0|  0|            0|        0|
+---+----+-----------+--------+-----+---+-------------+---------+



## 3. Conexión con Kafka <a class="anchor" id="kafka">
[<font color='#fb7813'>**\>\> Volver arriba**</font>](#back)

In [9]:
read_stream = spark\
    .readStream.format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "test").load()

In [10]:
read_stream.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



## 4. Definición del Stream <a class="anchor" id="stream">
[<font color='#fb7813'>**\>\> Volver arriba**</font>](#back)

In [11]:
df_stream = read_stream\
    .withColumn('array_value',
                fn.split(fn.col('value').cast(StringType()), ',')
               )

In [12]:
# Definimos OrderedDict para leer values en orden
schema = OrderedDict()
schema["row"] = IntegerType()
schema["date"] = TimestampType()
schema["Temperature"] = DoubleType()
schema["Humidity"] = DoubleType()
schema["Light"] = DoubleType()
schema["CO2"] = DoubleType()
schema["HumidityRatio"] = DoubleType()
schema["Occupancy"] = IntegerType()

In [13]:
df_stream_schema = myKafka.apply_scheme(df_stream, schema)

In [14]:
test_query = (df_stream_schema
                .select(*columns)
    .writeStream
    .format("memory")
    .queryName("test_query")
    .trigger(processingTime="5 seconds")
    .outputMode('append')
    .start())

In [15]:
sleep(11)
df = spark.sql("SELECT * FROM test_query")
df.show(5)
df.count()

+---+-------------------+-----------+--------+-----+-----+-------------------+---------+
|row|               date|Temperature|Humidity|Light|  CO2|      HumidityRatio|Occupancy|
+---+-------------------+-----------+--------+-----+-----+-------------------+---------+
|647|2015-02-05 04:37:00|      20.89|    24.1|  0.0|445.0|0.00367691564904522|        0|
|648|2015-02-05 04:38:00|      20.89|    24.0|  0.0|438.0|0.00366156892375949|        0|
|649|2015-02-05 04:38:59|     20.945|    24.0|  0.0|438.5|0.00367404992094876|        0|
|650|2015-02-05 04:40:00|       21.0|    24.0|  0.0|440.0|0.00368656839813018|        0|
|651|2015-02-05 04:40:59|     20.945|    24.0|  0.0|446.0|0.00367404992094876|        0|
+---+-------------------+-----------+--------+-----+-----+-------------------+---------+
only showing top 5 rows



8

In [16]:
test_query.stop()

## 5. Queries <a class="anchor" id="queries">
[<font color='#fb7813'>**\>\> Volver arriba**</font>](#back)

### 5.1 Promedio de valores
Calcular el promedio de valores de temperatura, humedad relativa y concentración de CO2 para cada micro-batch, y el promedio de dichos valores desde el arranque de la aplicación.

**Por Microbatch**

In [17]:
columns = ["Temperature", "Humidity", "CO2"]
query_1a = (df_stream_schema
             .select(*[fn.avg(col).alias("avg_" + col) for col in columns])
    .writeStream
    .format("memory")
    .queryName("query1a")
    .trigger(processingTime="5 seconds")
    .outputMode("update")
    .start())

In [18]:
sleep(11)
df = spark.sql("SELECT * FROM query1a")
df.show(5)

+------------------+------------------+------------------+
|   avg_Temperature|      avg_Humidity|           avg_CO2|
+------------------+------------------+------------------+
|              null|              null|              null|
|              21.0| 23.79555555555555|446.11111111111114|
|20.978611111111107|23.785902777777768|444.59027777777777|
+------------------+------------------+------------------+



In [19]:
query_1a.stop()

**Desde memoria**

In [20]:
query_1b = (df_stream_schema
             .select(*[fn.avg(col).alias("avg_" + col) for col in columns])
    .writeStream
    .format("memory")
    .queryName("query1b")
    .trigger(processingTime="5 seconds")
    .outputMode('update')
    .option("checkpointLocation", "../01_data/checkpoints")
    .start())

In [21]:
sleep(11)
df = spark.sql("SELECT * FROM query1b")
df.show(5)

+------------------+------------------+-----------------+
|   avg_Temperature|      avg_Humidity|          avg_CO2|
+------------------+------------------+-----------------+
|21.614222185815713|25.646400939274383|572.8330002184363|
|21.613438068222635|25.644260528037993|572.6966997599827|
| 21.61264966579432|25.642039959313745|572.5574269834352|
+------------------+------------------+-----------------+



In [22]:
query_1b.stop()

### 5.2 Promedio luminosidad
Calcular el promedio de luminosidad en la estancia en ventanas deslizantes de tamaño 45 segundos, con un valor de deslizamiento de 15 segundos entre ventanas consecutivas.

In [23]:
df_window = (df_stream_schema
                .withWatermark("date", "45 seconds")
                .groupBy(fn.window("date",
                                "45 seconds",
                                "15 seconds")
                        )
                .agg(fn.avg("Light").alias("avg_light"))
               )

In [24]:
query_2 = (df_window
    .writeStream
    .format("memory")
    .queryName("query2")
    .trigger(processingTime='5 seconds')
    .outputMode('append')
    .start())

In [25]:
sleep(11)
df = spark.sql("SELECT * FROM query2")
df.orderBy("window", ascending=False).show(5, truncate=False)

+------------------------------------------+---------+
|window                                    |avg_light|
+------------------------------------------+---------+
|[2015-02-05 05:15:30, 2015-02-05 05:16:15]|0.0      |
|[2015-02-05 05:15:00, 2015-02-05 05:15:45]|0.0      |
|[2015-02-05 05:14:45, 2015-02-05 05:15:30]|0.0      |
|[2015-02-05 05:14:30, 2015-02-05 05:15:15]|0.0      |
|[2015-02-05 05:13:45, 2015-02-05 05:14:30]|0.0      |
+------------------------------------------+---------+
only showing top 5 rows



In [26]:
query_2.stop()

### 5.3 Intervalo de muestras
Examinando los datos, podemos apreciar que el intervalo entre muestras originales no es exactamente de 1 minuto en muchos casos. Calcular el número de parejas de muestras consecutivas en cada micro-batch entre las cuales el intervalo de separación no es exactamente de
1 minuto.

In [27]:
def get_diff(df, epoch_id):
    w = Window.partitionBy().orderBy('date')
    df_w = (df.withColumn("prev_date", fn.lag("date").over(w))
              .withColumn("time_diff",
                        fn.col("date").cast("long") - fn.col("prev_date").cast("long")
                       )
            .withColumn("diff_seconds",
                         fn.col("date").cast("long") - fn.col("prev_date").cast("long"))
              .withColumn("type", fn.when(fn.col("time_diff") != 60, fn.lit("different_1_minute"))
                                .otherwise(fn.lit("1_minute"))
                      )
            )
    df_group = (df_w.where(fn.col('type') != '1_minute')
                    .groupBy('type')
                    .count())
    df_group.show()

In [28]:
query_3 = (df_stream_schema
    .writeStream
    .format("memory")
    .foreachBatch(get_diff)
    .trigger(processingTime="5 seconds")
    .start())
sleep(12)

+----+-----+
|type|count|
+----+-----+
+----+-----+

+----+-----+
|type|count|
+----+-----+
+----+-----+

+------------------+-----+
|              type|count|
+------------------+-----+
|different_1_minute|    2|
+------------------+-----+

+------------------+-----+
|              type|count|
+------------------+-----+
|different_1_minute|    1|
+------------------+-----+



In [29]:
query_3.stop()

In [30]:
spark.stop()