# BRONZE — Ingesta y validaciones
**Objetivo:** Leer archivos crudos del Volume, validar naming `_yyyyMMdd`, y preparar particiones de proceso.

**Entradas**  
- Volume: `/Volumes/<catalog>/<bronze>/<volume>/Process/<country>/yyyy=YYYY/mm=MM/dd=DD/`  
- Archivos: `iowa_dataset_YYYYMMDD.csv` + (opcional) `..._part-*.csv`

**Salidas**  
- Archivos validados y listados para procesamiento.  
- (Opcional) Archivo(s) movidos a `Archive/` al finalizar.

**Parámetros**  
- `catalog`, `bronze_schema`, `volume`, `country`, `process_date`

**Notas:** Este notebook **no transforma datos**; solo controla entrada y (si está habilitado) archiva.


## Paso 1 — Validación de naming y existencia de archivos
- Verifica que existan CSV en la carpeta de proceso del día.  
- Chequea que el nombre termine en `_<YYYYMMDD>.csv`.  
- Falla controlada si falta algo.


In [0]:
# ==== Validación de naming + utilidades de archivado (Bronze) ====
import re
from pyspark.sql import functions as F

# Helper: crear widget solo si no existe
def ensure_text_widget(name: str, default: str):
    try:
        dbutils.widgets.get(name)  # si existe, no hacemos nada
    except:
        dbutils.widgets.text(name, default)

# 1) Widgets (parámetros)
ensure_text_widget("catalog",       "ct_andresolguin_finalproject")
ensure_text_widget("bronze_schema", "bronze")
ensure_text_widget("volume",        "flatfiles_managed")
ensure_text_widget("country",       "usa")
ensure_text_widget("process_date",  "20251018")  # formato yyyymmdd

catalog      = dbutils.widgets.get("catalog")
bronze       = dbutils.widgets.get("bronze_schema")
volume       = dbutils.widgets.get("volume")
country      = dbutils.widgets.get("country")
process_date = dbutils.widgets.get("process_date")

yyyy, mm, dd = process_date[:4], process_date[4:6], process_date[6:8]

base     = f"/Volumes/{catalog}/{bronze}/{volume}"
proc_dir = f"{base}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"

# 2) Validar naming: deben terminar en _yyyyMMdd.csv
pattern = re.compile(rf".*_{process_date}\.csv$", re.IGNORECASE)

try:
    items = dbutils.fs.ls(proc_dir)
except Exception as e:
    raise FileNotFoundError(f"No existe la carpeta del día: {proc_dir}. Detalle: {e}")

csvs = [x for x in items if x.name.lower().endswith(".csv")]

if not csvs:
    raise FileNotFoundError(f"No se encontraron CSV en {proc_dir}")

bad = [x.name for x in csvs if not pattern.match(x.name)]
if bad:
    raise ValueError(f"Archivos con naming inválido (esperado *_{process_date}.csv): {bad}")

print(f"OK naming: {len(csvs)} archivo(s) válidos en {proc_dir}")
print([x.name for x in csvs])

# 3) Helper para archivar al finalizar (lo llamaremos al final del notebook)
def archive_processed_files():
    archive_dir = f"{base}/Archive/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"
    dbutils.fs.mkdirs(archive_dir)
    moved = []
    for f in csvs:
        dst = f"{archive_dir}/{f.name}"
        # si ya existe en Archive, sobrescribir: idempotente
        dbutils.fs.mv(f.path, dst, True)
        moved.append((f.name, dst))
    print("Archivados:", moved)


OK naming: 1 archivo(s) válidos en /Volumes/ct_andresolguin_finalproject/bronze/flatfiles_managed/Process/usa/yyyy=2025/mm=10/dd=18
['iowa_dataset_20251018.csv']


In [0]:
catalog, schema = "ct_andresolguin_finalproject", "bronze"
country, yyyy, mm, dd = "usa", "2025", "10", "18"
base = f"/Volumes/{catalog}/{schema}/flatfiles_managed"
processed_src = f"{base}/Processed/{country}/yyyy={yyyy}/mm={mm}/dd={dd}/iowa_dataset_{yyyy}{mm}{dd}.csv"
process_dir   = f"{base}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"
process_dst   = f"{process_dir}/iowa_dataset_{yyyy}{mm}{dd}.csv"
dbutils.fs.mkdirs(process_dir)
dbutils.fs.cp(processed_src, process_dst, True)
print("[OK] insumo en Process:", process_dst)


[OK] insumo en Process: /Volumes/ct_andresolguin_finalproject/bronze/flatfiles_managed/Process/usa/yyyy=2025/mm=10/dd=18/iowa_dataset_20251018.csv


In [0]:
# Parámetros globales (widgets) — Parte 3.2
from pyspark.sql import functions as F, types as T

def wtext(name, default):
    try:
        dbutils.widgets.text(name, default)
    except:
        pass

wtext("catalog", "ct_andresolguin_finalproject")
wtext("schema_bronze", "bronze")
wtext("schema_silver", "silver")
wtext("schema_gold", "gold")
wtext("volume_name", "flatfiles_managed")
wtext("country", "usa")
wtext("process_date", "20251018")  # AAAAMMDD (usamos 20251018 para este día)

# Leer widgets
catalog       = dbutils.widgets.get("catalog")
schema_bronze = dbutils.widgets.get("schema_bronze")
schema_silver = dbutils.widgets.get("schema_silver")
schema_gold   = dbutils.widgets.get("schema_gold")
volume_name   = dbutils.widgets.get("volume_name")
country       = dbutils.widgets.get("country")
process_date  = dbutils.widgets.get("process_date")

# Rutas derivadas por fecha
yyyy, mm, dd = process_date[:4], process_date[4:6], process_date[6:8]
base_volume   = f"/Volumes/{catalog}/{schema_bronze}/{volume_name}"
in_dir        = f"{base_volume}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"
processed_dir = f"{base_volume}/Processed/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"
rejected_dir  = f"{base_volume}/Rejected/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"

# Contexto catálogo
spark.sql(f"USE CATALOG {catalog}")

print("[INFO] Parámetros cargados")
print(f"  catalog={catalog} | bronze={schema_bronze} | silver={schema_silver} | gold={schema_gold}")
print(f"  process_date={process_date} | in_dir={in_dir}")

# Sanity check: ¿existe el directorio de entrada?
try:
    files = dbutils.fs.ls(in_dir)
    print(f"[OK] Directorio de entrada existe y tiene {len(files)} ítems")
except Exception as e:
    raise RuntimeError(f"[ERROR] No se encuentra el directorio de entrada: {in_dir}. Detalle: {e}")


[INFO] Parámetros cargados
  catalog=ct_andresolguin_finalproject | bronze=bronze | silver=silver | gold=gold
  process_date=20251018 | in_dir=/Volumes/ct_andresolguin_finalproject/bronze/flatfiles_managed/Process/usa/yyyy=2025/mm=10/dd=18
[OK] Directorio de entrada existe y tiene 1 ítems


In [0]:
from pyspark.sql import functions as F

# === Config ===
catalog = "ct_andresolguin_finalproject"
schema  = "bronze"
country = "usa"
yyyy, mm, dd = "2025", "10", "18"
base = f"/Volumes/{catalog}/{schema}/flatfiles_managed"
file_name = f"iowa_dataset_{yyyy}{mm}{dd}.csv"
src = f"{base}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}/{file_name}"

# === Cargar fuentes ===
df_csv = (spark.read
          .option("header","true")
          .option("inferSchema","false")
          .csv(src))

bronze_month = spark.table(f"{catalog}.{schema}.iowa_raw_str").where("year=2025 AND month=10")

# === Elegir una clave común (priorizamos invoice_line_no) ===
possible_keys = ["invoice_line_no", "invoice_and_item_number", "invoice_number"]
present = [k for k in possible_keys if (k in df_csv.columns and k in bronze_month.columns)]
assert present, f"No encuentro una clave común entre CSV y BRONZE. CSV: {df_csv.columns}; BRONZE: {bronze_month.columns}"
key = present[0]
print("Usando clave:", key)

