In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# ‚ö†Ô∏è Ruta EXACTA a tu carpeta (tiene espacios)
SRC = "/content/drive/MyDrive/mineria de datos II/proyecto1"

# Carpeta final del repo en Drive (puede ser distinta a la fuente)
REPO = "/content/drive/MyDrive/cloud-provider-analytics"  # puedes cambiar el nombre

print("Origen (datos/notebook):", SRC)
print("Repo destino:", REPO)


Origen (datos/notebook): /content/drive/MyDrive/mineria de datos II/proyecto1
Repo destino: /content/drive/MyDrive/cloud-provider-analytics


In [None]:
!apt-get -y install openjdk-11-jdk-headless > /dev/null

import os, subprocess, sys

In [None]:
# 2) JAVA_HOME y path
java_home = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["JAVA_HOME"] = java_home
os.environ["PATH"] = f'{java_home}/bin:' + os.environ["PATH"]

In [None]:
# 3) Limpiar pyspark previo y reinstalar versi√≥n compatible
!pip -q uninstall -y pyspark py4j > /dev/null

In [None]:
# Spark 3.5.x funciona bien con Java 11 en Colab; si tu runtime es muy viejo, probaremos 3.4.1 como fallback.
target_version = "3.5.1"
code = !pip -q install pyspark=={target_version}
print(f"pip pyspark=={target_version} -> OK")


pip pyspark==3.5.1 -> OK


In [None]:
# 4) Verificar Java
print("JAVA_HOME =", os.environ.get("JAVA_HOME"))
!java -version

JAVA_HOME = /usr/lib/jvm/java-11-openjdk-amd64
openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)


In [None]:
# 5) Iniciar Spark
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("CloudProviderAnalytics")
         .config("spark.sql.shuffle.partitions", "200")
         .config("spark.driver.memory", "4g")
         .config("spark.executor.memory", "4g")
         .getOrCreate())

from pyspark.sql import functions as F
from pyspark.sql import types as T

print("Spark version:", spark.version)
print("Context OK ‚úÖ")

Spark version: 3.5.1
Context OK ‚úÖ


**bronze**

In [None]:
# === BRONZE BATCH: CSV -> PARQUET ===
from pyspark.sql import functions as F, types as T
import os

#  RUTA A TU DRIVE
BASE_LANDING = "/content/drive/MyDrive/mineria de datos II/proyecto1"

LANDING = {
    "customers": f"{BASE_LANDING}/customers_orgs.csv",
    "users":     f"{BASE_LANDING}/users.csv",
    "resources": f"{BASE_LANDING}/resources.csv",
    "tickets":   f"{BASE_LANDING}/support_tickets.csv",
    "marketing": f"{BASE_LANDING}/marketing_touches.csv",
    "nps":       f"{BASE_LANDING}/nps_surveys.csv",
    "billing":   f"{BASE_LANDING}/billing_monthly.csv",
}

BASE_WORK = "/content/datalake"
BRONZE = f"{BASE_WORK}/bronze"
os.makedirs(BRONZE, exist_ok=True)

schema_customers = T.StructType([
    T.StructField("org_id", T.StringType()),
    T.StructField("org_name", T.StringType()),
    T.StructField("industry", T.StringType()),
    T.StructField("region", T.StringType()),
    T.StructField("plan", T.StringType()),
    T.StructField("nps_last", T.DoubleType()),
    T.StructField("created_at", T.StringType())
])

schema_users = T.StructType([
    T.StructField("user_id", T.StringType()),
    T.StructField("org_id", T.StringType()),
    T.StructField("role", T.StringType()),
    T.StructField("is_active", T.StringType()),
    T.StructField("last_login_ts", T.StringType())
])

schema_resources = T.StructType([
    T.StructField("resource_id", T.StringType()),
    T.StructField("org_id", T.StringType()),
    T.StructField("service", T.StringType()),
    T.StructField("region", T.StringType()),
    T.StructField("created_ts", T.StringType())
])

schema_tickets = T.StructType([
    T.StructField("ticket_id", T.StringType()),
    T.StructField("org_id", T.StringType()),
    T.StructField("category", T.StringType()),
    T.StructField("severity", T.StringType()),
    T.StructField("opened_ts", T.StringType()),
    T.StructField("closed_ts", T.StringType()),
    T.StructField("sla_breached", T.StringType()),
    T.StructField("csat", T.DoubleType())
])

schema_marketing = T.StructType([
    T.StructField("touch_id", T.StringType()),
    T.StructField("org_id", T.StringType()),
    T.StructField("channel", T.StringType()),
    T.StructField("touch_ts", T.StringType()),
    T.StructField("converted", T.StringType())
])

schema_nps = T.StructType([
    T.StructField("org_id", T.StringType()),
    T.StructField("survey_ts", T.StringType()),
    T.StructField("score", T.DoubleType())
])

schema_billing = T.StructType([
    T.StructField("org_id", T.StringType()),
    T.StructField("month", T.StringType()),        # yyyy-MM
    T.StructField("subtotal_usd", T.DoubleType()),
    T.StructField("credits_usd", T.DoubleType()),
    T.StructField("tax_usd", T.DoubleType()),
    T.StructField("fx_rate", T.DoubleType()),
    T.StructField("currency", T.StringType())
])

def add_ingest_cols(df):
    return (df.withColumn("ingest_ts", F.current_timestamp())
              .withColumn("source_file", F.input_file_name()))

def batch_to_bronze(csv_path, schema, out_path, partition_cols):
    df = (spark.read.option("header", True).schema(schema).csv(csv_path))
    df = add_ingest_cols(df)
    # normalizaciones de timestamps si existen
    for c in ["created_at","last_login_ts","created_ts","opened_ts","closed_ts","touch_ts","survey_ts"]:
        if c in df.columns:
            df = df.withColumn(c, F.to_timestamp(F.col(c)))
    # snapshot para particionar si hace falta
    if "snapshot_date" not in df.columns:
        df = df.withColumn("snapshot_date", F.current_date())

    writer = df.write.mode("overwrite").partitionBy(*partition_cols)
    writer.parquet(out_path)
    return df

bronze_paths = {
    "customers": f"{BRONZE}/customers",
    "users":     f"{BRONZE}/users",
    "resources": f"{BRONZE}/resources",
    "tickets":   f"{BRONZE}/tickets",
    "marketing": f"{BRONZE}/marketing",
    "nps":       f"{BRONZE}/nps",
    "billing":   f"{BRONZE}/billing",
}

_ = batch_to_bronze(LANDING["customers"], schema_customers, bronze_paths["customers"], ["snapshot_date"])
_ = batch_to_bronze(LANDING["users"],     schema_users,     bronze_paths["users"],     ["snapshot_date"])
_ = batch_to_bronze(LANDING["resources"], schema_resources, bronze_paths["resources"], ["snapshot_date"])
_ = batch_to_bronze(LANDING["tickets"],   schema_tickets,   bronze_paths["tickets"],   ["snapshot_date"])
_ = batch_to_bronze(LANDING["marketing"], schema_marketing, bronze_paths["marketing"], ["snapshot_date"])
_ = batch_to_bronze(LANDING["nps"],       schema_nps,       bronze_paths["nps"],       ["snapshot_date"])
_ = batch_to_bronze(LANDING["billing"],   schema_billing,   bronze_paths["billing"],   ["month"])

print("‚úî Batch ‚Üí Bronze completado")

# sanity checks
for k,v in bronze_paths.items():
    try:
        cnt = spark.read.parquet(v).count()
        print(f"{k:<10} -> {cnt} filas")
    except Exception as e:
        print(f"{k:<10} -> ERROR: {e}")


‚úî Batch ‚Üí Bronze completado
customers  -> 80 filas
users      -> 800 filas
resources  -> 400 filas
tickets    -> 1000 filas
marketing  -> 1500 filas
nps        -> 92 filas
billing    -> 240 filas


Streaming ‚Üí Bronze (JSON ‚Üí Parquet con watermark + dedup + quarantine)

In [None]:
# === BRONZE STREAMING: usage_events_stream ===
from pyspark.sql import functions as F, types as T
import os

BASE_LANDING = "/content/drive/MyDrive/mineria de datos II/proyecto1"
EVENTS_DIR = f"{BASE_LANDING}/usage_events_stream"

BASE_WORK = "/content/datalake"
BRONZE = f"{BASE_WORK}/bronze"
CHECKPOINTS = "/content/checkpoints"
QUARANTINE = f"{BRONZE}/_quarantine"
os.makedirs(CHECKPOINTS, exist_ok=True)
os.makedirs(QUARANTINE, exist_ok=True)

schema_events = T.StructType([
    T.StructField("event_id", T.StringType()),
    T.StructField("org_id", T.StringType()),
    T.StructField("service", T.StringType()),
    T.StructField("region", T.StringType()),
    T.StructField("unit", T.StringType()),
    T.StructField("value", T.DoubleType()),
    T.StructField("cost_usd_increment", T.DoubleType()),
    T.StructField("event_ts", T.StringType()),
    T.StructField("schema_version", T.IntegerType()),
    T.StructField("carbon_kg", T.DoubleType()),
    T.StructField("genai_tokens", T.DoubleType())
])

raw_stream = (spark.readStream
              .schema(schema_events)
              .option("maxFilesPerTrigger", 2)  # micro-lotes para demo
              .json(EVENTS_DIR))

events = (raw_stream
    .withColumn("value", F.col("value").cast("double"))
    .withColumn("cost_usd_increment", F.col("cost_usd_increment").cast("double"))
    .withColumn("event_ts", F.to_timestamp("event_ts"))
    .withColumn("schema_version", F.coalesce(F.col("schema_version"), F.lit(1)))
    .withColumn("unit", F.when(F.col("unit").isNull() & F.col("value").isNotNull(), F.lit("unitless"))
                         .otherwise(F.col("unit")))
    .withColumn("usage_date", F.to_date("event_ts"))
    .withColumn("ingest_ts", F.current_timestamp())
    .withColumn("source_file", F.input_file_name())
)

valid_cond = (F.col("event_id").isNotNull() & (F.col("cost_usd_increment") >= F.lit(-0.01)))
valid = events.where(valid_cond)
quar  = (events.where(~valid_cond)
         .withColumn("error_reason",
                     F.when(F.col("event_id").isNull(),"NULL_EVENT_ID")
                      .when(F.col("cost_usd_increment") < -0.01, "NEGATIVE_COST")
                      .otherwise("UNKNOWN")))

valid_dedup = (valid
    .withWatermark("event_ts","2 days")
    .dropDuplicates(["event_id"])
)

bronze_events_path = f"{BRONZE}/usage_events"

q_quar = (quar.writeStream
          .format("parquet")
          .option("checkpointLocation", f"{CHECKPOINTS}/quarantine_events")
          .option("path", f"{QUARANTINE}/usage_events")
          .outputMode("append")
          .start())

q_bronze = (valid_dedup.writeStream
            .format("parquet")
            .option("checkpointLocation", f"{CHECKPOINTS}/bronze_usage_events")
            .option("path", bronze_events_path)
            .partitionBy("usage_date","service")
            .outputMode("append")
            .start())

