Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE", as well as your name and collaborators below:

In [1]:
NAME = "Francisco Javier Morales Hidalgo"
COLLABORATORS = ""

---

![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)  ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)

# PEC_5_2: Structured Streaming.

En esta PEC vamos a trabajar con [Spark Structured Streaming](https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html), un motor de procesamiento de flujo escalable y tolerante a fallos construido sobre el motor Spark SQL. 

Spark Structured Streaming nos permite realizar nuestro análisis de datos en streaming de la misma manera que lo hacemos con el procesamiento por lotes sobre datos estáticos. Ahora bien, hay que tener en cuenta que el structured streaming tiene una serie de ventajas. Por ejemplo, que el motor Spark SQL se encargará de ejecutar los analísis programados de forma incremental y continua, generando el resultado final a medida que datos de transmisión. Spark Streaming se basa en la API de Dataset/DataFrame que se puede utilizar Scala, Java, Python o R para expresar agregaciones de transmisión, ventanas de tiempo de eventos, etc. Finalmente, el sistema asegura garantías de tolerancia a fallos de un extremo a otro a través de puntos de control y registros de escritura anticipada.


**IMPORTANTE: Para realizar esta práctica debes hacerlo mediante SSH desde terminal o VSCODE, y poner el código de la misma en este NOTEBOOK solo para su corrección.**

### entrega: 
El formato de entrega será un directorio comprimido en formato gnuzip bajo el nombre `PEC5_username.tar.gz`, substituyendo username por vuestro	nombre	de	usuario. El contenido debe ser un fichero para cada programa Python por cada ejercicio indicando el apartado de los notebooks de enunciado, por ejemplo `PEC5_username_2_1_4.py` .Adjuntar los dos notebooks de la PEC, con el nombre `PEC5_1_username`, y `PEC5_2_username` con las salidas obtenidas que se piden, y las respuestas a las preguntas conceptuales planteadas.

1. PARTE 1. Word Count con Structured Streaming
1. PARTE 2. Operaciones de ventana sobre eventos temporales
1. PARTE 3. Captura y procesamiento de datos en tiempo real de la API OpenSky

## PARTE 1. Word Count con Structured Streaming

En esta primera parte de la PEC vamos a ver como implementar un word count conectando por sockets mediante un proceso netcat https://en.wikipedia.org/wiki/Netcat corriendo en una terminal vía SSH o VSCode y donde vais a ir escribiendo palabras, que posteriormente van a ser contadas.

Para empezar, vamos a realizar un primer ejercicio guiado donde vamos a contar las palabras haciendo uso de los DataFrames que nos ofrece Structured Streaming. 

La siguiente celda de Jupyter Notebook crea un objeto spark que corresponde a una instancia de SparkSession. En las versiones modernas de Spark, la clase SparkSession es el punto de entrada a una aplicación Spark para cualquier tipo de Spark API (RDD, SparkSQL, Streaming, etc). Se pide ejecutar la siguiente celda y comprobar que se ha ejecutado correctamente.

In [2]:
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

# Introducid el nombre de la app PEC5_ seguido de vuestro nombre de usuario
spark = SparkSession \
    .builder \
    .appName("PEC5_fmoralesh") \
    .getOrCreate()

2.4.0-cdh6.2.0


Mediante el objeto spark vamos a configurar una lectura de datos en streaming reportados en el puerto que tenéis asociado, dado que es donde está el netcat estará funcionando. En el código de la siguiente celda debéis cambiar \<PUERTO_ASIGNADO\> por vuestro puerto.

El DataFrame `linesDF` representa una tabla ilimitada que contiene la transmisión de datos de texto. Esta tabla contiene una columna de cadenas denominada `value`, y cada línea de los datos de texto de transmisión se convierte en una fila de la tabla. Tened en cuenta que todavía no está recibiendo ningún dato ya que solo estamos configurando la transformación y aún no hemos comenzado a recibir datos. Se pide al estudiante leer el código con detalle, revisar que se entienden todas las operaciones (consultar documentación en caso necesario) y ejecutar la celda.

In [3]:
# Creamos el DataFrame representando el streaming de las lineas que nos entran por host:port
linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 20036)\
    .load()

# Separamos las lineas en palabras en un nuevo DF
#las funciones explode y split estan explicadas en
#https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html
wordsDF = linesDF.select(
    explode(
        split(linesDF.value, ' ')
    ).alias('palabra')
)

# Generamos el word count en tiempo de ejecución
wordCountsDF = wordsDF.groupBy('palabra').count()

Ahora que hemos configurado la consulta (análisis) sobre los datos de transmisión, declaramos la consulta para comenzar a recibir los datos y contar las palabras. Para hacer esto, vamos a configurar la salida del análisis para que imprima el conjunto completo de recuentos, especificado por `outputMode("complete")` y configurado para trabajar en memoria cada vez que se actualizan. Finalmente iniciamos el cálculo de streaming usando `start()`.

In [6]:
# Iniciamos la consuta que muestra por consola o almacena en memoria el word count. 
# Trabajamos a partir del DataFrame que contiene la agrupación de las palabras y el numero de repeticiones
# Utilizamos el formato memory para poder mostrarlo en Notebook, 
#si ejecutamos en consola debemos poner el formato console
query = wordCountsDF\
    .writeStream\
    .outputMode('complete')\
    .format("memory") \
    .queryName("palabras") \
    .start()

#en una ejecución desde el terminal de sistema, necesitamos evitar que el programa finalice mientras 
#se está ejecutando la consulta en un Thread separado y en segundo plano. 
#query.awaitTermination() 

En una **sesión de terminal mediante SSH, no mediante Jupyter terminal** debéis ejecutar un netcat `$ nc -lk <puerto_asignado>`.

Mediante esta celda podemos mostrar en el Notebook los datos de la consulta a la tabla `palabras` en una celda, y vamos actualizando esta celda cada 5 segundos. En este caso utilizamos una sentencia SQL. Como se trata de un bucle sobre el Notebook deberéis parar el kernel una vez vista la salida.