# === Normalizar claves y comparar ===
csv_keys    = df_csv.select(F.upper(F.trim(F.col(key))).alias("key"))
bronze_keys = bronze_month.select(F.upper(F.trim(F.col(key))).alias("key"))

missing_keys = csv_keys.join(bronze_keys, ["key"], "left_anti").distinct()
missing_count = missing_keys.count()
print("Faltantes (CSV que no están en BRONZE):", missing_count)

# Recalculo de diferencia esperada para validar
rows_oct_2025 = spark.sql("""
  SELECT COUNT(*) AS c
  FROM ct_andresolguin_finalproject.bronze.iowa_raw_str
  WHERE year = 2025 AND month = 10
""").collect()[0]['c']
row_count_csv = df_csv.count()
expected_diff = row_count_csv - rows_oct_2025
print("expected_diff (csv - bronze):", expected_diff)

# === Ver claves faltantes y filas completas (muestra) ===
display(missing_keys.limit(20))

faltantes_df = df_csv.join(missing_keys, F.upper(F.trim(F.col(key))) == F.col("key"), "inner")
print("Filas completas faltantes (deberían ser 6):")
display(faltantes_df.limit(10))


Usando clave: invoice_line_no
Faltantes (CSV que no están en BRONZE): 1
expected_diff (csv - bronze): 6


key
"PRAIRIE TRAIL SUITE 107-108"""


Filas completas faltantes (deberían ser 6):


invoice_line_no,date,store,name,address,city,zipcode,store_location,county_number,county,category,category_name,vendor_no,vendor_name,itemno,im_desc,pack,bottle_volume_ml,state_bottle_cost,state_bottle_retail,sale_bottles,sale_dollars,sale_liters,sale_gallons,:@computed_region_3r5t_5243,:@computed_region_wnea_7qqw,:@computed_region_i9mz_6gmt,:@computed_region_uhgg_e8y2,:@computed_region_e7ym_nrbf,key
"PRAIRIE TRAIL SUITE 107-108""",ANKENY,50023,"{'type': 'Point', 'coordinates': [-93.601740968, 41.716028988]}",POLK,1022200,100% AGAVE TEQUILA,260,DIAGEO AMERICAS,89154,DON JULIO REPOSADO,6,750,33.49,50.24,6,301.44,4.5,1.18,831,25,316,64,1878,,,,,,"PRAIRIE TRAIL SUITE 107-108"""
"PRAIRIE TRAIL SUITE 107-108""",ANKENY,50023,"{'type': 'Point', 'coordinates': [-93.601740968, 41.716028988]}",POLK,1022200,100% AGAVE TEQUILA,85,BROWN FORMAN CORP.,100104,HERRADURA ULTRA ANEJO,6,750,33.0,49.5,6,297.0,4.5,1.18,831,25,316,64,1878,,,,,,"PRAIRIE TRAIL SUITE 107-108"""
"PRAIRIE TRAIL SUITE 107-108""",ANKENY,50023,"{'type': 'Point', 'coordinates': [-93.601740968, 41.716028988]}",POLK,1701100,TEMPORARY & SPECIALTY PACKAGES,421,SAZERAC COMPANY INC,21242,1792 SWEET WHEAT BOURBON,6,750,20.0,30.0,1,30.0,0.75,0.19,831,25,316,64,1878,,,,,,"PRAIRIE TRAIL SUITE 107-108"""
"PRAIRIE TRAIL SUITE 107-108""",ANKENY,50023,"{'type': 'Point', 'coordinates': [-93.601740968, 41.716028988]}",POLK,1011100,BLENDED WHISKIES,772,INTERCONTINENTAL PACKAGING COMPANY/PRESTIGE BEVERAGE GROUP,21687,2XO OAK SERIES,6,750,25.0,37.5,6,225.0,4.5,1.18,831,25,316,64,1878,,,,,,"PRAIRIE TRAIL SUITE 107-108"""
"PRAIRIE TRAIL SUITE 107-108""",ANKENY,50023,"{'type': 'Point', 'coordinates': [-93.601740968, 41.716028988]}",POLK,1052100,IMPORTED BRANDIES,420,MOET HENNESSY USA,48106,HENNESSY VS,12,750,19.99,29.99,12,359.88,9.0,2.37,831,25,316,64,1878,,,,,,"PRAIRIE TRAIL SUITE 107-108"""
"PRAIRIE TRAIL SUITE 107-108""",ANKENY,50023,"{'type': 'Point', 'coordinates': [-93.601740968, 41.716028988]}",POLK,1052100,IMPORTED BRANDIES,420,MOET HENNESSY USA,48106,HENNESSY VS,12,750,19.99,29.99,12,359.88,9.0,2.37,831,25,316,64,1878,,,,,,"PRAIRIE TRAIL SUITE 107-108"""


In [0]:
# Validación de naming autocontenida (lee/crea widgets, deriva rutas y valida)
import re

def _get_widget(name, default):
    # Lee un widget; si no existe, lo crea con default
    try:
        return dbutils.widgets.get(name)
    except:
        dbutils.widgets.text(name, default)
        return dbutils.widgets.get(name)

# 1) Parámetros (widgets) con defaults seguros
catalog       = _get_widget("catalog", "ct_andresolguin_finalproject")
schema_bronze = _get_widget("schema_bronze", "bronze")
schema_silver = _get_widget("schema_silver", "silver")
schema_gold   = _get_widget("schema_gold", "gold")
volume_name   = _get_widget("volume_name", "flatfiles_managed")
country       = _get_widget("country", "usa")
process_date  = _get_widget("process_date", "20251018").strip()  # AAAAMMDD

# 2) Rutas derivadas
yyyy, mm, dd = process_date[:4], process_date[4:6], process_date[6:8]
base_volume   = f"/Volumes/{catalog}/{schema_bronze}/{volume_name}"
in_dir        = f"{base_volume}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"
processed_dir = f"{base_volume}/Processed/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"
rejected_dir  = f"{base_volume}/Rejected/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"

# 3) Contexto de catálogo (por claridad/trazabilidad)
spark.sql(f"USE CATALOG {catalog}")

# 4) Patrón estricto exigido por la consigna: _yyyyMMdd (y opcional _partNN)
pattern = rf"^iowa_dataset_{process_date}(?:_part\d{{2}})?\.csv$"
print(f"[INFO] Parámetros OK | catalog={catalog} | bronze={schema_bronze} | process_date={process_date}")
print(f"[INFO] Directorio de entrada: {in_dir}")
print(f"[INFO] Patrón aplicado: {pattern}")

# 5) Listado y validación
entries = dbutils.fs.ls(in_dir)
if not entries:
    raise RuntimeError(f"[ERROR] No hay archivos en {in_dir}")

valid_files, invalid_files = [], []
for e in entries:
    is_match = re.match(pattern, e.name, flags=re.IGNORECASE) is not None
    estado = "VALIDO" if is_match else "INVALIDO"
    print(f"  - {e.name} -> {estado}")
    (valid_files if is_match else invalid_files).append(e.path)

print(f"[RESUMEN] Total={len(entries)} | Válidos={len(valid_files)} | Inválidos={len(invalid_files)}")

# 6) Manejo de error si no hay válidos 
if not valid_files:
    raise RuntimeError("[ERROR] No hay archivos válidos según el patrón (_yyyyMMdd). "
                       "Revise 'process_date' o el nombre del archivo.")

# 7) Devolvemos para el siguiente paso (ingesta / mover a Processed/Rejected)
valid_files, invalid_files


[INFO] Parámetros OK | catalog=ct_andresolguin_finalproject | bronze=bronze | process_date=20251018
[INFO] Directorio de entrada: /Volumes/ct_andresolguin_finalproject/bronze/flatfiles_managed/Process/usa/yyyy=2025/mm=10/dd=18
[INFO] Patrón aplicado: ^iowa_dataset_20251018(?:_part\d{2})?\.csv$
  - iowa_dataset_20251018.csv -> VALIDO
[RESUMEN] Total=1 | Válidos=1 | Inválidos=0


(['dbfs:/Volumes/ct_andresolguin_finalproject/bronze/flatfiles_managed/Process/usa/yyyy=2025/mm=10/dd=18/iowa_dataset_20251018.csv'],
 [])