print("‚è≥ Streaming corriendo... procesando micro-lotes.")

import time
for i in range(6):
    time.sleep(10)
    lp = q_bronze.lastProgress
    rows = lp["numInputRows"] if lp else 0
    print(f"t+{(i+1)*10}s -> inputRows={rows}")

# Para ver lo que cay√≥ ya:
try:
    df_bz = spark.read.parquet(bronze_events_path)
    print("Bronze events count:", df_bz.count())
    df_bz.groupBy("usage_date","service").count().orderBy("usage_date","service").show(10, False)
except Exception as e:
    print("A√∫n no hay archivos en Bronze events:", e)

# (Parar cuando quieras)
# q_bronze.stop(); q_quar.stop()


‚è≥ Streaming corriendo... procesando micro-lotes.
t+10s -> inputRows=0
t+20s -> inputRows=0
t+30s -> inputRows=0
t+40s -> inputRows=720
t+50s -> inputRows=720
t+60s -> inputRows=720
Bronze events count: 718
+----------+----------+-----+
|usage_date|service   |count|
+----------+----------+-----+
|NULL      |analytics |67   |
|NULL      |compute   |221  |
|NULL      |database  |112  |
|NULL      |genai     |81   |
|NULL      |networking|110  |
|NULL      |storage   |127  |
+----------+----------+-----+



parser robusto multi-patr√≥n y relanzar el stream. Tambi√©n reescribimos lo ya procesado para corregir los NULL.

Fix ‚Äî Parser de timestamp + relanzar streaming

In [None]:
# === FIX STREAMING: parseo robusto de event_ts y reprocesamiento limpio ===
from pyspark.sql import functions as F, types as T
import shutil, os

BASE_LANDING = "/content/drive/MyDrive/mineria de datos II/proyecto1"
EVENTS_DIR = f"{BASE_LANDING}/usage_events_stream"

BASE_WORK = "/content/datalake"
BRONZE = f"{BASE_WORK}/bronze"
CHECKPOINTS = "/content/checkpoints"
QUARANTINE = f"{BRONZE}/_quarantine"
bronze_events_path = f"{BRONZE}/usage_events"

# 0) Parar queries previas si siguen vivas
try:
    q_bronze.stop()
except: pass
try:
    q_quar.stop()
except: pass

# 1) Limpiar solo lo del stream para reprocesar correcto
for p in [bronze_events_path, f"{CHECKPOINTS}/bronze_usage_events", f"{CHECKPOINTS}/quarantine_events", f"{QUARANTINE}/usage_events"]:
    try:
        shutil.rmtree(p)
    except FileNotFoundError:
        pass
os.makedirs(f"{CHECKPOINTS}", exist_ok=True)
os.makedirs(f"{QUARANTINE}", exist_ok=True)

# 2) Esquema y lectura
schema_events = T.StructType([
    T.StructField("event_id", T.StringType()),
    T.StructField("org_id", T.StringType()),
    T.StructField("service", T.StringType()),
    T.StructField("region", T.StringType()),
    T.StructField("unit", T.StringType()),
    T.StructField("value", T.DoubleType()),
    T.StructField("cost_usd_increment", T.DoubleType()),
    T.StructField("event_ts", T.StringType()),     # mantenemos string; luego lo parseamos robusto
    T.StructField("schema_version", T.IntegerType()),
    T.StructField("carbon_kg", T.DoubleType()),
    T.StructField("genai_tokens", T.DoubleType()),
    # posibles variantes por si algunos archivos usan otros nombres
    T.StructField("timestamp", T.StringType()),
    T.StructField("time", T.StringType()),
    T.StructField("event_time", T.StringType()),
    T.StructField("event_unix", T.LongType()),
    T.StructField("event_ms", T.LongType())
])

raw_stream = (spark.readStream
              .schema(schema_events)
              .option("maxFilesPerTrigger", 2)
              .json(EVENTS_DIR))

# 3) Parser robusto del timestamp (ISO8601 con y sin milis, con 'Z' o zona)
evt_str = F.coalesce(
    F.col("event_ts"), F.col("timestamp"), F.col("event_time"), F.col("time")
)

evt_ts = F.coalesce(
    F.to_timestamp(evt_str, "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"),
    F.to_timestamp(evt_str, "yyyy-MM-dd'T'HH:mm:ssXXX"),
    F.to_timestamp(evt_str, "yyyy-MM-dd HH:mm:ss"),
    F.to_timestamp(evt_str)  # fallback gen√©rico
)

# fallback extra si vino epoch
evt_ts = F.coalesce(
    evt_ts,
    F.to_timestamp(F.from_unixtime(F.col("event_ms")/1000.0)),
    F.to_timestamp(F.from_unixtime(F.col("event_unix")))
)

events = (raw_stream
    .withColumn("value", F.col("value").cast("double"))
    .withColumn("cost_usd_increment", F.col("cost_usd_increment").cast("double"))
    .withColumn("event_ts_parsed", evt_ts)
    .withColumn("schema_version", F.coalesce(F.col("schema_version"), F.lit(1)))
    .withColumn("unit", F.when(F.col("unit").isNull() & F.col("value").isNotNull(), F.lit("unitless"))
                         .otherwise(F.col("unit")))
    .withColumn("usage_date", F.to_date("event_ts_parsed"))
    .withColumn("ingest_ts", F.current_timestamp())
    .withColumn("source_file", F.input_file_name())
)

valid_cond = (F.col("event_id").isNotNull() & (F.col("cost_usd_increment") >= F.lit(-0.01)))
valid = events.where(valid_cond)
quar  = (events.where(~valid_cond)
         .withColumn("error_reason",
                     F.when(F.col("event_id").isNull(),"NULL_EVENT_ID")
                      .when(F.col("cost_usd_increment") < -0.01, "NEGATIVE_COST")
                      .otherwise("UNKNOWN")))

valid_dedup = (valid
    .withWatermark("event_ts_parsed","2 days")
    .dropDuplicates(["event_id"])
)

q_quar = (quar.writeStream
          .format("parquet")
          .option("checkpointLocation", f"{CHECKPOINTS}/quarantine_events")
          .option("path", f"{QUARANTINE}/usage_events")
          .outputMode("append")
          .start())

q_bronze = (valid_dedup.writeStream
            .format("parquet")
            .option("checkpointLocation", f"{CHECKPOINTS}/bronze_usage_events")
            .option("path", bronze_events_path)
            .partitionBy("usage_date","service")
            .outputMode("append")
            .start())

print("‚è≥ Streaming relanzado con parser robusto. Esperando micro-lotes...")

import time
for i in range(6):
    time.sleep(10)
    lp = q_bronze.lastProgress
    rows = lp["numInputRows"] if lp else 0
    print(f"t+{(i+1)*10}s -> inputRows={rows}")

# Chequeo r√°pido
try:
    df_bz = spark.read.parquet(bronze_events_path)
    print("Bronze events count:", df_bz.count())
    df_bz.groupBy("usage_date","service").count().orderBy("usage_date","service").show(12, False)
except Exception as e:
    print("A√∫n no hay archivos en Bronze events:", e)


‚è≥ Streaming relanzado con parser robusto. Esperando micro-lotes...
t+10s -> inputRows=0
t+20s -> inputRows=0
t+30s -> inputRows=720
t+40s -> inputRows=720
t+50s -> inputRows=720
t+60s -> inputRows=720
Bronze events count: 1464
+----------+----------+-----+
|usage_date|service   |count|
+----------+----------+-----+
|2025-07-03|analytics |4    |
|2025-07-03|compute   |6    |
|2025-07-03|database  |5    |
|2025-07-03|networking|4    |
|2025-07-03|storage   |5    |
|2025-07-04|analytics |4    |
|2025-07-04|compute   |7    |
|2025-07-04|database  |6    |
|2025-07-04|genai     |2    |
|2025-07-04|networking|2    |
|2025-07-04|storage   |5    |
|2025-07-05|analytics |2    |
+----------+----------+-----+
only showing top 12 rows



Celda √∫nica ‚Äî Silver (dimensiones, facts, usage y anomal√≠as)

In [None]:
# === SILVER END-TO-END (con fix de create_map) ===
from pyspark.sql import functions as F, types as T
from pyspark.sql.window import Window as W
import os

BRONZE = "/content/datalake/bronze"
SILVER = "/content/datalake/silver"
os.makedirs(SILVER, exist_ok=True)

# --- Normalizaci√≥n (FIX: create_map con *args) ---
REGION_MAP = {
    "us-east-1":"us-east-1","us_east_1":"us-east-1","use1":"us-east-1",
    "eu-west-1":"eu-west-1","euw1":"eu-west-1",
    "sa-east-1":"sa-east-1","sae1":"sa-east-1"
}
SERVICE_MAP = {
    "compute":"compute","vm":"compute","ec2":"compute",
    "storage":"storage","s3":"storage",
    "database":"database","db":"database",
    "networking":"networking","vpc":"networking",
    "analytics":"analytics",
    "genai":"genai","llm":"genai"
}

def dict_map(col, mapping):
    pairs = []
    for k, v in mapping.items():
        pairs += [F.lit(k.lower()), F.lit(v)]
    # Si el mapping est√° vac√≠o, devolvemos lower(col)
    return F.coalesce(F.create_map(*pairs)[F.lower(col)] if pairs else F.lower(col), F.lower(col))

def norm_region(c):  return dict_map(c, REGION_MAP)
def norm_service(c): return dict_map(c, SERVICE_MAP)

def write_silver(df, path, partition_cols=None, mode="overwrite"):
    w = df.write.mode(mode)
    if partition_cols: w = w.partitionBy(*partition_cols)
    w.parquet(path)

# --- 1) Dimensiones ---
dim_orgs = (spark.read.parquet(f"{BRONZE}/customers")
  .withColumn("org_name", F.initcap("org_name"))
  .withColumn("industry", F.lower("industry"))
  .withColumn("region",  norm_region(F.col("region")))
  .withColumn("plan",    F.lower("plan"))
  .withColumn("nps_last", F.col("nps_last").cast("double"))
  .select("org_id","org_name","industry","region","plan","nps_last","ingest_ts","source_file","snapshot_date")
  .dropDuplicates(["org_id"])
)
write_silver(dim_orgs, f"{SILVER}/dim_orgs")

dim_users = (spark.read.parquet(f"{BRONZE}/users")
  .withColumn("is_active", F.when(F.lower("is_active").isin("true","1","yes"), F.lit(True)).otherwise(F.lit(False)))
  .select("user_id","org_id","role","is_active","last_login_ts","snapshot_date","ingest_ts","source_file")
)
write_silver(dim_users, f"{SILVER}/dim_users")

dim_resources = (spark.read.parquet(f"{BRONZE}/resources")
  .withColumn("service", norm_service(F.col("service")))
  .withColumn("region",  norm_region(F.col("region")))
  .select("resource_id","org_id","service","region","created_ts","snapshot_date","ingest_ts","source_file")
)
write_silver(dim_resources, f"{SILVER}/dim_resources")

