# Contar palabras con Structured Streaming

In [1]:
%%writefile words.txt
value
A B C A
A B A A

Writing words.txt


In [2]:
##
## Se copia el archivo al HDFS
##
!hdfs dfs -rm -f /tmp/words.txt
!hdfs dfs -copyFromLocal words.txt /tmp/words.txt

In [3]:
##
## Se inicia la aplicación en PySpark
##
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

sparkConf = SparkConf().setAppName("My SparkQL Application")
sc = SparkContext(conf=sparkConf)
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
##
## Se lee el archivo del hdfs en formato CSV.
## Cada fila del DataFrame es un renglón del archivo
##
df = spark.read.load(
    "/tmp/words.txt",
    format="csv",
    sep=",",
    inferSchema="true",
    header="true")

df.show()

+-------+
|  value|
+-------+
|A B C A|
|A B A A|
+-------+



In [5]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

##
## La función split parte cada línea de texto por los espacios en
## blanco, retornando un vector; por ejemplo, para la primera
## línea retorna ['A', 'B', 'C', 'A']. Seguidamente, la función
## explode genera un registro por cada elemento del vector, tal
## como se muestra a continuación.
##
words = df.select(
   explode(
       split(df.value, " ")
   ).alias("word")
)

words.show()

+----+
|word|
+----+
|   A|
|   B|
|   C|
|   A|
|   A|
|   B|
|   A|
|   A|
+----+



In [6]:
##
## Para realizar el conteo propiamente, se realizar un
## groupBy por letra, y se cuenta la cantidad de registros
## por grupo usando la función `count`.
##
wordCounts = words.groupBy("word").count()
wordCounts.show()

+----+-----+
|word|count|
+----+-----+
|   B|    2|
|   C|    1|
|   A|    5|
+----+-----+



## Ahora vamos a usar Spark Streaming
Nótese que escribimos un fichero Python a ejecutar fuera de Jupyter. Necesitaremos ejecutar algunas cosas desde un terminal. Jupyter nos lo permite, desde una pestaña nueva. 

In [7]:
%%writefile wc-pyspark.py

## Identico ------>>>

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

## <<<------


##
## Los datos se leen desde un flujo de entrada en vez de un archivo
## en disco. Para ello, se crea un Dstream de entrada que representa las líneas
## de texto de entrada, las cuales son leídas desde una conexión a
## localhost:9999. El Dstream puede considerarse como un DataFrame
## infinito, donde los nuevos datos se van adicionando al final.
##
df = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()


## Identico ------>>>

words = df.select(
   explode(
       split(df.value, " ")
   ).alias("word")
)

wordCounts = words.groupBy("word").count()

## <<<------


##
## Crea un Dstream de salida a la consola, en la que se van
## escribiendo los resultados a medida que se van ingresando
## datos.
##
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

Writing wc-pyspark.py


Ahora abramos DOS pestañas terminal. 

En una de ellas ejecutamos "nc -lk 9999". Esto lanza un servidor de red en el puerto 9999. El servidor queda a la espera de conexiones (-l, listening). Cuando una conexión se cierra, queda a la espera de nuevas (-k).

En la otra pestaña terminal lanzamos nuestra aplicación Spark Streaming. La misma se va a conectar a nuestro servidor, desde donde recibirá datos a procesar. 

Una vez lanzadas las dos partes, todo lo que escribamos en la primera pestaña (donde ejecutamos "nc") será procesado por la aplicación. Lo que escribamos será una secuencia de palabras. La aplicación Spark contará las instancias. En cada batch, se actualizará el conteo. Inicialmente aparecerá una lista vacía. 

Esto es lo que tecleamos en una pestaña:
    
```sh
root@24730d8dd172:/workspace# nc -lk 9999
A B C A
A B A A
El perro de san roque no tiene rabo porque es
perro y el rabo se lo han cortado
pobre san roque y su perro sin rabo
```

Y esto es (un extracto) de lo que vemos en la otra:
```sh
...
|     se|    1|
|     El|    1|
|     lo|    1|
+-------+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+-------+-----+
|   word|count|
+-------+-----+
|    san|    2|
|  tiene|    1|
| porque|    1|
|      B|    2|
|  roque|    2|
|  perro|    3|
|     de|    1|
|   rabo|    3|
|     es|    1|
|      C|    1|
|     el|    1|
|     su|    1|
|      A|    5|
|      y|    2|
|cortado|    1|
|    han|    1|
|  pobre|    1|
|     no|    1|
|    sin|    1|
|     se|    1|
+-------+-----+
only showing top 20 rows
```

La aplicación hace lo siguiente:
    
1. Crea una SparkSession
2. Crea un Dstream "df", leyendo de un stream. El tipo de Dstream es "socket", es decir, se lee desde un servidor de red. La conexión queda abierta permanentemente hasta que se cierre el Dstream. "df" está en actualización permanente.
3. El Dstream "df" se procesa exactamente igual que en el caso batch: cada línea se separa en palabras usando el espacio como separador. Por cada línea se devuelve una lista de palabras (explode). La lista de palabras se guarda en "words". Como "words" está asociado a "df", su actualización también es permanente.
4. Igual que en el caso batch, se agrupan las palabras iguales, generando "wordCounts" -- que también se actualiza de forma permanente.
5. Se crea una salida "query" de tipo Dstream, permanente, por la consola. 
    
Todo queda en marcha, hasta que termina la operación "query".

Spark Streaming utiliza varias fuentes de entrada (source):

1. Socket. Se lee de un puerto de red.
2. File. Se monitoriza un directorio. Por cada fichero que se añade, se actualiza el stream.
3. Kafka. Se lee desde Kafka.
4. Rate. Se generan datos a una velocidad de N filas/segundo. Cada campo tiene un timestamp y un valor. 

En cuanto a las salidas, se puede especificar el modo:
    
1. Complete. Cada vez que hay una actualización, se escribe el DataFrame completo
2. Append. Se escriben las filas nuevas. Esto es útil si las anteriores ya no cambian. 
3. Update. Se escriben las filas que han cambiado. 

Y también se puede especificar hacia dónde se envía (sink):

1. File. Hay que indicar el formato: csv, orc, json, parquet
2. Kafka
3. Console -- solo para depuración
4. Custom

En cuanto a cuándo se ejecuta un procesamiento, se puede indicar:

1. Unspecified. Por omisión. Micro-lotes, que se ejecutan cuando el anterior ha terminado. Un micro-lote es un conjunto de datos que llegan a la vez. 
2. Fixed interval micro-batches. Se especifica el tiempo que dura cada micro-batch (por ejemplo, 2 segundos)
3. One-time micro-batch. Se ejecuta una vez -- habrá algún proceso externo que lo lance
4. Continuous. Nuevo, experimental. Se usa para emular un proceso continuo.