# Final Project - Part 4

### **Part 4. Spark Structured Streaming**

- 1. [Preliminary steps](#1)
- 2. [Water analysis](#2)

This final part analyzes the data emitted on port 45002, which carries water consumption readings.

<a id="1"></a>
### *1. Preliminary steps and library imports*

In [1]:
import org.apache.spark._

In [2]:
import org.apache.spark.streaming._

In [3]:
import org.apache.spark.sql.streaming._

In [4]:
import org.apache.spark.sql.SparkSession

In [5]:
import org.apache.spark.sql.functions._

In [6]:
import org.apache.spark.sql.Row

In [7]:
import org.apache.spark.sql.types._

In [8]:
import java.sql.Timestamp

In [9]:
val spark = SparkSession.builder.getOrCreate()

spark = org.apache.spark.sql.SparkSession@3d2a2aa6


#### *The data source is registered*

In [10]:
val datos_raw = spark.readStream.format("socket").option("host","emisor.eoi.rbs-net.com").option("port", 45002).load()

datos_raw = [value: string]


[value: string]

In [11]:
val datos_campos = datos_raw.withColumn("tmp", split($"value", ";"))

datos_campos = [value: string, tmp: array<string>]


[value: string, tmp: array<string>]

#### *Each column is given a name and cast to the data type it should be read as*

In [12]:
val datos_cabec = datos_campos.select($"tmp".getItem(0).as("Edificio").cast("string"),
                                      $"tmp".getItem(1).as("Consumo").cast("double"),
                                      $"tmp".getItem(2).as("Año").cast("integer"),
                                      $"tmp".getItem(3).as("Mes").cast("integer"),
                                      $"tmp".getItem(4).as("Día").cast("integer"),
                                      $"tmp".getItem(5).as("Hora").cast("integer"),
                                      $"tmp".getItem(6).as("Minuto").cast("integer"))

datos_cabec = [Edificio: string, Consumo: double ... 5 more fields]


[Edificio: string, Consumo: double ... 5 more fields]
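
To make the split-and-cast step concrete, here is a minimal plain-Scala sketch of the same idea applied to a single line. The `Lectura` case class, the `ParseoLinea` object and the sample line are hypothetical names introduced only for illustration; the field order is the one used in the `select` above. Unlike Spark, which yields `null` for a value that cannot be cast, this sketch rejects the whole line.

```scala
import scala.util.Try

// Hypothetical record type mirroring the seven fields selected in the notebook.
final case class Lectura(edificio: String, consumo: Double,
                         anio: Int, mes: Int, dia: Int, hora: Int, minuto: Int)

object ParseoLinea {
  // Splits a line on ';' and converts each field to its target type.
  // Returns None when the line does not have exactly seven fields
  // or when a numeric field cannot be converted.
  def parse(linea: String): Option[Lectura] =
    linea.split(";", -1) match {
      case Array(e, c, a, m, d, h, mi) =>
        Try(Lectura(e, c.toDouble, a.toInt, m.toInt, d.toInt, h.toInt, mi.toInt)).toOption
      case _ => None
    }
}
```

For example, `ParseoLinea.parse("AGUA_EDIF_20;0.27;2013;10;15;14;45")` gives a `Lectura` for building `AGUA_EDIF_20`, assuming the emitter sends lines in that shape.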

#### *The date and time fields are combined into a single column named Fecha*

In [13]:
val datos_cabe_ok = datos_cabec.select($"Edificio",$"Consumo",(concat($"Año", lit("-"), 
                                          $"Mes", lit("-"), 
                                          $"Día", lit(" "), 
                                          $"Hora", lit(":"),
                                          $"Minuto").cast("Timestamp").as("Fecha")))

datos_cabe_ok = [Edificio: string, Consumo: double ... 1 more field]


[Edificio: string, Consumo: double ... 1 more field]

In [14]:
datos_cabe_ok.printSchema

root
 |-- Edificio: string (nullable = true)
 |-- Consumo: double (nullable = true)
 |-- Fecha: timestamp (nullable = true)



<a id="2"></a>
### *2. Analysis of the water data*

#### *The full, untruncated table of records received in each batch is printed to the console*

In [31]:
val query_1 = datos_cabe_ok.writeStream.
format("console").
option("truncate", false).
option("numRows", 30).
start()

query_1 = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@47c43ee7


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@47c43ee7

-------------------------------------------
Batch: 0
-------------------------------------------
+------------+-------+-------------------+
|Edificio    |Consumo|Fecha              |
+------------+-------+-------------------+
|AGUA_EDIF_20|0.27   |2013-10-15 14:45:00|
+------------+-------+-------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------+-------+-------------------+
|Edificio    |Consumo|Fecha              |
+------------+-------+-------------------+
|AGUA_EDIF_20|0.3    |2013-10-15 15:00:00|
|AGUA_EDIF_20|0.28   |2013-10-15 15:15:00|
|AGUA_EDIF_20|0.33   |2013-10-15 15:30:00|
|AGUA_EDIF_20|0.22   |2013-10-15 15:45:00|
|AGUA_EDIF_20|0.21   |2013-10-15 16:00:00|
|AGUA_EDIF_20|0.22   |2013-10-15 16:15:00|
|AGUA_EDIF_20|0.25   |2013-10-15 16:30:00|
|AGUA_EDIF_20|0.26   |2013-10-15 16:45:00|
|AGUA_EDIF_20|0.35   |2013-10-15 17:00:00|
|AGUA_EDIF_20|0.25   |2013-10-15 17:15:00|
|AGUA_EDIF_20|0.25   |2013-10-1

#### *Consumption is analyzed per building*

In [41]:
val query_2_aux = datos_cabe_ok.groupBy($"Edificio").agg(sum($"Consumo") as "Consumo (m3)")

query_2_aux = [Edificio: string, Consumo (m3): double]


[Edificio: string, Consumo (m3): double]
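
Because `Consumo` is a `double`, a running sum can show binary floating-point artifacts such as `3.9800000000000004` in the console. The sketch below reproduces the effect in plain Scala and rounds the result half-up to two decimals; the `RedondeoSuma` object is a hypothetical helper. In the notebook itself, wrapping the aggregate as `round(sum($"Consumo"), 2)` would be the analogous option, in the same way `round(avg(...), 2)` is used for the averages.

```scala
object RedondeoSuma {
  // Rounds a double to the given number of decimals using half-up rounding.
  def redondear(x: Double, decimales: Int): Double =
    BigDecimal(x).setScale(decimales, BigDecimal.RoundingMode.HALF_UP).toDouble

  // Classic example of a sum that is not exactly representable in binary.
  val sumaCruda: Double = Seq(0.1, 0.2).sum
}
```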

In [42]:
val query_2 = query_2_aux.writeStream.format("console").
outputMode("complete").queryName("query_2").option("truncate", false).
start()

query_2 = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@4838888a


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@4838888a

-------------------------------------------
Batch: 0
-------------------------------------------
+------------+------------+
|Edificio    |Consumo (m3)|
+------------+------------+
|﻿AGUA_EDIF_1|0.0         |
+------------+------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------+------------------+
|Edificio    |Consumo (m3)      |
+------------+------------------+
|AGUA_EDIF_1 |3.9800000000000004|
|﻿AGUA_EDIF_1|0.0               |
+------------+------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------+------------+
|Edificio    |Consumo (m3)|
+------------+------------+
|AGUA_EDIF_1 |4.62        |
|﻿AGUA_EDIF_1|0.0         |
+------------+------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+------------+
|Edificio            |Consumo (m3)|
+--------------------+---
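
The batches above list `AGUA_EDIF_1` twice, once with a consumption of `0.0`. One of the two names appears to start with an invisible character, most likely a byte-order mark (U+FEFF) at the beginning of the stream, so the grouping treats it as a different building. The plain-Scala sketch below shows the effect and one way to remove it; `LimpiarBom` and the sample lines are hypothetical. In the notebook the same idea could be applied to the `value` column with `regexp_replace` before the split.

```scala
object LimpiarBom {
  // U+FEFF is the Unicode byte-order mark; it is invisible when printed.
  val Bom: String = "\uFEFF"

  // Removes a leading byte-order mark, if present.
  def limpiar(linea: String): String = linea.stripPrefix(Bom)

  // Sums the second field per building name (first field).
  // When limpiarAntes is false the raw lines are grouped as they arrive.
  def sumaPorEdificio(lineas: Seq[String], limpiarAntes: Boolean): Map[String, Double] = {
    val entrada = if (limpiarAntes) lineas.map(limpiar) else lineas
    entrada
      .map(_.split(";"))
      .groupBy(campos => campos(0))
      .map { case (edificio, filas) => edificio -> filas.map(_(1).toDouble).sum }
  }
}
```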

#### *The number of records per building is computed together with the average consumption*

In [16]:
val query_3_aux = datos_cabe_ok.groupBy($"Edificio").agg(count($"Consumo") as "Número de registros", round(avg($"Consumo"),2) as "Media de los registros")

query_3_aux = [Edificio: string, Número de registros: bigint ... 1 more field]


[Edificio: string, Número de registros: bigint ... 1 more field]

In [17]:
val query_3 = query_3_aux.writeStream.format("console").
outputMode("complete").queryName("query_3_media").option("truncate", false).
start()

query_3 = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@334c5c1a


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@334c5c1a

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+-------------------+----------------------+
|Edificio   |Número de registros|Media de los registros|
+-----------+-------------------+----------------------+
|AGUA_EDIF_3|1                  |0.1                   |
+-----------+-------------------+----------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------+-------------------+----------------------+
|Edificio   |Número de registros|Media de los registros|
+-----------+-------------------+----------------------+
|AGUA_EDIF_3|36                 |0.06                  |
+-----------+-------------------+----------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------+-------------------+----------------------+
|Edificio   |Número de registros|Media de los registros|
+-----------+----------

-------------------------------------------
Batch: 18
-------------------------------------------
+------------+-------------------+----------------------+
|Edificio    |Número de registros|Media de los registros|
+------------+-------------------+----------------------+
|AGUA_EDIF_31|96                 |0.0                   |
|AGUA_EDIF_3 |60                 |0.04                  |
|AGUA_EDIF_36|61                 |0.03                  |
|AGUA_EDIF_32|96                 |0.02                  |
+------------+-------------------+----------------------+

-------------------------------------------
Batch: 19
-------------------------------------------
+------------+-------------------+----------------------+
|Edificio    |Número de registros|Media de los registros|
+------------+-------------------+----------------------+
|AGUA_EDIF_31|96                 |0.0                   |
|AGUA_EDIF_3 |60                 |0.04                  |
|AGUA_EDIF_36|81                 |0.03           

-------------------------------------------
Batch: 31
-------------------------------------------
+------------+-------------------+----------------------+
|Edificio    |Número de registros|Media de los registros|
+------------+-------------------+----------------------+
|AGUA_EDIF_31|96                 |0.0                   |
|AGUA_EDIF_42|75                 |0.0                   |
|AGUA_EDIF_3 |60                 |0.04                  |
|AGUA_EDIF_36|96                 |0.03                  |
|AGUA_EDIF_4 |96                 |0.02                  |
|AGUA_EDIF_32|96                 |0.02                  |
+------------+-------------------+----------------------+

-------------------------------------------
Batch: 32
-------------------------------------------
+------------+-------------------+----------------------+
|Edificio    |Número de registros|Media de los registros|
+------------+-------------------+----------------------+
|AGUA_EDIF_31|96                 |0.0            

+------------+-------------------+----------------------+
|Edificio    |Número de registros|Media de los registros|
+------------+-------------------+----------------------+
|AGUA_EDIF_31|96                 |0.0                   |
|AGUA_EDIF_42|96                 |0.0                   |
|AGUA_EDIF_3 |60                 |0.04                  |
|AGUA_EDIF_36|96                 |0.03                  |
|AGUA_EDIF_4 |96                 |0.02                  |
|AGUA_EDIF_44|56                 |0.01                  |
|AGUA_EDIF_43|96                 |0.02                  |
|AGUA_EDIF_32|96                 |0.02                  |
+------------+-------------------+----------------------+

-------------------------------------------
Batch: 43
-------------------------------------------
+------------+-------------------+----------------------+
|Edificio    |Número de registros|Media de los registros|
+------------+-------------------+----------------------+
|AGUA_EDIF_31|96               

-------------------------------------------
Batch: 52
-------------------------------------------
+------------+-------------------+----------------------+
|Edificio    |Número de registros|Media de los registros|
+------------+-------------------+----------------------+
|AGUA_EDIF_31|96                 |0.0                   |
|AGUA_EDIF_42|96                 |0.0                   |
|AGUA_EDIF_3 |60                 |0.04                  |
|AGUA_EDIF_45|96                 |0.0                   |
|AGUA_EDIF_5 |25                 |0.0                   |
|AGUA_EDIF_36|96                 |0.03                  |
|AGUA_EDIF_4 |96                 |0.02                  |
|AGUA_EDIF_44|96                 |0.01                  |
|AGUA_EDIF_43|96                 |0.02                  |
|AGUA_EDIF_32|96                 |0.02                  |
+------------+-------------------+----------------------+

-------------------------------------------
Batch: 53
-----------------------------------

-------------------------------------------
Batch: 61
-------------------------------------------
+------------+-------------------+----------------------+
|Edificio    |Número de registros|Media de los registros|
+------------+-------------------+----------------------+
|AGUA_EDIF_31|96                 |0.0                   |
|AGUA_EDIF_6 |69                 |0.02                  |
|AGUA_EDIF_42|96                 |0.0                   |
|AGUA_EDIF_3 |60                 |0.04                  |
|AGUA_EDIF_45|96                 |0.0                   |
|AGUA_EDIF_5 |96                 |0.02                  |
|AGUA_EDIF_36|96                 |0.03                  |
|AGUA_EDIF_4 |96                 |0.02                  |
|AGUA_EDIF_44|96                 |0.01                  |
|AGUA_EDIF_43|96                 |0.02                  |
|AGUA_EDIF_32|96                 |0.02                  |
+------------+-------------------+----------------------+

-------------------------------

+------------+-------------------+----------------------+
|Edificio    |Número de registros|Media de los registros|
+------------+-------------------+----------------------+
|AGUA_EDIF_8 |49                 |0.03                  |
|AGUA_EDIF_31|96                 |0.0                   |
|AGUA_EDIF_6 |96                 |0.01                  |
|AGUA_EDIF_42|96                 |0.0                   |
|AGUA_EDIF_3 |60                 |0.04                  |
|AGUA_EDIF_45|96                 |0.0                   |
|AGUA_EDIF_5 |96                 |0.02                  |
|AGUA_EDIF_36|96                 |0.03                  |
|AGUA_EDIF_4 |96                 |0.02                  |
|AGUA_EDIF_7 |96                 |0.1                   |
|AGUA_EDIF_44|96                 |0.01                  |
|AGUA_EDIF_43|96                 |0.02                  |
|AGUA_EDIF_32|96                 |0.02                  |
+------------+-------------------+----------------------+

-------------

#### *Number of consumption records in the last 15 minutes (15-minute windows that slide every 9 minutes)*

In [19]:
val query_3_aux = datos_cabe_ok.groupBy($"Edificio", window($"Fecha", "15 minutes", "9 minutes")).count()

query_3_aux = [Edificio: string, window: struct<start: timestamp, end: timestamp> ... 1 more field]


[Edificio: string, window: struct<start: timestamp, end: timestamp> ... 1 more field]
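
With a 15-minute window that slides every 9 minutes, a reading can belong to one or two windows. The sketch below is a simplified plain-Scala model of that assignment, assuming windows are aligned to the Unix epoch with no start offset (the default for `window`); `VentanasDeslizantes` is a hypothetical helper and not Spark's actual implementation. Times are handled in UTC, whereas the console output is shown in the session time zone.

```scala
import java.time.{Duration, Instant}

object VentanasDeslizantes {
  // Returns the start of every window [start, start + size) that contains ts,
  // for windows whose starts are multiples of `slide` counted from the epoch.
  def inicios(ts: Instant, size: Duration, slide: Duration): List[Instant] = {
    val t = ts.toEpochMilli
    val sizeMs = size.toMillis
    val slideMs = slide.toMillis
    // Latest window start that is not after the event time.
    val ultimo = t - Math.floorMod(t, slideMs)
    Iterator.iterate(ultimo)(_ - slideMs)
      .takeWhile(inicio => inicio + sizeMs > t) // the window end is exclusive
      .map(ms => Instant.ofEpochMilli(ms))
      .toList
      .reverse
  }
}
```

Setting `size` equal to `slide` models a tumbling window, which is the case of the 30-minute and 48-hour windows used later in the notebook.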

In [20]:
val query_3 = query_3_aux.writeStream.format("console").
outputMode("update").queryName("query_3").option("truncate", false).
start()

query_3 = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3f3ab03d


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3f3ab03d

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+------------------------------------------+-----+
|Edificio            |window                                    |count|
+--------------------+------------------------------------------+-----+
|AGUA_EDIF_1_CAFETERI|[2013-10-15 12:39:00, 2013-10-15 12:54:00]|1    |
+--------------------+------------------------------------------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+------------------------------------------+-----+
|Edificio            |window                                    |count|
+--------------------+------------------------------------------+-----+
|AGUA_EDIF_1_CAFETERI|[2013-10-15 14:27:00, 2013-10-15 14:42:00]|1    |
|AGUA_EDIF_1_CAFETERI|[2013-10-15 15:12:00, 2013-10-15 15:27:00]|1    |
|AGUA_EDIF_1_CAFETERI|[2013-10-15 17:09:00, 2013-10-15 17:24:00]|1    |
|AGUA_EDIF_1_

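#### *Maximum consumption per building in 30-minute windows, with a 15-minute watermark*

Watermarking lets Spark ignore records that arrive too late to be relevant for the results. With a 15-minute threshold, a record counts as late when its `Fecha` is more than 15 minutes older than the most recent `Fecha` seen so far. A column with the maximum consumption is also added. Note that the query is written in `complete` output mode, which has to keep the whole aggregation state, so the watermark does not discard old state here.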

In [15]:
val query_4_aux = datos_cabe_ok.withWatermark("Fecha","15 minutes")
    .groupBy($"Edificio", window($"Fecha", "30 minutes", "30 minutes"))
    .agg(max($"Consumo") as "Consumo máximo")
    .sort(asc("window"))

query_4_aux = [Edificio: string, window: struct<start: timestamp, end: timestamp> ... 1 more field]


[Edificio: string, window: struct<start: timestamp, end: timestamp> ... 1 more field]
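
The idea behind the watermark can be sketched in a few lines of plain Scala. This is a deliberately simplified model, not Spark's implementation: event times are plain minute counts, the watermark is updated once per batch from the largest event time seen so far, and anything older than the watermark is dropped. `MarcaDeAgua` and its parameter names are hypothetical.

```scala
object MarcaDeAgua {
  // lotes: event times (in minutes) grouped by micro-batch, in arrival order.
  // retrasoMin: how late a record may be, in minutes, before it is dropped.
  // Returns, for each batch, the event times that are accepted.
  def filtrar(lotes: List[List[Long]], retrasoMin: Long): List[List[Long]] = {
    var marca = Long.MinValue  // no watermark until some data has been seen
    var maximo = Long.MinValue // largest event time seen so far
    lotes.map { lote =>
      // The watermark computed from earlier batches applies to this one.
      val aceptados = lote.filter(_ >= marca)
      if (lote.nonEmpty) {
        maximo = math.max(maximo, lote.max)
        marca = maximo - retrasoMin
      }
      aceptados
    }
  }
}
```

With a 15-minute delay, a batch containing minutes 600 and 620 moves the watermark to 605, so a reading stamped 601 that arrives in the next batch is discarded while one stamped 610 is kept.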

In [16]:
val query_4 = query_4_aux.writeStream.format("console").
outputMode("complete").queryName("query_4").option("truncate", false).
start()

query_4 = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@36089358


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@36089358

-------------------------------------------
Batch: 0
-------------------------------------------
+------------+------------------------------------------+--------------+
|Edificio    |window                                    |Consumo máximo|
+------------+------------------------------------------+--------------+
|AGUA_EDIF_14|[2013-10-15 01:30:00, 2013-10-15 02:00:00]|0.0           |
+------------+------------------------------------------+--------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------+------------------------------------------+--------------+
|Edificio    |window                                    |Consumo máximo|
+------------+------------------------------------------+--------------+
|AGUA_EDIF_15|[2013-10-15 00:00:00, 2013-10-15 00:30:00]|0.26          |
|AGUA_EDIF_15|[2013-10-15 00:30:00, 2013-10-15 01:00:00]|0.04          |
|AGUA_EDIF_15|[2013-10-15 01:00:00, 2013-10-15 01:30:00]|0.0           |
|A

#### *Building with the highest consumption over the last 2 days*

In [17]:
val query_5_aux = datos_cabe_ok.groupBy(window($"Fecha", "48 hours", "48 hours"), $"Edificio")
    .agg(sum($"Consumo") as "Mayor Consumo")
    // Order by window first, then by total consumption within each window
    .sort(asc("window"), desc("Mayor Consumo"))

query_5_aux = [window: struct<start: timestamp, end: timestamp>, Edificio: string ... 1 more field]


[window: struct<start: timestamp, end: timestamp>, Edificio: string ... 1 more field]

In [18]:
val query_5 = query_5_aux.writeStream.format("console").
outputMode("complete").queryName("query_5").option("truncate", false).
start()

query_5 = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@495f96ba


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@495f96ba

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-------------+
|window                                    |Edificio    |Mayor Consumo|
+------------------------------------------+------------+-------------+
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_36|0.04         |
+------------------------------------------+------------+-------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+------------+------------------+
|window                                    |Edificio    |Mayor Consumo     |
+------------------------------------------+------------+------------------+
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_4 |2.3000000000000007|
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_36|1.900000000000001 |
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_42|0.0

-------------------------------------------
Batch: 11
-------------------------------------------
+------------------------------------------+------------+------------------+
|window                                    |Edificio    |Mayor Consumo     |
+------------------------------------------+------------+------------------+
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_4 |2.3000000000000007|
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_43|1.968             |
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_36|1.900000000000001 |
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_5 |1.49              |
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_44|0.8500000000000001|
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_6 |0.34              |
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_42|0.02              |
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_45|0.0               |
+------------------------------------------+-----------

-------------------------------------------
Batch: 19
-------------------------------------------
+------------------------------------------+------------+------------------+
|window                                    |Edificio    |Mayor Consumo     |
+------------------------------------------+------------+------------------+
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_UPO    |159.11            |
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_7 |9.59              |
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_4 |2.3000000000000007|
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_8 |2.1000000000000005|
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_43|1.968             |
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_36|1.900000000000001 |
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_5 |1.49              |
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_6 |1.3900000000000001|
|[2013-10-14 02:00:00, 2013-10-16 02:00:00]|AGUA_EDIF_4

In [19]:
query_5.stop

### *The queries currently running are listed*

In [None]:
spark.streams.active

### *The queries are stopped*

In [20]:
// Stop every streaming query that is still active in this session
spark.streams.active.foreach(_.stop())