# Migración Base de Datos Chinook: PostgreSQL → MongoDB

Este notebook implementa la migración completa de la base de datos relacional Chinook desde PostgreSQL hacia MongoDB, como parte del laboratorio final del curso de Bases de Datos No Relacionales.

## Objetivos del Proyecto

1. **Migración de esquema**: Transformar el modelo relacional en documentos MongoDB optimizados
2. **Comparación de consultas**: Demostrar diferencias entre SQL y agregaciones MongoDB  
3. **Optimización**: Implementar índices y campos calculados para mejor rendimiento
4. **Análisis**: Documentar ventajas y desventajas de cada enfoque

## Base de Datos Original
- **Fuente**: [Chinook Database](https://github.com/lerocha/chinook-database)
- **Dominio**: Tienda de música digital (iTunes-like)
- **Estructura**: 11 tablas relacionales normalizadas

## Estrategia de Migración
- **Albums**: Embeber tracks dentro de álbumes
- **Customers**: Embeber facturas completas con líneas  
- **Employees**: Embeber clientes asignados
- **Tracks**: Agregar estadísticas de ventas precalculadas
- **Playlists**: Mantener referencias a tracks

---

In [None]:
pip install psycopg2-binary




In [None]:
!pip install pymongo[srv] pandas

Collecting pymongo[srv]
  Downloading pymongo-4.13.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
[0mCollecting dnspython<3.0.0,>=1.16.0 (from pymongo[srv])
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading dnspython-2.7.0-py3-none-any.whl (313 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m313.6/313.6 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pymongo-4.13.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m31.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dnspython, pymongo
Successfully installed dnspython-2.7.0 pymongo-4.13.1


In [None]:
#Connect db
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import pandas as pd
from bson import ObjectId
import datetime
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Connect to MongoDB using environment variables
mongodb_password = os.getenv('MONGODB_PASSWORD')
mongodb_username = os.getenv('MONGODB_USERNAME')
mongodb_cluster = os.getenv('MONGODB_CLUSTER')

if not all([mongodb_password, mongodb_username, mongodb_cluster]):
    raise ValueError("Missing MongoDB environment variables. Please check your .env file.")

uri = f"mongodb+srv://{mongodb_username}:{mongodb_password}@{mongodb_cluster}/?retryWrites=true&w=majority&appName=Cluster0"

client = MongoClient(uri, server_api=ServerApi('1'))

try:
    client.admin.command('ping')
    print("✅ Connected to MongoDB!")
except Exception as e:
    print("❌ Connection failed:", e)

✅ Connected to MongoDB!


In [None]:
db= client['chinook-music-store']

In [None]:
# Helper function for PostgreSQL connections
def get_postgres_connection():
    """Get PostgreSQL connection using environment variables"""
    postgres_username = os.getenv('POSTGRES_USERNAME')
    postgres_password = os.getenv('POSTGRES_PASSWORD')
    postgres_host = os.getenv('POSTGRES_HOST')
    postgres_database = os.getenv('POSTGRES_DATABASE')
    
    if not all([postgres_username, postgres_password, postgres_host, postgres_database]):
        raise ValueError("Missing PostgreSQL environment variables. Please check your .env file.")
    
    connection_string = f"postgresql://{postgres_username}:{postgres_password}@{postgres_host}/{postgres_database}"
    return psycopg2.connect(connection_string)

In [None]:
import psycopg2
# Mostrar playlist donde aparezca una cancion con cierto id # Resultado da igual
# Conexión usando variables de entorno
conn = get_postgres_connection()

cur = conn.cursor()

# Ejecutar una consulta
cur.execute("""
    SELECT name from playlist p
    LEFT JOIN playlist_track pt on pt.playlist_id = p.playlist_id
    where pt.track_id = 3
""")
rows = cur.fetchall()

# Mostrar resultados
for row in rows:
    print(row)



# Cerrar conexiones
cur.close()
conn.close()

print("------------------------------")

results = db.playlists.find({
    'tracks.track_id': 4
})

for playlist in results:
  print(playlist['name'])

('Music',)
('90’s Music',)
('Music',)
('Heavy Metal Classic',)
------------------------------
Music
90’s Music
Music
Heavy Metal Classic


In [None]:
import psycopg2
# Mostrar los clientes que atendio un empleado con cierto id # Resultado da igual
# Conexión usando el URI completo
conn = psycopg2.connect("postgresql://lab2user:n4yyQSda08pJT8ucHDe9Y7SSGskYzldJ@dpg-d0r462idbo4c73a0sqs0-a.virginia-postgres.render.com/chinook")

cur = conn.cursor()

# Ejecutar una consulta
cur.execute("""
    SELECT first_name FROM customer c
    where c.support_rep_id = 4
""")
rows = cur.fetchall()

# Mostrar resultados
for row in rows:
    print(row)



# Cerrar conexiones
cur.close()
conn.close()

print("------------------------------")

employee_id = 4



cursor = db.employees.find(
    {'employee_id': employee_id},
    {'customers.customer_name': 1, '_id': 0}
)

for doc in cursor:
    for customer in doc.get('customers', []):
        print(customer['customer_name'])

('Bjørn',)
('František',)
('Daan',)
('Kara',)
('Eduardo',)
('Fernanda',)
('Frank',)
('Dan',)
('Heather',)
('John',)
('Richard',)
('Patrick',)
('Aaron',)
('João',)
('Madalena',)
('Camille',)
('Dominique',)
('Stanisław',)
('Mark',)
('Diego',)
------------------------------
Bjørn Hansen
František Wichterlová
Daan Peeters
Kara Nielsen
Eduardo Martins
Fernanda Ramos
Frank Harris
Dan Miller
Heather Leacock
John Gordon
Richard Cunningham
Patrick Gray
Aaron Mitchell
João Fernandes
Madalena Sampaio
Camille Bernard
Dominique Lefebvre
Stanisław Wójcik
Mark Taylor
Diego Gutiérrez


In [None]:
import psycopg2
import os
# Mostrar los clientes ordenados por quien gasto mas en canciones # Resultado igual
# Conexión usando variables de entorno
conn = get_postgres_connection()

cur = conn.cursor()

# Ejecutar una consulta
cur.execute("""
    SELECT c.customer_id, c.first_name, c.last_name, SUM(inl.unit_price * inl.quantity) as total FROM customer c
    LEFT JOIN invoice inv on inv.customer_id = c.customer_id
    LEFT JOIN invoice_line inl on inv.invoice_id = inl.invoice_id
    GROUP BY c.customer_id, c.first_name, c.last_name
    ORDER BY total desc

""")
rows = cur.fetchall()

# Mostrar resultados
for row in rows:
    print(row)



# Cerrar conexiones
cur.close()
conn.close()

print("------------------------------")

cursor = db.customers.find(
    {},
    {'first_name': 1,
     'last_name': 1,
     'total_spent': 1,
     'customer_id': 1,
     '_id': 0}
).sort('total_spent', -1)

for customer in cursor:
    print(f"({customer['customer_id']}, '{customer['first_name']}', '{customer['last_name']}', Decimal('{round(customer['total_spent'],2)}'))")

(59, 'Puja', 'Srivastava', Decimal('36.64'))
(58, 'Manoj', 'Pareek', Decimal('38.62'))
(57, 'Luis', 'Rojas', Decimal('46.62'))
(56, 'Diego', 'Gutiérrez', Decimal('37.62'))
(55, 'Mark', 'Taylor', Decimal('37.62'))
(54, 'Steve', 'Murray', Decimal('37.62'))
(53, 'Phil', 'Hughes', Decimal('37.62'))
(52, 'Emma', 'Jones', Decimal('37.62'))
(51, 'Joakim', 'Johansson', Decimal('38.62'))
(50, 'Enrique', 'Muñoz', Decimal('37.62'))
(49, 'Stanisław', 'Wójcik', Decimal('37.62'))
(48, 'Johannes', 'Van der Berg', Decimal('40.62'))
(47, 'Lucas', 'Mancini', Decimal('37.62'))
(46, 'Hugh', "O'Reilly", Decimal('45.62'))
(45, 'Ladislav', 'Kovács', Decimal('45.62'))
(44, 'Terhi', 'Hämäläinen', Decimal('41.62'))
(43, 'Isabelle', 'Mercier', Decimal('40.62'))
(42, 'Wyatt', 'Girard', Decimal('39.62'))
(41, 'Marc', 'Dubois', Decimal('37.62'))
(40, 'Dominique', 'Lefebvre', Decimal('38.62'))
(39, 'Camille', 'Bernard', Decimal('38.62'))
(38, 'Niklas', 'Schröder', Decimal('37.62'))
(37, 'Fynn', 'Zimmermann', Decimal

In [None]:
import psycopg2
# Mostrar cuales son las canciones mas vendidas de un genero determinado # Resultado da igual
# Conexión usando variables de entorno
conn = get_postgres_connection()
cur = conn.cursor()

# Ejecutar una consulta
cur.execute("""
    SELECT t.track_id, t.name, SUM(inl.unit_price * inl.quantity) as total from track t
    LEFT JOIN invoice_line inl on inl.track_id = t.track_id
    WHERE inl.unit_price * inl.quantity > 0 and t.genre_id = 3
    GROUP BY t.track_id, t.name
    ORDER BY total desc
""")
rows = cur.fetchall()

# Mostrar resultados
for row in rows:
    print(row)



# Cerrar conexiones
cur.close()
conn.close()

print("------------------------------")

genre_id = 3

results = db.tracks.find(
    {'genre_id': genre_id},
    {
        'name': 1,
        'sales.total_revenue': 1,
        '_id': 0,
        'track_id': 1
    }
).sort('sales.total_revenue', -1)

for track in results:
    # print(track);
    print(f"({track['track_id']}, '{track['name']}', Decimal('{track['sales']['total_revenue']}'))")


(162, 'Cornucopia', Decimal('1.98'))
(184, 'Chemical Wedding', Decimal('1.98'))
(1822, 'Stone Cold Crazy', Decimal('1.98'))
(1389, 'Gangland', Decimal('1.98'))
(1344, 'Aces High', Decimal('1.98'))
(1888, 'Shoot Me Again', Decimal('1.98'))
(416, 'Whiskey In The Jar', Decimal('1.98'))
(2561, 'Question!', Decimal('1.98'))
(1377, 'The Clairvoyant', Decimal('1.98'))
(1841, 'The House Jack Built', Decimal('1.98'))
(84, 'Welcome Home (Sanitarium)', Decimal('1.98'))
(1865, 'Better Than You', Decimal('1.98'))
(1358, 'Blood Brothers', Decimal('1.98'))
(1876, 'For Whom The Bell Tolls', Decimal('1.98'))
(1808, 'Nothing Else Matters', Decimal('1.98'))
(1831, 'Motorbreath', Decimal('1.98'))
(161, 'Snowblind', Decimal('1.98'))
(1226, 'Can I Play With Madness', Decimal('1.98'))
(1126, 'Re-Align', Decimal('1.98'))
(1394, 'The Prisoner', Decimal('1.98'))
(1331, 'Run Silent Run Deep', Decimal('1.98'))
(1553, "You've Got Another Thing Comin'", Decimal('1.98'))
(2560, 'Violent Pornography', Decimal('1.98')

In [None]:
import psycopg2
# Mostrar las canciones mas vendidos de un pais # Resultado da igual
# Conexión usando variables de entorno
conn = get_postgres_connection()
cur = conn.cursor()

# Ejecutar una consulta
cur.execute("""
    SELECT t.track_id, t.name, SUM(inl.unit_price * inl.quantity) as total from track t
    LEFT JOIN invoice_line inl on inl.track_id = t.track_id
    LEFT JOIN invoice inv on inv.invoice_id = inl.invoice_id
    WHERE inl.unit_price * inl.quantity > 0 and inv.billing_country = 'USA'
    GROUP BY t.track_id, t.name
    ORDER BY total desc
""")
rows = cur.fetchall()

# Mostrar resultados
for row in rows:
    print(row)



# Cerrar conexiones
cur.close()
conn.close()
print("------------------------------")
country = 'USA'

pipeline = [
    {"$match": {"invoices.billing_country": country}},
    {"$unwind": "$invoices"},
    {"$unwind": "$invoices.invoice_lines"},
    {"$group": {
        "_id": "$invoices.invoice_lines.track_id",
        "total_invoice_value": {
            "$sum": {
                "$multiply": [
                    "$invoices.invoice_lines.quantity",
                    "$invoices.invoice_lines.unit_price"
                ]
            }
        }
    }},
    {"$lookup": {
        "from": "tracks",
        "localField": "_id",
        "foreignField": "track_id",
        "as": "track_info"
    }},
    {"$unwind": "$track_info"},
    {"$project": {
        "_id": 0,
        "track_id": "$_id",
        "track_name": "$track_info.name",
        "total_invoice_value": 1
    }},
    {"$sort": {"total_invoice_value": -1}}
]

results = db.customers.aggregate(pipeline)

for result in results:
    print(f"({result['track_id']}, '{result['track_name']}', Decimal('{result['total_invoice_value']})")

(3200, 'Gay Witch Hunt', Decimal('3.98'))
(3173, 'Diversity Day', Decimal('1.99'))
(3191, 'The Carpet', Decimal('1.99'))
(2837, 'Crossroads, Pt. 1', Decimal('1.99'))
(3218, 'Safety Training', Decimal('1.99'))
(2828, 'The Passage', Decimal('1.99'))
(3222, 'The Job', Decimal('1.99'))
(2918, '"?"', Decimal('1.99'))
(2882, 'Lost Survival Guide', Decimal('1.99'))
(3230, 'Lost Planet of the Gods, Pt. 2', Decimal('1.99'))
(2900, 'Exposé', Decimal('1.99'))
(3212, "Producer's Cut: The Return", Decimal('1.99'))
(2891, 'Enter 77', Decimal('1.99'))
(3214, "Phyllis's Wedding", Decimal('1.99'))
(3208, 'The Convict', Decimal('1.99'))
(3238, 'The Living Legend, Pt. 2', Decimal('1.99'))
(3182, 'Halloween', Decimal('1.99'))
(2846, 'Seven Minutes to Midnight', Decimal('1.99'))
(3428, 'Branch Closing', Decimal('1.99'))
(2909, 'Catch-22', Decimal('1.99'))
(2864, 'Orientation', Decimal('1.99'))
(2873, 'House of the Rising Sun', Decimal('1.99'))
(3202, 'The Coup', Decimal('1.99'))
(3347, 'Meet Kevin Johnson'

In [None]:
import psycopg2
# Duracion promedio de las canciones de un album
# Conexión usando variables de entorno
conn = get_postgres_connection()
cur = conn.cursor()

# Ejecutar una consulta
cur.execute("""
    SELECT AVG(milliseconds / 1000) AS avg_duration
    FROM track
    WHERE album_id = 1;
""")
rows = cur.fetchall()

# Mostrar resultados
for row in rows:
    print(row)



# Cerrar conexiones
cur.close()
conn.close()

print("------------------------------")

album_id = 1

pipeline = [
    {"$match": {"album_id": album_id}},
    {"$unwind": "$tracks"},
    {"$group": {
        "_id": "$album_id",
        "album_title": {"$first": "$title"},
        "average_duration_ms": {"$avg": "$tracks.milliseconds"}
    }},
    {"$project": {
        "_id": 0,
        "album_id": "$_id",
        "album_title": 1,
        "average_duration_ms": 1,
        "average_duration_seconds": {"$divide": ["$average_duration_ms", 1000]}
    }}
]

result = db.albums.aggregate(pipeline)
# result = db.albums.find({'album_id': album_id},{'tracks.milliseconds': 1 , '_id': 0})
# for doc in result:
#   print(doc)

for doc in result:
    print(f"Álbum: {doc['album_title']} (ID: {doc['album_id']})")
    print(f"Duración promedio: {doc['average_duration_ms']} ms ({doc['average_duration_seconds']} segundos)")


(Decimal('239.4000000000000000'),)
------------------------------
Álbum: For Those About To Rock We Salute You (ID: 1)
Duración promedio: 240041.5 ms (240.0415 segundos)


In [None]:
import psycopg2
# Mostrar que empleado representa a cada cliente
# Conexión usando variables de entorno
conn = get_postgres_connection()
cur = conn.cursor()

# Ejecutar una consulta
cur.execute("""
    SELECT
    c.first_name || ' ' || c.last_name AS customer_name,
    e.first_name || ' ' || e.last_name AS support_rep
FROM Customer c
JOIN Employee e ON c.support_rep_id = e.employee_id;
""")
rows = cur.fetchall()

# Mostrar resultados
for row in rows:
    print(row)



# Cerrar conexiones
cur.close()
conn.close()

print('-------------------------')

results = db.employees.aggregate([
    {"$unwind": "$customers"},
    {
        "$project": {
            "_id": 0,
            "customer_name": "$customers.customer_name",
            "support_rep": {
                "$concat": ["$first_name", " ", "$last_name"]
            }
        }
    }
])

for doc in results:
  print(f"('{doc['customer_name']}', '{doc['support_rep']}')")



('Luís Gonçalves', 'Jane Peacock')
('Leonie Köhler', 'Steve Johnson')
('François Tremblay', 'Jane Peacock')
('Bjørn Hansen', 'Margaret Park')
('František Wichterlová', 'Margaret Park')
('Helena Holý', 'Steve Johnson')
('Astrid Gruber', 'Steve Johnson')
('Daan Peeters', 'Margaret Park')
('Kara Nielsen', 'Margaret Park')
('Eduardo Martins', 'Margaret Park')
('Alexandre Rocha', 'Steve Johnson')
('Roberto Almeida', 'Jane Peacock')
('Fernanda Ramos', 'Margaret Park')
('Mark Philips', 'Steve Johnson')
('Jennifer Peterson', 'Jane Peacock')
('Frank Harris', 'Margaret Park')
('Jack Smith', 'Steve Johnson')
('Michelle Brooks', 'Jane Peacock')
('Tim Goyer', 'Jane Peacock')
('Dan Miller', 'Margaret Park')
('Kathy Chase', 'Steve Johnson')
('Heather Leacock', 'Margaret Park')
('John Gordon', 'Margaret Park')
('Frank Ralston', 'Jane Peacock')
('Victor Stevens', 'Steve Johnson')
('Richard Cunningham', 'Margaret Park')
('Patrick Gray', 'Margaret Park')
('Julia Barnett', 'Steve Johnson')
('Robert Brown'

In [None]:
import psycopg2
# # Mostrar que empleado roporta a quien
# Conexión usando variables de entorno
conn = get_postgres_connection()
cur = conn.cursor()

# Ejecutar una consulta
cur.execute("""
    SELECT
    e.first_name || ' ' || e.last_name AS employee_name,
    m.first_name || ' ' || m.last_name AS manager_name
FROM Employee e
JOIN Employee m ON e.reports_to = m.employee_id;
""")
rows = cur.fetchall()

# Mostrar resultados
for row in rows:
    print(row)



# Cerrar conexiones
cur.close()
conn.close()

print("------------------------------")

results = db.employees.find({}, {'first_name': 1, 'last_name': 1, 'reports_to': 1, '_id': 0})
for doc in results:
  print(f"Empleado: {doc['first_name']} {doc['last_name']} reporta a {doc['reports_to']}")


('Nancy Edwards', 'Andrew Adams')
('Jane Peacock', 'Nancy Edwards')
('Margaret Park', 'Nancy Edwards')
('Steve Johnson', 'Nancy Edwards')
('Michael Mitchell', 'Andrew Adams')
('Robert King', 'Michael Mitchell')
('Laura Callahan', 'Michael Mitchell')
------------------------------
Empleado: Andrew Adams reporta a None
Empleado: Nancy Edwards reporta a Andrew Adams
Empleado: Jane Peacock reporta a Nancy Edwards
Empleado: Margaret Park reporta a Nancy Edwards
Empleado: Steve Johnson reporta a Nancy Edwards
Empleado: Michael Mitchell reporta a Andrew Adams
Empleado: Robert King reporta a Michael Mitchell
Empleado: Laura Callahan reporta a Michael Mitchell


In [None]:
import psycopg2
# Listado detallado de compras hechas por un cliente dado
# Conexión usando variables de entorno
conn = get_postgres_connection()
cur = conn.cursor()

# Ejecutar una consulta
cur.execute("""
    SELECT
    i.invoice_id,
    TO_CHAR(i.invoice_date, 'yyyy/mm/dd') AS invoice_date,
    c.first_name || ' ' || c.last_name AS customer_name,
    t.name AS track_name,
    il.quantity,
    il.unit_price,
    (il.quantity * il.unit_price) AS line_total
FROM invoice i
JOIN customer c ON i.customer_id = c.customer_id
JOIN invoice_line il ON i.invoice_id = il.invoice_id
JOIN track t ON il.track_id = t.track_id
where c.customer_id = 10
ORDER BY i.invoice_date DESC, i.invoice_id;
""")
rows = cur.fetchall()

# Mostrar resultados
for row in rows:
    print(row)



# Cerrar conexiones
cur.close()
conn.close()

print("------------------------------")

results = db.customers.aggregate([
    {"$match": {"customer_id": 10}},
    {"$unwind": "$invoices"},
    {"$unwind": "$invoices.invoice_lines"},
    {
        "$lookup": {
            "from": "tracks",
            "localField": "invoices.invoice_lines.track_id",
            "foreignField": "track_id",
            "as": "track_info"
        }
    },
    {"$unwind": "$track_info"},
    {
        "$project": {
            "_id": 0,
            "invoice_id": "$invoices.invoice_id",
            "invoice_date": {
                "$dateToString": {
                    "format": "%Y/%m/%d",
                    "date": "$invoices.invoice_date"
                }
            },
            "customer_name": {
                "$concat": ["$first_name", " ", "$last_name"]
            },
            "track_name": "$track_info.name",
            "quantity": "$invoices.invoice_lines.quantity",
            "unit_price": "$invoices.invoice_lines.unit_price",
            "line_total": {
                "$multiply": [
                    "$invoices.invoice_lines.quantity",
                    "$invoices.invoice_lines.unit_price"
                ]
            }
        }
    },
    {
        "$sort": {
            "invoice_date": -1,
            "invoice_id": 1
        }
    }
])

for doc in results:
    print(f"({doc['invoice_id']}, '{doc['invoice_date']}', '{doc['customer_name']}', '{doc['track_name']}', {doc['quantity']}, Decimal('{doc['unit_price']}'), Decimal('{doc['line_total']}'))")

(383, '2025/08/12', 'Eduardo Martins', 'Pick Myself Up', 1, Decimal('0.99'), Decimal('0.99'))
(383, '2025/08/12', 'Eduardo Martins', 'Dissident', 1, Decimal('0.99'), Decimal('0.99'))
(383, '2025/08/12', 'Eduardo Martins', 'Most High', 1, Decimal('0.99'), Decimal('0.99'))
(383, '2025/08/12', 'Eduardo Martins', 'Any Colour You Like', 1, Decimal('0.99'), Decimal('0.99'))
(383, '2025/08/12', 'Eduardo Martins', "Don't Look Back", 1, Decimal('0.99'), Decimal('0.99'))
(383, '2025/08/12', 'Eduardo Martins', 'Oceans', 1, Decimal('0.99'), Decimal('0.99'))
(383, '2025/08/12', 'Eduardo Martins', '1/2 Full', 1, Decimal('0.99'), Decimal('0.99'))
(383, '2025/08/12', 'Eduardo Martins', 'Cropduster', 1, Decimal('0.99'), Decimal('0.99'))
(383, '2025/08/12', 'Eduardo Martins', 'Big Wave', 1, Decimal('0.99'), Decimal('0.99'))
(383, '2025/08/12', 'Eduardo Martins', 'Black', 1, Decimal('0.99'), Decimal('0.99'))
(383, '2025/08/12', 'Eduardo Martins', 'Untitled', 1, Decimal('0.99'), Decimal('0.99'))
(383, '20

In [None]:
cols = ["albums","tracks","playlists","customers","employees"];
for c in cols:
  colecciones = db.list_collection_names()
  if c not in colecciones:
      db.create_collection(c)
      print("✔ Colección creada:", c)

In [None]:
# Crear indices
# Albums
db.albums.create_index({ "album_id": 1 }, unique=True)
db.albums.create_index({ "artist_id": 1 })
db.albums.create_index({ "tracks.track_id": 1 })

# Tracks
db.tracks.create_index({ "track_id": 1 }, unique=True)
db.tracks.create_index([("genre", 1), ("sales.total_quantity", -1)])
db.tracks.create_index({ "artist_id": 1 })

# Playlists
db.playlists.create_index({ "playlist_id": 1 }, unique=True)
db.playlists.create_index({ "tracks.track_id": 1 })

# Customers
db.customers.create_index({ "customer_id": 1 }, unique=True)
db.customers.create_index({ "total_spent": -1 })
db.customers.create_index({ "support_rep.employee_id": 1 })
db.customers.create_index({ "invoices.invoice_id": 1 })
db.customers.create_index({ "invoices.invoice_date": 1 })
db.customers.create_index({ "invoices.billing_country": 1 })

# Employees
db.employees.create_index({ "employee_id": 1 }, unique=True)
db.employees.create_index({ "customers.customer_id": 1 })


'customers.customer_id_1'

In [None]:
def migrate(cur, db):
    # Limpia colecciones existentes (opcional)
    for coll in ['albums', 'tracks', 'playlists', 'customers', 'employees']:
        db[coll].delete_many({})

    # 1) Albums + tracks embebidos
    cur.execute("SELECT * FROM album;")
    for alb in cur.fetchall():
        cur.execute("""
            SELECT t.track_id,
                   t.name,
                   t.media_type_id,
                   mt.name AS media_type_name,
                   t.genre_id,
                   g.name AS genre_name,
                   t.composer,
                   t.milliseconds,
                   t.bytes,
                   t.unit_price
            FROM track t
            JOIN media_type mt ON t.media_type_id = mt.media_type_id
            JOIN genre g ON t.genre_id = g.genre_id
            WHERE t.album_id = %s;
        """, (alb['album_id'],))
        tracks = cur.fetchall()

        alb_doc = {
            'album_id':     alb['album_id'],
            'artist_id':    alb['artist_id'],
            'title':        alb['title'],
            'tracks': [{
                'track_id':    tr['track_id'],
                'media': {
                    'media_type_id': tr['media_type_id'],
                    'name':          tr['media_type_name']
                },
                'genre': {
                    'genre_id': tr['genre_id'],
                    'name':     tr['genre_name']
                },
                'name':        tr['name'],
                'composer':    tr['composer'],
                'milliseconds':tr['milliseconds'],
                'bytes':       tr['bytes'],
                'unit_price':  float(tr['unit_price'])
            } for tr in tracks]
        }
        db.albums.insert_one(alb_doc)

    # 2) Tracks sueltos + stats de ventas
    cur.execute("SELECT * FROM track;")
    for tr in cur.fetchall():
        cur.execute("SELECT title, artist_id FROM album WHERE album_id = %s;", (tr['album_id'],))
        alb_info = cur.fetchone()
        cur.execute("SELECT name FROM artist WHERE artist_id = %s;", (alb_info['artist_id'],))
        artist_info = cur.fetchone()
        cur.execute("""
            SELECT SUM(quantity)       AS total_quantity,
                   SUM(quantity*unit_price) AS total_revenue
            FROM invoice_line
            WHERE track_id = %s;
        """, (tr['track_id'],))
        sales = cur.fetchone() or {}
        total_revenue = float(sales.get('total_revenue', 0.0)) if sales.get('total_revenue') else 0.0
        tr_doc = {
            'track_id':     tr['track_id'],
            'name':         tr['name'],
            'album_id':     tr['album_id'],
            'album_name':   alb_info['title'],
            'artist_id':    alb_info['artist_id'],
            'artist_name':  artist_info['name'],
            'genre_id':     tr['genre_id'],
            'composer':     tr['composer'],
            'milliseconds': tr['milliseconds'],
            'bytes':        tr['bytes'],
            'unit_price':   float(tr['unit_price']),
            'sales': {
                'total_quantity': sales.get('total_quantity', 0),
                'total_revenue':  total_revenue
            }
        }
        db.tracks.insert_one(tr_doc)

    # 3) Playlists
    cur.execute("SELECT * FROM playlist;")
    for pl in cur.fetchall():
        cur.execute("""
            SELECT pt.track_id,
                   t.name AS track_name,
                   t.album_id
            FROM playlist_track pt
            JOIN track t ON pt.track_id = t.track_id
            WHERE pt.playlist_id = %s;
        """, (pl['playlist_id'],))
        pts = cur.fetchall()

        pl_doc = {
            'playlist_id': pl['playlist_id'],
            'name':        pl['name'],
            'tracks': [{
                'track_id':  pt['track_id'],
                'track_name':pt['track_name'],
                'album_id':  pt['album_id']
            } for pt in pts]
        }
        db.playlists.insert_one(pl_doc)

    # 4) Customers + invoices embebidas
    cur.execute("SELECT * FROM customer;")
    for cus in cur.fetchall():
        cur.execute("""
            SELECT employee_id,
                   first_name || ' ' || last_name AS employee_name
            FROM employee
            WHERE employee_id = %s;
        """, (cus['support_rep_id'],))
        rep = cur.fetchone() or {}

        cur.execute("SELECT * FROM invoice WHERE customer_id = %s;", (cus['customer_id'],))
        inv_docs = []
        total_spent = 0.0
        for inv in cur.fetchall():
            cur.execute("SELECT track_id, unit_price, quantity FROM invoice_line WHERE invoice_id = %s;", (inv['invoice_id'],))
            lines = cur.fetchall()
            inv_total = sum(float(line['unit_price']) * line['quantity'] for line in lines)
            total_spent += inv_total

            inv_docs.append({
                'invoice_id':      inv['invoice_id'],
                'invoice_lines':  [{'track_id': l['track_id'], 'unit_price': float(l['unit_price']), 'quantity': l['quantity']} for l in lines],
                'total':           inv_total,
                'invoice_date':    inv['invoice_date'],
                'billing_address': inv['billing_address'],
                'billing_city':    inv['billing_city'],
                'billing_state':   inv['billing_state'],
                'billing_country': inv['billing_country'],
                'billing_postal_code': inv['billing_postal_code']
            })

        cus_doc = {
            'customer_id': cus['customer_id'],
            'first_name':  cus['first_name'],
            'last_name':   cus['last_name'],
            'company':     cus['company'],
            'address':     cus['address'],
            'city':        cus['city'],
            'state':       cus['state'],
            'postal_code': cus['postal_code'],
            'fax':         cus['fax'],
            'email':       cus['email'],
            'support_rep': rep,
            'invoices':    inv_docs,
            'total_spent': total_spent
        }
        db.customers.insert_one(cus_doc)

    # 5) Employees + clientes embebidos
    cur.execute("SELECT * FROM employee;")
    for emp in cur.fetchall():
        cur.execute("""
            SELECT customer_id,
                   first_name || ' ' || last_name AS customer_name
            FROM customer
            WHERE support_rep_id = %s;
        """, (emp['employee_id'],))
        custs = cur.fetchall()

        cur.execute("""
          SELECT m.first_name || ' ' || m.last_name AS manager_name
          FROM Employee e
          JOIN Employee m ON e.reports_to = m.employee_id
          WHERE e.employee_id = %s;
        """, (emp['employee_id'],))
        manager = cur.fetchone()

        emp_doc = {
            'employee_id':   emp['employee_id'],
            'first_name':    emp['first_name'],
            'last_name':     emp['last_name'],
            'reports_to':    manager['manager_name'] if manager else None,
            'customer_count': len(custs),
            'customers':    [{'customer_id': c['customer_id'], 'customer_name': c['customer_name']} for c in custs]
        }
        db.employees.insert_one(emp_doc)

    print("✅ Migración completada.")

import psycopg2
import psycopg2.extras


# Conexión usando variables de entorno
conn = get_postgres_connection()

cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

migrate(cur, db)
cur.close()
conn.close()

✅ Migración completada.
