# Pipeline E2E de Validación — Cleaning → Aggregation → Preprocessing (Grupo 04)

Este notebook valida la calidad de los datos procesados y prueba las funciones implementadas en:
- `src/preprocess/cleaning.py`
- `src/preprocess/aggregation.py`
- `src/preprocess/preprocessing.py`

Checklist de esta ejecución:
- [ ] Cargar raw y ejecutar todas las funciones de cleaning, guardando parquet/csv limpios
- [ ] Leer limpio y ejecutar aggregation (marca x día) + KPIs + calendario
- [ ] Probar utilidades de preprocessing: selección de serie, ARIMA/Prophet/ML/LSTM/SARIMAX
- [ ] Mostrar métricas de calidad y formas intermedias

Nota: este notebook asume que el archivo raw existe en `../data/raw/data_sample.parquet` (desde `notebooks/`).

In [34]:
# ===============
# SETUP & IMPORTS
# ===============
import sys
from pathlib import Path
import pandas as pd
import numpy as np

# Asegurar que src/ esté en el path
ROOT = Path('..').resolve()
SRC = ROOT / 'src'
if str(SRC) not in sys.path:
    sys.path.append(str(SRC))

# Forzar recarga de módulos para reflejar cambios recientes en src/
import importlib
import preprocess.cleaning as _cleaning_mod
import preprocess.aggregation as _aggregation_mod
import preprocess.preprocessing as _preprocessing_mod
importlib.reload(_cleaning_mod)
importlib.reload(_aggregation_mod)
importlib.reload(_preprocessing_mod)

from preprocess.cleaning import (
    CleaningConfig,
    run_cleaning,
    load_raw,
    normalize_strings_and_placeholders,
    parse_dates,
    normalize_brand,
    drop_high_missing_columns,
    drop_all_missing_rows,
    coerce_types,
    drop_duplicates,
    negatives_to_nan,
    impute_missing,
    handle_outliers,
    drop_constant_cols,
    reduce_categoricals_cardinality,
    DataQualityReport,
)
from preprocess.aggregation import (
    AggregationConfig,
    load_cleaned,
    aggregate_brand_daily,
    add_brand_daily_kpis,
    complete_calendar,
    pivot_brand_metric,
    run_aggregation,
)
from preprocess.preprocessing import (
    TSConfig,
    TimeSeriesPreprocessor,
)

RAW = ROOT / 'data' / 'raw' / 'data_sample.parquet'
CLEAN_PARQUET = ROOT / 'data' / 'raw' / 'data_sample_cleaned_group04.parquet'
BRAND_DAILY_PARQUET = ROOT / 'data' / 'raw' / 'brand_daily_group04.parquet'

print('ROOT:', ROOT)
print('RAW exists:', RAW.exists())

ROOT: C:\ProyectoParcialGrupo4\pc1_20252_metodologia_data_science
RAW exists: True


In [35]:
# ============================
# 1) CLEANING — Paso a paso (proteger parsed_date y product_brand)
# ============================

cfg_clean = CleaningConfig(
    raw_path=str(RAW),
    out_parquet=str(CLEAN_PARQUET),
    out_csv=str(CLEAN_PARQUET).replace('.parquet', '.csv'),
    persist=True,
    # Para columnas con alto % de nulos: imputar (num->media, cat->moda)
    drop_columns_missing_action='impute',
)

# Cargar raw
raw_df = load_raw(cfg_clean)
print('Raw shape:', raw_df.shape)

# Reporte de calidad acumulado
report = DataQualityReport()

# 1) Normalizar strings/placeholders
step_df = normalize_strings_and_placeholders(raw_df, cfg_clean)
print('After normalize_strings:', step_df.shape)

# 2) Parseo de fechas -> crea 'parsed_date' si hay candidata
step_df = parse_dates(step_df, cfg_clean)
print("Has 'parsed_date'?:", 'parsed_date' in step_df.columns)

# 3) Normalizar marca (si falta -> Unknown) y crear columna canónica 'product_brand'
step_df = normalize_brand(step_df, cfg_clean, report)
# Fallback robusto por si el kernel tiene una versión antigua del módulo
if 'product_brand' not in step_df.columns:
    for cand in ['product_brand','productBrand','brand','Brand','product_brand_name']:
        if cand in step_df.columns:
            step_df['product_brand'] = step_df[cand].astype('object')
            break
    else:
        step_df['product_brand'] = 'Unknown'
print("Has 'product_brand'?:", 'product_brand' in step_df.columns)

