
<div style="background-color:#CCCCCC; padding:12px; border-radius:8px;">
<h1 style="color:#003366; text-align:center; margin:8px 0;">Revisión y limpieza completa de 3 DataFrames</h1>
<p style="text-align:center; color:#003366; margin:0;"><em>Notebook docente en castellano — funciones en snake_case — flujo: leer → revisar → limpiar → salvar</em></p>
</div>



<div style="background-color:#CCCCCC; padding:8px; border-radius:6px;">
<h2 style="color:black; text-align:center;">Resumen y objetivos</h2>
<ul style="color:black;">
<li>Leer los 3 CSV: <code>marketing.csv</code>, <code>ventas.csv</code>, <code>clientes.csv</code> (busca en <code>./data_in/</code> y en <code>/mnt/data/</code>).</li>
<li>Aplicar limpieza automática y parametrizable: quitar acentos, normalizar mayúsculas/minúsculas, convertir numéricos, parsear fechas, eliminar duplicados, marcar y guardar reportes antes/después.</li>
<li>Guardar los datasets limpios en <code>./reportes/limpios/</code> y empaquetar reportes.</li>
</ul>
<p style="color:black;">Pega y ejecuta las celdas en orden. Las funciones están documentadas y listas para usar en clases o trabajos prácticos.</p>
</div>


In [None]:
# Imports y configuración de rutas
import os
from pathlib import Path
import json
import unicodedata
import zipfile
from datetime import datetime

import pandas as pd
import numpy as np

# Rutas: prioriza ./data_in, luego /mnt/data
carpeta_entrada_local = Path('./data_in')
carpeta_entrada_mnt = Path('/mnt/data')
carpeta_reportes = Path('./reportes')
carpeta_limpios = carpeta_reportes / 'limpios'
carpeta_reportes.mkdir(parents=True, exist_ok=True)
carpeta_limpios.mkdir(parents=True, exist_ok=True)

# Nombres esperados
archivo_marketing = 'marketing.csv'
archivo_ventas = 'ventas.csv'
archivo_clientes = 'clientes.csv'

def ruta_entrada(nombre_archivo):
    p_local = carpeta_entrada_local / nombre_archivo
    if p_local.exists():
        return p_local
    p_mnt = carpeta_entrada_mnt / nombre_archivo
    if p_mnt.exists():
        return p_mnt
    return None

print('Rutas configuradas. Comprueba ./data_in/ y /mnt/data/ para los CSV.')

In [None]:
# ---------- Utilidades básicas ----------
def sacar_acentos(texto):
    """Elimina acentos (tildes) y diacríticos de un texto. Mantiene NaN intactos."""
    if pd.isna(texto):
        return texto
    texto = str(texto)
    nk = unicodedata.normalize('NFKD', texto)
    return ''.join([c for c in nk if not unicodedata.combining(c)])

TOKENS_VALOR_FALTANTE = {
    '', 'na', 'n/a', 'null', 'none', 'sin dato', 's/d', 'nd', '-', '--', '?', 'sin_dato', 'n/d', 'n.d.'
}

def es_valor_faltante(valor):
    """Devuelve True si el valor representa un faltante (NaN) según tokens o es NaN."""
    if pd.isna(valor):
        return True
    s = str(valor).strip().lower()
    s = sacar_acentos(s)
    return s in TOKENS_VALOR_FALTANTE

def guardar_csv(df, ruta):
    """Guarda un DataFrame en ruta (Path o str). Crea directorio padre si hace falta."""
    ruta = Path(ruta)
    ruta.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(ruta, index=False, encoding='utf-8')
    return ruta

def mostrar_resumen_pequeno(df, n=5):
    """Muestra un pequeño resumen para inspección rápida."""
    print('Dimensiones:', df.shape)
    display(df.head(n))


In [None]:
# ---------- Detección de outliers ----------
def mascara_valores_atipicos_rango_intercuartil(serie_datos):
    """Devuelve máscara booleana (True = outlier) según IQR."""
    serie_limpia = serie_datos.dropna().astype(float)
    if serie_limpia.shape[0] < 4:
        return pd.Series([False] * len(serie_datos), index=serie_datos.index)
    q1 = serie_limpia.quantile(0.25)
    q3 = serie_limpia.quantile(0.75)
    iqr = q3 - q1
    limite_inferior = q1 - 1.5 * iqr
    limite_superior = q3 + 1.5 * iqr
    return (serie_datos < limite_inferior) | (serie_datos > limite_superior)

