In [28]:
import os
from pathlib import Path
from urllib.parse import quote

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, Column, Integer, String, Float, Date, MetaData, Table
from sqlalchemy.ext.declarative import declarative_base
# SQLAlchemy >= 1.4 exposes declarative_base here; rebinding the name avoids the
# deprecation warning raised by the legacy sqlalchemy.ext.declarative location.
from sqlalchemy.orm import declarative_base

In [29]:
# Directory the notebook is executed from; every input file is resolved against it.
BASE_DIR = Path().resolve()
load_dotenv(BASE_DIR / ".env")

# Cleaned CSV exports, one file per warehouse table.
DIM_CUSTOMERS = BASE_DIR / "dim_customers.csv"
DIM_PRODUCTS = BASE_DIR / "dim_products.csv"
DIM_DATE = BASE_DIR / "dim_dates.csv"
DIM_PAYMENT = BASE_DIR / "dim_payments.csv"
DIM_MALL = BASE_DIR / "dim_malls.csv"
FACT_SALES = BASE_DIR / "fact_sales.csv"

# PostgreSQL connection settings, read from the environment with local fallbacks.
DB_USER = os.getenv("DB_USER", "postgres")
# NOTE(review): the fallback password is hardcoded and kept only so existing
# setups keep working; prefer defining DB_PASS in .env and dropping the default.
DB_PASS = os.getenv("DB_PASS", "1111")
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = os.getenv("DB_PORT", "5433")
DB_NAME = os.getenv("DB_NAME", "cienciadedatos")