In [0]:
# BRONZE (crudo) — Ingesta sin casteos (todo STRING) y escritura a Delta
from pyspark.sql import functions as F
import re

# 1) Parámetros / widgets (autocontenidos)
def _w(name, default):
    try:
        return dbutils.widgets.get(name)
    except:
        dbutils.widgets.text(name, default); 
        return dbutils.widgets.get(name)

catalog       = _w("catalog", "ct_andresolguin_finalproject")
schema_bronze = _w("schema_bronze", "bronze")
volume_name   = _w("volume_name", "flatfiles_managed")
country       = _w("country", "usa")
process_date  = _w("process_date", "20251018").strip()

yyyy, mm, dd = process_date[:4], process_date[4:6], process_date[6:8]
base_volume   = f"/Volumes/{catalog}/{schema_bronze}/{volume_name}"
in_dir        = f"{base_volume}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"
table_raw     = f"{catalog}.{schema_bronze}.iowa_raw_str"

spark.sql(f"USE CATALOG {catalog}")

# 2) Crear tabla BRONZE cruda si no existe (todo STRING + cols técnicas)
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {table_raw} (
  invoice_line_no STRING,
  date STRING,
  store STRING,
  name STRING,
  address STRING,
  city STRING,
  zipcode STRING,
  store_location STRING,
  county_number STRING,
  county STRING,
  category STRING,
  category_name STRING,
  vendor_no STRING,
  vendor_name STRING,
  itemno STRING,
  im_desc STRING,
  pack STRING,
  bottle_volume_ml STRING,
  state_bottle_cost STRING,
  state_bottle_retail STRING,
  sale_bottles STRING,
  sale_dollars STRING,
  sale_liters STRING,
  sale_gallons STRING,
  ingestion_ts TIMESTAMP,
  year INT,
  month STRING
) USING delta
PARTITIONED BY (year, month)
""")

# 3) Hallar archivo(s) válidos del día
pattern = rf"^iowa_dataset_{process_date}(?:_part\d{{2}})?\.csv$"
entries = dbutils.fs.ls(in_dir)
valid_files = [e.path for e in entries if re.match(pattern, e.name, flags=re.IGNORECASE)]
if not valid_files:
    raise RuntimeError(f"[ERROR] No hay archivos válidos en {in_dir} (esperado *_{process_date}.csv)")
print(f"[INFO] Archivos a ingestar: {len(valid_files)}")

# 4) Leer respetando encabezados reales
df_csv = (spark.read.format("csv")
          .option("header", True)
          .option("multiLine", True)
          .option("escape", '"').option("quote", '"')
          .load(valid_files))
cols = set(df_csv.columns)
print(f"[INFO] Columnas detectadas: {len(cols)}  Ejemplo: {df_csv.columns[:8]}")

# 5) Selección por nombre con alternativas (sin castear — todo STRING)
choices = {
  "invoice_line_no": ["invoice_line_no","Invoice/Item Number"],
  "date":            ["date","Date"],
  "store":           ["store","Store Number"],
  "name":            ["name","Store Name"],
  "address":         ["address","Address"],
  "city":            ["city","City"],
  "zipcode":         ["zipcode","Zip Code"],
  "store_location":  ["store_location","Store Location"],
  "county_number":   ["county_number","County Number"],
  "county":          ["county","County"],
  "category":        ["category","Category"],
  "category_name":   ["category_name","Category Name"],
  "vendor_no":       ["vendor_no","Vendor Number"],
  "vendor_name":     ["vendor_name","Vendor Name"],
  "itemno":          ["itemno","Item Number"],
  "im_desc":         ["im_desc","Item Description"],
  "pack":            ["pack","Pack"],
  "bottle_volume_ml":["bottle_volume_ml","Bottle Volume (ml)"],
  "state_bottle_cost":   ["state_bottle_cost","State Bottle Cost"],
  "state_bottle_retail": ["state_bottle_retail","State Bottle Retail"],
  "sale_bottles":    ["sale_bottles","Bottles Sold"],
  "sale_dollars":    ["sale_dollars","Sale (Dollars)"],
  "sale_liters":     ["sale_liters","Volume Sold (Liters)"],
  "sale_gallons":    ["sale_gallons","Volume Sold (Gallons)"],
}

missing, select_exprs = [], []
for std, alts in choices.items():
    found = next((a for a in alts if a in cols), None)
    if not found:
        missing.append(std)
    else:
        select_exprs.append(F.col(found).cast("string").alias(std))  # todo a STRING

if missing:
    raise RuntimeError(f"[ERROR] Faltan columnas requeridas: {missing}")

df_raw = (df_csv.select(*select_exprs)
          .withColumn("ingestion_ts", F.current_timestamp())
          .withColumn("year",  F.lit(int(yyyy)))
          .withColumn("month", F.lit(mm)))

rows = df_raw.count()
if rows == 0:
    raise RuntimeError("[ERROR] No hay filas válidas para escribir en BRONZE crudo.")

# 6) Escribir a Delta (append)
df_raw.write.format("delta").mode("append").saveAsTable(table_raw)
print(f"[OK] Ingesta BRONZE (crudo) completada → {table_raw} | Filas escritas: {rows} | Partición year={yyyy}, month={mm}")


[INFO] Archivos a ingestar: 1
[INFO] Columnas detectadas: 29  Ejemplo: ['invoice_line_no', 'date', 'store', 'name', 'address', 'city', 'zipcode', 'store_location']
[OK] Ingesta BRONZE (crudo) completada → ct_andresolguin_finalproject.bronze.iowa_raw_str | Filas escritas: 15299000 | Partición year=2025, month=10


In [0]:
# === Config ===
catalog = "ct_andresolguin_finalproject"
schema  = "bronze"
country = "usa"
yyyy, mm, dd = "2025", "10", "18"
file_name = f"iowa_dataset_{yyyy}{mm}{dd}.csv"

# Paths (Volumes)
base = f"/Volumes/{catalog}/{schema}/flatfiles_managed"
src  = f"{base}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}/{file_name}"
dst  = f"{base}/Processed/{country}/yyyy={yyyy}/mm={mm}/dd={dd}/{file_name}"

# 1.a) Comprobar que el archivo existe y leer tamaño
items = [x for x in dbutils.fs.ls(f"{base}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}") if x.name == file_name]
assert items, f"No existe el archivo en Process: {src}"
file_size_bytes = items[0].size

# 1.b) Contar filas del CSV (rápido, sin inferir schema)
df_csv = (spark.read
          .option("header", "true")
          .option("inferSchema", "false")
          .csv(src))
row_count_csv = df_csv.count()

print("OK — Archivo encontrado y leído")
print("file_name:", file_name)
print("file_size_bytes:", file_size_bytes)
print("row_count_csv:", row_count_csv)


OK — Archivo encontrado y leído
file_name: iowa_dataset_20251018.csv
file_size_bytes: 4636663806
row_count_csv: 15299006


In [0]:
%sql
USE CATALOG ct_andresolguin_finalproject;
USE SCHEMA bronze;

SELECT COUNT(*) AS rows_oct_2025
FROM ct_andresolguin_finalproject.bronze.iowa_raw_str
WHERE year = 2025 AND month = 10;


rows_oct_2025
15299000


In [0]:
# === Rechequeo de diferencias entre CSV, BRONZE y REJECTED ===
catalog = "ct_andresolguin_finalproject"
schema  = "bronze"
country = "usa"
yyyy, mm, dd = "2025", "10", "18"
base = f"/Volumes/{catalog}/{schema}/flatfiles_managed"
file_name = f"iowa_dataset_{yyyy}{mm}{dd}.csv"
src = f"{base}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}/{file_name}"

# Contar filas del CSV (de nuevo)
df_csv = (spark.read
          .option("header", "true")
          .option("inferSchema", "false")
          .csv(src))
row_count_csv = df_csv.count()

# Contar filas en BRONZE (mes actual)
rows_oct_2025 = spark.sql("""
SELECT COUNT(*) AS c
FROM ct_andresolguin_finalproject.bronze.iowa_raw_str
WHERE year = 2025 AND month = 10
""").collect()[0]['c']

diff = row_count_csv - rows_oct_2025
print("row_count_csv:", row_count_csv)
print("rows_oct_2025:", rows_oct_2025)
print("diff (csv - bronze):", diff)

# Buscar si hay rechazados del mismo día
rej_path = f"{base}/Rejected/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"
rejected_count = 0
try:
    df_rej = (spark.read
              .option("header","true")
              .option("inferSchema","false")
              .csv(rej_path))
    rejected_count = df_rej.count()
except Exception:
    print("Rejected vacío o no existe para este día.")

print("rejected_count:", rejected_count)

if diff == rejected_count:
    print("✅ OK — La diferencia coincide con rechazados. Podemos mover a Processed.")
else:
    print("⚠️ ATENCIÓN — La diferencia NO coincide. No mover hasta revisar.")


row_count_csv: 15299006
rows_oct_2025: 15299000
diff (csv - bronze): 6
Rejected vacío o no existe para este día.
rejected_count: 0
⚠️ ATENCIÓN — La diferencia NO coincide. No mover hasta revisar.


In [0]:
# Releer CSV y BRONZE del mes
catalog = "ct_andresolguin_finalproject"
schema  = "bronze"
country = "usa"
yyyy, mm, dd = "2025", "10", "18"
base = f"/Volumes/{catalog}/{schema}/flatfiles_managed"
file_name = f"iowa_dataset_{yyyy}{mm}{dd}.csv"
src = f"{base}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}/{file_name}"

df_csv = (spark.read
          .option("header","true")
          .option("inferSchema","false")
          .csv(src))

bronze_month = spark.table(f"{catalog}.{schema}.iowa_raw_str").where("year=2025 AND month=10")

csv_cols    = set(df_csv.columns)
bronze_cols = set(bronze_month.columns)
common = sorted(csv_cols.intersection(bronze_cols))

candidates = [c for c in common if any(k in c.lower() for k in ["invoice", "item", "number", "id"])]
print("Columnas comunes:", common)
print("Candidatas de clave:", candidates)

# muestra de valores de posibles claves (primeros 5)
for c in candidates[:3]:
    print(f"\nMuestra CSV de {c}:")
    display(df_csv.select(c).distinct().limit(5))


Columnas comunes: ['address', 'bottle_volume_ml', 'category', 'category_name', 'city', 'county', 'county_number', 'date', 'im_desc', 'invoice_line_no', 'itemno', 'name', 'pack', 'sale_bottles', 'sale_dollars', 'sale_gallons', 'sale_liters', 'state_bottle_cost', 'state_bottle_retail', 'store', 'store_location', 'vendor_name', 'vendor_no', 'zipcode']
Candidatas de clave: ['county_number', 'invoice_line_no', 'itemno']

Muestra CSV de county_number:


county_number
50
61
TAMA
93
79



Muestra CSV de invoice_line_no:


invoice_line_no
INV-14763200169
INV-14884200007
INV-15952100064
INV-14953800026
INV-15850300044



Muestra CSV de itemno:


itemno
25604
41704
19048
66296
BARTENDERS HOT SEX


In [0]:
from pyspark.sql import functions as F

# === Config ===
catalog = "ct_andresolguin_finalproject"
schema  = "bronze"
country = "usa"
yyyy, mm, dd = "2025", "10", "18"
base = f"/Volumes/{catalog}/{schema}/flatfiles_managed"
file_name = f"iowa_dataset_{yyyy}{mm}{dd}.csv"
src = f"{base}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}/{file_name}"

# === Cargar fuentes ===
df_csv = (spark.read
          .option("header","true")
          .option("inferSchema","false")
          .csv(src))

bronze_month = spark.table(f"{catalog}.{schema}.iowa_raw_str").where("year=2025 AND month=10")

# === Elegir una clave común (priorizamos invoice_line_no) ===
possible_keys = ["invoice_line_no", "invoice_and_item_number", "invoice_number"]
present = [k for k in possible_keys if (k in df_csv.columns and k in bronze_month.columns)]
assert present, f"No encuentro una clave común entre CSV y BRONZE. CSV: {df_csv.columns}; BRONZE: {bronze_month.columns}"
key = present[0]
print("Usando clave:", key)

# === Normalizar claves y comparar ===
csv_keys    = df_csv.select(F.upper(F.trim(F.col(key))).alias("key"))
bronze_keys = bronze_month.select(F.upper(F.trim(F.col(key))).alias("key"))

missing_keys = csv_keys.join(bronze_keys, ["key"], "left_anti").distinct()
missing_count = missing_keys.count()
print("Faltantes (CSV que no están en BRONZE):", missing_count)

# Recalculo de diferencia esperada para validar
rows_oct_2025 = spark.sql("""
  SELECT COUNT(*) AS c
  FROM ct_andresolguin_finalproject.bronze.iowa_raw_str
  WHERE year = 2025 AND month = 10
