# **2do parcial - MVP TECNICO**

# Materia: Mineria de datos II


---


Pipelines

**(1) Batch**

Landing ----batch---->Bronze (csv)--->Silver---> Gold ---> Serving (cassandra)

**(2) Stream**

Landing ----speed----> Bronze (jsonl)--->Silver --> gold (stream) --->Serving (Cassandra)

**(3) Queries de cassandra**
---

Resumen de la implementación

*Cada celda (con excepcion de los demos) corresponde a un archivo .py que eventualmente va a estar en el repo final al moment ode la entrega final.

Por eso, se usaron parches como:


```
try:
    from cassandra_utils import get_cassandra_session, KEYSPACE
except ModuleNotFoundError:
    pass
```
En el archivo .py solo va a estar la llamada a cassandra_utils, pero como en el colab no es necesario, epuse ese parchecito temporal solo al efecto de la dentrega de este MVP.


Este notebook contiene un end-to-end que incluye:

    *implementacion de left anti joins para manejo de cuarentena (evitando duplicidad de errores) y estrategias de escritura segura.

    *reglas de de negocio activas (costos negativos, integridad referencial) y desvío automático a zona de Quarantine.

    *ptimizaciones Spark: Configuracion de shuffle.partitions para entorno local/Colab y uso de broadcast joins.

    -*streaming con watermarking, dedupe y checkpointing para tolerancia a fallos.

    *serving Layer: conexin a AstraDB (Cassandra) con modelado Query-First.


