In [None]:
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
from pyspark.sql import functions as F


# Event Hubs connection string, used below as the SASL password.
# NOTE(review): do not commit a real connection string here; load it from a
# secret store (e.g. a Databricks secret scope) instead.
EH_CONN_STR = "<cadena conexion event hubs>"

# Kafka consumer options for the Event Hubs Kafka-compatible endpoint (port
# 9093, SASL_SSL + PLAIN). Authentication uses the literal user name
# "$ConnectionString" with the connection string as the password. The
# "kafkashaded." class prefix presumably matches the shaded Kafka client
# bundled with the Databricks runtime.
KAFKA_OPTIONS = {
  "kafka.bootstrap.servers": "<servidor event hub>.servicebus.windows.net:9093",
  "subscribe": "<nombre event hub>",
  "kafka.sasl.mechanism": "PLAIN",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.jaas.config": (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="$ConnectionString" password="{EH_CONN_STR}";'
  ),
}

# Schema of the JSON payload carried in each Kafka message value.
# payload_ddl is the DDL form, kept for reference and for any code that uses
# it. payload_schema is built explicitly rather than through the private
# T._parse_datatype_string helper, which has no stability guarantee and needs
# an active SparkContext. Keep the two definitions in sync.
payload_ddl = """timestamp TIMESTAMP, productos ARRAY<STRUCT<id INT, nombre STRING, precio DOUBLE, cantidad INT>>, metodo_pago STRING, tienda_id INT"""
payload_schema = T.StructType([
  T.StructField("timestamp", T.TimestampType()),
  # One element per purchased product line.
  T.StructField("productos", T.ArrayType(T.StructType([
    T.StructField("id", T.IntegerType()),
    T.StructField("nombre", T.StringType()),
    T.StructField("precio", T.DoubleType()),
    T.StructField("cantidad", T.IntegerType()),
  ]))),
  T.StructField("metodo_pago", T.StringType()),
  T.StructField("tienda_id", T.IntegerType()),
])

# Parse the JSON payload of each Kafka record and explode it into one row per product
def parse(df):
  """Parse raw Kafka records into one flat row per purchased product.

  Casts the binary Kafka ``value`` column to a string, parses it as JSON with
  ``payload_schema`` and explodes the ``productos`` array. Each output row
  describes a single product line of a sale.

  Args:
    df: DataFrame read from the Kafka source; must contain a ``value`` column.

  Returns:
    DataFrame with columns ``fecha_hora`` (timestamp), ``metodo_pago``
    (string), ``tienda_id`` (int), ``productos_id`` (int), ``nombre``
    (string), ``precio`` (double) and ``cantidad`` (int), in that order.

  Note:
    ``explode`` emits no rows when ``productos`` is null or empty. That
    includes messages whose JSON could not be parsed, so those are dropped.
  """
  payload = df.select(
    F.from_json(F.col("value").cast("string"), payload_schema).alias("parsed_records")
  )
  # "timestamp" is the Spark SQL type name; there is no "datetime" type, and
  # casting to it fails during analysis.
  per_product = payload.select(
    F.col("parsed_records.timestamp").cast("timestamp").alias("fecha_hora"),
    F.col("parsed_records.metodo_pago").cast("string").alias("metodo_pago"),
    F.col("parsed_records.tienda_id").cast("int").alias("tienda_id"),
    F.explode("parsed_records.productos").alias("producto"),
  )
  return per_product.select(
    "fecha_hora",
    "metodo_pago",
    "tienda_id",
    F.col("producto.id").cast("int").alias("productos_id"),
    F.col("producto.nombre").cast("string").alias("nombre"),
    F.col("producto.precio").cast("double").alias("precio"),
    F.col("producto.cantidad").cast("int").alias("cantidad"),
  )


@dlt.create_table(
  comment="Raw Sales Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "true",
  },
)
def sales_raw():
  """Bronze table: Kafka sales events parsed into one row per product line.

  Reads the Event Hubs topic as a stream using ``KAFKA_OPTIONS`` and applies
  ``parse`` to each micro-batch.
  """
  # NOTE(review): KAFKA_OPTIONS sets no "startingOffsets", so the Kafka
  # source default ("latest") applies. Because reset is allowed, a full
  # refresh would clear this table and resume from the latest offset,
  # losing earlier events. Consider setting "pipelines.reset.allowed" to
  # "false" here.
  kafka_events = (
    spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
  )
  return parse(kafka_events)

@dlt.create_table(
  comment="Sales",
  table_properties={
    "quality": "silver",
    "pipelines.reset.allowed": "true",
  },
)
def sales_silver():
  """Silver table: bronze sales lines with a line total and a calendar date.

  Keeps only lines with ``cantidad > 0``. Appends ``venta_individual``
  (``precio * cantidad``), then ``fecha`` (``fecha_hora`` cast to a date).
  """
  # NOTE(review): spark.table() is a batch read of sales_raw, so this dataset
  # is not processed as a stream; spark.readStream.table("sales_raw") would
  # make it incremental. Confirm which is intended.
  bronze = spark.table("sales_raw")
  positive_lines = bronze.filter(F.col("cantidad") > 0)
  with_line_total = positive_lines.withColumn(
    "venta_individual", F.col("precio") * F.col("cantidad")
  )
  return with_line_total.withColumn("fecha", F.col("fecha_hora").cast("date"))

@dlt.create_table(
  comment="Gold Summary Sales",
  table_properties={
    "quality": "gold",
    "pipelines.reset.allowed": "true",
  },
)
def sales_gold():
  """Gold table: daily sales totals per product name and payment method.

  Returns:
    DataFrame with one row per (``fecha``, ``nombre``, ``metodo_pago``) and
    the aggregates ``venta_total`` (sum of ``venta_individual``) and
    ``cantidad_total`` (sum of ``cantidad``).
  """
  # F.sum is spelled out because a bare ``sum`` only resolves to the Spark
  # aggregate while ``from pyspark.sql.functions import *`` shadows the
  # Python builtin. If that import is removed or reordered, it raises a
  # TypeError. No pre-groupBy select is needed: groupBy/agg only use the
  # columns named here.
  return (
    spark.table("sales_silver")
      .groupBy("fecha", "nombre", "metodo_pago")
      .agg(
        F.sum("venta_individual").alias("venta_total"),
        F.sum("cantidad").alias("cantidad_total"),
      )
  )
