In [0]:
import json
import os

import pandas as pd
import requests

In [0]:
# Twitch application credentials used to obtain the IGDB access token.
# They are read from the environment so the secret is never stored in the
# notebook; set TWITCH_CLIENT_ID and TWITCH_CLIENT_SECRET before running.
# NOTE(review): the values that used to be hardcoded in this cell should be
# treated as leaked and rotated.
CLIENT_ID = os.environ.get('TWITCH_CLIENT_ID')
CLIENT_SECRET = os.environ.get('TWITCH_CLIENT_SECRET')
if not CLIENT_ID or not CLIENT_SECRET:
    raise RuntimeError(
        "Set the TWITCH_CLIENT_ID and TWITCH_CLIENT_SECRET environment variables "
        "before running this notebook"
    )

auth_url = 'https://id.twitch.tv/oauth2/token'
auth_params = {
    'client_id': CLIENT_ID,
    'client_secret': CLIENT_SECRET,
    'grant_type': 'client_credentials'
}

# Send the credentials as a form-encoded body instead of URL query parameters
# so the secret is not part of the request URL.
auth_response = requests.post(auth_url, data=auth_params, timeout=30)
# Fail here with the HTTP status rather than with a KeyError on 'access_token'.
auth_response.raise_for_status()
access_token = auth_response.json()['access_token']