print("‚úî Silver dimensiones: dim_orgs, dim_users, dim_resources")

# --- 2) Fact tickets diarios (Soporte) ---
bz_tickets = spark.read.parquet(f"{BRONZE}/tickets")
fact_tickets_daily = (bz_tickets
  .withColumn("opened_date", F.to_date("opened_ts"))
  .withColumn("sev", F.lower("severity"))
  .withColumn("sla_breached", F.when(F.lower("sla_breached").isin("true","1","yes"), F.lit(1)).otherwise(F.lit(0)))
  .withColumn("csat", F.col("csat").cast("double"))
  .groupBy("org_id","opened_date","sev")
  .agg(
      F.count("*").alias("tickets_opened"),
      F.sum("sla_breached").alias("sla_breach_count"),
      F.avg("csat").alias("csat_avg")
  ).withColumnRenamed("opened_date","date")
)
write_silver(fact_tickets_daily, f"{SILVER}/fact_tickets_daily", ["date"])
print("‚úî Silver soporte: fact_tickets_daily")

# --- 3) Billing normalizado (FinOps) ---
bz_bill = spark.read.parquet(f"{BRONZE}/billing")
fact_billing_monthly = (bz_bill
  .withColumn("fx_rate", F.coalesce(F.col("fx_rate").cast("double"), F.lit(1.0)))
  .withColumn("subtotal_usd", F.col("subtotal_usd").cast("double"))
  .withColumn("credits_usd",  F.col("credits_usd").cast("double"))
  .withColumn("tax_usd",      F.col("tax_usd").cast("double"))
  .withColumn("revenue_usd", (F.col("subtotal_usd") - F.col("credits_usd") + F.col("tax_usd")) * F.col("fx_rate"))
  .select("org_id","month","revenue_usd","subtotal_usd","credits_usd","tax_usd","fx_rate","currency")
)
write_silver(fact_billing_monthly, f"{SILVER}/fact_billing_monthly", ["month"])
print("‚úî Silver finops: fact_billing_monthly")

# --- 4) NPS por fecha ---
bz_nps = spark.read.parquet(f"{BRONZE}/nps")
nps_by_org_date = (bz_nps
  .withColumn("date", F.to_date("survey_ts"))
  .groupBy("org_id","date").agg(F.avg("score").alias("nps_avg"))
)
write_silver(nps_by_org_date, f"{SILVER}/nps_by_org_date", ["date"])
print("‚úî Silver nps_by_org_date")

# --- 5) Usage diario por servicio (FinOps/Producto) ---
events_path = f"{BRONZE}/usage_events"
has_events = False
try:
    _ = spark.read.parquet(events_path).limit(1).count()
    has_events = True
except Exception:
    has_events = False

if has_events:
    ev = spark.read.parquet(events_path)
    ev = (ev
      .withColumn("service", norm_service(F.col("service")))
      .withColumn("region",  norm_region(F.col("region")))
      .withColumn("value", F.col("value").cast("double"))
      .withColumn("cost_usd_increment", F.col("cost_usd_increment").cast("double"))
      .withColumn("carbon_kg", F.col("carbon_kg").cast("double"))
      .withColumn("genai_tokens", F.col("genai_tokens").cast("double"))
    )
    if "usage_date" not in ev.columns or ev.filter(F.col("usage_date").isNotNull()).count()==0:
        ts_col = None
        for c in ["event_ts_parsed","event_ts"]:
            if c in ev.columns:
                ts_col = c; break
        ev = ev.withColumn("usage_date", F.to_date(ts_col) if ts_col else F.current_date())

    usage_daily = (ev.groupBy("org_id","usage_date","service")
      .agg(
        F.sum("cost_usd_increment").alias("daily_cost_usd"),
        F.sum(F.when(F.lower("unit")=="request", F.col("value")).otherwise(F.lit(0.0))).alias("requests"),
        F.sum(F.when(F.lower("unit").isin("cpu_hour","cpu_hours"), F.col("value")).otherwise(F.lit(0.0))).alias("cpu_hours"),
        F.sum(F.when(F.lower("unit").isin("gb_hour","gb_hours","storage_gb_hours"), F.col("value")).otherwise(F.lit(0.0))).alias("storage_gb_hours"),
        F.sum(F.coalesce(F.col("genai_tokens"), F.lit(0.0))).alias("genai_tokens"),
        F.sum(F.coalesce(F.col("carbon_kg"), F.lit(0.0))).alias("carbon_kg")
      )
      .withColumnRenamed("usage_date","date")
    )
    write_silver(usage_daily, f"{SILVER}/usage_daily_by_service", ["date","service"])
    print("‚úî Silver usage_daily_by_service")

    # --- 6) Anomal√≠as de costo (percentiles + z-score + MAD) ---
    ud = spark.read.parquet(f"{SILVER}/usage_daily_by_service")

    percs = (ud.groupBy("service")
               .agg(F.expr("percentile_approx(daily_cost_usd, array(0.95,0.99)) as ptiles"))
               .withColumn("p95", F.col("ptiles")[0])
               .withColumn("p99", F.col("ptiles")[1])
               .select("service","p95","p99"))

    stats = (ud.groupBy("service")
               .agg(F.avg("daily_cost_usd").alias("mu"),
                    F.stddev_pop("daily_cost_usd").alias("sigma")))
    udz = (ud.join(stats, "service","left")
             .withColumn("zscore", (F.col("daily_cost_usd")-F.col("mu"))/F.col("sigma")))

    med = ud.groupBy("service").agg(F.expr("percentile_approx(daily_cost_usd, 0.5)").alias("med"))
    mad = (ud.join(med,"service")
             .withColumn("abs_dev", F.abs(F.col("daily_cost_usd")-F.col("med")))
             .groupBy("service","med")
             .agg(F.expr("percentile_approx(abs_dev, 0.5)").alias("mad")))
    udm = (ud.join(mad,"service")
             .withColumn("mad_score", (F.col("daily_cost_usd")-F.col("med"))/F.col("mad")))

    cost_anomaly = (ud
      .join(percs, "service", "left")
      .join(udz.select("service","date","org_id","zscore"), ["service","date","org_id"], "left")
      .join(udm.select("service","date","org_id","mad_score"), ["service","date","org_id"], "left")
      .withColumn("flag_p99", F.col("daily_cost_usd") > F.col("p99"))
      .withColumn("flag_z3",  F.col("zscore") > F.lit(3))
      .withColumn("flag_mad3",F.col("mad_score") > F.lit(3))
      .withColumn("anomaly_flag", (F.col("flag_p99") | F.col("flag_z3") | F.col("flag_mad3")).cast("boolean"))
    )
    write_silver(cost_anomaly, f"{SILVER}/cost_anomaly_mart", ["date","service"])
    print("‚úî Silver cost_anomaly_mart")
else:
    print("‚ö† No se encontraron eventos en Bronze (/bronze/usage_events). Se salta usage/anomal√≠as por ahora.")

# --- Vistas r√°pidas ---
print("\n=== CHECKS ===")
spark.read.parquet(f"{SILVER}/dim_orgs").orderBy("org_id").show(5, False)
spark.read.parquet(f"{SILVER}/fact_tickets_daily").orderBy("date","org_id","sev").show(5, False)
spark.read.parquet(f"{SILVER}/fact_billing_monthly").orderBy("month","org_id").show(5, False)
spark.read.parquet(f"{SILVER}/nps_by_org_date").orderBy("date","org_id").show(5, False)

try:
    spark.read.parquet(f"{SILVER}/usage_daily_by_service").orderBy("date","org_id","service").show(5, False)
    spark.read.parquet(f"{SILVER}/cost_anomaly_mart").orderBy(F.desc("anomaly_flag"),F.desc("daily_cost_usd")).show(5, False)
except Exception as e:
    print("Usage/Anomal√≠as a√∫n no disponibles:", e)


‚úî Silver dimensiones: dim_orgs, dim_users, dim_resources
‚úî Silver soporte: fact_tickets_daily
‚úî Silver finops: fact_billing_monthly
‚úî Silver nps_by_org_date
‚úî Silver usage_daily_by_service
‚úî Silver cost_anomaly_mart

=== CHECKS ===
+------------+--------------+----------+----------+----------+--------+--------------------------+------------------------------------------------------------------------------------+-------------+
|org_id      |org_name      |industry  |region    |plan      |nps_last|ingest_ts                 |source_file                                                                         |snapshot_date|
+------------+--------------+----------+----------+----------+--------+--------------------------+------------------------------------------------------------------------------------+-------------+
|org_0lvsnujz|Nova Cloud 45 |retail    |us-east   |standard  |NULL    |2025-11-13 00:19:17.985387|file:///content/drive/MyDrive/mineria%20de%20datos%20II/proyecto

gold


In [None]:
# === GOLD (Parquet) DESDE SILVER ‚Äî Query-first, idempotente ===
from pyspark.sql import functions as F
import os, re

SILVER = "/content/datalake/silver"
GOLD   = "/content/datalake/gold"
os.makedirs(GOLD, exist_ok=True)

def write_gold(df, path, partition_cols=None, mode="overwrite"):
    w = df.write.mode(mode)
    if partition_cols: w = w.partitionBy(*partition_cols)
    w.parquet(path)

# 1) FinOps: org_daily_usage_by_service
usage = spark.read.parquet(f"{SILVER}/usage_daily_by_service")
gold_usage = (usage
    .select(
        "org_id","date","service",
        F.coalesce("daily_cost_usd", F.lit(0.0)).alias("daily_cost_usd"),
        F.coalesce("requests", F.lit(0.0)).alias("requests"),
        F.coalesce("cpu_hours", F.lit(0.0)).alias("cpu_hours"),
        F.coalesce("storage_gb_hours", F.lit(0.0)).alias("storage_gb_hours"),
        F.coalesce("genai_tokens", F.lit(0.0)).alias("genai_tokens"),
        F.coalesce("carbon_kg", F.lit(0.0)).alias("carbon_kg"),
    )
)
write_gold(gold_usage, f"{GOLD}/org_daily_usage_by_service", ["date","service"])
print("‚úî Gold: org_daily_usage_by_service")

# 2) FinOps: revenue_by_org_month  (con fix si 'month' y 'org_id' vinieron invertidos)
bill = spark.read.parquet(f"{SILVER}/fact_billing_monthly")
def looks_like_yyyymm(s): return bool(re.match(r"^\d{4}-\d{2}$", str(s or "")))
sample = bill.select("org_id","month").limit(50).collect()
need_swap = any(m is not None and not looks_like_yyyymm(m) for (_,m) in [(r["org_id"],r["month"]) for r in sample])

if need_swap:
    bill = (bill
        .withColumnRenamed("org_id","_tmp_org")
        .withColumnRenamed("month","_tmp_month")
        .withColumn("org_id", F.col("_tmp_month"))
        .withColumn("month",  F.col("_tmp_org"))
        .drop("_tmp_org","_tmp_month")
    )