In [11]:
from IPython.display import display, clear_output
from time import sleep
while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql('SELECT * FROM palabras').show())
    sleep(5)

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

+-------+-----+
|palabra|count|
+-------+-----+
|   Hola|    1|
|      a|    4|
|    ASD|    3|
+-------+-----+



None

KeyboardInterrupt: 

Salida de ejemplo:

`{'isDataAvailable': False,
 'isTriggerActive': True,
 'message': 'Waiting for data to arrive'}
+-------+-----+
|palabra|count|
+-------+-----+
|   Data|    2|
|    UOC|    2|
|    Big|    2|
|  Spark|    1|
+-------+-----+`

Alternativamente podemos consultar los datos que estamos recibiendo por streaming mediante la tabla `palabras`, pero tendremos que actualizar manualmente el show(). Tarda un tiempo en aparecer la primera salida en Jupyter Notebook.

In [13]:
spark.table("palabras").show()

+--------+-----+
| palabra|count|
+--------+-----+
|    Hola|    2|
| artista|    1|
|      er|    1|
|Caracola|    1|
|   vamor|    1|
|     Que|    1|
|     tio|    1|
|  mostro|    1|
|       a|    4|
|    dise|    1|
|     ASD|    3|
|    Como|    1|
+--------+-----+



Salida de ejemplo:

`+-------+-----+
|palabra|count|
+-------+-----+
|   Data|    2|
|    UOC|    2|
|    Big|    2|
|  Spark|    1|
+-------+-----+`

A partir de este ejemplo que hemos visto y que el alumno debe ejecutar para probar su funcionamiento, se pide:

> **Pregunta 1. (1 punto)** Realiza un programa en Python que cuente las palabras que empiezan por A y que tengan más de 5 counts. Debéis ejecutarlo el programa en una **terminal**, no dentro del Jupyter, y en otra terminal el netcat. Para ello utilizamos un programa Python en local mediante `./python3 PEC5_2_1_1.py localhost <puerto_asignado>`  

> Adjunta el código y la salida obtenida en **forma textual**. 

Salida de ejemplo:

`+-------+-----+
|palabra|count|
+-------+-----+
| Albert|    6|
+-------+-----+`

In [None]:
import findspark
findspark.init()
import sys
from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from IPython.display import display, clear_output
from time import sleep

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

# Introducid el nombre de la app PEC5_ seguido de vuestro nombre de usuario
spark = SparkSession \
    .builder \
    .appName("PEC5_fmoralesh") \
    .getOrCreate()

# Creamos el DataFrame representando el streaming de las lineas que nos entran por host:port
linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', sys.argv[1])\
    .option('port', sys.argv[2])\
    .load()

# Separamos las lineas en palabras en un nuevo DF
#las funciones explode y split estan explicadas en
#https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html
wordsDF = linesDF.select(
    explode(
        split(linesDF.value, ' ')
    ).alias('palabra')
)

# Generamos el word count en tiempo de ejecución
wordCountsDF = wordsDF.groupBy('palabra').count()

# Iniciamos la consuta que muestra por consola o almacena en memoria el word count. 
# Trabajamos a partir del DataFrame que contiene la agrupación de las palabras y el numero de repeticiones
# Utilizamos el formato memory para poder mostrarlo en Notebook, 
#si ejecutamos en consola debemos poner el formato console
query = wordCountsDF\
    .writeStream\
    .outputMode('complete')\
    .format("memory") \
    .queryName("palabras") \
    .start()

#en una ejecución desde el terminal de sistema, necesitamos evitar que el programa finalice mientras 
#se está ejecutando la consulta en un Thread separado y en segundo plano. 
#query.awaitTermination() 

while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql('SELECT * FROM palabras WHERE palabra LIKE "A%" AND count > 5').show())
    sleep(5)

Copia la salida obtenida en formato de texto:

<code>
+-------+-----+
|palabra|count|
+-------+-----+
|      A|   15|
|    Arg|    7|
+-------+-----+

Ahora vamos a realizar un ejercicio que nos permita realizar una consulta SQL sobre los datos recibidos.  Además, utilizaremos el mecanismo de control de fallos que Spark utiliza, los [*checkpoint*](https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing), que van guardando información en el HDFS por si es necesario recuperarla. 

>**Pregunta 2. (1 punto)** Crea una tabla temporal para poder realizar una consulta SQL sobre las palabras que estamos obteniendo mediante streaming. El programa debe extraer las diferentes palabras de una frase y solo mostrar por consola aquellas que tengan una longitud superior a 3 caracteres. Tenéis que mostrar el tiempo de adquisición y poner un checkpoint en HDFS que se denomine `punto_control_pec5`.

Salida de ejemplo:

`
+-------+--------------------+
|palabra|              tiempo|
+-------+--------------------+
|   Data|2021-12-2  12:21:...|
+-------+--------------------+
`

In [None]:
# YOUR CODE HERE
import findspark
findspark.init()
from pyspark.streaming import StreamingContext
from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, length
from pyspark.sql.functions import split, size, col
from IPython.display import display, clear_output
from time import sleep
import sys

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)


# Introducid el nombre de la app PEC5_ seguido de vuestro nombre de usuario
spark = SparkSession \
    .builder \
    .appName("PEC5_fmoralesh") \
    .getOrCreate()

# Creamos el DataFrame representando el streaming de las lineas que nos entran por host:port
linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', sys.argv[1])\
    .option('port', sys.argv[2])\
    .option('includeTimestamp', 'true')\
    .load()

# Separamos las lineas en palabras en un nuevo DF
#las funciones explode y split estan explicadas en
#https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html
wordsDF = linesDF.select(
    explode(
        split(linesDF.value, ' ')
    ).alias('palabra'),linesDF.timestamp
)

# Generamos el word count en tiempo de ejecución
wordCountsDF = wordsDF.where(length(col("palabra")) > 2)\
    .groupBy('timestamp','palabra').count()

