In [4]:
# === ETL completo: Región, Familia, Empresa transformadora (CON DISTRIBUCIÓN POR PORCENTAJE) ===
import pandas as pd
import numpy as np
import unicodedata
import re


In [5]:
# --- Utilidades ---
def _norm(s):
    s = "" if pd.isna(s) else str(s).strip()
    s = unicodedata.normalize("NFD", s)
    s = "".join(ch for ch in s if unicodedata.category(ch) != "Mn")
    for ch in "()[]{}.,;:-_/\\\"'":
        s = s.replace(ch, " ")
    return " ".join(s.lower().split())

In [6]:

def clean_text(s):
    if pd.isna(s):
        return ""
    s = str(s).lower()
    s = re.sub(r"\(.*?\)", "", s)
    s = re.sub(r"[^a-z0-9áéíóúñ\s\-]", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

In [7]:
def map_family(txt):
    t = clean_text(txt)
    if not t:
        return ("Papel", "baja") 
    if "pet" in t or "tereftalato" in t:
        return ("PET", "alta")
    if "tetra" in t or "bebidas" in t:
        return ("Cartón para bebidas", "alta")
    if any(w in t for w in ["cartón","carton", "kraft", "kraf", "plegad", "corrugado", "empaque de carton"]):
        return ("Cartón", "alta")
    if any(w in t for w in ["papel", "archivo", "periodico", "pasta", "bond", "impresion"]):
        return ("Papel", "alta")
    if "vidrio" in t:
        return ("Vidrio", "alta")
    if any(w in t for w in ["polietileno alta", "hdpe", "polietileno de alta", "alta densidad"]):
        return ("PE - PP Rígido", "alta")
    if "polipropileno" in t or re.search(r"\bpp\b", t) or "rigido" in t:
        return ("PE - PP Rígido", "alta")
    if any(w in t for w in ["polietileno baja", "ldpe", "polietileno de baja", "baja densidad", "flexible", "pelicula", "bolsa"]):
        return ("PE - PP Flexible", "alta")
    if any(w in t for w in ["aluminio", "cobre", "bronce", "lata", "no ferroso", "no ferr"]):
        return ("Metales No Ferrosos", "alta")
    if any(w in t for w in ["acero", "chatarra", "hierro", "ferroso", "ferr", "metal"]):
        return ("Metales Ferrosos", "media")
    if any(w in t for w in ["pvc", "policloruro", "plástico", "plastico", "plastificado", "acrilico", "poliestireno", "ps", "polimero"]):
        return ("Otros Plásticos", "media")
    if any(w in t for w in ["mader", "madera", "textil", "tela"]):
        return ("Papel", "baja")
    if "metal" in t:
        return ("Metales Ferrosos", "baja")
    if any(w in t for w in ["poliet", "poli"]):
        return ("Otros Plásticos", "baja")
    return ("Papel", "baja")

In [8]:
def _cargar_hoja(xls, opciones):
    opciones_norm = {_norm(o): o for o in opciones}
    for sh in xls.sheet_names:
        if _norm(sh) in opciones_norm:
            return pd.read_excel(xls, sh)
    for o in opciones:
        try:
            return pd.read_excel(xls, o)
        except Exception:
            continue
    return None

In [9]:

def _renombrar_reporte_inicial(df):
    alias = {
        "nombre del gestor": "Gestor",
        "nit": "NIT",
        "fecha de transaccion dd mm aaaa": "Fecha de transacción",
        "fecha de transaccion": "Fecha de transacción",
        "no de factura res dian": "No de factura",
        "no de factura": "No de factura",
        "ciudad o municipio de ubicacion de la eca": "Municipio",
        "municipio": "Municipio",
        "tipo de material entregado": "Tipo de Material",
        "tipo de material": "Tipo de Material",
        "material": "Tipo de Material",
        "cantidad entregada kg": "Cantidad entregada (Kg)",
        "intermediario o empresa transformadora receptora": "Intermediario 1",
        "intermediario o empresa transformadora": "Intermediario 1",
        "intermediario 1": "Intermediario 1",
        "nit de intermediario o transformador": "NIT del Comprador",
        "nit del comprador": "NIT del Comprador",
    }
    new_cols = {}
    for c in df.columns:
        nc = _norm(c)
        if nc in alias:
            new_cols[c] = alias[nc]
    df = df.rename(columns=new_cols).copy()
    requeridas = [
        "Gestor","Fecha de transacción","No de factura","Municipio",
        "Tipo de Material","Cantidad entregada (Kg)","Intermediario 1","NIT del Comprador"
    ]
    faltan = [c for c in requeridas if c not in df.columns]
    if faltan:
        raise ValueError("Faltan columnas requeridas en 'Reporte Inicial': " + ", ".join(faltan) + f". Encabezados leídos: {list(df.columns)}")
    return df

In [10]:

def _renombrar_factor_ee(df):
    alias = {
        "material": "Tipo de Material",
        "tipo de material": "Tipo de Material",
        "factor envases y empaques": "Factor Envases y empaques",
        "factor ee": "Factor Envases y empaques",
        "factor e e": "Factor Envases y empaques",
    }
    new_cols = {}
    for c in df.columns:
        nc = _norm(c)
        if nc in alias:
            new_cols[c] = alias[nc]
    df = df.rename(columns=new_cols).copy()
    if "Tipo de Material" not in df.columns:
        raise ValueError("La hoja de factores no tiene la columna 'Material' o 'Tipo de Material'.")
    if "Factor Envases y empaques" not in df.columns:
        df["Factor Envases y empaques"] = np.nan
    return df[["Tipo de Material","Factor Envases y empaques"]].drop_duplicates()

In [11]:

NORMALIZACION_MUNS = {
    '5001': 'MEDELLÍN',
    '5088': 'BELLO', 
    '5615': 'RIONEGRO', 
    'CALI': 'SANTIAGO DE CALI', 
    'CÚCUTA': 'SAN JOSÉ DE CÚCUTA', 
    'Cúcuta': 'SAN JOSÉ DE CÚCUTA',
    'TOLU': 'SANTIAGO DE TOLÚ',
    "MEDELLIN": "MEDELLÍN",
    "ITAGUI": "ITAGÜÍ",
    "CALARCA": "CALARCÁ",
    "CARMEN DE APICALA": "CARMEN DE APICALÁ",
    "FACATATIVA": "FACATATIVÁ",
    "CUCUTA": "SAN JOSÉ DE CÚCUTA",
    "SOLEDAD ": "SOLEDAD",
    "BOGOTA": "BOGOTÁ, D.C.",
    "BOGOTA, D.C.": "BOGOTÁ, D.C.",
    "IBAGUE": "IBAGUÉ",
    "SANTIAGO DE TOLU": "SANTIAGO DE TOLÚ",
    "DOS QUEBRADAS": "DOSQUEBRADAS",
    "VILLAO": "VILLAVICENCIO",
    "PATIOS": "LOS PATIOS"
}


In [12]:
def _normalizar_municipio_valor(v):
    if pd.isna(v):
        return v
    v2 = str(v).strip().upper()
    if v2 in NORMALIZACION_MUNS:
        return NORMALIZACION_MUNS[v2]
    return v2.title() if v2.isalpha() else v2

In [13]:
def _agregar_clave_municipio(df, col_mun="Municipio"):
    df[col_mun] = df[col_mun].apply(_normalizar_municipio_valor)
    df["mun_key"] = df[col_mun].apply(_norm)
    return df

In [14]:
def _cargar_mapa_regiones(regiones_path):
    reg = pd.read_excel(regiones_path)
    ren = {}
    for c in reg.columns:
        nc = _norm(c)
        if nc in ("municipio", "municipio normalizado", "ciudad o municipio", "municipios"):
            ren[c] = "Municipio"
        elif nc in ("region", "regiones", "region pdet"):
            ren[c] = "Región"
    reg = reg.rename(columns=ren)
    if "Municipio" not in reg.columns or "Región" not in reg.columns:
        raise ValueError(f"El archivo de regiones debe tener columnas 'Municipio' y 'Región'. Columnas encontradas: {list(reg.columns)}")
    reg = _agregar_clave_municipio(reg, "Municipio")
    reg = reg.sort_values(["mun_key", "Región"], na_position="last").drop_duplicates(subset=["mun_key"], keep="first")
    return reg[["mun_key", "Región"]]

In [15]:
# --- ETL principal ---
xls_path = "Copia de Analista de datos_prueba excel.xlsx"
regiones_path = "regiones.xlsx"

xls = pd.ExcelFile(xls_path)
rep_ini = _cargar_hoja(xls, ["Reporte Inicial","Reporte inicial","reporte inicial"])
if rep_ini is None:
    raise ValueError(f"No se encontró la hoja 'Reporte Inicial'. Hojas disponibles: {xls.sheet_names}")
iniciativas = _cargar_hoja(xls, ["Iniciativas","iniciativas"])
factor_ee = _cargar_hoja(xls, ["Factor Envases y empaques","Factor envases y empaques","FactorEE"])
porcentajes_empresas = _cargar_hoja(xls, ["Porcentajes empresas","Porcentajes Empresas","porcentajes empresas"])
precert = _cargar_hoja(xls, ["Condiciones precertificación","Condiciones precertificacion","Precertificacion"])

In [16]:

# Reporte base
df = _renombrar_reporte_inicial(rep_ini)
df = _agregar_clave_municipio(df, "Municipio")
reg_map = _cargar_mapa_regiones(regiones_path)
df = df.merge(reg_map, on="mun_key", how="left")

In [17]:
# Fechas y métricas base
df["Cantidad entregada (Kg)"] = pd.to_numeric(df["Cantidad entregada (Kg)"], errors="coerce")
df["Fecha de transacción"] = pd.to_datetime(df["Fecha de transacción"], dayfirst=True, errors="coerce")
df["Año"] = df["Fecha de transacción"].dt.year
df["Periodo"] = df["Fecha de transacción"].dt.to_period("M").astype(str)
df["Fecha de transacción"] = df["Fecha de transacción"].dt.strftime("%Y/%m/%d")

In [18]:
# Iniciativa por Gestor
if iniciativas is not None:
    iniciativas = iniciativas.rename(columns={"Gestor":"Gestor","Iniciativa":"Iniciativa"})
    df = df.merge(iniciativas[["Gestor","Iniciativa"]].drop_duplicates(), on="Gestor", how="left")

In [19]:
# Familia desde heurística
fam = df["Tipo de Material"].apply(lambda x: pd.Series(map_family(x), index=["Familia","confianza_familia"]))
df = pd.concat([df, fam], axis=1)

In [20]:
# Factor E&E
if factor_ee is not None:
    fe = _renombrar_factor_ee(factor_ee)
    df = df.merge(fe, on="Tipo de Material", how="left")
else:
    df["Factor Envases y empaques"] = np.nan

In [21]:
# IMPORTANTE: Guardar cantidades originales ANTES de la distribución por empresa
df["Cantidad_Original_Kg"] = df["Cantidad entregada (Kg)"]
df["Cantidad entregada (Ton)"] = df["Cantidad entregada (Kg)"] / 1000.0
df["Cantidad de E&E (Ton)"] = df["Cantidad entregada (Ton)"] * df["Factor Envases y empaques"]

In [22]:
# Certificación
if precert is not None:
    pr = precert.rename(columns={
        "Intermediario /Transformador":"Intermediario 1",
        "Intermediario/Transformador":"Intermediario 1",
        "Material":"Tipo de Material"
    })
    pr = pr[["Intermediario 1","Tipo de Material"]].drop_duplicates()
    df = df.merge(pr, on=["Intermediario 1","Tipo de Material"], how="left", indicator="i_prec")
    df["Certificado"] = df["i_prec"].eq("both")
    df = df.drop(columns=["i_prec"])
else:
    df["Certificado"] = False

In [23]:
# *** CAMBIO CLAVE: Empresa transformadora con DISTRIBUCIÓN por porcentaje ***
if porcentajes_empresas is not None:
    pe = porcentajes_empresas.copy()
    ren = {}
    for c in pe.columns:
        nc = _norm(c)
        if nc in ("iniciativa",):
            ren[c] = "Iniciativa"
        elif nc in ("familia de material", "familia", "familia material"):
            ren[c] = "Familia"
        elif nc in ("empresa", "empresa transformadora"):
            ren[c] = "Empresa transformadora"
        elif nc in ("porcentaje de asignacion", "porcentaje", "porcentaje asignacion"):
            ren[c] = "Porcentaje de asignación"
    pe = pe.rename(columns=ren)
    if "Porcentaje de asignación" in pe.columns:
        pe["Porcentaje de asignación"] = pd.to_numeric(pe["Porcentaje de asignación"], errors="coerce")
    else:
        pe["Porcentaje de asignación"] = 1.0
    
    # Hacer merge que EXPANDE las filas: cada factura se replica por cada empresa
    df = df.merge(pe[["Iniciativa","Familia","Empresa transformadora","Porcentaje de asignación"]], 
                  on=["Iniciativa","Familia"], how="left")
    
    # Distribuir cantidades proporcionalmente según el porcentaje de cada empresa
    df["Cantidad entregada (Kg)"] = df["Cantidad_Original_Kg"] * df["Porcentaje de asignación"]
    df["Cantidad entregada (Ton)"] = df["Cantidad entregada (Kg)"] / 1000.0
    df["Cantidad de E&E (Ton)"] = df["Cantidad entregada (Ton)"] * df["Factor Envases y empaques"]
else:
    df["Empresa transformadora"] = np.nan
    df["Porcentaje de asignación"] = np.nan

In [24]:
# Columnas finales
cols = [
    "Año","Periodo","Región","Municipio","Iniciativa","Fecha de transacción","No de factura",
    "Tipo de Material","Familia","Cantidad entregada (Kg)","Cantidad entregada (Ton)","Cantidad de E&E (Ton)",
    "Intermediario 1","Empresa transformadora","Certificado","Gestor","NIT del Comprador"
]
for c in cols:
    if c not in df.columns:
        df[c] = np.nan
        
reporte_solicitado = df[cols].sort_values(["Año","Periodo","Iniciativa","Municipio","No de factura"])

In [25]:
# Agregados
agg_tipo = reporte_solicitado.groupby(["Año","Periodo","Iniciativa","Tipo de Material"], as_index=False).agg({
    "Cantidad entregada (Kg)":"sum",
    "Cantidad entregada (Ton)":"sum",
    "Cantidad de E&E (Ton)":"sum"
})
agg_familia = reporte_solicitado.groupby(["Año","Periodo","Iniciativa","Familia"], as_index=False).agg({
    "Cantidad entregada (Kg)":"sum",
    "Cantidad entregada (Ton)":"sum",
    "Cantidad de E&E (Ton)":"sum"
})

In [26]:
# Guardar
reporte_solicitado.to_csv("reporte_solicitado.csv", index=False, encoding="utf-8-sig")
agg_tipo.to_csv("agg_por_tipo.csv", index=False, encoding="utf-8-sig")
agg_familia.to_csv("agg_por_familia.csv", index=False, encoding="utf-8-sig")