def mascara_valores_atipicos_zscore(serie_datos, umbral=3.0):
    """Devuelve máscara booleana (True = outlier) según Z-score."""
    serie_limpia = serie_datos.dropna().astype(float)
    if serie_limpia.shape[0] < 4 or serie_limpia.std() == 0:
        return pd.Series([False] * len(serie_datos), index=serie_datos.index)
    puntaje_z = (serie_datos - serie_limpia.mean()) / serie_limpia.std()
    return puntaje_z.abs() > umbral

print('Funciones de outliers cargadas.')

In [None]:
# ---------- Detección de problemas en DataFrame ----------
def detectar_problemas_en_dataframe(df: pd.DataFrame):
    """Detecta nulos, duplicados, problemas textuales, outliers y parseabilidad de fechas."""
    resumen = {}
    resumen['filas'] = df.shape[0]
    resumen['columnas'] = df.shape[1]
    resumen['nulos_por_columna'] = df.isna().sum().to_dict()
    dup_mask = df.duplicated(keep=False)
    resumen['duplicados_exactos'] = int(dup_mask.sum())

    chequeos_por_columna = {}
    for col in df.columns:
        ser = df[col]
        info = {'dtype': str(ser.dtype), 'nulos': int(ser.isna().sum())}
        if ser.dtype == object or pd.api.types.is_string_dtype(ser):
            s = ser.astype(str)
            info['espacios_inicio'] = int(s.str.match(r'^\s+').sum())
            info['espacios_final'] = int(s.str.match(r'\s+$').sum())
            try:
                unique_original = set(s.dropna().unique())
                unique_lower = set(s.dropna().str.lower().unique())
                info['unique_original'] = len(unique_original)
                info['unique_lower'] = len(unique_lower)
                info['variantes_mayusculas'] = len(unique_lower) < len(unique_original)
            except Exception:
                info['unique_original'] = ser.nunique(dropna=True)
                info['unique_lower'] = None
                info['variantes_mayusculas'] = None
            try:
                unaccented = s.dropna().map(lambda x: sacar_acentos(x).lower())
                groups = unaccented.groupby(unaccented).size()
                conflicts = groups[groups > 1]
                info['grupos_variantes_acentos'] = int(conflicts.shape[0])
                ejemplos = {}
                if not conflicts.empty:
                    for val in conflicts.index[:5]:
                        originales = sorted(list(s[unaccented == val].unique())[:10])
                        ejemplos[val] = originales
                info['ejemplos_variantes_acentos'] = ejemplos
            except Exception:
                info['grupos_variantes_acentos'] = None
                info['ejemplos_variantes_acentos'] = {}
            info['tokens_aparente_faltante'] = int(s.map(lambda x: str(x).strip().lower()).map(lambda v: sacar_acentos(v) in TOKENS_VALOR_FALTANTE).sum())
            info['muestras'] = list(s.dropna().unique()[:10])
        else:
            if pd.api.types.is_numeric_dtype(ser):
                s_f = ser.dropna().astype(float)
                info['media'] = float(s_f.mean()) if not s_f.empty else None
                info['std'] = float(s_f.std()) if not s_f.empty else None
                info['min'] = float(s_f.min()) if not s_f.empty else None
                info['max'] = float(s_f.max()) if not s_f.empty else None
                if len(s_f) >= 4:
                    q1 = s_f.quantile(0.25)
                    q3 = s_f.quantile(0.75)
                    iqr = q3 - q1
                    lb = q1 - 1.5 * iqr
                    ub = q3 + 1.5 * iqr
                    out_iqr = (s_f < lb) | (s_f > ub)
                    info['outliers_iqr'] = int(out_iqr.sum())
                    info['limites_iqr'] = (float(lb), float(ub))
                else:
                    info['outliers_iqr'] = None
                    info['limites_iqr'] = None
                if len(s_f) >= 4 and s_f.std() != 0:
                    z = (s_f - s_f.mean()) / s_f.std()
                    info['outliers_z'] = int((z.abs() > 3).sum())
                else:
                    info['outliers_z'] = None
            else:
                parsed = pd.to_datetime(ser, errors='coerce', dayfirst=True)
                info['fechas_parseables'] = int(parsed.notna().sum())
                info['muestras'] = list(ser.dropna().unique()[:10])
        chequeos_por_columna[col] = info

    filas_problemas = []
    for idx, fila in df.iterrows():
        lista_problemas = []
        if dup_mask.loc[idx]:
            lista_problemas.append('duplicado_exacto')
        for col in df.columns:
            val = fila[col]
            # heurísticas textuales
            if pd.api.types.is_string_dtype(type(val)) or isinstance(val, str) or (not pd.isna(val) and not pd.api.types.is_numeric_dtype(type(val)) and str(chequeos_por_columna[col].get('dtype','')).startswith('object')):
                s = str(val)
                if s != s.strip():
                    lista_problemas.append(f'espacios_en_columna_{col}')
                if chequeos_por_columna[col].get('variantes_mayusculas'):
                    if s and s != s.lower() and s.lower() in [str(x).lower() for x in df[col].dropna().unique()]:
                        lista_problemas.append(f'inconsistencia_mayusculas_columna_{col}')
                if chequeos_por_columna[col].get('grupos_variantes_acentos') and chequeos_por_columna[col]['grupos_variantes_acentos'] > 0:
                    try:
                        un = sacar_acentos(s).lower()
                        group_vals = [x for x in chequeos_por_columna[col].get('muestras', []) if sacar_acentos(str(x)).lower() == un]
                        if group_vals and any(sacar_acentos(str(x)).lower() != sacar_acentos(s).lower() for x in group_vals):
                            lista_problemas.append(f'variantes_acentos_columna_{col}')
                    except Exception:
                        pass
                if es_valor_faltante(s):
                    lista_problemas.append(f'token_faltante_columna_{col}')
            else:
                # heurísticas numéricas
                try:
                    fval = float(val)
                    info_col = chequeos_por_columna[col]
                    limites = info_col.get('limites_iqr')
                    if limites and (fval < limites[0] or fval > limites[1]):
                        lista_problemas.append(f'outlier_iqr_columna_{col}')
                    if info_col.get('std') not in (None, 0):
                        mean = info_col.get('media')
                        std = info_col.get('std')
                        if std and abs((fval - mean) / std) > 3:
                            lista_problemas.append(f'outlier_z_columna_{col}')
                except Exception:
                    pass
        if lista_problemas:
            filas_problemas.append({'row_index': idx, 'problemas': ';'.join(sorted(set(lista_problemas))), 'muestra': json.dumps({str(c): str(fila[c]) for c in df.columns[:8]})})
    df_problemas = pd.DataFrame(filas_problemas)
    return resumen, chequeos_por_columna, df_problemas

