## 从IGDB数据库爬取游戏种类和发行时间

In [None]:
import pandas as pd

file_path = "Twitch_game_data.csv"
df = pd.read_csv(file_path, encoding='ISO-8859-1')
unique_games = df["Game"].unique()

unique_games_df = pd.DataFrame(unique_games, columns=["Game"])
unique_games_path = "unique_games.csv"
unique_games_df.to_csv(unique_games_path, index=False, encoding='ISO-8859-1')

print(f"Extracted {len(unique_games)} unique games. Saved to {unique_games_path}.")

Extracted 2360 unique games. Saved to unique_games.csv.


In [None]:
import requests
import pandas as pd
import time

CLIENT_ID = "e1cvubjaunqp8g8rhyxmuog7ld4tsy" 
CLIENT_SECRET = "tnettt8ysiuybxxi1g8dlkzlwbzice" 

def get_access_token():
    url = "https://id.twitch.tv/oauth2/token"
    params = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "client_credentials"
    }
    response = requests.post(url, params=params)
    response.raise_for_status()
    return response.json()["access_token"]

def get_game_info(access_token, game_name):
    url = "https://api.igdb.com/v4/games"
    headers = {
        "Client-ID": CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json"
    }
    data = f'fields name, genres.name, first_release_date; where name ~ "{game_name}"*; limit 1;'
    response = requests.post(url, headers=headers, data=data)
    response.raise_for_status()
    results = response.json()
    
    if results:
        game = results[0]
        genres = game.get("genres", [])
        genre_names = ", ".join([genre["name"] for genre in genres]) if genres else "Unknown"
        release_date = game.get("first_release_date")
        release_year_month = time.strftime('%Y-%m', time.gmtime(release_date)) if release_date else "Unknown"
        return genre_names, release_year_month
    return "Unknown", "Unknown"

unique_games_path = "unique_games.csv"
df_games = pd.read_csv(unique_games_path, encoding='ISO-8859-1')

access_token = get_access_token()

game_info_dict = {}
for game in df_games["Game"].tolist():
    genre, release_date = get_game_info(access_token, game)
    game_info_dict[game] = {"Genre": genre, "Release Date": release_date}

df_game_info = pd.DataFrame.from_dict(game_info_dict, orient='index').reset_index()
df_game_info.columns = ["Game", "Genre", "Release Date"]

output_path = "game_info.csv"
df_game_info.to_csv(output_path, index=False, encoding='ISO-8859-1')
print(f"Updated file saved: {output_path}")

Updated file saved: game_info.csv


In [None]:
import requests
import pandas as pd
import time
import re
from tqdm import tqdm

CLIENT_ID = "e1cvubjaunqp8g8rhyxmuog7ld4tsy"
CLIENT_SECRET = "tnettt8ysiuybxxi1g8dlkzlwbzice"

def get_access_token():
    url = "https://id.twitch.tv/oauth2/token"
    params = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "client_credentials"
    }
    response = requests.post(url, params=params)
    response.raise_for_status()
    return response.json()["access_token"]

def get_game_info(access_token, game_name):
    url = "https://api.igdb.com/v4/games"
    headers = {
        "Client-ID": CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json"
    }
    
    data = f'fields name, genres.name, first_release_date; where name = "{game_name}"; limit 5;'
    response = requests.post(url, headers=headers, data=data)
    response.raise_for_status()
    results = response.json()
    
    if not results:
        data = f'fields name, genres.name, first_release_date; search "{game_name}"; limit 5;'
        response = requests.post(url, headers=headers, data=data)
        response.raise_for_status()
        results = response.json()
    
    if results:
        print(f"\ngame: {game_name} found {len(results)} result:")
        for i, game in enumerate(results):
            release_date = game.get("first_release_date")
            release_str = time.strftime('%Y-%m', time.gmtime(release_date)) if release_date else "Unknown"
            print(f"  {i+1}. {game['name']} - release time: {release_str}")
        
        game = results[0]
        genres = game.get("genres", [])
        genre_names = ", ".join([genre["name"] for genre in genres]) if genres else "Unknown"
        release_date = game.get("first_release_date")
        
        if release_date:
            release_year = time.strftime('%Y', time.gmtime(release_date))
            release_month = time.strftime('%m', time.gmtime(release_date))
            return {
                "Genre": genre_names, 
                "Release Year": release_year,
                "Release Month": release_month,
                "Game Name in IGDB": game["name"]
            }
    
    return {
        "Genre": "Unknown", 
        "Release Year": "Unknown",
        "Release Month": "Unknown",
        "Game Name in IGDB": "Not Found"
    }

In [None]:
import requests
import pandas as pd
import time
import re
from tqdm import tqdm

CLIENT_ID = "e1cvubjaunqp8g8rhyxmuog7ld4tsy"
CLIENT_SECRET = "tnettt8ysiuybxxi1g8dlkzlwbzice"

def get_access_token():
    url = "https://id.twitch.tv/oauth2/token"
    params = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "client_credentials"
    }
    response = requests.post(url, params=params)
    response.raise_for_status()
    return response.json()["access_token"]