In [0]:
def query_igdb(endpoint, query_body):
    """
    Send one query to an IGDB v4 endpoint and return the parsed JSON response.

    Args:
        endpoint: Path segment appended to ``https://api.igdb.com/v4/``
            (e.g. ``'games'``).
        query_body: Query text as ``str``, or already-encoded ``bytes``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    url = f'https://api.igdb.com/v4/{endpoint}'
    headers = {
        'Client-ID': CLIENT_ID,
        'Authorization': f'Bearer {access_token}',
        'Accept': 'application/json'
    }

    # Encode explicitly: a str body is otherwise encoded as Latin-1 by the HTTP
    # layer, which fails (or sends the wrong bytes) for non-Latin-1 game names.
    if isinstance(query_body, str):
        query_body = query_body.encode('utf-8')

    response = requests.post(url, headers=headers, data=query_body, timeout=30)
    # Surface HTTP errors as exceptions so callers' try/except blocks see them
    # instead of receiving an error payload disguised as data.
    response.raise_for_status()

    return response.json()

In [0]:
import time
from pyspark.sql import SparkSession

In [0]:
def fetch_games_by_names(game_names):
    """
    Fetch IGDB game records whose name matches one of the given names.

    The names are queried in batches of 10. A batch that fails is reported
    with a printed message and skipped; the remaining batches still run.

    Args:
        game_names: Sequence of game names. Entries that are not strings
            (e.g. ``None``) are ignored.

    Returns:
        A list with the records returned for all successful batches
        (fields requested: name, first_release_date, total_rating,
        total_rating_count).
    """
    all_games = []
    batch_size = 10

    for i in range(0, len(game_names), batch_size):
        batch = game_names[i:i + batch_size]

        # Quote each name and escape embedded double quotes. Done outside the
        # f-string so the code also parses on Python versions before 3.12.
        quoted_names = []
        for name in batch:
            if not isinstance(name, str):
                continue
            escaped = name.replace('"', '\\"')
            quoted_names.append(f'"{escaped}"')

        if not quoted_names:
            continue
        formatted_names = ", ".join(quoted_names)

        query = (
            'fields name, first_release_date, total_rating, total_rating_count; '
            f'where name = ({formatted_names}); limit 500;'
        )
        try:
            games = query_igdb('games', query)
            # Anything other than a list is not a result set; extending with
            # a dict would silently append its keys.
            if not isinstance(games, list):
                raise ValueError(f"unexpected response: {games!r}")
            all_games.extend(games)
        except Exception as e:
            print(f"Error fetching games: {e}")

        # Pause after every request, including failed ones.
        time.sleep(0.25)

    return all_games

In [0]:
def fetch_popularity_types():
    """
    Fetch the popularity type records (metadata for popularity scores).

    Returns:
        A list of records with the requested fields (name, popularity_source),
        sorted by id. Returns an empty list if the request fails or the
        response is not a list; the error is printed.
    """
    # Request an explicit page size: without `limit` the API returns only its
    # default page, which truncates the list of types.
    query = 'fields name, popularity_source; sort id asc; limit 500;'

    try:
        pop_types = query_igdb('popularity_types', query)
        if not isinstance(pop_types, list):
            raise ValueError(f"unexpected response: {pop_types!r}")
        return pop_types
    except Exception as e:
        print(f"Error fetching popularity types: {e}")
        return []

In [0]:
def fetch_popularity_primitives_for_games(game_ids):
    """
    Fetch the popularity primitives (popularity scores) for the given games.

    Game ids are queried in batches of 500, and each batch is paged through
    with limit/offset until a page comes back with fewer rows than the page
    size. A failing request is reported with a printed message and ends the
    paging for that batch; later batches still run.

    Args:
        game_ids: Sequence of IGDB game ids.

    Returns:
        A list of records with the requested fields (game_id, value,
        popularity_type). Items that are not dicts carrying a ``game_id``
        are dropped.
    """
    all_primitives = []
    batch_size = 500  # number of game ids per query
    limit = 500       # page size requested from the API

    for i in range(0, len(game_ids), batch_size):
        batch = game_ids[i:i + batch_size]
        game_id_list = ','.join(map(str, batch))

        # Page through all primitives for this batch of games
        offset = 0

        while True:
            query = f"""fields game_id, value, popularity_type;
                where game_id = ({game_id_list});
                limit {limit};
                offset {offset};"""

            try:
                primitives = query_igdb('popularity_primitives', query)
            except Exception as e:
                print(f"Error at batch {i//batch_size}, offset {offset}: {e}")
                break

            if not isinstance(primitives, list):
                print(f"Error at batch {i//batch_size}, offset {offset}: "
                      f"unexpected response {primitives!r}")
                break

            # Keep only real rows; this prevents an error payload from being
            # stored as if it were a popularity score.
            rows = [row for row in primitives
                    if isinstance(row, dict) and 'game_id' in row]
            if len(rows) != len(primitives):
                print(f"Skipped {len(primitives) - len(rows)} unexpected item(s) "
                      f"at batch {i//batch_size}, offset {offset}")
            all_primitives.extend(rows)

            time.sleep(0.25)  # Rate limiting

            # A short (or empty) page is the last one. Stopping here also
            # guarantees termination when every request returns the same
            # non-empty payload.
            if len(primitives) < limit:
                break
            offset += limit

        print(f"Fetched primitives for {i + len(batch)}/{len(game_ids)} games")

    return all_primitives

In [0]:
# Retrieve the popularity type metadata from IGDB
pop_types = fetch_popularity_types()

# Build an id -> name lookup, with spaces in the name replaced by underscores
pop_type_mapping = {}
for pop_type in pop_types:
    type_id = pop_type['id']
    pop_type_mapping[type_id] = pop_type['name'].replace(' ', '_')
print(f"Popularity type mapping: {pop_type_mapping}")

In [0]:
# Reuse the active Spark session (or create one) and read the Kaggle games table
spark = SparkSession.builder.getOrCreate()
kaggle_games_df = spark.read.table("workspace.02_silver.kaggle_games")

# Collect the distinct game names into a Python list
distinct_name_rows = kaggle_games_df.select("name").distinct().collect()
game_names = [name_row.name for name_row in distinct_name_rows]

# Look up the Kaggle names in IGDB
games = fetch_games_by_names(game_names)

In [0]:
# Keep one record per game name: the one with the smallest first_release_date.
# A record without a release date is ranked as +infinity, so it is only kept
# when no dated record with the same name has replaced it.
cleaned_games_dict = {}

for game in games:
    name = game['name']
    kept = cleaned_games_dict.get(name)

    if kept is None:
        # Name not seen before: keep this record for now
        cleaned_games_dict[name] = game
        continue

    release_date = game.get('first_release_date', float('inf'))
    existing_date = kept.get('first_release_date', float('inf'))
    # Replace only on a strictly earlier date, so ties keep the first record seen
    if release_date < existing_date:
        cleaned_games_dict[name] = game

# One record per name, in order of first appearance
matched_igdb_games = list(cleaned_games_dict.values())

In [0]:
# Collect the ids of the deduplicated games, skipping records that have no 'id'
game_ids = []
for igdb_game in matched_igdb_games:
    if 'id' in igdb_game:
        game_ids.append(igdb_game['id'])

In [0]:
# Download the popularity primitives for the matched games only
print("Fetching popularity primitives...")
pop_primitives = fetch_popularity_primitives_for_games(game_ids)
primitive_count = len(pop_primitives)
print(f"Fetched {primitive_count} popularity primitives")

In [0]:
# Show the mapping, then print every primitive whose popularity_type is
# outside 1..10 and is not 34
print(f"Popularity type mapping: {pop_type_mapping}")
for row in pop_primitives:
    if 'popularity_type' not in row:
        continue
    type_id = row['popularity_type']
    outside_range = type_id < 1 or type_id > 10
    if outside_range and type_id != 34:
        print(row)

In [0]:
# Build pandas frames from the fetched records
games_df = pd.DataFrame(matched_igdb_games)
pop_primitives_df = pd.DataFrame(pop_primitives)

if pop_primitives_df.empty:
    # No popularity scores: the output is just the games
    final_df = games_df
else:
    # One row per game, one column per popularity type; when a
    # (game, type) pair has several values the first one is used
    pop_pivot = pop_primitives_df.pivot_table(
        index='game_id',
        columns='popularity_type',
        values='value',
        aggfunc='first'
    ).reset_index()

    # Name each score column after its popularity type; ids missing from the
    # mapping keep the id itself as the name
    score_columns = []
    for type_id in pop_pivot.columns[1:]:
        type_name = pop_type_mapping.get(type_id, type_id)
        score_columns.append(f"{type_name}_score".lower())
    pop_pivot.columns = ['game_id'] + score_columns

    # Left join keeps games that have no scores; drop the duplicate key column
    games_with_scores = games_df.merge(pop_pivot, left_on='id', right_on='game_id', how='left')
    final_df = games_with_scores.drop(columns=['game_id'])

# Convert to a Spark DataFrame and overwrite the Bronze table
spark_df = spark.createDataFrame(final_df)
spark_df.write.mode("overwrite").saveAsTable("workspace.01_bronze.igdb_games")