In [None]:
import os
import requests
import time
import math
import json
from pathlib import Path
from dotenv import load_dotenv
from datetime import datetime, timezone
import chime
import logging
# --- Configuration ---
# Load the variables declared in the local .env file into the environment.
load_dotenv()

# Remote endpoints: Twitch issues the OAuth token, IGDB serves the API.
TOKEN_URL = 'https://id.twitch.tv/oauth2/token'
API_BASE_URL = 'https://api.igdb.com/v4'

# Credentials are read from the environment; either may be None when unset.
CLIENT_ID = os.environ.get("IGDB_CLIENT_ID")
CLIENT_SECRET = os.environ.get("IGDB_CLIENT_SECRET")

# Local directory for the CSV dumps; created at import time if it is absent.
DATA_DUMP_DIR = Path('igdb_datadumps')
DATA_DUMP_DIR.mkdir(parents=True, exist_ok=True)

def get_api_headers(client_id, client_secret):
    """Request a Twitch app access token and build the IGDB request headers.

    Args:
        client_id: Twitch application client ID.
        client_secret: Twitch application client secret.

    Returns:
        A dict with the 'Client-ID', 'Authorization' and 'Accept' headers,
        or None when a credential is missing, the token request fails, or
        the response carries no access token.
    """
    print("Récupération du token d'accès Twitch...")
    if not client_id or not client_secret:
        print("ERREUR: CLIENT_ID ou CLIENT_SECRET manquant.")
        return None

    try:
        # Credentials go in the form-encoded body rather than the query
        # string: requests embeds the full URL in HTTP error messages, so
        # query parameters would leak the secret into the log below.
        response = requests.post(
            TOKEN_URL,
            data={
                'client_id': client_id,
                'client_secret': client_secret,
                'grant_type': 'client_credentials',
            },
            timeout=15,
        )
        response.raise_for_status()
        token_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding errors raised by older versions of
        # requests, where they are not RequestException subclasses.
        logging.error(f"Erreur lors de la récupération du token : {e}")
        return None

    access_token = token_data.get('access_token') if isinstance(token_data, dict) else None
    if not access_token:
        # Without this guard the caller would receive a "Bearer None" header.
        logging.error("Erreur lors de la récupération du token : access_token absent de la réponse")
        return None

    return {
        # Use the ID that was actually used to obtain the token, not the
        # module-level constant.
        'Client-ID': client_id,
        'Authorization': f'Bearer {access_token}',
        'Accept': 'application/json',
    }


# Names of the IGDB endpoints currently selected for processing.
# NOTE(review): presumably consumed by the download step, which is not
# visible in this part of the file -- confirm.
REQUIRED_ENDPOINTS = [
    'games',
    'themes',
    'keywords',
    'player_perspectives',
    'external_games',
    # Currently disabled endpoints, kept for reference:
    #   'genres', 'platforms', 'game_modes',
    #   'collections', 'franchises', 'game_types',
    #   'game_statuses', 'involved_companies', 'companies',
    #   'popularity_primitives', 'covers', 'artworks',
    #   'screenshots', 'alternative_names', 'external_game_sources',
    #   'websites', 'website_types',
    # Additions for the required mappings:
    #   'game_localizations',
    #   'language_supports',
    #   'languages',
    #   'multiplayer_modes',
    #   'age_ratings',
    #   'release_dates',  (needed to compute the minimum release_date)
    #   'tags',
]

In [None]:

def get_available_dumps(headers):
    """Fetch the list of available data dumps from the API.

    Args:
        headers: Request headers for the API; a falsy value short-circuits
            to None without issuing a request.

    Returns:
        A dict mapping each dump's 'endpoint' value to its dump entry, or
        None when headers are missing, the request fails, or the payload is
        not a list.
    """
    print("Récupération de la liste des data dumps disponibles...")
    if not headers:
        return None

    try:
        response = requests.get(f'{API_BASE_URL}/dumps', headers=headers, timeout=30)
        response.raise_for_status()
        dumps_list = response.json()
    except requests.exceptions.Timeout:
        print("ERREUR: Timeout lors de la récupération de la liste des dumps.")
        return None
    except requests.exceptions.RequestException as e:
        # A requests Response is falsy for 4xx/5xx statuses, so it must be
        # compared against None; a truthiness test hides the error body.
        error_response = getattr(e, 'response', None)
        details = f"Réponse: {error_response.text}" if error_response is not None else ""
        print(f"ERREUR lors de la récupération de la liste des dumps : {e}", details)
        return None
    except ValueError as e:
        # JSON decoding errors from older requests versions.
        print(f"ERREUR lors de la récupération de la liste des dumps : {e}")
        return None

    if not isinstance(dumps_list, list):
        print("ERREUR lors de la récupération de la liste des dumps : réponse inattendue.")
        return None

    print(f"Liste des dumps obtenue ({len(dumps_list)} dumps trouvés).")
    # Index the entries by endpoint name, skipping malformed ones.
    return {
        dump['endpoint']: dump
        for dump in dumps_list
        if isinstance(dump, dict) and 'endpoint' in dump
    }

