In [1]:
import os
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine, text


# 1) Parámetros de conexión

In [None]:
# Connection settings for the PostgreSQL data warehouse. Each value can be overridden
# through an environment variable so that real credentials never have to be saved in
# the notebook; the fallbacks are the values this notebook has always used.
usuario = os.environ.get("DW_USER", "etluser")
password = os.environ.get("DW_PASSWORD", "etlpass")
host = os.environ.get("DW_HOST", "localhost")
puerto = os.environ.get("DW_PORT", "5432")
base_datos = os.environ.get("DW_NAME", "dw")

# Percent-encode the credentials: characters such as "@", ":" or "/" inside a user
# name or password would otherwise be parsed as URL delimiters.
engine = create_engine(
    f"postgresql://{quote_plus(usuario)}:{quote_plus(password)}@{host}:{puerto}/{base_datos}"
)

# 2) Crear tablas ejecutando el schema.sql


In [8]:
# Read the whole DDL script into memory (the path is relative to the working directory).
with open("..//dw/schema.sql", mode="r", encoding="utf-8") as schema_file:
    schema_sql = schema_file.read()

# engine.begin() opens a transaction that is committed when the block exits without
# an error, so no explicit commit call is needed.
with engine.begin() as connection:
    connection.execute(text(schema_sql))

print("Esquema creado correctamente en PostgreSQL")

Esquema creado correctamente en PostgreSQL


# 3) Leer datos curated (used_cars_final)

In [5]:
# Pull the complete curated table into a DataFrame and report how many rows came back.
query = "SELECT * FROM Used_cars_final;"
df = pd.read_sql(sql=query, con=engine)
row_count = df.shape[0]
print(f"Dataset cargado desde tabla curated ({row_count} filas)")

Dataset cargado desde tabla curated (300 filas)


# 4) Cargamos dimensiones


In [6]:
# ---- dim_brand ----
# One row per brand. Later steps (the dim_model build and the fact load) look brands up
# by brand_name alone, so the dimension has to be unique on that key. Deduplicating on
# the (brand, country) pair would emit the same brand more than once whenever it shows
# up with two different countries, and every later join on brand_name would fan out.
# NOTE(review): when a brand appears with several countries the first one encountered
# is kept — confirm that `country` is really meant to describe the brand.
dim_brand = (
    df[['brand', 'country']]
    .drop_duplicates(subset='brand')
    .rename(columns={'brand': 'brand_name', 'country': 'brand_country'})
)
dim_brand.to_sql('dim_brand', engine, if_exists='append', index=False)
print("dim_brand poblada")

# ---- dim_model ----
# Read back the brand keys, which are not part of the frame written to dim_brand.
brand_ids = pd.read_sql("SELECT brand_id, brand_name FROM dim_brand;", engine)

# Distinct (brand, model) pairs found in the curated data.
brand_model_pairs = df[['brand', 'model']].drop_duplicates()

# Resolve each pair's brand to its brand_id. merge() defaults to an inner join, so a
# pair whose brand is absent from dim_brand is dropped.
pairs_with_brand_id = brand_model_pairs.merge(
    brand_ids, left_on='brand', right_on='brand_name'
)

# Keep only the columns that are loaded into dim_model.
dim_model = pairs_with_brand_id[['brand_id', 'model']].rename(
    columns={'model': 'model_name'}
)
dim_model.to_sql(name='dim_model', con=engine, if_exists='append', index=False)
print("dim_model poblada")

# ---- dim_location ----
# One row per distinct (city, state, country) combination present in the curated data.
location_columns = ['city', 'state', 'country']
dim_location = df.loc[:, location_columns].drop_duplicates()
dim_location.to_sql(name='dim_location', con=engine, if_exists='append', index=False)
print("dim_location poblada")

# ---- dim_date ----
# Distinct listing dates, parsed to datetimes and sorted ascending.
dim_date = (
    pd.to_datetime(df['listing_date'])
    .to_frame(name='full_date')
    .drop_duplicates()
    .sort_values(by='full_date')
)

# Derive the key and the calendar attributes in one step. date_id is a running number
# (1..n) assigned in date order; the keyword order below fixes the column order.
calendar = dim_date['full_date'].dt
dim_date = dim_date.assign(
    date_id=range(1, len(dim_date) + 1),
    year=calendar.year,
    quarter=calendar.quarter,
    month=calendar.month,
    day=calendar.day,
    day_name=calendar.day_name(),
    weekofyear=calendar.isocalendar().week,
)

dim_date.to_sql(name='dim_date', con=engine, if_exists='append', index=False)
print("dim_date poblada")

dim_brand poblada
dim_model poblada
dim_location poblada
dim_date poblada


# 5) Poblar tabla de hechos fact_listings

In [7]:
# Load every dimension table, including the keys used to link the fact rows.
brand_ids = pd.read_sql("SELECT * FROM dim_brand;", engine)
model_ids = pd.read_sql("SELECT * FROM dim_model;", engine)
location_ids = pd.read_sql("SELECT * FROM dim_location;", engine)
date_ids = pd.read_sql("SELECT * FROM dim_date;", engine)

# Bring both sides of the date join to datetime64. dim_date was populated from
# pd.to_datetime(df['listing_date']), but the values read back from SQL and the raw
# listing_date column are not guaranteed to share a type; comparing e.g. strings with
# date objects would match nothing and silently leave the date key empty.
listings = df.assign(_listing_ts=pd.to_datetime(df['listing_date']))
date_lookup = date_ids.assign(full_date=pd.to_datetime(date_ids['full_date']))

# Attach the dimension keys to each listing.
# - validate='many_to_one' makes pandas raise if a dimension holds duplicate join keys
#   (for instance after the dimension cell was run twice) instead of silently
#   multiplying the fact rows.
# - dim_date has its own `year` column; suffixes=('', '_dim') leaves any same-named
#   column of the listings untouched and tags the dim_date copy with '_dim', so no
#   renaming is needed after the merge.
fact_df = (
    listings
    .merge(brand_ids, left_on='brand', right_on='brand_name',
           how='left', validate='many_to_one')
    .merge(model_ids, left_on=['model', 'brand_id'], right_on=['model_name', 'brand_id'],
           how='left', validate='many_to_one')
    .merge(location_ids, on=['city', 'state', 'country'],
           how='left', validate='many_to_one')
    .merge(date_lookup, left_on='_listing_ts', right_on='full_date',
           how='left', suffixes=('', '_dim'), validate='many_to_one')
)

# Clean-up and preparation
fact_df = fact_df.rename(columns={'key': 'listing_key'})

# Columns to load into the fact table. Names that are absent from the merged frame are
# skipped rather than raising, so helper columns (such as _listing_ts) and dimension
# attributes are dropped here.
cols_utiles = [
    'listing_id',
    'listing_key',
    'model_id',
    'location_id',
    'seller_type',
    'fuel',
    'transmission',
    'year',
    'mileage_km',
    'price_usd',
    'price_clean',
    'currency',
    'date_id'
]

fact_df = fact_df[[c for c in cols_utiles if c in fact_df.columns]]

# Rename to match the fact table's column names
fact_df = fact_df.rename(columns={
    'listing_id': 'listing_natural_id',
    'date_id': 'listing_date_id'
})

# Insert into the fact table
fact_df.to_sql('fact_listings', engine, if_exists='append', index=False)
print("fact_listings poblada correctamente")

print("\nCarga completa del Data Warehouse finalizada con éxito")

act_listings poblada correctamente

Carga completa del Data Warehouse finalizada con éxito