# Percent-encode the credentials: a raw "@", ":", "/" or "#" in the user name or
# password would otherwise be read as URL structure. safe="" also encodes "/",
# and quote() (unlike quote_plus) never turns a space into an ambiguous "+".
DATABASE_URL = (
    f"postgresql+psycopg2://{quote(DB_USER, safe='')}:{quote(DB_PASS, safe='')}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

engine = create_engine(DATABASE_URL)
# Shared declarative base that the table-mapping classes inherit from.
Base = declarative_base()

  Base = declarative_base()


In [30]:
class DimCustomer(Base):
    """Declarative mapping for the ``dimcustomer`` table, keyed by ``customer_id``."""
    __tablename__ = "dimcustomer"
    customer_id = Column(String, primary_key=True)  # text primary key
    gender = Column(String)
    age = Column(Integer)

class DimProduct(Base):
    """Declarative mapping for the ``dimproduct`` table, keyed by ``product_id``."""
    __tablename__ = "dimproduct"
    product_id = Column(Integer, primary_key=True)  # integer primary key
    category = Column(String)

class DimDate(Base):
    """Declarative mapping for the ``dimdate`` table, keyed by ``date_id``."""
    __tablename__ = "dimdate"
    date_id = Column(Integer, primary_key=True)  # integer primary key
    invoice_date = Column(Date)
    # NOTE(review): presumably the calendar parts of invoice_date; nothing in
    # this mapping enforces that relationship — confirm against the CSV export.
    year = Column(Integer)
    month = Column(Integer)
    day = Column(Integer)

class DimPayment(Base):
    """Declarative mapping for the ``dimpayment`` table, keyed by ``payment_id``."""
    __tablename__ = "dimpayment"
    payment_id = Column(Integer, primary_key=True)  # integer primary key
    payment_method = Column(String)

class DimMall(Base):
    """Declarative mapping for the ``dimmall`` table, keyed by ``mall_id``."""
    __tablename__ = "dimmall"
    mall_id = Column(Integer, primary_key=True)  # integer primary key
    shopping_mall = Column(String)

class FactSales(Base):
    """Declarative mapping for the ``factsales`` table, keyed by ``invoice_no``.

    The ``*_id`` columns are declared as plain columns: this mapping defines no
    ForeignKey constraints, so the database will not enforce that they match a
    dimension row.
    """
    __tablename__ = "factsales"
    invoice_no = Column(String, primary_key=True)  # text primary key
    # NOTE(review): these share their names with the dimension primary keys and
    # presumably reference them — confirm, and consider ForeignKey constraints.
    customer_id = Column(String)
    product_id = Column(Integer)
    date_id = Column(Integer)
    payment_id = Column(Integer)
    mall_id = Column(Integer)
    quantity = Column(Integer)
    price = Column(Float)

In [31]:
def validate_foreign_keys():
    """Report how many distinct fact-table keys have no matching dimension row.

    Reads the five dimension CSVs and the fact CSV, and for each key column
    prints the number of distinct values that appear in the fact file but not
    in the corresponding dimension file.

    A null key in the fact file can never match a dimension row, so it is
    reported as missing, but only once per column. (Building a ``set`` from a
    float column keeps every NaN as a separate element, which inflated the
    count.)

    Returns:
        dict: key column name -> number of distinct unmatched fact keys, so a
        calling cell can assert on the result instead of reading the printout.
    """
    # (printed label, key column, dimension CSV); order fixes the report order.
    checks = [
        ("Clientes faltantes:", "customer_id", DIM_CUSTOMERS),
        ("Productos faltantes:", "product_id", DIM_PRODUCTS),
        ("Fechas faltantes:", "date_id", DIM_DATE),
        ("Métodos de pago faltantes:", "payment_id", DIM_PAYMENT),
        ("Malls faltantes:", "mall_id", DIM_MALL),
    ]

    # Load every file before printing anything so an unreadable CSV fails
    # before a partial report is shown.
    dim_frames = {key: pd.read_csv(dim_path) for _, key, dim_path in checks}
    fact_sales = pd.read_csv(FACT_SALES)

    missing_counts = {}
    for label, key, _ in checks:
        fact_keys = fact_sales[key]
        known_keys = dim_frames[key][key].dropna()
        # Nulls are never "in" known_keys, so they stay in the orphan set.
        orphan_keys = fact_keys[~fact_keys.isin(known_keys)]
        # dropna=False counts all nulls together as a single missing key.
        missing_counts[key] = int(orphan_keys.nunique(dropna=False))
        print(label, missing_counts[key])

    return missing_counts

In [33]:
from sqlalchemy import text

def load_table(df_path, table_name, unique_col=None):
    """Replace the contents of an existing table with the rows of a CSV file.

    The table is emptied with ``TRUNCATE ... CASCADE`` and refilled inside a
    single transaction, so a failure while inserting rolls the truncate back
    instead of leaving the table empty.

    Args:
        df_path: Path of the CSV file to load.
        table_name: Name of the target table. It is interpolated directly into
            the TRUNCATE statement, so it must be a trusted identifier and
            never user-supplied text.
        unique_col: Optional column name; when given, only the first row for
            each value of that column is kept before loading.
    """
    df = pd.read_csv(df_path)

    if unique_col:
        df = df.drop_duplicates(subset=[unique_col])

    # engine.begin() commits on success and rolls back on any exception; the
    # same connection is handed to to_sql so both steps share that transaction.
    with engine.begin() as conn:
        # Empties the table while keeping its schema.
        conn.execute(text(f"TRUNCATE TABLE {table_name} CASCADE;"))

        df.to_sql(
            table_name,
            conn,
            if_exists="append",  # the table already exists; only add rows
            index=False,
            method="multi",
            chunksize=1000,
        )
    print(f"{len(df)} filas insertadas en {table_name}")


# Entry point: check key integrity of the CSV exports, then run the load.
if __name__ == "__main__":
    validate_foreign_keys()
    # NOTE(review): run() is not defined in any cell visible here (execution
    # counts skip In [32]); on a fresh kernel this line raises NameError unless
    # the missing cell is restored. Presumably it calls load_table() once per
    # table — confirm.
    run()

Clientes faltantes: 0
Productos faltantes: 0
Fechas faltantes: 0
Métodos de pago faltantes: 0
Malls faltantes: 0
99457 filas insertadas en dimcustomer
8 filas insertadas en dimproduct
797 filas insertadas en dimdate
3 filas insertadas en dimpayment
10 filas insertadas en dimmall
37110 filas insertadas en factsales
