# 02. Data Cleaning & Enrichment

**Objective:** Transform raw staging data into high-quality analytics-ready data.
**Key Actions:**
1.  **Regex Cleaning:** Removed artifacts like `***` and `@@@` from text fields.
2.  **Geospatial Enrichment:** Mapped `Ciudad` to a new `Pais` column for Power BI mapping.
3.  **Type Casting:** Converted strings to `Float` and `DateTime`.
4.  **Logical Normalization:** Fixed inconsistent `Tipo_Producto` classifications using a master dictionary.

### Imports & Setup

In [1]:
import pandas as pd
import numpy as np
import sys
import os
import re # For Regex

# Add src to path
sys.path.append(os.path.abspath(os.path.join('..', 'src')))
from database.db_connection import get_db_engine

### Load Staging Data

In [2]:
engine = get_db_engine()
df = pd.read_sql("SELECT * FROM raw_sales", engine)
print(f"Loaded {len(df)} rows.")

Successfully created engine for database: riwi_ventas_db
Loaded 1000000 rows.


### Standard Cleaning Pipeline

In [3]:
# Remove Special Characters (Regex) from Text Columns
text_cols = ['Ciudad', 'Producto', 'Tipo_Producto', 'Tipo_Venta', 'Tipo_Cliente']
for col in text_cols:
    # Remove anything that isn't a letter, number, or space
    df[col] = df[col].astype(str).str.replace(r'[^\w\s]', '', regex=True).str.strip().str.title()

# Fix Numeric Columns (Coerce errors to NaN)
numeric_cols = ['Cantidad', 'Precio_Unitario', 'Descuento', 'Costo_Envio', 'Total']
for col in numeric_cols:
    df[col] = pd.to_numeric(df[col], errors='coerce')

# Fix Date
df['Fecha'] = pd.to_datetime(df['Fecha'], errors='coerce')

# Handle Nulls & Duplicates
print(f"Duplicates before: {df.duplicated().sum()}")
df_clean = df.drop_duplicates().dropna().copy()
print(f"Rows remaining: {len(df_clean)}")

# Check types
df_clean.info()

Duplicates before: 15191
Rows remaining: 940258
<class 'pandas.core.frame.DataFrame'>
Index: 940258 entries, 0 to 999999
Data columns (total 11 columns):
 #   Column           Non-Null Count   Dtype         
---  ------           --------------   -----         
 0   Ciudad           940258 non-null  object        
 1   Fecha            940258 non-null  datetime64[ns]
 2   Producto         940258 non-null  object        
 3   Tipo_Producto    940258 non-null  object        
 4   Cantidad         940258 non-null  float64       
 5   Precio_Unitario  940258 non-null  float64       
 6   Tipo_Venta       940258 non-null  object        
 7   Tipo_Cliente     940258 non-null  object        
 8   Descuento        940258 non-null  float64       
 9   Costo_Envio      940258 non-null  float64       
 10  Total            940258 non-null  float64       
dtypes: datetime64[ns](1), float64(5), object(5)
memory usage: 86.1+ MB


### Data Enrichment (Adding Country)

In [4]:
# Add Country Mapping to Cleaned DataFrame

# Define the Mapping Dictionary 
city_to_country = {
    'Antofagasta': 'Chile',
    'Santiago': 'Chile',
    'Valparaíso': 'Chile',
    'Valparaiso': 'Chile', # Just in case without accent
    'Concepción': 'Chile',
    'Concepcion': 'Chile',
    
    'Bogotá': 'Colombia',
    'Bogota': 'Colombia',
    'Medellín': 'Colombia',
    'Medellin': 'Colombia',
    'Cali': 'Colombia',
    'Barranquilla': 'Colombia',
    'Cartagena': 'Colombia',
    'Bucaramanga': 'Colombia',
    'Pereira': 'Colombia',
    
    'Ciudad De México': 'Mexico',
    'Ciudad De Mexico': 'Mexico',
    'Monterrey': 'Mexico',
    'Guadalajara': 'Mexico',
    'Puebla': 'Mexico',
    'Tijuana': 'Mexico',
    
    'Buenos Aires': 'Argentina',
    'Córdoba': 'Argentina',
    'Cordoba': 'Argentina',
    'Rosario': 'Argentina',
    'Mendoza': 'Argentina',
    
    'Lima': 'Peru',
    'Cusco': 'Peru',
    'Arequipa': 'Peru',
    'Trujillo': 'Peru',
    
    'Madrid': 'Spain',
    'Barcelona': 'Spain',
    'Valencia': 'Spain',
    'Sevilla': 'Spain',
    
    'New York': 'USA',
    'Los Angeles': 'USA',
    'Chicago': 'USA',
    'Houston': 'USA',
    'Miami': 'USA'
}

