# Storage Technologies

## Topic 6. Apache Spark. Spark Streaming

This notebook contains the example code from the module's manual.

We use the jupyter/all-spark-notebook container:
```
docker run --name spark-stack -p 10000:8888 -p 4040:4040 jupyter/all-spark-notebook
```

We run it with the Scala kernel, spylon-kernel.

Before starting, we open a socket on port 9999 of our container:
```
docker exec -it spark-stack nc -l 9999
```
Do not kill this process. The shell stays blocked while netcat is listening.

(acg)

### 2.1 Structured Streaming

In [1]:
import org.apache.spark.sql.Row 
import spark.implicits._

Intitializing Scala interpreter ...

Spark Web UI available at http://16a66b851a51:4040
SparkContext available as 'sc' (version = 3.2.0, master = local[*], app id = local-1636196821567)
SparkSession available as 'spark'


import org.apache.spark.sql.Row
import spark.implicits._


In [2]:
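// Read a stream of text lines from the socket opened with netcat
val lineas = spark.readStream
                  .format("socket")
                  .option("host", "localhost")
                  .option("port", 9999)
                  .load()

// Returns true for a streaming DataFrame
lineas.isStreaming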

lineas: org.apache.spark.sql.DataFrame = [value: string]
res0: Boolean = true


In [3]:
lineas.printSchema

root
 |-- value: string (nullable = true)



In [4]:
// Split each line into words (one row per word)
val palabras = lineas.as[String].flatMap(_.split(" "))

// Running count of each distinct word
val numPalabras = palabras.groupBy("value").count()


palabras: org.apache.spark.sql.Dataset[String] = [value: string]
numPalabras: org.apache.spark.sql.DataFrame = [value: string, count: bigint]


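The following query keeps listening (and refreshing its results) for 30 seconds (30000 ms). `awaitTermination(30000)` returns `false` when the timeout expires while the query is still active, so we stop the query explicitly afterwards.

Remember to go to the shell where you launched netcat (nc) and type a few words: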

```
(master_big_data) acg@MSI ~ $ docker exec -it spark-stack nc -l 9999
Hola
Hola otra vez
donde esta la otra casa
ve a casa

```

In [6]:
val query = numPalabras.writeStream
                       .outputMode("update")
                       .format("console")
                       .start() 

query.awaitTermination(30000)


-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| Hola|    1|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|  Hola|    2|
|master|    1|
|    es|    1|
|    el|    1|
|  data|    1|
|  esto|    1|
|   big|    1|
+------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|   de|    1|
| UEMC|    1|
|   la|    1|
+-----+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| Hola|    3|
+-----+-----+



query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@472b8ee3
res3: Boolean = false
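
In `update` output mode, each batch prints only the rows whose count changed, which is why a batch can show a single word even though more words have been counted so far. The following is a minimal plain-Scala sketch of that behavior, without Spark. `applyBatch` is a hypothetical helper, not a Spark API, and the input lines are taken from the netcat listing above.

```scala
// Plain-Scala sketch (no Spark): running word counts with "update"-style output.
// `applyBatch` is a hypothetical helper; it returns (new state, rows to print).
def applyBatch(state: Map[String, Long],
               lines: Seq[String]): (Map[String, Long], Map[String, Long]) = {
  // Count the words that arrived in this micro-batch
  val batchCounts: Map[String, Long] =
    lines.flatMap(_.split(" "))
         .groupBy(identity)
         .map { case (word, occurrences) => word -> occurrences.size.toLong }
  // Only the keys seen in this batch get a new running total
  val updated = batchCounts.map { case (word, n) => word -> (state.getOrElse(word, 0L) + n) }
  (state ++ updated, updated)
}

val (state1, out1) = applyBatch(Map.empty, Seq("Hola"))
val (state2, out2) = applyBatch(state1, Seq("Hola otra vez"))
// out1 == Map("Hola" -> 1)
// out2 == Map("Hola" -> 2, "otra" -> 1, "vez" -> 1)
```

The second element of each result plays the role of the console output for that batch, while the first element is the state carried over to the next one.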


In [7]:
query.stop()

In [8]:
query.isActive

res5: Boolean = false


#### Sources

In [2]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._

val schema = new StructType()
      .add("Nombre",StringType)
      .add("Edad",IntegerType)
      .add("Genero",StringType)

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Nombre,StringType,true), StructField(Edad,IntegerType,true), StructField(Genero,StringType,true))


Make sure you upload the characters file into a **directory**: the stream source must be given a directory, not a single file.

To do this in our Docker container:
```
docker exec -it spark-stack mkdir todos_personajes
docker cp ./strangersCharacters.txt spark-stack:/home/jovyan/todos_personajes/strangersCharacters.txt

```


In [3]:
val personasDF = spark
                .readStream
                .schema(schema)
                .csv("todos_personajes/")


personasDF: org.apache.spark.sql.DataFrame = [Nombre: string, Edad: int ... 1 more field]


#### Sinks

In [4]:
val mujeres = personasDF.filter("Genero == 'mujer'")

mujeres: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Nombre: string, Edad: int ... 1 more field]


In [5]:
val query = mujeres.writeStream
  .outputMode("append")
  .format("console")
  .start

query.awaitTermination(30000)

-------------------------------------------
Batch: 0
-------------------------------------------
+------------+----+------+
|      Nombre|Edad|Genero|
+------------+----+------+
|    Catwoman|  32| mujer|
|ScarletWitch|  28| mujer|
+------------+----+------+



query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@196d9c1
res0: Boolean = false


We now repeat the exercise with a JSON file as the sink, this time filtering for men.

In [11]:
val personasDF = spark
                .readStream
                .schema(schema)
                .csv("todos_personajes/")


personasDF: org.apache.spark.sql.DataFrame = [Nombre: string, Edad: int ... 1 more field]


In [12]:
val hombres = personasDF.filter("Genero == 'hombre'")

hombres: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Nombre: string, Edad: int ... 1 more field]


In [13]:
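// Write the filtered rows as JSON files. The checkpoint is stored in the
// same directory as the output, so both end up under ./hombres_output
val query = hombres.writeStream.outputMode("append")
            .format("json")
            .option("path", "./hombres_output")
            .option("checkpointLocation", "./hombres_output")
            .start()
query.awaitTermination(30000)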


query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@62bbe58c
res3: Boolean = false


Let's see what it generated:

In [17]:
!ls hombres_output

commits   part-00000-da91256b-cf28-43b2-80ca-390c28c9702f-c000.json
metadata  sources
offsets   _spark_metadata



In [18]:
!cat hombres_output/part-00000-da91256b-cf28-43b2-80ca-390c28c9702f-c000.json

{"Nombre":"Lobezno","Edad":125,"Genero":"hombre"}
{"Nombre":"Batman","Edad":43,"Genero":"hombre"}



### 2.2 Time windows and watermarks

In [19]:
import java.sql.Timestamp 
import org.apache.spark.sql.Row 
import spark.implicits._

val lineas = spark.readStream
                  .format("socket")
                  .option("host", "localhost")
                  .option("port", 9999)
                  .option("includeTimestamp", true)
                  .load()


import java.sql.Timestamp
import org.apache.spark.sql.Row
import spark.implicits._
lineas: org.apache.spark.sql.DataFrame = [value: string, timestamp: timestamp]


In [20]:
// Pair every word with the timestamp of the line it came from
val palabras = lineas.as[(String, Timestamp)]
                     .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
                     .toDF("palabra", "timestamp")


palabras: org.apache.spark.sql.DataFrame = [palabra: string, timestamp: timestamp]


In [21]:
// Count words per 10-second window, with a new window starting every 5 seconds
val windowedCounts = palabras.groupBy(window($"timestamp", "10 seconds", "5 seconds"), $"palabra")
                             .count()


windowedCounts: org.apache.spark.sql.DataFrame = [window: struct<start: timestamp, end: timestamp>, palabra: string ... 1 more field]
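
With a 10-second window that slides every 5 seconds, every timestamp falls into two overlapping windows, so each word is counted once per window. The following plain-Scala sketch (no Spark) shows the assignment. `windowsFor` is a hypothetical helper, times are non-negative epoch seconds, and windows are treated as half-open intervals `[start, end)` aligned to the epoch.

```scala
// Plain-Scala sketch (no Spark): which sliding windows contain a given event time?
// `windowsFor` is a hypothetical helper, not a Spark API.
def windowsFor(ts: Long, windowSec: Long, slideSec: Long): List[(Long, Long)] = {
  // Start of the latest window that begins at or before ts
  val lastStart = ts - (ts % slideSec)
  // Walk back one slide at a time while the window still covers ts
  Iterator.iterate(lastStart)(_ - slideSec)
    .takeWhile(start => start + windowSec > ts)
    .map(start => (start, start + windowSec))
    .toList
}

// An event at second 23 with a 10 s window sliding every 5 s
val eventWindows = windowsFor(23L, 10L, 5L)
// eventWindows == List((20, 30), (15, 25))
```

When the slide equals the window length, the same function returns a single (tumbling) window per event.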


After running the next cell (which keeps executing), go back to the netcat window and type words so they are sent to the query as a stream:
```
(master_big_data) acg@MSI ~ $ docker exec -it spark-stack nc -l 9999
Hola
Como estas
donde estan las llaves
estan en el fondo del mar
```

In [22]:
val query = windowedCounts.writeStream
                          .outputMode("update")
                          .option("truncate", false)
                          .format("console")
                          .start()

query.awaitTermination(60000)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-------+-----+
|window|palabra|count|
+------+-------+-----+
+------+-------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-------+-----+
|window                                    |palabra|count|
+------------------------------------------+-------+-----+
|{2021-11-06 11:12:20, 2021-11-06 11:12:30}|Hola   |1    |
|{2021-11-06 11:12:15, 2021-11-06 11:12:25}|Hola   |1    |
+------------------------------------------+-------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+---------+-----+
|window                                    |palabra  |count|
+------------------------------------------+---------+-----+
|{2021-11-06 11:12:25, 2021-11-06 11:12:35}|es       |1    |
|{2021-11-06 11:1

query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@4a65ba76
res4: Boolean = false


#### Watermarking

In [23]:
// Same windowed count, but tolerating data that arrives up to 15 seconds late
val windowedCounts = palabras.withWatermark("timestamp", "15 seconds")
                             .groupBy(window($"timestamp", "10 seconds", "5 seconds"),$"palabra")
                             .count()


windowedCounts: org.apache.spark.sql.DataFrame = [window: struct<start: timestamp, end: timestamp>, palabra: string ... 1 more field]
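
`withWatermark("timestamp", "15 seconds")` tells Spark how late data may arrive. As a simplified model (not Spark's exact bookkeeping, which updates the watermark between micro-batches), the watermark trails the largest event time seen so far by the configured delay, and a window whose end is at or before the watermark can be finalized and its state dropped. All names below are hypothetical and times are epoch seconds.

```scala
// Plain-Scala sketch (no Spark) of the watermark idea; simplified on purpose.
def watermark(maxEventTime: Long, delaySec: Long): Long = maxEventTime - delaySec

// A window [start, end) is considered expired once the watermark reaches its end
def isExpired(windowEnd: Long, wm: Long): Boolean = windowEnd <= wm

// Largest event time seen so far is 100 s, allowed delay is 15 s
val wm = watermark(maxEventTime = 100L, delaySec = 15L)
val windows = List((70L, 80L), (75L, 85L), (80L, 90L))
val stillOpen = windows.filterNot { case (_, end) => isExpired(end, wm) }
// wm == 85
// stillOpen == List((80, 90))
```

Late words that belong only to expired windows would no longer update any count in this model.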


Again, remember to type words into the socket when you run the next cell:
```
(master_big_data) acg@MSI ~ $ docker exec -it spark-stack nc -l 9999
aaa
bbb
ccc
bbb

```

In [24]:
val query = windowedCounts.writeStream
                          .outputMode("update")
                          .option("truncate", false)
                          .format("console")
                          .start() 

query.awaitTermination(1000)


query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1dcb6d5f
res5: Boolean = false


-------------------------------------------
Batch: 0
-------------------------------------------
+------+-------+-----+
|window|palabra|count|
+------+-------+-----+
+------+-------+-----+

-------------------------------------------
Batch: 7
-------------------------------------------
+------------------------------------------+-------+-----+
|window                                    |palabra|count|
+------------------------------------------+-------+-----+
|{2021-11-06 11:13:30, 2021-11-06 11:13:40}|aaaa   |1    |
|{2021-11-06 11:13:25, 2021-11-06 11:13:35}|aaaa   |1    |
+------------------------------------------+-------+-----+

-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+-------+-----+
|window                                    |palabra|count|
+------------------------------------------+-------+-----+
|{2021-11-06 11:13:30, 2021-11-06 11:13:40}|bbbb   |1    |
|{2021-11-06 11:13:25, 20

### 2.3 Spark Streaming and DStream

In [25]:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._


In [26]:
val ssc = new StreamingContext(sc, Seconds(15))

ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@c0d0110


Again, once the command is running, we type words into the socket:
```
(master_big_data) acg@MSI ~ $ docker exec -it spark-stack nc -l 9999
aaaa
bbbb
cccc
Hola
Vamos
Master de Big Data
Master de Movilidad
```

In [None]:
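// Read lines from the socket as a DStream
val lines = ssc.socketTextStream("localhost", 9999)

// Split lines into words
val words = lines.flatMap(_.split(" "))

// Count each word within every 15-second batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey((a,b) => a+b)

// Print the first elements of each batch
wordCounts.print()

// Start processing and wait up to 45 seconds
ssc.start()
ssc.awaitTerminationOrTimeout(45000)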
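
In this DStream version, `reduceByKey` runs on each 15-second batch separately, so the printed counts start again with every batch instead of accumulating as in the Structured Streaming query. A minimal plain-Scala sketch of that per-batch behavior follows (no Spark). `countBatch` is a hypothetical helper, and the grouping of the netcat lines into two batches is an assumption made for illustration.

```scala
// Plain-Scala sketch (no Spark): word count computed independently per batch.
// `countBatch` is a hypothetical helper mirroring flatMap / map / reduceByKey.
def countBatch(lines: Seq[String]): Map[String, Int] =
  lines.flatMap(_.split(" "))
       .map(word => (word, 1))
       .groupBy(_._1)
       .map { case (word, pairs) => word -> pairs.map(_._2).sum }

// Assumed split of the typed lines into two consecutive batches
val batches = Seq(
  Seq("aaaa", "bbbb", "cccc"),
  Seq("Master de Big Data", "Master de Movilidad")
)
val perBatch = batches.map(countBatch)
// perBatch(0) == Map("aaaa" -> 1, "bbbb" -> 1, "cccc" -> 1)
// perBatch(1) == Map("Master" -> 2, "de" -> 2, "Big" -> 1, "Data" -> 1, "Movilidad" -> 1)
```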


If the streaming context was already started above (`ssc.start()`), Spark will not let us add this new computation. In that case, recreate the context and run this cell instead of the previous one. The main difference is that this version applies a 15-second window (`lines.window(Seconds(15))`). Note that this cell reads from port 9998, so netcat must be listening on that port.

In [27]:
val lines = ssc.socketTextStream("localhost", 9998)
// Group the incoming lines into a window covering the last 15 seconds
val linesWindow = lines.window(Seconds(15))

val words = linesWindow.flatMap(_.split(" "))

// Count each word within the window
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey((a,b) => a+b) 
wordCounts.print()

ssc.start()
ssc.awaitTerminationOrTimeout(60000)


-------------------------------------------
Batch: 13
-------------------------------------------
+------------------------------------------+-------+-----+
|window                                    |palabra|count|
+------------------------------------------+-------+-----+
|{2021-11-06 11:14:50, 2021-11-06 11:15:00}|Hola   |1    |
|{2021-11-06 11:14:55, 2021-11-06 11:15:05}|Hola   |1    |
+------------------------------------------+-------+-----+

-------------------------------------------
Time: 1636197300000 ms
-------------------------------------------

-------------------------------------------
Batch: 14
-------------------------------------------
+------------------------------------------+-------+-----+
|window                                    |palabra|count|
+------------------------------------------+-------+-----+
|{2021-11-06 11:15:00, 2021-11-06 11:15:10}|socket |1    |
|{2021-11-06 11:14:55, 2021-11-06 11:15:05}|el     |1    |
|{2021-11-06 11:15:00, 2021-11-06 11:15:10

lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@2c7566bb
linesWindow: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.WindowedDStream@47b00120
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@1dbe78bd
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@7f1511d8
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@4c97e228
res6: Boolean = false


-------------------------------------------
Time: 1636197360000 ms
-------------------------------------------

-------------------------------------------
Time: 1636197375000 ms
-------------------------------------------
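
Since the context was created with a 15-second batch interval, `lines.window(Seconds(15))` should cover exactly one batch, so this windowed variant is expected to behave like the plain per-batch count. A longer window would merge several consecutive batches. The plain-Scala sketch below (no Spark) illustrates the idea. `windowed` is a hypothetical helper that expresses the window length as a number of batches.

```scala
// Plain-Scala sketch (no Spark): a window expressed as "the last N batches".
// `windowed` is a hypothetical helper, not a Spark API.
def windowed[A](batches: Seq[Seq[A]], batchesPerWindow: Int): Seq[Seq[A]] =
  batches.indices.map { i =>
    // Concatenate the current batch with up to N-1 preceding batches
    batches.slice(math.max(0, i - batchesPerWindow + 1), i + 1).flatten
  }

val input = Seq(Seq("a"), Seq("b"), Seq("c"))
val oneBatchWindow = windowed(input, 1)
val twoBatchWindow = windowed(input, 2)
// oneBatchWindow == Seq(Seq("a"), Seq("b"), Seq("c"))
// twoBatchWindow == Seq(Seq("a"), Seq("a", "b"), Seq("b", "c"))
```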

