# Migración SQL → NoSQL (MongoDB)

Notebook interactivo para migrar el dump **dbHeroeCloudSQL.sql** (o una base **MySQL** en vivo) hacia **MongoDB**, con un modelado práctico.


## 1) Requisitos (ejecutar en tu entorno local o nuve)

> Si ya tienes instalados `pymysql` y `pymongo`, puedes omitir esta celda.

```bash
pip install pymysql pymongo
```


In [None]:
# Import necessary libraries
import pymysql
import pymongo
import datetime
import typing

## 2) Configuración

Elige el **origen** (MySQL o dump SQL) y el **destino** (MongoDB o JSON).
Edita las variables de esta celda según tu entorno.


In [None]:

# === Origen de datos ===
SQL_MODE = "sqlite"   # "mysql" | "sqlite"

# Para MySQL (si SQL_MODE == "mysql")
MYSQL_HOST = "localhost"
MYSQL_PORT = 3306
MYSQL_DB   = "myDb"
MYSQL_USER = "usuario"
MYSQL_PASS = "clave"

# Para Dump SQL (si SQL_MODE == "sqlite")
SQLITE_SQL_PATH = "/mnt/data/dbHeroeCloudSQL.sql"  # ajusta si copiaste a otra ruta

# === Destino ===
# Si defines MONGO_URI se escribirá en MongoDB; si lo dejas vacío, exporta JSON en ./output/*.json
MONGO_URI = ""  # p.ej. "mongodb://localhost:27017"
MONGO_DB  = "apiheroes_nosql"


In [None]:

# Dependencias opcionales (se importan si están instaladas)
try:
    import pymysql  # para MySQL
except Exception:
    pymysql = None

try:
    from pymongo import MongoClient, ReplaceOne
    from bson.objectid import ObjectId
except Exception:
    MongoClient = None
    ReplaceOne = None
    ObjectId = None

import os, sys, json, sqlite3
from datetime import datetime
from typing import Any, Dict, List, Optional


## 3) Utilidades

In [None]:

def to_bool(x: Any) -> bool:
    if isinstance(x, bool):
        return x
    if x is None:
        return False
    if isinstance(x, (int, float)):
        return x != 0
    s = str(x).strip().lower()
    return s in {'1', 'true', 't', 'yes', 'y', 'on'}

def to_iso_date(x: Any) -> Optional[str]:
    if x in (None, "", "0000-00-00"):
        return None
    try:
        return datetime.fromisoformat(str(x)).date().isoformat()
    except Exception:
        try:
            return datetime.strptime(str(x), "%Y-%m-%d").date().isoformat()
        except Exception:
            return str(x)


## 4) Cargar origen SQL

In [None]:

class SQLSource:
    def query_all(self, sql: str, params: tuple = ()):
        raise NotImplementedError
    def close(self): ...
    
class MySQLSource(SQLSource):
    def __init__(self, host: str, port: int, db: str, user: str, password: str):
        if pymysql is None:
            raise RuntimeError("pymysql no está instalado. Ejecuta: pip install pymysql")
        self.conn = pymysql.connect(
            host=host, port=port, user=user, password=password,
            db=db, charset="utf8mb4", cursorclass=pymysql.cursors.DictCursor
        )
    def query_all(self, sql: str, params: tuple = ()):
        with self.conn.cursor() as cur:
            cur.execute(sql, params)
            return list(cur.fetchall())
    def close(self):
        try: self.conn.close()
        except Exception: pass

class SQLiteFromDump(SQLSource):
    def __init__(self, dump_path: str):
        if not os.path.isfile(dump_path):
            raise FileNotFoundError(f"No existe el archivo SQL: {dump_path}")
        self.conn = sqlite3.connect(":memory:")
        self.conn.row_factory = sqlite3.Row
        with open(dump_path, "r", encoding="utf-8") as f:
            sql_script = f.read()
        stmts = (
            sql_script
            .replace("AUTO_INCREMENT", "")
            .replace("TINYINT(1)", "INTEGER")
            .replace("ENUM('ADMIN_ROLE', 'USER_ROLE')", "TEXT")
            .replace("BIGINT", "INTEGER")
            .replace("INT", "INTEGER")
        )
        self.conn.executescript(stmts)
    def query_all(self, sql: str, params: tuple = ()):
        cur = self.conn.execute(sql, params)
        return [dict(r) for r in cur.fetchall()]
    def close(self):
        try: self.conn.close()
        except Exception: pass

def get_source():
    if SQL_MODE.lower() == "mysql":
        return MySQLSource(MYSQL_HOST, int(MYSQL_PORT), MYSQL_DB, MYSQL_USER, MYSQL_PASS)
    return SQLiteFromDump(SQLITE_SQL_PATH)


## 5) Configurar destino (MongoDB o JSON)

In [None]:

class Sink:
    def write(self, coll: str, docs):
        raise NotImplementedError
    def close(self): ...

