# Imports + settings

In [1]:
import os
from pathlib import Path

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

# Paths + cargar CSV

In [2]:
DATA_DIR = Path("data")

customers = pd.read_csv(DATA_DIR / "olist_customers_dataset.csv")
orders = pd.read_csv(DATA_DIR / "olist_orders_dataset.csv")
order_items = pd.read_csv(DATA_DIR / "olist_order_items_dataset.csv")
products = pd.read_csv(DATA_DIR / "olist_products_dataset.csv")
sellers = pd.read_csv(DATA_DIR / "olist_sellers_dataset.csv")
payments = pd.read_csv(DATA_DIR / "olist_order_payments_dataset.csv")
reviews = pd.read_csv(DATA_DIR / "olist_order_reviews_dataset.csv")
geolocation = pd.read_csv(DATA_DIR / "olist_geolocation_dataset.csv")
translation = pd.read_csv(DATA_DIR / "product_category_name_translation.csv")

print("CSVs cargados ✅")
print("orders:", orders.shape, "| customers:", customers.shape, "| order_items:", order_items.shape)


CSVs cargados ✅
orders: (99441, 8) | customers: (99441, 5) | order_items: (112650, 7)


# Conexión a Postgres usando .env

In [3]:
load_dotenv()

DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = os.getenv("DB_PORT", "5432")
DB_NAME = os.getenv("DB_NAME")

url = f"postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_engine(url)

# test conexión
with engine.connect() as conn:
    print(conn.execute(text("SELECT 'Conexión OK ✅'")).scalar())


Conexión OK ✅


# ✅ customers_stg

In [4]:
# 1) Copia para staging
customers_stg = customers.copy()

# 2) Normalizar textos (opcional pero recomendable)
#    - quita espacios al inicio/fin
#    - pasa a minúsculas para consistencia
customers_stg["customer_city"] = customers_stg["customer_city"].astype(str).str.strip().str.lower()
customers_stg["customer_state"] = customers_stg["customer_state"].astype(str).str.strip().str.upper()

# 3) Asegurar tipos correctos
customers_stg["customer_zip_code_prefix"] = customers_stg["customer_zip_code_prefix"].astype("int64")

# 4) Validaciones rápidas
print("Shape:", customers_stg.shape)
print("Nulos:\n", customers_stg.isna().sum())
print("Duplicados (filas completas):", customers_stg.duplicated().sum())
print("customer_id duplicados:", customers_stg["customer_id"].duplicated().sum())


Shape: (99441, 5)
Nulos:
 customer_id                 0
customer_unique_id          0
customer_zip_code_prefix    0
customer_city               0
customer_state              0
dtype: int64
Duplicados (filas completas): 0
customer_id duplicados: 0


### Celda — Vista rápida

In [5]:
customers_stg.head()

Unnamed: 0,customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
0,06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP
1,18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP
2,4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP
3,b2b6027bc5c5109e529d4dc6358b12c3,259dac757896d24d7702b9acbbff3f3c,8775,mogi das cruzes,SP
4,4f2d8ab171c80ec8364f7c12e35b23ad,345ecd01c38d18a9036ed96c73b8d066,13056,campinas,SP


# ✅ orders_stg

## Limpieza de Orders (convertir fechas)

## Limpieza + validación de orders_stg

In [6]:
orders_stg = orders.copy()

date_cols = [
    "order_purchase_timestamp",
    "order_approved_at",
    "order_delivered_carrier_date",
    "order_delivered_customer_date",
    "order_estimated_delivery_date",
]

for col in date_cols:
    orders_stg[col] = pd.to_datetime(orders_stg[col], errors="coerce")

# Chequeo rápido
orders_stg[date_cols].isna().sum()


order_purchase_timestamp            0
order_approved_at                 160
order_delivered_carrier_date     1783
order_delivered_customer_date    2965
order_estimated_delivery_date       0
dtype: int64

# ✅ order_items_stg

## Limpieza de Orders_Items 

In [7]:
# 1) Copia para staging
order_items_stg = order_items.copy()

# 2) Convertir fecha a datetime
order_items_stg["shipping_limit_date"] = pd.to_datetime(
    order_items_stg["shipping_limit_date"],
    errors="coerce"
)

# 3) Validaciones
print("Shape:", order_items_stg.shape)

print("\nNulos:")
print(order_items_stg.isna().sum())

print("\nDuplicados (filas completas):", order_items_stg.duplicated().sum())

print(
    "\nDuplicados por clave lógica (order_id, order_item_id):",
    order_items_stg.duplicated(subset=["order_id", "order_item_id"]).sum()
)

print("\nTipos de datos:")
print(order_items_stg.dtypes)


Shape: (112650, 7)

Nulos:
order_id               0
order_item_id          0
product_id             0
seller_id              0
shipping_limit_date    0
price                  0
freight_value          0
dtype: int64

Duplicados (filas completas): 0

Duplicados por clave lógica (order_id, order_item_id): 0

