# Análisis de la demanda agregada para la compra 2027-2028

## Bronze table

In [0]:
# Módulos
%pip install openpyxl
%pip install pandas
import pandas as pd
import openpyxl
from functools import reduce
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
import re
import operator


In [0]:
# -----------------------------
# 1) Read the Excel sheet as a raw grid of strings
# -----------------------------
# Every cell is read as text (dtype=str) and pandas' NA detection is switched
# off, so blank cells arrive as "" instead of NaN and nothing is reinterpreted.
xlsx_path = "/Volumes/workspace/default/eseotres/Análisis Plataforma Salud.xlsx"

pdf = pd.read_excel(
    xlsx_path,
    sheet_name="Info licitación",
    header=None,
    dtype=str,
    na_filter=False,
)

# Record each row's position in the sheet *before* handing the data to Spark:
# Spark does not keep row order, so `_row_id` is the link back to the Excel row.
pdf["_row_id"] = range(len(pdf))

# -----------------------------
# 2) Convert to Spark: string columns _c0.._cN, then _row_id (cast to long)
# -----------------------------
n_data_cols = pdf.shape[1] - 1          # every column except the trailing _row_id
data = pdf.astype(str).values.tolist()  # textual "nan" is treated as empty below

schema = StructType(
    [StructField(f"_c{idx}", StringType(), True) for idx in range(n_data_cols)]
    + [StructField("_row_id", StringType(), True)]
)

df_bronze = spark.createDataFrame(data, schema=schema)
df_bronze = df_bronze.withColumn("_row_id", F.col("_row_id").cast("long"))

# -----------------------------
# 3) Drop only the rows in which every data cell is blank
# -----------------------------
data_cols = [c for c in df_bronze.columns if c != "_row_id"]


def is_empty_col(c):
    """Spark predicate: True where column `c` is null, blank, or a textual null marker."""
    normalized = F.lower(F.trim(F.coalesce(F.col(c), F.lit(""))))
    return normalized.isin("", "nan", "none", "null")


all_empty_expr = reduce(operator.and_, map(is_empty_col, data_cols))
df_bronze = df_bronze.filter(~all_empty_expr)

# Preview the first rows in their original sheet order.
df_bronze.orderBy("_row_id").limit(30).show(truncate=False)
print("Spark bronze cols:", len(df_bronze.columns), "| rows:", df_bronze.count())

# -----------------------------
# 4) Persist as a Delta table (dropped first so a changed schema never clashes)
# -----------------------------
spark.sql("DROP TABLE IF EXISTS workspace.default.bronze_licitacion_info")

bronze_writer = df_bronze.write.format("delta").mode("overwrite")
bronze_writer.saveAsTable("workspace.default.bronze_licitacion_info")

print("✅ Created table: workspace.default.bronze_licitacion_info")

## Silver table


In [0]:
import re
import operator
from functools import reduce
from pyspark.sql import functions as F

# -----------------------------
# Helpers
# -----------------------------
def norm(s: str) -> str:
    """Normalise a header label into a snake_case ASCII identifier.

    Trims and lower-cases the text, strips diacritics (á->a, ñ->n, ü->u, ...),
    collapses every run of non-alphanumeric characters into a single "_",
    and removes leading/trailing underscores. ``None`` maps to "".

    Examples:
        "Descripción"      -> "descripcion"
        "  IMSS-Bienestar " -> "imss_bienestar"
        "Salud / SPPS"     -> "salud_spps"

    Args:
        s: Any value; non-strings are converted with ``str()``.

    Returns:
        The normalised identifier (possibly empty).
    """
    import unicodedata  # local import keeps this helper self-contained

    if s is None:
        return ""
    s = str(s).strip().lower()
    # NFKD splits an accented letter into its base letter plus a combining
    # mark; dropping the marks covers every Latin diacritic (ü, à, ç, ...),
    # not just the á/é/í/ó/ú/ñ that a hand-written replacement table handles.
    decomposed = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    s = re.sub(r"[^a-z0-9]+", "_", s)
    s = re.sub(r"_+", "_", s).strip("_")
    return s

