# 🤖 カテゴリ別代表商品のモデル比較分析

各カテゴリから1商品を選び、どのモデルが最適な予測精度を出すか比較します。

In [None]:
# ライブラリインポート
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from pycaret.regression import *
import warnings
warnings.filterwarnings('ignore')

# 日本語フォント設定
import matplotlib.font_manager as fm
JP_FONT_PATH = "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"
if Path(JP_FONT_PATH).exists():
    JP_FP = fm.FontProperties(fname=JP_FONT_PATH)
    plt.rcParams['font.family'] = JP_FP.get_name()
else:
    print("⚠️ 日本語フォントが見つかりません")

print("=" * 80)
print("📊 カテゴリ別代表商品のモデル比較分析")
print("=" * 80)

In [None]:
# データ読み込み
input_file = Path("output/06_final_enriched_20250701_20250930.csv")
print(f"📁 データ読み込み中: {input_file}")
df = pd.read_csv(input_file)
print(f"✅ データ形状: {df.shape}")
print(f"列数: {len(df.columns)}")

In [None]:
# カテゴリ・商品・売上列の自動検出
category_candidates = [col for col in df.columns if any(x in col for x in ['カテゴリ', 'category', '分類', 'フェイスくくり'])]
product_candidates = [col for col in df.columns if any(x in col for x in ['商品', 'product', '品名', '商品名', '商品コード'])]
sales_candidates = [col for col in df.columns if any(x in col for x in ['売上', '金額', 'sales', 'amt'])]
qty_candidates = [col for col in df.columns if any(x in col for x in ['数量', 'qty', 'quantity', '個数'])]

# カテゴリ列の優先順位: 大分類 > 中分類 > 小分類 > カテゴリ
if 'フェイスくくり大分類' in df.columns:
    category_col = 'フェイスくくり大分類'
elif 'フェイスくくり中分類' in df.columns:
    category_col = 'フェイスくくり中分類'
elif 'フェイスくくり小分類' in df.columns:
    category_col = 'フェイスくくり小分類'
elif 'カテゴリ' in df.columns:
    category_col = 'カテゴリ'
else:
    category_col = category_candidates[0] if category_candidates else None

# 商品列の検出
product_col = '商品名' if '商品名' in df.columns else ('商品' if '商品' in df.columns else (product_candidates[0] if product_candidates else None))

# 売上列の検出（優先順位: 売上金額 > 売上数量 > その他）
if '売上金額' in df.columns:
    sales_col = '売上金額'
elif sales_candidates:
    sales_col = sales_candidates[0]
else:
    sales_col = None

# 数量列の検出
if '売上数量' in df.columns:
    qty_col = '売上数量'
elif qty_candidates:
    qty_col = qty_candidates[0]
else:
    qty_col = None

# 目的変数の選択（数量があれば数量、なければ売上金額）
target_col = qty_col if qty_col else sales_col

print(f"✅ カテゴリ列: {category_col}")
print(f"✅ 商品列: {product_col}")
print(f"✅ 売上列: {sales_col}")
print(f"✅ 数量列: {qty_col}")
print(f"\n🎯 目的変数: {target_col}")
print(f"\nユニークカテゴリ数: {df[category_col].nunique() if category_col else 0}")

In [None]:
# カテゴリ別の代表商品選定
print("📦 各カテゴリから代表商品を選定中...")

# 集約ディクショナリの構築
agg_dict = {target_col: ['sum', 'mean', 'count']}
if sales_col and sales_col != target_col:
    agg_dict[sales_col] = 'sum'

# グループ化と集約
category_product_summary = df.groupby([category_col, product_col]).agg(agg_dict).reset_index()

# MultiIndex列のフラット化
category_product_summary.columns = ['_'.join(col).strip('_') if isinstance(col, tuple) else col
                                    for col in category_product_summary.columns]

# 列名の日本語化
rename_dict = {
    f'{target_col}_sum': '総数量',
    f'{target_col}_mean': '平均数量',
    f'{target_col}_count': 'データ数'
}

if sales_col and sales_col != target_col:
    rename_dict[f'{sales_col}_sum'] = '総売上'
    category_product_summary = category_product_summary.rename(columns=rename_dict)
else:
    # 売上列がない場合は数量の合計を総売上として使用
    category_product_summary['総売上'] = category_product_summary[f'{target_col}_sum']
    category_product_summary = category_product_summary.rename(columns=rename_dict)

representative_products = []

for category in df[category_col].unique():
    if pd.isna(category):
        continue
    
    cat_products = category_product_summary[
        category_product_summary[category_col] == category
    ].copy()
    
    # データ数が30以上ある商品のみ対象
    cat_products = cat_products[cat_products['データ数'] >= 30]
    
    if len(cat_products) == 0:
        continue
    
    # 総売上が最大の商品を選定
    best_product = cat_products.nlargest(1, '総売上').iloc[0]
    
    representative_products.append({
        'カテゴリ': category,
        '商品': best_product[product_col],
        '総売上': best_product['総売上'],
        '総数量': best_product['総数量'],
        '平均数量': best_product['平均数量'],
        'データ数': int(best_product['データ数'])
    })

