# Notebook 4 – Integración y Fact Table (FraSoHome)

## Objetivo formativo
En este laboratorio vamos a **integrar** las distintas fuentes del caso *FraSoHome* (CRM, e-commerce y POS) para construir una **tabla de hechos (fact table)** con granularidad **línea de transacción** (venta o devolución).

El foco está en:
- Unificar **claves** y **formatos** (IDs, fechas, importes).
- Construir una fact table con un **esquema común** para *ONLINE* y *POS*.
- Enriquecer la fact con dimensiones (**clientes**, **productos**, **tiendas**).
- Ejecutar **comprobaciones básicas** de consistencia tras la integración.
- Exportar el resultado para los notebooks posteriores (**features / RFM / churn / basket / propensión**).

> Nota: este notebook asume que existen los CSV de origen:
> `crm.csv`, `productos.csv`, `tiendas.csv`,
> `pedidos.csv`, `lineas_pedido.csv`, `devoluciones_online.csv`,
> `ventas_pos.csv`, `devoluciones_tienda.csv`.
>
> Si has ejecutado el Notebook 3 y generaste `outputs/*_clean.csv`, el notebook intentará cargar primero la versión limpia.
> Si no existe, usará la versión raw y aplicará una **estandarización mínima**.

## 1) Setup
- Leemos CSV como texto (`dtype=str`) para no “romper” por datos sucios.
- Creamos funciones reutilizables que aceptan **dataframes como parámetros**.
- Generamos columnas nuevas parseadas (`*_dt`, `*_num`, `*_clean`) para no perder el valor original.

In [1]:
from __future__ import annotations

import re
from pathlib import Path
from typing import Dict, List, Optional

import numpy as np
import pandas as pd

pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 140)

In [9]:
# --- Configuración de rutas ---
# Ajusta DATA_DIR a la carpeta donde tienes los CSV.
DATA_DIR = Path(r"..\data\raw""") # por defecto: carpeta del notebook
CLEAN_DIR = Path("notebooks\outputs" )  # salida del Notebook 3 (si existe)
OUT_DIR = Path("..\outputs")

OUT_DIR.mkdir(parents=True, exist_ok=True)

EXPECTED_RAW_FILES = [
    "crm.csv",
    "productos.csv",
    "tiendas.csv",
    "pedidos.csv",
    "lineas_pedido.csv",
    "devoluciones_online.csv",
    "ventas_pos.csv",
    "devoluciones_tienda.csv",
]

def check_expected_files(data_dir: Path, files: List[str]) -> None:
    missing = [f for f in files if not (data_dir / f).exists()]
    if missing:
        print("[WARN] Faltan archivos esperados en DATA_DIR:")
        for f in missing:
            print(" -", f)
        print("Ajusta DATA_DIR o copia los archivos a la carpeta del notebook.")
    else:
        print("[OK] Todos los archivos raw esperados existen en DATA_DIR.")

check_expected_files(DATA_DIR, EXPECTED_RAW_FILES)

[OK] Todos los archivos raw esperados existen en DATA_DIR.


  CLEAN_DIR = Path("notebooks\outputs" )  # salida del Notebook 3 (si existe)
  OUT_DIR = Path("..\outputs")


## 2) Funciones utilitarias (reutilizables)

Helpers para:
- **Carga robusta** de CSV (tolerante a líneas raras).
- Estandarizar IDs (`strip` + `upper`).
- Parseo de **importes** con €, coma/punto, miles, etc.
- Parseo de **fechas** heterogéneas (ISO, DD/MM/YY, texto “10 de Enero de 2023”, etc.).

In [10]:
def load_csv_robust(path: Path, **kwargs) -> pd.DataFrame:
    # Carga robusta de CSV en UTF-8. Si falla, reintenta con engine='python'
    base_kwargs = dict(dtype=str, encoding="utf-8", low_memory=False)
    base_kwargs.update(kwargs)

    try:
        return pd.read_csv(path, **base_kwargs)
    except Exception as e:
        print(f"[WARN] Lectura estándar falló en {path.name}: {e}")
        print("       Reintentando con engine='python' y on_bad_lines='warn' ...")
        base_kwargs.update(dict(engine="python", on_bad_lines="warn"))
        return pd.read_csv(path, **base_kwargs)


def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    # lower + strip + espacios -> '_'
    out = df.copy()
    out.columns = (
        out.columns.astype(str)
        .str.strip()
        .str.lower()
        .str.replace(" ", "_")
    )
    return out


def standardize_id(series: pd.Series) -> pd.Series:
    # strip + quitar espacios internos + upper
    s = series.astype(str)
    s = s.replace({"nan": np.nan, "None": np.nan})
    s = s.str.strip()
    s = s.str.replace(r"\s+", "", regex=True)
    s = s.str.upper()
    s = s.replace({"": np.nan})
    return s


