Cargar entorno y crear SparkSession

In [1]:
!pip install howlongtobeatpy



In [39]:
from datetime import datetime
from pyspark.sql import SparkSession
import os
from dotenv import load_dotenv
import pandas as pd
import requests
from collections import Counter

# Load the environment variables defined in the project's .env file
load_dotenv("/home/jovyan/work/.env")

# MySQL connection settings, read from the environment
MYSQL_HOST = os.environ.get("MYSQL_HOST")
MYSQL_PORT = os.environ.get("MYSQL_PORT")
MYSQL_USER = os.environ.get("MYSQL_USER")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD")
MYSQL_DATABASE = os.environ.get("MYSQL_DATABASE")

# Twitch API credentials, read from the environment
TWITCH_CLIENT_ID = os.environ.get("TWITCH_CLIENT_ID")
TWITCH_TOKEN = os.environ.get("TWITCH_TOKEN")

# Spark session with the MySQL JDBC connector jar registered through spark.jars
spark = (
    SparkSession.builder
    .appName("Fase 1 - Videojuegos")
    .config("spark.jars", "/home/jovyan/work/jars/mysql-connector-j-8.0.31.jar")
    .getOrCreate()
)

**Función para insertar en log_carga (vía Spark)**

In [40]:
def insertar_log_mysql(origen, archivo, cantidad, inicio, fin):
    """Append one row describing a load run to the `log_carga` MySQL table.

    The row is built as a single-record Spark DataFrame and written through
    the JDBC data source, using the module-level MYSQL_* settings and the
    module-level `spark` session.

    Args:
        origen: Value stored in the `origen` column.
        archivo: Value stored in the `archivo` column.
        cantidad: Value stored in the `cantidad` column after `int()` conversion.
        inicio: Value stored in the `fecha_inicio` column.
        fin: Value stored in the `fecha_fin` column.
    """
    registro = {
        "origen": origen,
        "archivo": archivo,
        "cantidad": int(cantidad),
        "fecha_inicio": inicio,
        "fecha_fin": fin,
    }
    opciones_jdbc = {
        "url": f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}",
        "driver": "com.mysql.cj.jdbc.Driver",
        "dbtable": "log_carga",
        "user": MYSQL_USER,
        "password": MYSQL_PASSWORD,
    }

    escritor = spark.createDataFrame([registro]).write.format("jdbc").options(**opciones_jdbc)
    # "append" adds the row to the existing table instead of replacing it
    escritor.mode("append").save()

**Renovar el token**

In [41]:
def obtener_token_twitch():
    """Request a new Twitch access token using the client-credentials grant.

    Reads TWITCH_CLIENT_ID and TWITCH_CLIENT_SECRET from the environment (both
    must be defined in the .env file) and posts them to Twitch's OAuth2 token
    endpoint.

    Returns:
        The access token string on success, or None when the request cannot be
        completed or Twitch answers with a non-200 status.
    """
    client_id = os.getenv("TWITCH_CLIENT_ID")
    client_secret = os.getenv("TWITCH_CLIENT_SECRET")  # this variable must exist in your .env

    try:
        response = requests.post(
            "https://id.twitch.tv/oauth2/token",
            data={
                'client_id': client_id,
                'client_secret': client_secret,
                'grant_type': 'client_credentials'
            },
            timeout=10,  # never hang the notebook on an unresponsive endpoint
        )
    except requests.exceptions.RequestException as e:
        # Network-level failures follow the same contract as HTTP errors: report and return None
        print("❌ Error al obtener token:", e)
        return None

    if response.status_code == 200:
        token = response.json()['access_token']
        # Do not echo the token itself: cell outputs are saved with the notebook
        print("✅ Nuevo token obtenido")
        return token

    print("❌ Error al obtener token:", response.status_code, response.text)
    return None

# Obtain a fresh token and build the headers used by the Twitch API calls
TWITCH_TOKEN = obtener_token_twitch()
if TWITCH_TOKEN is None:
    # Fail fast: otherwise every later request would be sent with "Bearer None"
    raise RuntimeError(
        "No se pudo obtener el token de Twitch; revisa TWITCH_CLIENT_ID y TWITCH_CLIENT_SECRET"
    )
headers = {
    'Client-ID': TWITCH_CLIENT_ID,
    'Authorization': f'Bearer {TWITCH_TOKEN}'
}

✅ Nuevo token obtenido: [REDACTED]


**Descarga desde Twitch API**

