In [2]:
pip install requests networkx


Note: you may need to restart the kernel to use updated packages.


# Bước 1: Chuẩn bị môi trường & API key

In [None]:
# %% 
import os
from pathlib import Path
import json
import time
import hashlib
from itertools import islice
import requests
import networkx as nx
from dotenv import load_dotenv

# Load the Steam Web API key from the environment.
# load_dotenv() runs first so that a key stored in a local .env file is visible
# to os.getenv (presumably it copies .env entries into os.environ — see python-dotenv).
load_dotenv()
API_KEY = os.getenv("STEAM_API_KEY")
if not API_KEY:
    # Stop the cell early: nothing below can work without a key.
    raise SystemExit("Please set STEAM_API_KEY in environment or .env file")

# Root URL that endpoint paths are appended to when building request URLs.
BASE = "https://api.steampowered.com"
# Directory holding one JSON file per cached API response; created if missing.
CACHE_DIR = Path("steam_cache")
CACHE_DIR.mkdir(exist_ok=True)

DELAY_BETWEEN_REQUESTS = 0.5  # seconds slept after each successful request
BATCH_SIZE = 50  # number of SteamIDs sent per batched request
MAX_ACHIEVEMENTS_APPS = 3  # NOTE(review): not referenced by any function in the visible cells
REQUEST_TIMEOUT = 15  # per-request timeout in seconds, passed to requests.get


# STEAM CRAWLER - full functions (cache, API, endpoints, BFS crawler, save outputs)
- Cache để tránh request trùng
- API request với backoff, skip private
- Các endpoint Steam: summaries, friends, owned games, recently played, groups, bans, achievements
- BFS crawl nhiều batch, skip user/private, merge cuối cùng
- Safe với NetworkX GEXF (loại bỏ NoneType)

In [10]:
# %% 
"""
STEAM CRAWLER - full functions (cache, API, endpoints, BFS crawler, save outputs)
- Cache to avoid duplicate requests
- API requests with backoff; private profiles are skipped
- Steam endpoints: summaries, friends, owned games, recently played, groups, bans, achievements
- BFS crawl over multiple batches, skipping already-visited/private users, merging at the end
- Safe for NetworkX GEXF export (NoneType values are removed)
"""

import json, time, hashlib
from itertools import islice
import requests
import networkx as nx
from pathlib import Path

# Cache utils
# Directory that stores one JSON file per cached API response; created on first run.
CACHE_DIR = Path("steam_cache")
CACHE_DIR.mkdir(exist_ok=True)

def cache_key(endpoint, params):
    """Return the SHA-1 hex digest that identifies an (endpoint, params) request.

    ``params`` is serialised as JSON with sorted keys, so dicts holding the same
    items produce the same digest regardless of insertion order.
    """
    canonical_params = json.dumps(params, sort_keys=True)
    digest = hashlib.sha1()
    digest.update((endpoint + canonical_params).encode("utf-8"))
    return digest.hexdigest()

def cache_load(endpoint, params):
    """Return the cached JSON payload for a request, or ``None`` on a cache miss.

    A cache file that cannot be read or parsed is treated as a miss.
    """
    cache_file = CACHE_DIR / (cache_key(endpoint, params) + ".json")
    if not cache_file.exists():
        return None
    try:
        raw_text = cache_file.read_text(encoding="utf-8")
        return json.loads(raw_text)
    except Exception:
        # Best effort: a corrupt or unreadable entry simply forces a refetch.
        return None

def cache_save(endpoint, params, data):
    """Write ``data`` as indented UTF-8 JSON to the cache file for this request."""
    target = CACHE_DIR / "{}.json".format(cache_key(endpoint, params))
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    target.write_text(serialized, encoding="utf-8")

# API request with backoff
# This cell calls os.getenv but its own import lines do not bring in `os`;
# import it here so the cell does not depend on another cell having run first.
import os

DELAY_BETWEEN_REQUESTS = 0.5  # seconds slept after each successful request
BATCH_SIZE = 50  # number of SteamIDs sent per batched request
MAX_ACHIEVEMENTS_APPS = 3
REQUEST_TIMEOUT = 15  # per-request timeout in seconds, passed to requests.get
BASE = "https://api.steampowered.com"  # root URL that endpoint paths are appended to
API_KEY = os.getenv("STEAM_API_KEY")  # None when the variable is not set

