### 导入必要工具包

In [1]:
import aiohttp
import asyncio
import aiofiles
import json
import pandas as pd
import gc # 防止内存泄漏
import os

### 初始化CSV文件

首先要在 ids_for_api_use.csv 文件中添加一列 `read_or_not`，用于在后续调用 API 时判断每个 ID 是否已被读取过（0 表示未读取，1 表示已读取）。<br>
这样做是为了记录哪些账号已经被 fetch 过，从而在程序中断后能够自动从尚未检索的 ID 继续 fetch。<br>

In [2]:

# Load the ID list that the crawler works through.
ids_csv_path = '../../../data/raw/ids_for_api_use.csv'
df = pd.read_csv(ids_csv_path)

# Ensure the 'read_or_not' progress flag exists (0 = not fetched yet, 1 = fetched).
if 'read_or_not' not in df.columns:
    # Column missing: create it with every ID marked unread.
    df['read_or_not'] = 0
    print("初始化 'read_or_not' 列为0，因为列不存在。")
elif (df['read_or_not'] == 1).any():
    # At least one ID is already marked read: keep the existing progress untouched.
    print("存在已读条目。")
else:
    # Column exists but nothing is marked read: reset the whole column to 0.
    df['read_or_not'] = 0
    print("没有已读条目，所有条目设置为未读。")

# Write the table back (without the index) so later cells read the same file.
df.to_csv(ids_csv_path, index=False)


存在已读条目。


### 定义SteamAPI类

In [3]:
class SteamAPI:
    """Async client for the Steam Web API endpoints used to profile one player.

    Each instance is bound to a single ``steam_id`` and uses the caller's
    ``aiohttp`` session; it never creates or closes the session itself.
    """

    def __init__(self, api_key, steam_id, session):
        self.api_key = api_key    # Steam Web API key, embedded in every request URL
        self.steam_id = steam_id  # the player whose data is fetched
        self.session = session    # shared HTTP session supplied by the caller

    def _redact(self, text):
        """Return ``text`` with the API key masked so it is safe to print or log."""
        if self.api_key:
            return text.replace(str(self.api_key), '***')
        return text

    async def fetch(self, url):
        """GET ``url`` and return the decoded JSON body, or ``None`` on failure.

        HTTP error statuses, client/connection errors, timeouts and bodies that
        are not valid JSON are reported with ``print`` and swallowed. This way
        one bad request cannot abort a whole ``asyncio.gather`` batch. The API
        key is masked in every message because it is part of ``url``.
        """
        safe_url = self._redact(url)
        try:
            async with self.session.get(url) as response:
                response.raise_for_status()
                return await response.json()  # decoded JSON payload
        except aiohttp.ClientResponseError as e:
            print(f"HTTP Error: {e.status} for URL: {safe_url}")
        except aiohttp.ClientError as e:
            # str(e) may contain the request URL, so redact it as well.
            print(f"Client Error: {self._redact(str(e))} for URL: {safe_url}")
        except asyncio.TimeoutError:
            print(f"TimeoutError: Request to {safe_url} timed out")
        except json.JSONDecodeError:
            print(f"JSON Error: invalid JSON body for URL: {safe_url}")
        return None

    async def get_player_info(self):
        """Fetch the player's profile summary (ISteamUser/GetPlayerSummaries v0002)."""
        player_info_url = f"https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={self.api_key}&steamids={self.steam_id}"
        return await self.fetch(player_info_url)

    async def get_player_games(self):
        """Fetch the player's owned games, including app info (IPlayerService/GetOwnedGames)."""
        player_games_url = f"https://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key={self.api_key}&steamid={self.steam_id}&format=json&include_appinfo=true&include_extended_appinfo=true"
        return await self.fetch(player_games_url)

    # async def get_player_friends(self):  # fetch the player's friend list (currently disabled)
    #     player_friends_url = f"https://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key={self.api_key}&steamid={self.steam_id}&relationship=friend"
    #     return await self.fetch(player_friends_url)

    async def get_recently_played_games(self):
        """Fetch the player's own recently played games (IPlayerService/GetRecentlyPlayedGames).

        ``count=3`` limits the response to at most three games.
        """
        player_recently_played_games_url = f"https://api.steampowered.com/IPlayerService/GetRecentlyPlayedGames/v0001/?key={self.api_key}&steamid={self.steam_id}&format=json&count=3"
        return await self.fetch(player_recently_played_games_url)

    async def run(self):
        """Fetch all enabled endpoints one after another.

        Returns a dict with keys ``'player_info'``, ``'player_games'`` and
        ``'recently_played_games'``. Each value is the decoded JSON, or ``None``
        if that request failed.
        """
        return {
            'player_info': await self.get_player_info(),
            'player_games': await self.get_player_games(),
            # 'player_friends': await self.get_player_friends(),
            'recently_played_games': await self.get_recently_played_games()
        }

save data

