## Instalación del paquete confluent-kafka para interactuar con Kafka desde Python sin usar Spark

In [None]:
!pip install confluent-kafka

## Funciones auxiliares para crear y borrar el topic revisiones (completa el código con el nombre de tu cluster)

In [None]:
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic
import socket

def crear_topic(admin_client: AdminClient, topic_name: str):
    """
    Crea el topic llamado topic_name con una sola partición y factor replicación 1.
    No devuelve nada pero mostrará un mensaje indicando si ha ido bien o no.
    
    :param admin_client: objeto AdminClient con el cliente de Kafka ya configurado
    :param topic_name: string con el nombre del topic que deseamos crear
    """
    topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
    fs = admin_client.create_topics([topic])
    for topic, f in fs.items():
        try:
            f.result()
            print(f"Topic {topic_name} creado")
        except Exception as e:
            print(f"Fallo al crear el topic {topic_name}: {e}")

def borrar_topic(admin_client: AdminClient, topic_name: str):
    """
    Borra un topic existente llamado topic_name que debe existir.
    No devuelve nada pero mostrará un mensaje indicando si ha ido bien o no.
    
    :param admin_client: objeto AdminClient con el cliente de Kafka ya configurado
    :param topic_name: string con el nombre del topic que deseamos borrar
    """
    fs = admin_client.delete_topics([topic_name])
    for topic, f in fs.items():
        try:
            f.result()
            print(f"Topic {topic_name} borrado")
        except Exception as e:
            print(f"Fallo al borrar el topic {topic_name}: {e}")

## Cliente para interactuar con Kafka mediante el paquete confluent-kafka, sin usar Spark

In [None]:
nombre_cluster = "jppg"    # COMPLETA ESTA LÍNEA

# DESCOMENTA ESTAS LÍNEAS PARA CREAR EL CLIENTE CON EL NOMBRE DEL CLUSTER

conf = {"bootstrap.servers": f"{nombre_cluster}-w-0:9092,{nombre_cluster}-w-1:9092",
         "sasl.mechanism": "PLAIN",
        "client.id": socket.gethostname()}

producer = Producer(conf)
admin_client = AdminClient(conf)

### Funciones para borrar el topic, si algo hubiera ido mal y queremos repetir, y para crearlo tras haberlo borrado (o la primera vez)

In [None]:
# borrar_topic(admin_client, "Call Event")
crear_topic(admin_client, "Call Event")

## Función para insertar datos en Kafka, en lotes de 10.000 cada 2 segundos, para dar tiempo a visualizar el resultado cambiante

In [None]:
import os
from concurrent.futures import ThreadPoolExecutor
import pandas as pd

executor = ThreadPoolExecutor(max_workers=1)

def insertar_kafka(producer: Producer, ruta_csv: str):
    """
    Inserta en Kafka los mensajes de las filas del fichero CSV en ruta_csv,
    escogiendo solo las columnas dest, dep_time y arr_delay. Cada fila es
    un mensaje formateado como JSON. 
    """
    # Función que ignoraremos ya que la arquitectura de Data Lakehouse solo utiliza almacenamiento del lago de datos
    if not os.path.isfile("calls.csv"):
        print("Trayendo fichero flights.csv desde HDFS al bucket de DBFS de Databricks Comunity Edition ...")
        os.system(f"hdfs dfs -copyToLocal -f {ruta_csv} .")
    
    print("Leyendo datos del CSV ...")
    calls_df = pd.read_csv("calls.csv")
    
    # Formateamos los mensajes como cadenas de texto con estructura de JSON con todos los campos a escribir
    calls_dict = calls_df.transpose().to_dict()
    mensajes = [str(calls_dict[i]).replace("'", "\"") for i in range(len(calls_dict))]
    
    print("Insertando mensajes en el topic Call Event ...")
    
    def insertar_bucle():
        import time
        for i, msj in enumerate(mensajes):
            producer.produce("Call Event", value=msj)
            if i % 10000 == 0:
                print(f"Se ha insertado el mensaje {i} con valor {msj}")
                producer.flush()
                time.sleep(5)
    
    # Ejecutamos de manera asíncrona todo el bucle de inserciones, para simultáneamente poder 
    # ejecutar la otra celda en la que hacemos show de la tabla de salida donde escribe Spark Structured Streaming
    resultado = executor.submit(insertar_bucle)
    return resultado

