### Enfoques en el Procesamiento de Flujos:
- **Tradicional:** se reprocesa todo el conjunto de datos cada vez que recibe una nueva actualización de datos.
- **Lógica personalizada:** capturar solo aquellos archivos o registros que se ha agregado desde la ultima vez que se ejecutó la actualización.

### Input Streaming Table
**Un flujo de datos de entrada podría ser:**
- Directorio de archivos
- Sistema de mensajería como Kafka
- Delta Table

### Spark.readStream():
- Leer los datos en real time.
- Permite procesar los datos presentes y los que van llegando.
- Podemos aplicar transformaciones como si fuera una tabla estática.

In [0]:
# Sintaxis
streamDF = spark.readStream.table("input_table")

### dataframe.writeStream():
- Conserva el resultado de una consulta streaming.

In [0]:
#Sintaxis:
StreamDf.writeStream
        .trigger(processingTime="2 minutes")       #Ejecutar cada 2 min  
        .outputMode("append")               #Agregar a la tabla destino los datos nuevos que van llegando
        .option("checkpointLocation", "/path") # Seguimiento del proceso de streaming 
        .table("output_table")

###Práctica

Ejecutamos un Notebook que descargará el conjunto de datos en Databricks.

In [0]:
%run ../Includes_1/Copy-Datasets

#### spark.readStream en SQL
- Consulta una Delta Table como origen de flujo.
- A partir de ahí podemos registrar una vista temporal.
- La vista temporal es de **"streaming"** que permite aplicar transformaciones.

##### Crear Delta Table a partir de archivos CSV

In [0]:
%sql
CREATE TABLE books 
  (book_id STRING, title STRING, author STRING, category STRING, price DOUBLE) --Definimos el esquema de la tabla
USING CSV
OPTIONS (header = "true", 
        delimiter=";")
LOCATION "${dataset.bookstore}/books-csv" -- archivos CSV

In [0]:
%sql
SELECT * FROM books LIMIT 10;

book_id,title,author,category,price
B07,The Hundred-Page Machine Learning,Andriy Burkov,Computer Science,33.0
B08,Quantum Computing for Everyone,Chris Bernhardt,Computer Science,41.0
B09,Advanced Data Structures,Peter Brass,Computer Science,24.0
B10,Beginning Database Design Solutions,Rod Stephens,Computer Science,44.0
B11,Business Intelligence for Dummies,Swain Scheps,Computer Science,38.0
B12,Big Data in Practice,Bernard Marr,Computer Science,30.0
B01,The Soul of a New Machine,Tracy Kidder,Computer Science,49.0
B02,Learning JavaScript Design Patterns,Addy Osmani,Computer Science,28.0
B03,Make Your Own Neural Network,Tariq Rashid,Computer Science,35.0
B04,Robot Dynamics and Control,Mark W. Spong,Computer Science,20.0


##### Leer en Streaming el Delta Table

In [0]:
(spark.readStream
    .table("books") # consulta delta table como origen
    .createOrReplaceTempView("books_streaming_tmp_vw")) # creamos una vista temporal

##### Consultamos la Vista Temporal de Streaming

In [0]:
%sql
SELECT * FROM books_streaming_tmp_vw;

book_id,title,author,category,price
B07,The Hundred-Page Machine Learning,Andriy Burkov,Computer Science,33.0
B08,Quantum Computing for Everyone,Chris Bernhardt,Computer Science,41.0
B09,Advanced Data Structures,Peter Brass,Computer Science,24.0
B10,Beginning Database Design Solutions,Rod Stephens,Computer Science,44.0
B11,Business Intelligence for Dummies,Swain Scheps,Computer Science,38.0
B12,Big Data in Practice,Bernard Marr,Computer Science,30.0
B01,The Soul of a New Machine,Tracy Kidder,Computer Science,49.0
B02,Learning JavaScript Design Patterns,Addy Osmani,Computer Science,28.0
B03,Make Your Own Neural Network,Tariq Rashid,Computer Science,35.0
B04,Robot Dynamics and Control,Mark W. Spong,Computer Science,20.0


##### Aplicamos algunas agregaciones a la Vista Temporal de Streaming

In [0]:
%sql
SELECT author, count(book_id) AS total_books
FROM books_streaming_tmp_vw
GROUP BY author

