# 投球タイプ分類器（Pitch Identification Model）

本プロジェクトは、pybaseballライブラリを用いてMLBのStatcastデータを取得し、投球の物理的な特徴量（球速、回転、変化量など）からその球種（pitch_type）を分類する機械学習モデルを構築します。


## 目次
0. ライブラリ・データ読み込み
1. データの概観・分析・前処理
2. ベースラインモデルの構築
3. 特徴量エンジニアリング
4. 様々なモデルの構築・調整
5. モデルのアンサンブリング
6. 予測の出力・評価


## 0. ライブラリ・データ読み込み

まず初めに使用するライブラリを読み込みます。


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# データ取得ライブラリ
from pybaseball import statcast

# 機械学習モデル
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score
import lightgbm as lgb
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier

# 警告非表示
import warnings
warnings.filterwarnings('ignore')


: 

次にpybaseball.statcast()を用いて、特定の期間の投球データを取得します。

注意：全期間を取得するとデータ量が膨大になるため、期間を絞るか、特定の投手でテストすることを推奨します。


In [None]:
# 例：2023年シーズンのデータを取得
# (注意：全期間を取得するとデータ量が膨大になるため、期間を絞るか、特定の投手でテストする)
print("Fetching Statcast data...")
data = statcast(start_dt='2023-04-01', end_dt='2023-10-01')
print("Data fetched.")
print(f"Data shape: {data.shape}")


## 1. データの概観・分析・前処理

### 1.1 データの概観

データのサイズを確認します。


In [None]:
print(f'データ数は{data.shape[0]}、変数は{data.shape[1]}種類です。')


In [None]:
# データの最初の数行を確認
data.head(10)


In [None]:
# 変数名の一覧を確認
data.columns.tolist()


### 1.2 データの分析（EDA）

探索的データ分析（EDA）を行います。まず、必要な特徴量とターゲットが存在することを確認します。


In [None]:
# 必要な特徴量の確認
required_features = ['release_speed', 'release_spin_rate', 'spin_axis', 
                     'pfx_x', 'pfx_z', 'release_pos_x', 'release_pos_z', 'pitch_type']

# 存在する特徴量を確認
available_features = [feat for feat in required_features if feat in data.columns]
missing_features = [feat for feat in required_features if feat not in data.columns]

print("利用可能な特徴量:", available_features)
print("欠損している特徴量:", missing_features)

# 利用可能な特徴量のみを使用
if len(missing_features) > 0:
    print(f"\n警告: 以下の特徴量がデータに存在しません: {missing_features}")
    print("利用可能な特徴量のみを使用します。")


In [None]:
# 欠損値の確認（利用可能な特徴量のみ）
print("欠損値の確認:")
if len(available_features) > 0:
    print(data[available_features].isnull().sum())
else:
    print("利用可能な特徴量がありません。")


In [None]:
# ターゲット（pitch_type）の分析
if 'pitch_type' in data.columns:
    print("球種の内訳:")
    print(data['pitch_type'].value_counts())
    print("\n球種の割合:")
    print(data['pitch_type'].value_counts(normalize=True))
else:
    print("エラー: 'pitch_type'列がデータに存在しません。")


In [None]:
# 球種ごとの分布を可視化（バイオリン図）
# 利用可能な特徴量のみを使用
plot_features = []
if 'release_speed' in data.columns:
    plot_features.append(('release_speed', 'Release Speed'))
if 'pfx_x' in data.columns:
    plot_features.append(('pfx_x', 'Horizontal Movement (pfx_x)'))
if 'pfx_z' in data.columns:
    plot_features.append(('pfx_z', 'Vertical Movement (pfx_z)'))
if 'release_spin_rate' in data.columns:
    plot_features.append(('release_spin_rate', 'Spin Rate'))

if len(plot_features) > 0 and 'pitch_type' in data.columns:
    # データのコピーを作成してインデックスをリセット（重複インデックスの問題を回避）
    plot_data = data[['pitch_type'] + [feat for feat, _ in plot_features]].copy().reset_index(drop=True)
    
    n_plots = min(len(plot_features), 4)
    n_cols = 2
    n_rows = (n_plots + 1) // 2
    f, ax = plt.subplots(n_rows, n_cols, figsize=(18, 6*n_rows), facecolor='gray')
    if n_rows == 1:
        ax = ax.reshape(1, -1)
    
    for idx, (feat, title) in enumerate(plot_features[:4]):
        row = idx // n_cols
        col = idx % n_cols
        sns.violinplot(x="pitch_type", y=feat, data=plot_data, ax=ax[row, col])
        ax[row, col].set_title(f'Pitch Type vs {title}')
        ax[row, col].tick_params(axis='x', rotation=45)
    
    # 余ったサブプロットを非表示
    for idx in range(n_plots, n_rows * n_cols):
        row = idx // n_cols
        col = idx % n_cols
        ax[row, col].axis('off')
    
    plt.tight_layout()
    plt.show()
