### Ingesta en tiempo real

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType

In [0]:
# Root of the ADLS Gen2 data lake. Every medallion-layer path is derived from it,
# so the storage account / container is configured in exactly one place.
datalake_container = "abfss://datalake@dls0tfm.dfs.core.windows.net"
bronze_path = f"{datalake_container}/bronze"
# The trailing slash is intentional: paths are built as f"{path_silver}<table>".
path_silver = f"{datalake_container}/silver/"


In [0]:
# Source system and Kafka topic being ingested. The topic name doubles as the
# dataset name under bronze/<datasource>/.
datasource = "puerta"
topic = "pedidos"
dataset = topic
dataset_bronze_path = "/".join([bronze_path, datasource, dataset])
# The streaming checkpoint sits next to the dataset folder.
dataset_bronze_checkpoint_path = dataset_bronze_path + "_checkpoint"

In [0]:
def read_config(path="/dbfs/FileStore/confluent_client_properties"):
  """Parse a ``client.properties``-style file into a dict.

  Blank lines and lines starting with ``#`` are ignored. Every other line
  must have the form ``key=value``. Only the first ``=`` splits the line, so
  values may themselves contain ``=``. Surrounding whitespace is stripped
  from both the key and the value.

  Args:
    path: Location of the properties file. Defaults to the DBFS path this
      notebook has always read from.

  Returns:
    dict mapping property names to their string values.

  Raises:
    ValueError: if a non-comment line has no ``=``.
  """
  config = {}
  with open(path, encoding="utf-8") as fh:
    for lineno, raw_line in enumerate(fh, start=1):
      line = raw_line.strip()
      if not line or line.startswith("#"):
        continue
      if "=" not in line:
        raise ValueError(f"{path}:{lineno}: expected 'key=value', got {line!r}")
      parameter, value = line.split("=", 1)
      # Strip the key too, so "key = value" is stored under "key", not "key ".
      config[parameter.strip()] = value.strip()
  return config

# Client settings shared by the Spark Kafka source and the confluent_kafka
# admin client / producer further down. Those cells index these keys directly,
# so check them once here and fail with a clear message if any is missing.
conf = read_config()
_claves_obligatorias = ("bootstrap.servers", "security.protocol", "sasl.mechanism",
                        "sasl.username", "sasl.password")
_claves_faltantes = [clave for clave in _claves_obligatorias if clave not in conf]
if _claves_faltantes:
    raise KeyError(f"Faltan claves en la configuración de Kafka: {_claves_faltantes}")

In [0]:
# Options for Spark's Kafka source. The "kafka."-prefixed keys carry the broker
# connection/authentication settings from `conf`. The JAAS line authenticates
# with SASL/PLAIN. The credentials are read with conf[...] rather than
# conf.get(...) so a missing value fails here, instead of sending the literal
# string "None" as username/password.
kafka_options = {
    "kafka.bootstrap.servers": conf["bootstrap.servers"],
    "kafka.security.protocol": conf["security.protocol"],
    "kafka.sasl.mechanism": conf["sasl.mechanism"],
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{conf["sasl.username"]}" password="{conf["sasl.password"]}";'
    ),
    "subscribe": topic,
    "includeHeaders": "true",
    # Start from the earliest offset available in the topic.
    "startingOffsets": "earliest",
}

In [0]:
# Streaming DataFrame over the Kafka topic configured in `kafka_options`.
# One row per Kafka record: key/value plus topic, partition, offset,
# timestamp, timestampType and headers.
df = spark.readStream.format("kafka").options(**kafka_options).load()
# NOTE(review): on Databricks, displaying a streaming DataFrame starts its own
# streaming query that keeps consuming the topic until it is stopped.
df.display()