bill = bill.withColumn("month", F.col("month").cast("string"))
gold_revenue = bill.select("org_id","month","revenue_usd","subtotal_usd","credits_usd","tax_usd","fx_rate","currency")
write_gold(gold_revenue, f"{GOLD}/revenue_by_org_month", ["month"])
print("‚úî Gold: revenue_by_org_month", "(swap aplicado)" if need_swap else "")

# 3) FinOps: cost_anomaly_mart
anom = spark.read.parquet(f"{SILVER}/cost_anomaly_mart")
gold_anomaly = (anom.select(
    "org_id","date","service","daily_cost_usd","zscore","mad_score","p95","p99",
    "flag_p99","flag_z3","flag_mad3","anomaly_flag"
))
write_gold(gold_anomaly, f"{GOLD}/cost_anomaly_mart", ["date","service"])
print("‚úî Gold: cost_anomaly_mart")

# 4) Soporte: tickets_by_org_date
tks = spark.read.parquet(f"{SILVER}/fact_tickets_daily")
gold_tickets = (tks
    .select(
        "org_id","date",
        F.col("sev").alias("severity"),
        F.col("tickets_opened").cast("int").alias("tickets_opened"),
        F.col("sla_breach_count").cast("int").alias("sla_breach_count"),
        "csat_avg"
    )
)
write_gold(gold_tickets, f"{GOLD}/tickets_by_org_date", ["date"])
print("‚úî Gold: tickets_by_org_date")

# 5) Producto/GenAI: genai_tokens_by_org_date
TOKENS_COST_PER_1K = 0.002  # ajustar si ten√©s tarifa distinta
gold_genai = (usage
    .groupBy("org_id","date")
    .agg(F.sum(F.coalesce("genai_tokens",F.lit(0.0))).alias("total_tokens"))
    .withColumn("est_cost_usd", (F.col("total_tokens")/1000.0) * F.lit(TOKENS_COST_PER_1K))
)
write_gold(gold_genai, f"{GOLD}/genai_tokens_by_org_date", ["date"])
print("‚úî Gold: genai_tokens_by_org_date")

# Checks
print("\n=== CHECKS GOLD ===")
spark.read.parquet(f"{GOLD}/org_daily_usage_by_service").limit(5).show(truncate=False)
spark.read.parquet(f"{GOLD}/revenue_by_org_month").limit(5).show(truncate=False)
spark.read.parquet(f"{GOLD}/cost_anomaly_mart").orderBy(F.desc("anomaly_flag"),F.desc("daily_cost_usd")).limit(5).show(truncate=False)
spark.read.parquet(f"{GOLD}/tickets_by_org_date").limit(5).show(truncate=False)
spark.read.parquet(f"{GOLD}/genai_tokens_by_org_date").limit(5).show(truncate=False)


‚úî Gold: org_daily_usage_by_service
‚úî Gold: revenue_by_org_month (swap aplicado)
‚úî Gold: cost_anomaly_mart
‚úî Gold: tickets_by_org_date
‚úî Gold: genai_tokens_by_org_date

=== CHECKS GOLD ===
+------------+------------------+--------+---------+------------------+------------+-------------------+----------+-------+
|org_id      |daily_cost_usd    |requests|cpu_hours|storage_gb_hours  |genai_tokens|carbon_kg          |date      |service|
+------------+------------------+--------+---------+------------------+------------+-------------------+----------+-------+
|org_kdgigatj|35.8909           |0.0     |0.0      |13.1736           |0.0         |0.084516           |2025-08-31|compute|
|org_3t60rjiw|0.6673            |0.0     |0.0      |9.5265            |0.0         |0.002054           |2025-08-31|compute|
|org_y9v86wzs|40.7354           |0.0     |0.0      |5.5961            |0.0         |0.09793600000000001|2025-08-31|compute|
|org_cwi64ciy|44.9063           |0.0     |0.0      |42.098

Mini-fix en revenue_by_org_month

In [None]:
# RE-GENERAR SOLO revenue_by_org_month con coalesce defensivo
from pyspark.sql import functions as F

SILVER = "/content/datalake/silver"
GOLD   = "/content/datalake/gold"

bill = spark.read.parquet(f"{SILVER}/fact_billing_monthly")
bill = (bill
    .withColumn("subtotal_usd", F.coalesce("subtotal_usd", F.lit(0.0)))
    .withColumn("credits_usd",  F.coalesce("credits_usd",  F.lit(0.0)))
    .withColumn("tax_usd",      F.coalesce("tax_usd",      F.lit(0.0)))
    .withColumn("fx_rate",      F.coalesce("fx_rate",      F.lit(1.0)))
    .withColumn("revenue_usd", (F.col("subtotal_usd") - F.col("credits_usd") + F.col("tax_usd")) * F.col("fx_rate"))
    .select("org_id","month","revenue_usd","subtotal_usd","credits_usd","tax_usd","fx_rate","currency")
)
(bill.write.mode("overwrite").partitionBy("month").parquet(f"{GOLD}/revenue_by_org_month"))
print("‚úî Gold (revenue_by_org_month) regenerado con coalesce")


‚úî Gold (revenue_by_org_month) regenerado con coalesce


Crear keyspace y tablas en Astra (CQL)

In [None]:
KEYSPACE = "cloud_analytics"


In [None]:
ASTRA_BUNDLE = "/content/secure-connect-cloud-analytics.zip"
ASTRA_CLIENT_ID = "privado"
ASTRA_CLIENT_SECRET = "privado"


Ahora ejecut√° la carga Gold ‚Üí Astra

In [None]:
# === Carga GOLD -> AstraDB usando cassandra-driver (sin conector Spark) ===
!pip -q install cassandra-driver==3.29.1

import datetime as dt
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement, PreparedStatement
from cassandra import ConsistencyLevel

