<h3>✅ Este notebook NO crea Silver.<br>
Solo selecciona columnas con reglas y métricas, y genera el CSV de selección para producción.</h3>

In [None]:
from pathlib import Path
import pandas as pd
import numpy as np
import duckdb

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# === PATHS (adjust if they move) ===
# NOTE(review): both paths are absolute and user-specific; consider deriving
# them from a configurable project root so the notebook runs on other machines.
# DuckDB database file that the loading cell opens to read raw.canales.
DUCKDB_PATH = r"C:\Users\serrios\01_prueba_analitico_4\op_cobro\database\analytics.duckdb"
# Destination of the column-selection CSV (not written by the cells shown here).
OUT_PATH = Path(
    r"C:\Users\serrios\01_prueba_analitico_4\op_cobro\data\process_files\selected_canales_columns.csv"
)

# Identifier columns; they are excluded from the feature-candidate list.
KEYS = ["num_doc", "obl17", "f_analisis"]

# === DECISIONS (owner's choice) ===
# Window suffixes (last "_" token of a column name) that are kept.
KEEP_WINDOWS = {"ult6", "ult12"}
# Windowed-statistic prefixes that are kept.
KEEP_PREFIXES = ("sum_trx_", "avg_trx_")
# When True, any column with a "smmlv" token in its name is rejected.
DROP_SMMLV = True

# Desired size of the final selection — presumably consumed by later cells.
TARGET_MIN, TARGET_MAX = 120, 150
TARGET_N = 140

# === AUTOMATIC (technical) FILTERS ===
# Presumably applied by the scoring cells further down — confirm there.
MIN_NONZERO_RATE = 0.01  # 1%
MIN_VARIANCE = 0.0

In [None]:
# Build (or reuse) a local Spark session named "channel_select" on all local cores.
_builder = SparkSession.builder.appName("channel_select").master("local[*]")

# Session settings, applied in this order.
for _conf_key, _conf_value in (
    ("spark.driver.memory", "4g"),
    ("spark.sql.shuffle.partitions", "8"),
    # in-memory catalog: avoids metastore problems
    ("spark.sql.catalogImplementation", "in-memory"),
):
    _builder = _builder.config(_conf_key, _conf_value)

spark = _builder.getOrCreate()

# Keep Spark's console logging at WARN and above.
spark.sparkContext.setLogLevel("WARN")
print("✅ Spark listo")

In [None]:
# Load a bounded sample of raw.canales from DuckDB into pandas, then hand it to Spark.

# Row cap so scoring stays fast inside the notebook; raise it (e.g. to 50k)
# for more stable metrics.
# NOTE(review): LIMIT without ORDER BY returns whichever rows the engine
# produces first — it is not a random or guaranteed-reproducible sample.
LIMIT_ROWS = 20000

con = duckdb.connect(DUCKDB_PATH, read_only=True)
try:
    # Column inventory of the source table. Only its length is reported here;
    # the query below still selects every column.
    cols = con.execute("DESCRIBE raw.canales").df()["column_name"].tolist()
    print("✅ Columnas en raw.canales:", len(cols))

    query = f"SELECT * FROM raw.canales LIMIT {LIMIT_ROWS}"
    pdf = con.execute(query).df()
finally:
    # Always release the connection, even when one of the queries above fails.
    con.close()

print("✅ Muestra cargada en pandas:", pdf.shape)

# Convert the pandas sample into a Spark DataFrame; df.count() runs a Spark job.
df = spark.createDataFrame(pdf)
print("✅ DataFrame Spark creado. Rows:", df.count(), "Cols:", len(df.columns))

In [None]:
# Every column of the Spark frame except the identifier keys.
all_cols = [name for name in df.columns if name not in KEYS]

def is_candidate(c: str) -> bool:
    """Decide whether column name ``c`` passes the rule-based pre-selection.

    A column is rejected when DROP_SMMLV is set and one of its
    underscore-separated tokens is exactly "smmlv". Otherwise it is accepted
    if it is one of the two base totals, a per-channel base total
    ("trx_cnt_can_*" / "trx_mnt_can_*"), or a windowed statistic whose name
    starts with one of KEEP_PREFIXES and whose last token is in KEEP_WINDOWS.

    Args:
        c: Column name to classify.

    Returns:
        True if the column is a candidate, False otherwise.
    """
    parts = c.split("_")

    # The SMMLV exclusion takes precedence over every acceptance rule.
    if DROP_SMMLV and "smmlv" in parts:
        return False

    return (
        # base totals
        c in ("trx_cnt_total", "trx_mnt_total")
        # per-channel base totals
        or c.startswith(("trx_cnt_can_", "trx_mnt_can_"))
        # windowed statistics restricted to the kept windows
        or (c.startswith(KEEP_PREFIXES) and parts[-1] in KEEP_WINDOWS)
    )

# Keep only the columns accepted by the naming rules, preserving their order.
candidates = list(filter(is_candidate, all_cols))
print("✅ Candidatas por reglas:", len(candidates))
# Preview the first 25 candidate names as the cell output.
candidates[:25]

In [4]:
# Redundant cleanup: the data-loading cell already calls con.close() after
# reading the sample.
# NOTE(review): this cell raises NameError if it runs before `con` has been
# created; consider deleting it so the notebook does not depend on run order.
con.close()