print('Función detectar_problemas_en_dataframe cargada.')

In [None]:
# ---------- Aplicar reglas por columna y limpieza principal ----------
def aplicar_regla_columna(serie, regla):
    """Aplica regla a la serie. Mantiene firma esperada."""
    tipo, opts = regla if isinstance(regla, tuple) else (regla, {})
    opts = opts or {}
    s = serie.copy()
    if tipo == 'strip':
        s = s.map(lambda x: str(x).strip() if not pd.isna(x) else x)
    elif tipo == 'lower':
        s = s.map(lambda x: str(x).strip().lower() if not pd.isna(x) else x)
        if opts.get('normalizar_acentos'):
            s = s.map(lambda x: sacar_acentos(x) if not pd.isna(x) else x)
    elif tipo == 'upper':
        s = s.map(lambda x: str(x).strip().upper() if not pd.isna(x) else x)
        if opts.get('normalizar_acentos'):
            s = s.map(lambda x: sacar_acentos(x) if not pd.isna(x) else x)
    elif tipo == 'title':
        s = s.map(lambda x: str(x).strip().title() if not pd.isna(x) else x)
        if opts.get('normalizar_acentos'):
            s = s.map(lambda x: sacar_acentos(x) if not pd.isna(x) else x)
    elif tipo == 'quitar_acentos':
        s = s.map(lambda x: sacar_acentos(x).strip() if not pd.isna(x) else x)
    elif tipo == 'numeric':
        def to_num(v):
            if pd.isna(v):
                return np.nan
            t = str(v).strip()
            if opts.get('remove_non_digits', False):
                t = ''.join([c for c in t if c.isdigit() or c in '.-'])
            if opts.get('remove_thousands', False):
                sep = opts.get('thousands_separator', ',')
                if sep == ',':
                    t = t.replace('.', '').replace(',', '')
                else:
                    t = t.replace(',', '').replace('.', '')
            try:
                val = pd.to_numeric(t, errors='coerce')
                if opts.get('as_int', False):
                    if pd.isna(val):
                        return pd.NA
                    try:
                        return int(val)
                    except Exception:
                        return pd.NA
                return float(val) if not pd.isna(val) else np.nan
            except Exception:
                return np.nan
        s = s.map(to_num)
    elif tipo == 'date':
        formatos = opts.get('formats', [])
        dayfirst = opts.get('dayfirst', True)
        def to_date(v):
            if pd.isna(v):
                return pd.NaT
            t = str(v).strip()
            for fmt in formatos:
                try:
                    return pd.to_datetime(datetime.strptime(t, fmt))
                except Exception:
                    continue
            try:
                return pd.to_datetime(t, dayfirst=dayfirst, errors='coerce')
            except Exception:
                return pd.NaT
        s = s.map(to_date)
    else:
        if s.dtype == object or pd.api.types.is_string_dtype(s):
            s = s.map(lambda x: np.nan if es_valor_faltante(x) else (str(x).strip() if not pd.isna(x) else x))
    return s