def download_dump(url, dest_path):
    """Stream a dump file from `url` to `dest_path`, printing progress.

    The data is first written to a sibling '<name>.part' file and moved
    onto `dest_path` only once the transfer has completed, so a failed
    download never leaves a truncated file at the final location.

    Args:
        url: Download URL of the dump.
        dest_path: pathlib.Path of the destination file.

    Returns:
        True when the file was downloaded, False otherwise.
    """
    chunk_size = 8192 * 34  # 272 KiB per chunk
    partial_path = dest_path.with_name(dest_path.name + '.part')
    try:
        # Generous timeout for large files.
        with requests.get(url, stream=True, timeout=600) as r:
            r.raise_for_status()
            total_size = int(r.headers.get('content-length', 0))
            downloaded_size = 0
            last_print_time = time.time()
            print(f"  Taille totale: {total_size/1024/1024:.1f} Mo" if total_size > 0 else "  Taille inconnue.")
            with open(partial_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    if not chunk:  # skip keep-alive chunks
                        continue
                    f.write(chunk)
                    downloaded_size += len(chunk)
                    now = time.time()
                    # Refresh the progress line at most once per second.
                    if total_size > 0 and (now - last_print_time > 1):
                        percent = (downloaded_size / total_size) * 100
                        print(f"    -> {downloaded_size/1024/1024:.1f} / {total_size/1024/1024:.1f} Mo ({percent:.1f}%)", end='\r')
                        last_print_time = now
        # Publish the completed file (overwrites any previous dump).
        partial_path.replace(dest_path)
        # Make sure the progress line is wiped before the final message.
        print(' ' * 80, end='\r')
        print(f"  {dest_path.name} téléchargé avec succès.")
        return True
    except requests.exceptions.Timeout:
        print(f"\n  ERREUR: Timeout pendant le téléchargement de {dest_path.name}.")
    except requests.exceptions.RequestException as e:
        print(f"\n  ERREUR pendant le téléchargement de {dest_path.name} : {e}")
    except Exception as e:
        print(f"\n  ERREUR inconnue pendant le téléchargement de {dest_path.name} : {e}")

    # Failure path: best-effort removal of the incomplete temporary file.
    try:
        partial_path.unlink()
    except OSError:
        pass
    return False

In [None]:
def check_and_download_dump(endpoint, dump_info, headers, local_dir):
    """Fetch the download URL of an endpoint's dump and download the CSV.

    Args:
        endpoint: Endpoint name; the file is stored as '<endpoint>.csv'.
        dump_info: Entry for this endpoint from the dumps list. Currently
            unused; kept so existing callers keep working.
        headers: Request headers for the API.
        local_dir: pathlib.Path of the directory receiving the file.

    Returns:
        The path of the downloaded file, or None when the dump metadata
        could not be fetched, no download URL was provided, or the download
        itself failed.
    """
    print(f"\n--- Vérification Endpoint: {endpoint} ---")
    local_file_path = local_dir / (endpoint + '.csv')
    url = f'{API_BASE_URL}/dumps/{endpoint}'

    # One guarded request. The previous version issued it twice (the second
    # time unguarded) and its handler called sys.exit() although `sys` is
    # never imported in the visible code; a failure is now reported through
    # the None return value, like the other failure paths.
    try:
        response = requests.get(url, headers=headers, timeout=20)
        response.raise_for_status()
        s3_url = response.json().get('s3_url')
    except (requests.exceptions.RequestException, ValueError, AttributeError) as e:
        # ValueError: body is not JSON; AttributeError: JSON is not an object.
        logging.error(f'An error occured for this endpoint: {endpoint} ({e})')
        return None

    if not s3_url:
        print(f"Impossible d'obtenir l'URL de téléchargement pour '{endpoint}'.")
        return None  # failure

    # Remove the previous file before downloading the new one.
    if local_file_path.exists():
        print(f"  Suppression de l'ancien fichier {local_file_path.name}...")
        try:
            local_file_path.unlink()
        except OSError as e_del:
            print(f"  ERREUR lors de la suppression de l'ancien {local_file_path.name}: {e_del}")

    # Only report a path when the download actually succeeded.
    if not download_dump(s3_url, local_file_path):
        return None
    return local_file_path

In [None]:
import re
import pandas as pd

def convert_ids_to_names(df, column, target_column, dump_dir='igdb_datadumps'):
    """Translate a column of '{id,id,...}' strings into comma-separated names.

    The id -> name mapping is read from '<dump_dir>/<column>.csv', using its
    'id' column as key and `target_column` as value.

    Args:
        df: DataFrame holding the column to convert.
        column: Name of the column in `df`; also the reference CSV's stem.
        target_column: Column of the reference CSV providing the names.
        dump_dir: Directory containing the reference CSV files.

    Returns:
        A Series aligned with `df[column]`: names joined with ', ', or None
        for missing or empty cells. Ids without a usable name in the
        reference file are rendered as 'Unknown_<id>'.
    """
    reference_df = pd.read_csv(f'{dump_dir}/{column}.csv')
    # Rows without a name are dropped so those ids fall back to the
    # 'Unknown_<id>' placeholder; names are coerced to str so the join below
    # cannot fail on non-string values.
    named_rows = reference_df.dropna(subset=[target_column])
    mapping = named_rows.set_index('id')[target_column].astype(str).to_dict()

    def convert(value):
        if pd.isna(value) or value == '{}':
            return None

        # Strip the surrounding braces and parse each non-empty token.
        tokens = (token.strip() for token in str(value).strip('{}').split(','))
        item_ids = [int(token) for token in tokens if token]
        if not item_ids:
            return None
        return ', '.join(mapping.get(item_id, f'Unknown_{item_id}') for item_id in item_ids)

    return df[column].apply(convert)


# Load each raw dump once.
games = pd.read_csv('igdb_datadumps/games.csv')
external_games = pd.read_csv('igdb_datadumps/external_games.csv')

# Replace the id lists with readable slugs taken from the reference dumps.
games['themes'] = convert_ids_to_names(games, 'themes', 'slug')
games['keywords'] = convert_ids_to_names(games, 'keywords', 'slug')
games['player_perspectives'] = convert_ids_to_names(games, 'player_perspectives', 'slug')

# Only Steam games (category == 1). The explicit copy makes the column
# assignment below write to an independent frame instead of a slice of the
# original one.
external_games = external_games[external_games['category'] == 1].copy()
# The Steam app id is the number following 'app/' in the store URL.
external_games['appid'] = external_games['url'].str.extract(r'app/(\d+)', expand=False)

# Attach the Steam app id to each game; 'name' exists in both frames, so the
# games-side column comes out of the merge as 'name_x'.
games_with_id = pd.merge(games, external_games, left_on='id', right_on='game')
games_with_id = games_with_id[[
    'slug', 'name_x', 'appid', 'summary', 'themes', 'keywords',
    'player_perspectives', 'first_release_date',
]]
games_clean = games_with_id.rename(columns={'name_x': 'title'})
# Sort by descending release date so that drop_duplicates keeps the row with
# the greatest first_release_date for each appid.
games_clean_sorted = games_clean.sort_values(by=['first_release_date'], ascending=False)
games_clean_sorted_deduped = games_clean_sorted.drop_duplicates(subset=['appid'])


In [4]:
import os

import sqlalchemy
from models import GameTable
from datetime import datetime
from sqlalchemy.orm import sessionmaker

# The connection string can be overridden through the environment; the
# default is the local development database.
engine = sqlalchemy.create_engine(
    os.getenv(
        "STEAMREVIEWS_DATABASE_URL",
        "postgresql://user:password@localhost:5434/steamreviews",
    )
)

Session = sessionmaker(bind=engine)
session = Session()


def _none_if_missing(value):
    """Return None for NaN/None cells, otherwise the value unchanged."""
    return None if pd.isna(value) else value


def _split_names(value):
    """Split a ', '-joined string into a list; None for empty or missing cells."""
    if not isinstance(value, str) or not value:
        return None
    return value.split(', ')


def _parse_release_date(value):
    """Convert a first_release_date cell to a date, or None if not parseable."""
    if pd.isna(value):
        return None
    try:
        # String cells are expected as 'YYYY-MM-DD HH:MM:SS'.
        if isinstance(value, str):
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S').date()
        # Already a datetime-like object.
        if hasattr(value, 'date'):
            return value.date()
    except (ValueError, AttributeError):
        return None
    return None


# The table is rebuilt from scratch on every run.
GameTable.__table__.drop(engine, checkfirst=True)
GameTable.__table__.create(engine)

try:
    for _, row in games_clean_sorted_deduped.iterrows():
        # Rows without a Steam app id cannot be keyed; skip them.
        if pd.isna(row['appid']):
            continue
        session.add(GameTable(
            slug=row['slug'],
            # NaN cells become None so they are stored as NULL rather than
            # as a float NaN.
            title=_none_if_missing(row.get('title')),
            steam_app_id=int(row['appid']),
            summary=_none_if_missing(row.get('summary')),
            themes=_split_names(row.get('themes')),
            keywords=_split_names(row.get('keywords')),
            player_perspectives=_split_names(row.get('player_perspectives')),
            first_release_date=_parse_release_date(row.get('first_release_date')),
        ))
    session.commit()
except Exception:
    # Leave no half-applied transaction behind, then surface the error.
    session.rollback()
    raise
finally:
    session.close()

In [None]:
# Show the column names of the merged frame.
print(list(games_with_id.columns))