# Complete ETL: Cleaning, Normalization and Loading into PostgreSQL
**Sales Dataset — 1,048,575 records**

This notebook contains the full workflow:

1. Initial exploration
2. Dataset cleaning
3. Field normalization
4. Creation of dimension tables and fact table
5. Export to CSV
6. Load into PostgreSQL using SQLAlchemy + Environment Variables

## 1. Importing libraries and loading the dataset

In [64]:
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt
import seaborn as sns
import unicodedata
import re
import os
from sqlalchemy import create_engine, text
from dotenv import load_dotenv

df = pd.read_csv("../ventas.csv")

print(df)
# [1.048.575 rows x 11 columns]
# 11 Columnas
# 1.048.575 Registros

                   Ciudad       Fecha Producto       Tipo_Producto Cantidad  \
0                Santiago  2025-10-30    Arepa           Abarrotes      2.0   
1                 Córdoba  2025-11-17    Arepa           Abarrotes      7.0   
2            Barranquilla  2025-10-22    Leche              Lácteo      9.0   
3                New York  2025-10-20   Cereal              Lácteo      3.0   
4                  Madrid  2025-10-20    Leche               Hogar      2.0   
...                   ...         ...      ...                 ...      ...   
1048570       Guadalajara  2025-10-23   Yogurt              Lácteo      9.0   
1048571  Ciudad de México  2025-11-13  Gaseosa               Hogar      7.0   
1048572              Lima  2025-10-30    Arepa              Bebida      8.0   
1048573            Madrid  2025-10-23     Café           Abarrotes     10.0   
1048574             Cusco  2025-10-22    Queso  Alimento_Percedero      1.0   

        Precio_Unitario  Tipo_de_Venta Tipo_Cliente

## 2. Initial data inspection
View unique values per column

In [65]:
for row in df.columns:
    print(f"'{row}' {df[row].unique()}\n")