In [4]:
async def save_json(filename, data):
    """Append ``data`` to ``filename`` as a 4-space-indented JSON chunk followed by ``",\\n"``.

    The file is opened in append mode, so successive calls accumulate chunks.
    The result is therefore a sequence of comma-separated JSON values, not a
    single valid JSON document. Readers must account for this.
    """
    async with aiofiles.open(filename, mode='a') as handle:
        chunk = json.dumps(data, indent=4)
        await handle.write(f"{chunk},\n")

handle_steam_id

In [5]:
async def handle_steam_id(api_key, steam_id, session, semaphore):
    """Asynchronously fetch all data for one Steam user while holding a semaphore slot.

    The semaphore is created by the caller and bounds how many IDs are being
    processed at the same time.

    Returns:
        A single-entry dict ``{steam_id: result}``, where ``result`` is what
        ``SteamAPI.run()`` returns for that ID.
    """
    async with semaphore:
        payload = await SteamAPI(api_key, steam_id, session).run()
    return {steam_id: payload}

process batches

In [6]:
async def process_batch(api_key, batch, session, semaphore, filename, df,
                        csv_path='../../../data/raw/ids_for_api_use.csv'):
    """Fetch one batch of Steam IDs concurrently, save the results and record progress.

    Args:
        api_key: Steam Web API key forwarded to ``handle_steam_id``.
        batch: Steam IDs (values of ``df['key']``) to fetch in this batch.
        session: shared aiohttp session.
        semaphore: caps how many IDs are fetched at the same time.
        filename: JSON file the list of per-ID results is appended to.
        df: progress table with ``key`` and ``read_or_not`` columns; updated in place.
        csv_path: where ``df`` is written after every batch so progress survives
            interruptions. Defaults to the original hard-coded location.

    Only IDs whose endpoint results are all non-``None`` are marked
    ``read_or_not = 1``. IDs with a failed request stay unread so a later run
    retries them instead of losing them. Their partial results are still
    appended to ``filename``.
    """
    tasks = [handle_steam_id(api_key, steam_id, session, semaphore) for steam_id in batch]
    results = await asyncio.gather(*tasks)
    await save_json(filename, results)

    # Each result is {steam_id: {endpoint_name: json_or_None}}.
    fetched_ids = [
        steam_id
        for entry in results
        for steam_id, payload in entry.items()
        if all(value is not None for value in payload.values())
    ]
    # Vectorised update instead of one full-column scan per ID.
    df.loc[df['key'].isin(fetched_ids), 'read_or_not'] = 1
    # Persist progress after every batch.
    df.to_csv(csv_path, index=False)

get_next_filename

In [7]:
def get_next_filename(base_path, base_name, extension):
    """Return the first non-existing path ``base_path/<base_name><N>.<extension>``, N >= 1.

    Example: if ``all_steam_data1.json`` does not exist yet, that path is
    returned. Otherwise the suffix is incremented until a free name is found.
    """
    candidate_index = 1
    candidate = os.path.join(base_path, f"{base_name}{candidate_index}.{extension}")
    while os.path.exists(candidate):
        candidate_index += 1
        candidate = os.path.join(base_path, f"{base_name}{candidate_index}.{extension}")
    return candidate

main process

In [8]:
async def main():
    """Fetch Steam data for every ID in ids_for_api_use.csv not yet marked as read.

    The API key is read from the ``STEAM_API_KEY2`` environment variable. IDs
    are processed in batches of 5 (at most 5 concurrently, with a 2 second
    pause between batches). Results are appended to the next free
    ``all_steam_data<N>.json`` file under ``../../../data/raw``.

    Raises:
        RuntimeError: if ``STEAM_API_KEY2`` is not set.
    """
    api_key = os.getenv('STEAM_API_KEY2')
    if not api_key:
        # os.getenv returns None when unset; requests with "key=None" are expected to fail.
        raise RuntimeError("Environment variable STEAM_API_KEY2 is not set.")

    df = pd.read_csv(r'../../../data/raw/ids_for_api_use.csv')
    # Select every unread row (not "everything after the first unread row"):
    # rows already marked read are skipped and an exhausted list does not raise IndexError.
    unread_mask = df['read_or_not'] == 0
    if not unread_mask.any():
        print("No unread IDs left in ids_for_api_use.csv; nothing to fetch.")
        return
    steam_ids = df.loc[unread_mask, 'key'].tolist()

    semaphore = asyncio.Semaphore(5)
    batch_size = 5
    base_path = '../../../data/raw'
    base_name = 'all_steam_data'
    extension = 'json'

    # Never overwrite earlier runs: pick the next unused all_steam_data<N>.json.
    filename = get_next_filename(base_path, base_name, extension)

    timeout = aiohttp.ClientTimeout(total=6)  # total time limit of 6 seconds per request
    connector = aiohttp.TCPConnector(limit_per_host=5)
    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
        for i in range(0, len(steam_ids), batch_size):
            batch = steam_ids[i:i + batch_size]
            await process_batch(api_key, batch, session, semaphore, filename, df)
            await asyncio.sleep(2)  # pause between batches to ease request rate
            gc.collect()


在网络正常的情况下，完整运行大约需要 1 小时。

In [None]:
# Top-level await only works inside Jupyter/IPython; in a plain script use asyncio.run(main()).
await main()