Tipos de datos:
order_id                       object
order_item_id                   int64
product_id                     object
seller_id                      object
shipping_limit_date    datetime64[ns]
price                         float64
freight_value                 float64
dtype: object


# ✅ payments_stg

## Limpieza de payments_stg

In [8]:
# 1) Copia para staging
payments_stg = payments.copy()

# 2) Limpieza mínima de texto (opcional)
payments_stg["payment_type"] = payments_stg["payment_type"].astype(str).str.strip().str.lower()

# 3) Asegurar tipos numéricos (por si vinieran como texto en otros datasets)
payments_stg["payment_sequential"] = pd.to_numeric(payments_stg["payment_sequential"], errors="coerce").astype("int64")
payments_stg["payment_installments"] = pd.to_numeric(payments_stg["payment_installments"], errors="coerce").astype("int64")
payments_stg["payment_value"] = pd.to_numeric(payments_stg["payment_value"], errors="coerce")

# 4) Validaciones
print("Shape:", payments_stg.shape)

print("\nNulos:")
print(payments_stg.isna().sum())

print("\nDuplicados (filas completas):", payments_stg.duplicated().sum())

# Importante: en payments, order_id puede repetirse (un pedido puede tener más de un pago)
print("\norder_id únicos:", payments_stg["order_id"].nunique())
print("payment_type valores:", payments_stg["payment_type"].value_counts())

Shape: (103886, 5)

Nulos:
order_id                0
payment_sequential      0
payment_type            0
payment_installments    0
payment_value           0
dtype: int64

Duplicados (filas completas): 0

order_id únicos: 99440
payment_type valores: payment_type
credit_card    76795
boleto         19784
voucher         5775
debit_card      1529
not_defined        3
Name: count, dtype: int64


# ✅ sellers_stg

## Limpieza de sellers_stg

In [9]:
# 1) Copia para staging
sellers_stg = sellers.copy()

# 2) Normalizar textos (recomendado)
sellers_stg["seller_city"] = sellers_stg["seller_city"].astype(str).str.strip().str.lower()
sellers_stg["seller_state"] = sellers_stg["seller_state"].astype(str).str.strip().str.upper()

# 3) Asegurar tipo del zip
sellers_stg["seller_zip_code_prefix"] = sellers_stg["seller_zip_code_prefix"].astype("int64")

# 4) Validaciones
print("Shape:", sellers_stg.shape)

print("\nNulos:")
print(sellers_stg.isna().sum())

print("\nDuplicados (filas completas):", sellers_stg.duplicated().sum())
print("seller_id duplicados:", sellers_stg["seller_id"].duplicated().sum())


Shape: (3095, 4)

Nulos:
seller_id                 0
seller_zip_code_prefix    0
seller_city               0
seller_state              0
dtype: int64

Duplicados (filas completas): 0
seller_id duplicados: 0


# ✅ products_stg

## Limpieza de products_stg

In [10]:
# 1) Copia para staging
products_stg = products.copy()

# 2) Categoría: nulos -> 'unknown'
products_stg["product_category_name"] = (
    products_stg["product_category_name"]
    .astype(str)
    .str.strip()
    .str.lower()
    .replace("nan", "unknown")
)

# 3) Columnas numéricas a imputar con mediana
num_cols = [
    "product_name_lenght",
    "product_description_lenght",
    "product_photos_qty",
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm",
]

for col in num_cols:
    products_stg[col] = pd.to_numeric(products_stg[col], errors="coerce")
    median = products_stg[col].median()
    products_stg[col] = products_stg[col].fillna(median)

# 4) Validaciones
print("Shape:", products_stg.shape)
print("\nNulos por columna:")
print(products_stg.isna().sum())

print("\nDuplicados (filas completas):", products_stg.duplicated().sum())
print("product_id duplicados:", products_stg["product_id"].duplicated().sum())

print("\nTipos de datos:")
print(products_stg.dtypes)


Shape: (32951, 9)

Nulos por columna:
product_id                    0
product_category_name         0
product_name_lenght           0
product_description_lenght    0
product_photos_qty            0
product_weight_g              0
product_length_cm             0
product_height_cm             0
product_width_cm              0
dtype: int64

Duplicados (filas completas): 0
product_id duplicados: 0

Tipos de datos:
product_id                     object
product_category_name          object
product_name_lenght           float64
product_description_lenght    float64
product_photos_qty            float64
product_weight_g              float64
product_length_cm             float64
product_height_cm             float64
product_width_cm              float64
dtype: object


# ✅ geolocation_stg

## Limpieza de geolocation_stg

In [11]:
# 1) Copia base
geo = geolocation.copy()

# 2) Normalizar texto
geo["geolocation_city"] = geo["geolocation_city"].astype(str).str.strip().str.lower()
geo["geolocation_state"] = geo["geolocation_state"].astype(str).str.strip().str.upper()

# 3) Función para ciudad más frecuente
def moda_ciudad(x):
    return x.value_counts().idxmax()

# 4) Agrupación por ZIP
geolocation_stg = (
    geo
    .groupby("geolocation_zip_code_prefix", as_index=False)
    .agg({
        "geolocation_lat": "mean",
        "geolocation_lng": "mean",
        "geolocation_city": moda_ciudad,
        "geolocation_state": moda_ciudad
    })
)