class MongoSink(Sink):
    def __init__(self, uri: str, dbname: str):
        if MongoClient is None:
            raise RuntimeError("pymongo no está instalado. Ejecuta: pip install pymongo")
        self.client = MongoClient(uri)
        self.db = self.client[dbname]
    def write(self, coll: str, docs):
        if not docs: return
        c = self.db[coll]
        ops = []
        for d in docs:
            if "_id" in d:
                filt = {"_id": d["_id"]}
            elif "nombre" in d:
                filt = {"nombre": d["nombre"]}
            else:
                c.insert_one(d); 
                continue
            ops.append(ReplaceOne(filt, d, upsert=True))
        if ops:
            c.bulk_write(ops, ordered=False)
    def close(self):
        try: self.client.close()
        except Exception: pass

class JSONSink(Sink):
    def __init__(self, outdir: str):
        os.makedirs(outdir, exist_ok=True)
        self.outdir = outdir
    def write(self, coll: str, docs):
        path = os.path.join(self.outdir, f"{coll}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(docs, f, ensure_ascii=False, indent=2)

def get_sink():
    if MONGO_URI:
        print(f"[INFO] Enviando a MongoDB '{MONGO_DB}' en {MONGO_URI} ...")
        return MongoSink(MONGO_URI, MONGO_DB)
    outdir = os.path.join(os.getcwd(), "output")
    print(f"[INFO] MONGO_URI no definido. Exportando JSON en {outdir} ...")
    return JSONSink(outdir)


## 6) Extraer datos desde SQL

In [None]:

src = get_source()

heroes = src.query_all("SELECT id, nombre, bio, img, aparicion, casa FROM heroes_ds")
peliculas = src.query_all("SELECT id, nombre FROM peliculas_ds")
usuarios = src.query_all("""
SELECT id, nombre, correo, password, img, rol, estado, google, fecha_creacion, fecha_actualizacion
FROM usuarios_ds
""")
multimedias = src.query_all("SELECT idmultimedia AS id, nombre, url, tipo FROM multimedias_ds")
protagonistas = src.query_all("""
SELECT id, papel, fecha_participacion, heroes_id, peliculas_id
FROM protagonistas_ds
""")
hero_media = src.query_all("SELECT heroes_id, idmultimedia FROM multimedias_heroe_ds")

len(heroes), len(peliculas), len(usuarios), len(multimedias), len(protagonistas), len(hero_media)


## 7) Transformar a documentos NoSQL

In [None]:

pel_by_id = {p["id"]: p for p in peliculas}
media_by_id = {m["id"]: m for m in multimedias}

media_by_hero = {}
for hm in hero_media:
    media_by_hero.setdefault(hm["heroes_id"], []).append(media_by_id.get(hm["idmultimedia"]))

part_by_hero = {}
for pr in protagonistas:
    pel = pel_by_id.get(pr["peliculas_id"], {"id": pr["peliculas_id"], "nombre": None})
    part_by_hero.setdefault(pr["heroes_id"], []).append({
        "pelicula_id": pel["id"],
        "pelicula_nombre": pel["nombre"],
        "papel": pr["papel"],
        "fecha_participacion": to_iso_date(pr["fecha_participacion"]),
    })

heroes_docs = []
for h in heroes:
    heroes_docs.append({
        "_id": int(h["id"]),
        "nombre": h["nombre"],
        "bio": h.get("bio"),
        "img": h.get("img"),
        "aparicion": to_iso_date(h.get("aparicion")),
        "casa": h.get("casa"),
        "peliculas": part_by_hero.get(h["id"], []),
        "multimedias": media_by_hero.get(h["id"], []),
    })

peliculas_docs = [{"_id": int(p["id"]), "nombre": p["nombre"]} for p in peliculas]

usuarios_docs = []
for u in usuarios:
    usuarios_docs.append({
        "_id": int(u["id"]),
        "nombre": u["nombre"],
        "correo": u["correo"],
        "password": u["password"],
        "img": u.get("img"),
        "rol": u["rol"],
        "estado": to_bool(u.get("estado")),
        "google": to_bool(u.get("google")),
        "fecha_creacion": to_iso_date(u.get("fecha_creacion")),
        "fecha_actualizacion": to_iso_date(u.get("fecha_actualizacion")),
    })

multimedias_docs = [{"_id": int(m["id"]), "nombre": m["nombre"], "url": m.get("url"), "tipo": m.get("tipo")} for m in multimedias]

protagonistas_docs = []
for pr in protagonistas:
    protagonistas_docs.append({
        "_id": int(pr["id"]),
        "papel": pr["papel"],
        "fecha_participacion": to_iso_date(pr["fecha_participacion"]),
        "heroes_id": int(pr["heroes_id"]),
        "peliculas_id": int(pr["peliculas_id"]),
    })

summary = {
    "heroes": len(heroes_docs),
    "peliculas": len(peliculas_docs),
    "usuarios": len(usuarios_docs),
    "multimedias": len(multimedias_docs),
    "protagonistas": len(protagonistas_docs),
}
summary


## 8) Cargar en destino

In [None]:

sink = get_sink()
sink.write("heroes", heroes_docs)
sink.write("peliculas", peliculas_docs)
sink.write("usuarios", usuarios_docs)
sink.write("multimedias", multimedias_docs)
sink.write("protagonistas", protagonistas_docs)
sink.close()
src.close()
print("[OK] Migración terminada.")
