## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) E-commerce Sales Lab
1. Building streaming DataFrames
1. Displaying streaming queries
1. Writing streaming results
1. Monitoring streaming queries

In [2]:
# Only needed when running locally
import findspark

findspark.init()
findspark.find()

'E:\\LibreriasPython\\spark-3.1.2-bin-hadoop2.7\\python\\pyspark'

In [3]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ManagementStream').getOrCreate()

In [18]:
schema = "device STRING, ecommerce STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name STRING, event_previous_timestamp BIGINT, event_timestamp BIGINT, geo STRUCT<city: STRING, state: STRING>, items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source STRING, user_first_touch_timestamp BIGINT, user_id STRING"

eventsPath = "../data/events.parquet" # path to events


df = (spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", 1)
  .parquet(eventsPath)
)
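
The DDL schema string above is long enough that a typo is easy to miss. As a minimal sketch, the same string can be assembled from smaller pieces in plain Python. The helper `ddl_struct` and the name `events_schema` are hypothetical and are not part of the original notebook.

```python
def ddl_struct(fields):
    """Render (name, type) pairs as a DDL STRUCT<...> type (hypothetical helper)."""
    return "STRUCT<" + ", ".join(f"{name}: {dtype}" for name, dtype in fields) + ">"

ecommerce_fields = [
    ("purchase_revenue_in_usd", "DOUBLE"),
    ("total_item_quantity", "BIGINT"),
    ("unique_items", "BIGINT"),
]
geo_fields = [("city", "STRING"), ("state", "STRING")]
item_fields = [
    ("coupon", "STRING"),
    ("item_id", "STRING"),
    ("item_name", "STRING"),
    ("item_revenue_in_usd", "DOUBLE"),
    ("price_in_usd", "DOUBLE"),
    ("quantity", "BIGINT"),
]

# Top-level columns, in the same order as the schema string in the cell above
columns = [
    ("device", "STRING"),
    ("ecommerce", ddl_struct(ecommerce_fields)),
    ("event_name", "STRING"),
    ("event_previous_timestamp", "BIGINT"),
    ("event_timestamp", "BIGINT"),
    ("geo", ddl_struct(geo_fields)),
    ("items", f"ARRAY<{ddl_struct(item_fields)}>"),
    ("traffic_source", "STRING"),
    ("user_first_touch_timestamp", "BIGINT"),
    ("user_id", "STRING"),
]

events_schema = ", ".join(f"{name} {dtype}" for name, dtype in columns)
```

The resulting `events_schema` could be passed to `.schema(...)` in place of the hand-written string.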

In [28]:
df.printSchema()

root
 |-- device: string (nullable = true)
 |-- ecommerce: struct (nullable = true)
 |    |-- purchase_revenue_in_usd: double (nullable = true)
 |    |-- total_item_quantity: long (nullable = true)
 |    |-- unique_items: long (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_previous_timestamp: long (nullable = true)
 |-- event_timestamp: long (nullable = true)
 |-- geo: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- coupon: string (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- item_name: string (nullable = true)
 |    |    |-- item_revenue_in_usd: double (nullable = true)
 |    |    |-- price_in_usd: double (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- user_first_touch_timestamp: long (nullable = true)

### 1. Building streaming DataFrames

- Confirm that the DataFrame is processed as a stream by checking `isStreaming`

- Create emailTrafficDF as follows:
  - Filter df where traffic_source == 'email'
  - Use the withColumn method to add a "mobile" column defined as col("device").isin(["iOS", "Android"])
  - Select "user_id", "event_timestamp", "mobile"
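
To check the expected result without a running stream, the sketch below applies the same three steps to plain Python dicts. It only illustrates the logic and is not Spark code. The function `email_traffic` is hypothetical, and the `user-a` and `user-b` rows are made-up sample data.

```python
MOBILE_DEVICES = {"iOS", "Android"}

def email_traffic(rows):
    """Mimic filter -> withColumn("mobile") -> select on a list of dicts."""
    return [
        {
            "user_id": row["user_id"],
            "event_timestamp": row["event_timestamp"],
            "mobile": row["device"] in MOBILE_DEVICES,
        }
        for row in rows
        if row["traffic_source"] == "email"
    ]

# Illustrative input; only the first row's values appear in this notebook's output
sample_rows = [
    {"user_id": "UA000000107379500", "event_timestamp": 1593878946592107,
     "device": "macOS", "traffic_source": "google"},
    {"user_id": "user-a", "event_timestamp": 1, "device": "iOS", "traffic_source": "email"},
    {"user_id": "user-b", "event_timestamp": 2, "device": "Windows", "traffic_source": "email"},
]

result = email_traffic(sample_rows)
```

One difference to keep in mind: for a null `device`, Spark's `isin` yields null, whereas this plain-Python version yields `False`.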

In [30]:
# Check whether the DataFrame is processed as a stream
df.isStreaming

True

In [29]:
# Transformations on the streaming DataFrame
from pyspark.sql import functions as F

dfStreamFilter = (df
  .filter(F.col("traffic_source") == "email")
  .withColumn("mobile", F.col("device").isin(["iOS", "Android"]))
  .select("user_id", "event_timestamp", "mobile")
)


### 2. Displaying streaming queries

In [32]:
# Databricks lets us monitor the query by showing, by default, a chart of how the data is arriving
userhome = "../data"
checkpointPath = userhome + "/email_traffic/checkpoint"
outputPath = userhome + "/email_traffic/output"

devicesQuery = (dfStreamFilter.writeStream
  .outputMode("append")
  .format("parquet")
  .queryName("email_traffic_p")
  .trigger(processingTime="1 second")
  .option("checkpointLocation", checkpointPath)
  .start(outputPath))

### ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Monitoring streaming queries

- Show the streaming query `id`
- Show the streaming query status
- Stop the streaming query

In [33]:
# ID
devicesQuery.id

'468ff6c2-50f1-4774-84ce-4faa8e4554e9'

In [34]:
# status
devicesQuery.status

{'message': "Terminated with exception: Option 'basePath' must be a directory",
 'isDataAvailable': False,
 'isTriggerActive': False}
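
The status above reports that the query terminated with `Option 'basePath' must be a directory`. One likely cause, stated here as an assumption rather than a confirmed diagnosis, is that the path given to `readStream` points to a single file instead of a directory of files. The sketch below checks this before the stream is started. The helper `check_stream_source` is hypothetical.

```python
from pathlib import Path

def check_stream_source(path, suffix=".parquet"):
    """Return the data files under `path`, or raise if `path` is not a directory."""
    source = Path(path)
    if not source.is_dir():
        raise NotADirectoryError(
            f"{path!r} is not a directory; a file stream source expects a folder of files"
        )
    # Collect matching files, including those in nested partition folders
    return sorted(p for p in source.rglob(f"*{suffix}") if p.is_file())
```

For example, `check_stream_source(eventsPath)` would raise if `events.parquet` is a single file on the local machine.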

In [35]:
# stop
devicesQuery.stop()

In [36]:
devicesQuery.id

'468ff6c2-50f1-4774-84ce-4faa8e4554e9'

In [25]:
# df_parquet_describe = spark.read.parquet(outputPath)
df_parquet_describe = spark.read.parquet(userhome+"/events.parquet")

df_parquet_describe.show(10,vertical = True)

-RECORD 0------------------------------------------
 device                     | macOS                
 ecommerce                  | {null, null, null}   
 event_name                 | warranty             
 event_previous_timestamp   | 1593878899217692     
 event_timestamp            | 1593878946592107     
 geo                        | {Montrose, MI}       
 items                      | []                   
 traffic_source             | google               
 user_first_touch_timestamp | 1593878899217692     
 user_id                    | UA000000107379500    
-RECORD 1------------------------------------------
 device                     | Windows              
 ecommerce                  | {null, null, null}   
 event_name                 | press                
 event_previous_timestamp   | 1593876662175340     
 event_timestamp            | 1593877011756535     
 geo                        | {Northampton, MA}    
 items                      | []                   
 traffic_sou

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Coupon Sales Lab

Process and aggregate streaming data on transactions that used coupons.

1. Read the data stream
1. Filter for transactions with coupon codes
1. Write streaming query results to Parquet
1. Monitor the streaming query
1. Stop the streaming query

##### Classes
- [DataStreamReader](http://spark.apache.org/docs/3.0.0/api/scala/org/apache/spark/sql/streaming/DataStreamReader.html)
- [DataStreamWriter](http://spark.apache.org/docs/3.0.0/api/scala/org/apache/spark/sql/streaming/DataStreamWriter.html)
- [StreamingQuery](http://spark.apache.org/docs/3.0.0/api/scala/org/apache/spark/sql/streaming/StreamingQuery.html)

In [5]:
schema = "order_id BIGINT, email STRING, transaction_timestamp BIGINT, total_item_quantity BIGINT, purchase_revenue_in_usd DOUBLE, unique_items BIGINT, items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>"

### 1. Read the streaming data
- Define the corresponding schema in **`schema`**
- Configure the stream to read 1 file per trigger
- Read from the Parquet files stored in **`salesPath`**

Assign the resulting DataFrame to **`dfStreaming`**; the cell below also keeps a static copy in **`df`** for inspection.

In [50]:
# TODO
salesPath = '../data/events.parquet'

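# Streaming read: one file per trigger
dfStreaming = (spark
      .readStream
      .format("parquet")
      .option("maxFilesPerTrigger", 1)
      .schema(schema)
      .load(salesPath)
)

# Static read of the same path, used to inspect the data
df = (spark
      .read
      .format("parquet")
      .schema(schema)
      .load(salesPath)
)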

In [51]:
df.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- email: string (nullable = true)
 |-- transaction_timestamp: long (nullable = true)
 |-- total_item_quantity: long (nullable = true)
 |-- purchase_revenue_in_usd: double (nullable = true)
 |-- unique_items: long (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- coupon: string (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- item_name: string (nullable = true)
 |    |    |-- item_revenue_in_usd: double (nullable = true)
 |    |    |-- price_in_usd: double (nullable = true)
 |    |    |-- quantity: long (nullable = true)



In [53]:
assert dfStreaming.isStreaming
assert dfStreaming.columns == ['order_id', 'email', 'transaction_timestamp', 'total_item_quantity', 'purchase_revenue_in_usd', 'unique_items', 'items']

In [56]:
df.show(3,vertical=True,truncate = False)

-RECORD 0------------------------------------------------------------------------------
 order_id                | null                                                        
 email                   | null                                                        
 transaction_timestamp   | null                                                        
 total_item_quantity     | null                                                        
 purchase_revenue_in_usd | null                                                        
 unique_items            | null                                                        
 items                   | []                                                          
-RECORD 1------------------------------------------------------------------------------
 order_id                | null                                                        
 email                   | null                                                        
 transaction_timestamp   | null 

### 2. Filter by transaction
- Explode the **`items`** field in **`df`**
- Keep only the records where **`items.coupon`** is not null

Assign the resulting DataFrame to **`couponSalesDF`**.
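
To make the two steps concrete, here is a minimal plain-Python sketch of what exploding an array column and filtering on a nested field do. It is an illustration on dicts and not Spark code. The helpers `explode_items` and `with_coupon` and the `order_id` values are hypothetical. The item values are taken from the outputs shown in this notebook.

```python
def explode_items(orders):
    """Yield one row per element of `items`; null or empty arrays produce no rows."""
    for order in orders:
        for item in order["items"] or []:
            yield {**order, "items": item}

def with_coupon(rows):
    """Keep only the rows whose exploded item carries a coupon code."""
    return [row for row in rows if row["items"]["coupon"] is not None]

sample_orders = [
    {"order_id": 1, "items": [
        {"coupon": None, "item_id": "M_STAN_T", "item_name": "Standard Twin Mattress"},
        {"coupon": "NEWBED10", "item_id": "M_STAN_Q", "item_name": "Standard Queen Mattress"},
    ]},
    {"order_id": 2, "items": []},  # dropped by the explode step
]

exploded = list(explode_items(sample_orders))
coupon_rows = with_coupon(exploded)
```

After the explode step, `items` holds a single struct per row instead of an array, which is why the filter can address `items.coupon` directly.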

In [10]:
from pyspark.sql import functions as F


df.select(F.explode(df.items), df.items.coupon).show(5,truncate = False)

+-----------------------------------------------------------+------------+
|col                                                        |items.coupon|
+-----------------------------------------------------------+------------+
|{null, M_STAN_T, Standard Twin Mattress, 595.0, 595.0, 1}  |[null]      |
|{null, M_STAN_F, Standard Full Mattress, 945.0, 945.0, 1}  |[null]      |
|{null, M_PREM_T, Premium Twin Mattress, 1095.0, 1095.0, 1} |[null]      |
|{null, M_STAN_K, Standard King Mattress, 1195.0, 1195.0, 1}|[null]      |
|{null, M_STAN_K, Standard King Mattress, 1195.0, 1195.0, 1}|[null]      |
+-----------------------------------------------------------+------------+
only showing top 5 rows



In [28]:
couponSalesDF = (dfStreaming.withColumn("items",F.explode(F.col("items")))
                 .filter(F.col("items.coupon").isNotNull()))

In [20]:
couponSalesDF.show(3,truncate = False,vertical=True)

-RECORD 0----------------------------------------------------------------------------------
 order_id                | null                                                            
 email                   | null                                                            
 transaction_timestamp   | null                                                            
 total_item_quantity     | null                                                            
 purchase_revenue_in_usd | null                                                            
 unique_items            | null                                                            
 items                   | {NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1} 
-RECORD 1----------------------------------------------------------------------------------
 order_id                | null                                                            
 email                   | null                                                 

In [29]:
schemaStr = str(couponSalesDF.schema)
assert "StructField(items,StructType(List(StructField(coupon" in schemaStr, "items column was not exploded"

### 3. Write the streaming query to Parquet files
- Set the streaming query's output mode to "append"
- Name the query "coupon_sales"
- Set the trigger interval to 1 second
- Set the checkpoint location to **`couponsCheckpointPath`**
- Set the output path to **`couponsOutputPath`**

Assign the resulting streaming query to **`couponSalesQuery`**.

In [51]:
# TODO
workingDir = "../data"
couponsCheckpointPath = workingDir + "/coupon-sales/checkpoint"
couponsOutputPath = workingDir + "/coupon-sales/output"

couponSalesQuery = (couponSalesDF
                    .writeStream
                    .outputMode("append")
                    .format("parquet")
                    .queryName("coupon_sales")
                    .trigger(processingTime="1 second")
                    .option("checkpointLocation", couponsCheckpointPath)
                    .start(couponsOutputPath)
)

In [0]:
untilStreamIsReady("coupon_sales")
assert couponSalesQuery.isActive
assert len(dbutils.fs.ls(couponsOutputPath)) > 0
assert len(dbutils.fs.ls(couponsCheckpointPath)) > 0
assert "coupon_sales" in couponSalesQuery.lastProgress["name"]
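
`dbutils` is only available on Databricks, and `untilStreamIsReady` appears to come from the course environment, so the cell above will not run on a local Spark installation. As a rough local substitute, the sketch below polls a query until it has reported progress. The helper `wait_for_progress` and its parameters are hypothetical. It relies only on the `recentProgress` and `isActive` attributes of a streaming query.

```python
import time

def wait_for_progress(query, min_batches=1, timeout_s=30.0, poll_s=0.5):
    """Wait until `query` has reported at least `min_batches` progress updates.

    Returns True on success, False if the query stops or the timeout expires.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if len(query.recentProgress) >= min_batches:
            return True
        if not query.isActive:
            return False  # the query terminated before reporting progress
        time.sleep(poll_s)
    return False
```

Locally, `wait_for_progress(couponSalesQuery)` could stand in for `untilStreamIsReady("coupon_sales")`, and `os.listdir(couponsOutputPath)` for `dbutils.fs.ls(couponsOutputPath)`.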

### 4. Monitor the streaming query
- Get the streaming query ID
- Get the streaming query status

In [42]:
# TODO
queryID = couponSalesQuery.id
queryID

'92340b59-0174-418c-9963-9a92e5f09d1e'

In [43]:
# TODO
queryStatus = couponSalesQuery.status
queryStatus

{'message': "Terminated with exception: Option 'basePath' must be a directory",
 'isDataAvailable': False,
 'isTriggerActive': False}
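
When the status message reports a termination, as it does here, the query's `exception()` method usually gives more detail than `status` alone. The sketch below gathers the most useful fields in one place. The helper `summarize_query` is hypothetical, and it assumes the object exposes the `id`, `name`, `isActive`, `status` and `exception()` members of a PySpark streaming query.

```python
def summarize_query(query):
    """Collect the fields most useful for diagnosing a streaming query."""
    error = query.exception()  # None unless the query terminated with an error
    return {
        "id": query.id,
        "name": query.name,
        "active": query.isActive,
        "message": query.status["message"],
        "error": None if error is None else str(error),
    }
```

Calling `summarize_query(couponSalesQuery)` after a failure would show the error text alongside the query name and ID.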

In [36]:
assert type(queryID) == str
assert list(queryStatus.keys()) == ['message', 'isDataAvailable', 'isTriggerActive']

### 5. Stop the streaming query
- Stop the streaming query

In [52]:
# TODO
couponSalesQuery.stop()

In [49]:
assert not couponSalesQuery.isActive
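
In a local notebook, queries started by earlier cell runs can stay active in the background. As a minimal sketch, the helper below stops every active query on a session. The name `stop_all_queries` is hypothetical. It assumes `spark.streams.active` lists the running queries and that each query offers `stop()` and `awaitTermination(timeout)`.

```python
def stop_all_queries(spark, timeout_s=10):
    """Stop every active streaming query on `spark` and return their names."""
    stopped = []
    for query in spark.streams.active:
        query.stop()
        query.awaitTermination(timeout_s)  # give the query time to shut down
        stopped.append(query.name)
    return stopped
```

Running `stop_all_queries(spark)` before restarting a lab is one way to avoid leftover streams.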

### 6. Verify the rows saved to the Parquet files

In [0]:
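# display() is only available in Databricks notebooks; locally, call .show() on the DataFrame instead
display(spark.read.parquet(couponsOutputPath))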