## Introducción
- Spark Streaming implementa la aproximación micro-batch para el procesamiento de datos masivos en tiempo real.
- Extensión del núcleo de la API Spark
    - Se utiliza la sintaxis de Spark
- Flujo de datos clásico
    - Input: Kafka, Flume, Twitter...
    - Output: HDFS, BBDD, dashboards...
- El stream de entrada se divide en micro-batches y se procesa

## Esquema básico
- Flujo de entrada -> Micro-batches(tamaño "delta", medido en segundos)
- Micro-batches -> Spark RDDs (denominado DStream)
- Cada RDD se procesa con Spark (grafo de computación)
    - Mismas transformaciones que en Spark (map, reduce, filter, join...)
    - Funcionalidad adicional de Dstream
- El resultado son micro-batches procesados
- Spark Streming RDD = Spark RDD
![03_spark_streaming_1]

## Características principales
- Capa de abstracción de alto nivel+
    - Detalles de la implementación de streaming más transparentes al usuario
- Mismo código que Spark clásico (reutilización de cualquier librería del ecosistema Spark)
    - SparkSQL, SparkML, etc
- Aproximación Micro-batch
    - Alto Rendimiento
    - Aumento de latencia (mayor retraso en el procesamiento ~ segundos)
- Uso en arquitecturas Lambda:
    - Misma arquitectura (Spark para offline, Spark Streaming para tiempo real)
    - Se reduce la utilización de distintas tecnologías
- No la mejor solución para arquitecturas Kappa
    - No proporciona métodos sencillos para re-calcular datos históricos
    
## Aplicaciones Spark Streaming
- Creación de Spark Streaming Context -> A partir de Spark Context -> Se especifica la duración del micro-batch

[03_spark_streaming_1]:images/03_spark_streaming_1.png

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext()
ssc = StreamingContext(sc, batchDuration = 10)

- Entrada de datos -> Construcción de Dstream
    - Entrada básica:
        - fileStream(): carpeta con archivos (nuevos datos = nuevos ficheros añadidos)
        - socketTextStream(): el input proviene del socket de una red (nuevos datos = datos recibidos en el socket durante el último intervalo de tiempo con duración "batchDuration")
        - queueStream(): secuencia de RDDs de Spark, cada RDD se trata como un batch simple
    - Entrada avazanzada:
        - Kafka, Flume, Kinesis, ...
        - Normalmente mediante el uso de librerías adicionales
    - Entrada personalizada:
        - Conectores ad-hoc

- Ejemplo: Input con sockets

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext()
ssc = StreamingContext(sc, batchDuration = 10)

dstream = ssc.socketTextStream("localhost", 9999)

En la consola de comandos:

- Si no existe el comando "nc" -> sudo apt-get install netcat

> nc -lk 9999 (mantiene abierto un socket en el puerto 9999 de localhost)

Se introduce el texto deseado (también se pueden utilizar pipes)

- Ejemplo: Input con Kafka

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext()
ssc = StreamingContext(sc, batchDuration = 10)

dstream = KafkaUtils.createDirectStream(
    scc,
    topics = ['my_topic']
    kafkaParams = {'metadata.broker.list':
                      'kafka.br01:9092',
                      'kafka.br02:9092',
                      'kafka.br03:9092'}

)

## Transformaciones
|Método|Funcionamiento|
|---|---|
|map(func)|Se aplica la función func a cada elemento de DStream. Devuelve un nuevo DStream|
|flatMap(func)| Similar a map, pero por cada elemento de entrada se devuelven de 0 a n elementos de salida|
|filter(func)|Nuevo DStream con los elementos del DStream original para los que func devuelva True|
|repartition(nPartitions)|Modifica el nivel de paralelismo de un DStream|
|union(otherStream)| Nuevo DStream con la unión del actual y de otherStream|
|count()| Nuevo DStream con RDDs simples con el número de elementos de cada RDD en el DStream original|
|reduce(func)| Agregación de elementos de cada RDD del DStream original para generar un nuevo DStream on RDDs simples. La función func ha de ser conmutativa y asociativa, recibir dos argumentos y evolver uno.|
|countByValue()| Sobre un DStream de elementos de tipo K, devuelve la frecuencia de cada elemento del DStream en formato (K, Long)|
|reduceByKey(func,[numTasks])|Sobre un DStream de elementos clave-valor (K,V), devuelve la agregación de todos los valores con la misma clave, según la función func. Se puede especificar el número de tareas paralelas de forma opcional (por defecto 2).|
|join(otherStream,[numTasks])|Sobre dos DStreams (original y otherStream) de clave-valor (K,V) y (K,W), devuelve un DStream (K, (V,W)), sólo para aquéllas claves que aparezcan en ambos DStreams. Número de tareas paralelas opcional.|
|cogroup(otherStream,[numTasks])|Sobre dos DStreams (original y otherStream) de clave-valor (K,V) y (K,W), devuelve un nuevo DStream con tuplas (K, Seq[V], Seq[W]), para todas las claves que aparezcan al menos en uno de los DStreams. Número de tareas paralelas opcional.|