else:
    print("可視化に必要な特徴量が不足しています。")


In [None]:
# 相関行列のヒートマップ
numeric_features = ['release_speed', 'release_spin_rate', 'spin_axis', 
                    'pfx_x', 'pfx_z', 'release_pos_x', 'release_pos_z']
# 存在する特徴量のみを使用
numeric_features_available = [feat for feat in numeric_features if feat in data.columns]

if len(numeric_features_available) > 0:
    df_numeric = data[numeric_features_available].select_dtypes(include=['number'])
    if len(df_numeric.columns) > 0:
        sns.heatmap(df_numeric.corr(), annot=True, cmap='bwr', linewidths=0.2, fmt='.2f')
        fig = plt.gcf()
        fig.set_size_inches(10, 8)
        plt.title('Correlation Matrix of Numeric Features')
        plt.show()
    else:
        print("数値型の特徴量がありません。")
else:
    print("相関行列を作成するための特徴量が不足しています。")


### 1.3 データの前処理

機械学習モデルが学習できるようにデータの前処理を行います。


In [None]:
# データのコピーを作成
df = data.copy()

# 必要な特徴量を選択（存在する特徴量のみ）
feature_cols = ['release_speed', 'release_spin_rate', 'spin_axis', 
                'pfx_x', 'pfx_z', 'release_pos_x', 'release_pos_z']
feature_cols = [feat for feat in feature_cols if feat in df.columns]

if len(feature_cols) == 0:
    raise ValueError("必要な特徴量がデータに存在しません。")

print(f"使用する特徴量: {feature_cols}")

# 欠損値の処理：欠損が多い行を削除
print("前処理前のデータ数:", len(df))
if 'pitch_type' in df.columns:
    df = df.dropna(subset=feature_cols + ['pitch_type'])
else:
    raise ValueError("'pitch_type'列がデータに存在しません。")
print("前処理後のデータ数:", len(df))

# 欠損値の確認
print("\n欠損値の確認:")
print(df[feature_cols + ['pitch_type']].isnull().sum())


In [None]:
# 投球数が少ない球種を除外または「Others」にまとめる
pitch_counts = df['pitch_type'].value_counts()
print("各球種の投球数:")
print(pitch_counts)

# 投球数が100未満の球種を「Others」にまとめる（閾値は調整可能）
min_pitches = 100
rare_pitches = pitch_counts[pitch_counts < min_pitches].index.tolist()
print(f"\n投球数が{min_pitches}未満の球種（Othersにまとめる）: {rare_pitches}")

if len(rare_pitches) > 0:
    df['pitch_type'] = df['pitch_type'].replace(rare_pitches, 'Others')
    print("\n処理後の球種の内訳:")
    print(df['pitch_type'].value_counts())


In [None]:
# 特徴量（X）とターゲット（Y）を抽出
X = df[feature_cols].values
y = df['pitch_type'].values

# ターゲットのエンコーディング（LabelEncoder）
le = LabelEncoder()
y_encoded = le.fit_transform(y)

print("特徴量の形状:", X.shape)
print("ターゲットの形状:", y_encoded.shape)
print("球種のクラス数:", len(le.classes_))
print("球種のクラス:", le.classes_)


In [None]:
# データの分割（訓練データと検証データ）
X_train, X_valid, y_train, y_valid = train_test_split(X, y_encoded, test_size=0.3, random_state=42, stratify=y_encoded)

print(f"訓練データ数: {X_train.shape[0]}")
print(f"検証データ数: {X_valid.shape[0]}")


## 2. ベースラインモデルの構築

まずは単純なモデルで、現状のデータと前処理でどの程度の精度が出るかを確認します。


In [None]:
# ベースラインモデル：全特徴量を使用
rfc_baseline = RandomForestClassifier(max_depth=10, min_samples_leaf=1, n_estimators=100, n_jobs=-1, random_state=42)
rfc_baseline.fit(X_train, y_train)

print('ベースラインモデル（全特徴量）')
print('Train Score: {}'.format(round(rfc_baseline.score(X_train, y_train), 3)))
print('Valid Score: {}'.format(round(rfc_baseline.score(X_valid, y_valid), 3)))


## 3. 特徴量エンジニアリング

ドメイン知識（野球の知識）に基づき、よりモデルの精度向上に寄与する特徴量を作成します。


In [None]:
# 特徴量エンジニアリング用のデータフレームを作成
df_fe = df.copy()

