
### Programación de Spark Streaming

Spark Streaming es una extensión de la API principal de Spark que permite el procesamiento de secuencias escalables, de alto rendimiento y tolerantes a fallos de transmisiones de datos en vivo. Los datos se pueden ingerir de muchas fuentes como Kafka, Kinesis o sockets TCP, y se pueden procesar utilizando algoritmos complejos expresados con funciones de alto nivel como , , y . Finalmente, los datos procesados se pueden enviar a sistemas de archivos, bases de datos y paneles en vivo. De hecho, puede aplicar los algoritmos de aprendizaje automático y procesamiento de gráficos de Spark en los flujos de datos.mapreducejoinwindow

![imgen](https://spark.apache.org/docs/latest/img/streaming-arch.png)

Internamente, funciona de la siguiente manera. Spark Streaming recibe flujos de datos de entrada en vivo y divide los datos en lotes, que luego son procesados por el motor Spark para generar el flujo final de resultados en lotes.

![imagen](https://spark.apache.org/docs/latest/img/streaming-flow.png)

Spark Streaming proporciona una abstracción de alto nivel llamada flujo discretizado o DStream, que representa un flujo continuo de datos. Los DStreams se pueden crear a partir de flujos de datos de entrada de fuentes como Kafka y Kinesis, o mediante la aplicación de operaciones de alto nivel en otros DStreams. Internamente, un DStream se representa como una secuencia de RDD.

Esta guía le muestra cómo comenzar a escribir programas de Spark Streaming con DStreams. Puede escribir programas de Spark Streaming en Scala, Java o Python (introducidos en Spark 1.2).



## Un ejemplo rápido
Antes de entrar en los detalles de cómo escribir su propio programa Spark Streaming, echemos un vistazo rápido a cómo se ve un simple programa Spark Streaming. Digamos que queremos contar el número de palabras en los datos de texto recibidos de un servidor de datos que escucha en un socket TCP.

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

Primero, importamos StreamingContext, que es el punto de entrada principal para toda la funcionalidad de transmisión. Creamos un StreamingContext local con dos subprocesos de ejecución e intervalo por lotes de 1 segundo.

El parámetro es un nombre para que la aplicación se muestre en la interfaz de usuario del clúster. es una URL de clúster spark, mesos o YARN, o una cadena especial "local[*]" para ejecutarse en modo local. En la práctica, cuando se ejecuta en un clúster, no querrá codificar en el programa, sino más bien iniciar la aplicación con spark-submit y recibirla allí. Sin embargo, para las pruebas locales y las pruebas unitarias, puede pasar "local[*]" para ejecutar Spark Streaming en proceso (detecta el número de núcleos en el sistema local).appNamemastermaster

El intervalo por lotes debe establecerse en función de los requisitos de latencia de la aplicación y los recursos de clúster disponibles. Consulte la sección Ajuste del rendimiento para obtener más detalles.

![image](https://miro.medium.com/max/786/1*p3U__F7ztdmvQR99Hg1-Vw.png)

Entonces, los datos comenzarían a verterse en un flujo en lotes, este flujo continuo de datos se llama DStream. Cada lote de dsteam contendría una colección de elementos que se pueden procesar en paralelo, esta colección se llama RDD.


In [None]:
# Create a local StreamingContext with two working thread and batch interval of 10 seconds
sc = SparkContext("local[2]", "Contarpalabras")
ssc = StreamingContext(sc, 10)

### Comience a transmitir los datos
Para recibir datos, el contexto de transmisión proporciona un método para transmitir datos desde una conexión de socket TCP o desde archivos como orígenes de entrada. Las fuentes pueden ser fuentes como HDFS, S3, etc. Para leer archivos de texto, existe el método textFileStream de javastreamingcontext.

Pero no podrá leer los archivos ya presentes en el directorio antes de que comience el contexto de transmisión, ya que solo lee los archivos recién creados.

Así que aquí transmitiré los datos a través de la conexión de socket a través del puerto 9999 y crearé un receptor java de entrada DStream con él.

In [None]:
lines = ssc.socketTextStream('localhost', 9999)

Así que ahora, si establece una conexión de socket y escribe algo en el terminal, y ejecuta el dstream, verá que el texto aparece en la consola.

Nota: Para iniciar un contexto de transmisión java, debemos decirle a spark que lo inicie, esperar a que finalice el cálculo y luego detenerlo. Y necesitamos imprimir el DStream por el método pprint().

Ejecute en el cmd python server_console.py

In [None]:
#NO No ejecutar cuando pase a la siguiente celda
lines.pprint()
ssc.start()
ssc.awaitTermination()
ssc.stop()

Observe cuando imprime la salida en el tiempo t1, pero no se imprime ninguna salida en el tiempo t2 y t3, ya que obtiene datos para cada 10 segundos. En los siguientes intervalos de lote, no recibió ninguna entrada, por lo que no imprime nada.

Ahora te mostraré cómo podemos usar algunas transformaciones en estos dstreams usando funciones lanbda.

### Transformación  map

La transformación de mapas aplica la función que especificamos en el DStream y produce un valor de salida para cada valor de entrada. Así que básicamente transforma un flujo en otro. Al igual que aquí, quiero contar la longitud de la línea de texto, así que usaré la transformación de mapas para ello.

In [None]:
#NO No ejecutar cuando pase a la siguiente celda
length = lines.map(lambda x: len(x))
length.pprint()
ssc.start()
ssc.awaitTermination()
ssc.stop()

### Transformación de FlatMap

La transformación FlatMap aplica la función en DStream, pero puede producir uno o más valores de salida para cada valor de entrada. Entonces, si quiero transformar el RDD de tal manera que produzca más de un valor, usaré la transformación FlatMap.

Así que aquí le di a la entrada una línea de texto 'hola cómo estás' y quiero dividirla en palabras individuales. Utilicé la función lambda para lo mismo. Una transformación FlatMap devuelve un número arbitrario de valores que depende del rdd y la función aplicada, por lo que el tipo devuelto tiene que ser un flujo de valores.

In [None]:
palabras = lines.flatMap(lambda x: x.split(" "))
palabras.pprint()
ssc.start()
ssc.awaitTermination()
ssc.stop()

### Transformación de filtros

La transformación del filtro filtra el DStream de acuerdo con la función dada. Al igual que después de la transformación de flatMap, digamos que quiero filtrar las lineas que empiezan en A del flujo de palabras.



In [None]:
output = lines.filter(lambda x: x.startswith('A'))
output.pprint()
ssc.start()
ssc.awaitTermination()
ssc.stop()

Una vez definido un contexto, debe hacer lo siguiente.

Defina las fuentes de entrada creando DStreams de entrada.
Defina los cálculos de streaming aplicando operaciones de transformación y salida a DStreams.
Comience a recibir datos y procesarlos utilizando .streamingContext.start()
Espere a que se detenga el procesamiento (manualmente o debido a cualquier error) utilizando .streamingContext.awaitTermination()
El procesamiento se puede detener manualmente utilizando .streamingContext.stop()


Usando este contexto, podemos crear un DStream que represente la transmisión de datos desde una fuente TCP, especificada como nombre de host ("127.0.0.1" ) y puerto (65432 ). 127.0.0.1:65432

In [None]:
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("127.0.0.1", 65432)
#lines = ssc.socketTextStream("rtd.hpwren.ucsd.edu", 12020)

Este DStream representa el flujo de datos que se recibirá del servidor de datos. Cada registro de este DStream es una línea de texto. A continuación, queremos dividir las líneas por espacio en palabras.

In [None]:
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

flatMap es una operación DStream de uno a varios que crea un nuevo DStream generando varios registros nuevos a partir de cada registro en el DStream de origen. En este caso, cada línea se dividirá en varias palabras y el flujo de palabras se representará como DStream. A continuación, queremos contar estas palabras.words

In [None]:
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

El DStream se asigna aún más (transformación uno a uno) a un DStream de pares, que luego se reduce para obtener la frecuencia de las palabras en cada lote de datos. Por último, imprimirá algunos de los recuentos generados cada segundo.words(word, 1)wordCounts.print()

Tenga en cuenta que cuando se ejecutan estas líneas, Spark Streaming solo configura el cálculo que realizará cuando se inicie y aún no se ha iniciado ningún procesamiento real. Para iniciar el procesamiento después de que se hayan configurado todas las transformaciones, finalmente llamamos

In [None]:
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

Cree una sesion cmd y ejecute python server_send.py, para leer el libro de Don Quijote y enviar las lineas a Stream de Spark

In [None]:
Puntos a recordar:
Una vez que se ha iniciado un contexto, no se pueden configurar ni agregar nuevos cálculos de transmisión.
Una vez que se ha detenido un contexto, no se puede reiniciar.
Solo un StreamingContext puede estar activo en una JVM al mismo tiempo.

stop() en StreamingContext también detiene SparkContext. Para detener solo StreamingContext, establezca el parámetro opcional de called en false.stop()stopSparkContext
Un SparkContext se puede reutilizar para crear varios StreamingContexts, siempre y cuando el StreamingContext anterior se detenga (sin detener el SparkContext) antes de que se cree el siguiente StreamingContext.
