In [4]:
import pandas as pd
import os
import json
from datetime import datetime
from tqdm.notebook import tqdm
import concurrent.futures # 並列処理のために追加

print("ライブラリのインポートが完了しました。")

ライブラリのインポートが完了しました。


In [5]:
# --- 定数定義 ---
INFLUENCERS_FILE = 'influencers.txt'
INFO_DIR = 'posts_info/unzipped_data_7z/info/'
OUTPUT_CSV_FILE = 'all_categories_posts_summary_parallel.csv' # 出力ファイル名を変更

# --- 関数定義 ---

def load_influencer_data(filepath):
    """influencers.txtを読み込み、DataFrameを返す。"""
    try:
        df = pd.read_csv(filepath, sep='\t', skiprows=[1])
        return df
    except FileNotFoundError:
        print(f"エラー: `{filepath}` が見つかりません。")
        return pd.DataFrame()

def process_single_influencer(username):
    """
    一人のインフルエンサーに関する全ての投稿ファイルを処理する関数（並列処理のワーカー）。
    """
    local_posts = []
    try:
        all_files = os.listdir(INFO_DIR)
        user_post_files = [f for f in all_files if f.startswith(f"{username}-") and f.endswith('.info')]

        for filename in user_post_files:
            filepath = os.path.join(INFO_DIR, filename)
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)

            timestamp = data.get('taken_at_timestamp', 0)
            likes = data.get('edge_media_preview_like', {}).get('count', 0)
            comments = data.get('edge_media_to_parent_comment', {}).get('count', 0)

            local_posts.append({
                'username': username,
                'timestamp': timestamp,
                'datetime': datetime.fromtimestamp(timestamp),
                'likes': likes,
                'comments': comments,
            })
        return local_posts
    except Exception as e:
        # エラーが発生した場合は空のリストを返し、エラーメッセージを表示
        print(f"警告: {username}の処理中にエラーが発生しました: {e}")
        return []

def load_all_posts_data_parallel(influencer_list):
    """
    ThreadPoolExecutorを使って、全インフルエンサーの投稿データを並列で読み込む。
    """
    all_posts = []
    
    # max_workersはお使いのPCのコア数やディスク性能に応じて調整
    # Noneに設定すると適切な数が自動で選ばれます
    with concurrent.futures.ThreadPoolExecutor(max_workers=None) as executor:
        # tqdmを使って進捗を表示
        # executor.mapは、influencer_listの各要素をprocess_single_influencer関数に渡し、並列実行する
        results = list(tqdm(executor.map(process_single_influencer, influencer_list), total=len(influencer_list), desc="インフルエンサー"))

    # 並列処理の結果はリストのリストになっているため、一つのリストにまとめる（平坦化）
    for post_list in results:
        all_posts.extend(post_list)

    if not all_posts:
        print("警告: 投稿データが一件も見つかりませんでした。")
        return pd.DataFrame()

    df = pd.DataFrame(all_posts)
    df = df.sort_values('datetime', ascending=False).reset_index(drop=True)
    return df

print("関数が定義されました。")

関数が定義されました。


In [6]:
# 1. 全インフルエンサーの情報をDataFrameとして読み込む
df_influencers = load_influencer_data(INFLUENCERS_FILE)

if not df_influencers.empty:
    # 2. 全てのインフルエンサーのUsernameを取得
    all_influencer_list = sorted(df_influencers['Username'].unique())
    print(f"全カテゴリのインフルエンサーを {len(all_influencer_list)} 人見つけました。")
    
    # 3. 並列処理で全インフルエンサーの投稿データをDataFrameに集約
    df_all_posts = load_all_posts_data_parallel(all_influencer_list)
    
    if not df_all_posts.empty:
        print(f"\n処理が完了しました。合計 {len(df_all_posts)} 件の投稿データを読み込みました。")
        print("\n--- データプレビュー ---")
        display(df_all_posts.head())
    else:
        print("データフレームが空です。")
else:
    print("インフルエンサー情報が取得できなかったため、処理を中断します。")

全カテゴリのインフルエンサーを 33934 人見つけました。


インフルエンサー:   0%|          | 0/33934 [00:00<?, ?it/s]

警告: 40_little_toesの処理中にエラーが発生しました: Expecting value: line 1 column 1 (char 0)


In [None]:
if 'df_all_posts' in locals() and not df_all_posts.empty:
    columns_to_save = ['datetime', 'username', 'likes', 'comments']
    df_all_posts[columns_to_save].to_csv(OUTPUT_CSV_FILE, index=False)
    print(f"✅ 全カテゴリの投稿データを '{OUTPUT_CSV_FILE}' として保存しました。")
else:
    print("保存するデータがありません。")