# 1. 回転軸の変換（spin_axisは循環的な特徴量なので、sinとcosに変換）
if 'spin_axis' in df_fe.columns:
    df_fe['spin_axis_sin'] = np.sin(np.radians(df_fe['spin_axis']))
    df_fe['spin_axis_cos'] = np.cos(np.radians(df_fe['spin_axis']))
    print("spin_axis_sin と spin_axis_cos を作成しました。")
else:
    print("警告: 'spin_axis'が存在しないため、spin_axis_sin と spin_axis_cos をスキップします。")

# 2. 変化量の合成（水平変化量と垂直変化量から総変化量を計算）
if 'pfx_x' in df_fe.columns and 'pfx_z' in df_fe.columns:
    df_fe['total_movement'] = np.sqrt(df_fe['pfx_x']**2 + df_fe['pfx_z']**2)
    print("total_movement を作成しました。")
else:
    print("警告: 'pfx_x' または 'pfx_z' が存在しないため、total_movement をスキップします。")

# 3. スピン効率（推定）：球速に対する回転数の比率
if 'release_spin_rate' in df_fe.columns and 'release_speed' in df_fe.columns:
    # ゼロ除算を防ぐため、release_speedが0の場合はNaNにする
    df_fe['spin_per_mph'] = df_fe['release_spin_rate'] / df_fe['release_speed'].replace(0, np.nan)
    print("spin_per_mph を作成しました。")
else:
    print("警告: 'release_spin_rate' または 'release_speed' が存在しないため、spin_per_mph をスキップします。")

# 新しい特徴量を確認
new_feature_list = []
if 'spin_axis_sin' in df_fe.columns:
    new_feature_list.append('spin_axis_sin')
if 'spin_axis_cos' in df_fe.columns:
    new_feature_list.append('spin_axis_cos')
if 'total_movement' in df_fe.columns:
    new_feature_list.append('total_movement')
if 'spin_per_mph' in df_fe.columns:
    new_feature_list.append('spin_per_mph')

if len(new_feature_list) > 0:
    print("\n新しく作成した特徴量:")
    print(df_fe[new_feature_list].head())
    print("\n欠損値の確認:")
    print(df_fe[new_feature_list].isnull().sum())
else:
    print("\n新しく作成された特徴量はありません。")


In [None]:
# 特徴量エンジニアリング後の特徴量リスト
new_features = ['spin_axis_sin', 'spin_axis_cos', 'total_movement', 'spin_per_mph']
feature_cols_fe = feature_cols + [feat for feat in new_features if feat in df_fe.columns]

# 欠損値の処理
df_fe = df_fe.dropna(subset=feature_cols_fe + ['pitch_type'])

# 特徴量とターゲットを抽出
X_fe = df_fe[feature_cols_fe].values
y_fe = df_fe['pitch_type'].values

# ターゲットのエンコーディング（新しいLabelEncoderを作成してfit_transformを使用）
# 特徴量エンジニアリング後のデータでも同じ球種が含まれているはずですが、
# 念のため新しいLabelEncoderを使用します
le_fe = LabelEncoder()
y_fe_encoded = le_fe.fit_transform(y_fe)

# データの分割
# stratifyを使用するため、インデックスではなく配列を直接分割
X_fe_train, X_fe_valid, y_fe_train, y_fe_valid = train_test_split(
    X_fe, y_fe_encoded, test_size=0.3, random_state=42, stratify=y_fe_encoded
)

print(f"特徴量エンジニアリング後の特徴量数: {len(feature_cols_fe)}")
print(f"特徴量: {feature_cols_fe}")
print(f"訓練データ数: {X_fe_train.shape[0]}")
print(f"検証データ数: {X_fe_valid.shape[0]}")
print(f"球種のクラス数: {len(le_fe.classes_)}")
print(f"球種のクラス: {le_fe.classes_}")


In [None]:
# 特徴量エンジニアリング後のベースラインモデル
rfc_fe = RandomForestClassifier(max_depth=7, min_samples_leaf=1, n_estimators=100, n_jobs=-1, random_state=42)
rfc_fe.fit(X_fe_train, y_fe_train)

print('特徴量エンジニアリング後のベースラインモデル')
print('Train Score: {}'.format(round(rfc_fe.score(X_fe_train, y_fe_train), 3)))
print('Valid Score: {}'.format(round(rfc_fe.score(X_fe_valid, y_fe_valid), 3)))


## 4. 様々なモデルの構築・調整

ベースラインモデルや特徴量エンジニアリングの結果を踏まえ、より強力なモデルを構築・調整します。


In [None]:
# XGBoostモデル
xgb_model = xgb.XGBClassifier(
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1,
    random_state=42,
    n_jobs=-1
)
xgb_model.fit(X_fe_train, y_fe_train)

