In [4]:
import os
from pathlib import Path

import duckdb

# ETL cell: load the curated Parquet files into an in-memory DuckDB database,
# then copy each table into PostgreSQL through DuckDB's postgres extension.

# Directory holding the curated Parquet files (one `<table>.parquet` per table).
curated_path = Path("../data/curated/")

# Load/transfer order: lookup tables first, then the tables that reference
# them, so the order stays compatible with the foreign keys added later.
transfer_tables = [
    "cities",
    "customer_types",
    "product_types",
    "sale_types",
    "customers",
    "products",
    "sales",
]

# Order in which the existing PostgreSQL tables are dropped.
drop_tables = [
    "sales",
    "products",
    "customers",
    "cities",
    "customer_types",
    "product_types",
    "sale_types",
]

# Connection settings come from the standard libpq environment variables so
# credentials do not have to live in the notebook; the defaults reproduce the
# previous hardcoded local-development values.
# NOTE(review): values are assumed to contain no whitespace (libpq conninfo
# would require quoting otherwise) -- confirm for non-default passwords.
pg_settings = {
    "dbname": os.environ.get("PGDATABASE", "analytics"),
    "user": os.environ.get("PGUSER", "user"),
    "password": os.environ.get("PGPASSWORD", "pass"),
    "host": os.environ.get("PGHOST", "localhost"),
    "port": os.environ.get("PGPORT", "5432"),
}
pg_conn = " ".join(f"{key}={value}" for key, value in pg_settings.items())
# The DSN is embedded in a single-quoted SQL literal, so double any quote in it.
pg_conn_literal = pg_conn.replace("'", "''")

# Connect to DuckDB
conn = duckdb.connect(":memory:")
try:
    # Install and load the PostgreSQL extension
    print("Instalando extensión de PostgreSQL...")
    conn.execute("INSTALL postgres")
    conn.execute("LOAD postgres")
    print("✓ Extensión cargada\n")

    print("=" * 60)
    print("ETL: Parquet → PostgreSQL con DuckDB")
    print("=" * 60 + "\n")

    # 1. Attach PostgreSQL and drop every target table
    print("Conectando a PostgreSQL...")
    conn.execute(f"ATTACH '{pg_conn_literal}' AS pg (TYPE POSTGRES)")
    print("✓ Conectado\n")

    print("🗑️  Eliminando TODAS las tablas y constraints...")
    for table in drop_tables:
        conn.execute(f"DROP TABLE IF EXISTS pg.{table} CASCADE")
    print("✓ PostgreSQL limpio\n")

    # 2. Load every Parquet file into a DuckDB table of the same name
    print("Cargando archivos Parquet en DuckDB...\n")
    for table in transfer_tables:
        parquet_file = (curated_path / f"{table}.parquet").as_posix()
        conn.execute(
            f"CREATE TABLE {table} AS SELECT * FROM read_parquet('{parquet_file}')"
        )
        count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
        print(f"✓ {table:<18}: {count:,} filas")

    # 3. Copy each DuckDB table into PostgreSQL
    print("\nTransfiriendo datos a PostgreSQL...\n")
    for table in transfer_tables:
        conn.execute(f"CREATE TABLE pg.{table} AS SELECT * FROM {table}")
        count = conn.execute(f"SELECT COUNT(*) FROM pg.{table}").fetchone()[0]
        print(f"✓ {table:20s}: {count:,} filas → PostgreSQL")

    print("\n" + "=" * 60)
    print("✓ Datos transferidos exitosamente")
    print("=" * 60)
finally:
    # Always release the DuckDB connection (and the attached PostgreSQL
    # session), even when a step above fails part-way through.
    conn.close()

Instalando extensión de PostgreSQL...
✓ Extensión cargada

ETL: Parquet → PostgreSQL con DuckDB

Conectando a PostgreSQL...
✓ Conectado

🗑️  Eliminando TODAS las tablas y constraints...
✓ PostgreSQL limpio

Cargando archivos Parquet en DuckDB...

✓ cities            : 33 filas
✓ customer_types    : 4 filas
✓ product_types     : 6 filas
✓ sale_types        : 4 filas
✓ customers         : 132 filas
✓ products          : 72 filas
✓ Extensión cargada

ETL: Parquet → PostgreSQL con DuckDB

Conectando a PostgreSQL...
✓ Conectado