key,value,topic,partition,offset,timestamp,timestampType,headers
MzAz,eyJpZCI6IDIsICJtZXNhIjogMzAzLCAiY29taWRhIjogWyJlbXAgNCBxdWVzb3MgKDMsLSkiLCAiY29yYXpvbmVzIGRlIGFsY2FjaG9mYXMiLCAiZW1wYW5hZGEgbGVudGVqYXMiLCAicGluY2hvcyBtb3J1bm9zIiwgImVtcGE= (truncated),pedidos,2,0,2025-09-11T11:15:50.844Z,0,
NTA0,eyJpZCI6IDgsICJtZXNhIjogNTA0LCAiY29taWRhIjogWyJ0YWNvIHJhcGUiLCAibWFuY2hlZ28iLCAiZmFiYWRhIGFzdHVyaWFuYSIsICJmaWxldGVzIGRlIHNhcmRpbmFzIiwgImVtcGFuYWRhIGNlYm9sbGEgeSBxdWVzbyI= (truncated),pedidos,2,1,2025-09-11T11:15:53.386Z,0,
MzAz,eyJpZCI6IDksICJtZXNhIjogMzAzLCAiY29taWRhIjogWyJlbXAgY2ViICgyLDgwKSIsICJwdWxwbyBhIGxhIGdhbGxlZ2EiLCAiY2hpbGkgc2luIGNhcm5lIl0sICJiZWJpZGEiOiBbImVuYXRlIGNhYiAxLzgiLCAiY3VhdHI= (truncated),pedidos,2,2,2025-09-11T11:15:54.365Z,0,
NDA0,eyJpZCI6IDEwLCAibWVzYSI6IDQwNCwgImNvbWlkYSI6IFsiYWNlaXR1bmFzIGNvbiBsaW1vbiJdLCAiYmViaWRhIjogWyJlbmF0ZSBjYWIgMCw3bCJdLCAidGltZXN0YW1wIjogMTc1NzU4OTM1NDgyN30=,pedidos,2,3,2025-09-11T11:15:54.827Z,0,
NTAz,eyJpZCI6IDEzLCAibWVzYSI6IDUwMywgImNvbWlkYSI6IFsiZW1wIHBvbGxvICgyLDgwKSIsICJmYWJhZGEgYXN0dXJpYW5hIiwgImVtcCBjZWIgKDIsODApIiwgImJlcmVuamVuYXMgYyBtaWVsIiwgImVtcGFuYWRhIGFyYWI= (truncated),pedidos,2,4,2025-09-11T11:15:56.244Z,0,
MjAy,eyJpZCI6IDIwLCAibWVzYSI6IDIwMiwgImNvbWlkYSI6IFsiZW1wIGxlbnRlamFzICgyLDgwKSJdLCAiYmViaWRhIjogWyJyaWNhcmRvIHNhbmNoZXogMS84IiwgInp3ZWlnZWx0IHN0cmF1c3MgMS84IiwgImNhbXBhcnJvbiA= (truncated),pedidos,2,5,2025-09-11T11:15:59.659Z,0,
MTA1,eyJpZCI6IDIxLCAibWVzYSI6IDEwNSwgImNvbWlkYSI6IFsidGFwYXMgbWl4dGFzIiwgInRhcGEgZGVsIGRpYSIsICJlbXAgYXJhYmUgKDIsODApIiwgImNob3Jpem8gZnJpdG8iLCAicGFsbWl0b3MiXSwgImJlYmlkYSI6IFs= (truncated),pedidos,2,6,2025-09-11T11:15:59.709Z,0,
MTA1,eyJpZCI6IDI0LCAibWVzYSI6IDEwNSwgImNvbWlkYSI6IFsiYnV0aWZhcnJhIiwgImxvbW8gYWRvYmFkbyIsICJ0YXBhIGRlbCBkaWEgNSIsICJlbXAgY2FwcmVzZSAoMiw4MCkiLCAiYWNlaXR1bmFzIGNvbiBsaW1vbiIsICI= (truncated),pedidos,2,7,2025-09-11T11:16:01.663Z,0,
MjAy,eyJpZCI6IDI2LCAibWVzYSI6IDIwMiwgImNvbWlkYSI6IFsidGFwYXMgbWl4dGFzIiwgInBvbGxvIGEgbGEgbW9zdGF6YSIsICJndWFjYW1vbGUiLCAicGFsbWl0b3MiLCAidGFwYSBkZWwgZGlhIl0sICJiZWJpZGEiOiBbImw= (truncated),pedidos,2,8,2025-09-11T11:16:02.402Z,0,
NDA0,eyJpZCI6IDI5LCAibWVzYSI6IDQwNCwgImNvbWlkYSI6IFsicGFuIHR1bWFjYSIsICJwaW1pZW50b3MgZGUgcGFkcm9uIiwgImFjZWl0dW5hcyBtaXgiLCAiZXNwaW5hY2EgYy9wYXNhcyB5IHBpbm9uZXMiLCAidG9ydGlsbGE= (truncated),pedidos,2,9,2025-09-11T11:16:03.457Z,0,


