# PEC2 – Infr. Big Data – Ejercicio 2 - Spark Streaming

## Tratamiento de datos en streaming
Aquí podéis incluir las sentencias de importación de paquetes e inicialización que necesitéis

In [2]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pathToUri import toUri

In [3]:
def update_func(new_val, last_sum):
    return sum(new_val) + (last_sum or 0)

In [25]:
def transforma_a_segundos(dias=None, horas=None):
    if dias != None:
        return dias * 24 * 60 * 60
    if horas != None:
        return  horas * 60 * 60
    else: 
        pass

### Saber en tiempo real cuantas veces pasa un vehículo por cada punto del sistema desde el día que se pone en marcha el mecanismo.  
Incluir las celdas con las respuestas a continuación

In [78]:
sc = SparkContext('local[*]', appName="ejercicio2")
ssc = StreamingContext(sc, 5)

In [81]:
# To maintain status info we need to use checkpoints.
checkpointDir = toUri('/tmp/spark') 
ssc.checkpoint(checkpointDir)  # create dir for checkpoint archives

lines = ssc.socketTextStream("localhost", 9876)
counts = lines.map(lambda line: (line, 1))\
              .updateStateByKey(update_func)

counts.pprint()

In [82]:
ssc.start()

-------------------------------------------
Time: 2021-05-06 10:43:20
-------------------------------------------

-------------------------------------------
Time: 2021-05-06 10:43:25
-------------------------------------------
('1234UUU A7-KM-15', 1)

-------------------------------------------
Time: 2021-05-06 10:43:30
-------------------------------------------
('1234AAA A7-KM-50', 1)
('1234EEE A7-KM-15', 2)
('1234UUU A7-KM-15', 1)
('1234AAA A7-KM-01', 1)

-------------------------------------------
Time: 2021-05-06 10:43:35
-------------------------------------------
('1234AAA A7-KM-50', 1)
('1234EEE A7-KM-15', 2)
('1234UUU A7-KM-50', 2)
('1234UUU A7-KM-15', 1)
('1234AAA A7-KM-01', 2)
('1234AAA A7-KM-30', 1)

-------------------------------------------
Time: 2021-05-06 10:43:40
-------------------------------------------
('1234AAA A7-KM-50', 1)
('1234EEE A7-KM-15', 2)
('1234UUU A7-KM-50', 3)
('1234UUU A7-KM-01', 1)
('1234UUU A7-KM-15', 2)
('1234AAA A7-KM-01', 2)
('1234AAA A7-KM-30

In [83]:
ssc.stop()
sc.stop()

Esta app captura matricula y punto en un mismo string; así puedes saber cuando cada matricula pasa por cada punto del sistema y cuantas veces pasa por el mismo punto. 

### Saber en tiempo real cuántas veces pasa un vehículo por un determinado punto, pero solo teniendo en cuenta los últimos 7 días de información.
Incluir las celdas con las respuestas a continuación

Las operaciones de ventana te permiten operar sobre los datos recibidos en los ultimos n segundos

In [138]:
sc = SparkContext('local[*]', appName="ejercicio2")
ssc = StreamingContext(sc, 10)

In [139]:
segs = transforma_a_segundos(dias=7)

In [140]:
ssc.checkpoint(checkpointDir)
lines = ssc.socketTextStream("localhost", 9876)
counts = lines.map(lambda line:(line, 1))\
              .window(segs, 10)\
              .updateStateByKey(update_func)
counts.pprint()

In [141]:
ssc.start()

-------------------------------------------
Time: 2021-05-06 11:06:20
-------------------------------------------
('1234AAA A7-KM-50', 1)
('1234AAA A7-KM-30', 2)
('1234UUU A7-KM-30', 1)
('1234EEE A7-KM-50', 1)

-------------------------------------------
Time: 2021-05-06 11:06:30
-------------------------------------------
('1234AAA A7-KM-50', 2)
('1234AAA A7-KM-15', 2)
('1234AAA A7-KM-30', 4)
('1234UUU A7-KM-30', 5)
('1234EEE A7-KM-50', 2)
('1234EEE A7-KM-01', 1)
('1234UUU A7-KM-15', 1)



In [142]:
ssc.stop()
sc.stop()

Crea un nuevo DStream que se computa basándose en la ventana que le hemos dado (7 días). 

### Elige una de las matrículas que se genera y realiza una vigilancia de este vehículo, queremos saber cuantas veces a sido detectado en cada punto de control durante la última hora.  
Incluir las celdas con las respuestas a continuación

In [143]:
sc = SparkContext('local[*]',appName="ejercicio2")
ssc = StreamingContext(sc, 5)

In [144]:
segs = transforma_a_segundos(horas=1)

In [145]:
checkpointDir = toUri('/tmp/spark') 
ssc.checkpoint(checkpointDir)
lines = ssc.socketTextStream("localhost", 9876)
counts = lines.map(lambda line:(line, 1))\
              .filter(lambda line: line[0].startswith('1234AAA'))\
              .window(segs, 10)\
              .updateStateByKey(update_func)
counts.pprint()

In [146]:
ssc.start()

-------------------------------------------
Time: 2021-05-06 11:07:00
-------------------------------------------

-------------------------------------------
Time: 2021-05-06 11:07:10
-------------------------------------------
('1234AAA A7-KM-50', 1)
('1234AAA A7-KM-30', 1)

-------------------------------------------
Time: 2021-05-06 11:07:20
-------------------------------------------
('1234AAA A7-KM-50', 4)
('1234AAA A7-KM-15', 1)
('1234AAA A7-KM-30', 4)
('1234AAA A7-KM-01', 1)

-------------------------------------------
Time: 2021-05-06 11:07:30
-------------------------------------------
('1234AAA A7-KM-50', 7)
('1234AAA A7-KM-15', 3)
('1234AAA A7-KM-30', 7)
('1234AAA A7-KM-01', 2)



In [147]:
ssc.stop()
sc.stop()

Esta app filtra solo las matriculas '1234AAA' accediendo al primer termino de la tupla ('matricula + km', numero). Con la ventana cogemos solo los datos de la última hora. 