def get_game_info(access_token, game_name):
    url = "https://api.igdb.com/v4/games"
    headers = {
        "Client-ID": CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json"
    }
    
    data = f'fields name, genres.name, first_release_date, rating, status, platforms.name; where name = "{game_name}"; limit 5;'
    response = requests.post(url, headers=headers, data=data)
    response.raise_for_status()
    exact_results = response.json()
    
    search_results = []
    if not exact_results:
        clean_name = re.sub(r'[^\w\s]', '', game_name)
        data = f'fields name, genres.name, first_release_date, rating, status, platforms.name; search "{clean_name}"; limit 5;'
        response = requests.post(url, headers=headers, data=data)
        response.raise_for_status()
        search_results = response.json()
    
    results = exact_results if exact_results else search_results
    results_count = len(results)
    
    if results:
        best_match = None
        highest_rating = -1
        
        for game in results:
            if game["name"].lower() == game_name.lower() or game["name"].lower() in game_name.lower() or game_name.lower() in game["name"].lower():
                rating = game.get("rating", 0)
                if rating > highest_rating:
                    highest_rating = rating
                    best_match = game
        
        if not best_match and results:
            best_match = results[0]
            
        if best_match:
            genres = best_match.get("genres", [])
            genre_names = ", ".join([genre["name"] for genre in genres]) if genres else "Unknown"
            release_date = best_match.get("first_release_date")
            
            if release_date:
                release_year = time.strftime('%Y', time.gmtime(release_date))
                release_month = time.strftime('%m', time.gmtime(release_date))
                return {
                    "Genre": genre_names, 
                    "Release Year": release_year,
                    "Release Month": release_month,
                    "Game Name in IGDB": best_match["name"],
                    "Multiple Results": "yes" if results_count > 1 else "no",
                    "Results Count": results_count
                }
    
    return {
        "Genre": "Unknown", 
        "Release Year": "Unknown",
        "Release Month": "Unknown",
        "Game Name in IGDB": "Not Found",
        "Multiple Results": "no",
        "Results Count": 0
    }

def process_all_games(df_games, access_token):
    print(f"start processing{len(df_games)} games ...")
    game_info_list = []
    
    for game in tqdm(df_games["Game"].tolist(), desc="get game info"):
        try:
            info = get_game_info(access_token, game)
            game_info_list.append({
                "Game": game,
                "Genre": info["Genre"],
                "Release Year": info["Release Year"],
                "Release Month": info["Release Month"],
                "Game Name in IGDB": info["Game Name in IGDB"],
                "Multiple Results": info["Multiple Results"],
                "Results Count": info["Results Count"]
            })
            time.sleep(0.2 + 0.3 * (time.time() % 1))
        except Exception as e:
            print(f"\n '{game}' error: {str(e)}")
            game_info_list.append({
                "Game": game,
                "Genre": "Error",
                "Release Year": "Error",
                "Release Month": "Error",
                "Game Name in IGDB": f"Error: {str(e)}",
                "Multiple Results": "no",
                "Results Count": 0
            })
            time.sleep(1)
        
        if len(game_info_list) % 100 == 0:
            temp_df = pd.DataFrame(game_info_list)
            temp_df.to_csv(f"game_info_temp_{len(game_info_list)}.csv", index=False, encoding='UTF-8')
            print(f"\nprocessed {len(game_info_list)} games, save temp")
            
    df_game_info = pd.DataFrame(game_info_list)
    output_path = "game_info_full.csv"
    df_game_info.to_csv(output_path, index=False, encoding='UTF-8')
    print(f"processed and saved to {output_path}")
    
    multiple_results = df_game_info[df_game_info["Multiple Results"] == "是"].shape[0]
    not_found = df_game_info[df_game_info["Game Name in IGDB"] == "Not Found"].shape[0]
    errors = df_game_info[df_game_info["Release Year"] == "Error"].shape[0]
    
    return df_game_info

if __name__ == "__main__":
    unique_games_path = "unique_games.csv"
    try:
        df_games = pd.read_csv(unique_games_path, encoding='UTF-8')
    except UnicodeDecodeError:
        df_games = pd.read_csv(unique_games_path, encoding='ISO-8859-1')
    
    access_token = get_access_token()
    
    while True:
        process_option = input("\n1. all\n2. number\nplease input (1/2): ")
        if process_option == '1':
            df_game_info = process_all_games(df_games, access_token)
            break
        elif process_option == '2':
            try:
                num_games = int(input("please input number"))
                if num_games > 0 and num_games <= len(df_games):
                    df_partial = df_games.head(num_games)
                    df_game_info = process_all_games(df_partial, access_token)
                    break
    
    print("done")

In [1]:
import requests
import pandas as pd
import time
import re
from tqdm import tqdm

# IGDB API 凭据
CLIENT_ID = "e1cvubjaunqp8g8rhyxmuog7ld4tsy"
CLIENT_SECRET = "tnettt8ysiuybxxi1g8dlkzlwbzice"