In [None]:
# headers = {
#     'Client-ID': TWITCH_CLIENT_ID,
#     'Authorization': f'Bearer {TWITCH_TOKEN}'
# }
# inicio = datetime.now()
# response = requests.get("https://api.twitch.tv/helix/games/top?first=100", headers=headers)
# print("STATUS:", response.status_code)
# print("RESPONSE:", response.json())
# datos = response.json()["data"]
# df_twitch = pd.DataFrame(datos)
# df_twitch_spark = spark.createDataFrame(df_twitch)
# archivo_twitch = "twitch_top_games.parquet"
# df_twitch_spark.write.mode("overwrite").parquet(f"/home/jovyan/work/{archivo_twitch}")
# fin = datetime.now()
# insertar_log_mysql("Twitch", archivo_twitch, df_twitch.shape[0], inicio, fin)

In [44]:
import requests
import pandas as pd
from datetime import datetime
from collections import Counter
import json
import time

headers = {
    'Client-ID': TWITCH_CLIENT_ID,
    'Authorization': f'Bearer {TWITCH_TOKEN}'
}


def _descargar_streams(game_id, headers, intentos=3, espera=2, timeout=10):
    """Fetch the streams listed for one game, retrying on failure.

    Args:
        game_id: Twitch game id sent as the `game_id` query parameter.
        headers: HTTP headers (Client-ID / Authorization) sent with the request.
        intentos: Maximum number of attempts.
        espera: Seconds to wait between failed attempts.
        timeout: Per-request timeout in seconds.

    Returns:
        The list found under the "data" key of the response (possibly empty),
        or None when every attempt failed.
    """
    # NOTE(review): no `first`/pagination parameters are sent, so presumably only
    # the first page returned by the API is aggregated (the sample output shows
    # about 20 streams per game) — confirm this is intended.
    for intento in range(1, intentos + 1):
        try:
            stream_resp = requests.get(
                "https://api.twitch.tv/helix/streams",
                params={"game_id": game_id},
                headers=headers,
                timeout=timeout,
            )
            if stream_resp.status_code == 200:
                return stream_resp.json().get("data", [])
            print(f"Intento {intento} fallido con status {stream_resp.status_code} para game_id {game_id}")
        except requests.exceptions.SSLError as e:
            print(f"SSL error en game_id {game_id}, intento {intento}: {e}")
        except Exception as e:
            # Best-effort download: any other failure is reported and retried
            print(f"Error general en game_id {game_id}, intento {intento}: {e}")

        # Waiting only makes sense if another attempt follows
        if intento < intentos:
            time.sleep(espera)

    print(f"No se pudo procesar game_id {game_id} tras {intentos} intentos.")
    return None


def _resumir_streams(game_id, streams):
    """Aggregate a game's stream list into one summary record.

    Args:
        game_id: Twitch game id the streams belong to.
        streams: List of stream dicts, or None when the download failed.

    Returns:
        A dict with the keys id, total_viewers, num_streams, avg_viewers,
        languages (JSON string mapping language -> stream count) and
        has_mature_stream. Every metric is None when `streams` is None.
    """
    if streams is None:
        return {
            "id": game_id,
            "total_viewers": None,
            "num_streams": None,
            "avg_viewers": None,
            "languages": None,
            "has_mature_stream": None
        }

    total_viewers = sum(stream.get("viewer_count", 0) for stream in streams)
    num_streams = len(streams)
    langs = Counter(stream.get("language", "unknown") for stream in streams)

    return {
        "id": game_id,
        "total_viewers": total_viewers,
        "num_streams": num_streams,
        # Guard against division by zero for games with no streams returned
        "avg_viewers": total_viewers / num_streams if num_streams > 0 else 0,
        "languages": json.dumps(dict(langs)),
        "has_mature_stream": any(stream.get("is_mature", False) for stream in streams)
    }


# Step 1: fetch the top games
inicio = datetime.now()
response = requests.get(
    "https://api.twitch.tv/helix/games/top?first=100",
    headers=headers,
    timeout=10,
)
print("STATUS:", response.status_code)
# Stop here with a clear HTTP error instead of a KeyError on the missing "data" key
response.raise_for_status()
datos = response.json()["data"]
df_twitch = pd.DataFrame(datos)

# Step 2: enrich every game with aggregated stream data
stream_data = []

for game_id in df_twitch["id"]:
    streams = _descargar_streams(game_id, headers)
    stream_data.append(_resumir_streams(game_id, streams))

    if streams is not None:
        time.sleep(0.4)  # Rate control for the Twitch API

# Convert to a DataFrame and join with the top-games data
df_streams = pd.DataFrame(stream_data)
df_twitch_enriched = pd.merge(df_twitch, df_streams, on="id", how="left")

df_twitch_enriched

