# **Structured Streaming**

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz       
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import time
spark = SparkSession.builder.master("local[*]").getOrCreate()

## Data source
Rate source (for testing only): this source generates a fixed number of rows per second, and each row has two attributes (timestamp and value). The timestamp attribute records the moment at which the row was generated. The value attribute is of type Long and holds an auto-incrementing counter that starts at 0. <br>
<b>Option</b>: rowsPerSecond (how many rows are generated per second).
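
To make the shape of the generated rows concrete, here is a minimal pure-Python sketch of what the rate source is described as producing. The function name `rate_rows`, the fixed start time, and the even spacing of timestamps within each second are assumptions made for illustration; this is not Spark's implementation.

```python
import itertools
from datetime import datetime, timedelta

def rate_rows(rows_per_second, start=datetime(2022, 5, 20, 20, 45, 0)):
    """Yield (timestamp, value) pairs: value counts up from 0, timestamps advance steadily."""
    step = timedelta(seconds=1) / rows_per_second  # assumed even spacing between rows
    for value in itertools.count():                # auto-incrementing value, starting at 0
        yield start + value * step, value

# Take the first four rows of a hypothetical 2-rows-per-second source.
first_four = list(itertools.islice(rate_rows(rows_per_second=2), 4))
for ts, value in first_four:
    print(ts.isoformat(), value)
```

With `rows_per_second=2`, the third row (value 2) carries a timestamp exactly one second after the first.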

## Selection, projection, aggregation

We create the data source using the **rowsPerSecond** option.

In [4]:
streamDF = spark.readStream.format("rate").option("rowsPerSecond",1).load()
streamDF.isStreaming

True

In [5]:
type(streamDF)

pyspark.sql.dataframe.DataFrame

We specify where the data will be stored (we use "memory") and the mode in which the output is generated. The queryName method sets the name of the query that will run continuously.
Finally, we start the query with start().

In [6]:
query = streamDF.writeStream.format("memory").queryName("test").outputMode("append").start()

In [7]:
type(query)

pyspark.sql.streaming.StreamingQuery

We can use the query name (queryName("test")) as a table name in a SQL query.

Next, we run a SQL query repeatedly and build a DataFrame from each result.

In [8]:
for x in range(10):
  DF = spark.sql("select * from test order by timestamp desc")
  # show() prints the table itself and returns None, so it is not wrapped in print()
  DF.select("value").show(5)
  time.sleep(3)

+-----+
|value|
+-----+
+-----+

+-----+
|value|
+-----+
|    5|
|    4|
|    3|
|    2|
|    1|
+-----+
only showing top 5 rows

+-----+
|value|
+-----+
|    9|
|    8|
|    7|
|    6|
|    5|
+-----+
only showing top 5 rows

+-----+
|value|
+-----+
|   12|
|   11|
|   10|
|    9|
|    8|
+-----+
only showing top 5 rows

+-----+
|value|
+-----+
|   15|
|   14|
|   13|
|   12|
|   11|
+-----+
only showing top 5 rows

+-----+
|value|
+-----+
|   19|
|   18|
|   17|
|   16|
|   15|
+-----+
only showing top 5 rows

+-----+
|value|
+-----+
|   22|
|   21|
|   20|
|   19|
|   18|
+-----+
only showing top 5 rows

+-----+
|value|
+-----+
|   25|
|   24|
|   23|
|   22|
|   21|
+-----+
only showing top 5 rows

+-----+
|value|
+-----+
|   28|
|   27|
|   26|
|   25|
|   24|
+-----+
only showing top 5 rows

+-----+
|value|
+-----+
|   31|
|   30|
|   29|
|   28|
|   27|
+-----+
only showing top 5 rows


To stop the query we use the stop() method.

In [9]:
query.stop()

Now we define another query over the same data source, represented by streamDF.<br>
**Query:** filter rows using the value attribute.

In [10]:
from pyspark.sql.functions import *

query2 = streamDF.select("*").where("value % 2 == 0").writeStream.format("memory").queryName("test2").outputMode("append").start()
streamDF.isStreaming

True

In [11]:
type(query2)

pyspark.sql.streaming.StreamingQuery

We run a SQL statement, store the result in a DataFrame, and show its contents every 3 seconds.

In [12]:
for x in range(5):
  DF2 = spark.sql("select * from test2 order by value desc")
  time.sleep(3)
  DF2.select("value").show(5)

+-----+
|value|
+-----+
|    0|
+-----+

+-----+
|value|
+-----+
|    4|
|    2|
|    0|
+-----+

+-----+
|value|
+-----+
|    8|
|    6|
|    4|
|    2|
|    0|
+-----+

+-----+
|value|
+-----+
|   10|
|    8|
|    6|
|    4|
|    2|
+-----+
only showing top 5 rows

+-----+
|value|
+-----+
|   14|
|   12|
|   10|
|    8|
|    6|
+-----+
only showing top 5 rows



In [13]:
query2.stop()

Another query over the same data stream. This time we keep the rows whose value attribute is a multiple of 7 or a multiple of 11.

In [14]:
query3 = streamDF.select("*").where("value % 7 == 0 or value % 11 == 0").writeStream.format("memory").queryName("testOR").outputMode("append").start()

In [26]:
for x in range(5):
  DF3 = spark.sql("select * from testOR order by value desc")
  time.sleep(3)
  DF3.select("value").show(5)

+-----+
|value|
+-----+
|   14|
|   11|
|    7|
|    0|
+-----+

+-----+
|value|
+-----+
|   14|
|   11|
|    7|
|    0|
+-----+

+-----+
|value|
+-----+
|   14|
|   11|
|    7|
|    0|
+-----+

+-----+
|value|
+-----+
|   14|
|   11|
|    7|
|    0|
+-----+

+-----+
|value|
+-----+
|   14|
|   11|
|    7|
|    0|
+-----+



In [16]:
query3.stop()
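
As a quick check of the filter used in query3, the same predicate can be applied to a plain Python range. This is only a sketch: the range 0 to 14 is an assumption chosen to mirror the values visible in the output above.

```python
def passes_filter(value):
    # Same predicate as the where() clause of query3.
    return value % 7 == 0 or value % 11 == 0

# Hypothetical slice of the stream: values 0..14.
kept = [v for v in range(15) if passes_filter(v)]
print(sorted(kept, reverse=True))  # [14, 11, 7, 0]
```

Note that 0 passes the filter because 0 % 7 == 0.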

In this query we replace the value attribute with 7 when it is a multiple of 7 and with 1 otherwise, and then count the rows in each group.

In [33]:
query4 = streamDF.withColumn("value", when(col("value") % 7 == 0,7).when(col("value") % 7 != 0,1)).groupBy("value").count().writeStream.format("memory").queryName("testGB").outputMode("complete").start()

In [34]:
type(query4)

pyspark.sql.streaming.StreamingQuery

In [35]:
for x in range(5):
  DF4 = spark.sql("select * from testGB")  # table name must match queryName("testGB") of query4
  time.sleep(3)
  DF4.show(5)

+-----+-----+
|value|count|
+-----+-----+
|    1| 1385|
|    7|  231|
+-----+-----+

+-----+-----+
|value|count|
+-----+-----+
|    1| 1392|
|    7|  232|
+-----+-----+

+-----+-----+
|value|count|
+-----+-----+
|    1| 1404|
|    7|  234|
+-----+-----+

+-----+-----+
|value|count|
+-----+-----+
|    1| 1415|
|    7|  236|
+-----+-----+

+-----+-----+
|value|count|
+-----+-----+
|    1| 1425|
|    7|  238|
+-----+-----+



In [36]:
query4.stop()
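
The aggregation in query4 uses outputMode("complete"), which means the whole result table is rewritten after every trigger. A rough pure-Python model of that behaviour follows; the three micro-batches and their sizes are hypothetical, and `bucket` is an illustrative helper, not part of the notebook.

```python
from collections import Counter

def bucket(value):
    # Mirrors when(value % 7 == 0, 7).when(value % 7 != 0, 1) in query4.
    return 7 if value % 7 == 0 else 1

counts = Counter()   # running state kept across micro-batches
snapshots = []       # what a "complete" sink would contain after each trigger

# Three hypothetical micro-batches covering values 0..20.
for batch in (range(0, 7), range(7, 14), range(14, 21)):
    counts.update(bucket(v) for v in batch)
    snapshots.append(dict(counts))   # the full table, not only the changed rows

for snapshot in snapshots:
    print(snapshot)
```

Each snapshot holds both groups with their totals so far, and group 1 grows six times as fast as group 7, which is consistent with the ratio seen in the output above.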

## **Exercise 1**

What is the purpose of the following query? Show the result.

In [42]:
query5 = streamDF.selectExpr("value % 10 as deviceID").groupBy("deviceID").count().toDF("deviceID","count").writeStream.format("memory").queryName("Agg").outputMode("complete").start()

In [44]:
# Answer
for x in range(5):
  DF4 = spark.sql("select * from Agg")
  time.sleep(3)
  DF4.show(5)

+--------+-----+
|deviceID|count|
+--------+-----+
|       0|   12|
|       7|   12|
|       6|   12|
|       9|   12|
|       5|   12|
+--------+-----+
only showing top 5 rows

+--------+-----+
|deviceID|count|
+--------+-----+
|       0|   14|
|       7|   14|
|       6|   14|
|       9|   14|
|       5|   14|
+--------+-----+
only showing top 5 rows

+--------+-----+
|deviceID|count|
+--------+-----+
|       0|   19|
|       7|   18|
|       6|   18|
|       9|   18|
|       5|   18|
+--------+-----+
only showing top 5 rows

+--------+-----+
|deviceID|count|
+--------+-----+
|       0|   19|
|       7|   18|
|       6|   18|
|       9|   18|
|       5|   18|
+--------+-----+
only showing top 5 rows

+--------+-----+
|deviceID|count|
+--------+-----+
|       0|   21|
|       7|   20|
|       6|   20|
|       9|   20|
|       5|   20|
+--------+-----+
only showing top 5 rows



In [45]:
query5.stop()
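
One way to read query5 is that value % 10 spreads the rows over ten device IDs (0 to 9) and the query counts the rows per device. The sketch below reproduces that arithmetic in plain Python; the total of 181 rows is an assumption picked because it yields the same counts as one of the snapshots above.

```python
from collections import Counter

# Hypothetical stream prefix: values 0..180 (181 rows).
device_counts = Counter(value % 10 for value in range(181))

for device_id in (0, 7, 6, 9, 5):
    print(device_id, device_counts[device_id])
```

Device 0 is one row ahead because the counter starts at 0, so it is always the first ID to receive the next row of each round of ten.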

In [53]:
from pyspark.sql.functions import expr
for x in range(5):
  DF4 = spark.sql("select * from Agg").withColumn('suma', expr("deviceID + count"))
  time.sleep(3)
  DF4.show(5)

+--------+-----+----+
|deviceID|count|suma|
+--------+-----+----+
|       0|  378| 378|
|       7|  377| 384|
|       6|  377| 383|
|       9|  377| 386|
|       5|  377| 382|
+--------+-----+----+
only showing top 5 rows

+--------+-----+----+
|deviceID|count|suma|
+--------+-----+----+
|       0|  378| 378|
|       7|  377| 384|
|       6|  377| 383|
|       9|  377| 386|
|       5|  377| 382|
+--------+-----+----+
only showing top 5 rows

+--------+-----+----+
|deviceID|count|suma|
+--------+-----+----+
|       0|  378| 378|
|       7|  377| 384|
|       6|  377| 383|
|       9|  377| 386|
|       5|  377| 382|
+--------+-----+----+
only showing top 5 rows

+--------+-----+----+
|deviceID|count|suma|
+--------+-----+----+
|       0|  378| 378|
|       7|  377| 384|
|       6|  377| 383|
|       9|  377| 386|
|       5|  377| 382|
+--------+-----+----+
only showing top 5 rows

+--------+-----+----+
|deviceID|count|suma|
+--------+-----+----+
|       0|  378| 378|
|       7|  377| 384

## **End of Exercise 1**

## Time windows (sliding windows) using the timestamp attribute
Grouping by window: each window lasts 3 minutes and slides every 1 minute.

In [67]:
query6 = streamDF.withColumn("value",when(col("value") % 7 == 0,7).when(col("value") % 7 != 0,1)).groupBy(window("timestamp", "3 minutes", "1 minutes"),"value").count().writeStream.format("memory").queryName("winTable2").outputMode("complete").start()

In [69]:
# sql select value, date_format(window.end, "yyyy MM dd") as time, count from windowTable order by time, value
for x in range(3):
  DF6 = spark.sql("select value, date_format(window.end, 'yyyy MM dd') as time, count from winTable2 order by time")
  time.sleep(3)
  DF6.show(10)

+-----+----------+-----+
|value|      time|count|
+-----+----------+-----+
|    1|2022 05 20|   57|
|    1|2022 05 20|   24|
|    7|2022 05 20|    6|
|    7|2022 05 20|   10|
|    1|2022 05 20|   33|
|    7|2022 05 20|   10|
|    7|2022 05 20|    4|
|    1|2022 05 20|   57|
+-----+----------+-----+

+-----+----------+-----+
|value|      time|count|
+-----+----------+-----+
|    1|2022 05 20|   86|
|    1|2022 05 20|   53|
|    1|2022 05 20|    2|
|    7|2022 05 20|    6|
|    7|2022 05 20|   15|
|    1|2022 05 20|   33|
|    7|2022 05 20|   15|
|    7|2022 05 20|    9|
|    1|2022 05 20|   84|
+-----+----------+-----+

+-----+----------+-----+
|value|      time|count|
+-----+----------+-----+
|    1|2022 05 20|  114|
|    1|2022 05 20|   81|
|    1|2022 05 20|   30|
|    7|2022 05 20|    6|
|    7|2022 05 20|   20|
|    1|2022 05 20|   33|
|    7|2022 05 20|   15|
|    7|2022 05 20|   14|
|    1|2022 05 20|   84|
|    7|2022 05 20|    5|
+-----+----------+-----+



In [65]:
query6.stop()
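
With a 3-minute window that slides every minute, each row falls into three overlapping windows, which is why the same value appears in several rows of the output above. The following sketch computes those windows for a single event time. The helper name `sliding_windows` is hypothetical, and aligning window starts to the Unix epoch is an assumption about how the boundaries are chosen.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def sliding_windows(ts, window=timedelta(minutes=3), slide=timedelta(minutes=1)):
    """Return the [start, end) windows that contain ts, earliest first."""
    # Start of the latest window that begins at or before ts.
    start = ts - ((ts - EPOCH) % slide)
    windows = []
    # Walk backwards one slide at a time while the window still covers ts.
    while start + window > ts:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)

event_time = datetime(2022, 5, 20, 20, 45, 30, tzinfo=timezone.utc)
windows_for_event = sliding_windows(event_time)
for start, end in windows_for_event:
    print(start.strftime("%H:%M"), "->", end.strftime("%H:%M"))
```

For an event at 20:45:30 this prints the windows starting at 20:43, 20:44 and 20:45.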

## Stream-static Join 

In [70]:
from pyspark.sql import Row

multOfSeven = [7,14,21,28,35,42,49,56,63,70,77,84,91,98,105]
staticDF = spark.createDataFrame([Row(value=x) for x in multOfSeven])

print(staticDF.schema)
staticDF.show()

StructType(List(StructField(value,LongType,true)))
+-----+
|value|
+-----+
|    7|
|   14|
|   21|
|   28|
|   35|
|   42|
|   49|
|   56|
|   63|
|   70|
|   77|
|   84|
|   91|
|   98|
|  105|
+-----+



In [90]:
joinDF = streamDF.select("*").join(staticDF, "value")
type(joinDF)

pyspark.sql.dataframe.DataFrame
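
The join on "value" is an inner join, so only stream rows whose value also appears in staticDF survive. A plain-Python sketch of that selection follows; the range 0 to 39 stands in for a hypothetical prefix of the stream.

```python
# Same 15 values as multOfSeven in the notebook.
mult_of_seven = set(range(7, 106, 7))

# Hypothetical prefix of the stream: values 0..39.
stream_values = range(40)

# Inner-join semantics: keep a stream value only if the static side has it.
joined = [v for v in stream_values if v in mult_of_seven]
print(joined)  # [7, 14, 21, 28, 35]
```

The value 0 is dropped even though it is a multiple of 7, because it is not in the static list, and no value above 105 can ever match.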

In [93]:
query7 = joinDF.writeStream.format("memory").queryName("joinTable2").outputMode("append").start()

In [98]:
for x in range(3):
  time.sleep(3)
  spark.sql("select * from joinTable2").show()

+-----+--------------------+
|value|           timestamp|
+-----+--------------------+
|    7|2022-05-20 20:45:...|
|   28|2022-05-20 20:45:...|
|   35|2022-05-20 20:45:...|
|   14|2022-05-20 20:45:...|
|   21|2022-05-20 20:45:...|
+-----+--------------------+

+-----+--------------------+
|value|           timestamp|
+-----+--------------------+
|    7|2022-05-20 20:45:...|
|   28|2022-05-20 20:45:...|
|   35|2022-05-20 20:45:...|
|   14|2022-05-20 20:45:...|
|   21|2022-05-20 20:45:...|
|   77|2022-05-20 20:46:...|
|   63|2022-05-20 20:45:...|
|   56|2022-05-20 20:45:...|
|   49|2022-05-20 20:45:...|
|   42|2022-05-20 20:45:...|
|   70|2022-05-20 20:46:...|
|   84|2022-05-20 20:46:...|
|   98|2022-05-20 20:46:...|
|  105|2022-05-20 20:46:...|
|   91|2022-05-20 20:46:...|
+-----+--------------------+

+-----+--------------------+
|value|           timestamp|
+-----+--------------------+
|    7|2022-05-20 20:45:...|
|   28|2022-05-20 20:45:...|
|   35|2022-05-20 20:45:...|
|   14|2022-

## Stream-stream Join

Monetization example: which ads led to a user click. <br>
Two streams, one of ads and one of clicks.

https://docs.databricks.com/spark/latest/structured-streaming/examples.html#stream-stream-joins

In [83]:
from pyspark.sql.functions import rand

#spark.conf.set("spark.sql.shuffle.partitions", "1")

impressions = spark.readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load().selectExpr("value AS adId", "timestamp AS impressionTime")

clicks = (
  spark
  .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
  .where((rand() * 100).cast("integer") < 10)      # keep roughly 10% of the rows: about 10 out of every 100 impressions result in a click
  .selectExpr("(value - 10) AS adId", "timestamp AS clickTime")      # -10 so that a click with the same id as an impression is generated later (i.e. delayed data)
  .where("adId > 0")
)    
 
  

In [84]:
impressionsQuery = impressions.writeStream.format("memory").queryName("imprTable").outputMode("append").start()

In [85]:
clicksQuery = clicks.writeStream.format("memory").queryName("clicksTable").outputMode("append").start()

In [88]:
for x in range(5):
  #impressions.join(clicks, "adId").show(5)
  DF8 = spark.sql("select * from imprTable, clicksTable where imprTable.adId = clicksTable.adId order by clickTime desc")
  time.sleep(3)
  DF8.show(5)

+----+--------------------+----+--------------------+
|adId|      impressionTime|adId|           clickTime|
+----+--------------------+----+--------------------+
|  76|2022-05-20 20:37:...|  76|2022-05-20 20:37:...|
|  68|2022-05-20 20:36:...|  68|2022-05-20 20:37:...|
|  65|2022-05-20 20:36:...|  65|2022-05-20 20:37:...|
|  39|2022-05-20 20:36:...|  39|2022-05-20 20:36:...|
|  36|2022-05-20 20:36:...|  36|2022-05-20 20:36:...|
+----+--------------------+----+--------------------+
only showing top 5 rows

+----+--------------------+----+--------------------+
|adId|      impressionTime|adId|           clickTime|
+----+--------------------+----+--------------------+
| 168|2022-05-20 20:37:...| 168|2022-05-20 20:37:...|
| 167|2022-05-20 20:37:...| 167|2022-05-20 20:37:...|
| 166|2022-05-20 20:37:...| 166|2022-05-20 20:37:...|
| 156|2022-05-20 20:37:...| 156|2022-05-20 20:37:...|
| 151|2022-05-20 20:37:...| 151|2022-05-20 20:37:...|
+----+--------------------+----+--------------------+
onl

In [89]:
clicksQuery.stop()
impressionsQuery.stop()
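
The SQL above matches impressions and clicks on adId only. Stream-stream joins are usually also bounded in time so that old impressions can eventually be discarded. The sketch below models such a bounded match in plain Python; the sample rows and the one-minute limit are assumptions for illustration, not values taken from this notebook.

```python
from datetime import datetime, timedelta

t0 = datetime(2022, 5, 20, 20, 36, 0)

# Hypothetical impressions: adId -> impressionTime.
impressions = {ad_id: t0 + timedelta(seconds=ad_id) for ad_id in range(1, 6)}

# Hypothetical clicks: (adId, clickTime).
clicks = [
    (2, t0 + timedelta(seconds=12)),    # 10 s after its impression
    (4, t0 + timedelta(seconds=200)),   # more than a minute after its impression
    (9, t0 + timedelta(seconds=20)),    # no impression with this adId
]

max_delay = timedelta(minutes=1)  # assumed upper bound between impression and click

matches = [
    (ad_id, impressions[ad_id], click_time)
    for ad_id, click_time in clicks
    if ad_id in impressions
    and impressions[ad_id] <= click_time <= impressions[ad_id] + max_delay
]
print([ad_id for ad_id, _, _ in matches])  # [2]
```

Only the click on ad 2 is attributed: ad 4 was clicked too late and ad 9 was never shown.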