author,total_books
Mark W. Spong,1
Chris Bernhardt,1
Tariq Rashid,1
Peter Brass,1
Luciano Ramalho,1
Addy Osmani,1
Andriy Burkov,1
Tracy Kidder,1
Swain Scheps,1
François Chollet,1


##### La operación "ORDER BY" no es compatible con los datos de transmisión

In [0]:
%sql
SELECT * FROM books_streaming_tmp_vw
ORDER BY author

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-838019026335140>:7[0m
[1;32m      5[0m     display(df)
[1;32m      6[0m     [38;5;28;01mreturn[39;00m df
[0;32m----> 7[0m   _sqldf [38;5;241m=[39m [43m____databricks_percent_sql[49m[43m([49m[43m)[49m
[1;32m      8[0m [38;5;28;01mfinally[39;00m:
[1;32m      9[0m   [38;5;28;01mdel[39;00m ____databricks_percent_sql

File [0;32m<command-838019026335140>:5[0m, in [0;36m____databricks_percent_sql[0;34m()[0m
[1;32m      3[0m [38;5;28;01mimport[39;00m [38;5;21;01mbase64[39;00m
[1;32m      4[0m df [38;5;241m=[39m spark[38;5;241m.[39msql(base64[38;5;241m.[39mstandard_b64decode([38;5;124m"[39m[38;5;124mU0VMRUNUICogRlJPTSBib29rc19zdHJlYW1pbmdfdG1wX3Z3Ck9SREVSIEJZIGF1dGhvcg==[39m[38;5;124m"[39m)[38;5;241m.[39mdecode())
[0;32m----> 5[0m [43mdisplay[49m

#####Creamos una Vista Temporal a partir de una consulta

In [0]:
%sql
-- Al usar una vista temporal de streaming, esta vista también lo será
CREATE OR REPLACE TEMP VIEW author_counts_tmp_vw AS (
  --Consulta de la vista temporal de streaming
  SELECT author, count(book_id) AS total_books
  FROM books_streaming_tmp_vw
  GROUP BY author
  )

#### dataframe.writeStream

In [0]:
(spark.table("author_counts_tmp_vw")  #cargar datos de una vista temporal de streaming a un DF
    .writeStream
    .trigger(processingTime="4 seconds") # ejecutar cada 4 seg
    .outputMode("complete") # modo de salida completo
    .option("checkpointLocation", "dbfs:/mnt/demo/author_counts_tmp_vw") # ubicación del punto de control
    .table("author_counts") # almacenarlo en la tabla "author_counts"
    )

Out[33]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f44ad1ef7c0>

In [0]:
%sql
SELECT count(*) FROM author_counts;

count(1)
12


##### Agregamos nuevos datos para verlos reflejados en el streaming

In [0]:
%sql
INSERT INTO books
values ("B19", "Introduction to Modeling and Simulation", "Mark W. Spong", "Computer Science", 25),
        ("B20", "Robot Modeling and Control", "Mark W. Spong", "Computer Science", 30),
        ("B21", "Turing's Vision: The Birth of Computer Science", "Chris Bernhardt", "Computer Science", 35)

##### 2 Escenario:

###### Insertamos datos 

In [0]:
%sql
INSERT INTO books
values ("B16", "Hands-On Deep Learning Algorithms with Python", "Sudharsan Ravichandiran", "Computer Science", 25),
        ("B17", "Neural Network Methods in Natural Language Processing", "Yoav Goldberg", "Computer Science", 30),
        ("B18", "Understanding digital signal processing", "Richard Lyons", "Computer Science", 35)

###### Consulta siempre activa desencadenada - Ejecución en modo por Lotes

In [0]:
#.trigger(availableNow=True): procesará todos los nuevos datos y se detendrá por sí sola después de la ejecución

(spark.table("author_counts_tmp_vw")                               
      .writeStream           
      .trigger(availableNow=True) # lote incremental activado
      .outputMode("complete")
      .option("checkpointLocation", "dbfs:/mnt/demo/author_counts_checkpoint")
      .table("author_counts")
      .awaitTermination() # bloquea la ejecución de cualquier celda, hasta que el lote incremental se haya realizado
)

In [0]:
%sql
SELECT * FROM author_counts

author,total_books
Sudharsan Ravichandiran,1
François Chollet,1
Chris Bernhardt,2
Luciano Ramalho,1
Mark W. Spong,3
Richard Lyons,1
Andriy Burkov,1
Yoav Goldberg,1
Tariq Rashid,1
Tracy Kidder,1