# Iniciamos la consuta que muestra por consola o almacena en memoria el word count. 
# Trabajamos a partir del DataFrame que contiene la agrupación de las palabras y el numero de repeticiones
# Utilizamos el formato memory para poder mostrarlo en Notebook, 
#si ejecutamos en consola debemos poner el formato console
query = wordCountsDF\
    .writeStream\
    .outputMode('complete')\
    .option("checkpointLocation", "punto_control_pec5")\
    .format("memory") \
    .queryName("palabras") \
    .start()

#en una ejecución desde el terminal de sistema, necesitamos evitar que el programa finalice mientras 
#se está ejecutando la consulta en un Thread separado y en segundo plano. 
#query.awaitTermination() 

while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql('SELECT palabra, timestamp as tiempo FROM palabras').show())
    sleep(5)

Copia la salida obtenida en formato de texto:

<code>
+---------+--------------------+
|  palabra|              tiempo|
+---------+--------------------+
|Mardision|2021-12-29 10:55:...|
|   prueba|2021-12-29 10:55:...|
|      una|2021-12-29 10:55:...|
|     Esto|2021-12-29 10:55:...|
+---------+--------------------+

>**Pregunta 3. (1 punto)** Modifica el programa para que haga uso del outputMode *append*. Debemos de guardar cada entrada en un fichero de texto en HDFS. Adjunta la salida del HDFS del contenido del directorio creado.

Salida de ejemplo:

`
hdfs dfs -ls /user/<usuario>/pec5_1_3
Found 4 items
drwxr-xr-x   - usuario usuario          0 2021-12-02 12:43 /user/<usuario>/pec5_1_3/_spark_metadata
-rw-r--r--   3 usuario usuario          9 2021-12-02 12:43 /user/<usuario>/pec5_1_3/part-00000-499014ff-cf00-4f2f-a8a4-d282cdac1a19-c000.txt
-rw-r--r--   3 usuario usuario          7 2021-12-02 12:43 /user/<usuario>/pec5_1_3/part-00000-7d8cb984-95ad-4e50-8887-a72f4d2814a2-c000.txt
-rw-r--r--   3 usuario usuario          0 2021-12-02 12:43 /user/<usuario>/pec5_1_3/part-00000-a4918ce0-c930-439f-a5cd-a1bd777f609b-c000.txt
    `

In [None]:
import findspark
findspark.init()
from pyspark.streaming import StreamingContext
from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, length
from pyspark.sql.functions import split, size, col, concat, lit
from IPython.display import display, clear_output
from time import sleep
import sys

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)


# Introducid el nombre de la app PEC5_ seguido de vuestro nombre de usuario
spark = SparkSession \
    .builder \
    .appName("PEC5_fmoralesh") \
    .getOrCreate()

# Creamos el DataFrame representando el streaming de las lineas que nos entran por host:port
linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', sys.argv[1])\
    .option('port', sys.argv[2])\
    .option('includeTimestamp', 'true')\
    .load()

# Separamos las lineas en palabras en un nuevo DF
#las funciones explode y split estan explicadas en
#https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html
wordsDF = linesDF.select(
    explode(
        split(linesDF.value, ' ')
    ).alias('palabra'),linesDF.timestamp
).where(length(col("palabra")) > 2).select(concat(col("palabra"), lit(","), col("timestamp")))

# Generamos el word count en tiempo de ejecución
wordCountsDF = wordsDF

# Iniciamos la consuta que muestra por consola o almacena en memoria el word count. 
# Trabajamos a partir del DataFrame que contiene la agrupación de las palabras y el numero de repeticiones
# Utilizamos el formato memory para poder mostrarlo en Notebook, 
#si ejecutamos en consola debemos poner el formato console
query = wordCountsDF\
    .writeStream\
    .format("text")\
    .outputMode("append")\
    .option("path", "/user/fmoralesh/pec5_1_3")\
    .option("checkpointLocation", "./punto_control_pec5")\
    .start()

#en una ejecución desde el terminal de sistema, necesitamos evitar que el programa finalice mientras 
#se está ejecutando la consulta en un Thread separado y en segundo plano. 
query.awaitTermination() 



Copia la salida obtenida en el HDFS en formato de texto:

<code>
hdfs dfs -ls /user/fmoralesh/pec5_1_3
Found 16 items
drwxr-xr-x   - fmoralesh fmoralesh          0 2021-12-28 17:06 /user/fmoralesh/pec5_1_3/ 
drwxr-xr-x   - fmoralesh fmoralesh          0 2021-12-28 17:06 /user/fmoralesh/pec5_1_3/%20
drwxr-xr-x   - fmoralesh fmoralesh          0 2021-12-28 17:15 /user/fmoralesh/pec5_1_3/_spark_metadata
-rw-r--r--   3 fmoralesh fmoralesh         36 2021-12-28 17:15 /user/fmoralesh/pec5_1_3/part-00000-046de910-8e83-4de9-b016-2b1c8723b31a-c000.txt
-rw-r--r--   3 fmoralesh fmoralesh          0 2021-12-28 17:13 /user/fmoralesh/pec5_1_3/part-00000-0474cf19-fd0b-4336-acf2-2e441660a5c1-c000.txt
-rw-r--r--   3 fmoralesh fmoralesh          0 2021-12-27 21:19 /user/fmoralesh/pec5_1_3/part-00000-5bb03b86-ab32-403c-98ff-b1c688d4d522-c000.txt
-rw-r--r--   3 fmoralesh fmoralesh          0 2021-12-27 21:09 /user/fmoralesh/pec5_1_3/part-00000-68240808-9c06-423c-b5d7-7ad29b8dde53-c000.txt
-rw-r--r--   3 fmoralesh fmoralesh         40 2021-12-28 17:15 /user/fmoralesh/pec5_1_3/part-00000-77545e69-5e90-4086-9bd8-40f85cc680aa-c000.txt
-rw-r--r--   3 fmoralesh fmoralesh         33 2021-12-28 17:15 /user/fmoralesh/pec5_1_3/part-00000-7c6ec129-b107-42d1-bbd5-76f85aa8aeda-c000.txt
-rw-r--r--   3 fmoralesh fmoralesh          0 2021-12-26 18:12 /user/fmoralesh/pec5_1_3/part-00000-7c7e5dd6-0b5f-473d-9fce-ddfaaeb27ae4-c000.txt
-rw-r--r--   3 fmoralesh fmoralesh          0 2021-12-28 17:16 /user/fmoralesh/pec5_1_3/part-00000-bba0563c-cfbe-4971-86d2-47b151488a1c-c000.txt
-rw-r--r--   3 fmoralesh fmoralesh          0 2021-12-27 21:17 /user/fmoralesh/pec5_1_3/part-00000-cb010a5b-f456-4ce3-9495-0f07addfd57e-c000.txt
-rw-r--r--   3 fmoralesh fmoralesh          0 2021-12-28 17:15 /user/fmoralesh/pec5_1_3/part-00000-eaac067f-64b1-409c-ad18-32caab934cf3-c000.txt
-rw-r--r--   3 fmoralesh fmoralesh         31 2021-12-28 17:15 /user/fmoralesh/pec5_1_3/part-00000-edc037f9-0430-40e6-abe7-bbc0ccff3541-c000.txt
-rw-r--r--   3 fmoralesh fmoralesh          0 2021-12-27 21:18 /user/fmoralesh/pec5_1_3/part-00000-f1df973d-e37a-4727-a8ff-38e13b3c0569-c000.txt
-rw-r--r--   3 fmoralesh fmoralesh         88 2021-12-27 21:17 /user/fmoralesh/pec5_1_3/part-00000-fcbbe6f6-814c-45f1-9aab-8f599a59c6c8-c000.txt