""").collect()[0]['c']
row_count_csv = df_csv.count()
expected_diff = row_count_csv - rows_oct_2025
print("expected_diff (csv - bronze):", expected_diff)

# === Ver claves faltantes y filas completas (muestra) ===
display(missing_keys.limit(20))

faltantes_df = df_csv.join(missing_keys, F.upper(F.trim(F.col(key))) == F.col("key"), "inner")
print("Filas completas faltantes (deberían ser 6):")
display(faltantes_df.limit(10))


Usando clave: invoice_line_no
Faltantes (CSV que no están en BRONZE): 1
expected_diff (csv - bronze): 6


key
"PRAIRIE TRAIL SUITE 107-108"""


Filas completas faltantes (deberían ser 6):


invoice_line_no,date,store,name,address,city,zipcode,store_location,county_number,county,category,category_name,vendor_no,vendor_name,itemno,im_desc,pack,bottle_volume_ml,state_bottle_cost,state_bottle_retail,sale_bottles,sale_dollars,sale_liters,sale_gallons,:@computed_region_3r5t_5243,:@computed_region_wnea_7qqw,:@computed_region_i9mz_6gmt,:@computed_region_uhgg_e8y2,:@computed_region_e7ym_nrbf,key
"PRAIRIE TRAIL SUITE 107-108""",ANKENY,50023,"{'type': 'Point', 'coordinates': [-93.601740968, 41.716028988]}",POLK,1022200,100% AGAVE TEQUILA,260,DIAGEO AMERICAS,89154,DON JULIO REPOSADO,6,750,33.49,50.24,6,301.44,4.5,1.18,831,25,316,64,1878,,,,,,"PRAIRIE TRAIL SUITE 107-108"""
"PRAIRIE TRAIL SUITE 107-108""",ANKENY,50023,"{'type': 'Point', 'coordinates': [-93.601740968, 41.716028988]}",POLK,1022200,100% AGAVE TEQUILA,85,BROWN FORMAN CORP.,100104,HERRADURA ULTRA ANEJO,6,750,33.0,49.5,6,297.0,4.5,1.18,831,25,316,64,1878,,,,,,"PRAIRIE TRAIL SUITE 107-108"""
"PRAIRIE TRAIL SUITE 107-108""",ANKENY,50023,"{'type': 'Point', 'coordinates': [-93.601740968, 41.716028988]}",POLK,1701100,TEMPORARY & SPECIALTY PACKAGES,421,SAZERAC COMPANY INC,21242,1792 SWEET WHEAT BOURBON,6,750,20.0,30.0,1,30.0,0.75,0.19,831,25,316,64,1878,,,,,,"PRAIRIE TRAIL SUITE 107-108"""
"PRAIRIE TRAIL SUITE 107-108""",ANKENY,50023,"{'type': 'Point', 'coordinates': [-93.601740968, 41.716028988]}",POLK,1011100,BLENDED WHISKIES,772,INTERCONTINENTAL PACKAGING COMPANY/PRESTIGE BEVERAGE GROUP,21687,2XO OAK SERIES,6,750,25.0,37.5,6,225.0,4.5,1.18,831,25,316,64,1878,,,,,,"PRAIRIE TRAIL SUITE 107-108"""
"PRAIRIE TRAIL SUITE 107-108""",ANKENY,50023,"{'type': 'Point', 'coordinates': [-93.601740968, 41.716028988]}",POLK,1052100,IMPORTED BRANDIES,420,MOET HENNESSY USA,48106,HENNESSY VS,12,750,19.99,29.99,12,359.88,9.0,2.37,831,25,316,64,1878,,,,,,"PRAIRIE TRAIL SUITE 107-108"""
"PRAIRIE TRAIL SUITE 107-108""",ANKENY,50023,"{'type': 'Point', 'coordinates': [-93.601740968, 41.716028988]}",POLK,1052100,IMPORTED BRANDIES,420,MOET HENNESSY USA,48106,HENNESSY VS,12,750,19.99,29.99,12,359.88,9.0,2.37,831,25,316,64,1878,,,,,,"PRAIRIE TRAIL SUITE 107-108"""