'Ciudad' ['Santiago' 'Córdoba' 'Barranquilla' 'New York' 'Madrid' 'Pereira'
 'Trujillo' 'Valencia' 'Puebla' 'Mendoza' 'Lima' 'Barcelona' 'Monterrey'
 'Ciudad de México' 'Concepción' 'Antofagasta' 'Rosario' 'Guadalajara'
 'Chicago' 'Bucaramanga' 'Cali' 'Cartagena' 'Medellín' 'Cusco' 'Houston'
 'Los Angeles' 'Arequipa' 'Valparaíso' 'Buenos Aires' 'Miami' 'Sevilla'
 'Tijuana' 'Bogotá' '  Arequipa   ' nan '  Antofagasta   ' 'Pereira@@@'
 'Buenos Aires###' 'mONTERREY' 'Cusco***' '  Guadalajara   '
 '  Córdoba   ' 'mENDOZA' 'vALPARAÍSO' '  Trujillo   ' 'lOS aNGELES'
 'cALI' 'Houston***' '  Chicago   ' 'Miami***' 'aNTOFAGASTA' 'mEDELLÍN'
 'Monterrey@@@' 'Madrid***' 'Tijuana###' '  Valencia   ' 'gUADALAJARA'
 'Miami@@@' 'vALENCIA' 'pEREIRA' 'cÓRDOBA' 'Córdoba###' 'bUCARAMANGA'
 'New York***' '  Concepción   ' 'bARRANQUILLA' 'rOSARIO' '  Barcelona   '
 'pUEBLA' 'cUSCO' 'Antofagasta###' '  Miami   ' 'lIMA' '  Pereira   '
 '  Lima   ' '  Mendoza   ' '  Los Angeles   ' 'tIJUANA' 'Arequipa@@@'
 'aR

Preview of the dataset

In [66]:
print(df.head())

         Ciudad       Fecha Producto Tipo_Producto Cantidad Precio_Unitario  \
0      Santiago  2025-10-30    Arepa     Abarrotes      2.0          3681.0   
1       Córdoba  2025-11-17    Arepa     Abarrotes      7.0          2321.0   
2  Barranquilla  2025-10-22    Leche        Lácteo      9.0          3540.0   
3      New York  2025-10-20   Cereal        Lácteo      3.0          3287.0   
4        Madrid  2025-10-20    Leche         Hogar      2.0          3414.0   

   Tipo_de_Venta Tipo_Cliente Descuento Costo_Envio Total_Venta  
0         Online    Minorista       0.2         0.0      5889.0  
1   Distribuidor     Gobierno      0.15         0.0     13809.0  
2   Distribuidor     Gobierno       0.2         0.0     25488.0  
3  Tienda_Física     Gobierno      0.05         0.0      9367.0  
4   Distribuidor    Mayorista       0.0         0.0      6828.0  


Dataset info

In [67]:
print(df.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1048575 entries, 0 to 1048574
Data columns (total 11 columns):
 #   Column           Non-Null Count    Dtype 
---  ------           --------------    ----- 
 0   Ciudad           1047607 non-null  object
 1   Fecha            1047629 non-null  object
 2   Producto         1047632 non-null  object
 3   Tipo_Producto    1047660 non-null  object
 4   Cantidad         1047611 non-null  object
 5   Precio_Unitario  1047618 non-null  object
 6   Tipo_de_Venta    1047585 non-null  object
 7   Tipo_Cliente     1047627 non-null  object
 8   Descuento        1047657 non-null  object
 9   Costo_Envio      1047675 non-null  object
 10  Total_Venta      1047592 non-null  object
dtypes: object(11)
memory usage: 88.0+ MB
None


Count missing values

In [68]:
print(df.isna().sum())

Ciudad             968
Fecha              946
Producto           943
Tipo_Producto      915
Cantidad           964
Precio_Unitario    957
Tipo_de_Venta      990
Tipo_Cliente       948
Descuento          918
Costo_Envio        900
Total_Venta        983
dtype: int64


statistics values (Descriptive statistics)

In [69]:
print(df.describe(include='all'))

          Ciudad       Fecha Producto Tipo_Producto Cantidad Precio_Unitario  \
count    1047607     1047629  1047632       1047660  1047611         1047618   
unique       185         111       72            36       34            5013   
top     Trujillo  2025-10-31     Café        Bebida     10.0             ???   
freq       37703       35279    87569        174793   104992             729   

       Tipo_de_Venta Tipo_Cliente Descuento Costo_Envio Total_Venta  
count        1047585      1047627   1047657     1047675     1047592  
unique            24           24         6           9       49360  
top     Distribuidor     Gobierno      0.15         0.0         ???  
freq          262457       262228    210077      697490         691  


## 3. Cleaning and normalizing the dataset

In [70]:
df = pd.read_csv("../ventas.csv")
def limpiar_ciudad(texto):
    if pd.isna(texto):
        return texto
    
    # Converte el texto a minúsculas
    texto = texto.lower().strip()

    # Elimina los acentos
    texto = ''.join(
        c for c in unicodedata.normalize('NFD', texto)
        if unicodedata.category(c) != 'Mn'
    )

    # Elimina caracteres especiales (dejando solo las letras)
    texto = re.sub(r'[^a-z\s]', '', texto)

    # Quita los espacios dobles
    texto = re.sub(r'\s+', ' ', texto)

    return texto
# 1. Normalización de texto
cols_texto = ["Ciudad", "Producto", "Tipo_Producto", "Tipo_de_Venta", "Tipo_Cliente"]
for col in cols_texto:
    df[col] = df[col].apply(limpiar_ciudad)

# 2. Converción a numéricos
df["Descuento"] = pd.to_numeric(df["Descuento"], errors="coerce")
df["Precio_Unitario"] = pd.to_numeric(df["Precio_Unitario"], errors="coerce")
df["Cantidad"] = pd.to_numeric(df["Cantidad"], errors="coerce")
df["Costo_Envio"] = pd.to_numeric(df["Costo_Envio"], errors="coerce")
df["Total_Venta"] = pd.to_numeric(df["Total_Venta"], errors="coerce")

# 3. Rellenar faltantes
df["Tipo_Cliente"] = df["Tipo_Cliente"].replace("nan", np.nan)
df["Tipo_Cliente"] = df["Tipo_Cliente"].fillna("desconocido")

df["Descuento"] = df["Descuento"].fillna(0)
df["Precio_Unitario"] = df["Precio_Unitario"].fillna(0)
df["Cantidad"] = df["Cantidad"].fillna(0)
df["Costo_Envio"] = df["Costo_Envio"].fillna(0)
df["Total_Venta"] = df["Total_Venta"].fillna(0)

# 4. Converción a fecha
df["Fecha"] = pd.to_datetime(df["Fecha"], errors="coerce")


# 5. Recalcular el valor de Total_Venta
df["Total_Venta"] = (df["Cantidad"] * df["Precio_Unitario"]) - (df["Descuento"] * df["Cantidad"] * df["Precio_Unitario"]) + df["Costo_Envio"]

df.columns = df.columns.str.lower()

# 6. Guarda los cambios en  un nuevo archivo .csv

df.to_csv("../ventas_limpias.csv", index=False)


In [71]:
print(df)

                   ciudad      fecha producto      tipo_producto  cantidad  \
0                santiago 2025-10-30    arepa          abarrotes       2.0   
1                 cordoba 2025-11-17    arepa          abarrotes       7.0   
2            barranquilla 2025-10-22    leche             lacteo       9.0   
3                new york 2025-10-20   cereal             lacteo       3.0   
4                  madrid 2025-10-20    leche              hogar       2.0   
...                   ...        ...      ...                ...       ...   
1048570       guadalajara 2025-10-23   yogurt             lacteo       9.0   
1048571  ciudad de mexico 2025-11-13  gaseosa              hogar       7.0   
1048572              lima 2025-10-30    arepa             bebida       8.0   
1048573            madrid 2025-10-23     cafe          abarrotes      10.0   
1048574             cusco 2025-10-22    queso  alimentopercedero       1.0   

         precio_unitario tipo_de_venta tipo_cliente  descuento 

## 4. PostgreSQL connection configuration

In [72]:
#Carga el archivo .env
load_dotenv(override=True)

#Variables de entorno

DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")


URL = f"postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_engine(URL)

conn = engine.connect()
try:
    print("Connection Succesfull!!" if conn else "")

except Exception as e:
    print("Error al conectar la base datos en", e)



Connection Succesfull!!


## 5. Schema verification in PostgreSQL

In [73]:
# Setting schema

set_schema = conn.execute(text("set search_path to riwi_ventas;"))

# Array to verify the status of each one table
tables = ["ciudad", "tipo_producto", "producto", "tipo_venta", "tipo_cliente", "factura_ventas"]

# This loop verfy the status
for t in range(len(tables)):
    verify = pd.read_sql((f"select * from {tables[t]};"),conn)
    if not verify.empty:
        print(f"Talbe: '{tables[t].capitalize()}' Exist and is Filled.")
    if verify.empty:
        print(f"Table: '{tables[t].capitalize()}' doesn't Exist or is Empty.")

Talbe: 'Ciudad' Exist and is Filled.
Talbe: 'Tipo_producto' Exist and is Filled.
Talbe: 'Producto' Exist and is Filled.
Talbe: 'Tipo_venta' Exist and is Filled.
Talbe: 'Tipo_cliente' Exist and is Filled.
Talbe: 'Factura_ventas' Exist and is Filled.


## 6. Create dimension tables and fact table

In [74]:
import pandas as pd
import os


cleaned_df = pd.read_csv("../ventas_limpias.csv")

#Obtain the unique cities and generate the ID for Cities
ciudad_df = (
    cleaned_df["ciudad"].drop_duplicates()
                .reset_index(drop=True)
                .reset_index()
                .rename(columns={"index": "ciudad_id", "ciudad": "nombre_ciudad"})
                
)
ciudad_df["ciudad_id"] += 1

cleaned_df =cleaned_df.merge(ciudad_df, left_on="ciudad", right_on="nombre_ciudad", how="left",suffixes=("", "_drop"))


# Obtain the unique cities and generate the ID for Products

tipo_producto_df = (
    cleaned_df["tipo_producto"].drop_duplicates()
                       .reset_index(drop=True)
                       .reset_index()
                       .rename(columns={"index": "tipo_producto_id"})
)
tipo_producto_df["tipo_producto_id"] += 1

cleaned_df =cleaned_df.merge(tipo_producto_df, on="tipo_producto", how="left",suffixes=("", "_drop"))


# Obtain the unique cities and generate the ID for type products

cleaned_df = cleaned_df.rename(columns={"producto":"nombre_producto"}) #Rename to prevent conflicts with DB

producto_df = (
    cleaned_df[["nombre_producto", "tipo_producto_id"]]
    .drop_duplicates()
    .reset_index(drop=True)
    .reset_index()
    .rename(columns={"index": "producto_id", "producto":"nombre_producto"})
)
producto_df["producto_id"] += 1

cleaned_df =cleaned_df.merge(producto_df, on=["nombre_producto", "tipo_producto_id"], how="left",suffixes=("", "_drop"))

# Obtain the unique cities and generate the ID for type sale

cleaned_df = cleaned_df.rename(columns={"tipo_de_venta":"tipo_venta"}) #Rename to prevent conflicts with DB

tipo_venta_df = (
    cleaned_df["tipo_venta"].drop_duplicates()
                       .reset_index(drop=True)
                       .reset_index()
                       .rename(columns={"index": "tipo_venta_id"})
)
tipo_venta_df["tipo_venta_id"] += 1

cleaned_df =cleaned_df.merge(tipo_venta_df, on="tipo_venta", how="left",suffixes=("", "_drop"))


# Obtain the unique cities and generate the ID for type customer
tipo_cliente_df = (
    cleaned_df["tipo_cliente"].drop_duplicates()
                      .reset_index(drop=True)
                      .reset_index()
                      .rename(columns={"index": "tipo_cliente_id"})
)
tipo_cliente_df["tipo_cliente_id"] += 1

cleaned_df =cleaned_df.merge(tipo_cliente_df, on="tipo_cliente", how="left",suffixes=("", "_drop"))


#Create the table for sales
factura_df =cleaned_df[[
    "fecha", "ciudad_id", "producto_id", "tipo_venta_id",
    "tipo_cliente_id", "cantidad", "precio_unitario",
    "descuento", "costo_envio", "total_venta"
]].copy()


#export tables to external folder
os.makedirs("../tables_csv", exist_ok=True)

#Save the files into tables_csv
ciudad_df.to_csv("../tables_csv/ciudad.csv", index=False)
tipo_producto_df.to_csv("../tables_csv/tipo_producto.csv", index=False)
producto_df.to_csv("../tables_csv/producto.csv", index=False)
tipo_venta_df.to_csv("../tables_csv/tipo_venta.csv", index=False)
tipo_cliente_df.to_csv("../tables_csv/tipo_cliente.csv", index=False)
factura_df.to_csv("../tables_csv/factura_ventas.csv", index=False)

print("CSV creados correctamente en la carpeta 'tables_csv'")


CSV creados correctamente en la carpeta 'tables_csv'


## 7. Export tables to CSV

In [75]:
#This order respect the cardinality and prevent issues
order_tables = [
    "ciudad",
    "tipo_producto",
    "producto",
    "tipo_venta",
    "tipo_cliente",
    "factura_ventas"
]

SEEDERS_DIR = "../tables_csv"

CHUNK_SIZE = 100000  

with engine.connect() as conn:
    
    conn.execute(text("SET search_path TO riwi_ventas;"))

    for table in order_tables:
        csv_path = os.path.join(SEEDERS_DIR, f"{table}.csv")

        if not os.path.exists(csv_path):
            print(f"doesn't exist: {csv_path}")
            continue

        print(f"\nInsert into table: {table}")

        # Read by chunk
        chunk_iter = pd.read_csv(csv_path, chunksize=CHUNK_SIZE)

        for i, chunk_df in enumerate(chunk_iter):
            print(f"  → Chunk {i+1} inserting... ({len(chunk_df)} rows)")

            # Replace NaN per None or Null for SQL
            chunk_df = chunk_df.where(pd.notnull(chunk_df), None)

            cols = chunk_df.columns.tolist()
            colnames = ", ".join(cols)
            placeholders = ", ".join([f":{c}" for c in cols])

            query = text(f"""
                INSERT INTO {table} ({colnames})
                VALUES ({placeholders});
            """)

            # Ejecutar todas las filas del chunk
            conn.execute(query, chunk_df.to_dict(orient="records"))

        print(f"Table {table} was inserted sucess!!")

    conn.commit()

    conn.close()
    
print("Data inserted sucessfull!!")


Insert into table: ciudad
  → Chunk 1 inserting... (34 rows)


IntegrityError: (psycopg2.errors.UniqueViolation) llave duplicada viola restricción de unicidad «ciudad_pkey»
DETAIL:  Ya existe la llave (ciudad_id)=(1).

[SQL: 
                INSERT INTO ciudad (ciudad_id, nombre_ciudad)
                VALUES (%(ciudad_id)s, %(nombre_ciudad)s);
            ]
[parameters: [{'ciudad_id': 1, 'nombre_ciudad': 'santiago'}, {'ciudad_id': 2, 'nombre_ciudad': 'cordoba'}, {'ciudad_id': 3, 'nombre_ciudad': 'barranquilla'}, {'ciudad_id': 4, 'nombre_ciudad': 'new york'}, {'ciudad_id': 5, 'nombre_ciudad': 'madrid'}, {'ciudad_id': 6, 'nombre_ciudad': 'pereira'}, {'ciudad_id': 7, 'nombre_ciudad': 'trujillo'}, {'ciudad_id': 8, 'nombre_ciudad': 'valencia'}  ... displaying 10 of 34 total bound parameter sets ...  {'ciudad_id': 33, 'nombre_ciudad': 'bogota'}, {'ciudad_id': 34, 'nombre_ciudad': None}]]
(Background on this error at: https://sqlalche.me/e/20/gkpj)