>**Pregunta 4.(1 punto)** Realiza un programa en Python para que haga uso del outputMode update, y que los datos entrantes por consola. La lectura debe realizar en intervalos de 5 segundos

Salida de ejemplo:

<code>
-------------------------------------------
Batch: 5
-------------------------------------------
+-------+
|palabra|
+-------+
|    Big|
|   Data|
| Hadoop|
+-------+
<5 ... segundos>
-------------------------------------------
Batch: 6
-------------------------------------------
+-------+
|palabra|
+-------+
|  Spark|
+-------+
    </code>

In [None]:
import findspark
findspark.init()
from pyspark.streaming import StreamingContext
from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, length
from pyspark.sql.functions import split, size, col
from IPython.display import display, clear_output
from time import sleep
import sys

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)


# Introducid el nombre de la app PEC5_ seguido de vuestro nombre de usuario
spark = SparkSession \
    .builder \
    .appName("PEC5_fmoralesh") \
    .getOrCreate()

# Creamos el DataFrame representando el streaming de las lineas que nos entran por host:port
linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', sys.argv[1])\
    .option('port', sys.argv[2])\
    .load()

# Separamos las lineas en palabras en un nuevo DF
#las funciones explode y split estan explicadas en
#https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html
wordsDF = linesDF.select(
    explode(
        split(linesDF.value, ' ')
    ).alias('palabra'))

# Generamos el word count en tiempo de ejecución
wordCountsDF = wordsDF

# Iniciamos la consuta que muestra por consola o almacena en memoria el word count. 
# Trabajamos a partir del DataFrame que contiene la agrupación de las palabras y el numero de repeticiones
# Utilizamos el formato memory para poder mostrarlo en Notebook, 
#si ejecutamos en consola debemos poner el formato console
query = wordCountsDF\
    .writeStream\
    .outputMode("update")\
    .option("path", "./user/fmoralesh/pec5_1_4")\
    .option("checkpointLocation", "./punto_control_pec5")\
    .trigger(processingTime="5 second")\
    .format("console") \
    .start()

#en una ejecución desde el terminal de sistema, necesitamos evitar que el programa finalice mientras 
#se está ejecutando la consulta en un Thread separado y en segundo plano. 
#query.awaitTermination() 

while True:
    clear_output(wait=True)
    display(query.status)
    sleep(5)


Copia la salida obtenida en formato de texto:

<code>
-------------------------------------------
Batch: 7
-------------------------------------------
+-------+
|palabra|
+-------+
|   ESto|
|     es|
|    una|
| prueba|
+-------+
-------------------------------------------
Batch: 8
-------------------------------------------
+-------+
|palabra|
+-------+
|    Eje|
|      4|
+-------+
    </code>

>**Pregunta 5.(1 punto)** Explica las diferencias y similitudes entre los tipos de salidas existentes en Structured Streaming (complete, update y append). El texto debe ser claro, explicativo y tener una extensión de 10 líneas aproximadamente.

* Complete. Outputmode con todos las filas del dataset que tengamos en el flujo y que queramos volcar en el sink cada vez que haya una actualización. Utilizaremos este modo cuando queramos ir añadiendo información agregada al sink cada vez que se active un trigger. En este modo sólo se usa información agregada con alguna función tipo count, sum, etc.
* Append. Output mode con únicamente las nuevas filas que vayamos añadiendo en el flujo y que volcaremos en el sink.
* Update. Output similar al anterior, con la diferencia que volcaremos en el Sink únicamente la nuevas filas comparando lo que teníamos en el estado anterior.