rep_df = pd.DataFrame(representative_products)
print(f"\n✅ 代表商品選定完了: {len(rep_df)}カテゴリ")
rep_df[['カテゴリ', '商品', '総売上', 'データ数']]

In [None]:
# 特徴量の準備
exclude_cols = [
    target_col, '日付', 'date', category_col, product_col,
    '店舗', '店舗名', '商品コード', '天気'
]
exclude_cols_actual = [col for col in exclude_cols if col in df.columns]

print(f"除外列: {exclude_cols_actual}")

In [None]:
# 各商品でPyCaretモデル比較
print("\n🤖 各商品でPyCaretモデル比較を実行中...")

model_results = []

for idx, row in rep_df.iterrows():
    category = row['カテゴリ']
    product = row['商品']
    
    print(f"\n{'=' * 60}")
    print(f"[{idx+1}/{len(rep_df)}] カテゴリ: {category} | 商品: {product}")
    print(f"{'=' * 60}")
    
    # 商品データ抽出
    product_data = df[
        (df[category_col] == category) &
        (df[product_col] == product)
    ].copy()
    
    print(f"データ数: {len(product_data)}行")
    
    if len(product_data) < 30:
        print(f"⚠️ データ数不足（{len(product_data)}行）スキップ")
        continue
    
    # 特徴量選択
    feature_cols = [col for col in product_data.columns
                   if col not in exclude_cols_actual
                   and product_data[col].dtype in ['int64', 'float64']]
    
    # 欠損値が多い列を除外
    feature_cols = [col for col in feature_cols
                   if product_data[col].isna().sum() / len(product_data) < 0.3]
    
    print(f"特徴量数: {len(feature_cols)}")
    
    # 分析用データフレーム作成
    analysis_data = product_data[feature_cols + [target_col]].copy()
    analysis_data = analysis_data.fillna(analysis_data.median())
    
    try:
        # PyCaretセットアップ（GPU無効化でLightGBMのCUDAエラー回避）
        print("\n🔧 PyCaretセットアップ中...")
        exp = setup(
            data=analysis_data,
            target=target_col,
            session_id=123,
            verbose=False,
            html=False,
            use_gpu=False,  # GPU無効化（LightGBMのCUDA Tree Learnerが無効なため）
            n_jobs=-1,
            log_experiment=False,
            system_log=False
        )
        
        # モデル比較
        print("⚙️ モデル比較実行中...")
        best_models = compare_models(
            n_select=5,
            sort='RMSE',
            verbose=False,
            errors='ignore'
        )
        
        # 結果取得
        results_df_temp = pull()
        
        # TOP5モデルの結果保存
        for i in range(min(5, len(results_df_temp))):
            model_name = results_df_temp.index[i] if hasattr(results_df_temp.index[i], 'strip') else str(results_df_temp.index[i])
            mae = results_df_temp.iloc[i]['MAE']
            rmse = results_df_temp.iloc[i]['RMSE']
            r2 = results_df_temp.iloc[i]['R2']
            
            model_results.append({
                'カテゴリ': category,
                '商品': product,
                'データ数': len(product_data),
                'ランク': i + 1,
                'モデル': model_name,
                'MAE': mae,
                'RMSE': rmse,
                'R2': r2
            })
        
        print(f"✅ 完了: TOP5モデル取得")
        
    except Exception as e:
        print(f"❌ エラー: {str(e)[:100]}")
        continue

results_final = pd.DataFrame(model_results)
print(f"\n✅ 分析完了: {len(results_final)}件の結果")

# 結果が空の場合の処理
if len(results_final) == 0:
    print("\n⚠️ モデル比較結果が得られませんでした。")
    print("   - データ数が不足している可能性があります（各商品30行以上必要）")
    print("   - 特徴量の数が不足している可能性があります")
    print("   - PyCaretのバージョンを確認してください（3.0以上推奨）")

In [None]:
# 各商品のベストモデル抽出
if len(results_final) > 0 and 'ランク' in results_final.columns:
    best_models_per_product = results_final[results_final['ランク'] == 1].copy()

    print("\n🏆 各商品のベストモデル:")
    display(best_models_per_product[['カテゴリ', '商品', 'モデル', 'RMSE', 'R2']])
else:
    print("\n⚠️ モデル比較結果が空のため、ベストモデル抽出をスキップします")
    best_models_per_product = pd.DataFrame()

In [None]:
# モデル別の採用数集計
if len(best_models_per_product) > 0 and 'モデル' in best_models_per_product.columns:
    model_counts = best_models_per_product['モデル'].value_counts()
    print("\n📈 ベストモデル採用回数:")
    display(model_counts)
else:
    print("\n⚠️ ベストモデルデータが空のため、採用数集計をスキップします")
    model_counts = pd.Series(dtype=int)