def limpiar_dataframe(df, reglas_por_columna=None):
    """Limpieza principal: aplica reglas, trim por defecto, elimina duplicados y postprocesa tipos."""
    reglas_por_columna = reglas_por_columna or {}
    df2 = df.copy()
    df2.columns = [str(c).strip() for c in df2.columns]
    for col in df2.columns:
        if col in reglas_por_columna:
            df2[col] = aplicar_regla_columna(df2[col], reglas_por_columna[col])
        else:
            if df2[col].dtype == object or pd.api.types.is_string_dtype(df2[col]):
                df2[col] = df2[col].map(lambda x: np.nan if es_valor_faltante(x) else (str(x).strip() if not pd.isna(x) else x))
    df2 = df2.drop_duplicates(keep='first').reset_index(drop=True)
    # post-conversion simple
    for col, regla in (reglas_por_columna or {}).items():
        tipo = regla if not isinstance(regla, tuple) else regla[0]
        opts = {} if not isinstance(regla, tuple) else regla[1] or {}
        if tipo == 'numeric':
            df2[col] = pd.to_numeric(df2[col], errors='coerce')
            if opts.get('as_int', False):
                try:
                    df2[col] = df2[col].astype('Int64')
                except Exception:
                    pass
        elif tipo == 'date':
            formatos = opts.get('formats', [])
            dayfirst = opts.get('dayfirst', True)
            parsed = pd.Series(pd.NaT, index=df2.index)
            for fmt in formatos:
                try:
                    mask = parsed.isna()
                    parsed.loc[mask] = pd.to_datetime(df2.loc[mask, col].astype(str), format=fmt, errors='coerce')
                except Exception:
                    pass
            still_na = parsed.isna()
            if still_na.any():
                parsed.loc[still_na] = pd.to_datetime(df2.loc[still_na, col].astype(str), dayfirst=dayfirst, errors='coerce')
            if opts.get('format_output') == 'YYYY/MM/DD':
                df2[col] = parsed.dt.strftime('%Y/%m/%d')
            else:
                df2[col] = parsed
    return df2

print('Funciones de limpieza cargadas.')