In [0]:
from pyspark.sql import functions as F

# --- Config ---
catalog, schema = "ct_andresolguin_finalproject", "bronze"
key = "invoice_line_no"
yyyy, mm, dd = "2025","10","18"
base = f"/Volumes/{catalog}/{schema}/flatfiles_managed"
src  = f"{base}/Process/usa/yyyy={yyyy}/mm={mm}/dd={dd}/iowa_dataset_{yyyy}{mm}{dd}.csv"

# --- Cargar CSV del día y BRONZE del mes ---
csv = (spark.read.option("header","true").option("inferSchema","false").csv(src))
bronze_month = spark.table(f"{catalog}.{schema}.iowa_raw_str").where("year=2025 AND month=10")

# --- Contar por clave en cada fuente ---
csv_k = (csv.select(F.upper(F.trim(F.col(key))).alias("key"))
            .groupBy("key").agg(F.count("*").alias("csv_count")))
bronze_k = (bronze_month.select(F.upper(F.trim(F.col(key))).alias("key"))
                         .groupBy("key").agg(F.count("*").alias("bronze_count")))

cmp = (csv_k.join(bronze_k, "key", "left_outer")
           .fillna({"bronze_count":0})
           .withColumn("excess_csv_vs_bronze", F.col("csv_count")-F.col("bronze_count")))

excess_total = (cmp.where("excess_csv_vs_bronze > 0")
                  .agg(F.sum("excess_csv_vs_bronze").alias("excess_total"))
                  .collect()[0]["excess_total"])
print("excess_total:", excess_total)  # debería explicar la diferencia de 6

# --- Ver las claves con diferencia > 0 (top) ---
display(cmp.where("excess_csv_vs_bronze > 0")
          .orderBy(F.desc("excess_csv_vs_bronze"))
          .limit(10))

# --- Detectar posibles registros corruptos donde la clave no parezca un INV-... ---
suspects = (csv.where(~F.col(key).startswith("INV-"))
              .select(key).distinct().limit(20))
print("Posibles claves sospechosas (no empiezan con 'INV-'):")
display(suspects)

# Si querés ver filas completas de esos sospechosos:
sus_keys = [r[key] for r in suspects.collect()]
if sus_keys:
    display(csv.where(F.col(key).isin(sus_keys)).limit(50))


excess_total: 6


key,csv_count,bronze_count,excess_csv_vs_bronze
"PRAIRIE TRAIL SUITE 107-108""",6,0,6


Posibles claves sospechosas (no empiezan con 'INV-'):


invoice_line_no
S15183400047
S12386600035
S14174900099
S05857200097
S24709700007
S26437400031
S14695800009
S15322300065
S03732500027
S14909400006


invoice_line_no,date,store,name,address,city,zipcode,store_location,county_number,county,category,category_name,vendor_no,vendor_name,itemno,im_desc,pack,bottle_volume_ml,state_bottle_cost,state_bottle_retail,sale_bottles,sale_dollars,sale_liters,sale_gallons,:@computed_region_3r5t_5243,:@computed_region_wnea_7qqw,:@computed_region_i9mz_6gmt,:@computed_region_uhgg_e8y2,:@computed_region_e7ym_nrbf
S09738100148,2012-12-27T00:00:00.000,2560,HY-VEE FOOD STORE / MARION,3600 BUSINESS HWY 151 EAST,MARION,52302,,57,LINN,1032200,IMPORTED VODKA - MISC,395,PROXIMO,36010,THREE OLIVES SMORES,12,750,10.16,15.74,6,94.44,4.5,1.19,,,,,
S15183400047,2013-10-16T00:00:00.000,3943,GOOD AND QUICK CO,519 LINCOLNWAY,NEVADA,50201,"{'type': 'Point', 'coordinates': [-93.464816, 42.022781]}",85,STORY,1012100,CANADIAN WHISKIES,260,DIAGEO AMERICAS,11293,CROWN ROYAL CANADIAN WHISKY,48,200,4.7,7.05,3,21.15,0.6,0.16,230.0,11.0,126.0,53.0,299.0
S26437400031,2015-06-26T00:00:00.000,2573,HY-VEE FOOD STORE / MUSCATINE,2400 2ND AVE,MUSCATINE,52761,"{'type': 'Point', 'coordinates': [-91.035138, 41.451349]}",70,MUSCATINE,1011200,STRAIGHT BOURBON WHISKIES,259,HEAVEN HILL BRANDS,18048,EVAN WILLIAMS GREEN LABEL,6,1750,9.94,14.91,6,89.46,10.5,2.77,445.0,56.0,86.0,68.0,1835.0
S09733400032,2012-12-27T00:00:00.000,2508,HY-VEE FOOD STORE #1 / CEDAR RAPIDS,"1843 JOHNSON AVENUE, N.W.",CEDAR RAPIDS,52405,"{'type': 'Point', 'coordinates': [-91.697941, 41.97447]}",57,LINN,1031080,VODKA 80 PROOF,297,LAIRD AND COMPANY,35926,FIVE O'CLOCK PET VODKA,12,750,3.4,5.09,12,61.08,9.0,2.38,586.0,18.0,264.0,45.0,287.0
S14695800009,2013-09-23T00:00:00.000,3868,WAL-MART 3630 / MARION,5491 BUSINESS HWY 151,MARION,52302,,57,LINN,1012100,CANADIAN WHISKIES,260,DIAGEO AMERICAS,11296,CROWN ROYAL,12,750,14.75,22.13,12,265.56,9.0,2.38,,,,,
S12386600035,2013-05-23T00:00:00.000,3731,WAL-MART 1241 / DAVENPORT,5811 ELMORE AVE,DAVENPORT,52807,"{'type': 'Point', 'coordinates': [-90.525525, 41.580212]}",82,SCOTT,1031080,VODKA 80 PROOF,434,LUXCO-ST LOUIS,36306,HAWKEYE VODKA,12,750,3.28,4.92,24,118.08,18.0,4.76,227.0,56.0,83.0,67.0,1881.0
S13386600010,2013-07-16T00:00:00.000,3065,MONTEZUMA SUPER VALU,201 S FRONT ST,MONTEZUMA,50171,"{'type': 'Point', 'coordinates': [-92.527248, 41.584179]}",79,POWESHIEK,1041100,AMERICAN DRY GINS,434,LUXCO-ST LOUIS,30526,HAWKEYE GIN,12,750,3.33,5.0,3,15.0,2.25,0.59,921.0,12.0,137.0,65.0,296.0
S15322300065,2013-10-23T00:00:00.000,3990,CORK AND BOTTLE / OSKALOOSA,309 A AVE WEST,OSKALOOSA,52577,"{'type': 'Point', 'coordinates': [-92.648153, 41.296228]}",62,MAHASKA,1042100,IMPORTED DRY GINS,260,DIAGEO AMERICAS,28868,TANQUERAY GIN,6,1750,24.5,36.74,1,36.74,1.75,0.46,914.0,11.0,133.0,76.0,290.0
S03732500027,2012-01-25T00:00:00.000,2190,"CENTRAL CITY LIQUOR, INC.",1460 2ND AVE,DES MOINES,50314,"{'type': 'Point', 'coordinates': [-93.619787, 41.60566]}",77,POLK,1062300,FLAVORED RUM,35,"BACARDI U.S.A., INC.",43137,BACARDI LIMON,12,1000,10.24,15.35,12,184.2,12.0,3.17,432.0,25.0,316.0,64.0,1878.0
S14909400006,2013-10-02T00:00:00.000,4932,B AND B WEST,3105 HUDSON RD,CEDAR FALLS,50613,"{'type': 'Point', 'coordinates': [-92.465838, 42.509248]}",7,BLACK HAWK,1031080,VODKA 80 PROOF,260,DIAGEO AMERICAS,37994,SMIRNOFF VODKA 80 PRF,24,375,4.75,7.13,24,171.12,9.0,2.38,511.0,18.0,244.0,35.0,176.0