# 5) Validaciones
print("Shape original:", geolocation.shape)
print("Shape staging:", geolocation_stg.shape)

print("\nDuplicados por zip:", 
      geolocation_stg["geolocation_zip_code_prefix"].duplicated().sum())

print("\nNulos:")
print(geolocation_stg.isna().sum())

geolocation_stg.head()


Shape original: (1000163, 5)
Shape staging: (19015, 5)

Duplicados por zip: 0

Nulos:
geolocation_zip_code_prefix    0
geolocation_lat                0
geolocation_lng                0
geolocation_city               0
geolocation_state              0
dtype: int64


Unnamed: 0,geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state
0,1001,-23.55019,-46.634024,sao paulo,SP
1,1002,-23.548146,-46.634979,sao paulo,SP
2,1003,-23.548994,-46.635731,sao paulo,SP
3,1004,-23.549799,-46.634757,sao paulo,SP
4,1005,-23.549456,-46.636733,sao paulo,SP


# ✅ review_stg

## Limpieza de review_stg

In [12]:
# 1) Copia para staging
reviews_stg = reviews.copy()

# 2) Convertir fechas
date_cols = ["review_creation_date", "review_answer_timestamp"]

for col in date_cols:
    reviews_stg[col] = pd.to_datetime(reviews_stg[col], errors="coerce")

# 3) Limpieza mínima de texto (sin inventar datos)
text_cols = ["review_comment_title", "review_comment_message"]

for col in text_cols:
    reviews_stg[col] = reviews_stg[col].astype("string").str.strip()

# 4) Asegurar tipo del score
reviews_stg["review_score"] = reviews_stg["review_score"].astype("int64")

# 5) Validaciones
print("Shape:", reviews_stg.shape)

print("\nNulos por columna:")
print(reviews_stg.isna().sum())

print("\nDuplicados (filas completas):", reviews_stg.duplicated().sum())
print("review_id duplicados:", reviews_stg["review_id"].duplicated().sum())


Shape: (99224, 7)

Nulos por columna:
review_id                      0
order_id                       0
review_score                   0
review_comment_title       87656
review_comment_message     58247
review_creation_date           0
review_answer_timestamp        0
dtype: int64

Duplicados (filas completas): 0
review_id duplicados: 814


# ✅ traslation_stg

## Limpieza de traslation_stg

In [13]:
# 1) Copia para staging
translation_stg = translation.copy()

# 2) Normalizar textos
translation_stg["product_category_name"] = (
    translation_stg["product_category_name"]
    .astype(str)
    .str.strip()
    .str.lower()
)

translation_stg["product_category_name_english"] = (
    translation_stg["product_category_name_english"]
    .astype(str)
    .str.strip()
    .str.lower()
)

# 3) Validaciones
print("Shape:", translation_stg.shape)
print("\nNulos:")
print(translation_stg.isna().sum())
print("\nDuplicados:", translation_stg.duplicated().sum())

translation_stg.head()


Shape: (71, 2)

Nulos:
product_category_name            0
product_category_name_english    0
dtype: int64

Duplicados: 0


Unnamed: 0,product_category_name,product_category_name_english
0,beleza_saude,health_beauty
1,informatica_acessorios,computers_accessories
2,automotivo,auto
3,cama_mesa_banho,bed_bath_table
4,moveis_decoracao,furniture_decor


# Helper para subir tablas staging a Postgres

In [14]:
def to_staging(df: pd.DataFrame, table_name: str):
    df.to_sql(
        table_name,
        con=engine,
        schema="staging",
        if_exists="replace",
        index=False
    )
    print(f"Tabla staging.{table_name} cargada ✅ | shape={df.shape}")


## SUBIR TABLAS STAGING A POSTGRES

In [15]:
to_staging(customers_stg, "customers_stg")
to_staging(orders_stg, "orders_stg")
to_staging(order_items_stg, "order_items_stg")
to_staging(payments_stg, "payments_stg")
to_staging(sellers_stg, "sellers_stg")
to_staging(products_stg, "products_stg")
to_staging(reviews_stg, "reviews_stg")
to_staging(geolocation_stg, "geolocation_stg")
to_staging(translation_stg, "translation_stg")


Tabla staging.customers_stg cargada ✅ | shape=(99441, 5)
Tabla staging.orders_stg cargada ✅ | shape=(99441, 8)
Tabla staging.order_items_stg cargada ✅ | shape=(112650, 7)
Tabla staging.payments_stg cargada ✅ | shape=(103886, 5)
Tabla staging.sellers_stg cargada ✅ | shape=(3095, 4)
Tabla staging.products_stg cargada ✅ | shape=(32951, 9)
Tabla staging.reviews_stg cargada ✅ | shape=(99224, 7)
Tabla staging.geolocation_stg cargada ✅ | shape=(19015, 5)
Tabla staging.translation_stg cargada ✅ | shape=(71, 2)
