# Lab 2: Data Understanding

**Universidad del Valle de Guatemala**  
**Facultad de Ingeniería**  
**Departamento de Ciencias de la Computación**  
**Machine Learning Operations** 

## Team Members

- Arturo Argueta - 21527 
- Edwin de León - 22809 
- Diego Leiva - 21752 
- Pablo Orellana - 21970

## Libraries

In [1]:
from pathlib import Path
import pandas as pd
from tqdm import tqdm
from typing import Dict, Tuple
import logging

## Logging

In [2]:
# Basic logging configuration
logging.basicConfig(
    level=logging.INFO,  # Change to DEBUG for more detailed output
    format="[%(levelname)s] - %(message)s",
    datefmt="%H:%M:%S"
)
logger = logging.getLogger(__name__)

## Data Loading

Exploratory analysis showed that some of the datasets use encodings other than the usual UTF-8, so the files need to be read with a safe fallback across several encodings.

In [3]:
def read_csv_with_fallback(
        path: Path, 
        encodings: Tuple[str, ...] = ("utf-8", "utf-8-sig", "latin1", "cp1252"), 
        **pd_kwargs
    ) -> Tuple[pd.DataFrame, str]:
    """
    Attempts to read a CSV file with various encodings to find the correct one.

    Args:
        path (Path): Path to the CSV file.
        encodings (Tuple[str, ...]): Encodings to try, in order.
        **pd_kwargs: Additional keyword arguments passed to pandas read_csv.

    Returns:
        Tuple[pd.DataFrame, str]: The DataFrame read and the encoding used.
    """
    last_error = None

    # Iterating over encodings
    for encoding in encodings:
        try:
            # Try reading the CSV with the current encoding
            df = pd.read_csv(path, encoding=encoding, **pd_kwargs)
            # Standardize column names
            df.columns = (df.columns
                          .str.strip() # Remove leading and trailing whitespace
                          .str.replace(r"\s+", " ", regex=True) # Replace multiple spaces with a single space
                         )
            
            # If successful, return the DataFrame and the encoding used
            return df, encoding

        # On any failure (UnicodeDecodeError is the expected case), remember
        # the error and try the next encoding
        except Exception as e:
            last_error = e

    # Last "tolerant" attempt: utf-8 with replacement for bad characters
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            # Read the CSV from the already-decoded text stream
            df = pd.read_csv(f, **pd_kwargs)
            # Standardize column names
            df.columns = df.columns.str.strip().str.replace(r"\s+", " ", regex=True)
            # If successful, return the DataFrame and a label for the tolerant decoding
            return df, "utf-8(errors=replace)"

    # If even the tolerant attempt fails, re-raise the last error seen in the loop
    except Exception:
        raise last_error
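
The fallback idea can be illustrated without pandas. The following is a minimal sketch, assuming a hypothetical two-line CSV encoded as Latin-1; `decode_with_fallback` and the sample row are illustrative names, not part of the notebook.

```python
# Bytes as they would appear in a Latin-1 encoded CSV (hypothetical sample row)
raw = "nombre,ciudad\nJosé,Cobán\n".encode("latin1")

def decode_with_fallback(data: bytes, encodings=("utf-8", "utf-8-sig", "latin1", "cp1252")):
    """Return (text, encoding) for the first encoding that decodes without error."""
    for enc in encodings:
        try:
            return data.decode(enc), enc
        except UnicodeDecodeError:
            continue  # this encoding cannot represent the bytes; try the next one
    # Last resort: decode as UTF-8 and replace undecodable bytes with U+FFFD
    return data.decode("utf-8", errors="replace"), "utf-8(errors=replace)"

text, used = decode_with_fallback(raw)
print(used)
print(text)
```

One design point worth keeping in mind: `latin1` maps every possible byte to a character, so it never raises `UnicodeDecodeError`. Any encoding listed after it is only reached when a different kind of error interrupts the attempt.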

In [4]:
def load_csvs(
        folder: str,
        pattern: str = "*.csv",
        encodings: Tuple[str, ...] = ("utf-8", "utf-8-sig", "latin1", "cp1252"),
        **pd_kwargs
    ) -> Dict[str, pd.DataFrame]:
    """
    Load CSV files from a folder into a dictionary of DataFrames.
    
    Args:
        folder (str): The folder containing the CSV files.
        pattern (str, optional): The glob pattern to match CSV files. Defaults to "*.csv".
        encodings (Tuple[str, ...], optional): The encodings to try when reading CSV files. Defaults to ("utf-8", "utf-8-sig", "latin1", "cp1252").
        **pd_kwargs: Additional keyword arguments to pass to pandas read_csv.

    Returns:
        Dict[str, pd.DataFrame]: A dictionary mapping file names (without extensions) to DataFrames.
    """
    # Get a list of all CSV files in the folder
    files = sorted(Path(folder).glob(pattern))
    dfs: Dict[str, pd.DataFrame] = {}

    # Read each CSV file into a DataFrame
    for file in tqdm(files, desc="Reading CSVs", unit="file"):
        df, encoding = read_csv_with_fallback(file, encodings=encodings, **pd_kwargs)
        name = file.stem.lower()  # Use the file name without extension as the key
        dfs[name] = df # Store the DataFrame with the file name (without extension) as the key
        logger.debug(f"Loaded {name} with encoding {encoding}")

    return dfs

## Cleaning

### String Normalization

In [5]:
def normalize_strings(
        df: pd.DataFrame, 
        cols: list[str]
    ) -> pd.DataFrame:
    """
    Normalize string columns in a DataFrame.

    Args:
        df (pd.DataFrame): The DataFrame to process.
        cols (list[str]): The columns to normalize.

    Returns:
        pd.DataFrame: The DataFrame with normalized string columns.
    """
    out = df.copy() # Create a copy of the DataFrame
    # Iterate over the specified columns
    for c in cols:
        # Check if the column exists in the DataFrame
        if c in out.columns:
            # Normalize the string values in the column
            out[c] = (out[c].astype("string")
                      .str.strip() # Remove leading and trailing whitespace
                      .str.replace(r"\s+", " ", regex=True) # Replace multiple spaces with a single space
                     )

    return out
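
As a quick illustration of the normalization chain, here is a minimal sketch on a hypothetical series of product names (the sample values are made up for the example).

```python
import pandas as pd

# Hypothetical product names with stray whitespace and a missing value
names = pd.Series(["  Crown   Royal\tHoney ", "MHW  Ltd", None])

clean = (
    names.astype("string")                 # nullable string dtype keeps missing values as <NA>
    .str.strip()                           # drop leading and trailing whitespace
    .str.replace(r"\s+", " ", regex=True)  # collapse runs of spaces, tabs and newlines
)

print(clean.tolist())
```

Casting to the `"string"` dtype first keeps the missing value missing, whereas a plain `astype(str)` on an object column would turn `None` into the text `"None"`.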

### Type Coercion for Key Columns

In [6]:
def coerce_column_types(
        dfs: Dict[str, pd.DataFrame], 
        schema: Dict[str, Dict[str, str]]
    ) -> Dict[str, pd.DataFrame]:
    """
    Coerce DataFrame column types according to a schema.

    Args:
        dfs (Dict[str, pd.DataFrame]): The input DataFrames.
        schema (Dict[str, Dict[str, str]]): The schema defining the desired column types.

    Returns:
        Dict[str, pd.DataFrame]: The DataFrames with coerced column types.
    """
    out: Dict[str, pd.DataFrame] = dfs.copy() # Create output dictionary

    # Iterate over the input DataFrames
    for name, df in dfs.items():
        dfo = df.copy() # Create a copy of the DataFrame

        # Coerce types according to the schema
        if name in schema:
            # Iterate over the columns and their desired types
            for col, dtype in schema[name].items():
                if col in dfo.columns:
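                    if dtype in ("Int64", "Float64", "int", "float"):
                        # Numeric targets: unparseable values become NaN; the resulting
                        # dtype is whatever pd.to_numeric infers, not necessarily `dtype`
                        dfo[col] = pd.to_numeric(dfo[col], errors="coerce")
                    else:
                        # Other targets: cast directly to the requested dtype
                        dfo[col] = dfo[col].astype(dtype)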
        out[name] = dfo
    return out
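
The difference between coercing with `pd.to_numeric` and casting to a nullable integer can be seen in a small sketch. The `raw_ids` series is a hypothetical key column, not data from the lab.

```python
import pandas as pd

raw_ids = pd.Series(["1", "2", "abc", None])  # hypothetical raw key column

coerced = pd.to_numeric(raw_ids, errors="coerce")  # unparseable values become NaN
nullable = coerced.astype("Int64")                 # explicit cast to pandas' nullable integer

print(coerced.dtype)   # float64, because NaN forces a float column
print(nullable.dtype)  # Int64, with <NA> in place of NaN
```

The explicit `astype("Int64")` only succeeds when every non-missing value is a whole number, which is a reasonable expectation for ID columns but an assumption for other numeric fields.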

### Duplicate Handling

Exploratory analysis showed that the client and event datasets contain fully duplicated records.

In [7]:
def duplicates_handling(
        df: pd.DataFrame,
        subset: list[str] | None = None
    ) -> pd.DataFrame:
    """
    Handle duplicates in a DataFrame.
    
    Args:
        df (pd.DataFrame): The DataFrame to process.
        subset (list[str] | None): The columns to consider for identifying duplicates.

    Returns:
        pd.DataFrame: The DataFrame without duplicates.
    """
    # Drop duplicates based on the subset of columns
    df_unique = df.drop_duplicates(subset=subset, keep="first").copy()
    # Reset the index
    df_unique.reset_index(drop=True, inplace=True)

    return df_unique
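
The effect of the `subset` argument is easiest to see on a toy table. This is a minimal sketch with a hypothetical `clientes` frame; passing no subset removes only rows that match in every column, while a key subset keeps the first row per key.

```python
import pandas as pd

# Hypothetical clients: one exact duplicate and one conflicting row for id 2
clientes = pd.DataFrame({
    "id":     [1, 1, 2, 2],
    "nombre": ["Ana", "Ana", "Luis", "Luis Alberto"],
})

# Full-row comparison: only the exact duplicate of (1, "Ana") is removed
full_row = clientes.drop_duplicates(keep="first").reset_index(drop=True)

# Key-based comparison: the first row per id is kept, later ones are dropped
by_key = clientes.drop_duplicates(subset=["id"], keep="first").reset_index(drop=True)

print(len(full_row), len(by_key))
```

Deduplicating by key silently discards conflicting rows, so it is worth checking how many rows differ only outside the key before relying on it.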

### Null Handling

Exploratory analysis showed that the `clientes` dataset has 281 null records in every column (2.34%). In `eventos`, only the `transactionid` variable has nulls, at 99.9%. In `producto`, `categoria_id` and `marca_id` have 8.55% and 7.28% nulls respectively, and `precio` has 0.05%.

To handle them, records with null data are dropped for `clientes` and for the `precio` variable, while null `categoria_id` and `marca_id` values are mapped to an unknown/other entry. The nearly empty `transactionid` column is dropped from `eventos`.

In [8]:
def next_free_id(
        series: pd.Series
    ) -> int:
    """
    Get the next free (unused) ID from a series of existing IDs.

    Args:
        series (pd.Series): Series of existing IDs.

    Returns:
        int: Next free ID.
    """
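    # Get the next free ID
    if series.empty:
        # If the series is empty, return 1 as the next ID
        return 1

    # Coerce to numeric so non-numeric IDs become NaN instead of raising
    max_id = pd.to_numeric(series, errors="coerce").max()
    if pd.isna(max_id):
        # No usable IDs at all (every value is null); start numbering at 1
        return 1

    return int(max_id) + 1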

In [9]:
def nulls_handling(
        dfs: Dict[str, pd.DataFrame]
    ) -> Dict[str, pd.DataFrame]:
    """
    Handle null values in the given DataFrames.
    
    Args:
        dfs (Dict[str, pd.DataFrame]): The DataFrames to process.
    
    Returns:
        Dict[str, pd.DataFrame]: The processed DataFrames.
    """
    out: Dict[str, pd.DataFrame] = dfs.copy() # Initialize output dictionary

    # --- Handle CATEGORIA ---
    category_df = dfs["categoria"].copy() # create a copy of the categoria DataFrame
    # Check if 'Otro' category exists
    if not (category_df["categoria"] == "Otro").any():
        # If not, create it
        new_category_id = next_free_id(category_df["id"]) # Get the next free ID
        # Append the new category with the next free ID and the name 'Otro'
        category_df = pd.concat(
            [category_df, pd.DataFrame([{"id": new_category_id, "categoria": "Otro"}])],
            ignore_index=True
        )
    out["categoria"] = category_df

    # --- Handle MARCA ---
    brand_df = dfs["marca"].copy() # create a copy of the marca DataFrame
    # Check if 'Otro' brand exists
    if not (brand_df["marca"] == "Otro").any():
        # If not, create it
        new_brand_id = next_free_id(brand_df["id"]) # Get the next free ID
        # Append the new brand with the next free ID and the name 'Otro'
        brand_df = pd.concat(
            [brand_df, pd.DataFrame([{"id": new_brand_id, "marca": "Otro"}])],
            ignore_index=True
        )
    out["marca"] = brand_df

    # Get IDs for 'Otro' category and brand
    unknown_category_id = int(out["categoria"].loc[out["categoria"]["categoria"] == "Otro", "id"].iloc[0])
    unknown_brand_id = int(out["marca"].loc[out["marca"]["marca"] == "Otro", "id"].iloc[0])

    # --- Handle PRODUCTO ---
    product_df = dfs["producto"].copy()
    # Drop any rows with null values on precio
    product_df = product_df.dropna(subset=["precio"])
    # Impute null foreign keys as 'Otro'
    # Fill null values in 'categoria_id' with the ID of 'Otro' category
    product_df["categoria_id"] = product_df["categoria_id"].fillna(unknown_category_id)
    # Fill null values in 'marca_id' with the ID of 'Otro' brand
    product_df["marca_id"] = product_df["marca_id"].fillna(unknown_brand_id)
    out["producto"] = product_df

    # --- Handle CLIENTE ---
    client_df = dfs["cliente"].copy()
    # Drop any rows with a null primary key (id)
    client_df = client_df.dropna(subset=["id"]).copy()
    # Ensure all string columns are stripped of whitespace
    for col in ["nombre", "apellido", "genero", "empresa", "idioma", "nit", "puesto", "ciudad", "correo", "telefono"]:
        if col in client_df.columns:
            client_df[col] = client_df[col].astype("string").str.strip().str.replace(r"\s+", " ", regex=True)
    # Store the cleaned clients; otherwise the cleaned copy would be discarded
    out["cliente"] = client_df
    
    # --- Handle EVENTO ---
    event_df = dfs["evento"].copy() # create a copy of the evento DataFrame
    # Check if 'transactionid' column exists
    if "transactionid" in event_df.columns:
        # Drop the 'transactionid' column
        event_df = event_df.drop(columns=["transactionid"])
    out["evento"] = event_df

    return out
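
The imputation strategy described in this section can be sketched on two toy tables. The frames and values below are hypothetical; only the column names follow the notebook.

```python
import pandas as pd

categoria = pd.DataFrame({"id": [1, 2], "categoria": ["SCOTCH WHISKIES", "BLENDED WHISKIES"]})
producto = pd.DataFrame({
    "id": [10, 11, 12, 13],
    "categoria_id": pd.array([1, None, 2, 1], dtype="Int64"),
    "precio": [22.49, 11.03, None, 7.08],
})

# Add an "Otro" category using the next unused id
otro_id = int(categoria["id"].max()) + 1
categoria = pd.concat(
    [categoria, pd.DataFrame([{"id": otro_id, "categoria": "Otro"}])],
    ignore_index=True,
)

# Drop products without a price, then point null foreign keys at "Otro"
producto = producto.dropna(subset=["precio"]).copy()
producto["categoria_id"] = producto["categoria_id"].fillna(otro_id)

print(producto)
```

After this step every `categoria_id` in `producto` refers to an existing row in `categoria`, which is what keeps later joins from losing products.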

### Valid Product Filter

A mandatory filter keeps only the events whose `itemid` is present and referenced in `producto.id`. This removes every record where the event's product is unknown, which guarantees referential integrity with the product table and allows consistent sales, category, and brand metrics to be computed.

In [10]:
def filter_events_by_valid_product(
        dfs: Dict[str, pd.DataFrame],
        itemid_col: str = "itemid",
        prodid_col: str = "id"
    ) -> pd.DataFrame:
    """
    Filter events by valid product IDs.

    Args:
        dfs (Dict[str, pd.DataFrame]): Input DataFrames containing events and products.
        itemid_col (str): Name of the item ID column in the event DataFrame.
        prodid_col (str): Name of the product ID column in the product DataFrame.

    Returns:
        pd.DataFrame: DataFrame of filtered events.
    """
    # Create copies of event and product DataFrames
    events = dfs["evento"].copy()
    products = dfs["producto"].copy()

    # Check that the key columns are not missing
    if itemid_col not in events.columns or prodid_col not in products.columns:
        logger.warning(f"Columns {itemid_col} or {prodid_col} not found in DataFrames.")
        return events

    # Get valid product IDs
    prod_ids = set(products[prodid_col].dropna().unique())

    # Generate mask for valid events
    mask_valid = events[itemid_col].isin(prod_ids)
    total, valid = len(events), int(mask_valid.sum())
    invalid = total - valid

    logger.debug(f"Filtering events: {valid} valid, {invalid} invalid out of {total} total events.")

    # Filter events by valid product IDs
    valid_events = events.loc[mask_valid].copy()

    # Sanity check: all item IDs in valid events must be in product IDs
    assert valid_events[itemid_col].dropna().isin(prod_ids).all(), \
        "Mismatched item IDs found after filtering."

    return valid_events
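
The filter reduces to a membership test against the set of known product IDs. A minimal sketch, assuming hypothetical `producto` and `evento` frames with one unknown and one null `itemid`:

```python
import pandas as pd

producto = pd.DataFrame({"id": [248676, 367447]})
evento = pd.DataFrame({
    "itemid": pd.array([248676, 999999, None, 367447], dtype="Int64"),
    "event": ["view", "view", "addtocart", "transaction"],
})

# Known product ids; events with a null or unknown itemid fail the membership test
valid_ids = set(producto["id"].dropna().unique())
mask = evento["itemid"].isin(valid_ids)
evento_valid = evento.loc[mask].copy()

print(evento_valid)
```

Note that the surviving rows keep their original index labels, so a `reset_index(drop=True)` may be wanted if a contiguous index matters downstream.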

### Date and Timestamp Handling

#### Timestamp-to-Date Conversion

In [11]:
def convert_timestamp(
        df: pd.DataFrame, 
        tmstp_col: str = "timestamp", 
        date_col: str = "fecha"
    ) -> pd.DataFrame:
    """
    Convert timestamp (seconds or milliseconds) column to datetime.

    Args:
        df (pd.DataFrame): DataFrame to process.
        tmstp_col (str): Name of the timestamp column.
        date_col (str): Name of the resulting date column.

    Returns:
        pd.DataFrame: DataFrame with the converted date column.
    """
    df = df.copy()
    # Check if timestamp column exists
    if tmstp_col not in df.columns:
        return df

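    # Determine the unit of the timestamp from a sample value
    # Heuristic: values below 1e11 are treated as seconds, otherwise milliseconds
    non_null = df[tmstp_col].dropna()
    if non_null.empty:
        # Nothing to sample; emit an all-NaT date column instead of failing
        df[date_col] = pd.NaT
        return df
    sample_val = non_null.iloc[0] # Get a sample value from the timestamp column
    unit = 's' if sample_val < 1e11 else 'ms' # Determine the unit of the timestamp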

    # Convert the timestamp column to datetime
    df[date_col] = pd.to_datetime(df[tmstp_col], unit=unit, errors="coerce")

    return df
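
The unit heuristic can be checked independently of pandas. The sketch below uses only the standard library; `infer_unit` and `epoch_to_datetime` are hypothetical helper names, and the sample value corresponds to 2015-06-02 05:50:14.164 UTC.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def infer_unit(value: int) -> str:
    """Guess the epoch unit with the same 1e11 threshold used above."""
    return "s" if value < 1e11 else "ms"

def epoch_to_datetime(value: int) -> datetime:
    """Convert an epoch value in seconds or milliseconds to a UTC datetime."""
    if infer_unit(value) == "s":
        return EPOCH + timedelta(seconds=value)
    return EPOCH + timedelta(milliseconds=value)

print(epoch_to_datetime(1433224214164))  # millisecond input
print(epoch_to_datetime(1433224214))     # the same instant in whole seconds
```

The threshold works because 1e11 milliseconds falls in early March 1973 while 1e11 seconds is thousands of years in the future, so any realistic event log lands on the correct side. Millisecond timestamps from before 1973 would be misread as seconds.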

#### Birthdate Cleaning and Formatting

In [12]:
def clean_birthdate_and_age(
        dfs: Dict[str, pd.DataFrame], 
        client_col: str = "cliente", 
        birth_col: str = "nacimiento", 
        age_col: str = "edad"
    ) -> Dict[str, pd.DataFrame]:
    """
    Clean birthdate and age fields in the specified DataFrame.
    Handle parsing, future date correction, and age recalculation.

    Args:
        dfs (Dict[str, pd.DataFrame]): Dictionary of DataFrames.
        client_col (str): Key for the DataFrame to process.
        birth_col (str): Name of the birthdate column.
        age_col (str): Name of the age column.

    Returns:
        Dict[str, pd.DataFrame]: Updated dictionary of DataFrames.
    """
    # Check if client exists
    if client_col not in dfs:
        return dfs
    
    df = dfs[client_col].copy() # Create a copy of the DataFrame

    # Parse date field to datetime format
    dob = pd.to_datetime(df[birth_col], format="%m/%d/%y", errors="coerce")

    # Fallback
    mask_fallback = dob.isna() & df[birth_col].notna()
    if mask_fallback.any():
        dob_alt = pd.to_datetime(df.loc[mask_fallback, birth_col], errors="coerce")
        dob.loc[mask_fallback] = dob_alt

    # Correct future years
    today = pd.Timestamp.today().normalize() # Get today's date
    future_date_mask = dob > today # Identify future dates
    dob.loc[future_date_mask] -= pd.DateOffset(years=100) # Subtract 100 years from future dates

    # Compute age
    age = ((today - dob).dt.days / 365.25).round().astype("Int64")

    # Reassign age to the DataFrame
    df[birth_col] = dob
    df[age_col] = age

    dfs[client_col] = df # Update the DataFrame in the dictionary

    return dfs
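
The reason future dates appear at all is the two-digit year: Python's `%y` directive maps 69 to 99 onto 1969 to 1999 and 00 to 68 onto 2000 to 2068, and pandas is assumed here to follow the same convention. A minimal standard-library sketch of the correction, with a hypothetical helper name and a fixed reference date:

```python
from datetime import datetime

def parse_two_digit_birthdate(text: str, today: datetime) -> datetime:
    """Parse MM/DD/YY and move dates that land in the future back one century."""
    dob = datetime.strptime(text, "%m/%d/%y")
    if dob > today:
        dob = dob.replace(year=dob.year - 100)
    return dob

today = datetime(2025, 1, 1)  # fixed reference date so the example is reproducible
for raw in ("03/15/45", "07/04/85", "01/20/10"):
    print(raw, "->", parse_two_digit_birthdate(raw, today).date())
```

A two-digit year that resolves to a past date in the 2000s, such as `10`, stays as is, so a person born in 1910 would be read as born in 2010. The data alone cannot resolve that ambiguity.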

## Transformation

### Mapping Unknown Clients

A new entry is added to the clients table for an anonymous client with empty data, so that all unknown client IDs can then be replaced by a single unknown client that is referenced in the clients table.

In [13]:
def ensure_anonymous_client(
        client_df: pd.DataFrame,
        anon_name: str = "Anonimo",
        anon_gender: str = "NA"
    ) -> tuple[pd.DataFrame, int]:
    """
    Ensures that an anonymous client exists in the DataFrame.
    A new anonymous client is created if one does not already exist.

    Args:
        client_df (pd.DataFrame): The DataFrame containing client information.
        anon_name (str): The name to use for the anonymous client.
        anon_gender (str): The gender to use for the anonymous client.

    Returns:
        tuple[pd.DataFrame, int]: The updated DataFrame and the ID of the anonymous client.
    """
    df = client_df.copy() # Create a copy of the client DataFrame

    # Check if both nombre and apellido exist
    if {"nombre", "apellido"}.issubset(df.columns):
        # Generate a mask for existing anonymous client
        mask_existing = (df["nombre"].fillna("").str.casefold() == anon_name.casefold()) & (df["apellido"].fillna("") == "")
        if mask_existing.any():
            # If an anonymous client already exists, return the original DataFrame and its ID
            anon_id = int(df.loc[mask_existing, "id"].iloc[0])
            return df, anon_id

    # If no anonymous client exists, create a new one
    anon_id = next_free_id(pd.to_numeric(df["id"], errors="coerce")) # Generate a new ID

    # Define the anonymous client entry
    row = {c: pd.NA for c in df.columns} # Initialize all columns with NA
    row.update({
        "id": anon_id,
        "nombre": anon_name,
        "apellido": "",
        "genero": anon_gender
    })
    row_df = pd.DataFrame([row]).astype(df.dtypes.to_dict(), errors="ignore") # Create a DataFrame for the new row
    df = pd.concat([df, row_df], ignore_index=True)

    return df, anon_id

In [14]:
def map_unknown_visitors_to_anonymous(
        dfs: Dict[str, pd.DataFrame],
        visitor_col: str = "visitorid",
        client_col: str = "id",
        anon_id: int | None = None,
    ) -> pd.DataFrame:
    """
    Map unknown visitors to an anonymous client in the event DataFrame.
    Args:
        dfs (Dict[str, pd.DataFrame]): Input DataFrames containing events and clients.
        visitor_col (str): Name of the visitor ID column in the event DataFrame.
        client_col (str): Name of the client ID column in the client DataFrame.
        anon_id (int): ID of the anonymous client.

    Returns:
        pd.DataFrame: DataFrame of events with unknown visitors mapped to the anonymous client.
    """
    mapped_events = dfs["evento"].copy() # Create a copy of the event DataFrame
    clients = dfs["cliente"].copy() # Create a copy of the client DataFrame

    # Set of valid client IDs
    valid_ids = set(pd.to_numeric(clients[client_col], errors="coerce").dropna().unique())

    # Add known flag dimension
    mapped_events["cliente_conocido"] = mapped_events[visitor_col].isin(valid_ids)

    # Mask of unknowns, NaN or not in clients features
    mask_unknown = (~mapped_events[visitor_col].isin(valid_ids)) | (mapped_events[visitor_col].isna())

    if anon_id is None:
        # Without an anonymous client ID there is nothing to map unknown visitors to
        raise ValueError("Anonymous client ID must be provided or created.")

    # Reassign unknown visitors to the anonymous client
    mapped_events.loc[mask_unknown, visitor_col] = anon_id

    return mapped_events
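
The mapping described in this section can be sketched end to end on toy data. The frames below are hypothetical and much smaller than the real tables; only the column names follow the notebook.

```python
import pandas as pd

cliente = pd.DataFrame({"id": [101, 102], "nombre": ["Ana", "Luis"]})
evento = pd.DataFrame({"visitorid": [101, 999, 102, 555], "event": ["view"] * 4})

# Remember which ids are real clients before the anonymous row is added
known_ids = set(cliente["id"])

# Create the anonymous client with the next unused id
anon_id = int(cliente["id"].max()) + 1
cliente = pd.concat(
    [cliente, pd.DataFrame([{"id": anon_id, "nombre": "Anonimo"}])],
    ignore_index=True,
)

# Flag known visitors, then redirect everyone else to the anonymous client
evento["cliente_conocido"] = evento["visitorid"].isin(known_ids)
evento.loc[~evento["cliente_conocido"], "visitorid"] = anon_id

print(evento)
```

Keeping the `cliente_conocido` flag preserves the information that would otherwise be lost once all unknown visitors share one ID.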

### Creating Additional Table Dimensions

In [15]:
def build_dim_cliente(
        client_df: pd.DataFrame,
        full_name_col: str = "cliente"
    ) -> pd.DataFrame:
    """
    Build a dimension for clients with a full name column.
    A full name is created by merging the "nombre" and "apellido" columns.

    Args:
        client_df (pd.DataFrame): The DataFrame containing client information.
        full_name_col (str): The name of the full name column to create.

    Returns:
        pd.DataFrame: The updated DataFrame with the full name column.
    """
    df = client_df.copy() # Work on a copy so the caller's DataFrame is not mutated

    # Fall back to empty strings when a name column is missing
    empty = pd.Series("", index=df.index, dtype="string")
    first = df["nombre"] if "nombre" in df.columns else empty
    last = df["apellido"] if "apellido" in df.columns else empty

    # Merge full name
    df[full_name_col] = (first.fillna("") + " " + last.fillna("")).str.strip()

    return df

### Selecting Table Dimensions

In [16]:
def select_columns(
        df: pd.DataFrame,
        cols: list[str]
    ) -> pd.DataFrame:
    """
    Creates a copy of the DataFrame with only the specified columns, in the given order,
    silently ignoring any that do not exist.

    Args:
        df (pd.DataFrame): The input DataFrame.
        cols (list[str]): The columns to select.

    Returns:
        pd.DataFrame: A copy of the DataFrame with only the selected columns.
    """
    # Select only the columns that exist in the DataFrame
    keep = [c for c in cols if c in df.columns]
    selected = df[keep].copy()
    
    return selected

### Column Renaming

In [17]:
def rename_columns(
        df: pd.DataFrame, 
        mapping: dict[str, str]
    ) -> pd.DataFrame:
    """
    Renames columns according to mapping {original: new}.
    Does not fail if any key does not exist.

    Args:
        df (pd.DataFrame): The input DataFrame.
        mapping (dict[str, str]): A dictionary mapping original column names to new names.

    Returns:
        pd.DataFrame: A copy of the DataFrame with renamed columns.
    """
    # Rename columns according to mapping
    renamed_df = df.rename(columns={k: v for k, v in mapping.items() if k in df.columns}).copy()
    return renamed_df
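
A short sketch of tolerant selection followed by renaming, using a hypothetical one-row product table. The requested column `volumen` and the mapping key `no_such_col` are missing on purpose.

```python
import pandas as pd

producto = pd.DataFrame({
    "id": [1], "nombre": ["Crown Royal Honey"], "precio": [22.49], "extra": ["x"],
})

wanted = ["id", "nombre", "precio", "volumen"]        # "volumen" does not exist here
keep = [c for c in wanted if c in producto.columns]   # keep the requested order, skip missing

# rename ignores mapping keys that are not columns (its default is errors="ignore")
dim = producto[keep].rename(columns={"nombre": "producto", "no_such_col": "ignored"})

print(dim.columns.tolist())
```

Since `DataFrame.rename` already ignores unknown keys by default, filtering the mapping beforehand is a harmless extra safeguard rather than a requirement.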

## Table Export

In [18]:
def export_tables(
        dfs: Dict[str, pd.DataFrame],
        out_folder: str = "data/processed"
) -> None:
    """
    Export multiple DataFrames to CSV files.

    Args:
        dfs (Dict[str, pd.DataFrame]): The DataFrames to export.
        out_folder (str): The folder where the CSV files are written. Defaults to "data/processed".
    """
    if out_folder is not None:
        out_path = Path(out_folder)
        out_path.mkdir(parents=True, exist_ok=True) # Ensure output directory exists
        
        # Define a tqdm iterator
        iterator = tqdm(dfs.items(), total=len(dfs), desc="Exporting CSVs", unit="table")

        # Iterate over dataframes
        for name, df in iterator:
            path = out_path / f"{name}.csv"
            try:
                df.to_csv(path, index=False)
                iterator.set_postfix(rows=len(df), file=path.name, refresh=False)
            except Exception as e:
                msg = f"Failed to export {name} to {path}: {e}"
                tqdm.write(msg)
                logger.exception(msg)
            else:
                logger.debug(f"Exported {name} to {path}")


## Full Pipeline Integration

In [19]:
def run_pipeline(
        raw_path: str = "data/raw"
    ) -> Dict[str, pd.DataFrame]:
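    """
    Run the full cleaning and transformation pipeline and export the resulting tables.

    Args:
        raw_path (str): Folder containing the raw CSV files.

    Returns:
        Dict[str, pd.DataFrame]: The exported fact and dimension tables, keyed by output name.
    """
    logger.info("Starting data processing pipeline...")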
    # 0) Load raw data
    logger.debug("Loading raw data...")
    dfs = load_csvs(folder=raw_path)
    logger.debug("Raw data loaded successfully")

    # Sanity Check: Validate all tables are present
    required = {"categoria","cliente","evento","marca","producto"}
    missing = required - set(dfs.keys())
    if missing:
        raise ValueError(f"Missing required CSVs: {missing}")

    # ----- Data Cleaning -----
    logger.info("Data cleaning stage initialized")

    # 1) Handle duplicates on each table
    logger.debug("Handling duplicates...")
    dfs["categoria"] = duplicates_handling(dfs["categoria"], subset=None)
    dfs["marca"] = duplicates_handling(dfs["marca"], subset=None)
    dfs["producto"] = duplicates_handling(dfs["producto"], subset=["id"])
    dfs["cliente"] = duplicates_handling(dfs["cliente"], subset=["id"])
    dfs["evento"] = duplicates_handling(dfs["evento"], subset=None)
    logger.debug("Duplicates handled successfully")

    # 2) String normalization
    logger.debug("Normalizing strings...")
    dfs["categoria"] = normalize_strings(dfs["categoria"], ["categoria"])
    dfs["marca"] = normalize_strings(dfs["marca"], ["marca"])
    dfs["producto"] = normalize_strings(dfs["producto"], ["nombre"])
    dfs["cliente"] = normalize_strings(
            dfs["cliente"],
            ["nombre","apellido","genero","empresa","idioma",
            "nit","puesto","ciudad","correo","telefono"]
        )
    logger.debug("Strings normalized successfully")

    # 3) Data typing by schema
    schema = {
        "categoria": {"id":"Int64"},
        "marca":     {"id":"Int64"},
        "producto":  {"id":"Int64","categoria_id":"Int64",
                      "marca_id":"Int64","volumen":"Int64","precio":"Float64"},
        "cliente":   {"id":"Int64"},
        "evento":    {"timestamp":"Int64","visitorid":"Int64","itemid":"Int64"}
    }
    logger.debug("Coercing column types...")
    dfs = coerce_column_types(dfs, schema)
    logger.debug("Column types coerced successfully")

    # 4) Handle Null values
    logger.debug("Handling null values...")
    dfs = nulls_handling(dfs)
    logger.debug("Null values handled successfully")

    # 5) Filter event register by valid product IDs
    eventos_valid = filter_events_by_valid_product(dfs, itemid_col="itemid", prodid_col="id")
    dfs["evento"] = eventos_valid

    # 6) Convert timestamps to datetime data
    logger.debug("Converting timestamps to datetime in events table...")
    dfs["evento"] = convert_timestamp(dfs["evento"], tmstp_col="timestamp", date_col="fecha")
    logger.debug("Timestamps converted successfully")

    # 7) Handle birth date format and compute client age
    logger.debug("Cleaning birthdate and computing age in clients table...")
    dfs = clean_birthdate_and_age(dfs, client_col="cliente", birth_col="nacimiento", age_col="edad")
    logger.debug("Birthdate cleaned and age computed successfully")

    # ----- Data transformation -----
    logger.info("Data transformation stage initialized...")

    # 8) Anonymous clients mapping
    logger.debug("Mapping unknown visitor ids to anonymous client...")
    dfs["cliente"], anon_id = ensure_anonymous_client(dfs["cliente"], anon_name="Anonimo", anon_gender="NA")
    dfs["evento"] = map_unknown_visitors_to_anonymous(
        dfs, visitor_col="visitorid", client_col="id", anon_id=anon_id
    )
    logger.debug("Unknown visitors mapped to anonymous client successfully")

    # 9) Create new tables dimensions and column renaming
    logger.debug("Creating new dimensions and renaming columns...")
    # Select relevant product dimensions
    producto  = select_columns(dfs["producto"], ["id","categoria_id","nombre","marca_id","volumen","precio"])
    producto  = rename_columns(producto, {"nombre":"producto"})
    # Select relevant category dimensions
    categoria = select_columns(dfs["categoria"], ["id","categoria"])
    # Select relevant brand dimensions
    marca     = select_columns(dfs["marca"],     ["id","marca"])
    # Build new client full name dimension and select relevant dimensions
    cliente_full = build_dim_cliente(dfs["cliente"], full_name_col="cliente")
    cliente      = select_columns(cliente_full, ["id","cliente","genero","edad"])
    
    # 10) Create a cleaned evento table
    evento = select_columns(dfs["evento"], ["event","fecha","visitorid","itemid","cliente_conocido"])
    evento = rename_columns(evento, 
                                 {"event":"evento", 
                                  "visitorid":"cliente_id", 
                                  "itemid":"producto_id"})
    
    # 11) Required typing
    evento["cliente_id"]  = pd.to_numeric(evento["cliente_id"], errors="coerce").astype("Int64")
    evento["producto_id"] = pd.to_numeric(evento["producto_id"], errors="coerce").astype("Int64")
    cliente["id"] = pd.to_numeric(cliente["id"], errors="coerce").astype("Int64")
    producto["categoria_id"] = pd.to_numeric(producto["categoria_id"], errors="coerce").astype("Int64")
    producto["marca_id"] = pd.to_numeric(producto["marca_id"], errors="coerce").astype("Int64")

    logger.debug("New dimensions created and columns renamed successfully")

    # 12) Export tables as csv files
    logger.debug("Exporting tables to CSV files...")
    outputs = {
        "fact_evento":   evento,
        "dim_producto":  producto,
        "dim_categoria": categoria,
        "dim_marca":     marca,
        "dim_cliente":   cliente
    }
    export_tables(outputs, out_folder="data/processed")
    logger.debug("Tables exported successfully")
    logger.info("Data pipeline completed successfully")

    return outputs

## Execution

In [20]:
dfs = run_pipeline(raw_path="data/raw")

[INFO] - Starting data processing pipeline...
Reading CSVs: 100%|██████████| 5/5 [00:00<00:00,  7.04file/s]
[INFO] - Data cleaning stage initialized
[INFO] - Data transformation stage initialized...
Exporting CSVs: 100%|██████████| 5/5 [00:01<00:00,  2.63table/s, file=dim_cliente.csv, rows=11721] 
[INFO] - Data pipeline completed successfully


In [21]:
for name, df in dfs.items():
    print(f"\n{name} Table:")
    display(df.head())


fact_evento Table:


|  | evento | fecha | cliente_id | producto_id | cliente_conocido |
| --- | --- | --- | --- | --- | --- |
| 1 | view | 2015-06-02 05:50:14.164 | 1407399 | 248676 | False |
| 4 | view | 2015-06-02 05:02:17.106 | 1407399 | 367447 | False |
| 6 | view | 2015-06-02 05:12:03.240 | 1407399 | 443030 | False |
| 7 | view | 2015-06-02 05:34:51.897 | 1407399 | 439202 | False |
| 10 | view | 2015-06-02 05:16:02.373 | 1407399 | 10572 | False |



dim_producto Table:


|  | id | categoria_id | producto | marca_id | volumen | precio |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | 356475 | 9 | Crown Royal Honey | 1 | 750 | 22.49 |
| 1 | 15335 | 9 | Crown Royal Regal Apple Mini | 1 | 300 | 11.03 |
| 2 | 81345 | 9 | Crown Royal Regal Apple | 1 | 200 | 7.08 |
| 3 | 150318 | 9 | Crown Royal Xr Canadian Whiskey | 1 | 750 | 98.99 |
| 4 | 310791 | 9 | Crown Royal Canadian Whisky Mini | 1 | 300 | 11.03 |



dim_categoria Table:


|  | id | categoria |
| --- | --- | --- |
| 0 | 1 | SCOTCH WHISKIES |
| 1 | 2 | STRAIGHT BOURBON WHISKIES |
| 2 | 3 | BLENDED WHISKIES |
| 3 | 4 | IMPORTED DRY GINS |
| 4 | 5 | DECANTERS & SPECIALTY PACKAGES |



dim_marca Table:


|  | id | marca |
| --- | --- | --- |
| 0 | 1 | Diageo Americas |
| 1 | 2 | Heaven Hill Brands |
| 2 | 3 | Sazerac Co., Inc. |
| 3 | 4 | Sage Beverages |
| 4 | 5 | MHW Ltd |



dim_cliente Table:


|  | id | cliente | genero | edad |
| --- | --- | --- | --- | --- |
| 0 | 599528 | Samuel Ward | Male | 36 |
| 1 | 121688 | Willie Gonzales | Male | 53 |
| 2 | 552148 | Betty Spencer | Female | 42 |
| 3 | 102019 | Beverly Jordan | Female | 54 |
| 4 | 189384 | Cynthia Flores | Female | 55 |