🗑️  Eliminando TODAS las tablas y constraints...
✓ PostgreSQL limpio

Cargando archivos Parquet en DuckDB...

✓ cities            : 33 filas
✓ customer_types    : 4 filas
✓ product_types     : 6 filas
✓ sale_types        : 4 filas
✓ customers         : 132 filas
✓ products          : 72 filas
✓ sales             : 7,431,930 filas

Transfiriendo datos a PostgreSQL...

✓ cities              : 33 filas → Postgr

In [5]:
import os

import psycopg2

# Constraint cell: add primary keys and foreign keys to the tables that the
# ETL step created in PostgreSQL.

# (table, primary-key column), in the order the keys are added.
primary_keys = [
    ("cities", "id_city"),
    ("customer_types", "id_type_customer"),
    ("product_types", "id_type_product"),
    ("sale_types", "id_type_sale"),
    ("customers", "id_customer"),
    ("products", "id_product"),
    ("sales", "id_sale"),
]

# (child table, constraint name, column, referenced table). The referenced
# column always has the same name as the child column.
foreign_keys = [
    ("customers", "fk_customers_city", "id_city", "cities"),
    ("customers", "fk_customers_type", "id_type_customer", "customer_types"),
    ("products", "fk_products_type", "id_type_product", "product_types"),
    ("sales", "fk_sales_product", "id_product", "products"),
    ("sales", "fk_sales_type", "id_type_sale", "sale_types"),
    ("sales", "fk_sales_customer", "id_customer", "customers"),
]

print("=" * 60)
print("Creando relaciones (Primary Keys y Foreign Keys)")
print("=" * 60 + "\n")

# Connect to PostgreSQL. Settings come from the standard libpq environment
# variables so credentials do not have to live in the notebook; the defaults
# reproduce the previous hardcoded local-development values.
print("Conectando a PostgreSQL...")
pg_conn = psycopg2.connect(
    dbname=os.environ.get("PGDATABASE", "analytics"),
    user=os.environ.get("PGUSER", "user"),
    password=os.environ.get("PGPASSWORD", "pass"),
    host=os.environ.get("PGHOST", "localhost"),
    port=os.environ.get("PGPORT", "5432"),
)

try:
    # Run every ALTER TABLE in one transaction: if any constraint fails,
    # nothing is left half-applied and the cell can simply be re-run.
    pg_conn.autocommit = False
    with pg_conn.cursor() as cursor:
        print("✓ Conectado\n")

        # 1. Add primary keys
        print("Agregando Primary Keys...\n")
        for table, column in primary_keys:
            cursor.execute(f"ALTER TABLE {table} ADD PRIMARY KEY ({column})")
            print(f"✓ {table}({column})")

        # 2. Add foreign keys (they reference the primary keys created above)
        print("\nAgregando Foreign Keys...\n")
        for child, constraint, column, parent in foreign_keys:
            cursor.execute(
                f"ALTER TABLE {child} "
                f"ADD CONSTRAINT {constraint} "
                f"FOREIGN KEY ({column}) REFERENCES {parent}({column})"
            )
            print(f"✓ {child} → {parent}")

    pg_conn.commit()

    print("\n" + "=" * 60)
    print("✓ Todas las relaciones creadas exitosamente")
    print("=" * 60)

except Exception as e:
    # Undo every constraint added in this run, report, and re-raise so the
    # notebook cell is still marked as failed.
    pg_conn.rollback()
    print(f"\n✗ Error: {e}")
    raise
finally:
    pg_conn.close()

Creando relaciones (Primary Keys y Foreign Keys)

Conectando a PostgreSQL...
✓ Conectado

Agregando Primary Keys...

✓ cities(id_city)
✓ customer_types(id_type_customer)
✓ product_types(id_type_product)
✓ sale_types(id_type_sale)
✓ customers(id_customer)
✓ products(id_product)
✓ sales(id_sale)

Agregando Foreign Keys...

✓ customers → cities
✓ customers → customer_types
✓ products → product_types
✓ sales(id_sale)

Agregando Foreign Keys...

✓ customers → cities
✓ customers → customer_types
✓ products → product_types
✓ sales → products
✓ sales → products
✓ sales → sale_types
✓ sales → sale_types
✓ sales → customers

✓ Todas las relaciones creadas exitosamente
✓ sales → customers

✓ Todas las relaciones creadas exitosamente