In [0]:
from pyspark.sql import functions as F

# Config
catalog, schema = "ct_andresolguin_finalproject", "bronze"
yyyy, mm, dd = "2025", "10", "18"
base = f"/Volumes/{catalog}/{schema}/flatfiles_managed"
src  = f"{base}/Process/usa/yyyy={yyyy}/mm={mm}/dd={dd}/iowa_dataset_{yyyy}{mm}{dd}.csv"
key  = "invoice_line_no"

# CSV del día y BRONZE del mes
csv = (spark.read.option("header","true").option("inferSchema","false").csv(src))
bronze = spark.table(f"{catalog}.{schema}.iowa_raw_str").where("year=2025 AND month=10")

# Conteos por clave
csv_k = csv.select(F.upper(F.trim(F.col(key))).alias("key")).groupBy("key").agg(F.count("*").alias("csv_count"))
bronze_k = bronze.select(F.upper(F.trim(F.col(key))).alias("key")).groupBy("key").agg(F.count("*").alias("bronze_count"))

cmp = (csv_k.join(bronze_k, "key", "left_outer")
           .fillna({"bronze_count":0})
           .withColumn("diff", F.col("csv_count") - F.col("bronze_count")))

# Métricas
excess_total = cmp.where("diff > 0").agg(F.sum("diff").alias("excess_total")).collect()[0]["excess_total"] or 0
missing_keys_count = cmp.where("bronze_count = 0").count()  # claves sólo en CSV (tu “PRAIRIE TRAIL ...” debería contar acá)
dups_total_est = excess_total - missing_keys_count

print("excess_total:", excess_total)           # suma de (csv_count - bronze_count) para claves con más en CSV
print("missing_keys_count:", missing_keys_count) # claves que están en CSV y no en BRONZE (esperado: 1)
print("dups_total_est:", dups_total_est)         # estimado de filas extra por duplicados (esperado: 5)

# Ver cuáles son (top 20)
display(cmp.where("diff > 0").orderBy(F.desc("diff")).limit(20))


excess_total: 6
missing_keys_count: 1
dups_total_est: 5


key,csv_count,bronze_count,diff
"PRAIRIE TRAIL SUITE 107-108""",6,0,6


In [0]:
from pyspark.sql import functions as F

# --- Config ---
catalog, schema = "ct_andresolguin_finalproject", "bronze"
yyyy, mm, dd = "2025", "10", "18"
country = "usa"
key  = "invoice_line_no"
base = f"/Volumes/{catalog}/{schema}/flatfiles_managed"
file_name = f"iowa_dataset_{yyyy}{mm}{dd}.csv"
src  = f"{base}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}/{file_name}"

# --- Cargar CSV y BRONZE del mes ---
csv = (spark.read.option("header","true").option("inferSchema","false").csv(src))
bronze = spark.table(f"{catalog}.{schema}.iowa_raw_str").where("year=2025 AND month=10")

# --- Recalcular claves con más registros en CSV que en BRONZE (las problemáticas) ---
csv_k = csv.select(F.upper(F.trim(F.col(key))).alias("key")).groupBy("key").agg(F.count("*").alias("csv_count"))
bronze_k = bronze.select(F.upper(F.trim(F.col(key))).alias("key")).groupBy("key").agg(F.count("*").alias("bronze_count"))
cmp = (csv_k.join(bronze_k, "key", "left_outer").fillna({"bronze_count":0})
           .withColumn("diff", F.col("csv_count") - F.col("bronze_count")))

bad_keys = [r["key"] for r in cmp.where("diff > 0").select("key").collect()]  # deberían ser 1 (la dirección)
print("bad_keys:", bad_keys)

# --- Filtrar filas "malas" del CSV y guardarlas en Rejected del día ---
bad_rows = csv.where(F.upper(F.trim(F.col(key))).isin(bad_keys))
rej_out = f"{base}/Rejected/{country}/yyyy={yyyy}/mm={mm}/dd={dd}/bad_{file_name.replace('.csv','')}"

bad_rows.coalesce(1).write.mode("overwrite").option("header","true").csv(rej_out)

# --- Verificación de guardado ---
saved = spark.read.option("header","true").csv(rej_out).count()
print("✅ Guardadas en Rejected:", saved, "filas")
print("📁 Rejected path:", rej_out)


bad_keys: ['PRAIRIE TRAIL SUITE 107-108"']
✅ Guardadas en Rejected: 6 filas
📁 Rejected path: /Volumes/ct_andresolguin_finalproject/bronze/flatfiles_managed/Rejected/usa/yyyy=2025/mm=10/dd=18/bad_iowa_dataset_20251018


In [0]:
from datetime import datetime

# --- Config ---
catalog = "ct_andresolguin_finalproject"
schema  = "bronze"
country = "usa"
yyyy, mm, dd = "2025","10","18"
file_name = f"iowa_dataset_{yyyy}{mm}{dd}.csv"

base    = f"/Volumes/{catalog}/{schema}/flatfiles_managed"
src_dir = f"{base}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"
dst_dir = f"{base}/Processed/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"
src     = f"{src_dir}/{file_name}"
dst     = f"{dst_dir}/{file_name}"

# 1) Releer tamaño y filas del CSV (para log)
items = [x for x in dbutils.fs.ls(src_dir) if x.name == file_name]
assert items, f"No existe el archivo en {src_dir}"
file_size_bytes = items[0].size

df_csv = (spark.read
          .option("header","true")
          .option("inferSchema","false")
          .csv(src))
row_count_csv = df_csv.count()

# 2) Crear tabla de bitácora si no existe
spark.sql("""
CREATE TABLE IF NOT EXISTS ct_andresolguin_finalproject.bronze.file_audit_log (
  country         STRING,
  process_date    DATE,
  file_name       STRING,
  file_size_bytes BIGINT,
  row_count       BIGINT,
  source_path     STRING,
  dest_path       STRING,
  moved_at        TIMESTAMP,
  status          STRING,
  note            STRING
)
USING DELTA
PARTITIONED BY (country, process_date)
""")

# 3) Mover Process -> Processed
dbutils.fs.mkdirs(dst_dir)
dbutils.fs.mv(src, dst, True)

# 4) Insertar registro en bitácora
process_date = f"{yyyy}-{mm}-{dd}"
note = "6 rows sent to Rejected (invoice_line_no inválido)"
spark.sql(f"""
INSERT INTO ct_andresolguin_finalproject.bronze.file_audit_log
(country, process_date, file_name, file_size_bytes, row_count, source_path, dest_path, moved_at, status, note)
VALUES (
  '{country}',
  DATE'{process_date}',
  '{file_name}',
  {file_size_bytes},
  {row_count_csv},
  '{src}',
  '{dst}',
  current_timestamp(),
  'moved',
  '{note}'
)
""")

# 5) Verificación rápida
src_exists = any(x.name == file_name for x in dbutils.fs.ls(src_dir)) if dbutils.fs.ls(src_dir) else False
dst_exists = any(x.name == file_name for x in dbutils.fs.ls(dst_dir))

print("src_exists_after_move:", src_exists)
print("dst_exists_after_move:", dst_exists)

display(spark.sql(f"""
SELECT country, process_date, file_name, row_count, status, moved_at, note
FROM ct_andresolguin_finalproject.bronze.file_audit_log
WHERE country='{country}' AND process_date=DATE'{process_date}'
ORDER BY moved_at DESC
LIMIT 1
"""))


