# Notebook ETL de Comercio Regional
Este cuaderno documenta el flujo completo de extracción, limpieza, transformación y carga para los datasets mensuales de ventas regionales.

## 1. Configurar entorno y bibliotecas
Definimos variables de rutas y cargamos librerias clave para un flujo ETL robusto (pandas, numpy, janitor, great_expectations, etc.).

In [5]:
from __future__ import annotations

import os
import sys
from pathlib import Path

import janitor
import numpy as np
import pandas as pd

BASE_DIR = Path("..")
SRC_DIR = BASE_DIR / "src"
if str(SRC_DIR.resolve()) not in sys.path:
    sys.path.insert(0, str(SRC_DIR.resolve()))

from comercio_regional.pipeline import RegionalSalesPipeline
from comercio_regional.config import PipelineConfig

DATA_DIR = BASE_DIR / "data"
RAW_DIR = DATA_DIR / "raw"
PROCESSED_DIR = DATA_DIR / "processed"

os.environ.setdefault("COMERCIO_BASE_DIR", str(BASE_DIR.resolve()))

BASE_DIR, RAW_DIR, PROCESSED_DIR

(PosixPath('..'), PosixPath('../data/raw'), PosixPath('../data/processed'))

## 2. Carga de datos crudos
Utilizamos funciones reutilizables para leer archivos CSV y validar los tipos detectados durante la carga.

In [6]:
def read_raw_sources(raw_dir: Path) -> pd.DataFrame:
    frames = []
    for csv_path in sorted(raw_dir.glob("dataset_*.csv")):
        frame = pd.read_csv(
            csv_path,
            parse_dates=["Fecha"],
            dayfirst=True,
            thousands=".",
            decimal=",",
        )
        frame["region"] = csv_path.stem.replace("dataset_", "")
        frames.append(frame)
    if not frames:
        raise FileNotFoundError("No se encontraron datasets en data/raw")
    combined = pd.concat(frames, ignore_index=True)
    inferred_types = combined.dtypes
    display(inferred_types)
    return combined

raw_df = read_raw_sources(RAW_DIR)
raw_df.head()

Fecha                                  datetime64[ns]
Compraventas, Venta regional, monto             int64
region                                         object
dtype: object

Unnamed: 0,Fecha,"Compraventas, Venta regional, monto",region
0,2025-08-01,685,araucania
1,2025-07-01,713,araucania
2,2025-06-01,685,araucania
3,2025-05-01,721,araucania
4,2025-04-01,755,araucania


## 3. Perfilado exploratorio basico
Calculamos estadisticos descriptivos, graficas y exploramos outliers.

In [7]:
summary = raw_df.describe(include="all")
display(summary)

monto_outliers = raw_df["Compraventas, Venta regional, monto"].astype(float)
q1, q3 = np.nanpercentile(monto_outliers, [25, 75])
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
print("Rango intercuartil:", (lower_bound, upper_bound))

Unnamed: 0,Fecha,"Compraventas, Venta regional, monto",region
count,460,460.0,460
unique,,,5
top,,,araucania
freq,,,92
mean,2021-10-16 01:33:54.782608640,4506.113043,
min,2018-01-01 00:00:00,165.0,
25%,2019-11-23 12:00:00,418.5,
50%,2021-10-16 12:00:00,900.0,
75%,2023-09-08 12:00:00,1575.0,
max,2025-08-01 00:00:00,30743.0,


Rango intercuartil: (np.float64(-1316.25), np.float64(3309.75))


## 4. Limpieza y estandarizacion de columnas
Renombramos columnas al formato `snake_case`, removemos espacios y resolvemos duplicados.

In [9]:
clean_df = (
    raw_df.copy()
    .rename(columns={"Fecha": "fecha", "Compraventas, Venta regional, monto": "monto"})
    .clean_names()
)
clean_df = clean_df.drop_duplicates()
clean_df.head()

Unnamed: 0,fecha,monto,region
0,2025-08-01,685,araucania
1,2025-07-01,713,araucania
2,2025-06-01,685,araucania
3,2025-05-01,721,araucania
4,2025-04-01,755,araucania


## 5. Tratamiento de valores faltantes
Identificamos columnas con `NaN` y aplicamos imputacion condicional documentando el impacto.

In [10]:
missing_summary = clean_df.isna().sum()
display(missing_summary)

imputed_df = clean_df.copy()
imputed_df["monto"] = imputed_df["monto"].fillna(imputed_df["monto"].median())
imputed_df["region"] = imputed_df["region"].fillna("sin_region")

imputation_log = {
    "monto": "median",
    "region": "constant: sin_region",
}
imputation_log

fecha     0
monto     0
region    0
dtype: int64

{'monto': 'median', 'region': 'constant: sin_region'}

## 6. Normalizacion y codificacion de variables
Estandarizamos unidades y codificamos las variables categoricas para analisis y modelado.

In [11]:
from sklearn.preprocessing import OneHotEncoder, StandardScaler

feature_df = imputed_df.assign(
    monto_normalizado=lambda df: df["monto"].astype(float)
)