Laburo completo aca ----->[Repositorio GitHub (Código + Readme + Diagramas) ](https://github.com/Sinnick4r/Cloud_Provider_Analytics_MVP)



# CONFIG.PY

Funciones respecto a archivos y directorios


In [None]:
#por si no estan insalados en la ejecucion de colab, voy a lo seguro
!pip install cassandra-driver
!pip install astrapy
!pip install Plotly
!pip install pyspark

In [1]:
# config.py
from __future__ import annotations
import zipfile
from pathlib import Path
from typing import Final
from datetime import datetime

from pyspark.sql import SparkSession



# integracion con google drive que estuve usando, se puede poner true o false

USE_GOOGLE_DRIVE: Final[bool] = False
GOOGLE_DRIVE_PROJECT_SUBDIR: Final[str] = "Mineria de datos II/Proyecto Cloud Provider Analysis"



def get_project_root() -> Path:
    """
    Devuelve la raíz del proyecto.
    - so USE_GOOGLE_DRIVE es true, monta gdrive en Colab y usa la carpeta indicada.
    - si es Ffalse, usa el directorio actual (repo descomprimido).
    """
    if USE_GOOGLE_DRIVE:
        try:
            from google.colab import drive as gdrive
        except ImportError as exc:
            raise RuntimeError(
                "USE_GOOGLE_DRIVE=True pero no esamos en colab."
            ) from exc

        gdrive.mount("/content/drive")
        return (Path("/content/drive/MyDrive") / GOOGLE_DRIVE_PROJECT_SUBDIR).resolve()

    return Path(".").resolve()

#aca van loss directorios de todo el proyecto

PROJECT_ROOT: Final[Path] = get_project_root()
DATA_DIR: Final[Path] = PROJECT_ROOT / "data"
DATALAKE_ROOT: Final[Path] = PROJECT_ROOT / "datalake"

LANDING_PATH: Final[Path] = DATALAKE_ROOT / "landing"
BRONZE_PATH: Final[Path] = DATALAKE_ROOT / "bronze"
SILVER_PATH: Final[Path] = DATALAKE_ROOT / "silver"
GOLD_PATH: Final[Path] = DATALAKE_ROOT / "gold"
QUARANTINE_PATH: Final[Path] = DATALAKE_ROOT / "quarantine"

RAW_ZIP_NAME: Final[str] = "cloud_provider_challenge_dataset_v1.zip"


# sesion de spark

def create_spark(app_name: str = "CloudProviderAnalytics_Pipeline") -> SparkSession:
    spark = (
        SparkSession.builder
        .appName(app_name)
        .master("local[*]")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")
    return spark


# utils de archivos/directorios

def ensure_dirs() -> None:
# Crea la estructura del datalake si no existe.
    for path in (LANDING_PATH, BRONZE_PATH, SILVER_PATH, GOLD_PATH, QUARANTINE_PATH):
        path.mkdir(parents=True, exist_ok=True)


def unpack_raw_dataset() -> None:

    #descomprime el zip del datalake
    zip_path = DATA_DIR / RAW_ZIP_NAME

    if not zip_path.exists():
        print(f"[WARN] No se encontró el ZIP de datos en {zip_path}. Saltando unpack.")
        return
    with zipfile.ZipFile(zip_path, "r") as zf:
        zf.extractall(PROJECT_ROOT)
    print(f"[OK] Dataset descomprimido en {PROJECT_ROOT / 'datalake' / 'landing'}")


'''
esta funcion log la hice para otro proyecto hace mucho, la uso aca solo para que
el resultado de la consola sea visualmente mas agradable en el ideo de presentacion
'''
def log(msg: str, level: str = "INFO"):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    colors = {
        "INFO": "\033[94m", # Azul
        "WARN": "\033[93m", # Amarillo
        "ERR":  "\033[91m", # Rojo
        "OK":   "\033[92m", # Verde
        "RUN":   "\033[96m", # Cyan
        "RESET": "\033[0m"
    }
    color = colors.get(level, colors["RESET"])
    print(f"{color}[{timestamp}] [{level}] {msg}{colors['RESET']}")

# schemas.py

Schemas estaticos para spark

In [2]:
# schemas.py


from __future__ import annotations

from typing import Final
from pyspark.sql import types as T


# Esquemascsv y json totalmente manuales despeus de chequearlos

customers_orgs_schema: Final[T.StructType] = T.StructType([
    T.StructField("org_id", T.StringType(), nullable=False),
    T.StructField("org_name", T.StringType(), nullable=True),
    T.StructField("industry", T.StringType(), nullable=True),
    T.StructField("hq_region", T.StringType(), nullable=True),
    T.StructField("plan_tier", T.StringType(), nullable=True),
    T.StructField("is_enterprise", T.BooleanType(), nullable=True),
    T.StructField("signup_date", T.DateType(), nullable=True),
    T.StructField("sales_rep", T.StringType(), nullable=True),
    T.StructField("lifecycle_stage", T.StringType(), nullable=True),
    T.StructField("marketing_source", T.StringType(), nullable=True),
    T.StructField("nps_score", T.DoubleType(), nullable=True),
])

users_schema: Final[T.StructType] = T.StructType([
    T.StructField("user_id", T.StringType(), nullable=False),
    T.StructField("org_id", T.StringType(), nullable=False),
    T.StructField("email", T.StringType(), nullable=True),
    T.StructField("role", T.StringType(), nullable=True),
    T.StructField("active", T.BooleanType(), nullable=True),
    T.StructField("created_at", T.DateType(), nullable=True),
    T.StructField("last_login", T.DateType(), nullable=True),
])

resources_schema: Final[T.StructType] = T.StructType([
    T.StructField("resource_id", T.StringType(), nullable=False),
    T.StructField("org_id", T.StringType(), nullable=False),
    T.StructField("service", T.StringType(), nullable=True),
    T.StructField("region", T.StringType(), nullable=True),
    T.StructField("created_at", T.DateType(), nullable=True),
    T.StructField("state", T.StringType(), nullable=True),
    T.StructField("tags_json", T.StringType(), nullable=True),
])

support_tickets_schema: Final[T.StructType] = T.StructType([
    T.StructField("ticket_id", T.StringType(), nullable=False),
    T.StructField("org_id", T.StringType(), nullable=False),
    T.StructField("category", T.StringType(), nullable=True),
    T.StructField("severity", T.StringType(), nullable=True),
    T.StructField("created_at", T.DateType(), nullable=True),
    T.StructField("resolved_at", T.DateType(), nullable=True),
    T.StructField("csat", T.DoubleType(), nullable=True),
    T.StructField("sla_breached", T.BooleanType(), nullable=True),
])

marketing_touches_schema: Final[T.StructType] = T.StructType([
    T.StructField("touch_id", T.StringType(), nullable=False),
    T.StructField("org_id", T.StringType(), nullable=False),
    T.StructField("campaign", T.StringType(), nullable=True),
    T.StructField("channel", T.StringType(), nullable=True),
    T.StructField("timestamp", T.DateType(), nullable=True),
    T.StructField("clicked", T.BooleanType(), nullable=True),
    T.StructField("converted", T.BooleanType(), nullable=True),
])

nps_surveys_schema: Final[T.StructType] = T.StructType([
    T.StructField("org_id", T.StringType(), nullable=False),
    T.StructField("survey_date", T.DateType(), nullable=True),
    T.StructField("nps_score", T.DoubleType(), nullable=True),
    T.StructField("comment", T.StringType(), nullable=True),
])

billing_monthly_schema: Final[T.StructType] = T.StructType([
    T.StructField("invoice_id", T.StringType(), nullable=False),
    T.StructField("org_id", T.StringType(), nullable=False),
    T.StructField("month", T.DateType(), nullable=True),
    T.StructField("subtotal", T.DecimalType(10,4), nullable=True),
    T.StructField("credits", T.DecimalType(10,4), nullable=True),
    T.StructField("taxes", T.DecimalType(10,4), nullable=True),
    T.StructField("currency", T.StringType(), nullable=True),
    T.StructField("exchange_rate_to_usd", T.DoubleType(), nullable=True),
])


usage_events_schema: Final[T.StructType] = T.StructType([
    T.StructField("event_id", T.StringType(), nullable=False),
    T.StructField("timestamp", T.StringType(), nullable=False),
    T.StructField("org_id", T.StringType(), nullable=False),
    T.StructField("resource_id", T.StringType(), nullable=False),
    T.StructField("service", T.StringType(), nullable=True),
    T.StructField("region", T.StringType(), nullable=True),
    T.StructField("metric", T.StringType(), nullable=True),
    T.StructField("value", T.DecimalType(10,4), nullable=True),
    T.StructField("unit", T.StringType(), nullable=True),
    T.StructField("cost_usd_increment", T.DecimalType(10,4), nullable=True),
    T.StructField("schema_version", T.IntegerType(), nullable=True),
    T.StructField("carbon_kg", T.DoubleType(), nullable=True),
])


# io_utils.py

Funciones de lecto-escritura de CSV y Parquet

In [3]:
# io_utils.py

from __future__ import annotations
from pathlib import Path
from typing import Final
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import types as T


# helpers de rutas genéricos

def zone_path(zone_root: Path, table_name: str) -> Path:
    """
    Devuelve la ruta completa a una tabla dentro de una zona del datalake.

    """

    return (zone_root / table_name).resolve()


def add_audit_columns(df: DataFrame) -> DataFrame:
    """
    Enriquece un DataFrame con columnas técnicas de auditoría:
    - ingest_ts: Timestamp de ingestión
    - source_file: Nombre del archivo origen
    """
    return df.withColumn("ingest_ts", F.current_timestamp()) \
             .withColumn("source_file", F.input_file_name())


def read_csv(
    spark: SparkSession,
    path: Path,
    schema: T.StructType,
    header: bool = True,
) -> DataFrame:

    return (
        spark.read
        .option("header", str(header).lower())
        .schema(schema)
        .csv(str(path))
    )



def write_parquet(
    df: DataFrame,
    base_path: Path,
    partition_cols: list[str] | None = None,
    mode: str = "overwrite",
) -> None:

    writer = df.write.mode(mode)
    if partition_cols:
        writer = writer.partitionBy(*partition_cols)
    writer.parquet(str(base_path))


def read_parquet(spark, base_path, partition_glob: str | None = None):

    #lee un Parquet
    #si no hay partition_glob lee la ruta / si hay artition_glob usa basePath y patron

    base_path = Path(base_path)
    base_str = str(base_path)

    if partition_glob:
        return (
            spark.read
                 .option("basePath", base_str)
                 .parquet(f"{base_str}/{partition_glob}")
        )

    return spark.read.parquet(base_str)


# audit.py

Funciones de chequeo de Quality de cada fase

In [4]:
# audit.py
from __future__ import annotations

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException

# Esto solo esta en el colab, en el codigo definitivo solo se importa
try:
    from config import BRONZE_PATH, SILVER_PATH, GOLD_PATH, QUARANTINE_PATH
    from io_utils import read_parquet, zone_path
except ModuleNotFoundError:
    import __main__ as _m
    BRONZE_PATH = _m.BRONZE_PATH
    SILVER_PATH = _m.SILVER_PATH
    GOLD_PATH = _m.GOLD_PATH
    QUARANTINE_PATH = _m.QUARANTINE_PATH
    read_parquet = _m.read_parquet
    zone_path = _m.zone_path


def audit_bronze_layer(spark: SparkSession, table_name: str, pk_col: str) -> None:

    # se chequea vol, uni de PK y el llenado con ingest_ts

    print(f"\n chequeo Bronze: {table_name}")

    path = zone_path(BRONZE_PATH, table_name)
    try:
        df = read_parquet(spark, path)
    except AnalysisException:
        print(f"[ERR] no se encontro la tabla en {path}")
        return

    # Vol
    count_total = df.count()

    # Uni
    count_distinct = df.select(pk_col).distinct().count()
    duplicates = count_total - count_distinct

    # ingest_ts
    if "ingest_ts" in df.columns:
        null_tech = df.filter(F.col("ingest_ts").isNull()).count()
    else:
        null_tech = "falta columna ingest_ts"

    print(f"registros totales: {count_total}")
    print(f"duplicados en PK ({pk_col}): {duplicates}")
    print(f"nulos en ingest_ts: {null_tech}")

    # Resulktado
    if duplicates == 0 and (isinstance(null_tech, int) and null_tech == 0):
        print("Resultado: todo ok!!")
    else:
        print("Resultado: revisar data por posibles duplicados ")


def audit_silver_quality(spark: SparkSession) -> None:

    #Chequea el resultado del proceso Silver Batch, calculando un ratio entre registros en Silver vs Cuarentena
    print(f"\n Chequeop Silver:")

    path_good = zone_path(SILVER_PATH, "usage_events_enriched")
    path_bad  = zone_path(QUARANTINE_PATH, "usage_events_quarantine")

    # contar buenos
    try:
        good_df = read_parquet(spark, path_good)
        count_good = good_df.count()
    except AnalysisException:
        count_good = 0

    # contar malos
    try:
        bad_df = read_parquet(spark, path_bad)
        count_bad = bad_df.count()
        has_bad = True
    except AnalysisException:
        count_bad = 0
        has_bad = False

    total = count_good + count_bad
    if total == 0:
        print("[WARN] No hay datos procesados en Silver ni cuarentena.")
        return

    bad_ratio = (count_bad / total) * 100

    print(f"Total: {total}")
    print(f"Aceptados (Silver): {count_good}")
    print(f"Rechazados (cuarentena): {count_bad} ({bad_ratio:.2f}%)")

    if bad_ratio == 0:
        print("CALIDAD: PERFECTA")
    elif bad_ratio < 5:
        print("CALIDAD: ACEPTABLE")
    else:
        print("CALIDAD: CRÍTICA (>5% Rechazo). Revisar reglas de negocio.")

    if has_bad:
        print("ejemplo de rechazo:")
        bad_df.select("event_id", "quarantine_reason").show(1, truncate=False)


def audit_gold_layer(spark: SparkSession, table_name: str, check_col: str = "daily_cost_usd") -> None:

    #Audita un Mart Gold verificando reglas de negocio para serving layer: Vol, Integridad y KPIs
    print(f"\n Chequeo Gold: {table_name}")

    path = zone_path(GOLD_PATH, table_name)
    try:
        # Intentamos leer con wildcards para atrapar cualquier partición (event_date, ticket_date, etc.)
        # El truco es usar recursiveFileLookup o simplemente leer la carpeta raíz
        df = spark.read.option("basePath", str(path)).parquet(str(path / "*"))
    except AnalysisException:
        try:
            # Fallback: leer la raíz directa (a veces funciona mejor en local)
            df = spark.read.parquet(str(path))
        except AnalysisException:
            print(f"   [ERR] No se encontró el mart en {path}")
            return

    count_total = df.count()

    # Regla: Costos Negativos
    neg_costs = df.filter(F.col(check_col) < 0).count()

    print(f"registros Totales (Agregados): {count_total}")
    print(f"Costos Negativos detectados: {neg_costs}")

def audit_quarantine(spark: SparkSession):
    print(f"\nCalidad de Datos (Silver Batch)")

    path_good = zone_path(SILVER_PATH, "usage_events_enriched")
    path_bad  = zone_path(QUARANTINE_PATH, "usage_events_quarantine")

    # conteo de datos buenos
    try:
        good_df = read_parquet(spark, path_good)
        count_good = good_df.count()
    except AnalysisException:
        count_good = 0
        print("[WARN] No hay data en Silver.")

    # conteo  malos
    try:
        bad_df = read_parquet(spark, path_bad)
        count_bad = bad_df.count()
        has_bad_data = True
    except AnalysisException:
        count_bad = 0
        has_bad_data = False
        print("[INFO] No hay data en cuarentena")

    # ratio
    total = count_good + count_bad
    if total == 0:
        print("[ERR] No hay data procesada")
        return

    bad_ratio = (count_bad / total) * 100

    print(f"\n Estadisticas:")
    print(f"Total Procesado: {total}")
    print(f"Aceptados (Silver): {count_good} ({(100 - bad_ratio):.2f}%)")
    print(f"Rechazados (Quarantine): {count_bad} ({bad_ratio:.2f}%)")

    print("\nresultado:")

    if bad_ratio == 0:
        print("Satifactorio - sin datos rechazados")
    elif bad_ratio < 5:
        print("Aceptable- rechazo  bajo y esperado.")
    else:
        print("malo -demasiada data rechazada (>5%)")

    # muestra errores
    if has_bad_data:
        print("\n Muestra de registros en Cuarentena (top 5):")
        cols_to_show = ["event_id", "cost_usd_increment", "org_id", "quarantine_reason"]
        actual_cols = [c for c in cols_to_show if c in bad_df.columns]
        bad_df.select(*actual_cols).show(5, truncate=False)


def audit_speed_layer_results(spark: SparkSession):

    #aca se chequea que la Speed Layer haya persistido datos en disco.
    #Se ejecuta despues de parar el stream.

    print(f"\n cheque de Speed Layer ---")
    path = zone_path(GOLD_PATH, "org_daily_usage_by_service_speed")

    try:
        df = read_parquet(spark, path, partition_glob="*")
        total_rows = df.count()

        print(f"Ruta: {path}, Total acumulado en disco: {total_rows}")

        if total_rows > 0:
          print("funcionando todo OK (Datos persistidos correctamente)")
          df.show(3, truncate=False)
        else:
          print("vacio - dejar el stream corriendo mas tiempo")
    except Exception as e:
       print(f"[ERR] No se pudo leer la Speed Layer: {e}")

def run_full_bronze_audit(spark: SparkSession):
    log("Iniciando Auditoría completa de capa Bronze...", "RUN")

    # Mapa de Tabla -> Columna Clave (PK) para chequear unicidad
    # Basado en tus schemas.py
    tables_to_audit = [
        ("customers_orgs", "org_id"),
        ("users", "user_id"),
        ("resources", "resource_id"),
        ("support_tickets", "ticket_id"),
        ("marketing_touches", "touch_id"),
        ("billing_monthly", "invoice_id"),
        # Nota: nps_surveys puede tener varias encuestas por org,
        # así que 'org_id' podría dar duplicados (lo cual es correcto funcionalmente)
        ("nps_surveys", "org_id")
    ]

    for table_name, pk in tables_to_audit:
        # Llamamos a tu función de audit.py
        audit_bronze_layer(spark, table_name, pk_col=pk)

    log("Auditoría Bronze finalizada.", "OK")

def run_full_silver_audit(spark: SparkSession):
    log("Iniciando Auditoría de Calidad Silver...", "RUN")

    # 1. Auditoría de Calidad (Ratio Quarantine)
    audit_silver_quality(spark)

    # 2. Auditoría de la tabla de Cuarentena (Muestreo)
    audit_quarantine(spark)

    log("Auditoría Silver finalizada.", "OK")

def run_full_gold_audit(spark: SparkSession):
    log("Iniciando Auditoría de Marts Gold...", "RUN")

    # Lista de tuplas: (Nombre Tabla, Columna a validar)
    marts_to_audit = [
        ("org_daily_usage_by_service", "daily_cost_usd"),   # FinOps
        ("org_daily_support_metrics",  "total_tickets"),    # Soporte (No queremos tickets negativos)
        ("org_daily_genai_usage",      "genai_daily_cost")  # GenAI
    ]

    for table, col in marts_to_audit:
        audit_gold_layer(spark, table, check_col=col)

    log("Auditoría Gold finalizada.", "OK")

# bronze_batch.py

Ingesta de datos de la Capa Batch

In [5]:
# bronze_batch.py

from __future__ import annotations

from pathlib import Path
from typing import Optional
from tqdm.notebook import tqdm

from pyspark.sql import DataFrame, SparkSession


try:
    from config import LANDING_PATH, BRONZE_PATH

except ModuleNotFoundError:
    import __main__ as _m
    try:
        LANDING_PATH = _m.LANDING_PATH
        BRONZE_PATH = _m.BRONZE_PATH
    except AttributeError as exc:
        raise RuntimeError(
            "No se pudo importar config, hay que corre primero la celda 'config.py'."
        ) from exc


# importacion de squemas

try:
    from schemas import (
        customers_orgs_schema,
        users_schema,
        resources_schema,
        support_tickets_schema,
        marketing_touches_schema,
        nps_surveys_schema,
        billing_monthly_schema,
    )
except ModuleNotFoundError:
    import __main__ as _m  # type: ignore[import]
    try:
        customers_orgs_schema = _m.customers_orgs_schema
        users_schema = _m.users_schema
        resources_schema = _m.resources_schema
        support_tickets_schema = _m.support_tickets_schema
        marketing_touches_schema = _m.marketing_touches_schema
        nps_surveys_schema = _m.nps_surveys_schema
        billing_monthly_schema = _m.billing_monthly_schema
    except AttributeError as exc:
        raise RuntimeError(
            "No se pudo importar schemas, hayque correr primero la celda 'schemas.py'"
        ) from exc


#  importacion de todo lo qe es IO desde io_utils

try:
    from io_utils import read_csv, write_parquet, zone_path, add_audit_columns
except ModuleNotFoundError:
    import __main__ as _m
    try:
        read_csv = _m.read_csv
        write_parquet = _m.write_parquet
        zone_path = _m.zone_path
    except AttributeError as exc:
        raise RuntimeError(
            "No se pudo importar io_utils, hay que correr antes primero la celda 'io_utils.py'."
        ) from exc


# helper interno para leer .csv

def _read_landing_csv(
    spark: SparkSession,
    file_name: str,
    schema,
) -> Optional[DataFrame]:

    csv_path = LANDING_PATH / file_name
    if not csv_path.exists():
        print(f"[WARN] CSV no encontrado en landing: {csv_path}")
        return None

    print(f"[INFO] Leyendo {csv_path}")
    return read_csv(spark, csv_path, schema)


# Ingesta

def ingest_customers_orgs_to_bronze(spark: SparkSession) -> None:
    df = _read_landing_csv(spark, "customers_orgs.csv", customers_orgs_schema)
    if df is None: return
    df = add_audit_columns(df)
    dest = zone_path(BRONZE_PATH, "customers_orgs")
    write_parquet(df, dest, partition_cols=["hq_region"])
    print(f"[OK] Bronze customers_orgs -> {dest}")


def ingest_users_to_bronze(spark: SparkSession) -> None:
    df = _read_landing_csv(spark, "users.csv", users_schema)
    if df is None:
        return
    df = add_audit_columns(df)
    dest = zone_path(BRONZE_PATH, "users")
    write_parquet(df, dest, partition_cols=["role"])
    print(f"[OK] Bronze users -> {dest}")


def ingest_resources_to_bronze(spark: SparkSession) -> None:
    df = _read_landing_csv(spark, "resources.csv", resources_schema)
    if df is None:
        return
    df = add_audit_columns(df)
    dest = zone_path(BRONZE_PATH, "resources")
    write_parquet(df, dest, partition_cols=["region"])
    print(f"[OK] Bronze resources -> {dest}")


def ingest_support_tickets_to_bronze(spark: SparkSession) -> None:
    df = _read_landing_csv(spark, "support_tickets.csv", support_tickets_schema)
    if df is None:
        return
    df = add_audit_columns(df)
    dest = zone_path(BRONZE_PATH, "support_tickets")
    write_parquet(df, dest, partition_cols=["severity"])
    print(f"[OK] Bronze support_tickets -> {dest}")


def ingest_marketing_touches_to_bronze(spark: SparkSession) -> None:
    df = _read_landing_csv(spark, "marketing_touches.csv", marketing_touches_schema)
    if df is None:
        return
    df = add_audit_columns(df)
    dest = zone_path(BRONZE_PATH, "marketing_touches")
    write_parquet(df, dest, partition_cols=["channel"])
    print(f"[OK] Bronze marketing_touches -> {dest}")


def ingest_nps_surveys_to_bronze(spark: SparkSession) -> None:
    df = _read_landing_csv(spark, "nps_surveys.csv", nps_surveys_schema)
    if df is None:
        return
    df = add_audit_columns(df)
    dest = zone_path(BRONZE_PATH, "nps_surveys")
    write_parquet(df, dest, partition_cols=["survey_date"])
    print(f"[OK] Bronze nps_surveys -> {dest}")


def ingest_billing_monthly_to_bronze(spark: SparkSession) -> None:
    df = _read_landing_csv(spark, "billing_monthly.csv", billing_monthly_schema)
    if df is None:
        return
    df = add_audit_columns(df)
    dest = zone_path(BRONZE_PATH, "billing_monthly")
    write_parquet(df, dest, partition_cols=["month"])
    print(f"[OK] Bronze billing_monthly -> {dest}")


# orrquestador de Bronze batch

def run_bronze_batch(spark: SparkSession) -> None:
  print("\n[BATCH] Iniciando Ingesta a Bronze (7 Maestros)...")
  '''
    ingest_customers_orgs_to_bronze(spark)
    ingest_users_to_bronze(spark)
    ingest_resources_to_bronze(spark)
    ingest_support_tickets_to_bronze(spark)
    ingest_marketing_touches_to_bronze(spark)
    ingest_nps_surveys_to_bronze(spark)
    ingest_billing_monthly_to_bronze(spark)
    '''
  tasks = [
    (ingest_customers_orgs_to_bronze, "Customers"),
    (ingest_users_to_bronze, "Users"),
    (ingest_resources_to_bronze, "Resources"),
    (ingest_support_tickets_to_bronze, "Support Tickets"),
    (ingest_marketing_touches_to_bronze, "Marketing"),
    (ingest_nps_surveys_to_bronze, "NPS Surveys"),
    (ingest_billing_monthly_to_bronze, "Billing")
    ]
  # Use barra de progreso de tqdm para hacer visualmente mas atractivo¿a la ejecucion
  for func, name in tqdm(tasks, desc="Procesando Archivos", unit="tablas"):
    func(spark)
    time.sleep(0.1)
  log("Capa Bronze finalizada correctamente", "OK")

# bronze_stream.py

Ingesta de fdatos para el Stream de la capa Speed


In [6]:
 # bronze_stream.py

from __future__ import annotations

from typing import Optional

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F


try:
    from config import LANDING_PATH, BRONZE_PATH
except ModuleNotFoundError:
    import __main__ as _m
    try:
        LANDING_PATH = _m.LANDING_PATH
        BRONZE_PATH = _m.BRONZE_PATH
    except AttributeError as exc:
        raise RuntimeError(
            "No se pudo importar config, hay que correr primero la celda 'config.py'."
        ) from exc


try:
    from schemas import usage_events_schema
except ModuleNotFoundError:
    import __main__ as _m
    try:
        usage_events_schema = _m.usage_events_schema
    except AttributeError as exc:
        raise RuntimeError(
            "No se pudo importar schemas, hay que correr primero la celda 'schemas.py'."
        ) from exc



try:
    from io_utils import zone_path
except ModuleNotFoundError:
    import __main__ as _m
    try:
        zone_path = _m.zone_path
    except AttributeError as exc:
        raise RuntimeError(
            "No se pudo importar io_utils hay qeu ejecutar primero la celda 'io_utils.py'."
        ) from exc


# -creacion de DF de streaming

def create_usage_events_stream(spark: SparkSession) -> DataFrame:

    src_dir = LANDING_PATH / "usage_events_stream"

    return (
        spark.readStream
        .schema(usage_events_schema)
        .option("maxFilesPerTrigger", 1)
        .json(str(src_dir))
    )


def transform_usage_events_bronze(df_stream: DataFrame) -> DataFrame:

    # Transformaciones :'timestamp' a 'event_ts', 'event_date' (date),  watermark y dedupe por event_id

    df = (
        df_stream
        .withColumn("event_ts", F.to_timestamp("timestamp"))
        .withColumn("event_date", F.to_date("event_ts"))
    )

    df = (
      df
      .withWatermark("event_ts", "1 day")
      .dropDuplicates(["event_id"])
    )

    return df

  # Arranca el streaming desde usage_events_stream en Landing

def start_usage_events_to_bronze(spark: SparkSession):

    df_stream = create_usage_events_stream(spark)
    df_bronze = transform_usage_events_bronze(df_stream)

    dest_path = zone_path(BRONZE_PATH, "usage_events")
    checkpoint_path = BRONZE_PATH / "_checkpoints" / "usage_events"

    query = (
        df_bronze
        .writeStream
        .format("parquet")
        .option("checkpointLocation", str(checkpoint_path))
        .option("path", str(dest_path))
        .partitionBy("event_date")
        .outputMode("append")
        .start()
    )

    print(f"[INFO] Streaming usage_events -> {dest_path}")
    print(f"[INFO] Checkpoints en {checkpoint_path}")
    return query

# silver.py

Tratamiento de datos para su pasarlos a la fase gold

In [7]:
# silver.py


from __future__ import annotations

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F

# parche de imports para colab de nuevo, esto en el PY final no va a estar
try:
    from config import BRONZE_PATH, SILVER_PATH, QUARANTINE_PATH
except ModuleNotFoundError:
    import __main__ as _m
    BRONZE_PATH = _m.BRONZE_PATH
    SILVER_PATH = _m.SILVER_PATH
    QUARANTINE_PATH = _m.QUARANTINE_PATH

try:
    from io_utils import read_parquet, write_parquet, zone_path
except ModuleNotFoundError:
    import __main__ as _m
    read_parquet = _m.read_parquet
    write_parquet = _m.write_parquet
    zone_path = _m.zone_path



def read_bronze_usage_events(spark: SparkSession) -> DataFrame:
    return read_parquet(spark, zone_path(BRONZE_PATH, "usage_events"), partition_glob="event_date=*")

def read_bronze_customers_orgs(spark: SparkSession) -> DataFrame:
    return read_parquet(spark, zone_path(BRONZE_PATH, "customers_orgs"))

# -impieza, Joins y cuarentena

def run_silver_batch(spark: SparkSession) -> None:

    print("[INFO] Iniciando Silver...")

    usage_df = read_bronze_usage_events(spark)
    orgs_df = read_bronze_customers_orgs(spark)

    orgs_sel = orgs_df.select(
        "org_id", "org_name", "hq_region", "plan_tier", "is_enterprise"
    )

    # Join broadcast) // se usa broadcast porque orgs es chica comparada con eventos
    enriched_df = usage_df.join(F.broadcast(orgs_sel), on="org_id", how="left")

    # Reglas:
    # 1: El costo no puede ser negativo (permitimos 0 o mayor, o -0.00...1 errores de float y decimal, pero defino corte en -0.01 para mayor seguridad)
    # 2: tiene tener org_id (el join lo mantiene, pero se valida que no sea nulo si era inner logic)
    dq_condition = (F.col("cost_usd_increment") >= -0.01) & (F.col("org_id").isNotNull())

    # split
    good_df = enriched_df.filter(dq_condition)
    bad_df = enriched_df.filter(~dq_condition)

    if not bad_df.rdd.isEmpty():
        # A. Preparar datos fallidos actuales
        bad_df = bad_df.withColumn("quarantine_reason", F.lit("cost_negative_or_null_org"))

        quarantine_dest = zone_path(QUARANTINE_PATH, "usage_events_quarantine")

        #  se verifica si ya existe para no duplicar
        try:
            existing_quarantine = read_parquet(spark, quarantine_dest)

            # C. aca hago el left anti join para no tener dupes en cuarentena
            unique_bad_df = bad_df.join(
                existing_quarantine,
                on="event_id",
                how="left_anti"
            )

            new_errors_count = unique_bad_df.count()
            if new_errors_count > 0:
                print(f"[WARN] Nuevos registros invalidos detectados: {new_errors_count}")
                write_parquet(unique_bad_df, quarantine_dest, mode="append")
            else:
                print(f"[WARN] Errores detectados ya existian en cuarentena")

        except Exception:
            # si no hay archivo, se crea
            print(f"[WARN] Creando cuarentena por primera vez")
            write_parquet(bad_df, quarantine_dest, mode="append")

    # escritura de Silver limpio
    silver_dest = zone_path(SILVER_PATH, "usage_events_enriched")
    good_df = good_df.withColumnRenamed("service", "service_name")

    write_parquet(
        good_df,
        silver_dest,
        partition_cols=["event_date"],
        mode="overwrite"
    )
    print(f"[OK] Silver Batch completado, todo ok -> {silver_dest}")

# gold.py

Creacion de los marts de negocios para ser subidos a Cassandra

In [8]:
#gold.py
import pandas as pd

from __future__ import annotations
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F

# parche de imports para colab de nuevo, esto en el PY final no va a estar
try:
    from config import SILVER_PATH, GOLD_PATH, BRONZE_PATH
except ModuleNotFoundError:
    import __main__ as _m
    SILVER_PATH = _m.SILVER_PATH
    GOLD_PATH = _m.GOLD_PATH
    BRONZE_PATH = _m.BRONZE_PATH

try:
    from io_utils import read_parquet, write_parquet, zone_path
except ModuleNotFoundError:
    import __main__ as _m
    read_parquet = _m.read_parquet
    write_parquet = _m.write_parquet
    zone_path = _m.zone_path

try:
    from bronze_stream import create_usage_events_stream, transform_usage_events_bronze
except ModuleNotFoundError:
    import __main__ as _m
    create_usage_events_stream = _m.create_usage_events_stream
    transform_usage_events_bronze = _m.transform_usage_events_bronze


# Gold de Batch, en las proximas funcioens se generan losmarts de Finopsl, Support y Genai

def build_gold_finops_mart(spark: SparkSession) -> DataFrame:


    silver_path = zone_path(SILVER_PATH, "usage_events_enriched")
    silver_df = read_parquet(spark, silver_path)

    # agregaciones
    aggregated_df = (
        silver_df
        .groupBy("org_id", "org_name", "service_name", "event_date", "hq_region", "plan_tier")
        .agg(
            F.sum("cost_usd_increment").alias("daily_cost_usd"),
            F.sum(
                F.when(F.col("metric") == "requests", F.col("value")).otherwise(0.0)
            ).alias("daily_requests"),
            F.sum("carbon_kg").alias("daily_carbon_kg")
        )
    )

    #KPIs
    gold_df = (
        aggregated_df
        .withColumn(
            "cost_per_request",
            F.when(F.col("daily_requests") > 0,
                   F.col("daily_cost_usd") / F.col("daily_requests")).otherwise(None)
        )
        .withColumn(
            "carbon_per_dollar",
            F.when(F.col("daily_cost_usd") > 0,
                   F.col("daily_carbon_kg") / F.col("daily_cost_usd")).otherwise(None)
        )
    )

    return gold_df

def run_gold_batch_finops_mart(spark: SparkSession) -> None:
    df = build_gold_finops_mart(spark)
    dest = zone_path(GOLD_PATH, "org_daily_usage_by_service")
    write_parquet(df, dest, partition_cols=["event_date"], mode="overwrite")
    print(f"[OK] Gold Batch (FinOps) -> {dest}")


def build_gold_support_mart(spark: SparkSession) -> DataFrame:
    tickets_path = zone_path(BRONZE_PATH, "support_tickets")
    tickets_df = read_parquet(spark, tickets_path, partition_glob="severity=*")
    orgs_path = zone_path(BRONZE_PATH, "customers_orgs")
    orgs_df = read_parquet(spark, orgs_path).select("org_id", "org_name")

    metrics_df = (
        tickets_df
        .groupBy("org_id", "created_at")
        .agg(
            F.count("ticket_id").alias("total_tickets"),
            # FIX: Pasamos a lower() antes de comparar
            F.sum(F.when(F.lower(F.col("severity")) == "critical", 1).otherwise(0)).alias("critical_tickets"),
            F.sum(F.when(F.col("sla_breached") == True, 1).otherwise(0)).alias("sla_breached_count"),
            F.avg("csat").alias("avg_csat")
        )
        .withColumnRenamed("created_at", "ticket_date")
    )
    return metrics_df.join(orgs_df, on="org_id", how="left").withColumn(
        "sla_breach_rate",
        F.when(F.col("total_tickets") > 0, F.round(F.col("sla_breached_count") / F.col("total_tickets"), 4)).otherwise(0.0)
    )

def run_gold_support_batch(spark: SparkSession) -> None:
    df = build_gold_support_mart(spark)
    dest = zone_path(GOLD_PATH, "org_daily_support_metrics")
    write_parquet(df, dest, partition_cols=["ticket_date"], mode="overwrite")
    print(f"[OK] Gold Batch (Support FIX) -> {dest}")

def build_gold_genai_mart(spark: SparkSession) -> DataFrame:
    silver_path = zone_path(SILVER_PATH, "usage_events_enriched")
    df = read_parquet(spark, silver_path)

    genai_df = (
        df
        .filter(F.lower(F.col("service_name")).contains("genai"))
        .groupBy("org_id", "org_name", "event_date", "service_name")
        .agg(
            F.sum("cost_usd_increment").alias("genai_daily_cost"),
            F.sum(
                F.when(F.col("metric") == "requests", F.col("value")).otherwise(0)
            ).alias("genai_requests_count")
        )
    )
    return genai_df

def run_gold_genai_batch(spark: SparkSession) -> None:
    df = build_gold_genai_mart(spark)
    dest = zone_path(GOLD_PATH, "org_daily_genai_usage")
    write_parquet(df, dest, partition_cols=["event_date"], mode="overwrite")
    print(f"[OK] Gold Batch (GenAI final) -> {dest}")


def run_full_gold_batch(spark: SparkSession) -> None:
  run_gold_batch_finops_mart(spark)
  run_gold_support_batch(spark)
  run_gold_genai_batch(spark)


# SPEED GOLD: Streaming Directo a Gold

def start_gold_speed_stream(spark: SparkSession):

    # Speed Layer: Lee stream yvuelca a Gold
    # se usa cache de  Orgs para evitar I/O repetitivo y Coalesce(1) para evitar el problema de tener muchos archivos chicos en Gold.

    print("[INFO] Comenzando Speed Layer...")

    # Stream
    raw_stream = create_usage_events_stream(spark)
    stream_bronze = transform_usage_events_bronze(raw_stream)

    #  cacheo de Orgs

    orgs_df = read_parquet(spark, zone_path(BRONZE_PATH, "customers_orgs"))
    orgs_sel = orgs_df.select("org_id", "org_name", "hq_region", "plan_tier")
    orgs_sel.cache()
    print(f"[INFO] Dimensión Organizaciones cacheada: {orgs_sel.count()} registros.")


    dest_speed = zone_path(GOLD_PATH, "org_daily_usage_by_service_speed")

    def process_microbatch(batch_df: DataFrame, batch_id: int):

        if batch_df.rdd.isEmpty():
            return

        # metricas
        input_count = batch_df.count()

        # procesado
        enriched = batch_df.join(F.broadcast(orgs_sel), on="org_id", how="left")

        # data quality
        valid_stream = enriched.filter(F.col("cost_usd_increment") >= -0.01)
        valid_count = valid_stream.count()
        dropped_count = input_count - valid_count

        # aregaciones
        agg_batch = (
            valid_stream
            .groupBy("org_id", "org_name", "service", "event_date")
            .agg(
                F.sum("cost_usd_increment").alias("daily_cost_usd"),
                F.sum(F.when(F.col("metric") == "requests", F.col("value")).otherwise(0)).alias("daily_requests"),
                F.sum("carbon_kg").alias("daily_carbon_kg")
            )
            .withColumnRenamed("service", "service_name")
        )

        # Append
        (
            agg_batch
            .coalesce(1)
            .write
            .mode("append")
            .partitionBy("event_date")
            .parquet(str(dest_speed))
        )

        #  Logde quality para monitoreo ---

        print(f"[STREAM {batch_id}] Reporte ")
        print(f" Input: {input_count} eventos")
        print(f"Validos: {valid_count}")
        if dropped_count > 0:
            print(f"Dropped (Cost < -0.01): {dropped_count} ({(dropped_count/input_count)*100:.1f}%)")
        print(f"todo pasado a Gold de Speed layer")

    # Arranca Stream con outputMode("update") para permitir agregacionesy con  foreachBatch manejando la salida final
    query = (
        stream_bronze
        .writeStream
        .foreachBatch(process_microbatch)
        .outputMode("update")
        .trigger(processingTime="5 seconds") # trigger para no saturar
        .start()
    )

    print(f"[INFO] Streaming ejecutandose -> {dest_speed}")
    return query

# cassandra_utils.puy

Crea la instancia de cassandra conlas credenciales provistas en la carpeta /creds y agreaga las tablas al Keyspace asignado

In [None]:
# cassandra_utils.py

import os
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from dotenv import load_dotenv
from pathlib import Path

# carga de credenciales
load_dotenv("/content/creds/cred.env", override=True)
# config
SECURE_BUNDLE_PATH = "/content/creds/secure-connect-proyecto-cloud-analytics.zip"
ASTRA_DB_TOKEN = os.getenv("ASTRA_DB_APPLICATION_TOKEN")
KEYSPACE = "Cloud_analytics_db"

# funcines de implementacion de cassandra: tomar, crear e insertar
def get_cassandra_session():

    if not Path(SECURE_BUNDLE_PATH).exists():
        raise FileNotFoundError(f"Falta el Secure Connect Bundle en: {SECURE_BUNDLE_PATH}")

    if not ASTRA_DB_TOKEN:
        raise RuntimeError("No se encontro ASTRA_DB_APPLICATION_TOKEN en cred.env")

    cloud_config = {
        'secure_connect_bundle': SECURE_BUNDLE_PATH
    }

    auth_provider = PlainTextAuthProvider("token", ASTRA_DB_TOKEN)

    cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider, protocol_version=4)
    session = cluster.connect()
    return session

def create_schema(session):
    print(f"[CASSANDRA] Creando esquema en keyspace '{KEYSPACE}'...")

    # 1. Tabla FinOps (Ya existía)
    ddl_finops = f"""
    CREATE TABLE IF NOT EXISTS "{KEYSPACE}".org_daily_usage_by_service (
        org_id text,
        usage_date date,
        service_name text,
        daily_cost_usd double,
        daily_requests double,
        daily_carbon_kg double,
        cost_per_request double,
        carbon_per_dollar double,
        PRIMARY KEY ((org_id), usage_date, service_name)
    ) WITH CLUSTERING ORDER BY (usage_date DESC, service_name ASC);
    """
    session.execute(ddl_finops)

    # 2. NUEVA: Tabla de Soporte (Support Mart)
    # PK: org_id (partition), ticket_date (clustering) para ver evolución temporal
    ddl_support = f"""
    CREATE TABLE IF NOT EXISTS "{KEYSPACE}".org_daily_support_metrics (
        org_id text,
        ticket_date date,
        total_tickets int,
        critical_tickets int,
        sla_breached_count int,
        avg_csat double,
        sla_breach_rate double,
        PRIMARY KEY ((org_id), ticket_date)
    ) WITH CLUSTERING ORDER BY (ticket_date DESC);
    """
    session.execute(ddl_support)

    # 3. NUEVA: Tabla de GenAI (Product Mart)
    # PK: org_id (partition), event_date + service (clustering)
    ddl_genai = f"""
    CREATE TABLE IF NOT EXISTS "{KEYSPACE}".org_daily_genai_usage (
        org_id text,
        event_date date,
        service_name text,
        genai_daily_cost double,
        genai_requests_count double,
        PRIMARY KEY ((org_id), event_date, service_name)
    ) WITH CLUSTERING ORDER BY (event_date DESC);
    """
    session.execute(ddl_genai)

    print("[CASSANDRA] Esquema verificado: 3 Tablas listas.")

def insert_batch_to_cassandra(rows: list[dict]):
    if not rows:
        return
    session = get_cassandra_session()

    query = f"""
    INSERT INTO "{KEYSPACE}".org_daily_usage_by_service (
        org_id, usage_date, service_name,
        daily_cost_usd, daily_requests, daily_carbon_kg,
        cost_per_request, carbon_per_dollar
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """
    prepared = session.prepare(query)

    for row in rows:
        session.execute(prepared, (
            row["org_id"],
            row["event_date"],
            row["service_name"],
            row["daily_cost_usd"],
            row["daily_requests"],
            row["daily_carbon_kg"],
            row["cost_per_request"],
            row["carbon_per_dollar"]
        ))

    session.shutdown()




# cassandra_loader.py

Carga todas las tablas generadas en Gold

In [72]:
# cassandra_loader.py

from pyspark.sql import DataFrame
from tqdm.notebook import tqdm

def upload_full_gold_layer(spark: SparkSession):
    """
    Carga TODAS las tablas Gold (FinOps, Support, GenAI) a Cassandra.
    Maneja el renombrado de columnas para coincidir con el esquema de DB.
    """
    print("\n[SERVING] Iniciando carga masiva a AstraDB...")

    # Este try es para asegurear idempotencia
    try:
        session = get_cassandra_session()
        create_schema(session)
        session.shutdown()
    except Exception as e:
        print(f"[ERR] Error creando esquema: {e}")
        return

    #helper interno de carga
    def load_df_to_cassandra(df: DataFrame, table_name: str, columns_map: list[str]):

        try:
            count = df.count()
            print(f"   -> cargando tabla '{table_name}' ({count} registros)...")

            rows = df.collect()
            data_dicts = [row.asDict() for row in rows]

            if not data_dicts:
                print(f"      [WARN] No hay datos para {table_name}.")
                return

            session = get_cassandra_session()

            # query
            cols_str = ", ".join(columns_map)
            bind_markers = ", ".join(["?"] * len(columns_map))
            query = f'INSERT INTO "{KEYSPACE}".{table_name} ({cols_str}) VALUES ({bind_markers})'
            prepared = session.prepare(query)

            # upsert
            for row_data in tqdm(data_dicts, desc=f"Subiendo {table_name}", unit="rows"):
                values = []
                for col in columns_map:
                    # Obtenemos el valor. Si la columna no existe en el DF, inserta None.
                    val = row_data.get(col)
                    values.append(val)

                session.execute(prepared, values)

            session.shutdown()
            print(f"      [OK] Carga completada.")

        except Exception as e:
            print(f"      [ERR] Falló carga de {table_name}. Detalles: {e}")


    # carga de finops(org_daily_usage_by_service)
    path_finops = zone_path(GOLD_PATH, "org_daily_usage_by_service")
    df_finops = read_parquet(spark, path_finops, partition_glob="*")
    df_finops = df_finops.withColumnRenamed("event_date", "usage_date")

    load_df_to_cassandra(
        df_finops,
        "org_daily_usage_by_service",
        ["org_id", "usage_date", "service_name", "daily_cost_usd", "daily_requests", "daily_carbon_kg", "cost_per_request", "carbon_per_dollar"]
    )

    # cargade support (org_daily_support_metrics)
    path_support = zone_path(GOLD_PATH, "org_daily_support_metrics")
    df_support = read_parquet(spark, path_support, partition_glob="*")

    load_df_to_cassandra(
        df_support,
        "org_daily_support_metrics",
        ["org_id", "ticket_date", "total_tickets", "critical_tickets", "sla_breached_count", "avg_csat", "sla_breach_rate"]
    )

    #  carga de genai (org_daily_genai_usage) ---
    path_genai = zone_path(GOLD_PATH, "org_daily_genai_usage")
    df_genai = read_parquet(spark, path_genai, partition_glob="*")

    load_df_to_cassandra(
        df_genai,
        "org_daily_genai_usage",
        ["org_id", "event_date", "service_name", "genai_daily_cost", "genai_requests_count"]
    )
def upload_gold_to_cassandra(spark: SparkSession):

    # la carga  gold en Cassandra
    print("\n[SERVING] Iniciando carga a Cassandra...")

    try:
        session = get_cassandra_session()
        create_schema(session)
        session.shutdown()
    except Exception as e:
        print(f"[ERR] Error conectando a Cassandra: {e}")
        return


    gold_df = read_parquet(spark, zone_path(GOLD_PATH, "org_daily_usage_by_service"), partition_glob="event_date=*")

    # convierte a python con el driver

    print(f"[SERVING] Leyendo {gold_df.count()} filas de Gold...")
    rows = gold_df.collect()
    data_to_insert = [row.asDict() for row in rows]

    insert_batch_to_cassandra(data_to_insert)
    print(f"[SERVING] Carga completada. {len(data_to_insert)} registros insertados.")


def write_stream_to_cassandra(batch_df, batch_id):
    #funcion para usar en .foreachBatch del Streaming.

    if batch_df.rdd.isEmpty(): return
    rows = batch_df.collect()
    data = [row.asDict() for row in rows]

    # upsert
    insert_batch_to_cassandra(data)
    print(f"[CASSANDRA STREAM] Batch {batch_id} cargado ({len(data)} filas).")

# Demo de Batch Layer → Gold

In [10]:
import time
ensure_dirs()
unpack_raw_dataset()
spark = create_spark()

log("PROJECT_ROOT: {PROJECT_ROOT}", "INFO")
log(f"Spark version: {spark.version}", "INFO")
log("Generando Bronze Batch (Maestros)...", "RUN")
ensure_dirs()
unpack_raw_dataset()
spark = create_spark()
run_bronze_batch(spark)

log("Generando Bronze Stream (Eventos)...", "RUN")
query = start_usage_events_to_bronze(spark)

time.sleep(15) # esto es solo para asegurrar en el demo que se procesen datos
query.stop()
log("Stream procesado.", "OK")

log("Generando Silver...", "RUN")
run_silver_batch(spark)
log("Silver finalizado", "OK")
log("ejecuntando proceso Gold...", "RUN")
run_full_gold_batch(spark)
log("proceso Gold finalizado", "OK")

[OK] Dataset descomprimido en /content/datalake/landing
[94m[2025-12-02 22:10:26] [INFO] PROJECT_ROOT: {PROJECT_ROOT}[0m
[94m[2025-12-02 22:10:26] [INFO] Spark version: 3.5.1[0m
[96m[2025-12-02 22:10:26] [RUN] 1. Generando Bronze Batch (Maestros)...[0m
[OK] Dataset descomprimido en /content/datalake/landing

[BATCH] Iniciando Ingesta a Bronze (7 Maestros)...


Procesando Archivos:   0%|          | 0/7 [00:00<?, ?tablas/s]

[INFO] Leyendo /content/datalake/landing/customers_orgs.csv
[OK] Bronze customers_orgs -> /content/datalake/bronze/customers_orgs
[INFO] Leyendo /content/datalake/landing/users.csv
[OK] Bronze users -> /content/datalake/bronze/users
[INFO] Leyendo /content/datalake/landing/resources.csv
[OK] Bronze resources -> /content/datalake/bronze/resources
[INFO] Leyendo /content/datalake/landing/support_tickets.csv
[OK] Bronze support_tickets -> /content/datalake/bronze/support_tickets
[INFO] Leyendo /content/datalake/landing/marketing_touches.csv
[OK] Bronze marketing_touches -> /content/datalake/bronze/marketing_touches
[INFO] Leyendo /content/datalake/landing/nps_surveys.csv
[OK] Bronze nps_surveys -> /content/datalake/bronze/nps_surveys
[INFO] Leyendo /content/datalake/landing/billing_monthly.csv
[OK] Bronze billing_monthly -> /content/datalake/bronze/billing_monthly
[92m[2025-12-02 22:10:45] [OK] Capa Bronze finalizada correctamente[0m
[96m[2025-12-02 22:10:45] [RUN] 2. Generando Bronze 

## Chequeo de Quality de las tres fases

In [11]:
#Chequeo de quality
log("Haciendo quality control...", "RUN")
run_full_bronze_audit(spark)
run_full_silver_audit(spark)
run_full_gold_audit(spark)
log("Fin demo Batch Layer (Bronze CSV → Silver → Gold)", "OK")

[96m[2025-12-02 22:11:31] [RUN] Haciendo quality control...[0m
[96m[2025-12-02 22:11:31] [RUN] Iniciando Auditoría completa de capa Bronze...[0m

 chequeo Bronze: customers_orgs
registros totales: 80
duplicados en PK (org_id): 0
nulos en ingest_ts: 0
Resultado: Todo ok

 chequeo Bronze: users
registros totales: 800
duplicados en PK (user_id): 0
nulos en ingest_ts: 0
Resultado: Todo ok

 chequeo Bronze: resources
registros totales: 400
duplicados en PK (resource_id): 0
nulos en ingest_ts: 0
Resultado: Todo ok

 chequeo Bronze: support_tickets
registros totales: 1000
duplicados en PK (ticket_id): 0
nulos en ingest_ts: 0
Resultado: Todo ok

 chequeo Bronze: marketing_touches
registros totales: 1500
duplicados en PK (touch_id): 0
nulos en ingest_ts: 0
Resultado: Todo ok

 chequeo Bronze: billing_monthly
registros totales: 240
duplicados en PK (invoice_id): 0
nulos en ingest_ts: 0
Resultado: Todo ok

 chequeo Bronze: nps_surveys
registros totales: 92
duplicados en PK (org_id): 32
nulos 

# Demo de Speed Layer → Gold

En un despliegue real, el pipeline de streaming se ejecutaría como servicio/orquestación aparte

In [12]:
# DEMO Speed → Gold
import time

query_speed_gold = start_gold_speed_stream(spark)

print("Streaming Speed → Gold")
print(f"ID: {query_speed_gold.id}")
print(f"Nombre: {query_speed_gold.name}")
print(f"Activo: {query_speed_gold.isActive}")


# pausa para dejar que procese

time.sleep(15)

print(" Progreso del streaming")
print(query_speed_gold.lastProgress)

print("aparando stream...")

'''
El try que viene lo hice por si el stream se para mientras esta procesando algo y tira error java.lang.InterruptedException
ese error que no para la ejecucion del colab y solo pasa aca porque el stream se para un momento arbitrario.
en un deploy real no se haria.

'''
if query_speed_gold.isActive:
    query_speed_gold.stop()
    try:
        query_speed_gold.awaitTermination(timeout=2)
    except Exception:
        pass

audit_speed_layer_results(spark)
print("Streaming Speed → Gold parado.")

[INFO] Comenzando Speed Layer...
[INFO] Dimensión Organizaciones cacheada: 80 registros.
[INFO] Streaming ejecutandose -> /content/datalake/gold/org_daily_usage_by_service_speed
Streaming Speed → Gold
ID: 5cb673d3-9408-4260-8f47-e80a32a7d161
Nombre: None
Activo: True
[STREAM 0] Reporte 
 Input: 360 eventos
Validos: 359
Dropped (Cost < -0.01): 1 (0.3%)
todo pasado a Gold de Speed layer
[STREAM 1] Reporte 
 Input: 360 eventos
Validos: 358
Dropped (Cost < -0.01): 2 (0.6%)
todo pasado a Gold de Speed layer
[STREAM 2] Reporte 
 Input: 11 eventos
Validos: 11
todo pasado a Gold de Speed layer
[STREAM 3] Reporte 
 Input: 7 eventos
Validos: 7
todo pasado a Gold de Speed layer
 Progreso del streaming
{'id': '5cb673d3-9408-4260-8f47-e80a32a7d161', 'runId': 'de517849-55ac-4753-9cd0-2364eab3806e', 'name': None, 'timestamp': '2025-12-02T22:12:00.000Z', 'batchId': 3, 'numInputRows': 360, 'inputRowsPerSecond': 119.76047904191617, 'processedRowsPerSecond': 215.8273381294964, 'durationMs': {'addBatch': 

### DEMO - Carga de datos a Cassandra

In [15]:
upload_full_gold_layer(spark)


[SERVING] Iniciando carga masiva a AstraDB...
[CASSANDRA] Creando esquema en keyspace 'Cloud_analytics_db'...
[CASSANDRA] Esquema verificado: 3 Tablas listas.
   -> Cargando tabla 'org_daily_usage_by_service' (727 registros)...


Subiendo org_daily_usage_by_service:   0%|          | 0/727 [00:00<?, ?rows/s]

      [OK] Carga completada.
   -> Cargando tabla 'org_daily_support_metrics' (944 registros)...


Subiendo org_daily_support_metrics:   0%|          | 0/944 [00:00<?, ?rows/s]

      [OK] Carga completada.
   -> Cargando tabla 'org_daily_genai_usage' (77 registros)...


Subiendo org_daily_genai_usage:   0%|          | 0/77 [00:00<?, ?rows/s]

      [OK] Carga completada.


# DEMO - Queries de Cassandra

In [68]:
# cassandra_queries.py
import datetime

def run_final_business_queries():
    print("\n queries de negocios\n")
    session = get_cassandra_session()

    # aca elije una org al azar para poder ejecutar multiples veces y asi mostrar que funciona

    rows = session.execute(f'SELECT org_id FROM "{KEYSPACE}".org_daily_usage_by_service LIMIT 1000')
    all_orgs = list(set([r.org_id for r in rows]))
    TARGET_ORG = (random.sample(all_orgs, 1))[0]

    print(f"organizacion: {TARGET_ORG}")
    print("-" * 60)

    # QUERY 1: costos y requests diarios por org/servicio
    print("\n1. [FinOps] costos y requests de los ultimos 7 días")
    query_1 = f"""
        SELECT usage_date, service_name, daily_cost_usd, daily_requests
        FROM "{KEYSPACE}".org_daily_usage_by_service
        WHERE org_id = '{TARGET_ORG}'
        LIMIT 10
    """
    rows = session.execute(query_1)
    print(f"{'fecha':<12} | {'servicio':<15} | {'costo ($)':<10} | {'requests'}")
    for r in rows:
        print(f"{str(r.usage_date):<12} | {r.service_name:<15} | {r.daily_cost_usd:<10.2f} | {int(r.daily_requests)}")


    # QUERY 2: top de servicios por costo acumulado de las últimas 2 semanas

    # aca se traen los datos de la partición y suman en client-side agg
    print("\n2. [FinOps] Top 3 Servicios más costosos (acumulado)")
    query_2 = f"""
        SELECT service_name, daily_cost_usd
        FROM "{KEYSPACE}".org_daily_usage_by_service
        WHERE org_id = '{TARGET_ORG}'
    """
    rows = session.execute(query_2)

    cost_map = {}
    for r in rows:
        cost_map[r.service_name] = cost_map.get(r.service_name, 0.0) + r.daily_cost_usd

    sorted_services = sorted(cost_map.items(), key=lambda x: x[1], reverse=True)[:3]

    print(f"{'servicio':<15} | {'costo total ($)'}")
    for svc, cost in sorted_services:
        print(f"{svc:<15} | {cost:.2f}")


    # QUERY 3:  evolucion tickets críticos y SLA breach ---
    print("\n3. [Soporte] historial de tickets criticos y SLA breaches")
    query_3 = f"""
        SELECT ticket_date, critical_tickets, sla_breached_count
        FROM "{KEYSPACE}".org_daily_support_metrics
        WHERE org_id = '{TARGET_ORG}'
        LIMIT 5
    """
    rows = session.execute(query_3)

    # fallback para lademo por si no existen tickets
    if not rows:
        print("(La org seleccionada no tiene tickets, probando query general...)")
        # NOTA: Esto es solo para demo, en prod usaríamos Partition Key siempre
        rows = session.execute(f'SELECT org_id, ticket_date, critical_tickets FROM "{KEYSPACE}".org_daily_support_metrics LIMIT 5')

    print(f"{'fecha':<12} | {'criticos':<8} | {'SLA breached'}")
    for r in rows:
        # Manejo flexible por si saltó al fallback
        d_date = str(getattr(r, 'ticket_date', 'N/A'))
        print(f"{d_date:<12} | {r.critical_tickets:<8} | {getattr(r, 'sla_breached_count', '-')}")


    # QUERY 4: entrada mensual estimada
    '''
    Esto lo pondra la gente de finanzas, pero asumo que la
    entrada es la suma de costos + impuestos . Le meti 21% por IVA como ejemplo
    '''
    print("\n4. [Finanzas] entrada mensual estimado (+IVA)")
    # uso los datos de la query 2 para no hacer otra
    total_net = sum(cost_map.values())
    total_gross = total_net * 1.21
    print(f"total neto:      ${total_net:.2f}")
    print(f"total bruto (est): ${total_gross:.2f}")


    # QUERY 5: consumo GenAI (Tokens/Requests) ---
    # Tome a los requests como tokens de uso de IA
    print("\n5. [Producto] consumo de GenAI")
    query_5 = f"""
        SELECT event_date, service_name, genai_requests_count, genai_daily_cost
        FROM "{KEYSPACE}".org_daily_genai_usage
        WHERE org_id = '{TARGET_ORG}'
        LIMIT 5
    """
    rows = session.execute(query_5)

    # Fallback si no tiene GenAI
    if not rows:
         rows = session.execute(f'SELECT org_id, event_date, genai_requests_count FROM "{KEYSPACE}".org_daily_genai_usage LIMIT 5')
    print(f"{'fecha':<12} | {'modelo/serv':<12} | {'requests':<10} | {'costo ($)'}")
    for r in rows:
        s_name = getattr(r, 'service_name', 'genai')
        cost = getattr(r, 'genai_daily_cost', 0.0)
        print(f"{str(r.event_date):<12} | {s_name:<12} | {int(r.genai_requests_count):<10} | {cost:.2f}")

    session.shutdown()


run_final_business_queries()


 queries de negocios

organizacion: org_i3qk2iag
------------------------------------------------------------

1. [FinOps] costos y requests de los ultimos 7 días
fecha        | servicio        | costo ($)  | requests
2025-08-31   | genai           | 1.95       | 0
2025-08-31   | storage         | 0.03       | 0
2025-08-22   | genai           | 0.16       | 0
2025-08-13   | compute         | 0.80       | 0
2025-08-12   | compute         | 8.05       | 113
2025-08-04   | genai           | 14.68      | 122
2025-08-01   | storage         | 0.08       | 0
2025-07-23   | storage         | 2.20       | 125
2025-07-19   | genai           | 0.50       | 0
2025-07-14   | storage         | 0.00       | 0

2. [FinOps] Top 3 Servicios más costosos (acumulado)
servicio        | costo total ($)
genai           | 17.29
compute         | 8.85
storage         | 2.31

3. [Soporte] historial de tickets criticos y SLA breaches
fecha        | criticos | SLA breached
2025-08-19   | 0        | 0
2025-08-14 

# DEMO - Posible Dashboard

In [71]:
import gradio as gr
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime

# --- FUNCION DE DATOS ---
def get_premium_dashboard_data(org_id, date_start, date_end):
    print(f"[UI] Creando Dashboard de: {org_id}...")

    # validacion de fechas
    if not date_start or not date_end:
        date_start = "2020-01-01"
        date_end = datetime.today().strftime('%Y-%m-%d')

    session = get_cassandra_session()

    # FINOPS
    q_finops = f"SELECT usage_date, service_name, daily_cost_usd FROM \"{KEYSPACE}\".org_daily_usage_by_service WHERE org_id = '{org_id}'"
    rows_finops = list(session.execute(q_finops))

    # procesamiento en pandas
    if rows_finops:
        df_finops = pd.DataFrame([r._asdict() for r in rows_finops])
        df_finops['usage_date'] = pd.to_datetime(df_finops['usage_date'].astype(str))
        df_finops['daily_cost_usd'] = df_finops['daily_cost_usd'].astype(float)

        #filtro de fecha
        mask = (df_finops['usage_date'] >= pd.to_datetime(date_start)) & (df_finops['usage_date'] <= pd.to_datetime(date_end))
        df_finops = df_finops.loc[mask]

        # graf1: Area Chart acumulada (Plotly)
        fig_finops_trend = px.area(df_finops, x="usage_date", y="daily_cost_usd", color="service_name",
                                   title="Evolución de costos por servicio",
                                   labels={"daily_cost_usd": "Costo (USD)", "usage_date": "Fecha"})

        # graf2: Donut Chart
        df_grouped = df_finops.groupby("service_name")["daily_cost_usd"].sum().reset_index()
        fig_finops_dist = px.pie(df_grouped, values="daily_cost_usd", names="service_name", hole=0.4,
                                 title="distribucion del Gasto Total")
    else:
        fig_finops_trend = px.line(title="sin datos de costos")
        fig_finops_dist = px.pie(title="sin datos")

    # SUPPORT
    q_supp = f"SELECT ticket_date, total_tickets, sla_breached_count, critical_tickets FROM \"{KEYSPACE}\".org_daily_support_metrics WHERE org_id = '{org_id}'"
    rows_supp = list(session.execute(q_supp))

    if rows_supp:
        df_supp = pd.DataFrame([r._asdict() for r in rows_supp])
        df_supp['ticket_date'] = pd.to_datetime(df_supp['ticket_date'].astype(str))

        # Filtro fecha
        mask = (df_supp['ticket_date'] >= pd.to_datetime(date_start)) & (df_supp['ticket_date'] <= pd.to_datetime(date_end))
        df_supp = df_supp.loc[mask].sort_values("ticket_date")

        # Graf Combinado
        fig_supp = go.Figure()
        # graf total tickets
        fig_supp.add_trace(go.Bar(x=df_supp['ticket_date'], y=df_supp['total_tickets'], name='Total Tickets', marker_color='lightblue'))
        # graf SLA Breaches
        fig_supp.add_trace(go.Scatter(x=df_supp['ticket_date'], y=df_supp['sla_breached_count'], name='SLA Breaches', line=dict(color='red', width=3)))

        fig_supp.update_layout(title="Volumen de soporte vs calidad", xaxis_title="Fecha", yaxis_title="Cantidad")
    else:
        fig_supp = px.bar(title="Sin datos de soporte")

    # GENAI

    q_genai = f"SELECT event_date, service_name, genai_daily_cost, genai_requests_count FROM \"{KEYSPACE}\".org_daily_genai_usage WHERE org_id = '{org_id}'"
    rows_genai = list(session.execute(q_genai))

    if rows_genai:
        df_genai = pd.DataFrame([r._asdict() for r in rows_genai])
        df_genai['event_date'] = pd.to_datetime(df_genai['event_date'].astype(str))

        # Filtro fecha
        mask = (df_genai['event_date'] >= pd.to_datetime(date_start)) & (df_genai['event_date'] <= pd.to_datetime(date_end))
        df_genai = df_genai.loc[mask]

        # Scatter Plot: Relación Costo vs Requests
        # Tamaño de burbuja = Costo
        fig_genai = px.scatter(df_genai, x="event_date", y="genai_requests_count", size="genai_daily_cost", color="service_name",
                               title="IA: requests vs costo",
                               labels={"genai_requests_count": "Cant. Requests", "event_date": "Fecha"})
    else:
        fig_genai = px.scatter(title="sin datos de GenAI")

    session.shutdown()

    summary = f"### analizando {org_id}\ndesde {date_start} hasta {date_end}."

    return summary, fig_finops_trend, fig_finops_dist, fig_supp, fig_genai


# ui

# Carga inicial de lista de orgs (si no existe)
try:
    if not unique_orgs: raise ValueError
except:
    print("[UI] cargando clientes...")
    session = get_cassandra_session()
    rows = session.execute(f'SELECT org_id FROM \"{KEYSPACE}\".org_daily_usage_by_service LIMIT 300')
    unique_orgs = sorted(list(set([r.org_id for r in rows])))
    session.shutdown()

# layout
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🚀 Cloud Analytics 360° Dashboard")
    gr.Markdown("Visión integral: Finanzas, Operaciones e Innovación.")

    with gr.Row():
        # panel de control del dashboard
        with gr.Column(scale=1, variant="panel"):
            gr.Markdown("### 🎛️ Filtros")
            dd_org = gr.Dropdown(choices=unique_orgs, label="cliente", value=unique_orgs[0] if unique_orgs else None)
            # selectores de fecha
            date_start = gr.Textbox(label="Fecha Inicio (YYYY-MM-DD)", value="2025-06-01")
            date_end = gr.Textbox(label="Fecha Fin (YYYY-MM-DD)", value="2025-12-31")

            btn_run = gr.Button("actualizar", variant="primary")
            lbl_status = gr.Markdown("Listo para consultar.")

        # Panel de grafsl
        with gr.Column(scale=4):
            with gr.Tabs():
                with gr.TabItem("FinOps"):
                    with gr.Row():
                        plot_finops_trend = gr.Plot(label="tendencia")
                        plot_finops_dist = gr.Plot(label="fistribucion")

                with gr.TabItem("Soporte & SLAs"):
                    plot_supp = gr.Plot(label="metricas de soporte")
                    gr.Markdown("*Nota: La línea roja indica incumplimientos de contrato (SLA Breaches).*")

                with gr.TabItem("GenAI "):
                    plot_genai = gr.Plot(label="consumo IA")


    btn_run.click(
        fn=get_premium_dashboard_data,
        inputs=[dd_org, date_start, date_end],
        outputs=[lbl_status, plot_finops_trend, plot_finops_dist, plot_supp, plot_genai]
    )

demo.launch(debug=True)


The 'theme' parameter in the Blocks constructor will be removed in Gradio 6.0. You will need to pass 'theme' to Blocks.launch() instead.



It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://65d21a64220c06f292.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7861 <> https://65d21a64220c06f292.gradio.live