In [None]:
# ---------- Bucle/menu principal para procesar diccionario de DataFrames ----------
def menu_procesar_diccionario(dic_frames, reglas_por_archivo):
    """Recorre dic_frames (nombre_archivo -> DataFrame), detecta problemas antes/después, limpia y guarda."""
    for nombre_archivo, df_actual in dic_frames.items():
        resumen_antes, chequeos_antes, problemas_antes = detectar_problemas_en_dataframe(df_actual)
        print(f"\n=== RESUMEN ANTES: {nombre_archivo} ===\n")
        print(resumen_antes)
        print('\nMuestras problemas antes:') 
        display(problemas_antes.head(5) if not problemas_antes.empty else 'No se detectaron filas con problemas.')

        reglas = reglas_por_archivo.get(nombre_archivo, {})
        df_limpio = limpiar_dataframe(df_actual, reglas_por_columna=reglas)

        resumen_despues, chequeos_despues, problemas_despues = detectar_problemas_en_dataframe(df_limpio)
        print(f"\n=== RESUMEN DESPUÉS: {nombre_archivo} ===\n")
        print(resumen_despues)
        print('\nMuestras problemas después:')
        display(problemas_despues.head(5) if not problemas_despues.empty else 'No se detectaron filas con problemas tras la limpieza.')

        print('\n' + '-'*100)
        print(f'nombre_archivo = {nombre_archivo}')
        print(f'type(nombre_archivo) = {type(nombre_archivo)}')
        print('-'*100 + '\n')

        ruta_nombre_limpio = nombre_archivo[:-4] + ' limpio.csv' if nombre_archivo.lower().endswith('.csv') else nombre_archivo + ' limpio.csv'
        ruta_guardado = carpeta_limpios / ruta_nombre_limpio
        guardar_csv(df_limpio, ruta_guardado)
        print(f'Guardado cleaned en: {ruta_guardado}')

        # sobrescribir variables globales por conveniencia
        if 'marketing' in nombre_archivo.lower():
            globals()['df_marketing'] = df_limpio
        elif 'ventas' in nombre_archivo.lower():
            globals()['df_ventas'] = df_limpio
        elif 'clientes' in nombre_archivo.lower():
            globals()['df_clientes'] = df_limpio

    print('\nProceso completo del diccionario de DataFrames.')

In [None]:
# ---------- Funciones adicionales útiles (reportes, empaquetado) ----------
def generar_resumen_global(reportes_creados, zip_salida=None):
    """Empaqueta los reportes listados en reportes_creados en un ZIP (opcional ruta)."""
    zip_salida = zip_salida or (carpeta_reportes / 'reports_dataset.zip')
    with zipfile.ZipFile(zip_salida, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
        for p in reportes_creados:
            p = Path(p)
            if p.exists():
                zf.write(p, arcname=os.path.join('reports', p.name))
        for p in carpeta_limpios.glob('cleaned_*.csv'):
            zf.write(p, arcname=os.path.join('limpios', p.name))
    return zip_salida

def crear_diccionario_frames():
    """Lee los 3 CSV esperados y devuelve un diccionario nombre->DataFrame (solo los que existen)."""
    dic = {}
    for nombre in [archivo_marketing, archivo_ventas, archivo_clientes]:
        r = ruta_entrada(nombre)
        if r is not None:
            try:
                df = pd.read_csv(r, dtype=str, keep_default_na=False, na_values=[''])
            except Exception:
                df = pd.read_csv(r, encoding='latin1', dtype=str, keep_default_na=False, na_values=[''])
            dic[nombre] = df
        else:
            print(f'ATENCION: no se encontró {nombre} en ./data_in/ ni en /mnt/data/.')
    return dic

print('Funciones extras cargadas.')

In [None]:
# ---------- Ejemplo de uso (ejecuta esta celda para procesar los 3 CSV) ----------
reglas_ejemplo_marketing = {
    'nombre': ('title', {'normalizar_acentos': True}),
    'email': ('lower', {}),
    'fecha_registro': ('date', {'dayfirst': True, 'formats': ['%d/%m/%Y', '%Y-%m-%d'], 'format_output': 'YYYY/MM/DD'})
}
reglas_ejemplo_ventas = {
    'monto': ('numeric', {'remove_thousands': True, 'as_int': False}),
    'fecha_venta': ('date', {'dayfirst': True, 'formats': ['%d/%m/%Y', '%Y-%m-%d'], 'format_output': 'YYYY/MM/DD'})
}
reglas_ejemplo_clientes = {
    'apellido': ('title', {'normalizar_acentos': True}),
    'dni': ('numeric', {'remove_non_digits': True, 'as_int': True})
}
reglas_por_archivo = {
    archivo_marketing: reglas_ejemplo_marketing,
    archivo_ventas: reglas_ejemplo_ventas,
    archivo_clientes: reglas_ejemplo_clientes
}

dic_frames = crear_diccionario_frames()
print('\nDataFrames leídos: ', list(dic_frames.keys()))

menu_procesar_diccionario(dic_frames, reglas_por_archivo)

zip_generado = generar_resumen_global([])
print('ZIP generado (si corresponde):', zip_generado)


<div style="background-color:#CCCCCC; padding:10px; border-radius:6px;">
<h2 style="color:black; text-align:center;">Notas finales</h2>
<p style="color:black;">Si alguna columna no quedó con el tipo deseado, ajustá <code>reglas_por_archivo</code> y volvé a ejecutar la celda de ejemplo.</p>
</div>