In [0]:
# Prefix every Kafka source column (key, value, topic, ...) with "_". This
# keeps them apart from the payload fields that are flattened into top-level
# columns in a later cell.
columns = [
    F.col(nombre).alias("_" + nombre)
    for nombre in df.columns
]
df = df.select(columns)

In [0]:
# DDL schema of the JSON payload in each Kafka message value.
# - `comida` and `bebida` arrive as JSON arrays of item names. Declaring them
#   as `string` keeps the raw JSON text, which is re-parsed as an array later.
# - `mesa` is a number in the payload but is kept as text.
# - `timestamp` is the producer's event time in epoch milliseconds (it matches
#   the Kafka record timestamp in the sample). It is backtick-quoted because
#   it is also a type name.
schema = """
  id long,
  mesa string,
  comida string,
  bebida string,
  `timestamp` long
"""

In [0]:
# Stamp each record with its ingestion time, then parse the Kafka value
# (cast to text) with `schema` and promote the parsed fields to top-level
# columns next to the "_"-prefixed Kafka metadata.
df = (
    df.withColumns({
        "_ingested_at": F.current_timestamp(),  # metadata
        "value": F.from_json(F.col("_value").cast("string"), schema),
    })
    .select("*", "value.*")
    .drop("value")
)

In [0]:
# Persist the flattened stream to the bronze Delta location.
# - trigger(availableNow=True) processes what is currently available and then
#   stops. Use .trigger(processingTime="60 seconds") for continuous ingestion.
# - mergeSchema lets new payload columns be added to the Delta table.
# The handle is kept so later cells can inspect (status / lastProgress),
# await or stop the run.
bronze_query = (
    df.writeStream
    .trigger(availableNow=True)
    .format("delta")
    .outputMode("append")
    .option("path", dataset_bronze_path)
    .option("mergeSchema", "true")
    .option("checkpointLocation", dataset_bronze_checkpoint_path)
    .start()
)


<pyspark.sql.streaming.query.StreamingQuery at 0x7f2faccf3510>

### Analítica en tiempo real

In [0]:
# Silver `articulos` Delta table: the article master data whose names are used
# for the stock thresholds below.
df_art = spark.read.load(path_silver + 'articulos', format='delta')

In [0]:
# Names of the articles whose `wgr` group is one of the selected codes.
# NOTE(review): the meaning of these wgr codes is not documented here
# (presumably the food/drink groups); confirm against the catalogue.
productos = df_art.where(F.col('wgr').isin([1, 2, 3, 4, 7, 8, 9])).select('name')

In [0]:
# Product names that get stock thresholds. All rows are kept. Slicing off the
# "last" row of an unordered collect() would drop an arbitrary product.
# Null/empty names are skipped and duplicates removed (the thresholds are keyed
# by name). The list is sorted so its order, and anything derived from it, is
# deterministic.
lista_productos = sorted({fila.name for fila in productos.collect() if fila.name})

In [0]:
!pip install confluent_kafka

Collecting confluent_kafka
  Obtaining dependency information for confluent_kafka from https://files.pythonhosted.org/packages/40/aa/ebf3facd881cb0b4e79fae29f040079ff2cb1ae4aabee08456e3ab536828/confluent_kafka-2.11.1-cp311-cp311-manylinux_2_28_x86_64.whl.metadata
  Downloading confluent_kafka-2.11.1-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (23 kB)