def _normalize_number_str(x: Optional[str]) -> Optional[str]:
    if x is None:
        return None
    x = str(x).strip()
    if x == "" or x.lower() in {"nan", "none"}:
        return None

    # deja solo dígitos, separadores y signo
    x = re.sub(r"[^\d,.\-]", "", x)

    # múltiples comas sin puntos: 15,7,00 -> 157.00 (última coma decimal)
    if x.count(",") > 1 and x.count(".") == 0:
        parts = x.split(",")
        x = "".join(parts[:-1]) + "." + parts[-1]

    # múltiples puntos sin comas: 1.234.56 -> 1234.56 (último punto decimal)
    if x.count(".") > 1 and x.count(",") == 0:
        parts = x.split(".")
        x = "".join(parts[:-1]) + "." + parts[-1]

    # si contiene punto y coma: decidir separador decimal por el último separador
    if ("," in x) and ("." in x):
        if x.rfind(",") > x.rfind("."):
            # coma decimal -> quitar puntos de miles
            x = x.replace(".", "")
            x = x.replace(",", ".")
        else:
            # punto decimal -> quitar comas de miles
            x = x.replace(",", "")
    else:
        # solo coma -> coma decimal
        if "," in x and "." not in x:
            x = x.replace(",", ".")

    return x


def parse_numeric_mixed(series: pd.Series) -> pd.Series:
    s = series.copy()
    s = s.where(s.notna(), None)
    normalized = s.apply(_normalize_number_str)
    return pd.to_numeric(normalized, errors="coerce")


SPANISH_MONTHS = {
    "enero": "01", "febrero": "02", "marzo": "03", "abril": "04",
    "mayo": "05", "junio": "06", "julio": "07", "agosto": "08",
    "septiembre": "09", "setiembre": "09", "octubre": "10", "noviembre": "11", "diciembre": "12",
}

def _normalize_spanish_date_text(s: str) -> str:
    # Convierte '10 de Enero de 2023' (y variantes) a '10/01/2023' conservando hora si existe.
    s0 = s.strip()
    s0_low = s0.lower()

    m = re.search(r"(\d{1,2})\s*de\s*([a-záéíóúñ]+)\s*de\s*(\d{4})(.*)$", s0_low)
    if not m:
        return s0

    day = m.group(1)
    month_txt = m.group(2)
    year = m.group(3)
    rest = m.group(4)

    month_txt_norm = (
        month_txt.replace("á","a").replace("é","e").replace("í","i")
                 .replace("ó","o").replace("ú","u")
    )

    month_num = SPANISH_MONTHS.get(month_txt_norm, None)
    if month_num is None:
        return s0

    return f"{day}/{month_num}/{year}{rest}"


def parse_datetime_es(series: pd.Series, dayfirst: bool = True) -> pd.Series:
    s = series.astype(str).str.strip()
    s = s.replace({"": np.nan, "nan": np.nan, "None": np.nan})

    dt = pd.to_datetime(s, errors="coerce", dayfirst=dayfirst, infer_datetime_format=True)

    mask = dt.isna() & s.notna()
    if mask.any():
        s2 = s[mask].apply(_normalize_spanish_date_text)
        dt2 = pd.to_datetime(s2, errors="coerce", dayfirst=dayfirst, infer_datetime_format=True)
        dt.loc[mask] = dt2

    return dt


def coalesce(*series_list: pd.Series) -> pd.Series:
    out = None
    for s in series_list:
        if out is None:
            out = s.copy()
        else:
            out = out.combine_first(s)
    return out

## 3) Carga de datasets (raw vs clean)

Definimos un cargador:
- Si existe `output_clean/<nombre>_clean.csv`, lo usa.
- Si no existe, carga el raw en `DATA_DIR`.

In [11]:
def load_dataset(name: str, raw_filename: str, clean_filename: Optional[str] = None) -> pd.DataFrame:
    if clean_filename is None:
        clean_filename = f"{name}_clean.csv"

    clean_path = CLEAN_DIR / clean_filename
    raw_path = DATA_DIR / raw_filename

    if clean_path.exists():
        print(f"[LOAD] {name}: usando CLEAN -> {clean_path}")
        df = load_csv_robust(clean_path)
        df["_source_loaded_from"] = "clean"
    else:
        print(f"[LOAD] {name}: usando RAW -> {raw_path}")
        df = load_csv_robust(raw_path)
        df["_source_loaded_from"] = "raw"

    df = standardize_columns(df)
    return df


crm = load_dataset("crm", "crm.csv", "crm_clean.csv")
productos = load_dataset("productos", "productos.csv", "productos_clean.csv")
tiendas = load_dataset("tiendas", "tiendas.csv", "tiendas_clean.csv")

pedidos = load_dataset("pedidos", "pedidos.csv", "pedidos_clean.csv")
lineas = load_dataset("lineas_pedido", "lineas_pedido.csv", "lineas_pedido_clean.csv")
devol_online = load_dataset("devoluciones_online", "devoluciones_online.csv", "devoluciones_online_clean.csv")