# 4) Gestionar columnas con muchos nulos (imputación configurada) — respeta columnas protegidas
step_df = drop_high_missing_columns(step_df, cfg_clean, report)
print('High-missing handled. Dropped:', report.dropped_columns_missing, '| Imputed:', getattr(report, 'imputed_columns_high_missing', []))

# 5) Drop filas completamente nulas
step_df = drop_all_missing_rows(step_df, cfg_clean, report)
print('Dropped all-missing rows:', report.dropped_rows_all_missing)

# 6) Coerción de tipos
step_df = coerce_types(step_df, cfg_clean)

# 7) Duplicados
step_df = drop_duplicates(step_df, cfg_clean, report)
print('Duplicates dropped:', report.duplicate_rows_dropped)

# 8) Negativos -> NaN
step_df = negatives_to_nan(step_df, cfg_clean, report)
print('Negatives→NaN cols:', report.negatives_to_nan_cols)

# 9) Imputación
step_df = impute_missing(step_df, cfg_clean)

# 10) Winsorize (outliers)
step_df = handle_outliers(step_df, cfg_clean, report)
print('Winsorized cols:', report.winsorized_cols[:8], '...')

# 11) Columnas constantes (respeta columnas protegidas)
step_df = drop_constant_cols(step_df, cfg_clean, report)
print('Constant cols dropped:', report.constant_cols_dropped)

# 12) Reducir cardinalidad (categorías raras -> 'Otros') — no afecta columnas protegidas
step_df = reduce_categoricals_cardinality(step_df, cfg_clean)

# Sanity check: la marca y fecha deben existir para el pipeline Día/Marca
assert 'product_brand' in step_df.columns, "ERROR: product_brand fue eliminada"
assert 'parsed_date' in step_df.columns, "ERROR: parsed_date no existe; verifica columnas de fecha en raw o CleaningConfig.date_candidates"

# Persistir y cerrar reporte
report.n_rows_before, report.n_cols_before = raw_df.shape
report.n_rows_after, report.n_cols_after = step_df.shape
print('Cleaned shape:', step_df.shape)

# Guardar
step_df.to_parquet(cfg_clean.out_parquet, index=False)
print('Saved parquet:', CLEAN_PARQUET.exists())

# Vista rápida
display(step_df.head())
display(report.to_frame())

# Ver todas las columnas restantes
import pandas as pd
pd.set_option("display.max_columns", None)
pd.set_option("display.width", 220)
print(f"✅ Columnas finales ({step_df.shape[1]}):")
print(list(step_df.columns))

Raw shape: (74457, 77)
After normalize_strings: (74457, 77)
Has 'parsed_date'?: True
After normalize_strings: (74457, 77)
Has 'parsed_date'?: True
Has 'product_brand'?: True
Has 'product_brand'?: True


  df = load_raw(cfg)


High-missing handled. Dropped: [] | Imputed: ['is_impression', 'is_click', 'promo_id', 'promo_name', 'promo_creative', 'promo_position', 'bounces', 'campaign', 'keyword', 'ad_content', 'adwords_campaign_id', 'adwords_adgroup_id', 'adwords_creative_id', 'adwords_criteria_id', 'gclid', 'ad_network_type', 'is_entrance', 'referer', 'entrance_page_path']
Dropped all-missing rows: 0
Dropped all-missing rows: 0
Duplicates dropped: 240
Negatives→NaN cols: []
Duplicates dropped: 240
Negatives→NaN cols: []