src_exists_after_move: False
dst_exists_after_move: True


country,process_date,file_name,row_count,status,moved_at,note
usa,2025-10-18,iowa_dataset_20251018.csv,15299006,moved,2025-10-20T17:42:52.520Z,6 rows sent to Rejected (invoice_line_no inválido)


In [0]:
from pyspark.sql import functions as F

catalog, schema = "ct_andresolguin_finalproject", "bronze"
country, yyyy, mm, dd = "usa", "2025", "10", "18"
base = f"/Volumes/{catalog}/{schema}/flatfiles_managed"
src  = f"{base}/Processed/{country}/yyyy={yyyy}/mm={mm}/dd={dd}/iowa_dataset_{yyyy}{mm}{dd}.csv"
out  = f"{base}/Processed/{country}/yyyy={yyyy}/mm={mm}/dd={dd}/split4"

df = (spark.read.option("header","true").option("inferSchema","false").csv(src))
df = df.withColumn("chunk_id", F.pmod(F.abs(F.hash(F.col("invoice_line_no"))), F.lit(4)))

for cid in range(4):
    (df.filter(F.col("chunk_id") == cid)
       .drop("chunk_id").coalesce(1)
       .write.mode("overwrite").option("header","true").csv(f"{out}/chunk={cid}"))

counts = [ (cid, spark.read.option("header","true").csv(f"{out}/chunk={cid}").count()) for cid in range(4) ]
print("Split OK"); [print(f"chunk={cid}: {c:,}") for cid,c in counts]
print("total:", f"{sum(c for _,c in counts):,}"); print("📁", out)


Split OK
chunk=0: 3,821,540
chunk=1: 3,829,292
chunk=2: 3,824,819
chunk=3: 3,823,355
total: 15,299,006
📁 /Volumes/ct_andresolguin_finalproject/bronze/flatfiles_managed/Processed/usa/yyyy=2025/mm=10/dd=18/split4


In [0]:
# BRZ–Prep: asegurar insumo en Process para el job
catalog, schema = "ct_andresolguin_finalproject", "bronze"
country, yyyy, mm, dd = "usa", "2025", "10", "18"
base = f"/Volumes/{catalog}/{schema}/flatfiles_managed"

process_dir   = f"{base}/Process/{country}/yyyy={yyyy}/mm={mm}/dd={dd}"
processed_src = f"{base}/Processed/{country}/yyyy={yyyy}/mm={mm}/dd={dd}/iowa_dataset_{yyyy}{mm}{dd}.csv"
process_dst   = f"{process_dir}/iowa_dataset_{yyyy}{mm}{dd}.csv"

# Listar Process; si no existe, crear carpeta
try:
    files = [x.name for x in dbutils.fs.ls(process_dir)]
    print("[Process] contenido:", files[:5], "… total:", len(files))
    exists = any(n == f"iowa_dataset_{yyyy}{mm}{dd}.csv" for n in files)
except Exception:
    dbutils.fs.mkdirs(process_dir)
    files = []
    exists = False
    print("[Process] carpeta creada:", process_dir)

# Si falta el archivo, copiar desde Processed
if not exists:
    dbutils.fs.cp(processed_src, process_dst, True)
    print("[ACCIÓN] Copiado desde Processed → Process:", process_dst)
else:
    print("[OK] Archivo ya estaba en Process:", process_dst)


[Process] contenido: [] … total: 0
[ACCIÓN] Copiado desde Processed → Process: /Volumes/ct_andresolguin_finalproject/bronze/flatfiles_managed/Process/usa/yyyy=2025/mm=10/dd=18/iowa_dataset_20251018.csv


In [0]:
%sql
USE CATALOG ct_andresolguin_finalproject;
USE SCHEMA bronze;

-- Ajustá year/month si fuera otro período
SELECT COUNT(*) AS bad_county_number_rows
FROM iowa_raw_str
WHERE year = 2025
  AND month = 10
  AND county_number IS NOT NULL
  AND TRIM(county_number) <> ''
  AND county_number RLIKE '[^0-9]';   -- detecta cualquier carácter no numérico


bad_county_number_rows
29775501


In [0]:
%sql
USE CATALOG ct_andresolguin_finalproject;
USE SCHEMA bronze;

SELECT
  county_number,      -- debería ser código numérico, acá veremos textos (p.ej. 'FLOYD')
  county,             -- acá probablemente quedó el código numérico corrido
  zipcode,
  store_location,
  category, category_name,
  vendor_no, vendor_name,
  itemno, im_desc
FROM iowa_raw_str
WHERE year = 2025
  AND month = 10
  AND county_number RLIKE '[^0-9]'   -- casos con letras u otros no numéricos
LIMIT 30;


county_number,county,zipcode,store_location,category,category_name,vendor_no,vendor_name,itemno,im_desc
SCOTT,1012200,52804,82,SCOTCH WHISKIES,260,DIAGEO AMERICAS,905864,HA GAME OF THRONES STARK - DALWHINNIE WINTERS FROST,6
LINN,1082000,52403,57,IMPORTED CORDIALS & LIQUEURS,461,SKYY SPIRITS INC,64636,CAMPARI ITALIAN APERITIVO,12
LYON,1012100,51240,60,CANADIAN WHISKIES,260,DIAGEO AMERICAS,11290,CROWN ROYAL MINI,10
PALO ALTO,1081600,50536,74,WHISKEY LIQUEUR,421,SAZERAC COMPANY INC,64868,FIREBALL CINNAMON WHISKEY,6
JOHNSON,1062100,52338,52,GOLD RUM,461,SKYY SPIRITS INC,43371,APPLETON SIGNATURE BLEND,12
DUBUQUE,1012200,52001,31,SCOTCH WHISKIES,260,DIAGEO AMERICAS,5346,JOHNNIE WALKER RED,12
POLK,1011400,50021,77,TENNESSEE WHISKIES,85,BROWN FORMAN CORP.,86670,JACK DANIELS TENNESSEE HONEY,12
LINN,1082000,52402,57,IMPORTED CORDIALS & LIQUEURS,461,SKYY SPIRITS INC,64790,CYNAR,12
BLACK HAWK,1032100,50647,7,IMPORTED VODKAS,35,BACARDI USA INC,34421,GREY GOOSE VODKA MINI,10
WEBSTER,1011200,50501,94,STRAIGHT BOURBON WHISKIES,65,JIM BEAM BRANDS,27783,JIM BEAM APPLE,12


In [0]:
%sql
USE CATALOG ct_andresolguin_finalproject;
USE SCHEMA bronze;

SELECT COUNT(*) AS suspect_pack_rows
FROM iowa_raw_str
WHERE year = 2025
  AND month = 10
  AND TRIM(pack) IN ('375','750','1000');


suspect_pack_rows
19443263


In [0]:
%sql
USE CATALOG ct_andresolguin_finalproject;
USE SCHEMA bronze;

SELECT
  pack,
  bottle_volume_ml,
  sale_bottles,
  state_bottle_cost,
  state_bottle_retail,
  county_number,  -- debería ser numérico (acá se ve corrido)
  county,         -- acá suele quedar el código
  zipcode,
  itemno, im_desc
FROM iowa_raw_str
WHERE year = 2025
  AND month = 10
  AND TRIM(pack) IN ('375','750','1000')
LIMIT 30;


pack,bottle_volume_ml,sale_bottles,state_bottle_cost,state_bottle_retail,county_number,county,zipcode,itemno,im_desc
750,19.99,59.98,29.99,2,SCOTT,1012200,52804,HA GAME OF THRONES STARK - DALWHINNIE WINTERS FROST,6
750,15.0,45.0,22.5,2,LINN,1082000,52403,CAMPARI ITALIAN APERITIVO,12
750,10.5,15.75,15.75,1,JOHNSON,1062100,52338,APPLETON SIGNATURE BLEND,12
750,13.5,40.5,20.25,2,DUBUQUE,1012200,52001,JOHNNIE WALKER RED,12
750,15.57,1401.6,23.36,60,POLK,1011400,50021,JACK DANIELS TENNESSEE HONEY,12
1000,13.5,20.25,20.25,1,LINN,1082000,52402,CYNAR,12
750,10.5,189.0,15.75,12,WEBSTER,1011200,50501,JIM BEAM APPLE,12
750,23.62,212.58,35.43,6,WOODBURY,1701100,51105,BASIL HAYDEN DARK RYE,6
750,10.5,189.0,15.75,12,POLK,1081300,50322,WILD TURKEY AMERICAN HONEY,12
750,15.59,280.68,23.39,12,POLK,1012100,50021,CROWN ROYAL VANILLA,12