ventas_pos = load_dataset("ventas_pos", "ventas_pos.csv", "ventas_pos_clean.csv")
devol_pos = load_dataset("devoluciones_tienda", "devoluciones_tienda.csv", "devoluciones_tienda_clean.csv")

print("\nShapes cargados:")
for name, df in {
    "crm": crm,
    "productos": productos,
    "tiendas": tiendas,
    "pedidos": pedidos,
    "lineas": lineas,
    "devol_online": devol_online,
    "ventas_pos": ventas_pos,
    "devol_pos": devol_pos,
}.items():
    print(f"- {name:14s}: {df.shape}")

[LOAD] crm: usando RAW -> ..\data\raw\crm.csv
[LOAD] productos: usando RAW -> ..\data\raw\productos.csv
[LOAD] tiendas: usando RAW -> ..\data\raw\tiendas.csv
[LOAD] pedidos: usando RAW -> ..\data\raw\pedidos.csv
[LOAD] lineas_pedido: usando RAW -> ..\data\raw\lineas_pedido.csv
[LOAD] devoluciones_online: usando RAW -> ..\data\raw\devoluciones_online.csv
[LOAD] ventas_pos: usando RAW -> ..\data\raw\ventas_pos.csv
[LOAD] devoluciones_tienda: usando RAW -> ..\data\raw\devoluciones_tienda.csv

Shapes cargados:
- crm           : (103, 21)
- productos     : (101, 19)
- tiendas       : (8, 20)
- pedidos       : (656, 17)
- lineas        : (1949, 13)
- devol_online  : (224, 13)
- ventas_pos    : (2521, 20)
- devol_pos     : (211, 17)


## 4) Preparación mínima (IDs, fechas, importes) por dataset

Creamos funciones por tabla para:
- estandarizar IDs (`*_clean`)
- parsear fechas (`*_dt`)
- parsear importes/cantidades (`*_num`)