def api_get(endpoint, params, use_cache=True):
    """GET a Steam Web API endpoint with caching, retries and exponential backoff.

    Args:
        endpoint: Path appended to ``BASE``, e.g. ``"ISteamUser/GetFriendList/v0001/"``.
        params: Query parameters. The dict is copied, and the API key is added to
            the copy, so the caller's dict is never modified.
        use_cache: When true, a cached payload is returned if one exists. A
            successful response is written to the cache either way.

    Returns:
        The decoded JSON payload on HTTP 200, ``{"private": True}`` on HTTP
        401/403, or ``None`` on any other outcome (unexpected status, a 200
        response whose body is not JSON, or six failed attempts).
    """
    params = dict(params)
    params["key"] = API_KEY
    # Log only the non-secret parameters: notebook output is saved and shared,
    # so the API key must never be printed.
    loggable = {name: value for name, value in params.items() if name != "key"}

    if use_cache:
        # The lookup still uses the full params so existing cache files keep matching.
        cached = cache_load(endpoint, params)
        # Earlier versions cached raw non-JSON bodies as strings; callers expect a
        # mapping, so such an entry is treated as a miss and refetched.
        if cached is not None and not isinstance(cached, str):
            return cached

    url = f"{BASE}/{endpoint}"
    backoff = 1.0
    for _ in range(6):
        try:
            r = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
        except requests.RequestException as e:
            # Exception text can embed the full request URL, including the key.
            message = str(e)
            if API_KEY:
                message = message.replace(str(API_KEY), "***")
            print(f"[WARN] Request exception {message}, backoff {backoff}s")
            time.sleep(backoff)
            backoff *= 2
            continue

        if r.status_code == 200:
            try:
                data = r.json()
            except ValueError:
                # A raw text body would crash every caller's ``.get`` and poison
                # the cache, so report it as a failed request instead.
                print(f"[WARN] non-JSON response from {url} params={loggable}")
                return None
            cache_save(endpoint, params, data)
            time.sleep(DELAY_BETWEEN_REQUESTS)
            return data
        if r.status_code in (429, 503):
            # Rate limited or temporarily unavailable: wait, then retry.
            time.sleep(backoff + 0.1)
            backoff *= 2
            continue
        if r.status_code in (401, 403):
            print(f"[PRIVATE] skip: {loggable}")
            return {"private": True}
        print(f"[WARN] {r.status_code} from {url} params={loggable}")
        return None

    print("[ERROR] Max retries exceeded for", url)
    return None

# Helper chunks
def chunks(iterable, n):
    """Yield successive lists of up to ``n`` items taken from ``iterable``.

    The last list may be shorter; an empty iterable yields nothing.
    """
    iterator = iter(iterable)
    batch = list(islice(iterator, n))
    while batch:
        yield batch
        batch = list(islice(iterator, n))

# Steam endpoints
def get_player_summaries(steamid_list):
    """Fetch player summaries for many SteamIDs, ``BATCH_SIZE`` ids per request.

    Batches whose request returns nothing or is flagged private are skipped.
    Returns the concatenated ``response.players`` entries of the other batches.
    """
    endpoint = "ISteamUser/GetPlayerSummaries/v0002/"
    players = []
    for id_batch in chunks(steamid_list, BATCH_SIZE):
        payload = api_get(endpoint, {"steamids": ",".join(id_batch)})
        if not payload or payload.get("private"):
            continue
        players.extend(payload.get("response", {}).get("players", []))
    return players

def get_friend_list(steamid):
    """Return the SteamIDs on a user's friend list.

    Returns an empty list when the request fails or is flagged private.
    """
    query = {"steamid": steamid, "relationship": "friend"}
    payload = api_get("ISteamUser/GetFriendList/v0001/", query)
    if not payload or payload.get("private"):
        return []
    friend_entries = payload.get("friendslist", {}).get("friends", [])
    return [entry.get("steamid") for entry in friend_entries]

def get_owned_games(steamid):
    """Return the ``response`` object of GetOwnedGames for a user.

    App info and played free games are requested. Returns ``{}`` when the
    request fails or is flagged private.
    """
    query = {
        "steamid": steamid,
        "include_appinfo": 1,
        "include_played_free_games": 1,
        "format": "json",
    }
    payload = api_get("IPlayerService/GetOwnedGames/v0001/", query)
    if not payload or payload.get("private"):
        return {}
    return payload.get("response", {})

