# Structured streaming
## Packages
* pyspark.streaming -> streaming with RDDs (DStreams, the legacy API)
* pyspark.sql.streaming -> streaming with DataFrames (Structured Streaming, the one covered here)

## Sources
Streaming data can be read from:
* Kafka
* folders (each new file added to the folder is read as an increment)
* socket (for testing)
* rate (generates rows at a fixed rate, for testing and debugging)

## Sinks
The result of a streaming DataFrame can be written to:
* Console (for testing)
* Files
* Kafka
* Memory (an in-memory table that other queries can read)
* Foreach / foreachBatch (custom actions)

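Output modes:
* "*Complete*": the whole result table is written at every trigger (aggregation queries only; not available with the file sink)
* "*Append*": only new rows are written
* "*Update*": only the rows that changed since the last trigger are written (not available with the file or memory sinks)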
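To make the difference between the modes concrete, here is a toy model in plain Python (not Spark code) of what each mode would write for a running sum of `nb_sales` per item. Append is shown on the raw rows, because Spark only allows append mode on an aggregation when a watermark is defined.

```python
from collections import Counter


def simulate_output_modes(batches):
    """Toy model of what each output mode writes per trigger.

    batches: list of micro-batches, each a list of (item, nb_sales) tuples.
    "append" is shown for the raw rows (no aggregation); "complete" and
    "update" are shown for a running sum of nb_sales per item.
    """
    totals = Counter()
    out = {"append": [], "complete": [], "update": []}
    for batch in batches:
        changed = set()
        for item, nb_sales in batch:
            totals[item] += nb_sales
            changed.add(item)
        out["append"].append(list(batch))                               # only the new rows
        out["complete"].append(sorted(totals.items()))                  # the whole result table
        out["update"].append(sorted((k, totals[k]) for k in changed))   # changed rows only
    return out


result = simulate_output_modes([[(1, 3), (2, 5)], [(1, 2)]])
print(result["complete"][1])  # [(1, 5), (2, 5)]
print(result["update"][1])    # [(1, 5)]
```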

### CSV source
In this example, keep adding files to the "stream_input/" folder. Changes to files that have already been read are not picked up.
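A simplified model of why edits to old files are ignored: the folder source recognizes files by name and only reads names it has not seen before. This is a plain-Python sketch, not Spark's implementation. Spark's real file source also orders files by modification time and records the processed files in the checkpoint.

```python
def plan_file_batches(snapshots):
    """Simplified model of how a folder source picks files.

    snapshots: successive listings of the input folder, each a dict {file_name: content}.
    Returns the file names a new micro-batch would read for each listing.
    """
    seen = set()
    batches = []
    for listing in snapshots:
        # Files are recognized by name only, so editing an old file changes nothing here.
        new_files = sorted(name for name in listing if name not in seen)
        seen.update(new_files)
        batches.append(new_files)
    return batches


snapshots = [
    {"sales_1.csv": "2024-01-01 10:00:00;1;3"},
    {"sales_1.csv": "2024-01-01 10:00:00;1;99", "sales_2.csv": "2024-01-01 10:05:00;2;5"},
    {"sales_1.csv": "2024-01-01 10:00:00;1;99", "sales_2.csv": "2024-01-01 10:05:00;2;5"},
]
print(plan_file_batches(snapshots))  # [['sales_1.csv'], ['sales_2.csv'], []]
```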

In [1]:
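import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
spark = SparkSession.builder.getOrCreate()  # reuses the notebook's session if there is one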

In [2]:
userSchema = StructType().\
    add("time", "timestamp").\
    add("item", "integer").\
    add("nb_sales", "integer")

In [10]:
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("data/streaming/stream_input")

In [None]:
type(csvDF)

In [4]:
csvStream = csvDF.writeStream. \
    outputMode("append"). \
    format("console"). \
    start()  # output appears in the terminal that launched the notebook, not in the notebook itself

In [5]:
csvStream.stop()

In [None]:
memStream = csvDF.writeStream. \
    outputMode("append"). \
    format("memory"). \
    queryName("sales"). \
    start()

In [None]:
spark.sql("select * from sales").show()

In [None]:
spark.sql("select * from sales").show() # after a file was added

In [None]:
memStream.stop()

In [None]:
csvStream = csvDF. \
    writeStream. \
    outputMode("append"). \
    format("csv"). \
    option("checkpointLocation", "checkpoint_dir/csv_file").\
    option("path", "output/csv_file/").\
    start()  # each query needs its own checkpoint and output directory

In [None]:
csvStream.stop()

### Socket source
From the command line, run ```nc -lk 9999```, which listens on port 9999 and sends every line you type to the connected client (here, Spark).
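If `nc` is not available, or you want a reproducible input, a small Python server can play the same role. This is a sketch: `start_line_server` is a hypothetical helper, not part of Spark, and the one-second `delay` between lines is an arbitrary default.

```python
import socket
import threading
import time


def start_line_server(lines, host="localhost", port=9999, delay=1.0):
    """Hypothetical helper: a scripted stand-in for `nc -lk 9999`.

    Accepts a single connection (e.g. Spark's socket source), sends each line
    followed by a newline, waiting `delay` seconds between lines, then closes.
    Returns the bound port and the worker thread.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)

    def serve():
        conn, _ = server.accept()
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
                time.sleep(delay)
        server.close()

    worker = threading.Thread(target=serve, daemon=True)
    worker.start()
    return server.getsockname()[1], worker
```

For example, call `port, worker = start_line_server(["2024-01-01 10:00:00;1;3", "2024-01-01 10:01:00;2;5"])` before running the `socDF` cells. Unlike `nc -lk`, it serves a single connection and then closes it.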

In [None]:
socDF = spark \
    .readStream \
    .format("socket") \
    .option("host","localhost") \
    .option("port",9999).load()

In [None]:
stream = socDF.\
    writeStream. \
    outputMode("append"). \
    format("console"). \
    start()

In [None]:
stream.stop()

In [None]:
csvDF = socDF.\
    withColumn("values",F.split(socDF["value"],";")).\
    withColumn("time",F.col("values")[0].cast("timestamp")).\
    withColumn("item",F.col("values")[1].cast("integer")).\
    withColumn("nb_sales",F.col("values")[2].cast("integer")).\
    select("time", "item", "nb_sales")

In [None]:
stream = csvDF.\
    writeStream. \
    outputMode("append"). \
    format("console"). \
    start()

In [None]:
stream.stop()

In [None]:
stream = csvDF. \
    writeStream. \
    outputMode("append"). \
    format("csv"). \
    option("checkpointLocation", "checkpoint_dir/socket_csv").\
    option("path", "output/socket_csv/").\
    start()

In [None]:
stream.stop()

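## Operations
* SQL: select, filter, groupBy
* join
    * with a static source: nothing is mandatory (stream-static joins are stateless)
    * with another streaming source: supported with limitations; outer joins require a watermark and an event-time range condition, and inner joins should use them too so that the join state does not grow without bound.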
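Why a watermark plus a time-range condition bounds the state of a stream-stream join can be seen with a small model of the `ventas`/`caja` join from the "Join with stream" section. This is a simplified plain-Python illustration that assumes rows older than the watermark are treated as late; Spark's actual state-watermark computation is more general.

```python
from datetime import datetime, timedelta


def evictable_sales(sale_times, caja_watermark, max_gap=timedelta(minutes=5)):
    """Toy model of join state cleanup for the condition
    caja.time >= ventas.time AND caja.time <= ventas.time + max_gap.

    A buffered sale at time t can only match caja rows in [t, t + max_gap]. Once
    the caja watermark is past t + max_gap, any such caja row would be late, so the
    sale can leave the state (in an outer join, it is then emitted with nulls).
    """
    return [t for t in sale_times if t + max_gap < caja_watermark]


t0 = datetime(2024, 1, 1, 10, 0)
buffered = [t0, t0 + timedelta(minutes=3), t0 + timedelta(minutes=8)]
print(evictable_sales(buffered, caja_watermark=t0 + timedelta(minutes=6)))
```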

## Aggregation

In [None]:
salesTot = csvDF.select(F.sum(csvDF.nb_sales).alias("nb_sales"))

In [None]:
aggStream = salesTot.writeStream. \
    outputMode("complete"). \
    format("console"). \
    start()

In [None]:
aggStream.stop()

## GroupBy

In [None]:
salesPerItem = csvDF\
    .groupBy(csvDF.item)\
    .agg(F.sum(csvDF.nb_sales).alias("nb_sales"))

In [None]:
aggStream = salesPerItem.writeStream. \
    outputMode("complete"). \
    format("console"). \
    start()

In [None]:
aggStream.stop()

## Watermark

In [None]:
csvWmStream = csvDF.withWatermark("time", "5 minutes").writeStream. \
    outputMode("append"). \
    format("csv"). \
    option("checkpointLocation", "checkpoint_dir/watermark_csv").\
    option("path", "output/watermark_csv/").\
    start()

In [None]:
stream = csvDF.withWatermark("time", "5 minutes").writeStream. \
    outputMode("append"). \
    format("console"). \
    start()

In [None]:
csvWmStream.stop()
stream.stop()
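A small plain-Python model of how a 5-minute watermark moves across micro-batches. Assumptions: Spark recomputes the watermark between micro-batches as the maximum event time seen so far minus the delay, and the watermark only affects stateful operations (aggregations, deduplication, stream-stream joins), so on the plain append queries above it mainly declares the event-time column. Spark guarantees that data within the delay is kept, while data older than the watermark *may* be dropped; the model below always drops it.

```python
from datetime import datetime, timedelta


def simulate_watermark(batches, delay):
    """Toy model of event-time watermarking across micro-batches (not Spark code).

    The watermark used while processing a batch is (max event time seen in
    earlier batches) - delay; rows older than it count as late and are dropped.
    """
    max_event_time = None
    watermark = None
    kept, dropped, watermarks = [], [], []
    for batch in batches:
        watermarks.append(watermark)
        for ts in batch:
            if watermark is not None and ts < watermark:
                dropped.append(ts)
            else:
                kept.append(ts)
        for ts in batch:
            if max_event_time is None or ts > max_event_time:
                max_event_time = ts
        if max_event_time is not None:
            watermark = max_event_time - delay  # never moves backwards
    return kept, dropped, watermarks
```

Note that within the first batch a row 8 minutes older than the newest one is still kept, because the watermark only advances after the batch.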

## Window aggregation

In [None]:
windowedSales = csvDF. \
    withWatermark("time", "10 minutes"). \
    groupBy(
        F.window("time", "5 minutes", "5 minutes"),
        "item"
    ).agg(F.sum(csvDF.nb_sales).alias("nb_sales")). \
    select(
        F.col("window.start").alias("start"),
        F.col("window.end").alias("end"),
        "item",
        "nb_sales"
    )

In [None]:
stream = windowedSales.writeStream. \
    outputMode("complete"). \
    format("console"). \
    start()

In [None]:
stream.stop()

In [None]:
stream = windowedSales. \
    writeStream. \
    outputMode("append"). \
    format("csv"). \
    option("checkpointLocation", "checkpoint_dir/windowed").\
    option("path", "output/windowed/").\
    start()  # in append mode a window is written only once the watermark has passed its end

In [None]:
stream.stop()
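How `F.window("time", "5 minutes", "5 minutes")` assigns a row to windows can be modelled in a few lines of plain Python. This is a sketch assuming Spark's default alignment of windows to the Unix epoch (no `startTime` offset) and treating the timestamps as UTC; `window_starts` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_starts(ts, size, slide):
    """Start times of the event-time windows [start, start + size) containing ts.

    Windows are aligned to the Unix epoch and spaced `slide` apart.
    """
    last_start = ts - (ts - EPOCH) % slide   # latest aligned start <= ts
    starts = []
    start = last_start
    while start > ts - size:                 # window still covers ts
        starts.append(start)
        start -= slide
    return sorted(starts)


ts = datetime(2024, 1, 1, 10, 7, 30)
print(window_starts(ts, timedelta(minutes=5), timedelta(minutes=5)))   # tumbling: one window
print(window_starts(ts, timedelta(minutes=10), timedelta(minutes=5)))  # sliding: two windows
```

With size equal to slide (as in the notebook) each row falls into exactly one window; with a slide smaller than the size, a row is counted in several overlapping windows.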

## Join

In [6]:
itemDF = spark.createDataFrame([(1, "patatas"), (2, "jamon"), (3, "pimientos")], ["item_id", "item_name"])

In [7]:
enrichedDF = csvDF.join(itemDF, csvDF.item==itemDF.item_id, "left_outer")

In [8]:
stream = enrichedDF.writeStream. \
    outputMode("append"). \
    format("console"). \
    start()

In [9]:
stream.stop()

### Join with stream

In [11]:
cajaSchema = StructType().\
    add("time", "timestamp").\
    add("item", "integer").\
    add("amount", "integer")

In [12]:
cajaDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(cajaSchema) \
    .csv("data/streaming/caja/")

In [13]:
ventaConSaldo = csvDF.withWatermark("time", "10 minutes").alias("ventas").\
    join(cajaDF.withWatermark("time", "10 minutes").alias("caja"),
         F.expr("""caja.item = ventas.item AND
                   caja.time >= ventas.time AND
                   caja.time <= ventas.time + interval 5 minutes"""),
         "full_outer")  # watermarks on both sides let Spark emit unmatched rows and clean up state

In [14]:
stream = ventaConSaldo.writeStream. \
    outputMode("append"). \
    format("console"). \
    start()

In [15]:
stream.stop()