STATUS: 200
Error general en game_id 1494, intento 1: HTTPSConnectionPool(host='api.twitch.tv', port=443): Read timed out. (read timeout=10)


Unnamed: 0,id,name,box_art_url,igdb_id,total_viewers,num_streams,avg_viewers,languages,has_mature_stream
0,509658,Just Chatting,https://static-cdn.jtvnw.net/ttv-boxart/509658-{width}x{height}.jpg,,214928,19,11312.0,"{""es"": 2, ""fr"": 1, ""ru"": 6, ""en"": 6, ""de"": 2, ""pl"": 1, ""ja"": 1}",True
1,32982,Grand Theft Auto V,https://static-cdn.jtvnw.net/ttv-boxart/32982_IGDB-{width}x{height}.jpg,1020.0,52329,19,2754.157895,"{""zh"": 1, ""en"": 7, ""de"": 3, ""ja"": 6, ""th"": 1, ""sv"": 1}",True
2,29595,Dota 2,https://static-cdn.jtvnw.net/ttv-boxart/29595-{width}x{height}.jpg,,117216,20,5860.8,"{""ru"": 9, ""en"": 6, ""pt"": 2, ""uk"": 1, ""th"": 1, ""tl"": 1}",True
3,491487,Dead by Daylight,https://static-cdn.jtvnw.net/ttv-boxart/491487-{width}x{height}.jpg,18866.0,67164,20,3358.2,"{""en"": 12, ""ru"": 2, ""es"": 1, ""pt"": 2, ""de"": 2, ""fr"": 1}",True
4,21779,League of Legends,https://static-cdn.jtvnw.net/ttv-boxart/21779-{width}x{height}.jpg,115.0,56480,20,2824.0,"{""fr"": 4, ""ja"": 1, ""en"": 8, ""de"": 1, ""cs"": 1, ""it"": 1, ""es"": 2, ""pt"": 1, ""da"": 1}",True
5,32399,Counter-Strike,https://static-cdn.jtvnw.net/ttv-boxart/32399-{width}x{height}.jpg,,45056,19,2371.368421,"{""ru"": 5, ""en"": 6, ""pt"": 2, ""de"": 2, ""pl"": 2, ""uk"": 1, ""es"": 1}",True
6,509672,IRL,https://static-cdn.jtvnw.net/ttv-boxart/509672-{width}x{height}.jpg,,80967,19,4261.421053,"{""de"": 8, ""en"": 4, ""es"": 1, ""da"": 1, ""ru"": 1, ""fr"": 1, ""cs"": 2, ""pl"": 1}",True
7,516575,VALORANT,https://static-cdn.jtvnw.net/ttv-boxart/516575-{width}x{height}.jpg,126459.0,48300,20,2415.0,"{""es"": 7, ""pt"": 1, ""en"": 4, ""fr"": 2, ""zh"": 4, ""ru"": 1, ""ja"": 1}",True
8,33214,Fortnite,https://static-cdn.jtvnw.net/ttv-boxart/33214-{width}x{height}.jpg,1905.0,40083,20,2004.15,"{""en"": 7, ""fr"": 4, ""de"": 1, ""es"": 1, ""ru"": 2, ""it"": 2, ""nl"": 1, ""pl"": 1, ""other"": 1}",False
9,512710,Call of Duty: Warzone,https://static-cdn.jtvnw.net/ttv-boxart/512710-{width}x{height}.jpg,131800.0,36776,20,1838.8,"{""en"": 12, ""fr"": 2, ""it"": 3, ""de"": 2, ""es"": 1}",True


In [45]:
# Persist the enriched Twitch data as Parquet
archivo_twitch = "twitch_top_games.parquet"
df_twitch_spark = spark.createDataFrame(df_twitch_enriched)
(
    df_twitch_spark.write
    .mode("overwrite")
    .parquet(f"/home/jovyan/work/{archivo_twitch}")
)

# Record the load in log_carga
fin = datetime.now()
insertar_log_mysql("Twitch", archivo_twitch, len(df_twitch_enriched), inicio, fin)

**Descarga desde HowLongToBeat API (no oficial)**

In [None]:
from datetime import datetime
from howlongtobeatpy import HowLongToBeat
import pandas as pd
import json

# Start of the process
inicio = datetime.now()