- Salida de datos
    - dstream.pprint(): salida estándar (stout)
    - dstream.saveAsTextFiles(): salida a almacenamiento externo (HDFS, AmazonS3, ...)
    - dstream.foreachRDD(): salida manual personalizada (por ejemplo, a un almacenamiento clave-valor externo)
        - Opción más utilizada
        - La semántica ha de ser proporcionada por el desarrollador
- Funciones para la ejecución del programa
    - scc.start(): comienza los cálculos sin bloquear la ejecución del programa
    - ssc.awaitTermination(): No permite la finalización prematura del porograma ( por ejemplo, si lo ejecutamos desde un sript Python)
    
- Ejecución desde la consola de comandos:
    - Iniciar el flujo de datos (desde socker, kafka, etc.)
    > spark-submit streaming_application.py
    - Comenzar a enviar datos
    - Finalización con Control-C
    
- Ejecución desde iPython o desde notebooks
    - Iniciar el flujo de datos (desde socket, kafka, etc)
    - Ejecutar el código (sin la función awaitTermination())
    - Comenzar a enviar datos
    - Ejecutar la función ssc.stop()
    
- Funcionalidades de Spark Streaming:
    - dstream.window(...)
        - Aproximación basada en Windowing
        - Transformaciones sobre una ventana deslizante de los datos
        - Características
            - Tamaño (size)
            - Desplazamiento (shift)
            - El desplazamiento ha de ser múltiplo de la duración del batch, y el tamaño múltiplo del desplazamiento
            - Ejemplo: batch=1 seg, shift = 2 seg, size = 5 seg

Paso 1
![03_spark_streaming_2]

Paso 2
![03_spark_streaming_3]

Paso 3
![03_spark_streaming_4]

Paso 4
![03_spark_streaming_5]

- Transformaciones en ventanas:

|Método| Funcionamiento|
|---|---|
|window(windowLength,slideInterval)|Nuevo DStream calculado según batches en ventanas del DStream original,tamaño de ventana windowLength y salto entre ventanas slideInterval|
|countByWindow(windowLength, slideInterval)|Devuelve el conteo de elementos de la ventana deslizante definida con windowLength y slideInterval, sobre el DStream original|
|reduceByWindow(func,windowLength,slideInterval)|Similar a reduce, permite la agregación de elementos de cada RDD del DStream original para generar un nuevo DStream con RDDs simples, sobre los batches de la ventana deslizante definida con windowLength y slideInterval. La función func ha de ser conmutativa y asociativa, recibir dos argumentos y devolver uno.|
|educeByKeyAndWindow(func, windowLength,slideInterval, [numTasks])|Similar a reduceByKey, sobre un stream original con pares clave-valor (K,V),devuelve el agregado según la función func para cada clave, sobre los batches de la ventana deslizante definida con windowLength y slideInterval. Número de tareas paralelas opcional.|
|countByValueAndWindow(windowLength,slideInterval, [numTasks])|Sobre un DStream de pares clave-valor (K,V), devuelve la frecuencia de cada clave en formato (K, Long), sobre los batches de la ventana deslizante definida con windowLength y slideInterval. Número de tareas paralelas opcional.|

[03_spark_streaming_2]:images/03_spark_streaming_2.png
[03_spark_streaming_3]:images/03_spark_streaming_3.png
[03_spark_streaming_4]:images/03_spark_streaming_4.png
[03_spark_streaming_5]:images/03_spark_streaming_5.png