## PARTE 2. Operaciones de ventana sobre eventos temporales
En esta parte vamos a trabajar con operaciones de ventana sobre eventos temporales. Para ello vamos a utilizar el formato *rate*. El source [RateStreamSource](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#api-using-datasets-and-dataframes) es una fuente de transmisión que genera números consecutivos con marca de tiempo y es utilizada habitualmente para hacer pruebas y PoC (*Proof of Concept*). Para configurar un RateStreamSource utilizaremos  `format('rate')`, y el esquema de los datos entrantes es el adjunto siguiente. A diferencia de los ejercicios de la parte 1 no tendremos dos programas corriendo simultáneamente en el terminal, solo tendremos una, la de nuestro programa pyspark, dado que el formato rate se controla directamente desde la configuración del source Spark.


`root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true) `


>**Pregunta 1. (1 punto)** Realiza un programa mediante Structured Streaming que genere números mediante un formato *rate* como origen del streaming y donde debéis realizar el tratamiento de los mismos para acumularlos. Los números deben generarse cada segundo, y debemos utilizar una ventana de agrupación del streaming de 10 segundos y que se actualice cada 5 segundos.

In [None]:
import findspark
findspark.init()

from pyspark.streaming import StreamingContext
from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, length
from pyspark.sql.functions import split, size, col, window
from IPython.display import display, clear_output
from time import sleep

conf = SparkConf()
conf.setMaster("local[*]")
sc = SparkContext(conf=conf)
print(sc.version)
ssc = StreamingContext(sc, 1)

spark = SparkSession \
    .builder \
    .appName("PEC5_fmoralesh") \
    .getOrCreate()

linesDF = spark\
    .readStream\
    .format('rate').option("rowsPerSecond", 1).load()

# Separamos las lineas en palabras en un nuevo DF
#las funciones explode y split estan explicadas en
#https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html
wordsDF = linesDF

# Generamos el word count en tiempo de ejecución

wordCountsDF = wordsDF.withColumn(
    "window",
    window(
         "timestamp", 
         windowDuration="10 seconds"
    )
).groupBy("window", "value").count().select("window", "value", "count")


# Iniciamos la consuta que muestra por consola o almacena en memoria el word count. 
# Trabajamos a partir del DataFrame que contiene la agrupación de las palabras y el numero de repeticiones
# Utilizamos el formato memory para poder mostrarlo en Notebook, 
#si ejecutamos en consola debemos poner el formato console
query = wordCountsDF\
    .writeStream\
    .outputMode('update')\
    .format("console") \
    .queryName("palabras") \
    .trigger(processingTime="5 second")\
    .start()

query.awaitTermination()

#while True:
#    clear_output(wait=True)
#    display(query.status)
#    sleep(5)


>**Pregunta 2. (1 punto)** Comenta el código, muestra la salida que has obtenido y coméntala en una extensión en 4 y 8 líneas.

Salida de ejemplo:

<code>
-------------------------------------------                                     
Batch: 1
-------------------------------------------
+------------------------------------------+-----+-----+
|window                                    |value|count|
+------------------------------------------+-----+-----+
|[2021-12-03 10:33:40, 2021-12-03 10:33:50]|0    |1    |
|[2021-12-03 10:33:45, 2021-12-03 10:33:55]|5    |1    |
|[2021-12-03 10:33:45, 2021-12-03 10:33:55]|0    |1    |
|[2021-12-03 10:33:45, 2021-12-03 10:33:55]|3    |1    |
|[2021-12-03 10:33:45, 2021-12-03 10:33:55]|4    |1    |
</code>

<code>
-------------------------------------------                                     
Batch: 5
-------------------------------------------
+------------------------------------------+-----+-----+
|window                                    |value|count|
+------------------------------------------+-----+-----+
|[2021-12-29 12:52:10, 2021-12-29 12:52:20]|26   |1    |
|[2021-12-29 12:52:20, 2021-12-29 12:52:30]|28   |1    |
|[2021-12-29 12:52:20, 2021-12-29 12:52:30]|30   |1    |
|[2021-12-29 12:52:20, 2021-12-29 12:52:30]|29   |1    |
|[2021-12-29 12:52:20, 2021-12-29 12:52:30]|27   |1    |
+------------------------------------------+-----+-----+   
</code>


El código es muy similar al los anterior con las siguientes diferencias:
* El Spark lo creamos con el formato "rate" y lo configuramos para que genere una línea por segundo.
* Agrupamos el dataframe creando una ventana con un intervalo de 10 segundos.
* Lo sacamos en pantalla con el modo update cy configuramos el Trigger para que se lance cada 5 segundos.

El resultado es que vamos generando una serie de valores que vamos guardando junto al timestamp de creación y las veces que se ha repetido en ese Batch.


En la ejecución de las consultas es muy interesante poder ir obteniendo información sobre el progreso realizado en el último disparador del flujo: qué datos se procesaron, cuáles fueron las tasas de procesamiento, latencias, etc.

>**Pregunta 3 (1 punto).** Modifica el programa para que muestre 3 [métricas](https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#monitoring-streaming-queries) del streaming mientras este se realiza. Solo se deben mostrar métricas mientras la consulta está activa.

In [None]:
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
import sys
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import col, window
from time import sleep

conf = SparkConf()
conf.setMaster("local[*]")
sc = SparkContext(conf=conf)
print(sc.version)

ssc = StreamingContext(sc, 1)

# Introducid el nombre de la app PEC5_ seguido de vuestro nombre de usuario
spark = SparkSession \
    .builder \
    .appName("PEC5_fmoralesh") \
    .getOrCreate()

# Creamos el DataFrame representando el streaming de las lineas que nos entran por host:port
linesDF = spark\
    .readStream\
    .format('rate').option("rowsPerSecond", 1).load()

# Separamos las lineas en palabras en un nuevo DF
#las funciones explode y split estan explicadas en
#https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html
wordsDF = linesDF

# Generamos el word count en tiempo de ejecución

wordCountsDF = wordsDF.withColumn(
    "window",
    window(
         "timestamp", 
         windowDuration="10 seconds"
    )
).groupBy("window", "value").count().select("window", "value", "count")


# Iniciamos la consuta que muestra por consola o almacena en memoria el word count. 
# Trabajamos a partir del DataFrame que contiene la agrupación de las palabras y el numero de repeticiones
# Utilizamos el formato memory para poder mostrarlo en Notebook, 
#si ejecutamos en consola debemos poner el formato console


query = wordCountsDF\
    .writeStream\
    .outputMode('update')\
    .format("console") \
    .queryName("palabras") \
    .trigger(processingTime="5 second")\
    .start()

query_progress =  query.lastProgress
print("progress ", query_progress)
print("status ", query.status)
print("active ", query.isActive)

spark.conf.set("spark.sql.streaming.metricsEnabled", "true")

query.awaitTermination()

>**Pregunta 4 (1 punto).** Explica las 3 métricas aplicadas con una extensión entre 5 y 10 líneas propias.

Salida de ejemplo:

<code>
{'isDataAvailable': False, 'isTriggerActive': False, 'message': 'Initializing sources'}
{'stateOperators': [{'customMetrics': {'loadedMapCacheHitCount': 0, 'stateOnCurrentVersionSizeBytes': 25198, 'loadedMapCacheMissCount': 0}, 'numRowsUpdated': 0, 'memoryUsedBytes': 82798, 'numRowsTotal': 0}], 'timestamp': '2021-12-03T10:16:01.657Z', 'sources': [{'description': 'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default', 'endOffset': 0, 'startOffset': None, 'processedRowsPerSecond': 0.0, 'numInputRows': 0}], 'runId': '9134994e-c17f-4277-974a-21cd09cf9aea', 'durationMs': {'triggerExecution': 36450, 'walCommit': 48, 'getB[Stage 8:======>        (22 + 2) / 200]
</code>

<code>
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
{'batchId': 7, 'stateOperators': [{'numRowsUpdated': 5, 'memoryUsedBytes': 89183, 'numRowsTotal': 39, 'customMetrics': {'loadedMapCacheHitCount': 2800, 'loadedMapCacheMissCount': 0, 'stateOnCurrentVersionSizeBytes': 25615}}], 'durationMs': {'addBatch': 1938, 'queryPlanning': 42, 'walCommit': 67, 'setOffsetRange': 0, 'getEndOffset': 0, 'triggerExecution': 2107, 'getBatch': 0}, 'timestamp': '2021-12-29T14:53:45.000Z', 'id': '091a4fb1-6700-4b23-a1c4-6d1c4a90aef2', 'name': 'palabras', 'numInputRows': 5, 'processedRowsPerSecond': 2.3730422401518743, 'runId': 'e5f13762-27ae-4265-9878-90901ca816b4', 'inputRowsPerSecond': 1.0, 'sink': {'description': 'org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@46db697b'}, 'sources': [{'startOffset': 34, 'description': 'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default', 'numInputRows': 5, 'processedRowsPerSecond': 2.3730422401518743, 'inputRowsPerSecond': 1.0, 'endOffset': 39}]}
</code>

Las métricas 

## PARTE 3. Captura y procesamiento de datos en tiempo real con la API OpenSky

Es esta parte de la práctica vamos a trabajar la adquisición de datos en tiempo real de [OpenSky](https://opensky-network.org/). OpenSky Network es una asociación sin ánimo de lucro con sede en Suiza que brinda acceso abierto a los datos de control de seguimiento de vuelos.  Fue creado como un proyecto de investigación por varias universidades y entidades gubernamentales con el objetivo de mejorar la seguridad, confiabilidad y eficiencia del espacio aéreo. Su función principal es recopilar, procesar y almacenar datos de control de tráfico aéreo y proporcionar acceso abierto a estos datos al público. Esencialmente los datos de los aviones se obtienen vía satélite haciendo uso de  Automatic Dependent Surveillance–Broadcast (ADS–B). Para realizar este ejercicio no es necesario registrarse en el sistema OpenSky dado que vamos ha relizar actualizaciones de la información e vuelo sobre la superficie de España cada 10 segundos. La API está disponible este [enlace](https://openskynetwork.github.io/opensky-api/python.html). El parámetro bbox es una tupla que indica la latitud mínima, máxima, y las longitudes mínimas y máximas.

Primeramente, vamos a utilizar el servicio OpenSkyApi para leer un rectángulo con las latitudes y longitudes que engloban la península ibérica.

Para ello debéis [instalar](https://github.com/openskynetwork/opensky-api) la biblioteca en vuestro directorio del servidor Cloudera

1. Descargar en formato .zip el repositorio
1. Subir a vuestro directorio personal del servidor de Cloudera el zip. 
1. Descomprimirlo.
1. Dentro del directorio que ha creador ejecutar `pip install -e ./python`

Una vez instalada el módulo anterior, la siguiente celda os mostrará los vuelos registrados sobre la península ibérica en estos momentos. Observad con detenimiento las propiedades del diccionario de cada vuelo.

In [None]:
import json
from random import sample

from opensky_api import OpenSkyApi
api = OpenSkyApi()
states = api.get_states(bbox=(36.173357, 44.024422,-10.137019, 1.736138))
#recuperamos codigo, pais_origen, long, lat, altitud, velocidad, ratio_vertical
#atención en este ejemplo solo estamos mostrando 5 vuelos aleatorios, 
#en vuestros ejercicios deberéis eliminar la función sample
for s in sample(states.states,5):
    vuelo_dict = {
                'callsign':s.callsign,
                'country': s.origin_country,
                'longitude': s.longitude,
                'latitude': s.latitude,
                'velocity': s.velocity,
                'vertical_rate': s.vertical_rate,
            }
    vuelo_encode_data = json.dumps(vuelo_dict, indent=2).encode('utf-8')
    print("(%r, %r,%r, %r, %r, %r)" % (s.callsign, s.origin_country, s.longitude, s.latitude,s.velocity,s.vertical_rate))

Salida de ejemplo:

`('BAW457  ', 'United Kingdom',-3.5196, 40.4292, 86.45, 10.73)
('BLX245  ', 'Sweden',-6.0307, 43.8266, 252.51, 0)
('CFG1HE  ', 'Germany',-8.4689, 40.2967, 236.56, 0)
('TOM3MK  ', 'United Kingdom',-7.2687, 41.5878, 247.02, 0)
('AEA57MC ', 'Spain',-0.5364, 38.2791, 64.7, -3.9)`

Ahora vamos a crear un programa en Python para poder enviar cada 10 segundos por el puerto que tenéis asignado información de los vuelos que hay sobre la península ibérica. Deberéis poner en marcha primero el programa Python con el servidor de sockets que lee de Opensky y después el programa de Spark con structured streaming, es decir, en esta parte volvemos a tener dos terminales abiertas a la vez, y lo podéis realizar con el VSCode o con el SSH.

>**Pregunta 1. (1 punto)** Modifica el programa Python para enviar datos de los vuelos en formato JSON. Os podéis auxiliar de la función [json.dumps](https://docs.python.org/3/library/json.html) que nos permite crear un JSON binario de cada diccionario con las propiedades del vuelo. Prestad atención al salto de línea, '\n', que se adjunta al final de cada envío, es fundamental para cerrar la transmisión de datos a Spark.

In [None]:
from time import sleep
import socket
import json
from opensky_api import OpenSkyApi

HOST = 'localhost'  # hostname o IP address
PORT = 20036        # puerto socket server

api = OpenSkyApi()
states = api.get_states(bbox=(36.173357, 44.024422,-10.137019, 1.736138))

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((HOST, PORT))
s.listen(1)
while True:
    print('\Esperando un cliente a',HOST , PORT)
    conn, addr = s.accept()
    print('\Connectat per', addr)
    try:
        while(True):
            v = {}
            for vuelo in states.states:
                vuelo_dict = {
                'callsign':vuelo.callsign,
                'country': vuelo.origin_country,
                'longitude': vuelo.longitude,
                'latitude': vuelo.latitude,
                'velocity': vuelo.velocity,
                'vertical_rate': vuelo.vertical_rate,
                }
                v = json.dumps(vuelo_dict, indent=2).encode('utf-8')
                print(v)
                conn.send(v)
                conn.send(b'\n')
            sleep(10)       
    except socket.error:
        print ('Error .\n\nClient desconnectat.\n')
conn.close()

>**Pregunta 2. (1 punto)** Se pide leer los datos recibidos mediante structured streaming y mostrar el esquema de los datos recibidos. En este primer ejercicio solo vamos a tener una cadena con el JSON recibido de cada vuelo y un esquema con un único elemento. Debéis utilizar la función "printSchema()".

>Una vez comprobada que la transmisión funciona, se pide realizar un pre-procesado antes del envío de los datos mediante el socket para eliminar aquellas líneas de datos que no sean útiles ni convenientes.

Salida de ejemplo:

<code>
root
 |-- value: string (nullable = true)

|value             |

|{"velocity": 210.12, "vertical_rate": 0, "latitude": 43.6082, "callsign": "TAP441  ", "longitude": -1.3992, "country": "Portugal"}         |
|{"velocity": 246.5, "vertical_rate": 0, "latitude": 40.5836, "callsign": "TAP844  ", "longitude": -3.8452, "country": "Portugal"}          |
|{"velocity": 0, "vertical_rate": null, "latitude": 40.487, "callsign": "IBE2800 ", "longitude": -3.5889, "country": "Spain"}      
</code>

In [None]:
import findspark
findspark.init()
import sys
from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode,col
from pyspark.sql.functions import split
from IPython.display import display, clear_output
from time import sleep

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

spark = SparkSession \
    .builder \
    .appName("PEC5_fmoralesh") \
    .getOrCreate()

linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 20036)\
    .load()

vuelosDF = linesDF.select(
    explode(
        split(linesDF.value, '\n')
    ).alias('vuelos')
)

vuelosDF = vuelosDF.filter(col("vuelos").rlike("^(?!.*null.*).*$"))

query = vuelosDF\
    .writeStream\
    .outputMode('update')\
    .format("console") \
    .queryName("vuelos") \
    .option('truncate', 'false')\
    .start()


while query.isActive:
    print("\n")
    vuelosDF.printSchema()
    sleep(5)

query.awaitTermination()

Copia la salida obtenida en formato de texto:

<code>
+---------------------------------------------------------------------------------------------------------------------------------------+
|vuelos                                                                                                                                 |
+---------------------------------------------------------------------------------------------------------------------------------------+
|{"latitude": 40.6098, "callsign": "TAP649M ", "velocity": 220.6, "longitude": -7.2705, "vertical_rate": -7.15, "country": "Portugal"}  |
|{"latitude": 38.5234, "callsign": "ANE80ZK ", "velocity": 138.52, "longitude": -2.0891, "vertical_rate": -0.33, "country": "Spain"}    |
|{"latitude": 43.0884, "callsign": "IBS36TB ", "velocity": 236.73, "longitude": -2.1074, "vertical_rate": 0, "country": "Spain"}        |
|{"latitude": 39.43, "callsign": "IBS3920 ", "velocity": 219.11, "longitude": 0.3919, "vertical_rate": 0, "country": "Spain"}           |

>**Pregunta 3. (1 punto)** Se pide mostrar la información en forma de tabla, con las columnas, country|callsign|longitude|latitude|velocity|vertical_rate. Para ello vais a tener que crear un esquema mediante [StructType](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html) y aplicarlo a la función SQL [from_json](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.from_json.html). De esta manera podremos pasar de una columna string con todo el JSON, a 6 columnas con el tipo ajustado al valor contenido.

Salida de ejemplo:

<code>
root
 |-- callsign: string (nullable = true)
 |-- country: string (nullable = true)
 |-- longitude: long (nullable = true)
 |-- latitude: long (nullable = true)
 |-- velocity: long (nullable = true)
 |-- vertical_rate: long (nullable = true)


+--------------+--------+---------+--------+--------+-------------+
|       country|callsign|longitude|latitude|velocity|vertical_rate|
+--------------+--------+---------+--------+--------+-------------+
|      Portugal|TAP441  |  -1.3992| 43.6082|  210.12|          0.0|
|      Portugal|TAP844  |  -3.8452| 40.5836|   246.5|          0.0|
|         Spain|IBE2800 |  -3.5889|  40.487|     0.0|         null|
|United Kingdom|ABW713  |  -0.5287|  41.899|  155.82|        -7.48|
|         Spain|FYS161  |   -0.188| 38.8394|   64.77|        -2.93|
|         Spain|IBE3242 |  -3.5896| 40.4919|    0.77|         null|
|         Spain|AEA4025 |   0.1208| 38.5346|  125.53|         3.25|
|         Spain|P21     |  -3.5728| 40.4751|   10.29|         null|
|         Spain|IBE30EA |  -2.6086|  40.914|  181.29|        -6.18|
</code>

In [None]:
import findspark
findspark.init()

import sys
from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, StringType, StructField, StructType
from pyspark.sql.functions import explode, from_json, col
from pyspark.sql.functions import split
from IPython.display import display, clear_output
from time import sleep

conf = SparkConf()
conf.setMaster("local[1]")
sc = SparkContext(conf=conf)
print(sc.version)

spark = SparkSession \
    .builder \
    .appName("PEC5_fmoralesh") \
    .getOrCreate()

linesDF = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 20036)\
    .load()

schema = StructType([
            StructField('country', StringType(), True),
            StructField('callsign', StringType(), True),
            StructField('longitude', FloatType(), True),
            StructField('latitude', FloatType(), True),
            StructField('velocity', FloatType(), True),
            StructField('vertical_rate', FloatType(), True)
])


vuelosDF = linesDF.select(
    explode(
        split(linesDF.value, '\n')
    ).alias('vuelos')
)

vuelosDF = vuelosDF.withColumn("json",from_json(col('vuelos'),schema)) \
                    .select('json.*')\
                    .filter(col("vuelos").rlike("^(?!.*null.*).*$"))

query = vuelosDF\
    .writeStream\
    .outputMode('update')\
    .format("console") \
    .queryName("vuelos") \
    .option('truncate', 'false')\
    .start()


while query.isActive:
    print("\n")
    vuelosDF.printSchema()
    sleep(5)

query.awaitTermination()

Copia la salida obtenida en formato de texto:

<code>
root
 |-- country: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- velocity: float (nullable = true)
 |-- vertical_rate: float (nullable = true)


+-----------+--------+---------+--------+--------+-------------+
|country    |callsign|longitude|latitude|velocity|vertical_rate|
+-----------+--------+---------+--------+--------+-------------+
|Portugal   |TAP1357 |-8.2756  |40.4173 |211.39  |-4.88        |
|Spain      |VOE3591 |1.2644   |40.6174 |215.66  |0.0          |
|Austria    |EJU1602 |-0.8185  |39.0415 |225.74  |0.0          |
|Spain      |IBS3905 |-7.5109  |36.8851 |219.92  |0.0          |
|Hungary    |WZZ8284 |1.5146   |39.3735 |216.73  |-0.33        |
|Spain      |VLG8829 |-1.4563  |43.0828 |236.3   |0.0          |
|Spain      |IBE32BP |-3.0783  |40.8207 |196.76  |14.63        |
|Austria    |EJU71RZ |-5.0604  |36.2494 |214.78  |0.0          |
|Spain      |VLG8159 |0.3431   |41.8262 |233.26  |0.0          |
|Sweden     |NSZ2CV  |-8.4239  |38.0333 |224.51  |0.0          |
|Belgium    |OOMBP   |-3.5812  |38.5875 |147.37  |-7.48        |
|Austria    |EJU71PD |-3.6842  |39.7705 |220.64  |0.0          |
|France     |AFR28KX |-1.574   |42.9226 |225.93  |0.0          |
|Spain      |IBE32GT |-2.0457  |42.2193 |239.73  |-4.88        |
|Switzerland|EZS902K |0.4305   |43.131  |218.97  |0.0          |
|Spain      |ANE8865 |-0.7192  |38.592  |196.81  |11.05        |
|Spain      |AEA6096 |0.2407   |39.5875 |214.85  |0.0          |
|Spain      |ANE15KE |-2.6365  |40.5944 |157.56  |-8.13        |
|Sweden     |SAS6004 |-3.4237  |39.672  |222.55  |-0.33        |
|Portugal   |TAP836A |-6.7827  |39.6987 |218.22  |0.0          |
+-----------+--------+---------+--------+--------+-------------+
</code>

>**Pregunta 4. (1 punto)** Muestra el total de vuelos para cada destino agrupados por país de destino que hay en cada momento. Los datos deben mostrarse ordenados por país alfabéticamente. Tened en cuenta que podemos recibir datos duplicados dado que el script de OpenSky lee cada 10 segundos todos los vuelos existentes y los envía por socket. Por defecto Spark crea 200 tareas (cada una implicará una partición de los datos) por stage en el procesamiento en Structured Streaming. Para acelerar el proceso de captura, se pide que [ajustéis](https://spark.apache.org/docs/latest/sql-performance-tuning.html#other-configuration-options) el parámetro en la configuración de SparkSession a un valor de 4 particiones.

Salida de ejemplo:

<code>
-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------------+-----+
|country                   |count|
+--------------------------+-----+
|Algeria                   |1    |
|Austria                   |3    |
|Belgium                   |1    |
|Chile                     |2    |
|Denmark                   |1    |
|France                    |15   |
|Germany                   |15   |
|Hungary                   |1    |
|Ireland                   |26   |
|Kingdom of the Netherlands|2    |
|Lithuania                 |1    |
|Luxembourg                |1    |
|Malta                     |5    |
|Mexico                    |1    |
</code>

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

Copia la salida obtenida en formato de texto:

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

>**Pregunta 5. (1 punto)** Agrupa todos los vuelos que están subiendo en altura, los que están bajando y los que están en tierra. Indica su numero. Deberás auxiliarte de una consulta SQL para poder indicar con -1 que un vuelo está descendiendo, +1 si está subiendo, y 0 si está en tierra.

Salida de ejemplo:

<code>
+------+-----+
|estado|count|
+------+-----+
|     0|   96|
|    -1|   54|
|     1|   41|
+------+-----+
</code>

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

Copia la salida obtenida en formato de texto:

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

>**Pregunta 6. (1 punto)** ¿De que manera podemos identificar que aparece un nuevo vuelo en el espacio aéreo?. No hace falta escribir el código sino describir con palabras como se plantearía la solución.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

>**Pregunta 7. (1 punto)** Explica brevemente en una extensión de entre 5 y 10 lineas las ventajas e inconvenientes de utilizar Structured Streaming versus Spark Streaming.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()