Winsorized cols: ['transaction_revenue_usd', 'transaction_tax_usd', 'transaction_shipping_usd', 'product_quantity', 'product_price_usd', 'product_revenue_usd', 'visitor_id', 'session_id'] ...
Constant cols dropped: ['transaction_affiliation', 'currency_code', 'is_impression', 'is_click', 'promo_id', 'promo_name', 'promo_creative', 'promo_position', 'total_visits', 'bounces', 'new_visits', 'is_true_direct', 'browser_version', 'os_version', 'mobile_device_brand', 'mobile_device_model', 'device_language', 'screen_resolution', 'adwords_campaign_id', 'adwords_adgroup_id', 'adwords_creative_id', 'adwords_criteria_id', 'ad_network_type', 'is_interaction', 'is_entrance', 'is_exit', 'referer', 'page_path', 'hostname', 'entrance_page_path', 'exit_page_path', 'social_engagement_type']
Constant cols dropped: ['transaction_affiliation', 'currency_code', 'is_impression', 'is_click', 'promo_id', 'promo_name', 'promo_creative', 'promo_position', 'total_visits', 'bounces', 'new_visits', 'is_true_direct

Unnamed: 0,transaction_date,parsed_date,transaction_id,transaction_revenue_usd,transaction_tax_usd,transaction_shipping_usd,product_sku,product_name,product_category,product_brand,product_variant,product_quantity,product_price_usd,product_revenue_usd,visitor_id,session_id,session_number,session_start_time,total_hits,total_pageviews,time_on_site_seconds,traffic_source,traffic_medium,campaign,keyword,ad_content,referral_path,channel_grouping,browser,operating_system,is_mobile,device_category,continent,sub_continent,country,region,metro,city,network_domain,gclid,hit_number,hit_time_ms,hit_hour,hit_minute,page_title
0,20170801,2017-08-01 00:00:00+00:00,ORD201708011814,96.32,13.11,13.0,GGOEGAEJ028013,Google Women's Short Sleeve Hero Tee Grey,Apparel,(not set),SM,1.0,5.1,19.49,509972280802528263,1501522116,4,1501522116,9,9,165.0,(direct),(none),AW - Dynamic Search Ads Whole Site,6qEhsCssdK0z36ri,Google Merchandise Collection,/,Display,Chrome,Macintosh,False,desktop,Americas,Northern America,United States,California,San Francisco-Oakland-San Jose CA,Mountain View,comcast.net,CJ3Ls5Diqs4CFQ6oaQodXegHOA,8,155519,14,0,Checkout Confirmation
1,20170801,2017-08-01 00:00:00+00:00,ORD201708011814,40.29,8.82,11.0,GGOEGAEJ028013,Google Women's Short Sleeve Hero Tee Grey,Apparel,(not set),SM,2.0,5.1,11.45,509972280802528263,1501522116,4,1501522116,9,9,165.0,(direct),(none),AW - Dynamic Search Ads Whole Site,6qEhsCssdK0z36ri,Google Merchandise Collection,/,Display,Chrome,Macintosh,False,desktop,Americas,Northern America,United States,California,San Francisco-Oakland-San Jose CA,Mountain View,comcast.net,CJ3Ls5Diqs4CFQ6oaQodXegHOA,8,155519,14,0,Checkout Confirmation
2,20170801,2017-08-01 00:00:00+00:00,ORD201708011814,96.32,13.11,13.0,GGOEGALB034113,Google Women's Vintage Hero Tee Black,Apparel,(not set),SM,1.0,5.7,19.49,509972280802528263,1501522116,4,1501522116,9,9,165.0,(direct),(none),AW - Dynamic Search Ads Whole Site,6qEhsCssdK0z36ri,Google Merchandise Collection,/,Display,Chrome,Macintosh,False,desktop,Americas,Northern America,United States,California,San Francisco-Oakland-San Jose CA,Mountain View,comcast.net,CJ3Ls5Diqs4CFQ6oaQodXegHOA,8,155519,14,0,Checkout Confirmation
3,20170801,2017-08-01 00:00:00+00:00,ORD201708011814,40.29,8.82,11.0,GGOEGALB034113,Google Women's Vintage Hero Tee Black,Apparel,(not set),SM,2.0,5.7,12.65,509972280802528263,1501522116,4,1501522116,9,9,165.0,(direct),(none),AW - Dynamic Search Ads Whole Site,6qEhsCssdK0z36ri,Google Merchandise Collection,/,Display,Chrome,Macintosh,False,desktop,Americas,Northern America,United States,California,San Francisco-Oakland-San Jose CA,Mountain View,comcast.net,CJ3Ls5Diqs4CFQ6oaQodXegHOA,8,155519,14,0,Checkout Confirmation
4,20170801,2017-08-01 00:00:00+00:00,ORD201708011814,96.32,13.11,13.0,GGOEGOCB017499,Leatherette Journal,Office,(not set),Single Option Only,1.0,7.69,19.49,509972280802528263,1501522116,4,1501522116,9,9,165.0,(direct),(none),AW - Dynamic Search Ads Whole Site,6qEhsCssdK0z36ri,Google Merchandise Collection,/,Display,Chrome,Macintosh,False,desktop,Americas,Northern America,United States,California,San Francisco-Oakland-San Jose CA,Mountain View,comcast.net,CJ3Ls5Diqs4CFQ6oaQodXegHOA,8,155519,14,0,Checkout Confirmation


Unnamed: 0,n_rows_before,n_cols_before,n_rows_after,n_cols_after,dropped_columns_missing,dropped_rows_all_missing,duplicate_rows_dropped,negatives_to_nan_cols,winsorized_cols,constant_cols_dropped,brand_unknown_rows
0,74457,77,74217,45,[],0,240,[],"[transaction_revenue_usd, transaction_tax_usd,...","[transaction_affiliation, currency_code, is_im...",0


✅ Columnas finales (45):
['transaction_date', 'parsed_date', 'transaction_id', 'transaction_revenue_usd', 'transaction_tax_usd', 'transaction_shipping_usd', 'product_sku', 'product_name', 'product_category', 'product_brand', 'product_variant', 'product_quantity', 'product_price_usd', 'product_revenue_usd', 'visitor_id', 'session_id', 'session_number', 'session_start_time', 'total_hits', 'total_pageviews', 'time_on_site_seconds', 'traffic_source', 'traffic_medium', 'campaign', 'keyword', 'ad_content', 'referral_path', 'channel_grouping', 'browser', 'operating_system', 'is_mobile', 'device_category', 'continent', 'sub_continent', 'country', 'region', 'metro', 'city', 'network_domain', 'gclid', 'hit_number', 'hit_time_ms', 'hit_hour', 'hit_minute', 'page_title']


In [None]:
# ============================
# 2) AGGREGATION — Marca x Día
# ============================

# Usar SIEMPRE las funciones actuales del módulo (evita imports antiguos)
import importlib, preprocess.aggregation as _aggregation_mod
_aggregation_mod = importlib.reload(_aggregation_mod)
# Diagnóstico: confirmar módulo y versión de pandas
try:
    print('Agg module file:', getattr(_aggregation_mod, '__file__', 'unknown'))
except Exception:
    pass
try:
    import pandas as _pd_check
    print('pandas version:', _pd_check.__version__)
except Exception:
    pass

cfg_agg = _aggregation_mod.AggregationConfig(
    input_path=str(CLEAN_PARQUET),
    out_parquet=str(BRAND_DAILY_PARQUET),
    out_csv=str(BRAND_DAILY_PARQUET).replace('.parquet', '.csv'),
    persist=True,
)

# Asegurar que el parquet limpio exista; si no, ejecutar cleaning rápidamente
from pathlib import Path as _P
print('Using cleaned input:', cfg_agg.input_path)
if not _P(cfg_agg.input_path).exists():
    print('\n[warn] Cleaned parquet not found — running cleaning runner to create it...')
    _ = run_cleaning(CleaningConfig(raw_path=str(RAW), out_parquet=str(CLEAN_PARQUET), out_csv=str(CLEAN_PARQUET).replace('.parquet', '.csv'), persist=True))
    print('[info] Cleaning completed. Exists now?', _P(cfg_agg.input_path).exists())

# Cargar limpio con manejo de error visible
try:
    cleaned_df = _aggregation_mod.load_cleaned(cfg_agg)
except Exception as e:
    print('[error] load_cleaned failed:', e)
    raise
print('Cleaned loaded:', cleaned_df.shape)
print('Cleaned columns (sample):', list(cleaned_df.columns)[:25], '...')

if cleaned_df.empty:
    raise ValueError('El DataFrame limpio está vacío; revisa la etapa de cleaning.')

# Agregación base
try:
    brand_daily = _aggregation_mod.aggregate_brand_daily(cleaned_df, cfg_agg)
except Exception as e:
    print('[error] aggregate_brand_daily failed:', e)
    print('Columns present:', list(cleaned_df.columns))
    # Fallback robusto y version-safe para eventos
    print('[info] trying fallback aggregation for events (version-safe) ...')
    try:
        _tmp = cleaned_df.copy()
        if cfg_agg.date_col not in _tmp.columns or cfg_agg.brand_col not in _tmp.columns:
            print('[error] fallback: missing required columns for aggregation. Available:', list(_tmp.columns))
            raise
        _tmp[cfg_agg.date_col] = pd.to_datetime(_tmp[cfg_agg.date_col], errors='coerce', utc=True).dt.normalize()
        _tmp[cfg_agg.brand_col] = _tmp[cfg_agg.brand_col].astype('object').fillna('Unknown')
        _ev = _tmp.groupby([cfg_agg.brand_col, cfg_agg.date_col], as_index=False).size()
        if isinstance(_ev, pd.Series):
            brand_daily = _ev.reset_index(name='events')
        else:
            # DataFrame camino: última columna corresponde al size
            lastcol = _ev.columns[-1]
            brand_daily = _ev.rename(columns={lastcol: 'events'})
        # Ordenar y asegurar tipos
        brand_daily = brand_daily.sort_values([cfg_agg.date_col, cfg_agg.brand_col]).reset_index(drop=True)
        print('[info] fallback built brand_daily:', brand_daily.shape)
    except Exception as ee:
        print('[error] fallback aggregation also failed:', ee)
        raise
print('brand_daily shape:', brand_daily.shape)

# KPIs
try:
    brand_daily = _aggregation_mod.add_brand_daily_kpis(brand_daily, cfg_agg)
except Exception as e:
    print('[warn] add_brand_daily_kpis failed:', e)

# Calendario completo
try:
    brand_daily = _aggregation_mod.complete_calendar(brand_daily, cfg_agg)
except Exception as e:
    print('[error] complete_calendar failed:', e)
    print('Date col min/max:', brand_daily[cfg_agg.date_col].min() if cfg_agg.date_col in brand_daily.columns else None,
          brand_daily[cfg_agg.date_col].max() if cfg_agg.date_col in brand_daily.columns else None)
    raise
print('brand_daily (calendar) shape:', brand_daily.shape)

# Persistir
try:
    brand_daily.to_parquet(cfg_agg.out_parquet, index=False)
    print('Saved brand_daily parquet:', _P(cfg_agg.out_parquet).exists())
except Exception as e:
    print('[error] saving brand_daily failed:', e)

# Vista rápida
try:
    display(brand_daily.head())
except Exception:
    pass

# Pivoteo rápido si existe alguna métrica típica
for metric in ['transactions','transactionRevenue','sessions','pageviews']:
    if metric in brand_daily.columns:
        try:
            wide = _aggregation_mod.pivot_brand_metric(brand_daily, metric, cfg_agg)
            print(f"Wide metric='{metric}':", wide.shape)
            display(wide.head(3))
        except Exception as e:
            print('[warn] pivot_brand_metric failed:', e)
        break

TypeError: AggregationConfig.__init__() got an unexpected keyword argument 'out_csv'

In [None]:
# ============================
# 3) PREPROCESSING — Preparación por modelo
# ============================

# Usaremos el parquet agregado por marca/día
DATA_FOR_MODELS = str(BRAND_DAILY_PARQUET)

# Config: intentar detectar columnas y usar una métrica común si existe
config = TSConfig(
    date_col='parsed_date',            # producido por cleaning
    brand_col='product_brand',         # normalizado
    target_metric=None,                # dejamos None para que elija la primera numérica disponible
    freq='D',
    fill_missing='zero',
)
prep = TimeSeriesPreprocessor(config)

# Cargar datos y mostrar overview
df_models = prep.load_data(DATA_FOR_MODELS)
print('Model DF shape:', df_models.shape)
print('Index (head):', df_models.index[:3])
print('Columns:', list(df_models.columns)[:15], '...')

a_sample_brand = None
if 'product_brand' in df_models.columns:
    a_sample_brand = df_models['product_brand'].mode(dropna=True).iloc[0]
    print('Sample brand:', a_sample_brand)

# 3.1 Selección de serie (si se define target_metric)
try:
    prep.config.target_metric = next((c for c in df_models.select_dtypes(include=[np.number]).columns), None)
    series = prep.select_brand_series(df_models, brand=a_sample_brand, metric=prep.config.target_metric)
    print('Series (brand,target) shape:', series.shape)
    display(series.head())
except Exception as e:
    print('Select brand series skipped:', e)

# 3.2 ARIMA
try:
    arima_series = prep.prepare_arima_data(series if 'series' in locals() else df_models)
    print('ARIMA series len:', len(arima_series))
except Exception as e:
    print('ARIMA prep error:', e)

# 3.3 Prophet
try:
    prophet_df, exog_cols = prep.prepare_prophet_data(series if 'series' in locals() else df_models)
    print('Prophet df:', prophet_df.shape, '| exog:', exog_cols)
    display(prophet_df.head())
except Exception as e:
    print('Prophet prep error:', e)

# 3.4 ML (X_train/y_train/X_test/y_test)
try:
    X_tr, y_tr, X_te, y_te = prep.prepare_ml_data(series if 'series' in locals() else df_models)
    print('ML shapes:', X_tr.shape, y_tr.shape, X_te.shape, y_te.shape)
except Exception as e:
    print('ML prep error:', e)

# 3.5 LSTM (secuencias)
try:
    feats = prep.create_features(series if 'series' in locals() else df_models.iloc[:,0])
    feats = feats.dropna()
    Xtr, Xte, ytr, yte = prep.prepare_lstm_data(feats, sequence_length=30)
    print('LSTM shapes:', Xtr.shape, Xte.shape, ytr.shape, yte.shape)
except Exception as e:
    print('LSTM prep error:', e)

# 3.6 SARIMAX exógenas
try:
    exog = prep.create_sarimax_exogenous(series if 'series' in locals() else df_models.iloc[:,0], events_periods=[])
    print('Exogenous shape:', exog.shape)
except Exception as e:
    print('SARIMAX exog error:', e)