def get_recently_played(steamid):
    """Return the ``response`` object of GetRecentlyPlayedGames for a user.

    Returns ``{}`` when the request fails or is flagged private.
    """
    payload = api_get("IPlayerService/GetRecentlyPlayedGames/v0001/", {"steamid": steamid})
    if not payload or payload.get("private"):
        return {}
    return payload.get("response", {})

def get_user_groups(steamid):
    """Return the ``response.groups`` list of GetUserGroupList for a user.

    Returns ``[]`` when the request fails or is flagged private.
    """
    payload = api_get("ISteamUser/GetUserGroupList/v1/", {"steamid": steamid})
    if not payload or payload.get("private"):
        return []
    return payload.get("response", {}).get("groups", [])

def get_player_bans(steamid_list):
    """Fetch ban records for many SteamIDs, ``BATCH_SIZE`` ids per request.

    Batches whose request returns nothing or is flagged private are skipped.
    Returns the concatenated top-level ``players`` entries of the other batches.
    """
    endpoint = "ISteamUser/GetPlayerBans/v1/"
    ban_records = []
    for id_batch in chunks(steamid_list, BATCH_SIZE):
        payload = api_get(endpoint, {"steamids": ",".join(id_batch)})
        if not payload or payload.get("private"):
            continue
        ban_records.extend(payload.get("players", []))
    return ban_records

def get_player_achievements(steamid, appid):
    """Return the ``playerstats`` object of GetPlayerAchievements for one user and app.

    Returns ``{}`` when the request fails or is flagged private.
    """
    query = {"steamid": steamid, "appid": appid}
    payload = api_get("ISteamUserStats/GetPlayerAchievements/v1/", query)
    if not payload or payload.get("private"):
        return {}
    return payload.get("playerstats", {})

def resolve_vanity(name):
    """Resolve a vanity URL name to a SteamID via ResolveVanityURL.

    Returns the ``steamid`` from the response when it reports ``success == 1``,
    otherwise ``None``.
    """
    payload = api_get("ISteamUser/ResolveVanityURL/v0001/", {"vanityurl": name})
    if not payload:
        return None
    if payload.get("response", {}).get("success") != 1:
        return None
    return payload["response"].get("steamid")

# BFS crawler
def crawl(seed_ids, depth=1, max_users=1000, max_apps_per_user=3, visited_global=None):
    """Breadth-first crawl of the Steam friend graph starting from ``seed_ids``.

    Args:
        seed_ids: SteamID strings (all digits) or vanity names to start from.
        depth: Largest BFS distance from a seed that is still crawled.
        max_users: Crawling stops once ``visited_global`` holds this many ids;
            ids passed in by the caller count towards the limit.
        max_apps_per_user: Number of most-played games per user for which
            achievements are fetched.
        visited_global: Ids already handled by earlier calls. They are not
            fetched again, but their friend lists are still walked so that a
            resumed crawl can reach users behind them. Updated in place.

    Returns:
        ``(G, all_profiles, visited_global)``: the friendship graph built by this
        call, a ``{steamid: summary}`` dict of the profiles fetched by this call,
        and the updated visited set.
    """
    from collections import deque

    G = nx.Graph()
    if visited_global is None:
        visited_global = set()
    # Ids handled by earlier runs. Without walking through them, a resumed crawl
    # whose seeds are already visited would stop immediately with an empty graph.
    previously_visited = frozenset(visited_global)
    seeds = list(seed_ids)
    queue = deque((s, 0) for s in seeds)
    enqueued = set(seeds)   # ids ever queued, so shared friends are queued once
    expanded = set()        # ids whose friend list was walked during this call
    all_profiles = {}
    stats = {"public": 0, "private": 0, "skipped": 0}

    def link_and_enqueue(owner, friend_ids, owner_depth):
        """Add friendship edges and queue friends still within the depth limit."""
        for friend in friend_ids:
            if not friend:
                continue  # entry without a steamid; None cannot be a graph node
            G.add_edge(owner, friend)
            if owner_depth + 1 <= depth and friend not in enqueued:
                enqueued.add(friend)
                queue.append((friend, owner_depth + 1))

    while queue and len(visited_global) < max_users:
        steamid, d = queue.popleft()
        if d > depth:
            continue
        if not steamid.isdigit():
            steamid = resolve_vanity(steamid) or steamid
            if not steamid.isdigit():
                stats["skipped"] += 1
                continue
        # Checked after vanity resolution, so a resolved id that was already
        # handled is not crawled a second time.
        if steamid in expanded:
            continue
        expanded.add(steamid)

        if steamid in previously_visited:
            link_and_enqueue(steamid, get_friend_list(steamid), d)
            continue

        players = get_player_summaries([steamid])
        visited_global.add(steamid)
        if not players:
            # No summary came back; counted under "private" as before.
            stats["private"] += 1
            continue
        player = players[0]
        stats["public"] += 1
        all_profiles[steamid] = player
        G.add_node(
            steamid,
            label=player.get("personaname", ""),
            avatar=player.get("avatarfull", ""),
            country=player.get("loccountrycode", ""),
        )

        link_and_enqueue(steamid, get_friend_list(steamid), d)

        owned = get_owned_games(steamid)
        recent = get_recently_played(steamid)
        groups = get_user_groups(steamid)
        node = G.nodes[steamid]
        node["owned_game_count"] = owned.get("game_count", 0)
        node["recent_count"] = recent.get("total_count", 0)
        node["groups"] = [g.get("groupid64") for g in groups] if groups else []

        apps = owned.get("games", []) if isinstance(owned, dict) else []
        top_apps = sorted(apps, key=lambda x: x.get("playtime_forever", 0), reverse=True)[:max_apps_per_user]
        node["top_apps"] = [a.get("appid") for a in top_apps]
        node["achievements_sample"] = {
            app.get("appid"): get_player_achievements(steamid, app.get("appid"))
            for app in top_apps
        }

    # Sorted so the id batches, and therefore their cache keys, are stable across runs.
    bans = get_player_bans(sorted(visited_global))
    bans_map = {b.get("SteamId"): b for b in bans if b.get("SteamId")}
    for node_id in G.nodes:
        G.nodes[node_id]["ban_info"] = bans_map.get(node_id, {})

    print(f"Stats: {stats}")
    return G, all_profiles, visited_global