# 1) Conexi√≥n a Astra (usa TUS variables ya definidas)
cloud_config = {"secure_connect_bundle": ASTRA_BUNDLE}
auth_provider = PlainTextAuthProvider(ASTRA_CLIENT_ID, ASTRA_CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect("cloud_analytics")  # keyspace ya creado

# 2) Utilidad: castear tipos Python correctos para CQL
def to_date(v):
    if v is None: return None
    if isinstance(v, dt.date): return v
    return dt.date.fromisoformat(str(v))

def to_float(v):
    if v is None or v == "": return None
    try: return float(v)
    except: return None

def to_int(v):
    if v is None or v == "": return None
    try: return int(v)
    except: return None

# 3) Leemos GOLD con Spark (solo para leer Parquet local)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
GOLD = "/content/datalake/gold"

# ---- Tabla 1: org_daily_usage_by_service ----
df1 = spark.read.parquet(f"{GOLD}/org_daily_usage_by_service").select(
    "org_id","date","service","daily_cost_usd","requests","cpu_hours","storage_gb_hours","genai_tokens","carbon_kg"
)
stmt1 = session.prepare("""
INSERT INTO org_daily_usage_by_service
(org_id, date, service, daily_cost_usd, requests, cpu_hours, storage_gb_hours, genai_tokens, carbon_kg)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""")
batch_size = 200
batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
count = 0
for r in df1.toLocalIterator():
    batch.add(stmt1, (
        r["org_id"], to_date(r["date"]), r["service"],
        to_float(r["daily_cost_usd"]), to_float(r["requests"]), to_float(r["cpu_hours"]),
        to_float(r["storage_gb_hours"]), to_float(r["genai_tokens"]), to_float(r["carbon_kg"])
    ))
    count += 1
    if len(batch) >= batch_size:
        session.execute(batch); batch.clear()
if len(batch) > 0: session.execute(batch)
print(f"‚úî Insertados org_daily_usage_by_service: {count}")

# ---- Tabla 2: revenue_by_org_month ----
df2 = spark.read.parquet(f"{GOLD}/revenue_by_org_month").select(
    "org_id","month","revenue_usd","subtotal_usd","credits_usd","tax_usd","fx_rate","currency"
)
stmt2 = session.prepare("""
INSERT INTO revenue_by_org_month
(org_id, month, revenue_usd, subtotal_usd, credits_usd, tax_usd, fx_rate, currency)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""")
batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
count = 0
for r in df2.toLocalIterator():
    batch.add(stmt2, (
        r["org_id"], str(r["month"]),
        to_float(r["revenue_usd"]), to_float(r["subtotal_usd"]), to_float(r["credits_usd"]),
        to_float(r["tax_usd"]), to_float(r["fx_rate"]), r["currency"]
    ))
    count += 1
    if len(batch) >= batch_size:
        session.execute(batch); batch.clear()
if len(batch) > 0: session.execute(batch)
print(f"‚úî Insertados revenue_by_org_month: {count}")

# ---- Tabla 3: cost_anomaly_mart ----
df3 = spark.read.parquet(f"{GOLD}/cost_anomaly_mart").select(
    "org_id","date","service","daily_cost_usd","zscore","mad_score","p95","p99",
    "flag_p99","flag_z3","flag_mad3","anomaly_flag"
)
stmt3 = session.prepare("""
INSERT INTO cost_anomaly_mart
(org_id, date, service, daily_cost_usd, zscore, mad_score, p95, p99, flag_p99, flag_z3, flag_mad3, anomaly_flag)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""")
batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
count = 0
for r in df3.toLocalIterator():
    batch.add(stmt3, (
        r["org_id"], to_date(r["date"]), r["service"],
        to_float(r["daily_cost_usd"]), to_float(r["zscore"]), to_float(r["mad_score"]),
        to_float(r["p95"]), to_float(r["p99"]),
        bool(r["flag_p99"]), bool(r["flag_z3"]), bool(r["flag_mad3"]), bool(r["anomaly_flag"])
    ))
    count += 1
    if len(batch) >= batch_size:
        session.execute(batch); batch.clear()
if len(batch) > 0: session.execute(batch)
print(f"‚úî Insertados cost_anomaly_mart: {count}")

# ---- Tabla 4: tickets_by_org_date ----
df4 = spark.read.parquet(f"{GOLD}/tickets_by_org_date").select(
    "org_id","date","severity","tickets_opened","sla_breach_count","csat_avg"
)
stmt4 = session.prepare("""
INSERT INTO tickets_by_org_date
(org_id, date, severity, tickets_opened, sla_breach_count, csat_avg)
VALUES (?, ?, ?, ?, ?, ?)
""")
batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
count = 0
for r in df4.toLocalIterator():
    batch.add(stmt4, (
        r["org_id"], to_date(r["date"]), r["severity"],
        to_int(r["tickets_opened"]), to_int(r["sla_breach_count"]), to_float(r["csat_avg"])
    ))
    count += 1
    if len(batch) >= batch_size:
        session.execute(batch); batch.clear()
if len(batch) > 0: session.execute(batch)
print(f"‚úî Insertados tickets_by_org_date: {count}")

# ---- Tabla 5: genai_tokens_by_org_date ----
df5 = spark.read.parquet(f"{GOLD}/genai_tokens_by_org_date").select(
    "org_id","date","total_tokens","est_cost_usd"
)
stmt5 = session.prepare("""
INSERT INTO genai_tokens_by_org_date
(org_id, date, total_tokens, est_cost_usd)
VALUES (?, ?, ?, ?)
""")
batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
count = 0
for r in df5.toLocalIterator():
    batch.add(stmt5, (
        r["org_id"], to_date(r["date"]),
        to_float(r["total_tokens"]), to_float(r["est_cost_usd"])
    ))
    count += 1
    if len(batch) >= batch_size:
        session.execute(batch); batch.clear()
if len(batch) > 0: session.execute(batch)
print(f"‚úî Insertados genai_tokens_by_org_date: {count}")

print("üöÄ Carga Gold ‚Üí AstraDB finalizada con cassandra-driver")


[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m19.6/19.6 MB[0m [31m38.5 MB/s[0m eta [36m0:00:00[0m
[?25h



‚úî Insertados org_daily_usage_by_service: 1685




‚úî Insertados revenue_by_org_month: 240




‚úî Insertados cost_anomaly_mart: 1685
‚úî Insertados tickets_by_org_date: 984
‚úî Insertados genai_tokens_by_org_date: 1336
üöÄ Carga Gold ‚Üí AstraDB finalizada con cassandra-driver


costos/requests diarios en un rango (loop por d√≠a

In [None]:
import datetime as dt
from collections import defaultdict

org = "org_cwi64ciy"
start, end = dt.date(2025,8,1), dt.date(2025,8,31)

stmt = session.prepare("""
SELECT date, service, daily_cost_usd, requests, cpu_hours, storage_gb_hours
FROM cloud_analytics.org_daily_usage_by_service
WHERE org_id = ? AND date = ?;
""")

rows = []
d = start
while d <= end:
    rows.extend(session.execute(stmt, (org, d)))
    d += dt.timedelta(days=1)

# resumen por servicio
tot_cost = defaultdict(float)
tot_req  = defaultdict(float)
for r in rows:
    tot_cost[r.service] += float(r.daily_cost_usd or 0.0)
    tot_req[r.service]  += float(r.requests or 0.0)

print("Costos por servicio (rango):", dict(sorted(tot_cost.items())))
print("Requests por servicio (rango):", dict(sorted(tot_req.items())))
print("Registros devueltos:", len(rows))


Costos por servicio (rango): {'compute': 137.23059999999998, 'networking': 3.8691999999999998}
Requests por servicio (rango): {'compute': 0.0, 'networking': 0.0}
Registros devueltos: 12


Top-N servicios por costo (√∫ltimos 14 d√≠as)

In [None]:
from collections import defaultdict
org = "org_cwi64ciy"
start, end = dt.date(2025,10,30), dt.date(2025,11,12)

stmt = session.prepare("""
SELECT date, service, daily_cost_usd
FROM cloud_analytics.org_daily_usage_by_service
WHERE org_id = ? AND date = ?;
""")

agg = defaultdict(float)
d = start
while d <= end:
    for r in session.execute(stmt, (org, d)):
        agg[r.service] += float(r.daily_cost_usd or 0.0)
    d += dt.timedelta(days=1)

topN = sorted(agg.items(), key=lambda x: x[1], reverse=True)[:5]
print("Top-5 servicios por costo (14d):", topN)


Top-5 servicios por costo (14d): []


cr√≠ticos y SLA breach rate (√∫ltimos 30 d√≠as)

In [None]:
org = "org_cwi64ciy"
start, end = dt.date(2025,10,13), dt.date(2025,11,12)

stmt = session.prepare("""
SELECT date, severity, tickets_opened, sla_breach_count, csat_avg
FROM cloud_analytics.tickets_by_org_date
WHERE org_id = ? AND date = ? AND severity = 'critical';
""")

series = []
d = start
while d <= end:
    opened = breach = 0
    for r in session.execute(stmt, (org, d)):
        opened += int(r.tickets_opened or 0)
        breach += int(r.sla_breach_count or 0)
    rate = (breach / opened) if opened else 0.0
    series.append((d, opened, breach, rate))
    d += dt.timedelta(days=1)

print("Primeros 10 d√≠as (fecha, tickets, breaches, breach_rate):")
for s in series[:10]:
    print(s)


Primeros 10 d√≠as (fecha, tickets, breaches, breach_rate):
(datetime.date(2025, 10, 13), 0, 0, 0.0)
(datetime.date(2025, 10, 14), 0, 0, 0.0)
(datetime.date(2025, 10, 15), 0, 0, 0.0)
(datetime.date(2025, 10, 16), 0, 0, 0.0)
(datetime.date(2025, 10, 17), 0, 0, 0.0)
(datetime.date(2025, 10, 18), 0, 0, 0.0)
(datetime.date(2025, 10, 19), 0, 0, 0.0)
(datetime.date(2025, 10, 20), 0, 0, 0.0)
(datetime.date(2025, 10, 21), 0, 0, 0.0)
(datetime.date(2025, 10, 22), 0, 0, 0.0)


In [None]:
# === Re-cargar revenue_by_org_month usando cassandra-driver (sin conector Spark) ===
!pip -q install cassandra-driver==3.29.1

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement
from cassandra import ConsistencyLevel
from pyspark.sql import functions as F

KEYSPACE = "cloud_analytics"
GOLD = "/content/datalake/gold"

# 1) Conexi√≥n a Astra (reutiliza tus variables ASTRA_BUNDLE/ID/SECRET)
cloud_config = {"secure_connect_bundle": ASTRA_BUNDLE}
auth_provider = PlainTextAuthProvider(ASTRA_CLIENT_ID, ASTRA_CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

# 2) TRUNCATE tabla en Astra
session.execute(f"TRUNCATE {KEYSPACE}.revenue_by_org_month;")
print("‚úî TRUNCATE revenue_by_org_month")

# 3) Leer parquet GOLD y corregir cruce org_id/month
rev = spark.read.parquet(f"{GOLD}/revenue_by_org_month").select(
    "org_id","month","revenue_usd","subtotal_usd","credits_usd","tax_usd","fx_rate","currency"
)

# detecta filas mal mapeadas (org_id=inv_*, month=org_*)
bad = rev.filter(F.col("org_id").startswith("inv_") & F.col("month").startswith("org_"))
good = rev.exceptAll(bad)

fixed_bad = bad.select(
    F.col("month").alias("org_id"),
    F.col("org_id").alias("month"),
    "revenue_usd","subtotal_usd","credits_usd","tax_usd","fx_rate","currency"
)

fixed = good.unionByName(fixed_bad)

# 4) Insertar en Astra por batches
stmt = session.prepare(f"""
INSERT INTO {KEYSPACE}.revenue_by_org_month
(org_id, month, revenue_usd, subtotal_usd, credits_usd, tax_usd, fx_rate, currency)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""")

def to_float(x):
    try:
        return float(x) if x is not None else None
    except:
        return None

batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
count = 0
for r in fixed.toLocalIterator():
    batch.add(stmt, (
        r["org_id"],
        str(r["month"]),                      # month es texto 'YYYY-MM'
        to_float(r["revenue_usd"]),
        to_float(r["subtotal_usd"]),
        to_float(r["credits_usd"]),
        to_float(r["tax_usd"]),
        to_float(r["fx_rate"]),
        r["currency"]
    ))
    count += 1
    if len(batch) >= 200:
        session.execute(batch); batch.clear()
if len(batch) > 0:
    session.execute(batch)

print(f"üöÄ Revenue corregido insertado: {count} filas")




‚úî TRUNCATE revenue_by_org_month
üöÄ Revenue corregido insertado: 240 filas


In [None]:
# ==== Recargar revenue_by_org_month (corrigiendo org_id/month) ====
!pip -q install cassandra-driver==3.29.1

import re, datetime as dt
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement
from cassandra import ConsistencyLevel
from pyspark.sql import functions as F

KEYSPACE = "cloud_analytics"
GOLD = "/content/datalake/gold"

# 1) Conexi√≥n a Astra (usa tus variables ASTRA_BUNDLE / ASTRA_CLIENT_ID / ASTRA_CLIENT_SECRET ya definidas)
cloud_config = {"secure_connect_bundle": ASTRA_BUNDLE}
auth_provider = PlainTextAuthProvider(ASTRA_CLIENT_ID, ASTRA_CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

# 2) Vaciar la tabla antes de recargar
session.execute(f"TRUNCATE {KEYSPACE}.revenue_by_org_month;")
print("‚úî TRUNCATE revenue_by_org_month")

# 3) Leer Parquet GOLD
rev = (spark.read.parquet(f"{GOLD}/revenue_by_org_month")
       .select("org_id","month","revenue_usd","subtotal_usd","credits_usd","tax_usd","fx_rate","currency"))

# 4) Separar filas ‚Äúmalas‚Äù (org_id=inv_* y month=org_*) y ‚Äúbuenas‚Äù
bad  = rev.filter(F.col("org_id").startswith("inv_") & F.col("month").startswith("org_"))
good = rev.filter(~(F.col("org_id").startswith("inv_") & F.col("month").startswith("org_")))

# 5) Hacer swap en las malas: org_id <- month ; month <- org_id
fixed_bad = bad.select(
    F.col("month").alias("org_id"),
    F.col("org_id").alias("month"),
    "revenue_usd","subtotal_usd","credits_usd","tax_usd","fx_rate","currency"
)

# 6) Unir corregidas + buenas
fixed = good.unionByName(fixed_bad)

# 7) Normalizar 'month' a 'YYYY-MM' por si viniera 'YYYY-MM-DD' o tipo fecha
def normalize_month(v):
    if v is None:
        return None
    # objetos date/datetime
    if isinstance(v, (dt.date, dt.datetime)):
        return f"{v.year:04d}-{v.month:02d}"
    s = str(v)
    if re.match(r"^\d{4}-\d{2}$", s):
        return s
    if re.match(r"^\d{4}-\d{2}-\d{2}$", s):
        return s[:7]
    # si todav√≠a es algo raro, lo dejamos como string
    return s

def fnum(x):
    try:
        return float(x) if x is not None else None
    except:
        return None

# 8) Insertar en Astra por batches
stmt = session.prepare(f"""
INSERT INTO {KEYSPACE}.revenue_by_org_month
(org_id, month, revenue_usd, subtotal_usd, credits_usd, tax_usd, fx_rate, currency)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""")

batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
count = 0
for r in fixed.toLocalIterator():
    month_norm = normalize_month(r["month"])
    batch.add(stmt, (
        r["org_id"],
        month_norm,
        fnum(r["revenue_usd"]),
        fnum(r["subtotal_usd"]),
        fnum(r["credits_usd"]),
        fnum(r["tax_usd"]),
        fnum(r["fx_rate"]),
        r["currency"]
    ))
    count += 1
    if len(batch) >= 200:
        session.execute(batch); batch.clear()
if len(batch) > 0:
    session.execute(batch)

print(f"üöÄ Revenue corregido insertado: {count} filas")

# 9) Verificaci√≥n r√°pida desde Python (muestra 1 org y sus meses)
rows = session.execute(f"SELECT org_id, month, revenue_usd FROM {KEYSPACE}.revenue_by_org_month LIMIT 5;")
sample_org = None
for rr in rows:
    print("Sample:", rr)
    sample_org = rr.org_id
    break

if sample_org:
    print("\nMeses para:", sample_org)
    for rr in session.execute(f"""
        SELECT month, revenue_usd FROM {KEYSPACE}.revenue_by_org_month
        WHERE org_id = %s
    """, (sample_org,)):
        print(rr.month, rr.revenue_usd)




‚úî TRUNCATE revenue_by_org_month
üöÄ Revenue corregido insertado: 240 filas
Sample: Row(org_id='org_c11ertj5', month='inv_8fkcek2g', revenue_usd=-236799.24000000002)

Meses para: org_c11ertj5
inv_8fkcek2g -236799.24000000002
inv_ovk2hqkq -331061.4153
inv_zebcvf6f -624240.5565


In [None]:
# ==== RECONSTRUIR revenue_by_org_month (correcto) y cargar a Astra con cassandra-driver ====
!pip -q install cassandra-driver==3.29.1

import re, datetime as dt, os
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement
from cassandra import ConsistencyLevel
from pyspark.sql import functions as F
from pyspark.sql import types as T

KEYSPACE = "cloud_analytics"

# 0) Conexi√≥n a Astra (usa tus variables ASTRA_BUNDLE/ID/SECRET ya definidas)
cloud_config = {"secure_connect_bundle": ASTRA_BUNDLE}
auth_provider = PlainTextAuthProvider(ASTRA_CLIENT_ID, ASTRA_CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

# 1) Intentar leer de SILVER; si no, leer CSV de landing y construir
silver_path = "/content/datalake/silver/finops/fact_billing_monthly"
landing_csv = "/content/drive/MyDrive/mineria de datos II/proyecto1/billing_monthly.csv"

df = None
if os.path.exists(silver_path):
    df = spark.read.parquet(silver_path)
    print("‚úì Le√≠do Silver:", silver_path)
else:
    print("‚ö† No se encontr√≥ Silver. Leyendo CSV de landing‚Ä¶")
    df = (spark.read
          .option("header", True)
          .option("inferSchema", True)
          .csv(landing_csv))

# 2) Normalizar columnas esperadas
# Intentamos mapear a: org_id, month(YYYY-MM), subtotal_usd, credits_usd, tax_usd, fx_rate, currency, revenue_usd
cols = [c.lower() for c in df.columns]
df = df.toDF(*cols)

# Heur√≠sticas de nombres
org_col = "org_id" if "org_id" in cols else ("tenant_id" if "tenant_id" in cols else None)
inv_date_col = "invoice_date" if "invoice_date" in cols else None
month_col = "month" if "month" in cols else None
subtotal_col = "subtotal_usd" if "subtotal_usd" in cols else ("subtotal" if "subtotal" in cols else None)
credits_col  = "credits_usd"  if "credits_usd"  in cols else ("credits"  if "credits"  in cols else None)
tax_col      = "tax_usd"      if "tax_usd"      in cols else ("tax"      if "tax"      in cols else None)
fx_col       = "fx_rate"      if "fx_rate"      in cols else None
cur_col      = "currency"     if "currency"     in cols else None

# Si no hay month pero hay invoice_date, lo derivamos.
work = df
if month_col is None and inv_date_col is not None:
    work = work.withColumn("invoice_ts", F.to_timestamp(F.col(inv_date_col)))
    work = work.withColumn("month", F.date_format(F.col("invoice_ts"), "yyyy-MM"))
    month_col = "month"
elif month_col is not None:
    # Si month viene como fecha completa, recortar a yyyy-MM
    work = work.withColumn("month",
                           F.when(F.length(F.col(month_col)) == 7, F.col(month_col))
                            .otherwise(F.date_format(F.to_date(F.col(month_col)), "yyyy-MM")))
else:
    # √öltimo recurso: si hay alg√∫n 'period' o similar
    for c in cols:
        if c.startswith("period") or c.endswith("_month"):
            work = work.withColumn("month",
                                   F.when(F.length(F.col(c)) == 7, F.col(c))
                                    .otherwise(F.date_format(F.to_date(F.col(c)), "yyyy-MM")))
            month_col = "month"
            break

# Cast num√©ricos con fallback
def try_float(col):
    return F.when(F.col(col).cast("double").isNotNull(), F.col(col).cast("double")).otherwise(F.lit(None).cast("double"))

if subtotal_col is None: subtotal_col = "subtotal_usd"
if credits_col  is None: credits_col  = "credits_usd"
if tax_col      is None: tax_col      = "tax_usd"
if fx_col       is None: fx_col       = "fx_rate"
if cur_col      is None: cur_col      = "currency"

# Si faltan columnas, cr√©alas en 0/NULL
for c, defv in [(subtotal_col, 0.0), (credits_col, 0.0), (tax_col, 0.0)]:
    if c not in work.columns:
        work = work.withColumn(c, F.lit(defv).cast("double"))

if fx_col not in work.columns:
    work = work.withColumn(fx_col, F.lit(None).cast("double"))
if cur_col not in work.columns:
    work = work.withColumn(cur_col, F.lit("USD"))

if org_col is None or month_col is None:
    raise ValueError("No pude identificar org_id y/o month. Revisa las columnas del origen.")

work = (work
    .withColumn("org_id", F.col(org_col))
    .withColumn("month", F.col(month_col))
    .withColumn("subtotal_usd", try_float(subtotal_col))
    .withColumn("credits_usd",  try_float(credits_col))
    .withColumn("tax_usd",      try_float(tax_col))
    .withColumn("fx_rate",      try_float(fx_col))
    .withColumn("currency",     F.col(cur_col))
)

# revenue_usd: usamos el ya calculado si existe, sino: subtotal_usd - credits_usd + tax_usd
if "revenue_usd" in work.columns:
    work = work.withColumn("revenue_usd", try_float("revenue_usd"))
else:
    work = work.withColumn("revenue_usd",
                           F.coalesce(F.col("subtotal_usd"), F.lit(0.0)) -
                           F.coalesce(F.col("credits_usd"),  F.lit(0.0)) +
                           F.coalesce(F.col("tax_usd"),      F.lit(0.0)))

# Normalizar month definitivamente a 'YYYY-MM'
work = work.withColumn("month",
                       F.when(F.length("month")==7, F.col("month"))
                        .otherwise(F.date_format(F.to_date("month"), "yyyy-MM")))

# Selecci√≥n final y drop nulos clave
final_rev = (work
    .select("org_id","month","revenue_usd","subtotal_usd","credits_usd","tax_usd","fx_rate","currency")
    .where(F.col("org_id").isNotNull() & F.col("month").isNotNull())
    .dropDuplicates(["org_id","month"])
)

print("Schema final:")
final_rev.printSchema()
final_rev.show(5, False)

# 2) Limpiar tabla antes de insertar (ya truncamos arriba)
session.execute("TRUNCATE cloud_analytics.revenue_by_org_month;")

# 3) Insertar por batches
stmt = session.prepare("""
INSERT INTO cloud_analytics.revenue_by_org_month
(org_id, month, revenue_usd, subtotal_usd, credits_usd, tax_usd, fx_rate, currency)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""")

def fnum(x):
    try: return float(x) if x is not None else None
    except: return None

batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
count = 0
for r in final_rev.toLocalIterator():
    batch.add(stmt, (r["org_id"],
                     str(r["month"])[:7],
                     fnum(r["revenue_usd"]), fnum(r["subtotal_usd"]),
                     fnum(r["credits_usd"]), fnum(r["tax_usd"]),
                     fnum(r["fx_rate"]), r["currency"]))
    count += 1
    if len(batch) >= 200:
        session.execute(batch); batch.clear()
if len(batch) > 0: session.execute(batch)

print(f"üöÄ Revenue correcto insertado: {count} filas")




‚ö† No se encontr√≥ Silver. Leyendo CSV de landing‚Ä¶
Schema final:
root
 |-- org_id: string (nullable = true)
 |-- month: string (nullable = true)
 |-- revenue_usd: double (nullable = false)
 |-- subtotal_usd: double (nullable = true)
 |-- credits_usd: double (nullable = true)
 |-- tax_usd: double (nullable = true)
 |-- fx_rate: double (nullable = true)
 |-- currency: string (nullable = true)

+------------+-------+-----------+------------+-----------+-------+-------+--------+
|org_id      |month  |revenue_usd|subtotal_usd|credits_usd|tax_usd|fx_rate|currency|
+------------+-------+-----------+------------+-----------+-------+-------+--------+
|org_0lvsnujz|2025-06|1316.02    |1316.02     |NULL       |0.0    |NULL   |ARS     |
|org_0lvsnujz|2025-07|1579.22    |1600.22     |21.0       |0.0    |NULL   |USD     |
|org_0lvsnujz|2025-08|675.07     |675.07      |NULL       |0.0    |NULL   |USD     |
|org_0lzjjege|2025-06|1179.39    |1179.39     |NULL       |0.0    |NULL   |ARS     |
|org_0l

Costos y requests diarios por org y servicio (rango)

In [None]:
import datetime as dt
from collections import defaultdict

org = "org_c11ertj5"
start, end = dt.date(2025,8,1), dt.date(2025,8,31)

stmt = session.prepare("""
SELECT date, service, daily_cost_usd, requests, cpu_hours, storage_gb_hours
FROM cloud_analytics.org_daily_usage_by_service
WHERE org_id = ? AND date = ?;
""")

rows = []
d = start
while d <= end:
    rows.extend(session.execute(stmt, (org, d)))
    d += dt.timedelta(days=1)

# sumar por servicio (√∫til para el dashboard)
cost_by_service = defaultdict(float)
for r in rows:
    cost_by_service[r.service] += float(r.daily_cost_usd or 0.0)

print("Registros:", len(rows))
print("Costos por servicio:", dict(sorted(cost_by_service.items())))


Top-N servicios por costo (√∫ltimos 14 d√≠as)

In [None]:
from collections import defaultdict
org = "org_c11ertj5"
start, end = dt.date(2025,10,30), dt.date(2025,11,12)

stmt = session.prepare("""
SELECT date, service, daily_cost_usd
FROM cloud_analytics.org_daily_usage_by_service
WHERE org_id = ? AND date = ?;
""")

agg = defaultdict(float)
d = start
while d <= end:
    for r in session.execute(stmt, (org, d)):
        agg[r.service] += float(r.daily_cost_usd or 0.0)
    d += dt.timedelta(days=1)

topN = sorted(agg.items(), key=lambda x: x[1], reverse=True)[:5]
print("Top-5 servicios por costo (14d):", topN)


Top-5 servicios por costo (14d): []


Evoluci√≥n de cr√≠ticos y tasa de SLA breach (30 d√≠as)

In [None]:
org = "org_c11ertj5"
start, end = dt.date(2025,10,13), dt.date(2025,11,12)

stmt = session.prepare("""
SELECT date, severity, tickets_opened, sla_breach_count, csat_avg
FROM cloud_analytics.tickets_by_org_date
WHERE org_id = ? AND date = ? AND severity = 'critical';
""")

series = []
d = start
while d <= end:
    opened = breach = 0
    for r in session.execute(stmt, (org, d)):
        opened += int(r.tickets_opened or 0)
        breach += int(r.sla_breach_count or 0)
    rate = (breach / opened) if opened else 0.0
    series.append((d, opened, breach, rate))
    d += dt.timedelta(days=1)

print("Primeros d√≠as (fecha, tickets, breaches, breach_rate):")
for s in series[:10]:
    print(s)


Primeros d√≠as (fecha, tickets, breaches, breach_rate):
(datetime.date(2025, 10, 13), 0, 0, 0.0)
(datetime.date(2025, 10, 14), 0, 0, 0.0)
(datetime.date(2025, 10, 15), 0, 0, 0.0)
(datetime.date(2025, 10, 16), 0, 0, 0.0)
(datetime.date(2025, 10, 17), 0, 0, 0.0)
(datetime.date(2025, 10, 18), 0, 0, 0.0)
(datetime.date(2025, 10, 19), 0, 0, 0.0)
(datetime.date(2025, 10, 20), 0, 0, 0.0)
(datetime.date(2025, 10, 21), 0, 0, 0.0)
(datetime.date(2025, 10, 22), 0, 0, 0.0)


Costos/requests diarios por org y servicio (rango de fechas)

In [None]:
org = "org_c11ertj5"
start, end = dt.date(2025,8,1), dt.date(2025,8,31)

stmt = session.prepare("""
SELECT date, total_tokens, est_cost_usd
FROM cloud_analytics.genai_tokens_by_org_date
WHERE org_id = ? AND date = ?;
""")

rows = []
d = start
while d <= end:
    rows.extend(session.execute(stmt, (org, d)))
    d += dt.timedelta(days=1)

print("Filas:", len(rows))
for r in rows[:10]:
    print(r.date, r.total_tokens, r.est_cost_usd)


Filas: 8
2025-08-02 0.0 0.0
2025-08-03 0.0 0.0
2025-08-06 0.0 0.0
2025-08-09 0.0 0.0
2025-08-23 0.0 0.0
2025-08-27 0.0 0.0
2025-08-30 0.0 0.0
2025-08-31 3392.0 0.006784


Top-N servicios por costo (√∫ltimos 14 d√≠as)

In [None]:
from collections import defaultdict
org = "org_c11ertj5"
start, end = dt.date(2025,10,30), dt.date(2025,11,12)

stmt = session.prepare("""
SELECT date, service, daily_cost_usd
FROM cloud_analytics.org_daily_usage_by_service
WHERE org_id = ? AND date = ?;
""")

agg = defaultdict(float)
d = start
while d <= end:
    for r in session.execute(stmt, (org, d)):
        agg[r.service] += float(r.daily_cost_usd or 0.0)
    d += dt.timedelta(days=1)

topN = sorted(agg.items(), key=lambda x: x[1], reverse=True)[:5]
print("Top-5 servicios por costo (14d):", topN)


Top-5 servicios por costo (14d): []


Evoluci√≥n de cr√≠ticos y tasa de SLA breach (√∫ltimos 30 d√≠as)

In [None]:
org = "org_c11ertj5"
start, end = dt.date(2025,10,13), dt.date(2025,11,12)

stmt = session.prepare("""
SELECT date, severity, tickets_opened, sla_breach_count, csat_avg
FROM cloud_analytics.tickets_by_org_date
WHERE org_id = ? AND date = ? AND severity = 'critical';
""")

series = []
d = start
while d <= end:
    opened = breach = 0
    for r in session.execute(stmt, (org, d)):
        opened += int(r.tickets_opened or 0)
        breach += int(r.sla_breach_count or 0)
    rate = (breach / opened) if opened else 0.0
    series.append((d, opened, breach, rate))
    d += dt.timedelta(days=1)

print("Primeros 10 d√≠as (fecha, tickets, breaches, breach_rate):")
for s in series[:10]:
    print(s)


Primeros 10 d√≠as (fecha, tickets, breaches, breach_rate):
(datetime.date(2025, 10, 13), 0, 0, 0.0)
(datetime.date(2025, 10, 14), 0, 0, 0.0)
(datetime.date(2025, 10, 15), 0, 0, 0.0)
(datetime.date(2025, 10, 16), 0, 0, 0.0)
(datetime.date(2025, 10, 17), 0, 0, 0.0)
(datetime.date(2025, 10, 18), 0, 0, 0.0)
(datetime.date(2025, 10, 19), 0, 0, 0.0)
(datetime.date(2025, 10, 20), 0, 0, 0.0)
(datetime.date(2025, 10, 21), 0, 0, 0.0)
(datetime.date(2025, 10, 22), 0, 0, 0.0)


Tokens GenAI y costo estimado por d√≠a (rango)

In [None]:
org = "org_c11ertj5"
start, end = dt.date(2025,8,1), dt.date(2025,8,31)

stmt = session.prepare("""
SELECT date, total_tokens, est_cost_usd
FROM cloud_analytics.genai_tokens_by_org_date
WHERE org_id = ? AND date = ?;
""")

rows = []
d = start
while d <= end:
    rows.extend(session.execute(stmt, (org, d)))
    d += dt.timedelta(days=1)

print("Filas:", len(rows))
for r in rows[:10]:
    print(r.date, r.total_tokens, r.est_cost_usd)


Filas: 8
2025-08-02 0.0 0.0
2025-08-03 0.0 0.0
2025-08-06 0.0 0.0
2025-08-09 0.0 0.0
2025-08-23 0.0 0.0
2025-08-27 0.0 0.0
2025-08-30 0.0 0.0
2025-08-31 3392.0 0.006784


In [None]:
# ========= Pretty prints para las 4 consultas que corren desde Colab =========
import datetime as dt
from collections import defaultdict

# --- Par√°metros (modific√° si quer√©s) ---
org   = "org_c11ertj5"
r1    = (dt.date(2025,8,1),  dt.date(2025,8,31))   # #1 costos/requests rango
r2    = (dt.date(2025,10,30),dt.date(2025,11,12))  # #2 Top-N 14d
r3    = (dt.date(2025,10,13),dt.date(2025,11,12))  # #3 cr√≠ticos 30d
r5    = (dt.date(2025,8,1),  dt.date(2025,8,31))   # #5 GenAI agosto

# ---------------- #1 Costos/requests diarios por org y servicio (rango) ----------------
stmt1 = session.prepare("""
SELECT date, service, daily_cost_usd, requests, cpu_hours, storage_gb_hours
FROM cloud_analytics.org_daily_usage_by_service
WHERE org_id = ? AND date = ?;
""")
rows1 = []
d = r1[0]
while d <= r1[1]:
    rows1.extend(session.execute(stmt1, (org, d)))
    d += dt.timedelta(days=1)

cost_by_service = defaultdict(float)
req_by_service  = defaultdict(float)
for r in rows1:
    cost_by_service[r.service] += float(r.daily_cost_usd or 0.0)
    req_by_service[r.service]  += float(r.requests or 0.0)

print("=== #1 Costos/Requests diarios (rango) ===")
print(f"org = {org} | fechas = {r1[0]} .. {r1[1]} | filas = {len(rows1)}\n")
print("Servicio       | Costos USD total | Requests total")
print("---------------+------------------+---------------")
for svc in sorted(cost_by_service):
    print(f"{svc:<14} | {cost_by_service[svc]:>16,.2f} | {req_by_service[svc]:>13,.0f}")
print()

# ---------------- #2 Top-N servicios por costo acumulado (14 d√≠as) ----------------
stmt2 = session.prepare("""
SELECT date, service, daily_cost_usd
FROM cloud_analytics.org_daily_usage_by_service
WHERE org_id = ? AND date = ?;
""")
agg2 = defaultdict(float)
d = r2[0]
while d <= r2[1]:
    for r in session.execute(stmt2, (org, d)):
        agg2[r.service] += float(r.daily_cost_usd or 0.0)
    d += dt.timedelta(days=1)

topN = sorted(agg2.items(), key=lambda x: x[1], reverse=True)[:5]
print("=== #2 Top-N servicios por costo (√∫ltimos 14 d√≠as) ===")
print(f"org = {org} | fechas = {r2[0]} .. {r2[1]}\n")
print("Rank | Servicio      | Costo 14d (USD)")
print("-----+---------------+----------------")
for i,(svc,val) in enumerate(topN,1):
    print(f"{i:>4} | {svc:<13} | {val:>14,.2f}")
print()

# ---------------- #3 Evoluci√≥n cr√≠ticos y SLA breach rate (30 d√≠as) ----------------
stmt3 = session.prepare("""
SELECT date, severity, tickets_opened, sla_breach_count, csat_avg
FROM cloud_analytics.tickets_by_org_date
WHERE org_id = ? AND date = ? AND severity = 'critical';
""")
series3 = []
d = r3[0]
while d <= r3[1]:
    opened = breach = 0
    for r in session.execute(stmt3, (org, d)):
        opened += int(r.tickets_opened or 0)
        breach += int(r.sla_breach_count or 0)
    rate = (breach / opened) if opened else 0.0
    series3.append((d, opened, breach, rate))
    d += dt.timedelta(days=1)

print("=== #3 Cr√≠ticos y SLA breach rate (30d) ===")
print(f"org = {org} | fechas = {r3[0]} .. {r3[1]}\n")
print("Fecha       | Tickets | Breaches | Breach rate")
print("------------+---------+----------+------------")
for d,opened,breach,rate in series3[:15]:
    print(f"{d} | {opened:>7d} | {breach:>8d} | {rate:>10.2%}")
if len(series3) > 15:
    print(f"... ({len(series3)-15} filas m√°s)")
print()

# ---------------- #5 Tokens GenAI y costo estimado (rango) ----------------
stmt5 = session.prepare("""
SELECT date, total_tokens, est_cost_usd
FROM cloud_analytics.genai_tokens_by_org_date
WHERE org_id = ? AND date = ?;
""")
rows5 = []
d = r5[0]
while d <= r5[1]:
    rows5.extend(session.execute(stmt5, (org, d)))
    d += dt.timedelta(days=1)

print("=== #5 GenAI tokens y costo (rango) ===")
print(f"org = {org} | fechas = {r5[0]} .. {r5[1]} | filas = {len(rows5)}\n")
print("Fecha       | Tokens      | Est. costo (USD)")
print("------------+-------------+-----------------")
for r in rows5:
    print(f"{r.date} | {float(r.total_tokens or 0):>11,.0f} | {float(r.est_cost_usd or 0):>15,.6f}")


=== #1 Costos/Requests diarios (rango) ===
org = org_c11ertj5 | fechas = 2025-08-01 .. 2025-08-31 | filas = 13

Servicio       | Costos USD total | Requests total
---------------+------------------+---------------
compute        |           173.27 |             0
database       |            25.93 |             0
genai          |            31.13 |             0
storage        |             4.98 |             0

=== #2 Top-N servicios por costo (√∫ltimos 14 d√≠as) ===
org = org_c11ertj5 | fechas = 2025-10-30 .. 2025-11-12

Rank | Servicio      | Costo 14d (USD)
-----+---------------+----------------

=== #3 Cr√≠ticos y SLA breach rate (30d) ===
org = org_c11ertj5 | fechas = 2025-10-13 .. 2025-11-12

Fecha       | Tickets | Breaches | Breach rate
------------+---------+----------+------------
2025-10-13 |       0 |        0 |      0.00%
2025-10-14 |       0 |        0 |      0.00%
2025-10-15 |       0 |        0 |      0.00%
2025-10-16 |       0 |        0 |      0.00%
2025-10-17 |      

## Conclusi√≥n

Implementamos un pipeline **Lambda** con batch (maestros, NPS y facturaci√≥n) y streaming (usage events) en PySpark, almacenando **Bronze/Silver/Gold** en **Parquet** y **sirviendo Gold en AstraDB (Cassandra)**. Aseguramos **calidad** con tipificaci√≥n consistente, dedupe por `event_id`, reglas (p.ej. `cost_usd_increment ‚â• -0.01`), manejo de **evoluci√≥n de esquema** (`schema_version` v1/v2 con `carbon_kg` y `genai_tokens`) y **anomal√≠as** (p99, z-score, MAD), con *quarantine* aparte. En **Silver** normalizamos y enriquecimos (joins) y calculamos m√©tricas diarias (`daily_cost_usd`, `requests`, `cpu_hours`, `storage_gb_hours`, `genai_tokens`, `carbon_kg`). En **Gold** modelamos por **query-first**:
- `org_daily_usage_by_service ((org_id, date), service)`
- `revenue_by_org_month ((org_id), month=YYYY-MM)` *(reconstruido correctamente para rangos por mes)*
- `cost_anomaly_mart`, `tickets_by_org_date`, `genai_tokens_by_org_date`.

El **serving** se resolvi√≥ con `cassandra-driver` (batches, idempotente por PK) y las **5 consultas** requeridas quedaron demostradas:
1) Costos/requests diarios por org/servicio: lectura diaria por PK y agregaci√≥n en cliente.
2) Top-N 14 d√≠as: agregado por servicio sobre lecturas diarias.
3) Cr√≠ticos y SLA breach rate 30 d√≠as: serie diaria con `breach/tickets`.
4) Revenue mensual USD: filtro por rango `month` (YYYY-MM) directo en CQL.
5) GenAI tokens & costo por d√≠a: lectura diaria por PK (ej. 2025-08-31 ‚Üí 3.392 tokens / USD 0.006784).

**Idempotencia**: checkpointing en streaming y upsert por PK en cargas. **Performance**: particionamiento Parquet, `repartition/coalesce` en escrituras, y consultas en Astra por partici√≥n (org_id + date).  
Como mejora futura, se sugiere materializar tablas ‚Äúrange-friendly‚Äù (PK `(org_id, date, ‚Ä¶)`) para rangos por fecha directamente en CQL sin loop del cliente.


In [None]:
# Usa el Gold en Parquet para detectar el √∫ltimo d√≠a con datos de ese org
from pyspark.sql import functions as F

org = "org_c11ertj5"
GOLD = "/content/datalake/gold"
usage_g = spark.read.parquet(f"{GOLD}/org_daily_usage_by_service")

mx = (usage_g.filter(F.col("org_id")==org)
              .agg(F.max("date").alias("max_date"))
              .collect()[0]["max_date"])
print("max_date =", mx)

import datetime as dt
end = mx
start = end - dt.timedelta(days=13)
print("ventana 14d:", start, "->", end)

# Re-usa el driver de Cassandra para calcular Top-N en esa ventana
from collections import defaultdict
stmt = session.prepare("""
SELECT date, service, daily_cost_usd
FROM cloud_analytics.org_daily_usage_by_service
WHERE org_id = ? AND date = ?;
""")

agg = defaultdict(float)
d = start
while d <= end:
    for r in session.execute(stmt, (org, d)):
        agg[r.service] += float(r.daily_cost_usd or 0.0)
    d += dt.timedelta(days=1)

topN = sorted(agg.items(), key=lambda x: x[1], reverse=True)[:5]
print("Top-5 servicios por costo (√∫ltimos 14 d√≠as del org):")
for i,(svc,val) in enumerate(topN,1):
    print(f"{i:>4} | {svc:<13} | {val:>14,.2f} USD")


max_date = 2025-08-31
ventana 14d: 2025-08-18 -> 2025-08-31
Top-5 servicios por costo (√∫ltimos 14 d√≠as del org):
   1 | compute       |         172.37 USD
   2 | genai         |          31.13 USD
   3 | database      |          19.98 USD
   4 | storage       |           2.82 USD


Conclusiones

Implementamos una arquitectura Lambda:
Batch (maestros, NPS y facturaci√≥n) + Streaming (usage events) en PySpark, con Bronze ‚Üí Silver ‚Üí Gold en Parquet y serving en AstraDB (Cassandra).

Calidad: tipificaci√≥n consistente, dedupe por event_id, validaciones (p.ej. cost_usd_increment ‚â• -0.01), manejo de evoluci√≥n de esquema (v1/v2 con carbon_kg, genai_tokens), detecci√≥n de anomal√≠as (p99, z-score, MAD) y quarantine aparte.

Silver: normalizaci√≥n + joins a dimensiones; c√°lculo de m√©tricas diarias: daily_cost_usd, requests, cpu_hours, storage_gb_hours, genai_tokens, carbon_kg.

Gold (query-first):
org_daily_usage_by_service, revenue_by_org_month (reconstruido a YYYY-MM), cost_anomaly_mart, tickets_by_org_date, genai_tokens_by_org_date.

Serving: carga a Astra con cassandra-driver (batches, upsert por PK).
Idempotencia con checkpointing (streaming) y llaves naturales.

Consultas requeridas (evidencia):

Costos/requests diarios (rango): OK (agosto 2025, org_c11ertj5).

Top-N 14 d√≠as: OK (2025-08-18..31) ‚Üí compute, genai, database, storage.

Cr√≠ticos y SLA breach rate 30d: OK (sin incidentes ‚Üí tasa 0%).

Revenue mensual (USD): OK (org_c11ertj5 ‚Üí 2025-07: 1255.59 ARS, 2025-08: 1061.88 USD).

GenAI tokens/costo por d√≠a: OK (31/08: 3.392 tokens ‚Üí $0.006784).

Trabajo futuro / mejoras

Modelo range-friendly en Cassandra: materializar tablas paralelas con PK (org_id, date, ‚Ä¶) para soportar rangos por fecha directamente en CQL (evitar loops del cliente en #1/#3/#5).

Expectations formales (Great Expectations/Deequ): versionar reglas, data docs y CI checks de calidad.

Orquestaci√≥n: programar batch (Airflow/Prefect) y monitoreo de streaming (alertas sobre watermarks y lag).

Cost anomaly scoring: combinar p99/z/MAD en un score √∫nico (ponderado) y feedback loop con equipo FinOps.

Cat√°logo/Diccionario de datos: publicar diccionario de campos claves (or√≠genes, tipos, unidades, sem√°ntica).

Dashboards: Superset/Power BI con vistas por FinOps, Soporte y GenAI (tendencias, top-N, alertas de anomal√≠as).

**consultas**

-- (opcional) Crear keyspace si no existe
CREATE KEYSPACE IF NOT EXISTS cloud_analytics
WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'replication_factor': '3'};

-- FinOps: uso diario por org y servicio
CREATE TABLE IF NOT EXISTS cloud_analytics.org_daily_usage_by_service (
  org_id text,
  date date,
  service text,
  daily_cost_usd double,
  requests double,
  cpu_hours double,
  storage_gb_hours double,
  genai_tokens double,
  carbon_kg double,
  PRIMARY KEY ((org_id, date), service)
);

-- FinOps: revenue mensual normalizado a USD
CREATE TABLE IF NOT EXISTS cloud_analytics.revenue_by_org_month (
  org_id text,
  month text,         -- 'YYYY-MM'
  revenue_usd double,
  subtotal_usd double,
  credits_usd double,
  tax_usd double,
  fx_rate double,
  currency text,
  PRIMARY KEY ((org_id), month)
);

-- FinOps: anomal√≠as de costo
CREATE TABLE IF NOT EXISTS cloud_analytics.cost_anomaly_mart (
  org_id text,
  date date,
  service text,
  daily_cost_usd double,
  zscore double,
  mad_score double,
  p95 double,
  p99 double,
  flag_p99 boolean,
  flag_z3 boolean,
  flag_mad3 boolean,
  anomaly_flag boolean,
  PRIMARY KEY ((org_id, date), service)
);

-- Soporte: tickets por org y fecha
CREATE TABLE IF NOT EXISTS cloud_analytics.tickets_by_org_date (
  org_id text,
  date date,
  severity text,
  tickets_opened int,
  sla_breach_count int,
  csat_avg double,
  PRIMARY KEY ((org_id, date), severity)
);

-- Producto/GenAI: tokens por org y d√≠a
CREATE TABLE IF NOT EXISTS cloud_analytics.genai_tokens_by_org_date (
  org_id text,
  date date,
  total_tokens double,
  est_cost_usd double,
  PRIMARY KEY ((org_id, date))
);


**Nota: Por dise√±o de PK, en #1, #2 y #3 el rango de fechas se resolvi√≥ desde el cliente (loop diario). A continuaci√≥n dejo los CQL por d√≠a que usamos (v√°lidos para capturas) y el CQL directo para #4.**

-- #1 Costos y requests diarios por org y servicio (ejemplo un d√≠a)
SELECT date, service, daily_cost_usd, requests, cpu_hours, storage_gb_hours
FROM cloud_analytics.org_daily_usage_by_service
WHERE org_id = 'org_c11ertj5'
  AND date = '2025-08-15';
-- #2 (Base para Top-N 14d) costo por servicio por d√≠a (agregado en cliente)
SELECT date, service, daily_cost_usd
FROM cloud_analytics.org_daily_usage_by_service
WHERE org_id = 'org_c11ertj5'
  AND date = '2025-08-31';
-- #3 Evoluci√≥n de tickets cr√≠ticos por d√≠a (ejemplo un d√≠a)
SELECT date, severity, tickets_opened, sla_breach_count, csat_avg
FROM cloud_analytics.tickets_by_org_date
WHERE org_id = 'org_c11ertj5'
  AND date = '2025-10-20'
  AND severity = 'critical';
-- #4 Revenue mensual (rango por 'YYYY-MM')  ‚Üê ejecutado y con resultados
SELECT month, revenue_usd, subtotal_usd, credits_usd, tax_usd, fx_rate, currency
FROM cloud_analytics.revenue_by_org_month
WHERE org_id = 'org_c11ertj5'
  AND month >= '2025-07' AND month <= '2025-09';
-- #5 GenAI tokens/costo por d√≠a (ejemplo un d√≠a)
SELECT date, total_tokens, est_cost_usd
FROM cloud_analytics.genai_tokens_by_org_date
WHERE org_id = 'org_c11ertj5'
  AND date = '2025-08-31';