print('XGBoostモデル')
print('Train Score: {}'.format(round(xgb_model.score(X_fe_train, y_fe_train), 3)))
print('Valid Score: {}'.format(round(xgb_model.score(X_fe_valid, y_fe_valid), 3)))
print('Valid F1 Score (weighted): {}'.format(round(f1_score(y_fe_valid, xgb_model.predict(X_fe_valid), average='weighted'), 3)))


In [None]:
# LightGBMモデル
lgb_model = lgb.LGBMClassifier(
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1,
    random_state=42,
    n_jobs=-1,
    verbose=-1
)
lgb_model.fit(X_fe_train, y_fe_train)

print('LightGBMモデル')
print('Train Score: {}'.format(round(lgb_model.score(X_fe_train, y_fe_train), 3)))
print('Valid Score: {}'.format(round(lgb_model.score(X_fe_valid, y_fe_valid), 3)))
print('Valid F1 Score (weighted): {}'.format(round(f1_score(y_fe_valid, lgb_model.predict(X_fe_valid), average='weighted'), 3)))


In [None]:
# ハイパーパラメータチューニング（GridSearchCV）- LightGBMの例
# 注意：計算時間がかかるため、パラメータの範囲を狭めています
param_grid_lgb = {
    'max_depth': [5, 7],
    'learning_rate': [0.05, 0.1],
    'n_estimators': [100, 200]
}

print("GridSearchCVを実行中...（時間がかかる場合があります）")
lgb_gs = GridSearchCV(
    lgb.LGBMClassifier(random_state=42, n_jobs=-1, verbose=-1),
    param_grid_lgb,
    cv=3,  # クロスバリデーションの分割数（計算時間を考慮して3に設定）
    scoring='accuracy',
    n_jobs=-1
)
lgb_gs.fit(X_fe_train, y_fe_train)

print('Best Parameters: {}'.format(lgb_gs.best_params_))
print('CV Score: {}'.format(round(lgb_gs.best_score_, 3)))
print('Valid Score: {}'.format(round(lgb_gs.score(X_fe_valid, y_fe_valid), 3)))


## 5. モデルのアンサンブリング

複数のモデルを組み合わせて、より頑健なモデルとします。


In [None]:
# 各モデルの予測確率を取得
rfc_pred_proba = rfc_fe.predict_proba(X_fe_valid)
xgb_pred_proba = xgb_model.predict_proba(X_fe_valid)
lgb_pred_proba = lgb_model.predict_proba(X_fe_valid)

# 予測確率の平均（ブレンディング）
ensemble_pred_proba = (rfc_pred_proba + xgb_pred_proba + lgb_pred_proba) / 3
ensemble_pred = ensemble_pred_proba.argmax(axis=1)

# アンサンブルモデルの評価
ensemble_accuracy = accuracy_score(y_fe_valid, ensemble_pred)
ensemble_f1 = f1_score(y_fe_valid, ensemble_pred, average='weighted')

print('アンサンブルモデル（RandomForest + XGBoost + LightGBM）')
print('Valid Accuracy: {}'.format(round(ensemble_accuracy, 3)))
print('Valid F1 Score (weighted): {}'.format(round(ensemble_f1, 3)))


## 6. 予測の出力・評価

最終的に構築したモデルを使い、検証データに対して予測を行い、詳細な評価を行います。


In [None]:
# 最良モデル（アンサンブルモデル）の予測
best_pred = ensemble_pred

# Classification Report（適合率、再現率、F1スコア）
# 特徴量エンジニアリング後のLabelEncoderを使用
print("Classification Report:")
print(classification_report(y_fe_valid, best_pred, target_names=le_fe.classes_))


In [None]:
# 混同行列（Confusion Matrix）
cm = confusion_matrix(y_fe_valid, best_pred)
plt.figure(figsize=(12, 10))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
            xticklabels=le_fe.classes_, yticklabels=le_fe.classes_)
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.tight_layout()
plt.show()


In [None]:
# Feature Importance（特徴量重要度）の可視化 - LightGBM
lgb.plot_importance(lgb_model, figsize=(10, 8), max_num_features=15)
plt.title('LightGBM Feature Importance')
plt.tight_layout()
plt.show()


In [None]:
# Feature Importance（特徴量重要度）の可視化 - XGBoost
plt.figure(figsize=(10, 8))
xgb.plot_importance(xgb_model, max_num_features=15)
plt.title('XGBoost Feature Importance')
plt.tight_layout()
plt.show()


In [None]:
# RandomForestの特徴量重要度
feature_importance = pd.DataFrame({
    'feature': feature_cols_fe,
    'importance': rfc_fe.feature_importances_
}).sort_values('importance', ascending=False)

plt.figure(figsize=(10, 8))
sns.barplot(data=feature_importance, x='importance', y='feature')
plt.title('RandomForest Feature Importance')
plt.xlabel('Importance')
plt.tight_layout()
plt.show()