def clean_node_attrs(G):
    """Rewrite node attributes in place for GEXF export and return the same graph.

    ``None`` values become empty strings; list and dict values are replaced by
    their ``str()`` form. All other values are left untouched.
    """
    for _node, attrs in G.nodes(data=True):
        for key in list(attrs):
            value = attrs[key]
            if value is None:
                attrs[key] = ""
                continue
            if isinstance(value, (list, dict)):
                attrs[key] = str(value)
    return G

def load_existing_profiles(json_path):
    """Load a previously saved profiles JSON file.

    Returns ``(ids, profiles)`` where ``ids`` is the set of top-level keys (the
    SteamIDs already crawled) and ``profiles`` is the loaded mapping. When the
    file does not exist, returns ``(set(), {})``.
    """
    source = Path(json_path)
    if not source.exists():
        return set(), {}
    with open(json_path, "r", encoding="utf-8") as handle:
        profiles = json.load(handle)
    return set(profiles.keys()), profiles



def save_outputs(G, profiles, out_prefix="steam_output"):
    """Write the profiles to JSON and the graph to GEXF.

    Args:
        G: Graph to export. It is not modified: attribute clean-up runs on a copy.
        profiles: JSON-serialisable mapping written to ``{out_prefix}_profiles.json``.
        out_prefix: Path prefix shared by both output files.
    """
    # clean_node_attrs rewrites attributes in place (lists/dicts become strings).
    # Working on a copy keeps the caller's graph usable after a checkpoint save.
    export_graph = clean_node_attrs(G.copy())

    # Save JSON
    json_path = f"{out_prefix}_profiles.json"
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(profiles, f, indent=2, ensure_ascii=False)

    # Save GEXF
    gexf_path = f"{out_prefix}_graph.gexf"
    nx.write_gexf(export_graph, gexf_path)

    print(f"Saved profiles -> {json_path}")
    print(f"Saved graph -> {gexf_path}")

# Load the data saved by a previous run (ids already crawled and their profiles).
visited_prev, profiles_prev = load_existing_profiles("steam_output_test_profiles.json")

# Run a new crawl, passing the earlier ids as visited_global to avoid duplicates.
SEEDS = ["76561198294300457", "76561197973974836"]
G_new, profiles_new, visited_new = crawl(
    SEEDS, depth=2, max_users=5000, max_apps_per_user=3, visited_global=visited_prev
)

# Merge old and new profiles (new entries win on key collisions).
profiles_prev.update(profiles_new)