## Lectura de mensajes del topic creado

In [None]:
# Esto hace más eficientes las transformaciones wide con volúmenes pequeños de datos
spark.conf.set("spark.sql.shuffle.partitions", 8)
spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")

# Reemplaza por el código correcto siguiendo las indicaciones anteriores
callsStreamingDF = spark.readStream\
                        .format("kafka")\
                        .option("kafka.bootstrap.servers", "jppg-w-0:9092,jppg-w-1:9092")\
                        .option("subscribe", "retrasos")\
                        .load()

Muestra por pantalla el esquema del DataFrame resultante de la lectura con `printSchema()`. Verás que todas estas columnas son creadas automáticamente por Spark cuando leemos de Kafka. De ellas, la que nos interesa es `value` que contiene propiamente el mensaje de Kafka, en formato datos binarios. 


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import pyspark.sql.functions as F

# Inicia Spark
spark = SparkSession.builder \
    .appName("Bronze_By_Candidate") \
    .getOrCreate()

# 1) Definimos el super‑schema con todas las columnas
esquema = StructType([
    StructField("call_date",           StringType(), nullable=True),
    StructField("phone_number_dialed", StringType(), nullable=True),
    StructField("status",              StringType(), nullable=True),
    StructField("user",                StringType(), nullable=True),
    StructField("full_name",           StringType(), nullable=True),
    StructField("campaign_id",         StringType(), nullable=True),
    StructField("vendor_lead_code",    StringType(), nullable=True),
    StructField("source_id",           StringType(), nullable=True),
    StructField("list_id",             StringType(), nullable=True),
    StructField("gmt_offset",          DoubleType(), nullable=True),
    StructField("alt_phone",           StringType(), nullable=True),
    StructField("security_phrase",     StringType(), nullable=True),
    StructField("comments",            StringType(), nullable=True),
    StructField("length_in_sec",       DoubleType(), nullable=True),
    StructField("user_group",          StringType(), nullable=True),
    StructField("alt_dial",            StringType(), nullable=True),
    StructField("list_name",           StringType(), nullable=True),  # clave de partición con el nombre del candidato de la campaña
    StructField("status_name",         StringType(), nullable=True),
    StructField("custom_fields",       StringType(), nullable=True),
    StructField("yard_sign",           StringType(), nullable=True),
    StructField("top_issue",           StringType(), nullable=True),
    StructField("campaign",            StringType(), nullable=True)
])

# 2) Leemos el stream original (p. ej. desde Kafka) en callsStreamingDF...
# callsStreamingDF = spark.readStream.format("kafka")...load()

# 3) Parseamos el JSON usando el esquema
parsedDF = (
    callsStreamingDF
      .select(F.col("value").cast("string").alias("json_str"))
      .withColumn("parsed", F.from_json("json_str", esquema))
      .select(
          F.col("parsed.call_date"),
          F.col("parsed.phone_number_dialed"),
          F.col("parsed.status"),
          F.col("parsed.user"),
          F.col("parsed.full_name"),
          F.col("parsed.campaign_id"),
          F.col("parsed.vendor_lead_code"),
          F.col("parsed.source_id"),
          F.col("parsed.list_id"),
          F.col("parsed.gmt_offset"),
          F.col("parsed.alt_phone"),
          F.col("parsed.security_phrase"),
          F.col("parsed.comments"),
          F.col("parsed.length_in_sec"),
          F.col("parsed.user_group"),
          F.col("parsed.alt_dial"),
          F.col("parsed.list_name"),        # nombre del candidato
          F.col("parsed.status_name"),
          F.col("parsed.custom_fields"),
          F.col("parsed.yard_sign"),
          F.col("parsed.top_issue"),
          F.col("parsed.campaign")
      )
)