In [0]:
%sql
USE CATALOG ct_andresolguin_finalproject;
USE SCHEMA bronze;

SELECT COUNT(*) AS decimal_sale_bottles_rows
FROM iowa_raw_str
WHERE year = 2025
  AND month = 10
  -- detecta números con parte decimal (ej: 58.50, 360.00, 21.0)
  AND sale_bottles RLIKE '^[0-9]+\\.[0-9]+$';


decimal_sale_bottles_rows
30608018


In [0]:
%sql
USE CATALOG ct_andresolguin_finalproject;
USE SCHEMA bronze;

SELECT
  sale_bottles,
  pack,
  bottle_volume_ml,
  state_bottle_cost,
  state_bottle_retail,
  itemno,
  im_desc
FROM iowa_raw_str
WHERE year = 2025
  AND month = 10
  AND sale_bottles RLIKE '^[0-9]+\\.[0-9]+$'
LIMIT 30;


sale_bottles,pack,bottle_volume_ml,state_bottle_cost,state_bottle_retail,itemno,im_desc
59.98,750,19.99,29.99,2,HA GAME OF THRONES STARK - DALWHINNIE WINTERS FROST,6
45.0,750,15.0,22.5,2,CAMPARI ITALIAN APERITIVO,12
11.03,300,7.35,11.03,1,CROWN ROYAL MINI,10
69.0,1750,15.33,23.0,3,FIREBALL CINNAMON WHISKEY,6
15.75,750,10.5,15.75,1,APPLETON SIGNATURE BLEND,12
40.5,750,13.5,20.25,2,JOHNNIE WALKER RED,12
1401.6,750,15.57,23.36,60,JACK DANIELS TENNESSEE HONEY,12
20.25,1000,13.5,20.25,1,CYNAR,12
30.0,600,20.0,30.0,1,GREY GOOSE VODKA MINI,10
189.0,750,10.5,15.75,12,JIM BEAM APPLE,12


In [0]:
%sql
USE CATALOG ct_andresolguin_finalproject;
USE SCHEMA silver;

/* 1) VÁLIDOS: quedan listos para GOLD */
CREATE OR REPLACE TABLE iowa_clean
USING DELTA
PARTITIONED BY (year, month)
AS
SELECT
  r.*,
  /* tipificar campos claves para downstream */
  TRY_CAST(r.pack               AS INT)    AS pack_int,
  TRY_CAST(r.bottle_volume_ml   AS DOUBLE) AS bottle_volume_ml_d,
  TRY_CAST(r.state_bottle_cost  AS DOUBLE) AS state_bottle_cost_d,
  TRY_CAST(r.state_bottle_retail AS DOUBLE) AS state_bottle_retail_d,
  TRY_CAST(r.sale_bottles       AS INT)    AS sale_bottles_int
FROM ct_andresolguin_finalproject.bronze.iowa_raw_str r
WHERE r.year = 2025 AND r.month = 10
  /* REGLAS DE CALIDAD (todas deben cumplirse) */
  AND r.county_number RLIKE '^[0-9]+$'
  AND TRIM(r.pack) NOT IN ('375','750','1000')
  AND TRY_CAST(r.pack AS INT) BETWEEN 1 AND 60
  AND r.sale_bottles NOT RLIKE '^[0-9]+\\.[0-9]+$'
  AND TRY_CAST(r.bottle_volume_ml AS DOUBLE) >= 50
  AND TRY_CAST(r.state_bottle_cost AS DOUBLE) > 0
  AND TRY_CAST(r.state_bottle_retail AS DOUBLE) > 0
  AND TRY_CAST(r.state_bottle_retail AS DOUBLE) >= TRY_CAST(r.state_bottle_cost AS DOUBLE)
;

/* 2) RECHAZADOS: quedan auditables con motivo */
CREATE OR REPLACE TABLE iowa_rejected
USING DELTA
PARTITIONED BY (year, month)
AS
SELECT
  r.*,
  CASE
    WHEN r.county_number RLIKE '[^0-9]'                             THEN 'COUNTY_NUMBER_NO_NUMERICO'
    WHEN TRIM(r.pack) IN ('375','750','1000')                       THEN 'PACK_DESPLAZADO'
    WHEN TRY_CAST(r.pack AS INT) IS NULL OR TRY_CAST(r.pack AS INT) <= 0 OR TRY_CAST(r.pack AS INT) > 60
                                                                     THEN 'PACK_SOSPECHOSO'
    WHEN r.sale_bottles RLIKE '^[0-9]+\\.[0-9]+$'                   THEN 'SALE_BOTTLES_DECIMAL'
    WHEN TRY_CAST(r.bottle_volume_ml AS DOUBLE) < 50                 THEN 'VOLUME_IRRISORIO'
    WHEN TRY_CAST(r.state_bottle_cost AS DOUBLE) <= 0                THEN 'COSTO_NO_POSITIVO'
    WHEN TRY_CAST(r.state_bottle_retail AS DOUBLE) <= 0              THEN 'RETAIL_NO_POSITIVO'
    WHEN TRY_CAST(r.state_bottle_retail AS DOUBLE) < TRY_CAST(r.state_bottle_cost AS DOUBLE)
                                                                     THEN 'RETAIL_MENOR_QUE_COSTO'
    ELSE 'OTRA_INCONSISTENCIA'
  END AS issue_code
FROM ct_andresolguin_finalproject.bronze.iowa_raw_str r
WHERE r.year = 2025 AND r.month = 10
  AND (
    r.county_number RLIKE '[^0-9]'
    OR TRIM(r.pack) IN ('375','750','1000')
    OR TRY_CAST(r.pack AS INT) IS NULL OR TRY_CAST(r.pack AS INT) <= 0 OR TRY_CAST(r.pack AS INT) > 60
    OR r.sale_bottles RLIKE '^[0-9]+\\.[0-9]+$'
    OR TRY_CAST(r.bottle_volume_ml AS DOUBLE) < 50
    OR TRY_CAST(r.state_bottle_cost AS DOUBLE) <= 0
    OR TRY_CAST(r.state_bottle_retail AS DOUBLE) <= 0
    OR TRY_CAST(r.state_bottle_retail AS DOUBLE) < TRY_CAST(r.state_bottle_cost AS DOUBLE)
  )
;


num_affected_rows,num_inserted_rows


In [0]:
%sql
USE CATALOG ct_andresolguin_finalproject;
USE SCHEMA silver;

SELECT 'iowa_clean'    AS tabla, COUNT(*) AS filas
FROM iowa_clean
WHERE year = 2025 AND month = 10
UNION ALL
SELECT 'iowa_rejected' AS tabla, COUNT(*) AS filas
FROM iowa_rejected
WHERE year = 2025 AND month = 10;


tabla,filas
iowa_clean,86231992
iowa_rejected,35330608


## Cierre — Resultado de ejecución
- Imprime resumen: cantidad de archivos válidos.  
- Devuelve estado `OK` (o `WARN` si se ejecuta en “dry run”).


In [0]:
import traceback

try:
    moved = []
    # reutiliza csvs creados en la celda de validación
    archive_dir = arch_dir
    dbutils.fs.mkdirs(archive_dir)
    for f in csvs:
        dst = f"{archive_dir}/{f.name}"
        dbutils.fs.mv(f.path, dst, True)
        moved.append(dst)

    print(f"Archivados {len(moved)} archivo(s):")
    for m in moved: print("  -", m)
    dbutils.notebook.exit("OK")
except Exception as e:
    print("WARN: no se pudieron archivar todos los archivos.")
    print(traceback.format_exc())
    dbutils.notebook.exit("WARN")