# Cell values the Bronze step already treats as "empty" (compared after trim +
# lower-case). Keep this tuple in sync with the Bronze empty-row filter.
_EMPTY_TOKENS = ("", "nan", "none", "null")


def to_double(colname):
    """Build a Spark expression that parses a text column into a double.

    - null, blank, and the textual markers in ``_EMPTY_TOKENS`` become null.
      Without this, Spark casts "nan" to NaN, which silently poisons any sum.
    - Commas (thousands separators, e.g. "1,234") are removed before casting.

    Args:
        colname: Name of the (string) source column.

    Returns:
        A Column of type double.
    """
    raw = F.trim(F.coalesce(F.col(colname), F.lit("")))
    return (
        F.when(F.lower(raw).isin(*_EMPTY_TOKENS), F.lit(None).cast("double"))
         .otherwise(F.regexp_replace(raw, ",", "").cast("double"))
    )

# -----------------------------
# 1) Load Bronze
# -----------------------------
df_bronze = spark.table("workspace.default.bronze_licitacion_info")

# -----------------------------
# 2) Detect header rows
# -----------------------------
data_cols = [c for c in df_bronze.columns if c != "_row_id"]

# The column-header row is the first row in which any cell mentions "clave".
contains_clave = reduce(
    operator.or_,
    [F.lower(F.col(c)).contains("clave") for c in data_cols]
)

header_row_id = df_bronze.where(contains_clave).select(F.min("_row_id")).first()[0]
if header_row_id is None:
    raise ValueError(
        "No header row containing 'clave' was found in "
        "workspace.default.bronze_licitacion_info"
    )
# The group labels (presumably institution names spanning min/max pairs) sit
# on the row immediately above the column headers.
group_row_id = header_row_id - 1

# -----------------------------
# 3) Extract header rows to pandas
# -----------------------------
pdf_head = (
    df_bronze
    .where(F.col("_row_id").isin([group_row_id, header_row_id]))
    .orderBy("_row_id")
    .drop("_row_id")
    .toPandas()
)
if len(pdf_head) != 2:
    raise ValueError(
        f"Expected the group row ({group_row_id}) and the header row ({header_row_id}) "
        f"in Bronze but found {len(pdf_head)} row(s); the group row may be missing "
        "or may have been dropped as empty."
    )

# Only the first cell of each group span carries a label (presumably merged
# cells in Excel), so forward-fill the labels across the span.
r_group = pdf_head.iloc[0].replace(["", "nan", "None", None], pd.NA).ffill().fillna("")
r_head  = pdf_head.iloc[1].fillna("")

# -----------------------------
# 4) Build FINAL column names (allow duplicates)
# -----------------------------
final_cols = []
for g, h in zip(r_group.tolist(), r_head.tolist()):
    g0 = norm(g)
    h0 = norm(h)

    if h0 in ("clave", "descripcion"):
        final_cols.append(h0)
    elif h0 in ("min", "max") and g0:
        final_cols.append(f"{g0}_{h0}")
    else:
        final_cols.append("col")

# -----------------------------
# 5) Create structured, typed Silver base
# -----------------------------
# Rename by position and parse every *_min / *_max column to double in the same
# pass. Selecting by the unique Bronze names (_c0, _c1, ...) and aliasing avoids
# ambiguous references when final_cols repeats a name (e.g. the "col" fillers).
df_data = (
    df_bronze
    .where(F.col("_row_id") > header_row_id)
    .select([
        (to_double(src) if dst.endswith(("_min", "_max")) else F.col(src)).alias(dst)
        for src, dst in zip(data_cols, final_cols)
    ])
)

# -----------------------------
# 6) Aggregate sub-columns into families and select the Silver columns
# -----------------------------
# Families whose demand is split across several sub-columns named
# <family>_<something>_min / <family>_<something>_max.
AGG_FAMILIES = ["imss_bienestar", "ccinshae", "salud_spps"]