# 4) Función para escribir cada microbatch por candidato
bronze_root = "/mnt/Data Call Center/Bronze"

def write_by_candidate(batch_df, batch_id):
    batch_df.persist()
    # obtenemos candidatos únicos
    candidates = [row.list_name for row in batch_df.select("list_name").distinct().collect()]
    for cand in candidates:
        out_path = f"{bronze_root}/{cand}/Calls"
        (batch_df
            .filter(F.col("list_name") == cand)
            .write
            .mode("append")
            .parquet(out_path)      # o .format("delta").save(out_path)
        )
    batch_df.unpersist()

# 5) Arrancamos el streaming con foreachBatch
(
    parsedDF.writeStream
        .foreachBatch(write_by_candidate)
        .option("checkpointLocation", "/mnt/bronze/_checkpoints/by_candidate")
        .start()
        .awaitTermination()
)

## Inserción en Kafka + lectura desde Kafka con Spark y agregación en tiempo real

Se crea una tabla apuntando a los ficheros de la capa Bronze en Calls para verificar que efectivamente los mensajes se están escribiendo 
correctamente en la capa Bronze

In [None]:
# Recuerda que en el proceso de escritura en el tópico, renombrarás previamente las columnas vacías del Dataframe para evitar lidiar con 
# columnas vacías en el super-esquema que se proporcionará al Streaming Dataframe. 
# Recuerda el ejemplo:
#import pandas as pd

#df = pd.DataFrame({
 #   'A': [1,2],
  #  'B': [3,4]
#})

# Diccionario con una clave que no está en df ('C')
#mapping = {'A': 'alpha', 'C': 'gamma'}

#df2 = df.rename(columns=mapping)
#print(df2.columns)
# Index(['alpha', 'B'], dtype='object')

In [None]:
import time

ruta_calls_dbfs = "dbfs://jppg/datos/calls.csv"   # ruta del fichero calls.csv en el sistema de ficheros dsitribuido de Databricks CE
resultado = insertar_kafka(producer, ruta_calls_dbfs)

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Registro_Bronze_Dinamico") \
    .enableHiveSupport() \
    .getOrCreate()

# 1) Creamos la base de datos si no existe
spark.sql("CREATE DATABASE IF NOT EXISTS call_center_db")

# 2) Creamos la tabla externa apuntando recursivamente al root de Bronze
spark.sql("""
  CREATE TABLE IF NOT EXISTS call_center_db.bronze_calls_dynamic (
    call_date            STRING,
    phone_number_dialed  STRING,
    status               STRING,
    user                 STRING,
    full_name            STRING,
    campaign_id          STRING,
    vendor_lead_code     STRING,
    source_id            STRING,
    list_id              STRING,
    gmt_offset           DOUBLE,
    alt_phone            STRING,
    security_phrase      STRING,
    comments             STRING,
    length_in_sec        DOUBLE,
    user_group           STRING,
    alt_dial             STRING,
    list_name            STRING,
    status_name          STRING,
    custom_fields        STRING,
    yard_sign            STRING,
    top_issue            STRING,
    campaign             STRING
  )
  USING parquet
  OPTIONS (
  path '/mnt/Data Call Center/Bronze/*/Calls/*.parquet',
  recursiveFileLookup 'false'
)
""")


Llamamos la tabla SQL y la mostramos cada 7 segundos

In [None]:
import time

for i in range(15):
    time.sleep(7)
    # Re‑ejecutar la consulta
    df = spark.sql("SELECT * FROM call_center_db.bronze_calls_dynamic")
    df.show(truncate=False)