# Broad list of well-known titles used as search terms
titulos = [
    "Zelda", "Elden Ring", "Dark Souls", "Sekiro", "Bloodborne",
    "Final Fantasy", "Persona", "The Witcher", "Cyberpunk", "Hollow Knight",
    "God of War", "Red Dead", "GTA", "Call of Duty", "Fortnite",
    "League of Legends", "Valorant", "Overwatch", "Apex Legends", "CSGO",
    "Pokémon", "Metroid", "Mario", "Luigi", "Donkey Kong",
    "Kirby", "Fire Emblem", "Xenoblade", "Dragon Quest", "Monster Hunter",
    "Resident Evil", "Silent Hill", "Assassin’s Creed", "Far Cry", "Watch Dogs",
    "Minecraft", "Terraria", "Starfield", "Baldur’s Gate", "Star Wars",
    "Bayonetta", "Cuphead", "Celeste", "Dead Cells", "Stardew Valley",
    "Slay the Spire", "Hades", "Returnal", "Deathloop", "It Takes Two",
    "The Last of Us", "Uncharted", "Ratchet & Clank", "Spyro", "Crash Bandicoot",
    "Titanfall", "Borderlands", "BioShock", "Fallout", "Skyrim",
    "Oblivion", "Morrowind", "Doom", "Quake", "Halo",
    "Destiny", "Battlefield", "PUBG", "The Division", "Splatoon",
    "Firewatch", "Oxenfree", "Undertale", "Inside", "Limbo",
    "Valheim", "Grounded", "Satisfactory", "The Forest", "No Man’s Sky",
    "Ark", "Subnautica", "Dying Light", "Dead Island", "The Evil Within",
    "Alan Wake", "Control", "The Sims", "SimCity", "Cities Skylines",
    "RollerCoaster Tycoon", "FIFA", "NBA 2K", "Madden", "Gran Turismo",
    "Forza", "Need for Speed", "Tetris", "Pac-Man", "Street Fighter"
]

# Run one search per title and collect every entry returned
hltb = HowLongToBeat()
resultados = []

for titulo in titulos:
    encontrados = hltb.search(titulo)
    # The library's documented usage checks the result for None before using
    # it, so skip None/empty results instead of crashing on `+=`
    if encontrados:
        resultados.extend(encontrados)

# Store each result's attributes as one raw JSON string per row
datos_raw = [{"raw_json": json.dumps(r.__dict__)} for r in resultados]
df_hltb = pd.DataFrame(datos_raw)

archivo_hltb = "howlongtobeat_multiple_raw.parquet"

if df_hltb.empty:
    # Nothing to persist: skip the write and the log entry rather than failing
    # on an undefined (or stale) Spark DataFrame
    print("⚠️ No se obtuvieron resultados. El DataFrame está vacío.")
else:
    # Hand the pandas DataFrame over to Spark and save it as Parquet
    df_hltb_spark = spark.createDataFrame(df_hltb)
    df_hltb_spark.write.mode("overwrite").parquet(f"/home/jovyan/work/{archivo_hltb}")

    # Record the load in log_carga
    fin = datetime.now()
    insertar_log_mysql("HowLongToBeat", archivo_hltb, df_hltb.shape[0], inicio, fin)

    print("Fase 1 completada: HowLongToBeat guardado como raw_json")

**Datos desde Pokémon Showdown (GitHub)**

In [46]:
from datetime import datetime
import requests
import pandas as pd

# Start of the process
inicio = datetime.now()

# Official URL of the Pokedex file
url = "https://raw.githubusercontent.com/smogon/pokemon-showdown/master/data/pokedex.ts"

# Download the whole file as text. Bound the wait and fail on HTTP errors so an
# error page is never stored as if it were the Pokédex.
response = requests.get(url, timeout=30)
response.raise_for_status()
data_text = response.text

# Extract only the Pokédex block (between the first and the last brace)
start = data_text.find("{")  # First brace
end = data_text.rfind("}")   # Last brace
if start == -1 or end < start:
    # Without both braces the slice below would silently produce an empty string
    raise ValueError("No se encontró el bloque de la Pokédex en el archivo descargado")
bloque_raw = data_text[start:end+1]  # Include the last brace

# Store as raw_json (a single row holding the whole Pokédex)
df_showdown = pd.DataFrame([{"raw_json": bloque_raw}])
df_showdown_spark = spark.createDataFrame(df_showdown)

# File name
archivo_showdown = "pokemon_showdown_pokedex_raw.parquet"

# Save as Parquet
df_showdown_spark.write.mode("overwrite").parquet(f"/home/jovyan/work/{archivo_showdown}")

# End of the process and log entry
fin = datetime.now()
# NOTE(review): the log insert below is disabled, so this load is not recorded
# in log_carga unlike the other sources — confirm whether it should be re-enabled.
# insertar_log_mysql("Pokemon Showdown", archivo_showdown, 1, inicio, fin)

print("Fase 1 completada: Pokédex guardada como raw_json")

Fase 1 completada: Pokédex guardada como raw_json