def _null_safe_sum(cols):
    """Row-wise sum of `cols` counting nulls as 0; null only if every input is null.

    A plain `+` chain propagates null, so one blank sub-column would blank out
    the whole family total.
    """
    total = reduce(operator.add, [F.coalesce(F.col(c), F.lit(0.0)) for c in cols])
    all_null = F.coalesce(*[F.col(c) for c in cols]).isNull()
    return F.when(all_null, F.lit(None).cast("double")).otherwise(total)


df_aggregated = df_data
for family in AGG_FAMILIES:
    for bound in ("min", "max"):
        parts = [
            c for c in df_data.columns
            if c.startswith(f"{family}_") and c.endswith(f"_{bound}")
        ]
        if parts:
            df_aggregated = df_aggregated.withColumn(f"{family}_{bound}", _null_safe_sum(parts))

# Base institution columns, then the aggregated family columns.
base_cols = ['clave', 'descripcion', 'imss_min', 'imss_max', 'issste_min', 'issste_max',
             'pemex_min', 'pemex_max']
for family in AGG_FAMILIES:
    base_cols.extend([f"{family}_min", f"{family}_max"])

# Totals are optional; check each side independently.
base_cols.extend(c for c in ("totales_min", "totales_max") if c in df_data.columns)

# Fail with a readable message if header parsing did not produce an expected column.
missing_cols = [c for c in base_cols if c not in df_aggregated.columns]
if missing_cols:
    raise ValueError(f"Silver columns missing after header parsing: {missing_cols}")

df_final = df_aggregated.select(base_cols)

df_final.display()

# -----------------------------
# 7) Persist Silver
# -----------------------------
spark.sql("DROP TABLE IF EXISTS workspace.default.silver_licitacion_info")

(df_final.write
 .mode("overwrite")
 .format("delta")
 .saveAsTable("workspace.default.silver_licitacion_info"))

print("✅ Silver table created cleanly")

In [0]:
%sql
-- Aggregation check: for each clave, the six institution columns should add up
-- to totales_min / totales_max. Only the rows where they do not are listed,
-- largest discrepancy first. Assumes the Silver table has the totales_* columns.
WITH validacion AS (
  SELECT
    clave,
    totales_min,
    totales_max,

    /* sums with null-safe coalesce: a missing value counts as 0 */
    COALESCE(imss_min,0) + COALESCE(issste_min,0) + COALESCE(pemex_min,0)
    + COALESCE(imss_bienestar_min,0) + COALESCE(ccinshae_min,0) + COALESCE(salud_spps_min,0)
      AS sum_min,

    COALESCE(imss_max,0) + COALESCE(issste_max,0) + COALESCE(pemex_max,0)
    + COALESCE(imss_bienestar_max,0) + COALESCE(ccinshae_max,0) + COALESCE(salud_spps_max,0)
      AS sum_max
  FROM workspace.default.silver_licitacion_info
),

deltas AS (
  SELECT
    clave,
    totales_min,
    sum_min,
    (sum_min - COALESCE(totales_min,0)) AS delta_min,

    totales_max,
    sum_max,
    (sum_max - COALESCE(totales_max,0)) AS delta_max
  FROM validacion
)

-- Compare with a small tolerance instead of exact equality: the arithmetic is
-- floating-point, so sums of fractional quantities can differ from the total by
-- pure rounding noise (e.g. 0.1 + 0.2 - 0.3 <> 0).
SELECT *
FROM deltas
WHERE ABS(delta_min) > 1e-6 OR ABS(delta_max) > 1e-6
ORDER BY ABS(delta_min) DESC, ABS(delta_max) DESC;


In [0]:
# Export the Silver table as CSV (with a header row) into the Unity Catalog volume.
# coalesce(1) produces a single part-file, but Spark still writes `out_dir` as a
# directory containing that part-file; only suitable for small tables.
out_dir = "dbfs:/Volumes/workspace/default/eseotres/silver_licitacion_info_export_csv"

silver_one_partition = spark.table("workspace.default.silver_licitacion_info").coalesce(1)
csv_writer = silver_one_partition.write.mode("overwrite").option("header", "true")
csv_writer.csv(out_dir)

print("✅ Exported to:", out_dir)