# Save again, overwriting the existing files.
# NOTE(review): only G_new is written, so the GEXF file holds just this run's
# graph while the JSON holds the merged profiles — confirm this is intended.
save_outputs(G_new, profiles_prev, out_prefix="steam_output_test")


🚀 Batch 1 (target 100 users)...
[WARN] 400 from https://api.steampowered.com/ISteamUserStats/GetPlayerAchievements/v1/
[WARN] 400 from https://api.steampowered.com/ISteamUserStats/GetPlayerAchievements/v1/
[WARN] 400 from https://api.steampowered.com/ISteamUserStats/GetPlayerAchievements/v1/
[WARN] 400 from https://api.steampowered.com/ISteamUserStats/GetPlayerAchievements/v1/
[WARN] Exception HTTPSConnectionPool(host='api.steampowered.com', port=443): Read timed out. (read timeout=15), backoff 1.0s
[WARN] 400 from https://api.steampowered.com/ISteamUserStats/GetPlayerAchievements/v1/
[WARN] 400 from https://api.steampowered.com/ISteamUserStats/GetPlayerAchievements/v1/
[WARN] 400 from https://api.steampowered.com/ISteamUserStats/GetPlayerAchievements/v1/
Stats: {'public': 99, 'private': 1, 'skipped': 0}
✅ Checkpoint saved: 99 users, 8065 nodes

🚀 Batch 2 (target 100 users)...
Stats: {'public': 0, 'private': 0, 'skipped': 0}
✅ Checkpoint saved: 99 users, 8065 nodes

