In [1]:
!pip install mysql-connector-python
!pip install pymysql

Collecting mysql-connector-python
  Downloading mysql_connector_python-9.3.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (7.2 kB)
Downloading mysql_connector_python-9.3.0-cp311-cp311-manylinux_2_28_x86_64.whl (33.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.9/33.9 MB[0m [31m58.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mysql-connector-python
Successfully installed mysql-connector-python-9.3.0
Collecting pymysql
  Downloading PyMySQL-1.1.1-py3-none-any.whl.metadata (4.4 kB)
Downloading PyMySQL-1.1.1-py3-none-any.whl (44 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.0/45.0 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pymysql
Successfully installed pymysql-1.1.1


In [None]:
import requests
import time
import json
import pandas as pd
import os
import logging
from datetime import datetime
from zoneinfo import ZoneInfo

os.makedirs("logs", exist_ok=True)
log_path = os.path.join("logs", "registro_etl.log")

logger = logging.getLogger("ETLLogger")
logger.setLevel(logging.INFO)

if not any(isinstance(h, logging.FileHandler) for h in logger.handlers):
    file_handler = logging.FileHandler(log_path)
    formatter = logging.Formatter('%(message)s')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

hora_log = datetime.now(ZoneInfo("Europe/Madrid")).strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"{hora_log} - INFO - 🟢 Inicio del proceso de extracción ETL.")
print(f"✅ Log creado en: {os.path.abspath(log_path)}")

def registrar_log(etiqueta, datos, archivo, inicio, estado="Éxito", error_msg=None):
    registros = len(datos) if datos is not None else 0
    zona_local = ZoneInfo("Europe/Madrid")
    ahora = datetime.now(zona_local)
    hora_log = ahora.strftime('%Y-%m-%d %H:%M:%S')
    inicio_local = datetime.fromtimestamp(inicio, tz=zona_local).strftime('%H:%M:%S')
    fin_local = ahora.strftime('%H:%M:%S')
    duracion = round(time.time() - inicio, 2)

    cuerpo = (
        f"🔍 Fuente: {etiqueta} | "
        f"🕒 Inicio: {inicio_local} | 🕕 Fin: {fin_local} | "
        f"📊 Registros: {registros} | "
        f"📁 Archivo: {archivo} | "
        f"⏱ Tiempo total: {duracion} s | "
        f"📌 Resultado: {estado}"
    )
    if error_msg:
        cuerpo += f" | ⚠️ Error: {error_msg}"

    mensaje = f"{hora_log} - INFO - {cuerpo}"
    print(mensaje)
    logger.info(mensaje)

RAWG_API_KEY = "a28383ba212e47e58157ff4528a0a49c"
RAWG_URL = "https://api.rawg.io/api/games"
GIANTBOMB_API_KEY = "a265cb9c56c7a4d9dde27e9ec74fa9afeaf00c07"
GIANTBOMB_URL = "https://www.giantbomb.com/api/games/"
HEADERS = {"User-Agent": "DataCollector/1.0"}

def fetch_json_data(url):
    try:
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            return response.json()
    except requests.exceptions.RequestException:
        return {}
    return {}

def procesar_json_dict(data):
    df = pd.DataFrame.from_dict(data, orient='index')
    for col in df.columns:
        if df[col].apply(lambda x: isinstance(x, (list, dict, bool))).any():
            df[col] = df[col].apply(lambda x: json.dumps(x) if x is not None else None)
    return df

def procesar_json_list(data):
    df = pd.DataFrame(data)
    for col in df.columns:
        if df[col].apply(lambda x: isinstance(x, (list, dict, bool))).any():
            df[col] = df[col].apply(lambda x: json.dumps(x) if x is not None else None)
    return df


INFO:ETLLogger:2025-06-05 22:18:38 - INFO - 🟢 Inicio del proceso de extracción ETL.


✅ Log creado en: /content/logs/registro_etl.log


In [None]:
# RAWG
RAWG_API_KEY = "a28383ba212e47e58157ff4528a0a49c"
def fetch_rawg_games(api_key, max_pages=150):
    page = 1
    page_size = 40
    all_games = []
    while page <= max_pages:
        try:
            params = {"key": api_key, "page": page, "page_size": page_size}
            response = requests.get("https://api.rawg.io/api/games", params=params, timeout=15)
            if response.status_code == 200:
                data = response.json()
                all_games.extend(data.get("results", []))
                page += 1
                time.sleep(0.5)
            else:
                break
        except:
            break
    return all_games

def procesar_rawg_data(lista_rawg):
    if not lista_rawg:
        return pd.DataFrame()

    df = pd.DataFrame(lista_rawg)

    columnas_clave = [
        "id", "slug", "name", "released", "rating", "ratings_count",
        "metacritic", "playtime", "added", "added_by_status", "platforms", "updated"
    ]
    df = df[[col for col in columnas_clave if col in df.columns]].copy()

    columnas_imagenes = [col for col in df.columns if "image" in col or "background_image" in col]
    df.drop(columns=columnas_imagenes, inplace=True, errors="ignore")

    if "added_by_status" in df.columns:
        df["added_by_status"] = df["added_by_status"].apply(lambda x: x if isinstance(x, dict) else {})
        claves_added = set()
        df["added_by_status"].apply(lambda d: claves_added.update(d.keys()))
        for clave in sorted(claves_added):
            df[f"added_{clave}"] = df["added_by_status"].apply(lambda d: d.get(clave, None))
        df.drop(columns=["added_by_status"], inplace=True)

    if "platforms" in df.columns:
        df["platforms"] = df["platforms"].apply(lambda x: x if isinstance(x, list) else [])
        df = df.explode("platforms")

        df["platform_id"] = df["platforms"].apply(lambda p: p.get("platform", {}).get("id") if isinstance(p, dict) else None)
        df["platform_name"] = df["platforms"].apply(lambda p: p.get("platform", {}).get("name") if isinstance(p, dict) else None)
        df["platform_slug"] = df["platforms"].apply(lambda p: p.get("platform", {}).get("slug") if isinstance(p, dict) else None)

        df.drop(columns=["platforms"], inplace=True)

    return df

inicio_rawg = time.time()
try:
    rawg_data = fetch_rawg_games(RAWG_API_KEY)
    df_rawg = procesar_rawg_data(rawg_data)
    df_rawg.to_parquet("rawg_games.parquet", index=False)
    registrar_log("RAWG", df_rawg, "rawg_games.parquet", inicio_rawg)
except Exception as e:
    registrar_log("RAWG", None, "rawg_games.parquet", inicio_rawg, estado="Fallo", error_msg=str(e))

INFO:ETLLogger:2025-06-05 22:47:18 - INFO - 🔍 Fuente: RAWG | 🕒 Inicio: 22:45:48 | 🕕 Fin: 22:47:18 | 📊 Registros: 9652 | 📁 Archivo: rawg_games.csv | ⏱ Tiempo total: 89.64 s | 📌 Resultado: Éxito


2025-06-05 22:47:18 - INFO - 🔍 Fuente: RAWG | 🕒 Inicio: 22:45:48 | 🕕 Fin: 22:47:18 | 📊 Registros: 9652 | 📁 Archivo: rawg_games.csv | ⏱ Tiempo total: 89.64 s | 📌 Resultado: Éxito


INFO:ETLLogger:2025-06-05 22:50:38 - INFO - 🔍 Fuente: RAWG | 🕒 Inicio: 22:47:18 | 🕕 Fin: 22:50:38 | 📊 Registros: 20068 | 📁 Archivo: rawg_games.parquet | ⏱ Tiempo total: 200.22 s | 📌 Resultado: Éxito


2025-06-05 22:50:38 - INFO - 🔍 Fuente: RAWG | 🕒 Inicio: 22:47:18 | 🕕 Fin: 22:50:38 | 📊 Registros: 20068 | 📁 Archivo: rawg_games.parquet | ⏱ Tiempo total: 200.22 s | 📌 Resultado: Éxito


In [None]:
# GIANT BOMB
GIANTBOMB_API_KEY = "a265cb9c56c7a4d9dde27e9ec74fa9afeaf00c07"
def fetch_giantbomb_games(api_key, max_pages=150, max_retries=3):
    offset = 0
    limit = 100
    all_games = []
    current_page = 0
    while current_page < max_pages:
        retries = 0
        while retries < max_retries:
            try:
                params = {
                    "api_key": api_key,
                    "format": "json",
                    "offset": offset,
                    "limit": limit,
                    "field_list": "id,guid,name,expected_release_year,original_release_date,date_added,date_last_updated,platforms,original_game_rating"
                }
                response = requests.get(GIANTBOMB_URL, params=params, headers=HEADERS, timeout=15)
                if response.status_code == 200:
                    data = response.json()
                    results = data.get("results", [])
                    if not results:
                        break
                    all_games.extend(results)
                    offset += limit
                    current_page += 1
                    time.sleep(1)
                    break
                else:
                    retries += 1
                    time.sleep(2 ** retries)
            except requests.exceptions.RequestException:
                retries += 1
                time.sleep(2 ** retries)
        else:
            break
    return all_games

def procesar_giantbomb_data(lista_juegos):
    juegos = []
    plataformas = []
    ratings = []

    for juego in lista_juegos:
        juego_base = {
            "id": juego.get("id"),
            "guid": juego.get("guid"),
            "name": juego.get("name"),
            "expected_release_year": juego.get("expected_release_year"),
            "original_release_date": juego.get("original_release_date"),
            "date_added": juego.get("date_added"),
            "date_last_updated": juego.get("date_last_updated")
        }
        juegos.append(juego_base)

        for p in juego.get("platforms") or []:
            if isinstance(p, dict):
                plataformas.append({
                    "game_id": juego.get("id"),
                    "platform_id": p.get("id"),
                    "platform_name": p.get("name"),
                    "platform_abbr": p.get("abbreviation")
                })

        for r in juego.get("original_game_rating") or []:
            if isinstance(r, dict):
                ratings.append({
                    "game_id": juego.get("id"),
                    "rating_id": r.get("id"),
                    "rating_name": r.get("name")
                })

    df_juegos = pd.DataFrame(juegos)
    df_platforms = pd.DataFrame(plataformas)
    df_ratings = pd.DataFrame(ratings)

    df_final = df_juegos.merge(df_platforms, how="left", left_on="id", right_on="game_id")
    df_final = df_final.merge(df_ratings, how="left", on="game_id")

    return df_final

inicio_giantbomb = time.time()
try:
    giantbomb_data = fetch_giantbomb_games(GIANTBOMB_API_KEY)
    df_giantbomb = procesar_giantbomb_data(giantbomb_data)
    df_giantbomb.to_parquet("giantbomb_games.parquet", index=False)
    registrar_log("Giant Bomb", df_giantbomb, "giantbomb_games.parquet", inicio_giantbomb)
except Exception as e:
    registrar_log("Giant Bomb", None, "giantbomb_games.parquet", inicio_giantbomb, estado="Fallo", error_msg=str(e))

INFO:ETLLogger:2025-06-05 23:17:46 - INFO - 🔍 Fuente: Giant Bomb | 🕒 Inicio: 23:05:55 | 🕕 Fin: 23:17:46 | 📊 Registros: 50839 | 📁 Archivo: giantbomb_games.parquet | ⏱ Tiempo total: 711.38 s | 📌 Resultado: Éxito


2025-06-05 23:17:46 - INFO - 🔍 Fuente: Giant Bomb | 🕒 Inicio: 23:05:55 | 🕕 Fin: 23:17:46 | 📊 Registros: 50839 | 📁 Archivo: giantbomb_games.parquet | ⏱ Tiempo total: 711.38 s | 📌 Resultado: Éxito


In [None]:
# Pokémon Showdown - Pokedex
inicio_pokedex = time.time()
try:
    pokedex_data = fetch_json_data("https://play.pokemonshowdown.com/data/pokedex.json")
    df_pokedex = procesar_json_dict(pokedex_data)

    columnas_utiles = [
        "num", "name", "types", "baseStats", "abilities", "genderRatio", "egggroups",
        "tier", "heightm", "weightkg", "color", "prevo", "evoLevel", "evoitem",
        "evocondition", "isNonstandard", "canGigantamax", "baseSpecies", "forme",
        "requiredItem", "changesFrom"
    ]
    df_pokedex = df_pokedex[[col for col in columnas_utiles if col in df_pokedex.columns]]

    if "types" in df_pokedex.columns and df_pokedex["types"].apply(lambda x: isinstance(x, str)).any():
        df_pokedex["types"] = df_pokedex["types"].apply(json.loads)

    df_pokedex["type_1"] = df_pokedex["types"].apply(lambda x: x[0] if isinstance(x, list) and len(x) > 0 else None)
    df_pokedex["type_2"] = df_pokedex["types"].apply(lambda x: x[1] if isinstance(x, list) and len(x) > 1 else None)
    df_pokedex.drop(columns=["types"], inplace=True)

    for col in ["baseStats", "abilities", "genderRatio"]:
        if col in df_pokedex.columns and df_pokedex[col].apply(lambda x: isinstance(x, str)).any():
            df_pokedex[col] = df_pokedex[col].apply(json.loads)


    if "baseStats" in df_pokedex.columns:
        df_stats = df_pokedex["baseStats"].apply(pd.Series)
        df_stats.columns = [f"stat_{col}" for col in df_stats.columns]
        df_pokedex = df_pokedex.drop(columns=["baseStats"]).join(df_stats)

    if "abilities" in df_pokedex.columns:
        df_abilities = df_pokedex["abilities"].apply(pd.Series)
        df_abilities.columns = [f"ability_{col}" for col in df_abilities.columns]
        df_pokedex = df_pokedex.drop(columns=["abilities"]).join(df_abilities)

    if "genderRatio" in df_pokedex.columns:
        df_gender = df_pokedex["genderRatio"].apply(pd.Series)
        df_gender.columns = [f"gender_{col}" for col in df_gender.columns]
        df_pokedex = df_pokedex.drop(columns=["genderRatio"]).join(df_gender)


    df_pokedex.to_parquet("pokemon_pokedex.parquet", index=False)

    registrar_log("Pokémon Showdown - Pokedex", df_pokedex, "pokemon_pokedex.parquet", inicio_pokedex)

except Exception as e:
    registrar_log("Pokémon Showdown - Pokedex", None, "pokemon_pokedex.parquet", inicio_pokedex, estado="Fallo", error_msg=str(e))

INFO:ETLLogger:2025-06-05 23:17:48 - INFO - 🔍 Fuente: Pokémon Showdown - Pokedex | 🕒 Inicio: 23:17:46 | 🕕 Fin: 23:17:48 | 📊 Registros: 1425 | 📁 Archivo: pokemon_pokedex.parquet | ⏱ Tiempo total: 2.26 s | 📌 Resultado: Éxito


2025-06-05 23:17:48 - INFO - 🔍 Fuente: Pokémon Showdown - Pokedex | 🕒 Inicio: 23:17:46 | 🕕 Fin: 23:17:48 | 📊 Registros: 1425 | 📁 Archivo: pokemon_pokedex.parquet | ⏱ Tiempo total: 2.26 s | 📌 Resultado: Éxito


In [None]:
# Pokémon Showdown - Moves
inicio_moves = time.time()
try:
    moves_data = fetch_json_data("https://play.pokemonshowdown.com/data/moves.json")
    df_moves = procesar_json_dict(moves_data)

    columnas_utiles = [
        "move_id", "num", "name", "type", "category", "accuracy", "basePower",
        "pp", "priority", "flags", "target", "contestType",
        "recoil", "forceSwitch", "selfdestruct", "breaksProtect", "ohko",
        "sleepUsable", "stealBoosts", "willCrit"
    ]

    df_moves = df_moves[[col for col in columnas_utiles if col in df_moves.columns]].copy()

    if "accuracy" in df_moves.columns:
        df_moves["accuracy"] = df_moves["accuracy"].apply(
            lambda x: 100 if x is True else (x if isinstance(x, (int, float)) else None)
        )

    if "flags" in df_moves.columns:
        def limpiar_flags(x):
            if isinstance(x, dict):
                return x
            try:
                return json.loads(x)
            except:
                return {}

        df_moves["flags"] = df_moves["flags"].apply(limpiar_flags)

        all_keys = set()
        df_moves["flags"].apply(lambda d: all_keys.update(d.keys()) if isinstance(d, dict) else None)

        for key in sorted(all_keys):
            df_moves[f"flag_{key}"] = df_moves["flags"].apply(
                lambda d: int(d.get(key, 0)) if isinstance(d, dict) else 0
            )

        df_moves.drop(columns=["flags"], inplace=True)

    df_moves.to_parquet("pokemon_moves.parquet", index=False)

    registrar_log("Pokémon Showdown - Moves", df_moves, "pokemon_moves.parquet", inicio_moves)

except Exception as e:
    registrar_log("Pokémon Showdown - Moves", None, "pokemon_moves.parquet", inicio_moves, estado="Fallo", error_msg=str(e))

INFO:ETLLogger:2025-06-05 23:17:49 - INFO - 🔍 Fuente: Pokémon Showdown - Moves | 🕒 Inicio: 23:17:49 | 🕕 Fin: 23:17:49 | 📊 Registros: 952 | 📁 Archivo: pokemon_moves.parquet | ⏱ Tiempo total: 0.44 s | 📌 Resultado: Éxito


2025-06-05 23:17:49 - INFO - 🔍 Fuente: Pokémon Showdown - Moves | 🕒 Inicio: 23:17:49 | 🕕 Fin: 23:17:49 | 📊 Registros: 952 | 📁 Archivo: pokemon_moves.parquet | ⏱ Tiempo total: 0.44 s | 📌 Resultado: Éxito


In [None]:
df_moves

Unnamed: 0,num,name,type,category,accuracy,basePower,pp,priority,target,contestType,...,flag_powder,flag_protect,flag_pulse,flag_punch,flag_recharge,flag_reflectable,flag_slicing,flag_snatch,flag_sound,flag_wind
10000000voltthunderbolt,719,"10,000,000 Volt Thunderbolt",Electric,Special,,195,1,0,normal,Cool,...,0,0,0,0,0,0,0,0,0,0
absorb,71,Absorb,Grass,Special,,20,25,0,normal,Clever,...,0,1,0,0,0,0,0,0,0,0
accelerock,709,Accelerock,Rock,Physical,,40,20,1,normal,Cool,...,0,1,0,0,0,0,0,0,0,0
acid,51,Acid,Poison,Special,,40,30,0,allAdjacentFoes,Clever,...,0,1,0,0,0,0,0,0,0,0
acidarmor,151,Acid Armor,Poison,Status,,0,20,0,self,Tough,...,0,0,0,0,0,0,0,1,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
zenheadbutt,428,Zen Headbutt,Psychic,Physical,,80,15,0,normal,Clever,...,0,1,0,0,0,0,0,0,0,0
zingzap,716,Zing Zap,Electric,Physical,,80,10,0,normal,Cool,...,0,1,0,0,0,0,0,0,0,0
zippyzap,729,Zippy Zap,Electric,Physical,,80,10,2,normal,Cool,...,0,1,0,0,0,0,0,0,0,0
paleowave,0,Paleo Wave,Rock,Special,,85,15,0,normal,Beautiful,...,0,1,0,0,0,0,0,0,0,0




In [None]:
import pandas as pd
import json
import uuid
import logging
import os
import pymysql
from datetime import datetime
from zoneinfo import ZoneInfo
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
import unicodedata
import re
import time

os.makedirs("logs", exist_ok=True)
log_path = os.path.join("logs", "carga_global.log")

logger = logging.getLogger("ETLGlobal")
logger.setLevel(logging.INFO)

if not logger.handlers:
    handler = logging.FileHandler(log_path)
    formatter = logging.Formatter('%(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

def log_info(msg):
    now = datetime.now(ZoneInfo("Europe/Madrid")).strftime("%Y-%m-%d %H:%M:%S")
    final_msg = f"{now} - INFO - {msg}"
    print(final_msg)
    logger.info(final_msg)

DB_USER = "admin"
DB_PASS = "1234admin"
DB_HOST = "ricardo.ctr6cqctznjl.us-east-1.rds.amazonaws.com"
DB_NAME = "videojuegos"

engine = create_engine(f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME}")


def normalizar_nombre_columna(nombre):
    nombre = ''.join((c for c in unicodedata.normalize('NFD', nombre) if unicodedata.category(c) != 'Mn'))
    nombre = re.sub(r'\s+', '_', nombre)
    nombre = re.sub(r'[^\w_]', '', nombre)
    return nombre.lower()

def normalizar_columnas(df):
    df.columns = [normalizar_nombre_columna(col) for col in df.columns]
    return df

def limpiar_y_cargar(nombre_archivo, tabla_destino, id_columna, renombrar_id, fuente, endpoint, transformar=None):
    fecha_inicio = datetime.now(ZoneInfo("Europe/Madrid"))
    id_carga = str(uuid.uuid4())
    estado = "Éxito"
    error_msg = ""
    registros = 0

    try:
        log_info(f"🔍 Procesando {fuente} desde {nombre_archivo}...")
        df = pd.read_parquet(nombre_archivo)

        df = normalizar_columnas(df)

        if id_columna and renombrar_id:
            df = df.drop_duplicates(subset=[id_columna])
            df = df.rename(columns={id_columna: renombrar_id})
        else:
            df = df.drop_duplicates()

        df = df.dropna(how='all')
        df = df.fillna(value=pd.NA)

        if transformar:
            df = transformar(df)

        df["dw_fecha_registro"] = datetime.now(ZoneInfo("Europe/Madrid"))
        df["dw_id_carga"] = id_carga
        df["dw_deleted"] = False
        df["dw_source"] = fuente
        df["dw_endpoint"] = endpoint

        registros = len(df)
        log_info(f"💾 Insertando {registros} registros en {tabla_destino}...")
        df.to_sql(tabla_destino, con=engine, if_exists="append", index=False)
        log_info(f"✅ Éxito: {registros} registros insertados en {tabla_destino}.")

    except SQLAlchemyError as e:
        estado = "Error SQL"
        error_msg = str(e)
        log_info(f"❌ Error SQL en {fuente}: {error_msg}")
    except Exception as e:
        estado = "Error General"
        error_msg = str(e)
        log_info(f"❌ Error general en {fuente}: {error_msg}")

    finally:
        fecha_fin = datetime.now(ZoneInfo("Europe/Madrid"))
        duracion = (fecha_fin - fecha_inicio).total_seconds()
        log_operacion = pd.DataFrame([{
            "fuente": fuente,
            "tabla": tabla_destino,
            "registros": registros,
            "fecha_inicio": fecha_inicio,
            "fecha_fin": fecha_fin,
            "duracion_segundos": duracion,
            "estado": estado,
            "error_msg": error_msg
        }])
        log_operacion.to_sql("log_operaciones", con=engine, if_exists="append", index=False)

def transformar_rawg(df):
    df["released"] = pd.to_datetime(df["released"], errors="coerce")
    df["updated"] = pd.to_datetime(df["updated"], errors="coerce")
    return df

def transformar_giantbomb(df):
    df["original_release_date"] = pd.to_datetime(df["original_release_date"], errors="coerce")
    df["date_added"] = pd.to_datetime(df["date_added"], errors="coerce")
    df["date_last_updated"] = pd.to_datetime(df["date_last_updated"], errors="coerce")
    return df

def transformar_pokedex(df):
    if "num" in df.columns:
        df = df.drop(columns=["num"])
    df = df.reset_index().rename(columns={"index": "num"})
    return df

def transformar_ladder(df):
    return df

def transformar_moves(df):
    df = df.reset_index().rename(columns={"index": "move_id"})
    return df

limpiar_y_cargar("rawg_games.parquet", "rawg_games", "id", "id_rawg", "RAWG", "https://api.rawg.io/api/games", transformar_rawg)
limpiar_y_cargar("giantbomb_games.parquet", "giantbomb_games", "id", "id_gb", "GiantBomb", "https://www.giantbomb.com/api/games/", transformar_giantbomb)
limpiar_y_cargar("pokemon_pokedex.parquet", "core_pokemon_pokedex", None, None, "ShowdownPokedex", "https://play.pokemonshowdown.com/data/pokedex.json", transformar_pokedex)
limpiar_y_cargar("pokemon_ladder.parquet", "core_pokemon_ladder", "userid", "userid", "ShowdownLadder", "https://pokemonshowdown.com/ladder/gen8ou.json", transformar_ladder)
limpiar_y_cargar("pokemon_moves.parquet", "core_pokemon_moves", None, None, "ShowdownMoves", "https://play.pokemonshowdown.com/data/moves.json", transformar_moves)

INFO:ETLGlobal:2025-06-05 23:34:03 - INFO - 🔍 Procesando RAWG desde rawg_games.parquet...
INFO:ETLGlobal:2025-06-05 23:34:03 - INFO - 💾 Insertando 6000 registros en rawg_games...


2025-06-05 23:34:03 - INFO - 🔍 Procesando RAWG desde rawg_games.parquet...
2025-06-05 23:34:03 - INFO - 💾 Insertando 6000 registros en rawg_games...


INFO:ETLGlobal:2025-06-05 23:34:05 - INFO - ✅ Éxito: 6000 registros insertados en rawg_games.
INFO:ETLGlobal:2025-06-05 23:34:05 - INFO - 🔍 Procesando GiantBomb desde giantbomb_games.parquet...


2025-06-05 23:34:05 - INFO - ✅ Éxito: 6000 registros insertados en rawg_games.
2025-06-05 23:34:05 - INFO - 🔍 Procesando GiantBomb desde giantbomb_games.parquet...


INFO:ETLGlobal:2025-06-05 23:34:05 - INFO - 💾 Insertando 15000 registros en giantbomb_games...


2025-06-05 23:34:05 - INFO - 💾 Insertando 15000 registros en giantbomb_games...


INFO:ETLGlobal:2025-06-05 23:34:07 - INFO - ✅ Éxito: 15000 registros insertados en giantbomb_games.
INFO:ETLGlobal:2025-06-05 23:34:07 - INFO - 🔍 Procesando ShowdownPokedex desde pokemon_pokedex.parquet...
INFO:ETLGlobal:2025-06-05 23:34:07 - INFO - 💾 Insertando 1425 registros en core_pokemon_pokedex...


2025-06-05 23:34:07 - INFO - ✅ Éxito: 15000 registros insertados en giantbomb_games.
2025-06-05 23:34:07 - INFO - 🔍 Procesando ShowdownPokedex desde pokemon_pokedex.parquet...
2025-06-05 23:34:07 - INFO - 💾 Insertando 1425 registros en core_pokemon_pokedex...


INFO:ETLGlobal:2025-06-05 23:34:08 - INFO - ✅ Éxito: 1425 registros insertados en core_pokemon_pokedex.
INFO:ETLGlobal:2025-06-05 23:34:08 - INFO - 🔍 Procesando ShowdownLadder desde pokemon_ladder.parquet...
INFO:ETLGlobal:2025-06-05 23:34:08 - INFO - 💾 Insertando 500 registros en core_pokemon_ladder...


2025-06-05 23:34:08 - INFO - ✅ Éxito: 1425 registros insertados en core_pokemon_pokedex.
2025-06-05 23:34:08 - INFO - 🔍 Procesando ShowdownLadder desde pokemon_ladder.parquet...
2025-06-05 23:34:08 - INFO - 💾 Insertando 500 registros en core_pokemon_ladder...


INFO:ETLGlobal:2025-06-05 23:34:08 - INFO - ✅ Éxito: 500 registros insertados en core_pokemon_ladder.
INFO:ETLGlobal:2025-06-05 23:34:08 - INFO - 🔍 Procesando ShowdownMoves desde pokemon_moves.parquet...
INFO:ETLGlobal:2025-06-05 23:34:08 - INFO - 💾 Insertando 952 registros en core_pokemon_moves...


2025-06-05 23:34:08 - INFO - ✅ Éxito: 500 registros insertados en core_pokemon_ladder.
2025-06-05 23:34:08 - INFO - 🔍 Procesando ShowdownMoves desde pokemon_moves.parquet...
2025-06-05 23:34:08 - INFO - 💾 Insertando 952 registros en core_pokemon_moves...


INFO:ETLGlobal:2025-06-05 23:34:08 - INFO - ✅ Éxito: 952 registros insertados en core_pokemon_moves.


2025-06-05 23:34:08 - INFO - ✅ Éxito: 952 registros insertados en core_pokemon_moves.


In [None]:
import pandas as pd
import re
from datetime import datetime
from zoneinfo import ZoneInfo
from sqlalchemy import create_engine

DB_USER = "admin"
DB_PASS = "1234admin"
DB_HOST = "ricardo.ctr6cqctznjl.us-east-1.rds.amazonaws.com"
DB_NAME = "videojuegos"

engine = create_engine(f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME}")

log_path = "logs/registro_etl.log"

REGISTRO_REGEX = re.compile(
    r"^(.*?) - INFO - 🔍 Fuente: (.*?) \| 🕒 Inicio: (.*?) \| 🕕 Fin: (.*?) \| "
    r"📊 Registros: (\d+) \| 📁 Archivo: (.*?) \| ⏱ Tiempo total: ([\d.]+) s \| "
    r"📌 Resultado: (\w+)(?: \| ⚠️ Error: (.*))?$"
)

registros = []

with open(log_path, encoding="utf-8") as f:
    for linea in f:
        linea = linea.strip()
        match = REGISTRO_REGEX.match(linea)
        if match:
            (
                fecha_log_str, fuente, hora_inicio_str, hora_fin_str,
                registros_str, archivo, duracion_str, resultado, error
            ) = match.groups()

            fecha_log = datetime.strptime(fecha_log_str, "%Y-%m-%d %H:%M:%S").astimezone(ZoneInfo("Europe/Madrid"))
            hora_inicio = datetime.strptime(hora_inicio_str, "%H:%M:%S").time()
            hora_fin = datetime.strptime(hora_fin_str, "%H:%M:%S").time()

            registros.append({
                "log_time": fecha_log,
                "nivel": "INFO",
                "fuente": fuente,
                "hora_inicio": hora_inicio,
                "hora_fin": hora_fin,
                "registros": int(registros_str),
                "archivo": archivo,
                "duracion_segundos": float(duracion_str),
                "resultado": resultado,
                "error": error if error else None,
                "mensaje": linea
            })

df = pd.DataFrame(registros)

if not df.empty:
    try:
        df.to_sql("log_etl_fuentes", con=engine, if_exists="append", index=False)
        print("✅ Log cargado exitosamente en la tabla log_etl_fuentes.")
    except Exception as e:
        print(f"❌ Error al insertar en la base de datos: {e}")
else:
    print("⚠️ No se encontraron registros válidos en el log.")

✅ Log cargado exitosamente en la tabla log_etl_fuentes.


In [None]:
import pandas as pd
import re
from datetime import datetime
from zoneinfo import ZoneInfo
import os

def exportar_registro_etl(log_path, salida_csv):
    REGEX_ETL = re.compile(
        r"^(.*?) - INFO - 🔍 Fuente: (.*?) \| 🕒 Inicio: (.*?) \| 🕕 Fin: (.*?) \| "
        r"📊 Registros: (\d+) \| 📁 Archivo: (.*?) \| ⏱ Tiempo total: ([\d.]+) s \| "
        r"📌 Resultado: (\w+)(?: \| ⚠️ Error: (.*))?$"
    )

    registros = []
    with open(log_path, encoding="utf-8") as file:
        for linea in file:
            linea = linea.strip()
            match = REGEX_ETL.match(linea)
            if match:
                (
                    fecha_log_str, fuente, hora_inicio_str, hora_fin_str,
                    registros_str, archivo, duracion_str, resultado, error
                ) = match.groups()

                try:
                    fecha_log = datetime.strptime(fecha_log_str, "%Y-%m-%d %H:%M:%S").astimezone(ZoneInfo("Europe/Madrid"))
                    hora_inicio = datetime.strptime(hora_inicio_str, "%H:%M:%S").time()
                    hora_fin = datetime.strptime(hora_fin_str, "%H:%M:%S").time()
                except:
                    continue

                registros.append({
                    "log_time": fecha_log,
                    "fuente": fuente,
                    "hora_inicio": hora_inicio,
                    "hora_fin": hora_fin,
                    "registros": int(registros_str),
                    "archivo": archivo,
                    "duracion_segundos": float(duracion_str),
                    "resultado": resultado,
                    "error": error if error else None,
                    "mensaje": linea
                })

    df = pd.DataFrame(registros)
    df.to_csv(salida_csv, index=False, encoding="utf-8")
    print(f"✅ Exportado: {salida_csv}")

def exportar_carga_global_completo(log_path, salida_csv):
    regex_contexto = re.compile(r"^(.*?) - INFO - 🔍 Procesando (.*?) desde (.*?)\.\.\.")
    regex_exito = re.compile(r"^(.*?) - INFO - ✅ Éxito: (\d+) registros insertados en (\w+)\.$")
    regex_error = re.compile(r"^(.*?) - INFO - ❌ (Error (?:SQL|general)) en (.*?): (.*)")

    registros = []
    contexto_actual = None

    with open(log_path, encoding="utf-8") as file:
        for line in file:
            line = line.strip()

            match_contexto = regex_contexto.match(line)
            if match_contexto:
                ts, fuente, archivo = match_contexto.groups()
                try:
                    fecha_inicio = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").astimezone(ZoneInfo("Europe/Madrid"))
                    tabla = archivo.replace(".parquet", "").lower()
                    contexto_actual = {
                        "fuente": fuente,
                        "tabla": tabla,
                        "fecha_inicio": fecha_inicio
                    }
                except:
                    contexto_actual = None

            match_exito = regex_exito.match(line)
            if match_exito and contexto_actual:
                ts, registros_str, tabla_destino = match_exito.groups()
                try:
                    fecha_fin = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").astimezone(ZoneInfo("Europe/Madrid"))
                    duracion = (fecha_fin - contexto_actual["fecha_inicio"]).total_seconds()
                    registros.append({
                        "fuente": contexto_actual["fuente"],
                        "tabla": tabla_destino,
                        "registros": int(registros_str),
                        "fecha_inicio": contexto_actual["fecha_inicio"],
                        "fecha_fin": fecha_fin,
                        "duracion_segundos": duracion,
                        "estado": "Éxito",
                        "error_msg": None
                    })
                    contexto_actual = None
                except:
                    continue

            match_error = regex_error.match(line)
            if match_error and contexto_actual:
                ts, tipo_error, fuente_error, msg_error = match_error.groups()
                try:
                    fecha_fin = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").astimezone(ZoneInfo("Europe/Madrid"))
                    duracion = (fecha_fin - contexto_actual["fecha_inicio"]).total_seconds()
                    registros.append({
                        "fuente": fuente_error,
                        "tabla": contexto_actual["tabla"],
                        "registros": 0,
                        "fecha_inicio": contexto_actual["fecha_inicio"],
                        "fecha_fin": fecha_fin,
                        "duracion_segundos": duracion,
                        "estado": tipo_error,
                        "error_msg": msg_error
                    })
                    contexto_actual = None
                except:
                    continue

    df = pd.DataFrame(registros)
    df.to_csv(salida_csv, index=False, encoding="utf-8")
    print(f"✅ Exportado: {salida_csv}")

os.makedirs("logs", exist_ok=True)
exportar_registro_etl("logs/registro_etl.log", "registro_etl.csv")
exportar_carga_global_completo("logs/carga_global.log", "log_operaciones.csv")

✅ Exportado: registro_etl.csv
✅ Exportado: log_operaciones.csv