In [None]:
# 可視化1: モデル採用回数の円グラフ & RMSE比較
if len(best_models_per_product) > 0 and len(model_counts) > 0:
    fig, axes = plt.subplots(1, 2, figsize=(16, 6))

    # 円グラフ
    axes[0].pie(model_counts.values, labels=model_counts.index, autopct='%1.1f%%',
                textprops={'fontproperties': JP_FP})
    axes[0].set_title('ベストモデル採用率', fontproperties=JP_FP, fontsize=14)

    # RMSE比較
    best_models_sorted = best_models_per_product.sort_values('RMSE')
    x = range(len(best_models_sorted))

    axes[1].barh(x, best_models_sorted['RMSE'], color='steelblue', alpha=0.7)
    axes[1].set_yticks(x)
    axes[1].set_yticklabels([f"{row['カテゴリ'][:10]}\n({row['モデル'][:10]})"
                             for _, row in best_models_sorted.iterrows()],
                            fontproperties=JP_FP, fontsize=8)
    axes[1].set_xlabel('RMSE', fontproperties=JP_FP)
    axes[1].set_title('カテゴリ別 ベストモデルのRMSE', fontproperties=JP_FP, fontsize=14)
    axes[1].grid(axis='x', alpha=0.3)

    plt.tight_layout()
    plt.savefig('output/category_product_best_models.png', dpi=300, bbox_inches='tight')
    plt.show()
else:
    print("\n⚠️ データが不足しているため、可視化をスキップします")

In [None]:
# 可視化2: モデル×カテゴリ別 RMSEヒートマップ
if len(results_final) > 0 and 'モデル' in results_final.columns and 'カテゴリ' in results_final.columns:
    pivot_rmse = results_final.pivot_table(
        index='モデル',
        columns='カテゴリ',
        values='RMSE',
        aggfunc='mean'
    )

    if not pivot_rmse.empty:
        fig, ax = plt.subplots(figsize=(14, 8))
        sns.heatmap(pivot_rmse, annot=True, fmt='.1f', cmap='RdYlGn_r',
                    cbar_kws={'label': 'RMSE'}, ax=ax)
        ax.set_title('モデル×カテゴリ別 RMSE比較', fontproperties=JP_FP, fontsize=14)
        ax.set_xlabel('カテゴリ', fontproperties=JP_FP)
        ax.set_ylabel('モデル', fontproperties=JP_FP)

        plt.tight_layout()
        plt.savefig('output/model_category_rmse_heatmap.png', dpi=300, bbox_inches='tight')
        plt.show()
    else:
        print("\n⚠️ ピボットテーブルが空のため、ヒートマップをスキップします")
else:
    print("\n⚠️ データが不足しているため、ヒートマップをスキップします")

In [None]:
# 結果保存
if len(results_final) > 0:
    results_final.to_csv('output/category_product_model_comparison.csv', index=False, encoding='utf-8-sig')
    print("✅ 全モデル比較結果をCSV保存しました: output/category_product_model_comparison.csv")
else:
    print("⚠️ 結果が空のため、CSV保存をスキップします")

if len(best_models_per_product) > 0:
    best_models_per_product.to_csv('output/category_best_models.csv', index=False, encoding='utf-8-sig')
    print("✅ ベストモデルをCSV保存しました: output/category_best_models.csv")
else:
    print("⚠️ ベストモデルが空のため、CSV保存をスキップします")

In [None]:
# サマリーレポート
print("\n" + "=" * 80)
print("📊 分析サマリー")
print("=" * 80)

print(f"\n総カテゴリ数: {len(rep_df)}")
print(f"分析完了商品数: {len(best_models_per_product)}")

if len(results_final) > 0 and 'モデル' in results_final.columns:
    print(f"使用モデル数: {results_final['モデル'].nunique()}")
else:
    print("使用モデル数: 0")

if len(model_counts) > 0:
    print("\n🏆 最も採用されたモデル:")
    top_model = model_counts.index[0]
    top_count = model_counts.values[0]
    print(f"  モデル: {top_model}")
    print(f"  採用数: {top_count}商品 ({top_count/len(best_models_per_product)*100:.1f}%)")
else:
    print("\n⚠️ モデル採用データなし")

if len(best_models_per_product) > 0:
    print("\n📉 最小RMSE:")
    best_rmse = best_models_per_product.nsmallest(1, 'RMSE').iloc[0]
    print(f"  カテゴリ: {best_rmse['カテゴリ']}")
    print(f"  商品: {best_rmse['商品']}")
    print(f"  モデル: {best_rmse['モデル']}")
    print(f"  RMSE: {best_rmse['RMSE']:.2f}")
    print(f"  R2: {best_rmse['R2']:.3f}")

    print("\n💡 推奨アクション:")
    if len(model_counts) > 0:
        print(f"1. {model_counts.index[0]} は汎用性が高く、多くのカテゴリで最適")
    print("2. カテゴリごとに最適モデルを選択することで予測精度向上")
    print("3. RMSE上位カテゴリは特徴量追加やデータ蓄積が必要")
else:
    print("\n⚠️ 分析結果が得られませんでした")
    print("   - PyCaretのsetup()パラメータを確認してください")
    print("   - データ数を確認してください（各商品30行以上必要）")
    print("   - GPU設定を確認してください（use_gpu=False で試してください）")