# Spark Streaming



Spark Streaming es una extensión de la API core de Spark que habilita un procesamiento de flujos para canales de datos en vivo, con un soporte escalable, de alto rendimiento y tolerante a fallas. La ingesta de datos puede provenir de distintas fuentes como Kafka, Flume, Kinesis o sockets TCP. Y pueden ser procesados con algoritmos expresados mediante funciones de alto nivel como *map*, *reduce*, *join* y *window*. 

Finalmente, los datos procesados se pueden guardar en un sistema de archivos, bases de datos y dashboards en vivo. También se dispone la capacidad de usar los algoritmos de Spark para Machine Learning y procesamiento de Grafos sobre los flujos de datos.


![Spark Streaming](https://spark.apache.org/docs/latest/img/streaming-arch.png)

Internamente, Spark Streaming recibe datos de canales en vivo y los divide en series (batches), que son procesados por el motor de Spark para generar el flujo final en forma de series finales.

![Spark Streaming internals](https://spark.apache.org/docs/latest/img/streaming-flow.png)


Spark Streaming provee una abstracción de alto nivel llamada *discretized stream* o *DStream*, que representa una corritente continua de datos. Los DStream, se pueden crear a partir de flujos de datos como Kafka, Flume y Kinesis, o después de aplicarse operaciones de alto nivel sobre otros DStream. De forma interna, un DStream se representa como una secuencia de RDDs.

## Ejemplo

Importamos StreamingContext, que es el acceso para todas las funcionalidades de Spark Streaming. Creamos una instancia del objeto con dos hilos de ejecución, y a 10 segundos como intervalo para la creación del batch.

In [1]:
from pyspark import SparkContext
# https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext
from pyspark.streaming import StreamingContext

In [2]:
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 10)

Usando el contecto anterior, creamos un DStream que representa el flujo de datos de una fuente TCP (socket), se especifica un hostname y un puerto.

In [3]:
lines = ssc.socketTextStream("localhost", 9999)

El DStream *lines* representa un flujo de datos que serán recibidos desde otro servidor. Cada registro en este DStream es una línea de texto. Después de recibir el registro, separaremos las palabras usando los espacios entre ellas.

![](https://spark.apache.org/docs/latest/img/streaming-dstream.png)

In [4]:
words = lines.flatMap(lambda line: line.split(" "))

*flatMap* es una operación (transformación one-to-many) para DStream que crea un nuevo objeto al generar multiples registros por cada registro en el DStream de la fuente. En este caso, cada línea será cortada para obtener multiples palabras y representarse en el DStream *words*. Después, vamos a imprimir esas palabras.

![](https://spark.apache.org/docs/latest/img/streaming-dstream-ops.png)

In [5]:
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)


wordCounts.pprint()

El DStream  *words* es transformado (*map*, one-to-one) al siguiente DStream de pares llave-valor con el siguiente formato (palabra, 1), donde se reduce para obtener la frecuencia de palabras en cada batch de datos. Finalmente, *wordCounts.pprint()* imprime el conteo generado en ese intervalo.


Debemos recordar, que aún cuando las líneas de código anteriores ya fueron ejecutadas, Spark Streaming ejecutará el cómputo hasta que el proceso sea requerido. Mientras tanto, no ha habido un procesamiento real de datos.

Para iniciar el procesamiento de datos, después de que las transformaciones fueron establecidas, llamamos a las funciones:

In [8]:
ssc.start()
ssc.awaitTermination()

Py4JJavaError: An error occurred while calling o23.start.
: java.lang.IllegalStateException: StreamingContext has already been stopped
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:608)
	at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


In [7]:
ssc.stop()   # 😉


sc.stop()

## Despliegue del programa de Spark 

```bash
$ /usr/local/spark/bin/spark-submit network_wordcount.py localhost 9999
```

Entrar a http://localhost:4040 para ver la interfaz de monitoreo de Spark



### Lista completa de las transformaciones para DStreams: https://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams


Puntos a  recordar:
- Una vez que el contexto ha sido creado, no se pueden agregar nuevas computaciones  o modificar las existentes.
- Una vez que el contexto ha sido detenido, no puede reiniciarse.
- Solo un contexto de StreamingContext puede estar activo por cada JVM.
- stop() en StreamingContext también detiene el contexto SparkContext. Para detener solo el contexto StreamingContext,  hay que establecer el parámetro *stopSparkContext a Falso en stop().
- Una instancia de SparkContext puede ser reusada para crear multiples objetos StreamingContext, en el caso de que el contexto de streaming haya sido detenido con anterioridad y sin detener SparkContext