<a href="https://colab.research.google.com/github/frannsanchez/Mineeria-de-Datos-II/blob/main/Cloud%20Service.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

Configuración

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Explicit imports instead of `import *`: these are the schema types referenced
# by the cells of this notebook.
from pyspark.sql.types import (
    DateType,
    DoubleType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Root of the local lakehouse; landing CSV/JSON files are read directly from it.
BASE_PATH      = "/content"

LANDING_PATH   = BASE_PATH
BRONZE_PATH    = f"{BASE_PATH}/bronze"
SILVER_PATH    = f"{BASE_PATH}/silver"
GOLD_PATH      = f"{BASE_PATH}/gold"
CHECKPOINT_DIR = f"{BASE_PATH}/checkpoints"

spark = (SparkSession.builder
         .appName("cloud-provider-analytics-mvp")
         .getOrCreate())


Batch Bronze con 3 maestros



In [None]:
def _nullable_schema(*columns):
    """Build a StructType with one nullable StructField per (name, data_type) pair, in order."""
    return StructType([StructField(name, data_type, True) for name, data_type in columns])


# customers_orgs.csv
schema_customers = _nullable_schema(
    ("org_id", StringType()),
    ("org_name", StringType()),
    ("region", StringType()),
    ("country", StringType()),
    ("industry", StringType()),
    ("org_created_at", TimestampType()),
)

# users.csv
schema_users = _nullable_schema(
    ("user_id", StringType()),
    ("org_id", StringType()),
    ("email", StringType()),
    ("role", StringType()),
    ("created_at", TimestampType()),
    ("status", StringType()),
)

# billing_monthly.csv
schema_billing = _nullable_schema(
    ("org_id", StringType()),
    ("billing_month", StringType()),  # YYYY-MM
    ("amount_usd", DoubleType()),
    ("credits_usd", DoubleType()),
    ("taxes_usd", DoubleType()),
    ("fx_rate", DoubleType()),
)



CSV a Bronze

In [None]:
def csv_to_bronze(csv_path: str,
                  schema: StructType,
                  pk_cols: list,
                  bronze_subdir: str):
    """Ingest one landing CSV into a Bronze Parquet dataset.

    Reads ``csv_path`` (header row expected) with the explicit ``schema``, adds
    ``ingest_ts`` (current timestamp) and ``source_file`` lineage columns,
    optionally drops duplicates on ``pk_cols``, prints raw/deduplicated counts,
    and writes Parquet to ``BRONZE_PATH/bronze_subdir`` partitioned by
    ``ingest_date`` (the date part of ``ingest_ts``) in overwrite mode.

    Args:
        csv_path: Path of the CSV file to read.
        schema: Schema applied to the CSV columns.
        pk_cols: Key columns for de-duplication; falsy (e.g. ``[]``) skips it.
            Which duplicate row survives is not controlled by this function.
        bronze_subdir: Sub-folder of ``BRONZE_PATH`` to write to.

    Returns:
        None.
    """
    # NOTE(review): with an explicit schema Spark's CSV reader applies it by column
    # position by default (enforceSchema) — confirm the CSV column order matches.
    raw = (spark.read
           .option("header", True)
           .schema(schema)
           .csv(csv_path)
           .withColumn("ingest_ts", F.current_timestamp())
           .withColumn("source_file", F.input_file_name()))
    raw_count = raw.count()

    deduped = raw.dropDuplicates(pk_cols) if pk_cols else raw
    deduped_count = deduped.count()

    for line in (f"Archivo: {csv_path}",
                 f"Registros totales: {raw_count}",
                 f"Tras dedupe ({pk_cols}): {deduped_count}"):
        print(line)

    target = f"{BRONZE_PATH}/{bronze_subdir}"
    (deduped
     .withColumn("ingest_date", F.to_date("ingest_ts"))
     .write
     .mode("overwrite")
     .partitionBy("ingest_date")
     .parquet(target))

    print(f"Escrito en Bronze: {target}")



Ejecutar cada maestro



In [None]:
# Bronze ingestion of the three master CSVs, one tuple per file:
# (landing file name, schema, primary-key columns used for dedupe, Bronze sub-folder).
_bronze_masters = [
    ("customers_orgs.csv",  schema_customers, ["org_id"],                  "customers_orgs"),
    ("users.csv",           schema_users,     ["user_id"],                 "users"),
    ("billing_monthly.csv", schema_billing,   ["org_id", "billing_month"], "billing_monthly"),
]

for _csv_name, _schema, _pk_cols, _subdir in _bronze_masters:
    csv_to_bronze(
        csv_path=f"{LANDING_PATH}/{_csv_name}",
        schema=_schema,
        pk_cols=_pk_cols,
        bronze_subdir=_subdir,
    )


Archivo: /content/customers_orgs.csv
Registros totales: 80
Tras dedupe (['org_id']): 80
Escrito en Bronze: /content/bronze/customers_orgs
Archivo: /content/users.csv
Registros totales: 800
Tras dedupe (['user_id']): 800
Escrito en Bronze: /content/bronze/users
Archivo: /content/billing_monthly.csv
Registros totales: 240
Tras dedupe (['org_id', 'billing_month']): 240
Escrito en Bronze: /content/bronze/billing_monthly


In [None]:
# Streaming landing folder: move the raw events_part_*.jsonl files from BASE_PATH
# into the folder that the streaming source reads from.
import glob
import os
import shutil

BASE_PATH = "/content"
events_dir = os.path.join(BASE_PATH, "usage_events_stream")
os.makedirs(events_dir, exist_ok=True)

_skipped = []
# Sorted so files are moved in a deterministic order.
for _src in sorted(glob.glob(os.path.join(BASE_PATH, "events_part_*.jsonl"))):
    _dest = os.path.join(events_dir, os.path.basename(_src))
    if os.path.exists(_dest):
        # shutil.move(src, <directory>) raises when a same-named file is already
        # there; leave the source in place and report it instead of crashing.
        _skipped.append(_src)
        continue
    shutil.move(_src, _dest)

print("Archivos movidos a:", events_dir)
if _skipped:
    print(f"Omitidos (ya existían en destino): {len(_skipped)}")
print(sorted(os.listdir(events_dir)))


Archivos movidos a: /content/usage_events_stream
['events_part_0070.jsonl', 'events_part_0062.jsonl', 'events_part_0007.jsonl', 'events_part_0085.jsonl', 'events_part_0030.jsonl', 'events_part_0102.jsonl', 'events_part_0003.jsonl', 'events_part_0082.jsonl', 'events_part_0058.jsonl', 'events_part_0081.jsonl', 'events_part_0046.jsonl', 'events_part_0041.jsonl', 'events_part_0060.jsonl', 'events_part_0056.jsonl', 'events_part_0033.jsonl', 'events_part_0087.jsonl', 'events_part_0006.jsonl', 'events_part_0026.jsonl', 'events_part_0036.jsonl', 'events_part_0048.jsonl', 'events_part_0029.jsonl', 'events_part_0069.jsonl', 'events_part_0034.jsonl', 'events_part_0098.jsonl', 'events_part_0039.jsonl', 'events_part_0055.jsonl', 'events_part_0096.jsonl', 'events_part_0049.jsonl', 'events_part_0045.jsonl', 'events_part_0017.jsonl', 'events_part_0014.jsonl', 'events_part_0061.jsonl', 'events_part_0113.jsonl', 'events_part_0021.jsonl', 'events_part_0002.jsonl', 'events_part_0099.jsonl', 'events_part_0

Streaming Bronze

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

BASE_PATH      = "/content"
LANDING_PATH   = BASE_PATH
BRONZE_PATH    = f"{BASE_PATH}/bronze"
CHECKPOINT_DIR = f"{BASE_PATH}/checkpoints"

# Schema applied to every JSON line of the landing folder.
# NOTE(review): every Bronze/Silver row printed in this notebook has event_time = NULL,
# so daily_date and the Gold partition column end up NULL as well. Presumably the raw
# JSON uses a different field name or timestamp format than this schema expects —
# TODO inspect one file with spark.read.json(<file>).printSchema() and fix it here.
schema_events = StructType([
    StructField("event_id", StringType(), True),
    StructField("org_id", StringType(), True),
    StructField("service", StringType(), True),
    StructField("event_time", TimestampType(), True),
    StructField("unit", StringType(), True),
    StructField("value", DoubleType(), True),
    StructField("cost_usd_increment", DoubleType(), True),
    StructField("schema_version", IntegerType(), True),
    StructField("carbon_kg", DoubleType(), True),
    StructField("genai_tokens", LongType(), True),
])

events_stream = (spark.readStream
    .schema(schema_events)
    .option("maxFilesPerTrigger", 1)
    # Folder filled by the "Carpeta stream" cell.
    .json(f"{LANDING_PATH}/usage_events_stream")
    .withWatermark("event_time", "2 hours")
    # NOTE(review): the Structured Streaming guide recommends deduplicating on the id
    # AND the event-time column so the watermark can purge old dedup state; keyed on
    # event_id alone, that state is kept indefinitely. Left unchanged because a new
    # dedup key is not compatible with the existing checkpoint.
    .dropDuplicates(["event_id"])
    .withColumn("ingest_ts", F.current_timestamp())
    .withColumn("ingest_date", F.to_date("ingest_ts"))
)

bronze_events_path = f"{BRONZE_PATH}/events"
events_checkpoint  = f"{CHECKPOINT_DIR}/events_checkpoint"

# availableNow: process every file currently in the folder (still one file per
# micro-batch because of maxFilesPerTrigger) and then stop. Waiting for completion
# makes "Run All" deterministic. Otherwise the next cells stop the query and read
# the output path before the micro-batches have necessarily been written.
query = (events_stream
    .writeStream
    .format("parquet")
    .option("checkpointLocation", events_checkpoint)
    .option("path", bronze_events_path)
    .partitionBy("service", "ingest_date")
    .outputMode("append")
    .trigger(availableNow=True)
    .start())

query.awaitTermination()


In [None]:
# Stop the streaming query started above (stopping an already-finished query is harmless).
if query.isActive:
    query.stop()

In [None]:
# Inspect what the streaming job wrote to Bronze.
events_bronze = spark.read.parquet(f"{BRONZE_PATH}/events")
events_bronze.show(5)
_bronze_event_rows = events_bronze.count()
print("Filas en events_bronze:", _bronze_event_rows)



+----------------+------------+----------+--------+-------+------------------+--------------+---------+------------+--------------------+-------+-----------+
|        event_id|      org_id|event_time|    unit|  value|cost_usd_increment|schema_version|carbon_kg|genai_tokens|           ingest_ts|service|ingest_date|
+----------------+------------+----------+--------+-------+------------------+--------------+---------+------------+--------------------+-------+-----------+
|evt_nhdv5gly2l8t|org_c11ertj5|      NULL|gb_hours|21.4332|            1.6704|             1|     NULL|        NULL|2025-11-26 00:18:...|compute| 2025-11-26|
|evt_0tn1qai0oq0w|org_zhwjr64d|      NULL|   count|  112.0|           10.6031|             2|   0.0224|        NULL|2025-11-26 00:18:...|compute| 2025-11-26|
|evt_6qnstbp9b0d6|org_cwi64ciy|      NULL|   hours| 1.2416|             0.104|             2|  2.48E-4|        NULL|2025-11-26 00:18:...|compute| 2025-11-26|
|evt_iuxf5fdnf7b2|org_xaji0y6d|      NULL|   hours| 

Bronze: checkpoint

Silver – Enriquecimiento + Features + Calidad + Quarantine
Leer Bronze


In [None]:
from pyspark.sql import functions as F

# Paths used by the Silver stage.
BASE_PATH   = "/content"
BRONZE_PATH = BASE_PATH + "/bronze"
SILVER_PATH = BASE_PATH + "/silver"



In [None]:
# Load the Bronze inputs for Silver: streamed events and the customers master.
events_bronze = spark.read.parquet(f"{BRONZE_PATH}/events")
customers_bronze = spark.read.parquet(f"{BRONZE_PATH}/customers_orgs")

for _label, _frame in (("Events Bronze:", events_bronze),
                       ("Customers Bronze:", customers_bronze)):
    print(_label, _frame.count())

for _frame in (events_bronze, customers_bronze):
    _frame.printSchema()


Events Bronze: 13320
Customers Bronze: 80
root
 |-- event_id: string (nullable = true)
 |-- org_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- unit: string (nullable = true)
 |-- value: double (nullable = true)
 |-- cost_usd_increment: double (nullable = true)
 |-- schema_version: integer (nullable = true)
 |-- carbon_kg: double (nullable = true)
 |-- genai_tokens: long (nullable = true)
 |-- ingest_ts: timestamp (nullable = false)
 |-- service: string (nullable = true)
 |-- ingest_date: date (nullable = true)

root
 |-- org_id: string (nullable = true)
 |-- org_name: string (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- org_created_at: timestamp (nullable = true)
 |-- ingest_ts: timestamp (nullable = true)
 |-- source_file: string (nullable = true)
 |-- ingest_date: date (nullable = true)



In [None]:
# Drop the Bronze ingestion metadata from customers. events_bronze also has
# ingest_ts / ingest_date, so keeping them here would duplicate column names after the join.
_customer_meta_cols = ["ingest_ts", "source_file", "ingest_date"]
customers_bronze_clean = customers_bronze.drop(*_customer_meta_cols)
customers_bronze_clean.printSchema()


root
 |-- org_id: string (nullable = true)
 |-- org_name: string (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- org_created_at: timestamp (nullable = true)



Join para enriquecer

In [None]:
# Enrich each event with its organisation's attributes. The left join keeps events
# whose org_id has no match in customers; their org columns are NULL.
events_enriched = events_bronze.join(customers_bronze_clean, on="org_id", how="left")

# Feature expressions.
# NOTE(review): only unit == "request" counts as requests. The units visible in the
# sample output are gb_hours / count / hours, and total_requests is 0.0 throughout the
# mart — TODO confirm the unit label used for request-type usage.
_requests_expr = (F.when(F.col("unit") == F.lit("request"), F.col("value"))
                  .otherwise(F.lit(0.0)))
# Missing token counts become 0.
_tokens_expr = F.coalesce(F.col("genai_tokens"), F.lit(0))
# 1 when the cost increment is below -0.01, else 0 (a NULL cost also yields 0).
_anomaly_expr = (F.when(F.col("cost_usd_increment") < -0.01, F.lit(1))
                 .otherwise(F.lit(0)))

events_feat = (events_enriched
    .withColumn("daily_date", F.to_date("event_time"))
    .withColumn("daily_cost_usd", F.col("cost_usd_increment"))
    .withColumn("requests", _requests_expr)
    .withColumn("genai_tokens_eff", _tokens_expr)
    .withColumn("cost_anomaly_flag", _anomaly_expr)
)

events_feat.printSchema()
events_feat.show(5)



root
 |-- org_id: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- unit: string (nullable = true)
 |-- value: double (nullable = true)
 |-- cost_usd_increment: double (nullable = true)
 |-- schema_version: integer (nullable = true)
 |-- carbon_kg: double (nullable = true)
 |-- genai_tokens: long (nullable = true)
 |-- ingest_ts: timestamp (nullable = false)
 |-- service: string (nullable = true)
 |-- ingest_date: date (nullable = true)
 |-- org_name: string (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- org_created_at: timestamp (nullable = true)
 |-- daily_date: date (nullable = true)
 |-- daily_cost_usd: double (nullable = true)
 |-- requests: double (nullable = true)
 |-- genai_tokens_eff: long (nullable = false)
 |-- cost_anomaly_flag: integer (nullable = false)

+------------+----------------+----------+--------+-------+----

Reglas de cuarentena (quarantine)


event_id no nulo

cost_usd_increment >= -0.01

unit no nulo si value no es nulo

In [None]:
# Quarantine rules (listed above):
#   - event_id is not null
#   - cost_usd_increment >= -0.01  (i.e. cost_anomaly_flag == 0)
#   - unit is not null whenever value is not null
# Every operand is built from isNotNull/isNull or the non-null cost_anomaly_flag,
# so the predicate is never NULL. filter(rule) and filter(~rule) therefore split
# the events exactly, without subtract()'s DISTINCT + full-row comparison, which
# would collapse identical rows.
valid_rule = (
    F.col("event_id").isNotNull()
    & (F.col("cost_anomaly_flag") == 0)
    & ~(F.col("value").isNotNull() & F.col("unit").isNull())
)

total_events = events_feat.count()
print("Total events:", total_events)

valid = events_feat.filter(valid_rule)
valid_count = valid.count()
print("Valid events:", valid_count)

quarantine = events_feat.filter(~valid_rule)
quarantine_count = quarantine.count()
print("Quarantine events:", quarantine_count)

# Each event must land in exactly one of the two outputs.
assert valid_count + quarantine_count == total_events, (
    f"valid ({valid_count}) + quarantine ({quarantine_count}) != total ({total_events})"
)

quarantine.show(10)



Total events: 13320
Valid events: 12637
Quarantine events: 683
+------------+----------------+----------+--------+-------+------------------+--------------+---------+------------+--------------------+----------+-----------+-----------------+----------+------------+----------+--------------+----------+--------------+--------+----------------+-----------------+
|      org_id|        event_id|event_time|    unit|  value|cost_usd_increment|schema_version|carbon_kg|genai_tokens|           ingest_ts|   service|ingest_date|         org_name|    region|     country|  industry|org_created_at|daily_date|daily_cost_usd|requests|genai_tokens_eff|cost_anomaly_flag|
+------------+----------------+----------+--------+-------+------------------+--------------+---------+------------+--------------------+----------+-----------+-----------------+----------+------------+----------+--------------+----------+--------------+--------+----------------+-----------------+
|org_i3qk2iag|evt_0ih1qmfmcxg9|      NUL

Problemas con `ingest_date`: se elimina esa columna antes de escribir en Silver.

In [None]:
# Drop the Bronze ingest_date column from both Silver outputs before writing.
valid_to_write, quarantine_to_write = (
    frame.drop("ingest_date") for frame in (valid, quarantine)
)

print("Schema valid_to_write:")
valid_to_write.printSchema()


Schema valid_to_write:
root
 |-- org_id: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- unit: string (nullable = true)
 |-- value: double (nullable = true)
 |-- cost_usd_increment: double (nullable = true)
 |-- schema_version: integer (nullable = true)
 |-- carbon_kg: double (nullable = true)
 |-- genai_tokens: long (nullable = true)
 |-- ingest_ts: timestamp (nullable = false)
 |-- service: string (nullable = true)
 |-- org_name: string (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- org_created_at: timestamp (nullable = true)
 |-- daily_date: date (nullable = true)
 |-- daily_cost_usd: double (nullable = true)
 |-- requests: double (nullable = true)
 |-- genai_tokens_eff: long (nullable = false)
 |-- cost_anomaly_flag: integer (nullable = false)



In [None]:
# Valid events: overwrite, partitioned by day and service.
_valid_writer = (valid_to_write.write
                 .mode("overwrite")
                 .partitionBy("daily_date", "service"))
_valid_writer.parquet(f"{SILVER_PATH}/events_valid")

# Quarantined events: overwrite, unpartitioned.
_quarantine_writer = quarantine_to_write.write.mode("overwrite")
_quarantine_writer.parquet(f"{SILVER_PATH}/events_quarantine")


In [None]:
# Re-read both Silver outputs and check their row counts.
events_valid, events_quarantine = (
    spark.read.parquet(f"{SILVER_PATH}/{name}")
    for name in ("events_valid", "events_quarantine")
)

print("events_valid:", events_valid.count())
print("events_quarantine:", events_quarantine.count())
events_valid.show(5)


events_valid: 12637
events_quarantine: 683
+------------+----------------+----------+--------+-------+------------------+--------------+---------+------------+--------------------+----------------+---------+------------+--------+--------------+--------------+--------+----------------+-----------------+----------+-------+
|      org_id|        event_id|event_time|    unit|  value|cost_usd_increment|schema_version|carbon_kg|genai_tokens|           ingest_ts|        org_name|   region|     country|industry|org_created_at|daily_cost_usd|requests|genai_tokens_eff|cost_anomaly_flag|daily_date|service|
+------------+----------------+----------+--------+-------+------------------+--------------+---------+------------+--------------------+----------------+---------+------------+--------+--------------+--------------+--------+----------------+-----------------+----------+-------+
|org_c11ertj5|evt_nhdv5gly2l8t|      NULL|gb_hours|21.4332|            1.6704|             1|     NULL|        NULL|2

Gold

Primero leemos la capa Silver válida.


In [None]:
# Paths for the Gold stage, and the valid Silver events it aggregates.
BASE_PATH = "/content"
SILVER_PATH, GOLD_PATH = f"{BASE_PATH}/silver", f"{BASE_PATH}/gold"

events_valid = spark.read.parquet(f"{SILVER_PATH}/events_valid")
events_valid.show(5)
events_valid.printSchema()

print("Eventos v√°lidos:", events_valid.count())


+------------+----------------+----------+--------+-------+------------------+--------------+---------+------------+--------------------+----------------+---------+------------+--------+--------------+--------------+--------+----------------+-----------------+----------+-------+
|      org_id|        event_id|event_time|    unit|  value|cost_usd_increment|schema_version|carbon_kg|genai_tokens|           ingest_ts|        org_name|   region|     country|industry|org_created_at|daily_cost_usd|requests|genai_tokens_eff|cost_anomaly_flag|daily_date|service|
+------------+----------------+----------+--------+-------+------------------+--------------+---------+------------+--------------------+----------------+---------+------------+--------+--------------+--------------+--------+----------------+-----------------+----------+-------+
|org_c11ertj5|evt_nhdv5gly2l8t|      NULL|gb_hours|21.4332|            1.6704|             1|     NULL|        NULL|2025-11-26 00:18:...|   Apex Cloud 17|   Ret

Agregación por organización, día y servicio (marts)

In [None]:
# FinOps mart: one row per (org_id, daily_date, service).
_finops_aggs = [
    F.sum("daily_cost_usd").alias("total_cost_usd"),
    F.sum("requests").alias("total_requests"),
    F.sum("genai_tokens_eff").alias("total_genai_tokens"),
    F.countDistinct("event_id").alias("event_count"),
]
mart_finops = events_valid.groupBy("org_id", "daily_date", "service").agg(*_finops_aggs)

mart_finops.show(10)
print("Total filas del mart:", mart_finops.count())



+------------+----------+----------+------------------+--------------+------------------+-----------+
|      org_id|daily_date|   service|    total_cost_usd|total_requests|total_genai_tokens|event_count|
+------------+----------+----------+------------------+--------------+------------------+-----------+
|org_w3zp08j3|      NULL|   storage| 71.31820000000002|           0.0|                 0|         55|
|org_gv0e38da|      NULL|networking|           20.0106|           0.0|                 0|         33|
|org_pnsm43d8|      NULL|   compute| 786.8366000000001|           0.0|                 0|        166|
|org_sg65kxvf|      NULL|     genai| 603.9423999999997|           0.0|             62196|         94|
|org_ujv6oh9s|      NULL|  database|106.02150000000002|           0.0|                 0|         41|
|org_i7p5tb94|      NULL|   storage|          121.4931|           0.0|                 0|         45|
|org_53lc58dr|      NULL|   compute|367.34659999999997|           0.0|            

Escribir en Gold

In [None]:
# Celda de control: antes contenía `error` para cortar la ejecución (Run All); ya no hace nada y se puede eliminar.

NameError: name 'error' is not defined

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import DateType

# daily_date comes from to_date(event_time); the cast pins the partition column type.
mart_finops_fixed = (mart_finops
    .withColumn("daily_date", F.col("daily_date").cast(DateType()))
)

# Rows with a NULL partition value are written to Spark's default (null) partition
# directory. When every row is NULL, the column is re-read with type `void` (as the
# schema printed in the Cassandra section shows), which breaks downstream loads.
# Report it here instead of failing later.
_null_date_rows = mart_finops_fixed.filter(F.col("daily_date").isNull()).count()
if _null_date_rows:
    print(f"ADVERTENCIA: {_null_date_rows} filas del mart tienen daily_date NULL "
          "(revisar el parseo de event_time en Bronze).")

(mart_finops_fixed
 .write
 .mode("overwrite")
 .partitionBy("daily_date")
 .parquet(f"{GOLD_PATH}/org_daily_usage_by_service"))



Preparamos Cassandra (Astra DB)


In [None]:
!pip install cassandra-driver

Collecting cassandra-driver
  Downloading cassandra_driver-3.29.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (6.9 kB)
Collecting geomet>=1.1 (from cassandra-driver)
  Downloading geomet-1.1.0-py3-none-any.whl.metadata (11 kB)
Downloading cassandra_driver-3.29.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl (374 kB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m374.4/374.4 kB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading geomet-1.1.0-py3-none-any.whl (31 kB)
Installing collected packages: geomet, cassandra-driver
Successfully installed cassandra-driver-3.29.3 geomet-1.1.0


In [None]:
pip install --upgrade astrapy



In [None]:
# Connect to Astra DB through the Data API (astrapy).
import os
from getpass import getpass

from astrapy import DataAPIClient

# SECURITY: the application token must not be stored in the notebook. The token that
# was previously committed here should be treated as leaked and revoked/rotated in the
# Astra console. Read it from the environment, or prompt once and cache it in the
# environment so later cells reuse it.
if not os.environ.get("ASTRA_DB_APPLICATION_TOKEN"):
    os.environ["ASTRA_DB_APPLICATION_TOKEN"] = getpass("Astra DB application token: ")
ASTRA_TOKEN = os.environ["ASTRA_DB_APPLICATION_TOKEN"]
API_ENDPOINT = os.environ.get(
    "ASTRA_DB_API_ENDPOINT",
    "https://3e6dbeb3-87d1-4e6e-8a08-42cf326af347-us-east1.apps.astra.datastax.com",
)

# Initialize the client
client = DataAPIClient(ASTRA_TOKEN)
db = client.get_database_by_api_endpoint(API_ENDPOINT)

print(f"Connected to Astra DB: {db.list_collection_names()}")

Connected to Astra DB: []


**TODO:** revisar esta parte de Cassandra, que me costó mucho. Revisar también la parte Gold.

In [None]:
import os
from getpass import getpass

from astrapy import DataAPIClient

# SECURITY: never hard-code the application token (the previously committed one must
# be revoked). Read it from the environment, or prompt once and cache it there.
if not os.environ.get("ASTRA_DB_APPLICATION_TOKEN"):
    os.environ["ASTRA_DB_APPLICATION_TOKEN"] = getpass("Astra DB application token: ")
ASTRA_TOKEN  = os.environ["ASTRA_DB_APPLICATION_TOKEN"]
API_ENDPOINT = os.environ.get(
    "ASTRA_DB_API_ENDPOINT",
    "https://3e6dbeb3-87d1-4e6e-8a08-42cf326af347-us-east1.apps.astra.datastax.com",
)

client = DataAPIClient(ASTRA_TOKEN)
db = client.get_database(API_ENDPOINT)

_existing_collections = db.list_collection_names()
print("Colecciones actuales:", _existing_collections)

COLL_NAME = "org_daily_usage_by_service_coll"

# Create the collection only if it does not exist yet.
if COLL_NAME not in _existing_collections:
    db.create_collection(COLL_NAME)

collection = db.get_collection(COLL_NAME)
print("Collection OK:", collection.name)


Colecciones actuales: []
Collection OK: org_daily_usage_by_service_coll


In [None]:
# Load the Gold mart and turn it into JSON-friendly records for the Data API.
import pandas as pd
from pyspark.sql import functions as F

GOLD_PATH = "/content/gold"

mart_finops = spark.read.parquet(f"{GOLD_PATH}/org_daily_usage_by_service")
mart_finops.printSchema()
mart_finops.show(5)

pdf = mart_finops.toPandas()

# Dates are stored as ISO "YYYY-MM-DD" strings. A plain astype(str) stored missing
# dates as the literal text "None" (visible in the per-org query output further down),
# so missing dates are kept as real nulls instead.
if "daily_date" in pdf.columns:
    _iso_dates = pd.to_datetime(pdf["daily_date"]).dt.strftime("%Y-%m-%d")
    pdf["daily_date"] = _iso_dates.astype(object).where(_iso_dates.notna(), None)

# Replace any remaining NaN/NaT with None so every value serialises to JSON null.
pdf = pdf.astype(object).where(pdf.notna(), None)

rows = pdf.to_dict(orient="records")
len(rows)


root
 |-- org_id: string (nullable = true)
 |-- service: string (nullable = true)
 |-- total_cost_usd: double (nullable = true)
 |-- total_requests: double (nullable = true)
 |-- total_genai_tokens: long (nullable = true)
 |-- event_count: long (nullable = true)
 |-- daily_date: void (nullable = true)

+------------+----------+------------------+--------------+------------------+-----------+----------+
|      org_id|   service|    total_cost_usd|total_requests|total_genai_tokens|event_count|daily_date|
+------------+----------+------------------+--------------+------------------+-----------+----------+
|org_w3zp08j3|   storage| 71.31820000000002|           0.0|                 0|         55|      NULL|
|org_gv0e38da|networking|           20.0106|           0.0|                 0|         33|      NULL|
|org_pnsm43d8|   compute| 786.8366000000001|           0.0|                 0|        166|      NULL|
|org_sg65kxvf|     genai| 603.9423999999997|           0.0|             62196|      

262

In [None]:
# Full refresh, mirroring the Gold mart's mode("overwrite"). The records carry no _id,
# so insert_many alone would add a fresh copy of every document on each re-run, and
# the totals aggregated later would count rows more than once.
collection.delete_many({})
result = collection.insert_many(rows)
print("Documentos insertados:", len(result.inserted_ids))


Documentos insertados: 262


In [None]:
# Quick look at which org_ids are stored (up to 20 documents).
docs = collection.find(
    limit=20,
    projection={"_id": 0, "org_id": 1},
)

org_ids = set(doc["org_id"] for doc in docs if "org_id" in doc)
print("ORG IDs disponibles:", org_ids)


ORG IDs disponibles: {'org_okep7y6w', 'org_sg65kxvf', 'org_id25owf7', 'org_tvhhpbmy', 'org_ofqewaou', 'org_1t2tala7', 'org_fel6246h', 'org_pnsm43d8', 'org_1swjckjl', 'org_n9j2qp89', 'org_kdgigatj', 'org_4zw9xa3k', 'org_pht0hl9x', 'org_cvs4f8cg', 'org_dhylurtp', 'org_cwi64ciy', 'org_0lvsnujz', 'org_dppq0y9d'}


Cassandra
Se ejecuta en el panel

In [None]:
# List the distinct org_ids found in up to 50 documents, to pick one for the queries below.
docs_sample = collection.find(
    filter={},
    limit=50,
    projection={"_id": 0, "org_id": 1},
)

org_ids = sorted(set(doc["org_id"] for doc in docs_sample if "org_id" in doc))
print("ORG IDs disponibles:")
for _oid in org_ids:
    print(f" - {_oid}")


ORG IDs disponibles:
 - org_0lvsnujz
 - org_1n13jcat
 - org_1swjckjl
 - org_1t2tala7
 - org_4zw9xa3k
 - org_53lc58dr
 - org_5935a0l7
 - org_9mx2x18h
 - org_afeyuhz1
 - org_c11ertj5
 - org_cvs4f8cg
 - org_cwi64ciy
 - org_d14ve92m
 - org_dbdw2pcn
 - org_dhylurtp
 - org_dppq0y9d
 - org_fel6246h
 - org_g8sbi4q2
 - org_hv3a3zmf
 - org_id25owf7
 - org_jxepq85j
 - org_kdgigatj
 - org_n9j2qp89
 - org_nam148p0
 - org_ofqewaou
 - org_okep7y6w
 - org_pbhsahxt
 - org_pht0hl9x
 - org_pja1wj0t
 - org_pnsm43d8
 - org_sg65kxvf
 - org_teiyzcot
 - org_tvhhpbmy
 - org_zbikcidk


In [None]:
# Organisation used by the per-org query below; any id from the list above works.
org_id = "org_zbikcidk"
print("Trabajando con org_id =", org_id)


Trabajando con org_id = org_zbikcidk


In [None]:
from pprint import pprint

docs_evol = collection.find(
    filter={"org_id": org_id},
    projection={
        "_id": 0,
        "daily_date": 1,
        "service": 1,
        "total_cost_usd": 1,
        "total_requests": 1,
        "total_genai_tokens": 1
    }
)


def _evolution_key(doc):
    """Sort key (daily_date, service); missing values sort first as ''."""
    return (str(doc.get("daily_date") or ""), str(doc.get("service") or ""))


# find() is called without a sort option, so the document order is not specified.
# Sort client-side so the "daily evolution" reads chronologically, then by service.
docs_evol = sorted(docs_evol, key=_evolution_key)

print(f"Evoluci√≥n diaria del costo por servicio para org_id={org_id}:")
for d in docs_evol:
    pprint(d)


Evoluci√≥n diaria del costo por servicio para org_id=org_zbikcidk:
{'daily_date': 'None',
 'service': 'storage',
 'total_cost_usd': 35.65769999999999,
 'total_genai_tokens': 0,
 'total_requests': 0}
{'daily_date': 'None',
 'service': 'compute',
 'total_cost_usd': 648.2119000000002,
 'total_genai_tokens': 0,
 'total_requests': 0}
{'daily_date': 'None',
 'service': 'genai',
 'total_cost_usd': 209.32520000000002,
 'total_genai_tokens': 20102,
 'total_requests': 0}


In [None]:
# `pd` is not imported by any earlier cell, so import it here (NameError on a fresh kernel otherwise).
import pandas as pd

docs_all = collection.find(
    filter={},
    projection={
        "_id": 0,
        "org_id": 1,
        "total_cost_usd": 1
    }
)

df_all = pd.DataFrame(docs_all)

# Sums over every document in the collection, so duplicated inserts would inflate these totals.
top_orgs = (df_all
    .groupby("org_id", as_index=False)
    .agg(total_cost_usd=("total_cost_usd", "sum"))
    .sort_values("total_cost_usd", ascending=False)
    .head(10)
)

print("Top 10 organizaciones por costo total (FinOps global):")
print(top_orgs)



Top 10 organizaciones por costo total (FinOps global):
          org_id  total_cost_usd
42  org_kdgigatj       1626.1545
53  org_pbhsahxt       1421.7887
22  org_chj755nf       1383.3688
9   org_53lc58dr       1193.2476
43  org_ktakpuxq       1161.3391
4   org_1t2tala7       1141.9248
61  org_sg65kxvf        965.2466
57  org_pnsm43d8        914.8869
78  org_zbikcidk        893.1948
23  org_cvs4f8cg        882.9202


In [None]:
# CQL (Cassandra Query Language). Run these statements in the Astra CQL console,
# NOT in this Python kernel; they are kept here as comments for reference only.
#
# Table design notes:
#  - Partition by org_id and cluster by (daily_date, service). A date range for one
#    organisation (the "daily evolution" query) is then a single-partition slice.
#    With daily_date inside the partition key, CQL only allows = / IN on it, so the
#    range queries used later would be rejected unless ALLOW FILTERING is added.
#  - total_requests is a double in the Spark mart, so it is declared double here.
#  - NOTE(review): on Astra DB Serverless, keyspaces are usually created from the
#    dashboard rather than with CREATE KEYSPACE — confirm for this database.
#
# CREATE KEYSPACE IF NOT EXISTS cloud_analytics
# WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
#
# CREATE TABLE IF NOT EXISTS cloud_analytics.org_daily_usage_by_service (
#   org_id text,
#   daily_date date,
#   service text,
#   total_cost_usd double,
#   total_requests double,
#   total_genai_tokens bigint,
#   event_count bigint,
#   PRIMARY KEY ((org_id), daily_date, service)
# ) WITH CLUSTERING ORDER BY (daily_date ASC, service ASC);


Collecting cassandra-driver
  Downloading cassandra_driver-3.29.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (6.9 kB)
Collecting geomet>=1.1 (from cassandra-driver)
  Downloading geomet-1.1.0-py3-none-any.whl.metadata (11 kB)
Downloading cassandra_driver-3.29.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl (374 kB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m374.4/374.4 kB[0m [31m11.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading geomet-1.1.0-py3-none-any.whl (31 kB)
Installing collected packages: geomet, cassandra-driver
Successfully installed cassandra-driver-3.29.3 geomet-1.1.0


In [None]:
import os

from pyspark.sql import SparkSession

# Spark Cassandra Connector settings, read from the environment so credentials are
# never written into the notebook. The placeholder defaults match the previous values.
ASTRA_HOST = os.environ.get("ASTRA_HOST", "<TU_HOST_ASTRA>")        # ej: "xxx-xxx.apps.astra.datastax.com"
ASTRA_USER = os.environ.get("ASTRA_CLIENT_ID", "<TOKEN_CLIENT_ID>")
ASTRA_PASS = os.environ.get("ASTRA_CLIENT_SECRET", "<TOKEN_CLIENT_SECRET>")

# NOTE(review): a SparkSession already exists (created in the configuration cell), so
# getOrCreate() returns that same session. The builder options are applied to its
# runtime configuration, and the running SparkContext keeps its original app name.
# Nothing in this notebook adds the connector JAR (org.apache.spark.sql.cassandra);
# presumably it must be supplied via spark.jars.packages when the FIRST session is
# created — TODO confirm.
# NOTE(review): Astra DB usually requires the secure connect bundle
# (spark.cassandra.connection.config.cloud.path) rather than host:9042 — TODO confirm.
spark = (SparkSession.builder
    .appName("cassandra-serving")
    .config("spark.cassandra.connection.host", ASTRA_HOST)
    .config("spark.cassandra.auth.username", ASTRA_USER)
    .config("spark.cassandra.auth.password", ASTRA_PASS)
    .config("spark.cassandra.connection.port", "9042")
    .getOrCreate())


Cargamos el mart Gold en Cassandra

In [None]:
# Reload the Gold mart from Parquet and append it to the Cassandra table
# cloud_analytics.org_daily_usage_by_service through the Spark Cassandra Connector.
# NOTE(review): the mart's column types (daily_date was re-read as `void` earlier,
# total_requests is double) must be compatible with the table definition — TODO confirm.
mart_finops = spark.read.parquet(f"{GOLD_PATH}/org_daily_usage_by_service")

_cassandra_target = {"keyspace": "cloud_analytics", "table": "org_daily_usage_by_service"}

(mart_finops.write
 .format("org.apache.spark.sql.cassandra")
 .options(**_cassandra_target)
 .mode("append")
 .save())


Estas serían consultas de negocio para evidencias



In [None]:
# Business queries (CQL) used as evidence. They are meant for the Astra CQL console,
# not for this Python kernel (raw CQL in a Python code cell is a SyntaxError), so they
# are kept as text and printed for copy/paste.
# NOTE(review): both queries restrict daily_date by range. CQL accepts that only when
# daily_date is a clustering column, e.g. PRIMARY KEY ((org_id), daily_date, service).
# With daily_date inside the partition key they are rejected unless ALLOW FILTERING is
# added. Query 2 returns one row per (day, service); ranking services by cost must be
# done client-side, because CQL ORDER BY only works on clustering columns.
CQL_BUSINESS_QUERIES = """\
-- 1) Evolución diaria del costo por servicio de una organización
SELECT daily_date, service, total_cost_usd
FROM cloud_analytics.org_daily_usage_by_service
WHERE org_id = 'ORG_123'
  AND daily_date >= '2025-01-01'
  AND daily_date <= '2025-01-31';

-- 2) Servicios con mayor costo en un rango de fechas
SELECT service, total_cost_usd
FROM cloud_analytics.org_daily_usage_by_service
WHERE org_id = 'ORG_123'
  AND daily_date >= '2025-01-01'
  AND daily_date <= '2025-01-14';
"""

print(CQL_BUSINESS_QUERIES)


Prueba de idempotencia

In [None]:
from pyspark.sql import functions as F

print("--- INICIO PRUEBA DE IDEMPOTENCIA ---")

# 1. Count what Bronze (customers) holds before re-running ingestion.
df_before = spark.read.parquet(f"{BRONZE_PATH}/customers_orgs")
count_before = df_before.count()
print(f"Registros en Bronze (Customers) - ANTES: {count_before}")

# 2. Re-run ingestion on the SAME landing file.
print("Re-procesando archivo 'customers_orgs.csv'...")
csv_to_bronze(
    csv_path=f"{LANDING_PATH}/customers_orgs.csv",
    schema=schema_customers,
    pk_cols=["org_id"],
    bronze_subdir="customers_orgs"
)

# 3. Re-read Bronze after the re-run.
df_after = spark.read.parquet(f"{BRONZE_PATH}/customers_orgs")
count_after = df_after.count()
print(f"Registros en Bronze (Customers) - DESPU√âS: {count_after}")

# Equal counts alone do not prove there are no duplicates, so also require the
# primary key (org_id) to remain unique after the re-run.
dup_keys = df_after.groupBy("org_id").count().filter(F.col("count") > 1).count()
print(f"Claves org_id duplicadas tras re-procesar: {dup_keys}")

# 4. Automatic verdict.
if count_before == count_after and dup_keys == 0:
    print(f"√âXITO: Idempotencia verificada. {count_before} == {count_after}. No se generaron duplicados.")
else:
    print(f"FALLO: Se encontraron discrepancias. Antes: {count_before}, Despu√©s: {count_after}")

Verificación de particionado sensato

In [None]:
import os


def _is_hidden_path_name(name):
    """Return True for entries Spark ignores when listing a dataset.

    These are '.'-prefixed names, plus '_'-prefixed names that are not 'col=value'
    partition directories (e.g. _SUCCESS, _spark_metadata).
    """
    return name.startswith(".") or (name.startswith("_") and "=" not in name)


def auditar_particiones(base_path, nombre_capa):
    """Print the partition folders of a Parquet dataset and the total size of its Parquet files.

    Args:
        base_path: Root directory of a dataset written by Spark.
        nombre_capa: Label shown in the report header (e.g. "BRONZE: Customers").

    Prints a report and returns None. If ``base_path`` does not exist, prints a
    message and returns early.
    """
    print(f"\n{'='*50}")
    print(f" AUDITOR√çA DE PARTICIONES: {nombre_capa}")
    print(f" Ruta Base: {base_path}")
    print(f"{'='*50}")

    if not os.path.exists(base_path):
        print(f"La ruta {base_path} no existe. (¬øEjecutaste la escritura?)")
        return

    # 1. Top-level partition folders. Bookkeeping folders such as `_spark_metadata`
    #    (created by the Structured Streaming file sink) are not partitions.
    items = sorted(
        i for i in os.listdir(base_path)
        if os.path.isdir(os.path.join(base_path, i)) and not _is_hidden_path_name(i)
    )

    total_size = 0
    file_count = 0

    print(f"üìÇ Estructura de Particiones encontradas ({len(items)}):")

    # Show the first 5 partitions as visual evidence.
    for partition in items[:5]:
        print(f"   ‚îú‚îÄ‚îÄ üìÅ {partition}")

    if len(items) > 5:
        print(f"   ‚îú‚îÄ‚îÄ ... (y {len(items)-5} particiones m√°s)")

    # 2. Sum the .parquet files recursively, skipping folders Spark would not read.
    for root, dirs, files in os.walk(base_path):
        dirs[:] = [d for d in dirs if not _is_hidden_path_name(d)]
        for f in files:
            if f.endswith(".parquet"):
                fp = os.path.join(root, f)
                total_size += os.path.getsize(fp)
                file_count += 1

    # Bytes -> MB for readability.
    size_mb = total_size / (1024 * 1024)
    print(f"\nüìä Resumen de Almacenamiento:")
    print(f"   ‚Ä¢ Total de archivos Parquet: {file_count}")
    print(f"   ‚Ä¢ Tama√±o total en disco: {size_mb:.4f} MB")
    print("-" * 50)

# --- Partition audit for each layer ---
# (dataset path, report label); the comment on each entry is its partitioning.
_audit_targets = [
    (f"{BRONZE_PATH}/customers_orgs", "BRONZE: Customers (Batch)"),           # ingest_date
    (f"{BRONZE_PATH}/events", "BRONZE: Events (Streaming)"),                  # service / ingest_date
    (f"{SILVER_PATH}/events_valid", "SILVER: Events Valid"),                  # daily_date / service
    (f"{GOLD_PATH}/org_daily_usage_by_service", "GOLD: FinOps Mart"),         # daily_date
]

for _audit_path, _audit_label in _audit_targets:
    auditar_particiones(_audit_path, _audit_label)