scaler = StandardScaler()
feature_df["monto_escalado"] = scaler.fit_transform(feature_df[["monto_normalizado"]])

encoder = OneHotEncoder(sparse_output=False)
region_encoded = encoder.fit_transform(feature_df[["region"]])
region_cols = [f"region_{cat}" for cat in encoder.categories_[0]]
encoded_region_df = pd.DataFrame(region_encoded, columns=region_cols)
feature_df = pd.concat([feature_df.reset_index(drop=True), encoded_region_df], axis=1)
feature_df.head()

Unnamed: 0,fecha,monto,region,monto_normalizado,monto_escalado,region_araucania,region_biobio,region_los-lagos,region_los-rios,region_santiago
0,2025-08-01,685,araucania,685.0,-0.491657,1.0,0.0,0.0,0.0,0.0
1,2025-07-01,713,araucania,713.0,-0.488055,1.0,0.0,0.0,0.0,0.0
2,2025-06-01,685,araucania,685.0,-0.491657,1.0,0.0,0.0,0.0,0.0
3,2025-05-01,721,araucania,721.0,-0.487025,1.0,0.0,0.0,0.0,0.0
4,2025-04-01,755,araucania,755.0,-0.482651,1.0,0.0,0.0,0.0,0.0


## 7. Validacion de calidad de datos
Definimos expectativas de integridad, unicidad y rangos mediante comprobaciones programaticas inspiradas en Great Expectations.

In [12]:
def validate_dataset(df: pd.DataFrame) -> dict[str, bool]:
    checks = {
        "monto_no_nulos": df["monto_normalizado"].notna().all(),
        "monto_no_negativo": (df["monto_normalizado"] >= 0).all(),
        "region_no_nula": df["region"].notna().all(),
        "fechas_ordenadas": df.sort_values("fecha")["fecha"].is_monotonic_increasing,
    }
    return checks

validation_results = validate_dataset(feature_df)
validation_results

{'monto_no_nulos': np.True_,
 'monto_no_negativo': np.True_,
 'region_no_nula': np.True_,
 'fechas_ordenadas': True}

## 8. Transformacion final y union de tablas
Construimos el pipeline final combinando datasets enriquecidos y agregamos metricas mensuales.

In [13]:
pipeline_config = PipelineConfig.default(base_dir=BASE_DIR)
pipeline = RegionalSalesPipeline(pipeline_config)

data_for_pipeline = imputed_df[["fecha", "monto", "region"]].copy()

processed_df = data_for_pipeline
for step in pipeline.steps():
    processed_df = step(processed_df)

processed_df.head()

Unnamed: 0,region,periodo,anio,mes,trimestre,monto_total,monto_promedio,observaciones
0,Araucania,2018-01,2018,1,2018Q1,359,359.0,1
1,Araucania,2018-02,2018,2,2018Q1,404,404.0,1
2,Araucania,2018-03,2018,3,2018Q1,410,410.0,1
3,Araucania,2018-04,2018,4,2018Q2,402,402.0,1
4,Araucania,2018-05,2018,5,2018Q2,394,394.0,1


## 9. Carga de datos procesados
Exportamos los resultados a Parquet y registramos metadatos clave de versionado.

In [17]:
output_path = PROCESSED_DIR / "ventas_regionales_notebook.csv"
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
processed_df.to_csv(output_path, index=False)

metadata = {
    "rows": len(processed_df),
    "columns": list(processed_df.columns),
    "output_path": str(output_path),
}
metadata

{'rows': 460,
 'columns': ['region',
  'periodo',
  'anio',
  'mes',
  'trimestre',
  'monto_total',
  'monto_promedio',
  'observaciones'],
 'output_path': '../data/processed/ventas_regionales_notebook.csv'}

## 10. Pruebas automatizadas del flujo ETL
Mostramos como integrar `pytest` para validar las funciones criticas del pipeline.

In [16]:
import textwrap

pytest_example = textwrap.dedent(
    """
    def test_pipeline_runs(tmp_path):
        config = PipelineConfig.default(base_dir=Path('..'))
        pipeline = RegionalSalesPipeline(config)
        pipeline.run()
        assert pipeline.output_path.exists()
    """
)
print(pytest_example)


def test_pipeline_runs(tmp_path):
    config = PipelineConfig.default(base_dir=Path('..'))
    pipeline = RegionalSalesPipeline(config)
    pipeline.run()
    assert pipeline.output_path.exists()



## Visualización interactiva con estilo The Economist
En esta sección exploramos la serie limpia `ventas_regionales_notebook.csv` mediante gráficos con estética inspirada en The Economist.

In [None]:
import pandas as pd
import plotly.express as px
from plotly.colors import qualitative

processed_path = PROCESSED_DIR / "ventas_regionales_notebook.csv"
viz_df = pd.read_csv(processed_path, parse_dates=["periodo"])
viz_df = viz_df.sort_values(["periodo", "region"])
viz_df["region"] = viz_df["region"].str.replace("-", " ").str.title()

economist_font = dict(family="Georgia, serif", color="#1e293b")
palette = qualitative.T10