# Apply Mapping to df_clean specifically
df_clean['Ciudad'] = df_clean['Ciudad'].str.title().str.strip()
df_clean['Pais'] = df_clean['Ciudad'].map(city_to_country)

# Validation: Check for nulls in 'Pais'
missing_countries = df_clean[df_clean['Pais'].isnull()]['Ciudad'].unique()
print(f"Cities without country: {missing_countries}")

# 4. Preview to confirm column exists
display(df_clean[['Ciudad', 'Pais']].head())

Cities without country: ['None']


Unnamed: 0,Ciudad,Pais
0,Rosario,Argentina
1,Miami,USA
2,Bogotá,Colombia
3,Madrid,Spain
4,Monterrey,Mexico


### Final Polish (Remove None & Fix Categories)

In [5]:
# Identify text columns (where 'None' string might exist)
text_cols = ['Ciudad', 'Pais', 'Producto', 'Tipo_Producto', 'Tipo_Venta', 'Tipo_Cliente']

# Filter out rows where ANY of these columns equals "None"
# This is stricter than dropna() to deal with the string "None"
print(f"Rows before global cleaning: {len(df_clean)}")

for col in text_cols:
    count_none = len(df_clean[df_clean[col] == 'None'])
    if count_none > 0:
        print(f"Found {count_none} 'None' values in column '{col}'. Removing...")
        df_clean = df_clean[df_clean[col] != 'None']

print(f"Rows after global cleaning: {len(df_clean)}")

# Standardize Product Categories (Logical Fix)
product_mapping = {
    'Arepa': 'Harinas',
    'Leche': 'Lácteos',
    'Queso': 'Lácteos',
    'Chocolate': 'Dulces',
    'Yogurt': 'Lácteos',
    'Galletas': 'Dulces',
    'Pan': 'Panadería',
    'Gaseosa': 'Bebidas',
    'Té': 'Bebidas',
    'Café': 'Bebidas',
    'Mantequilla': 'Lácteos'
}

# Overwrite Product_Type with the correct static mapping
df_clean['Tipo_Producto'] = df_clean['Producto'].map(product_mapping).fillna(df_clean['Tipo_Producto'])

print("Product categories standardized.")

Rows before global cleaning: 940258
Found 4292 'None' values in column 'Ciudad'. Removing...
Found 4314 'None' values in column 'Producto'. Removing...
Found 4303 'None' values in column 'Tipo_Producto'. Removing...
Found 4200 'None' values in column 'Tipo_Venta'. Removing...
Found 4082 'None' values in column 'Tipo_Cliente'. Removing...
Rows after global cleaning: 919067
Product categories standardized.


### DATA INTEGRITY GATE: Mathematical Validation 

In [6]:
# Calculate Expected Total based on formula
# Formula: (Unit_Price * Quantity * (1 - Discount)) + Shipping
expected_total = (df_clean['Precio_Unitario'] * df_clean['Cantidad'] * (1 - df_clean['Descuento'])) + df_clean['Costo_Envio']

# Calculate Deviation
# Compare the CSV Total vs Expected Total
deviation = abs(df_clean['Total'] - expected_total)

# Filter Data
# Allow a small tolerance of $1.0 for floating point rounding differences
tolerance = 1.0
valid_rows = deviation < tolerance

# Count dropped rows
dropped_math_count = (~valid_rows).sum()
print(f"Integrity Check: Dropping {dropped_math_count} rows due to mathematical inconsistency.")

# Apply filter
df_clean = df_clean[valid_rows].copy()

print(f"Final clean row count: {len(df_clean)}")

Integrity Check: Dropping 19043 rows due to mathematical inconsistency.
Final clean row count: 900024


### Load to Data Warehouse (Clean Table)

In [7]:
table_name = 'clean_sales'

try:
    print(f"Saving {len(df_clean)} rows to '{table_name}'...")
    df_clean.to_sql(
        name=table_name,
        con=engine,
        if_exists='replace',
        index=False,
        chunksize=10000
    )
    print("Success! Pipeline finished.")
except Exception as e:
    print(f"Error saving to DB: {e}")

Saving 900024 rows to 'clean_sales'...
Success! Pipeline finished.


## Export
Clean data exported to PostgreSQL table `clean_sales`.