🚀 Batch 3 (target 

KeyboardInterrupt: 

In [None]:
# %% 
"""
STEAM CRAWLER - SAFE THROTTLED VERSION
--------------------------------------
- Cache to avoid duplicate requests
- API requests with backoff; private profiles are skipped
- Random throttling to avoid flooding the API
- Steam endpoints: summaries, friends, owned games, recently played, groups, bans, achievements
- BFS crawl over multiple batches, skipping already-visited/private users, merging at the end
- Safe for NetworkX GEXF export (NoneType values are removed)
"""

import json, time, hashlib, random
from itertools import islice
import requests
import networkx as nx
from pathlib import Path

# ================================
# Cache utils
# ================================
# Directory that stores one JSON file per cached API response; created on first run.
CACHE_DIR = Path("steam_cache")
CACHE_DIR.mkdir(exist_ok=True)

def cache_key(endpoint, params):
    """Return the SHA-1 hex digest that identifies an (endpoint, params) request.

    ``params`` is serialised as JSON with sorted keys, so dicts holding the same
    items produce the same digest regardless of insertion order.
    """
    canonical_params = json.dumps(params, sort_keys=True)
    digest = hashlib.sha1()
    digest.update((endpoint + canonical_params).encode("utf-8"))
    return digest.hexdigest()

def cache_load(endpoint, params):
    """Return the cached JSON payload for a request, or ``None`` on a cache miss.

    A cache file that cannot be read or parsed is treated as a miss.
    """
    cache_file = CACHE_DIR / (cache_key(endpoint, params) + ".json")
    if not cache_file.exists():
        return None
    try:
        raw_text = cache_file.read_text(encoding="utf-8")
        return json.loads(raw_text)
    except Exception:
        # Best effort: a corrupt or unreadable entry simply forces a refetch.
        return None

def cache_save(endpoint, params, data):
    """Write ``data`` as indented UTF-8 JSON to the cache file for this request."""
    target = CACHE_DIR / "{}.json".format(cache_key(endpoint, params))
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    target.write_text(serialized, encoding="utf-8")

# ================================
# Config crawl
# ================================
# This cell calls os.getenv but its own import lines do not bring in `os`;
# import it here so the cell does not depend on another cell having run first.
import os

DELAY_MIN = 1.0        # minimum delay (s)
DELAY_MAX = 2.0        # maximum delay (s)
BATCH_SIZE = 50        # number of SteamIDs sent per batched request
MAX_APPS_PER_USER = 3
REQUEST_TIMEOUT = 15   # per-request timeout in seconds, passed to requests.get
BASE = "https://api.steampowered.com"  # root URL that endpoint paths are appended to
API_KEY = os.getenv("STEAM_API_KEY")   # None when the variable is not set

# ================================
# API request with backoff & throttle
# ================================
def api_get(endpoint, params, use_cache=True):
    """GET a Steam Web API endpoint with caching, random throttling and backoff.

    Args:
        endpoint: Path appended to ``BASE``, e.g. ``"ISteamUser/GetFriendList/v0001/"``.
        params: Query parameters. The dict is copied, and the API key is added to
            the copy, so the caller's dict is never modified.
        use_cache: When true, a cached payload is returned if one exists. A
            successful response is written to the cache either way.

    Returns:
        The decoded JSON payload on HTTP 200, ``{"private": True}`` on HTTP
        401/403, or ``None`` on any other outcome (unexpected status, a 200
        response whose body is not JSON, or six failed attempts).
    """
    params = dict(params)
    params["key"] = API_KEY
    # Log only the non-secret parameters: notebook output is saved and shared,
    # so the API key must never be printed.
    loggable = {name: value for name, value in params.items() if name != "key"}

    if use_cache:
        # The lookup still uses the full params so existing cache files keep matching.
        cached = cache_load(endpoint, params)
        # Earlier versions cached raw non-JSON bodies as strings; callers expect a
        # mapping, so such an entry is treated as a miss and refetched.
        if cached is not None and not isinstance(cached, str):
            return cached

    url = f"{BASE}/{endpoint}"
    backoff = 1.0
    for _ in range(6):
        try:
            r = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
        except requests.RequestException as e:
            # Exception text can embed the full request URL, including the key.
            message = str(e)
            if API_KEY:
                message = message.replace(str(API_KEY), "***")
            print(f"[WARN] Request exception {message}, backoff {backoff}s")
            time.sleep(backoff + random.uniform(0.1, 0.5))
            backoff *= 2
            continue

        if r.status_code == 200:
            try:
                data = r.json()
            except ValueError:
                # A raw text body would crash every caller's ``.get`` and poison
                # the cache, so report it as a failed request instead.
                print(f"[WARN] non-JSON response from {url} params={loggable}")
                return None
            cache_save(endpoint, params, data)
            # Random throttle between DELAY_MIN and DELAY_MAX seconds.
            time.sleep(random.uniform(DELAY_MIN, DELAY_MAX))
            return data
        if r.status_code in (429, 503):
            # Rate limited or temporarily unavailable: wait with jitter, then retry.
            time.sleep(backoff + random.uniform(0.1, 0.5))
            backoff *= 2
            continue
        if r.status_code in (401, 403):
            print(f"[PRIVATE] skip: {loggable}")
            return {"private": True}
        print(f"[WARN] {r.status_code} from {url} params={loggable}")
        return None

    print("[ERROR] Max retries exceeded for", url)
    return None

# ================================
# Helper chunks
# ================================
def chunks(iterable, n):
    """Yield successive lists of up to ``n`` items taken from ``iterable``.

    The last list may be shorter; an empty iterable yields nothing.
    """
    iterator = iter(iterable)
    batch = list(islice(iterator, n))
    while batch:
        yield batch
        batch = list(islice(iterator, n))

# ================================
# Steam endpoints
# ================================
def get_player_summaries(steamid_list):
    """Fetch player summaries for many SteamIDs, ``BATCH_SIZE`` ids per request.

    Batches whose request returns nothing or is flagged private are skipped.
    Returns the concatenated ``response.players`` entries of the other batches.
    """
    endpoint = "ISteamUser/GetPlayerSummaries/v0002/"
    players = []
    for id_batch in chunks(steamid_list, BATCH_SIZE):
        payload = api_get(endpoint, {"steamids": ",".join(id_batch)})
        if not payload or payload.get("private"):
            continue
        players.extend(payload.get("response", {}).get("players", []))
    return players

def get_friend_list(steamid):
    """Return the SteamIDs on a user's friend list.

    Returns an empty list when the request fails or is flagged private.
    """
    query = {"steamid": steamid, "relationship": "friend"}
    payload = api_get("ISteamUser/GetFriendList/v0001/", query)
    if not payload or payload.get("private"):
        return []
    friend_entries = payload.get("friendslist", {}).get("friends", [])
    return [entry.get("steamid") for entry in friend_entries]

def get_owned_games(steamid):
    """Return the ``response`` object of GetOwnedGames for a user.

    App info and played free games are requested. Returns ``{}`` when the
    request fails or is flagged private.
    """
    query = {
        "steamid": steamid,
        "include_appinfo": 1,
        "include_played_free_games": 1,
        "format": "json",
    }
    payload = api_get("IPlayerService/GetOwnedGames/v0001/", query)
    if not payload or payload.get("private"):
        return {}
    return payload.get("response", {})

def get_recently_played(steamid):
    """Return the ``response`` object of GetRecentlyPlayedGames for a user.

    Returns ``{}`` when the request fails or is flagged private.
    """
    payload = api_get("IPlayerService/GetRecentlyPlayedGames/v0001/", {"steamid": steamid})
    if not payload or payload.get("private"):
        return {}
    return payload.get("response", {})

def get_user_groups(steamid):
    """Return the ``response.groups`` list of GetUserGroupList for a user.

    Returns ``[]`` when the request fails or is flagged private.
    """
    payload = api_get("ISteamUser/GetUserGroupList/v1/", {"steamid": steamid})
    if not payload or payload.get("private"):
        return []
    return payload.get("response", {}).get("groups", [])

def get_player_bans(steamid_list):
    """Fetch ban records for many SteamIDs, ``BATCH_SIZE`` ids per request.

    Batches whose request returns nothing or is flagged private are skipped.
    Returns the concatenated top-level ``players`` entries of the other batches.
    """
    endpoint = "ISteamUser/GetPlayerBans/v1/"
    ban_records = []
    for id_batch in chunks(steamid_list, BATCH_SIZE):
        payload = api_get(endpoint, {"steamids": ",".join(id_batch)})
        if not payload or payload.get("private"):
            continue
        ban_records.extend(payload.get("players", []))
    return ban_records

def get_player_achievements(steamid, appid):
    """Return the ``playerstats`` object of GetPlayerAchievements for one user and app.

    Returns ``{}`` when the request fails or is flagged private.
    """
    query = {"steamid": steamid, "appid": appid}
    payload = api_get("ISteamUserStats/GetPlayerAchievements/v1/", query)
    if not payload or payload.get("private"):
        return {}
    return payload.get("playerstats", {})

def resolve_vanity(name):
    """Resolve a vanity URL name to a SteamID via ResolveVanityURL.

    Returns the ``steamid`` from the response when it reports ``success == 1``,
    otherwise ``None``.
    """
    payload = api_get("ISteamUser/ResolveVanityURL/v0001/", {"vanityurl": name})
    if not payload:
        return None
    if payload.get("response", {}).get("success") != 1:
        return None
    return payload["response"].get("steamid")

# ================================
# BFS crawler with visited_global
# ================================
def crawl(seed_ids, depth=1, max_users=1000, max_apps_per_user=3, visited_global=None):
    """Breadth-first crawl of the Steam friend graph starting from ``seed_ids``.

    Args:
        seed_ids: SteamID strings (all digits) or vanity names to start from.
        depth: Largest BFS distance from a seed that is still crawled.
        max_users: Crawling stops once ``visited_global`` holds this many ids;
            ids passed in by the caller count towards the limit.
        max_apps_per_user: Number of most-played games per user for which
            achievements are fetched.
        visited_global: Ids already handled by earlier calls. They are not
            fetched again, but their friend lists are still walked so that a
            resumed crawl can reach users behind them. Updated in place.

    Returns:
        ``(G, all_profiles, visited_global)``: the friendship graph built by this
        call, a ``{steamid: summary}`` dict of the profiles fetched by this call,
        and the updated visited set.
    """
    from collections import deque

    G = nx.Graph()
    if visited_global is None:
        visited_global = set()
    # Ids handled by earlier runs. Without walking through them, a resumed crawl
    # whose seeds are already visited would stop immediately with an empty graph.
    previously_visited = frozenset(visited_global)
    seeds = list(seed_ids)
    queue = deque((s, 0) for s in seeds)
    enqueued = set(seeds)   # ids ever queued, so shared friends are queued once
    expanded = set()        # ids whose friend list was walked during this call
    all_profiles = {}
    stats = {"public": 0, "private": 0, "skipped": 0}

    def link_and_enqueue(owner, friend_ids, owner_depth):
        """Add friendship edges and queue friends still within the depth limit."""
        for friend in friend_ids:
            if not friend:
                continue  # entry without a steamid; None cannot be a graph node
            G.add_edge(owner, friend)
            if owner_depth + 1 <= depth and friend not in enqueued:
                enqueued.add(friend)
                queue.append((friend, owner_depth + 1))

    while queue and len(visited_global) < max_users:
        steamid, d = queue.popleft()
        if d > depth:
            continue
        if not steamid.isdigit():
            steamid = resolve_vanity(steamid) or steamid
            if not steamid.isdigit():
                stats["skipped"] += 1
                continue
        # Checked after vanity resolution, so a resolved id that was already
        # handled is not crawled a second time.
        if steamid in expanded:
            continue
        expanded.add(steamid)

        if steamid in previously_visited:
            link_and_enqueue(steamid, get_friend_list(steamid), d)
            continue

        players = get_player_summaries([steamid])
        visited_global.add(steamid)
        if not players:
            # No summary came back; counted under "private" as before.
            stats["private"] += 1
            continue
        player = players[0]
        stats["public"] += 1
        all_profiles[steamid] = player
        G.add_node(
            steamid,
            label=player.get("personaname", ""),
            avatar=player.get("avatarfull", ""),
            country=player.get("loccountrycode", ""),
        )

        link_and_enqueue(steamid, get_friend_list(steamid), d)

        owned = get_owned_games(steamid)
        recent = get_recently_played(steamid)
        groups = get_user_groups(steamid)
        node = G.nodes[steamid]
        node["owned_game_count"] = owned.get("game_count", 0)
        node["recent_count"] = recent.get("total_count", 0)
        node["groups"] = [g.get("groupid64") for g in groups] if groups else []

        apps = owned.get("games", []) if isinstance(owned, dict) else []
        top_apps = sorted(apps, key=lambda x: x.get("playtime_forever", 0), reverse=True)[:max_apps_per_user]
        node["top_apps"] = [a.get("appid") for a in top_apps]
        node["achievements_sample"] = {
            app.get("appid"): get_player_achievements(steamid, app.get("appid"))
            for app in top_apps
        }

    # Sorted so the id batches, and therefore their cache keys, are stable across runs.
    bans = get_player_bans(sorted(visited_global))
    bans_map = {b.get("SteamId"): b for b in bans if b.get("SteamId")}
    for node_id in G.nodes:
        G.nodes[node_id]["ban_info"] = bans_map.get(node_id, {})

    print(f"Stats: {stats}")
    return G, all_profiles, visited_global

# ================================
# Clean node attributes for GEXF
# ================================
def clean_node_attrs(G):
    """Rewrite node attributes in place for GEXF export and return the same graph.

    ``None`` values become empty strings; list and dict values are replaced by
    their ``str()`` form. All other values are left untouched.
    """
    for _node, attrs in G.nodes(data=True):
        for key in list(attrs):
            value = attrs[key]
            if value is None:
                attrs[key] = ""
                continue
            if isinstance(value, (list, dict)):
                attrs[key] = str(value)
    return G

# ================================
# Load existing profiles
# ================================
def load_existing_profiles(json_path):
    """Load a previously saved profiles JSON file.

    Returns ``(ids, profiles)`` where ``ids`` is the set of top-level keys and
    ``profiles`` is the loaded mapping. When the file does not exist, returns
    ``(set(), {})``.
    """
    source = Path(json_path)
    if not source.exists():
        return set(), {}
    with open(json_path, "r", encoding="utf-8") as handle:
        profiles = json.load(handle)
    return set(profiles.keys()), profiles

# ================================
# Save outputs
# ================================
def save_outputs(G, profiles, out_prefix="steam_output"):
    """Write the profiles to JSON and the graph to GEXF.

    Args:
        G: Graph to export. It is not modified: attribute clean-up runs on a copy.
        profiles: JSON-serialisable mapping written to ``{out_prefix}_profiles.json``.
        out_prefix: Path prefix shared by both output files.
    """
    # clean_node_attrs rewrites attributes in place (lists/dicts become strings).
    # Working on a copy keeps the caller's graph usable after a checkpoint save.
    export_graph = clean_node_attrs(G.copy())

    json_path = f"{out_prefix}_profiles.json"
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(profiles, f, indent=2, ensure_ascii=False)

    gexf_path = f"{out_prefix}_graph.gexf"
    nx.write_gexf(export_graph, gexf_path)

    print(f"Saved profiles -> {json_path}")
    print(f"Saved graph -> {gexf_path}")

# ================================
# Example run
# ================================
# Load the ids and profiles saved by a previous run (empty set/dict if the file is missing).
visited_prev, profiles_prev = load_existing_profiles("steam_output_test_profiles.json")
SEEDS = ["76561198294300457","76561197973974836"]  # replace with your own SteamIDs

G_new, profiles_new, visited_new = crawl