Downloading confluent_kafka-2.11.1-cp311-cp311-manylinux_2_28_x86_64.whl (3.9 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/3.9 MB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.1/3.9 MB[0m [31m4.1 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.3/3.9 MB[0m [31m5.0 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.6/3.9 MB[0m [31m5.7 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━

In [0]:
from random import randint
import json
from confluent_kafka import Producer 
from confluent_kafka.admin import NewTopic
from confluent_kafka.admin import AdminClient 

In [0]:
# Simulated stock thresholds per product, measured in number of orders:
#   bajo    -> count from which a low-stock warning ("AVISO") is raised
#   agotado -> count from which the product counts as sold out ("CRÍTICA");
#              always 2-5 orders above `bajo`.
# A dedicated seeded RNG makes the thresholds reproducible across re-runs
# (given the same product list in the same order) without touching the global
# `random` state.
from random import Random

UMBRALES_SEED = 42
_rng_umbrales = Random(UMBRALES_SEED)
umbrales = {}
for producto in lista_productos:
    i = _rng_umbrales.randint(1, 20)
    umbrales[producto] = {'bajo': i, 'agotado': i + _rng_umbrales.randint(2, 5)}

In [0]:
# Create the Kafka topic that receives the stock alerts.
# The name is stored in its own variable instead of reassigning `topic`. The
# ingestion cells subscribe to `topic` ("pedidos"), so overwriting it would
# make a re-run of those cells read the alerts topic instead.
from confluent_kafka import KafkaError, KafkaException

topic_alertas = "alertas"
new_topic = NewTopic(topic_alertas, num_partitions=3, replication_factor=3)
admin_client = AdminClient(conf)
fs = admin_client.create_topics([new_topic])
# create_topics returns {topic_name: future}. Wait on each future so failures
# surface here. A topic that already exists (e.g. on re-runs) is not an error.
for nombre_topic, futuro in fs.items():
    try:
        futuro.result()
    except KafkaException as exc:
        if exc.args[0].code() != KafkaError.TOPIC_ALREADY_EXISTS:
            raise

In [0]:
# Topic metadata currently reported by the cluster (should include "alertas").
topic_list = admin_client.list_topics().topics
print("Topics en el cluster de Kafka:", topic_list, sep="\n")

Topics en el cluster de Kafka:
{'alertas': TopicMetadata(alertas, 3 partitions), 'pedidos': TopicMetadata(pedidos, 3 partitions)}


In [0]:
# Alert levels ordered by severity. An alert is only published when a product
# reaches a level higher than the last one published for it.
_SEVERIDAD = {None: 0, "AVISO": 1, "CRÍTICA": 2}
# Last alert level published per (producto, tipo_producto). With
# outputMode("complete"), every micro-batch carries the cumulative count of
# *all* products, so without this memory the same alerts would be re-sent on
# every batch. It is reset whenever this cell is re-run.
_niveles_emitidos = {}
# Kafka producer shared by all micro-batches, created on first use.
# Building one per message is expensive.
_producer_alertas = None


def _obtener_producer():
    """Return the shared alert producer, creating it from `conf` on first use."""
    global _producer_alertas
    if _producer_alertas is None:
        _producer_alertas = Producer({
            "bootstrap.servers": conf["bootstrap.servers"],
            "security.protocol": conf["security.protocol"],
            "sasl.mechanism": conf["sasl.mechanism"],
            "sasl.username": conf["sasl.username"],
            "sasl.password": conf["sasl.password"],
        })
    return _producer_alertas


def _informar_entrega(err, msg):
    """Delivery callback: report alerts that Kafka could not deliver."""
    if err is not None:
        print(f"[ERROR] No se pudo entregar la alerta: {err}")


def _clasificar(conteo, bajo, agotado):
    """Return (nivel, umbral) for an order count, or (None, None) below both thresholds."""
    if conteo >= agotado:
        return "CRÍTICA", agotado
    if conteo >= bajo:
        return "AVISO", bajo
    return None, None


def alerta_doble_umbral(batch_df, batch_id, umbrales_producto=None, producer=None):
    """foreachBatch sink: publish stock alerts for products over their thresholds.

    Each row of ``batch_df`` is one (producto, tipo_producto) group with its
    cumulative order ``count``. A product with a threshold entry is classified
    as "CRÍTICA" (count >= agotado) or "AVISO" (count >= bajo). The alert is
    printed and sent as JSON to the "alertas" topic only when the product
    reaches a level higher than the last one already sent for it. All alerts
    of a batch are flushed together at the end.

    Args:
        batch_df: Micro-batch DataFrame with columns ``producto``,
            ``tipo_producto`` and ``count``.
        batch_id: Micro-batch id supplied by Spark (unused).
        umbrales_producto: ``{producto: {"bajo": int, "agotado": int}}``;
            defaults to the notebook-level ``umbrales``.
        producer: Object exposing ``produce``/``flush`` (a confluent_kafka
            ``Producer``); defaults to a shared producer built from ``conf``.
    """
    if batch_df.isEmpty():
        return

    if umbrales_producto is None:
        umbrales_producto = umbrales

    enviadas = 0
    for row in batch_df.collect():
        producto = row["producto"]
        tipo = row["tipo_producto"]
        conteo = row["count"]

        umbral = umbrales_producto.get(producto)
        if not umbral:
            continue  # no threshold configured for this product

        nivel, limite = _clasificar(conteo, umbral.get("bajo"), umbral.get("agotado"))
        clave = (producto, tipo)
        if _SEVERIDAD[nivel] <= _SEVERIDAD[_niveles_emitidos.get(clave)]:
            continue  # below thresholds, or already alerted at this level

        if nivel == "CRÍTICA":
            mensaje = f"{tipo.upper()} '{producto}' se ha AGOTADO: {conteo} pedidos (umbral = {limite})"
        else:
            mensaje = f"{tipo.upper()} '{producto}' con stock bajo: {conteo} pedidos (umbral = {limite})"
        print(f"[{nivel}] {mensaje}")

        alerta = {
            "producto": producto,
            "tipo": tipo,
            "nivel": nivel,
            "mensaje": mensaje
        }
        if producer is None:
            producer = _obtener_producer()
        producer.produce(
            topic="alertas",
            key="alerta",
            value=json.dumps(alerta),
            on_delivery=_informar_entrega,
        )
        _niveles_emitidos[clave] = nivel
        enviadas += 1

    if enviadas:
        # Wait until this batch's alerts are delivered (runs delivery callbacks).
        producer.flush()


In [0]:
def _explotar_productos(columna):
    """Return one row per item of the JSON-array column `columna` in `df`,
    tagged with the column name as `tipo_producto`."""
    items = F.from_json(columna, ArrayType(StringType()))
    return df.select(
        F.explode(items).alias("producto"),
        F.lit(columna).alias("tipo_producto"),
    )


df_comida = _explotar_productos("comida")
df_bebida = _explotar_productos("bebida")
# Both frames have the same two columns in the same order, so a positional union is safe.
df_productos = df_comida.union(df_bebida)

In [0]:
# Running number of orders per product and type; the aggregate column is
# named "count", which the alert sink reads.
df_conteo = df_productos.groupBy(["producto", "tipo_producto"]).agg(F.count(F.lit(1)).alias("count"))

In [0]:
# Streaming query that feeds the running per-product counts to the alert sink.
# outputMode("complete") hands every micro-batch the full aggregate table
# (cumulative counts per product).
# The checkpoint uses the dbfs:/ scheme. "/dbfs/..." is the driver-local FUSE
# path that file APIs such as open() use; Spark resolves it against its default
# filesystem, which gives the nested folder dbfs:/dbfs/tmp/....
query = (
    df_conteo.writeStream
    .outputMode("complete")
    .foreachBatch(alerta_doble_umbral)
    .option("checkpointLocation", "dbfs:/tmp/checkpoints/alertas")
    .start()
)