- Transformaciones stateful:
    - dstream.updateStateByKey(upadate_func, ...)
        - Almacenamiento de pares clave-valor y actualización con nuevos datos -> Cálculos stateful
        - Se ha de definir un directorio para el checkpoint
        - Se ha de especificar la función para actualizar los datos
            - La función recibe los nuevos valores (de cada clave), así como el estado anterior (si existe)
            - Devuelve un nuevo valor, del mismo tipo que el estado guardado

- Transformaciones stateful (ejemplo): ocurrencias de palabras guardando el total de apariciones de cada una:

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext()
ssc = StreamingContext(sc, 5)
ssc.checkpoint("checkpoint")

lines = ssc.socketTextStream("localhost", 9999)

# Recibe la lista de nuevos valores para cada key, y último estado guardado
def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

running_counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).updateStateByKey(updateFunc)
running_counts.pprint()

context.start()
context.awaitTermination()

- Checkpointing
    - Recuperación del estado del sistema después de un fallo -> procesamiento de datos desde el momento del fallo
    - Se almacena información sobre el estado del sistema (metadatos o RDDs) en un almacenamiento tolerante a fallos (por ejemplo, HDFS)
    - Backup para regresar al último punto estable
    - Permite restaurar valores de operaciones internas stateful como updateStateByKey
    - Creación periódica -> Definición manual del intervalo
        - Cuanto más pequeño, se reduce el rendimiento
        - Recomendado 5-10 batchDurations
    - El uso de checkpoints no soporta actualizaciones de código
        - Para elo habría que correr dos instancias de SparkStreaming (una antigua y una nueva) -> recursos limitados, pérdida de estados internos, etc.
        - O almacenar manualmente los datos antiguos antes de inicializar la nueva versión -> Se pierden las ventajas del checkpointing
    - Ejemplo:

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

checkpoint_directory = '/hdfs://path/to/cp' # Definimos la ruta de almacenamiento

def createContext(): # La lógica de la aplicación se incluye en una función
    sc = SparkContext(...)
    ssc = StreamingContext(...)
    dstream = ... # Lógica de la aplicación
    
    ...
    
    ssc.checkpoint(checkpoint_directory) # Se almacenan los datos
    return ssc

# La función getOrCreate genera el contexto si se trata de la primera ejecución, 
# o recupera los datos de checkpoint en caso contratrio
context = StreamingContext.getOrCreate(checkpoint_directory, get_context)
context.start()
context.awaitTermination()

## Semántica de entrega y tolerancia a fallos en Spark Streaming
- ¿Qué semántica de entrega necesito asegurar?
- ¿Cómo puedo proporcionarla?
- Depende de todos los elementos del pipeline de streaming y de las interacciones entre ellos
- Siempre se solicitará semántica exactly-once
    - Muy difícil de proporcionar (y no siempre se necesita)
    - Se necesita conseguir en la captura de datos, en su procesamiento y en su almacenamiento
    - Spark Streaming -> exactly-once únicamente en sus cálculos internos durante la fase de procesamiento
- Recepción de datos
    - Semántica de entrega dependientes de las fuentes (desde at-least-once a exactly-once)
- Transformación de datos
    - La semántica de entrega interna en DStreams y RDDs es siempre exactly-once
- Salida de datos
    - Normalmente (por ejemplo, en la función foreachRDD) se asegura la salida at-least-once. La semántica exactly-once ha de ser implementada por los usuarios
- Estrategias adicionales: Write-ahead Log (WAL)
    - Permite realizar checkpointing sin pérdida de datos, aunque la fuente de datos de entrada no permita su re-envío (asegura at-leat-once para cualquier fuente de datos)
    - Para ello, se almacenan todos los datos recibidos en un sistema de ficheros tolerante a fallos
        - Sólo se considera leído el evento cuando se ha escrito en WAL
        - La aplicación lee los datos de WAL en lugar de la fuente
    - En SparkStreaming
        - Uso de chekpoint directoy -> WAL se almacena también
        - En los parámetros de configuración que se pueden definir al crear SparkContext hay que especificar "spark.streaming.receiver.WriteAheadLog.enable = True"
        - Se evita la pérdida de datos
        - Se aumenta el tiempo de procesamiento de cada batcha
        > conf.set("spark.streaming.receiver.WriteAheadLog.enable", "True")
        >
        > sc = SparkContext(conf)
        >
        > ssc = StreamingContext(sc, 1)