In [12]:
def prepare_crm(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["customer_id_clean"] = standardize_id(out.get("customer_id"))
    out["fecha_alta_dt"] = parse_datetime_es(out.get("fecha_alta_programa"))
    out["fecha_baja_dt"] = parse_datetime_es(out.get("fecha_baja"))
    out["puntos_num"] = parse_numeric_mixed(out.get("puntos_acumulados"))
    return out


def prepare_productos(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["product_id_clean"] = standardize_id(out.get("product_id"))
    out["precio_venta_num"] = parse_numeric_mixed(out.get("precio_venta"))
    out["coste_unitario_num"] = parse_numeric_mixed(out.get("coste_unitario"))
    out["fecha_alta_catalogo_dt"] = parse_datetime_es(out.get("fecha_alta_catalogo"))
    out["iva_pct_num"] = parse_numeric_mixed(out.get("iva_pct"))
    return out


def prepare_tiendas(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["store_id_clean"] = standardize_id(out.get("store_id"))
    out["fecha_apertura_dt"] = parse_datetime_es(out.get("fecha_apertura"))
    out["metros_cuadrados_num"] = parse_numeric_mixed(out.get("metros_cuadrados"))
    out["lat_num"] = parse_numeric_mixed(out.get("lat"))
    out["lon_num"] = parse_numeric_mixed(out.get("lon"))
    return out


def prepare_pedidos(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["order_id_clean"] = standardize_id(out.get("order_id"))
    out["customer_id_clean"] = standardize_id(out.get("customer_id"))
    out["fecha_pedido_dt"] = parse_datetime_es(out.get("fecha_pedido"))
    out["importe_total_num"] = parse_numeric_mixed(out.get("importe_total"))
    out["gastos_envio_num"] = parse_numeric_mixed(out.get("gastos_envio"))
    return out


def prepare_lineas(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["order_line_id_clean"] = standardize_id(out.get("order_line_id"))
    out["order_id_clean"] = standardize_id(out.get("order_id"))
    out["product_id_clean"] = standardize_id(out.get("product_id"))

    out["cantidad_num"] = parse_numeric_mixed(out.get("cantidad"))
    out["precio_unitario_num"] = parse_numeric_mixed(out.get("precio_unitario"))
    out["descuento_pct_num"] = parse_numeric_mixed(out.get("descuento_pct"))
    out["descuento_importe_num"] = parse_numeric_mixed(out.get("descuento_importe"))
    out["importe_linea_num"] = parse_numeric_mixed(out.get("importe_linea"))
    out["iva_pct_num"] = parse_numeric_mixed(out.get("iva_pct"))
    return out


def prepare_devol_online(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["return_id_clean"] = standardize_id(out.get("return_id"))
    out["order_id_clean"] = standardize_id(out.get("order_id"))
    out["order_line_id_clean"] = standardize_id(out.get("order_line_id"))
    out["product_id_clean"] = standardize_id(out.get("product_id"))
    out["fecha_devolucion_dt"] = parse_datetime_es(out.get("fecha_devolucion"))
    out["cantidad_devuelta_num"] = parse_numeric_mixed(out.get("cantidad_devuelta"))
    out["importe_reembolsado_num"] = parse_numeric_mixed(out.get("importe_reembolsado"))
    return out


def prepare_ventas_pos(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["ticket_line_id_clean"] = standardize_id(out.get("ticket_line_id"))
    out["ticket_id_clean"] = standardize_id(out.get("ticket_id"))
    out["store_id_clean"] = standardize_id(out.get("store_id"))
    out["customer_id_clean"] = standardize_id(out.get("customer_id"))
    out["product_id_clean"] = standardize_id(out.get("product_id"))

    out["fecha_hora_dt"] = parse_datetime_es(out.get("fecha_hora"))
    out["cantidad_num"] = parse_numeric_mixed(out.get("cantidad"))
    out["precio_unitario_num"] = parse_numeric_mixed(out.get("precio_unitario"))
    out["descuento_pct_num"] = parse_numeric_mixed(out.get("descuento_pct"))
    out["descuento_importe_num"] = parse_numeric_mixed(out.get("descuento_importe"))
    out["importe_linea_num"] = parse_numeric_mixed(out.get("importe_linea"))
    return out


def prepare_devol_pos(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["return_id_clean"] = standardize_id(out.get("return_id"))
    out["store_id_clean"] = standardize_id(out.get("store_id"))
    out["ticket_id_original_clean"] = standardize_id(out.get("ticket_id_original"))
    out["ticket_line_id_original_clean"] = standardize_id(out.get("ticket_line_id_original"))
    out["order_id_original_clean"] = standardize_id(out.get("order_id_original"))
    out["customer_id_clean"] = standardize_id(out.get("customer_id"))
    out["product_id_clean"] = standardize_id(out.get("product_id"))

    out["fecha_devolucion_dt"] = parse_datetime_es(out.get("fecha_devolucion"))
    out["cantidad_devuelta_num"] = parse_numeric_mixed(out.get("cantidad_devuelta"))
    out["importe_reembolsado_num"] = parse_numeric_mixed(out.get("importe_reembolsado"))
    return out


crm_p = prepare_crm(crm)
productos_p = prepare_productos(productos)
tiendas_p = prepare_tiendas(tiendas)

pedidos_p = prepare_pedidos(pedidos)
lineas_p = prepare_lineas(lineas)
devol_online_p = prepare_devol_online(devol_online)
ventas_pos_p = prepare_ventas_pos(ventas_pos)
devol_pos_p = prepare_devol_pos(devol_pos)

print("[OK] Preparación mínima aplicada.")

[OK] Preparación mínima aplicada.


  dt = pd.to_datetime(s, errors="coerce", dayfirst=dayfirst, infer_datetime_format=True)
  dt2 = pd.to_datetime(s2, errors="coerce", dayfirst=dayfirst, infer_datetime_format=True)
  dt2 = pd.to_datetime(s2, errors="coerce", dayfirst=dayfirst, infer_datetime_format=True)
  dt = pd.to_datetime(s, errors="coerce", dayfirst=dayfirst, infer_datetime_format=True)
  dt = pd.to_datetime(s, errors="coerce", dayfirst=dayfirst, infer_datetime_format=True)
  dt2 = pd.to_datetime(s2, errors="coerce", dayfirst=dayfirst, infer_datetime_format=True)
  dt = pd.to_datetime(s, errors="coerce", dayfirst=dayfirst, infer_datetime_format=True)
  dt = pd.to_datetime(s, errors="coerce", dayfirst=dayfirst, infer_datetime_format=True)
  dt2 = pd.to_datetime(s2, errors="coerce", dayfirst=dayfirst, infer_datetime_format=True)
  dt = pd.to_datetime(s, errors="coerce", dayfirst=dayfirst, infer_datetime_format=True)
  dt = pd.to_datetime(s, errors="coerce", dayfirst=dayfirst, infer_datetime_format=True)
  dt2 = pd.to

## 5) Construcción de la Fact Table

Construimos 4 bloques y concatenamos:
1. Ventas ONLINE = `pedidos` + `lineas_pedido`
2. Devoluciones ONLINE = `devoluciones_online` (enlazada con pedidos cuando sea posible)
3. Ventas POS = `ventas_pos`
4. Devoluciones POS = `devoluciones_tienda` (enlazada con ticket_line original cuando sea posible)

Regla de modelado (didáctica):
- Para devoluciones, `quantity` y `amount_net` se guardan **en negativo**.

In [13]:
FACT_COLS = [
    "source_system", "source_table",
    "channel", "movement_type",
    "event_dt", "event_dt_raw",
    "currency",
    "order_id", "order_id_clean",
    "order_line_id", "order_line_id_clean",
    "ticket_id", "ticket_id_clean",
    "ticket_line_id", "ticket_line_id_clean",
    "return_id", "return_id_clean",
    "store_id", "store_id_clean",
    "customer_id", "customer_id_clean",
    "product_id", "product_id_clean",
    "quantity_raw", "quantity",
    "unit_price_raw", "unit_price",
    "discount_amount_raw", "discount_amount",
    "discount_pct_raw", "discount_pct",
    "amount_net_raw", "amount_net",
    "refund_amount_raw", "refund_amount",
    "status", "payment_method",
    "notes",
]

def _select_existing(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    existing = [c for c in cols if c in df.columns]
    out = df[existing].copy()
    for c in cols:
        if c not in out.columns:
            out[c] = np.nan
    return out[cols]

def build_fact_online_sales(pedidos: pd.DataFrame, lineas: pd.DataFrame) -> pd.DataFrame:
    p = pedidos.copy()
    l = lineas.copy()

    p_small = p[[
        "order_id", "order_id_clean",
        "fecha_pedido", "fecha_pedido_dt",
        "customer_id", "customer_id_clean",
        "moneda", "estado_pedido", "metodo_pago"
    ]].copy()

    merged = l.merge(p_small, on="order_id_clean", how="left", suffixes=("_line", "_order"))

    out = pd.DataFrame()
    out["source_system"] = "ECOM"
    out["source_table"] = "lineas_pedido"
    out["channel"] = "ONLINE"
    out["movement_type"] = "SALE"

    out["event_dt"] = merged["fecha_pedido_dt"]
    out["event_dt_raw"] = merged["fecha_pedido"]
    out["currency"] = merged.get("moneda")

    out["order_id"] = merged["order_id_order"]
    out["order_id_clean"] = merged["order_id_clean"]
    out["order_line_id"] = merged.get("order_line_id")
    out["order_line_id_clean"] = merged.get("order_line_id_clean")

    out["ticket_id"] = np.nan
    out["ticket_id_clean"] = np.nan
    out["ticket_line_id"] = np.nan
    out["ticket_line_id_clean"] = np.nan
    out["return_id"] = np.nan
    out["return_id_clean"] = np.nan

    out["store_id"] = "ONLINE"
    out["store_id_clean"] = "ONLINE"

    out["customer_id"] = merged.get("customer_id")
    out["customer_id_clean"] = merged.get("customer_id_clean")

    out["product_id"] = merged.get("product_id")
    out["product_id_clean"] = merged.get("product_id_clean")

    out["quantity_raw"] = merged.get("cantidad")
    out["quantity"] = merged.get("cantidad_num")

    out["unit_price_raw"] = merged.get("precio_unitario")
    out["unit_price"] = merged.get("precio_unitario_num")

    out["discount_amount_raw"] = merged.get("descuento_importe")
    out["discount_amount"] = merged.get("descuento_importe_num")

    out["discount_pct_raw"] = merged.get("descuento_pct")
    out["discount_pct"] = merged.get("descuento_pct_num")

    out["amount_net_raw"] = merged.get("importe_linea")
    out["amount_net"] = merged.get("importe_linea_num")

    calc_net = (out["quantity"] * out["unit_price"]) - out["discount_amount"].fillna(0)
    out["amount_net"] = out["amount_net"].fillna(calc_net)

    out["refund_amount_raw"] = np.nan
    out["refund_amount"] = np.nan

    out["status"] = merged.get("estado_pedido")
    out["payment_method"] = merged.get("metodo_pago")
    out["notes"] = merged.get("_source_loaded_from")

    return _select_existing(out, FACT_COLS)

def build_fact_online_returns(devol: pd.DataFrame, pedidos: pd.DataFrame) -> pd.DataFrame:
    d = devol.copy()

    p_small = pedidos[["order_id_clean", "customer_id", "customer_id_clean", "moneda"]].copy()
    d = d.merge(p_small, on="order_id_clean", how="left", suffixes=("", "_pedido"))

    out = pd.DataFrame()
    out["source_system"] = "ECOM"
    out["source_table"] = "devoluciones_online"
    out["channel"] = "ONLINE"
    out["movement_type"] = "RETURN"

    out["event_dt"] = d.get("fecha_devolucion_dt")
    out["event_dt_raw"] = d.get("fecha_devolucion")
    out["currency"] = d.get("moneda").combine_first(d.get("moneda_pedido"))

    out["order_id"] = d.get("order_id")
    out["order_id_clean"] = d.get("order_id_clean")
    out["order_line_id"] = d.get("order_line_id")
    out["order_line_id_clean"] = d.get("order_line_id_clean")

    out["ticket_id"] = np.nan
    out["ticket_id_clean"] = np.nan
    out["ticket_line_id"] = np.nan
    out["ticket_line_id_clean"] = np.nan

    out["return_id"] = d.get("return_id")
    out["return_id_clean"] = d.get("return_id_clean")

    out["store_id"] = "ONLINE"
    out["store_id_clean"] = "ONLINE"

    out["customer_id"] = d.get("customer_id").combine_first(d.get("customer_id_pedido"))
    out["customer_id_clean"] = d.get("customer_id_clean").combine_first(d.get("customer_id_clean_pedido"))

    out["product_id"] = d.get("product_id")
    out["product_id_clean"] = d.get("product_id_clean")

    out["quantity_raw"] = d.get("cantidad_devuelta")
    out["quantity"] = -1 * d.get("cantidad_devuelta_num")

    out["refund_amount_raw"] = d.get("importe_reembolsado")
    out["refund_amount"] = d.get("importe_reembolsado_num")

    out["amount_net_raw"] = d.get("importe_reembolsado")
    out["amount_net"] = -1 * d.get("importe_reembolsado_num")

    out["unit_price_raw"] = np.nan
    out["unit_price"] = np.nan
    out["discount_amount_raw"] = np.nan
    out["discount_amount"] = np.nan
    out["discount_pct_raw"] = np.nan
    out["discount_pct"] = np.nan

    out["status"] = d.get("estado_devolucion")
    out["payment_method"] = d.get("metodo_devolucion")
    out["notes"] = d.get("motivo_devolucion")

    return _select_existing(out, FACT_COLS)

def build_fact_pos_sales(ventas: pd.DataFrame) -> pd.DataFrame:
    v = ventas.copy()

    out = pd.DataFrame()
    out["source_system"] = "POS"
    out["source_table"] = "ventas_pos"
    out["channel"] = "POS"
    out["movement_type"] = "SALE"

    out["event_dt"] = v.get("fecha_hora_dt")
    out["event_dt_raw"] = v.get("fecha_hora")
    out["currency"] = v.get("moneda")

    out["ticket_id"] = v.get("ticket_id")
    out["ticket_id_clean"] = v.get("ticket_id_clean")
    out["ticket_line_id"] = v.get("ticket_line_id")
    out["ticket_line_id_clean"] = v.get("ticket_line_id_clean")

    out["order_id"] = np.nan
    out["order_id_clean"] = np.nan
    out["order_line_id"] = np.nan
    out["order_line_id_clean"] = np.nan

    out["return_id"] = np.nan
    out["return_id_clean"] = np.nan

    out["store_id"] = v.get("store_id")
    out["store_id_clean"] = v.get("store_id_clean")

    out["customer_id"] = v.get("customer_id")
    out["customer_id_clean"] = v.get("customer_id_clean")

    out["product_id"] = v.get("product_id")
    out["product_id_clean"] = v.get("product_id_clean")

    out["quantity_raw"] = v.get("cantidad")
    out["quantity"] = v.get("cantidad_num")

    out["unit_price_raw"] = v.get("precio_unitario")
    out["unit_price"] = v.get("precio_unitario_num")

    out["discount_amount_raw"] = v.get("descuento_importe")
    out["discount_amount"] = v.get("descuento_importe_num")

    out["discount_pct_raw"] = v.get("descuento_pct")
    out["discount_pct"] = v.get("descuento_pct_num")

    out["amount_net_raw"] = v.get("importe_linea")
    out["amount_net"] = v.get("importe_linea_num")

    calc_net = (out["quantity"] * out["unit_price"]) - out["discount_amount"].fillna(0)
    out["amount_net"] = out["amount_net"].fillna(calc_net)

    out["refund_amount_raw"] = np.nan
    out["refund_amount"] = np.nan

    out["status"] = "OK"
    out["payment_method"] = np.nan
    out["notes"] = v.get("observaciones")

    return _select_existing(out, FACT_COLS)

def build_fact_pos_returns(devol: pd.DataFrame, ventas_pos: pd.DataFrame) -> pd.DataFrame:
    d = devol.copy()

    v_small = ventas_pos[[
        "ticket_line_id_clean", "ticket_id_clean",
        "precio_unitario_num", "descuento_importe_num", "descuento_pct_num",
        "importe_linea_num"
    ]].copy()

    d = d.merge(v_small, left_on="ticket_line_id_original_clean", right_on="ticket_line_id_clean",
                how="left", suffixes=("", "_orig"))

    out = pd.DataFrame()
    out["source_system"] = "POS"
    out["source_table"] = "devoluciones_tienda"
    out["channel"] = "POS"
    out["movement_type"] = "RETURN"

    out["event_dt"] = d.get("fecha_devolucion_dt")
    out["event_dt_raw"] = d.get("fecha_devolucion")
    out["currency"] = d.get("moneda")

    out["ticket_id"] = d.get("ticket_id_original")
    out["ticket_id_clean"] = d.get("ticket_id_original_clean")
    out["ticket_line_id"] = d.get("ticket_line_id_original")
    out["ticket_line_id_clean"] = d.get("ticket_line_id_original_clean")

    out["order_id"] = d.get("order_id_original")
    out["order_id_clean"] = d.get("order_id_original_clean")
    out["order_line_id"] = np.nan
    out["order_line_id_clean"] = np.nan

    out["return_id"] = d.get("return_id")
    out["return_id_clean"] = d.get("return_id_clean")

    out["store_id"] = d.get("store_id")
    out["store_id_clean"] = d.get("store_id_clean")

    out["customer_id"] = d.get("customer_id")
    out["customer_id_clean"] = d.get("customer_id_clean")

    out["product_id"] = d.get("product_id")
    out["product_id_clean"] = d.get("product_id_clean")

    out["quantity_raw"] = d.get("cantidad_devuelta")
    out["quantity"] = -1 * d.get("cantidad_devuelta_num")

    out["refund_amount_raw"] = d.get("importe_reembolsado")
    out["refund_amount"] = d.get("importe_reembolsado_num")

    out["amount_net_raw"] = d.get("importe_reembolsado")
    out["amount_net"] = -1 * d.get("importe_reembolsado_num")

    # info de la venta original si existe
    out["unit_price_raw"] = np.nan
    out["unit_price"] = d.get("precio_unitario_num")

    out["discount_amount_raw"] = np.nan
    out["discount_amount"] = d.get("descuento_importe_num")

    out["discount_pct_raw"] = np.nan
    out["discount_pct"] = d.get("descuento_pct_num")

    out["status"] = d.get("estado_devolucion")
    out["payment_method"] = d.get("metodo_reembolso")
    out["notes"] = coalesce(d.get("motivo_devolucion"), d.get("canal_origen_venta"))

    return _select_existing(out, FACT_COLS)

fact_online_sales = build_fact_online_sales(pedidos_p, lineas_p)
fact_online_ret = build_fact_online_returns(devol_online_p, pedidos_p)
fact_pos_sales = build_fact_pos_sales(ventas_pos_p)
fact_pos_ret = build_fact_pos_returns(devol_pos_p, ventas_pos_p)

fact_all = pd.concat([fact_online_sales, fact_online_ret, fact_pos_sales, fact_pos_ret], ignore_index=True)

print("Filas por bloque:")
print(" - ONLINE sales :", len(fact_online_sales))
print(" - ONLINE ret   :", len(fact_online_ret))
print(" - POS sales    :", len(fact_pos_sales))
print(" - POS ret      :", len(fact_pos_ret))
print("\nTOTAL fact_all :", len(fact_all))

fact_all.head(5)

AttributeError: 'NoneType' object has no attribute 'dtype'

## 6) Enriquecimiento con dimensiones (clientes, productos, tiendas)

- Productos: categoría, subcategoría, marca, coste unitario, precio lista.
- Clientes: tier, puntos, estado, consentimiento marketing.
- Tiendas: ciudad, región, tipo (ONLINE vs física), etc.

En datasets con errores es normal que existan **huérfanos** (IDs en fact que no aparezcan en dimensiones).
Luego mediremos ese %.

In [8]:
def enrich_with_product_dim(fact: pd.DataFrame, productos: pd.DataFrame) -> pd.DataFrame:
    p = productos.copy()
    p = p.sort_values(by=["product_id_clean"]).drop_duplicates(subset=["product_id_clean"], keep="first")

    cols_keep = [
        "product_id_clean", "nombre_producto", "categoria", "subcategoria", "marca",
        "precio_venta_num", "coste_unitario_num", "estado_producto"
    ]
    p_small = p[[c for c in cols_keep if c in p.columns]].copy()
    return fact.merge(p_small, on="product_id_clean", how="left")

def enrich_with_customer_dim(fact: pd.DataFrame, crm: pd.DataFrame) -> pd.DataFrame:
    c = crm.copy()
    c = c.sort_values(by=["customer_id_clean"]).drop_duplicates(subset=["customer_id_clean"], keep="first")

    cols_keep = [
        "customer_id_clean", "tier_fidelizacion", "puntos_num",
        "consentimiento_marketing", "estado_cliente",
        "ciudad", "provincia", "codigo_postal", "pais"
    ]
    c_small = c[[c for c in cols_keep if c in c.columns]].copy()
    return fact.merge(c_small, on="customer_id_clean", how="left")

def enrich_with_store_dim(fact: pd.DataFrame, tiendas: pd.DataFrame) -> pd.DataFrame:
    t = tiendas.copy()
    t = t.sort_values(by=["store_id_clean"]).drop_duplicates(subset=["store_id_clean"], keep="first")

    cols_keep = [
        "store_id_clean", "nombre_tienda", "tipo_ubicacion", "canal",
        "ciudad", "provincia", "region", "estado"
    ]
    t_small = t[[c for c in cols_keep if c in t.columns]].copy()
    return fact.merge(t_small, on="store_id_clean", how="left")

fact_enriched = fact_all.copy()
fact_enriched = enrich_with_product_dim(fact_enriched, productos_p)
fact_enriched = enrich_with_customer_dim(fact_enriched, crm_p)
fact_enriched = enrich_with_store_dim(fact_enriched, tiendas_p)

fact_enriched.head(5)

NameError: name 'fact_all' is not defined

## 7) Métricas derivadas en la fact table

Calculamos métricas útiles para análisis posteriores:
- `quantity_abs`
- `gross_amount` (qty * unit_price) cuando tengamos unit_price
- `amount_net` ya viene con signo (ventas +, devoluciones -)
- `cost_total_signed` = coste_unitario * quantity (devoluciones queda negativo)
- `margin_signed` = amount_net - cost_total_signed

In [None]:
def add_fact_metrics(fact: pd.DataFrame) -> pd.DataFrame:
    out = fact.copy()

    out["quantity_abs"] = out["quantity"].abs()
    out["gross_amount"] = out["quantity"] * out["unit_price"]

    out["cost_total_signed"] = out["coste_unitario_num"] * out["quantity"]
    out["margin_signed"] = out["amount_net"] - out["cost_total_signed"]

    out["is_return"] = out["movement_type"].eq("RETURN")
    out["event_date"] = out["event_dt"].dt.date

    return out

fact_final = add_fact_metrics(fact_enriched)

fact_final[[
    "channel", "movement_type", "event_dt", "store_id_clean", "customer_id_clean", "product_id_clean",
    "quantity", "amount_net", "coste_unitario_num", "cost_total_signed", "margin_signed"
]].head(10)

## 8) Comprobaciones básicas post-integración (Data Quality)

Cuantificamos problemas típicos tras integrar:
- % de fechas no parseadas (`event_dt` NaT)
- % de productos no encontrados en catálogo
- % de tiendas no encontradas
- % de customer_id desconocidos (normal en POS por no identificar al cliente)
- Duplicados potenciales por claves naturales
- Rango de fechas (min/max) y outliers temporales

In [None]:
def null_rate(series: pd.Series) -> float:
    return float(series.isna().mean())

def coverage_report(fact: pd.DataFrame) -> pd.DataFrame:
    rows = []
    rows.append(("event_dt (parsed)", 1 - null_rate(fact["event_dt"])))
    rows.append(("product match (nombre_producto)", 1 - null_rate(fact["nombre_producto"])))
    rows.append(("store match (nombre_tienda)", 1 - null_rate(fact["nombre_tienda"])))
    rows.append(("customer match (tier_fidelizacion)", 1 - null_rate(fact["tier_fidelizacion"])))
    return pd.DataFrame(rows, columns=["metric", "coverage"]).sort_values("coverage")

def duplicates_by_natural_key(fact: pd.DataFrame) -> pd.DataFrame:
    nat = (
        fact["channel"].astype(str) + "|" +
        fact["movement_type"].astype(str) + "|" +
        coalesce(fact["order_line_id_clean"], fact["ticket_line_id_clean"], fact["return_id_clean"]).astype(str)
    )
    tmp = fact.copy()
    tmp["natural_key"] = nat.replace("nan", np.nan)
    dup = tmp["natural_key"].duplicated(keep=False)
    return tmp.loc[dup, ["natural_key", "channel", "movement_type", "order_line_id", "ticket_line_id", "return_id"]].sort_values("natural_key")

def date_range_report(fact: pd.DataFrame) -> Dict[str, object]:
    dt = fact["event_dt"]
    return {
        "min_event_dt": dt.min(),
        "max_event_dt": dt.max(),
        "pct_event_dt_null": null_rate(dt),
    }

print("Cobertura (tras joins/parseo):")
display(coverage_report(fact_final))

print("\nRango de fechas:")
print(date_range_report(fact_final))

print("\nDistribución por canal / tipo movimiento:")
display(
    fact_final.groupby(["channel", "movement_type"], dropna=False)
    .size()
    .reset_index(name="rows")
    .sort_values("rows", ascending=False)
)

dups = duplicates_by_natural_key(fact_final)
print(f"\nPosibles duplicados por clave natural: {len(dups)} filas (muestra 10)")
display(dups.head(10))

## 9) Exportación

Exportamos la fact table integrada a:
- `output_integrated/fact_transacciones_integrada.csv`

Esta salida será la base del Notebook 5 (features: RFM, churn, basket, etc.).

In [None]:
def add_fact_id(fact: pd.DataFrame, prefix: str = "F") -> pd.DataFrame:
    out = fact.copy().reset_index(drop=True)
    out.insert(0, "fact_id", [f"{prefix}{i:07d}" for i in range(1, len(out) + 1)])
    return out

fact_export = add_fact_id(fact_final)

out_path = OUT_DIR / "fact_transacciones_integrada.csv"
fact_export.to_csv(out_path, index=False, encoding="utf-8")

print("[OK] Exportado:", out_path)
print("Shape:", fact_export.shape)
fact_export.head(3)

## 10) Siguientes pasos (Notebook 5)

Con `fact_transacciones_integrada.csv` ya puedes:
- Calcular **RFM** por cliente.
- Derivar features de **tasa de devoluciones**, **canal preferido**, **uso de descuentos**, etc.
- Ejecutar **Market Basket Analysis** (por `order_id` y `ticket_id`).
- Construir un dataset de **propensión** (features + etiqueta) para ML.