# 获取 OAuth 令牌
def get_access_token():
    url = "https://id.twitch.tv/oauth2/token"
    params = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "client_credentials"
    }
    response = requests.post(url, params=params)
    response.raise_for_status()
    return response.json()["access_token"]

# 查询 IGDB API 获取游戏信息
def get_game_info(access_token, game_name):
    url = "https://api.igdb.com/v4/games"
    headers = {
        "Client-ID": CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json"
    }
    
    data = f'fields name, genres.name, first_release_date, rating, status, platforms.name; where name = "{game_name}"; limit 5;'
    response = requests.post(url, headers=headers, data=data)
    response.raise_for_status()
    exact_results = response.json()
    
    search_results = []
    if not exact_results:
        clean_name = re.sub(r'[^\w\s]', '', game_name)
        data = f'fields name, genres.name, first_release_date, rating, status, platforms.name; search "{clean_name}"; limit 5;'
        response = requests.post(url, headers=headers, data=data)
        response.raise_for_status()
        search_results = response.json()
    
    results = exact_results if exact_results else search_results
    results_count = len(results)
    
    if results:
        best_match = None
        highest_rating = -1
        
        for game in results:
            if game["name"].lower() == game_name.lower() or game["name"].lower() in game_name.lower() or game_name.lower() in game["name"].lower():
                rating = game.get("rating", 0)
                if rating > highest_rating:
                    highest_rating = rating
                    best_match = game
        
        if not best_match and results:
            best_match = results[0]
            
        if best_match:
            genres = best_match.get("genres", [])
            genre_names = ", ".join([genre["name"] for genre in genres]) if genres else "Unknown"
            release_date = best_match.get("first_release_date")
            
            if release_date:
                release_year = time.strftime('%Y', time.gmtime(release_date))
                release_month = time.strftime('%m', time.gmtime(release_date))
                return {
                    "Genre": genre_names, 
                    "Release Year": release_year,
                    "Release Month": release_month,
                    "Game Name in IGDB": best_match["name"],
                    "Multiple Results": "是" if results_count > 1 else "否",
                    "Results Count": results_count
                }
    
    return {
        "Genre": "Unknown", 
        "Release Year": "Unknown",
        "Release Month": "Unknown",
        "Game Name in IGDB": "Not Found",
        "Multiple Results": "否",
        "Results Count": 0
    }

# 处理所有游戏
def process_all_games(df_games, access_token):
    print(f"开始处理全部 {len(df_games)} 个游戏...")
    game_info_list = []
    
    for game in tqdm(df_games["Game"].tolist(), desc="获取游戏信息"):
        try:
            info = get_game_info(access_token, game)
            game_info_list.append({
                "Game": game,
                "Genre": info["Genre"],
                "Release Year": info["Release Year"],
                "Release Month": info["Release Month"],
                "Game Name in IGDB": info["Game Name in IGDB"],
                "Multiple Results": info["Multiple Results"],
                "Results Count": info["Results Count"]
            })
            time.sleep(0.2 + 0.3 * (time.time() % 1))
        except Exception as e:
            print(f"\n处理游戏 '{game}' 时出错: {str(e)}")
            game_info_list.append({
                "Game": game,
                "Genre": "Error",
                "Release Year": "Error",
                "Release Month": "Error",
                "Game Name in IGDB": f"Error: {str(e)}",
                "Multiple Results": "否",
                "Results Count": 0
            })
            time.sleep(1)
            
    df_game_info = pd.DataFrame(game_info_list)
    output_path = "igdb_data.csv"
    df_game_info.to_csv(output_path, index=False, encoding='UTF-8')
    print(f"处理完成，文件已保存至 {output_path}")
    
    return df_game_info

if __name__ == "__main__":
    unique_games_path = "unique_games.csv"
    try:
        df_games = pd.read_csv(unique_games_path, encoding='UTF-8')
    except UnicodeDecodeError:
        df_games = pd.read_csv(unique_games_path, encoding='ISO-8859-1')
    
    print("获取API访问令牌...")
    access_token = get_access_token()
    
    while True:
        process_option = input("请选择处理方式:\n1. 处理全部游戏\n2. 处理指定数量的游戏\n请输入选择 (1/2): ")
        if process_option == '1':
            df_game_info = process_all_games(df_games, access_token)
            break
        elif process_option == '2':
            try:
                num_games = int(input("请输入要处理的游戏数量: "))
                if num_games > 0 and num_games <= len(df_games):
                    df_partial = df_games.head(num_games)
                    df_game_info = process_all_games(df_partial, access_token)
                    break
                else:
                    print(f"请输入有效数字 (1-{len(df_games)})")
            except ValueError:
                print("请输入有效数字")
        else:
            print("无效选择，请重新输入")
    
    print("处理完成！")

获取API访问令牌...
开始处理全部 2360 个游戏...


获取游戏信息:  52%|█████▏    | 1218/2360 [14:25<16:05,  1.18it/s]


处理游戏 'nan' 时出错: expected string or bytes-like object, got 'float'


获取游戏信息: 100%|██████████| 2360/2360 [27:35<00:00,  1.43it/s]

处理完成，文件已保存至 igdb_data.csv
处理完成！



