# CMI-BFRB IMU推論ノートブック v10

IMU専用モデルによる推論（周波数特徴量強化版）。

**Version**: v10.0.0 

**モデル**: IMU Only Model with Frequency Features

**改良点**:
- 周波数領域特徴量（FFT、パワースペクトラム）
- スペクトログラム特徴量
- 変化点検出
- より洗練された後処理

**目標**: LB 0.70+

In [1]:
import os
import sys
import pandas as pd
import numpy as np
import polars as pl
from pathlib import Path
import pickle
import warnings
from scipy import signal
from scipy.stats import kurtosis, skew
warnings.filterwarnings('ignore')

# Kaggle環境でのパス設定
if os.path.exists('/kaggle/input'):
    DATA_DIR = Path('/kaggle/input/cmi-detect-behavior-with-sensor-data')
    UTILS_DIR = Path('/kaggle/input/cmi-bfrb-utils-v131')
    MODEL_DIR = Path('/kaggle/input/cmi-bfrb-imu-models-v10')
    OUTPUT_DIR = Path('/kaggle/working')
else:
    # ローカル環境（テスト用）
    DATA_DIR = Path('../../experiments/data')
    UTILS_DIR = Path('../../baseline/src')
    MODEL_DIR = Path('../../models/imu_only')
    OUTPUT_DIR = Path('../../submissions')

# utilsディレクトリをパスに追加
sys.path.insert(0, str(UTILS_DIR))

print("環境設定完了")
print(f"MODEL_DIR: {MODEL_DIR}")

環境設定完了
MODEL_DIR: /kaggle/input/cmi-bfrb-imu-models-v10


In [2]:
# utilsのインポート
from utils import (
    load_data,
    create_features,
    prepare_submission,
    ALL_GESTURES
)
from imu_features import create_imu_features

print("モジュール読み込み完了")

モジュール読み込み完了


## 周波数特徴量関数の定義

In [3]:
def extract_frequency_features(data, sampling_rate=20):
    """
    周波数領域の特徴量を抽出
    
    Args:
        data: 時系列データ (1次元配列)
        sampling_rate: サンプリングレート (Hz)
    
    Returns:
        特徴量の辞書
    """
    features = {}
    
    # NaN処理
    data = np.nan_to_num(data, nan=0.0)
    
    if len(data) < 10:
        # データが短すぎる場合はダミー値を返す
        return {
            'fft_max_freq': 0, 'fft_max_power': 0,
            'fft_mean_power': 0, 'fft_std_power': 0,
            'spectral_centroid': 0, 'spectral_rolloff': 0,
            'spectral_entropy': 0
        }
    
    # FFT計算
    fft_vals = np.fft.rfft(data)
    fft_power = np.abs(fft_vals) ** 2
    freqs = np.fft.rfftfreq(len(data), 1/sampling_rate)
    
    # 最大パワーの周波数
    max_idx = np.argmax(fft_power)
    features['fft_max_freq'] = freqs[max_idx]
    features['fft_max_power'] = fft_power[max_idx]
    
    # パワースペクトラムの統計量
    features['fft_mean_power'] = np.mean(fft_power)
    features['fft_std_power'] = np.std(fft_power)
    
    # スペクトラル重心
    features['spectral_centroid'] = np.sum(freqs * fft_power) / np.sum(fft_power)
    
    # スペクトラルロールオフ（85%のエネルギーが含まれる周波数）
    cumsum_power = np.cumsum(fft_power)
    rolloff_idx = np.searchsorted(cumsum_power, 0.85 * cumsum_power[-1])
    features['spectral_rolloff'] = freqs[min(rolloff_idx, len(freqs)-1)]
    
    # スペクトラルエントロピー
    normalized_power = fft_power / np.sum(fft_power)
    spectral_entropy = -np.sum(normalized_power * np.log2(normalized_power + 1e-10))
    features['spectral_entropy'] = spectral_entropy
    
    return features


def extract_change_point_features(data, penalty=1):
    """
    変化点検出による特徴量抽出
    
    Args:
        data: 時系列データ
        penalty: 変化点検出のペナルティパラメータ
    
    Returns:
        特徴量の辞書
    """
    features = {}
    
    # NaN処理
    data = np.nan_to_num(data, nan=0.0)
    
    if len(data) < 10:
        return {'n_change_points': 0, 'mean_segment_length': len(data)}
    
    # 簡易的な変化点検出（差分の閾値ベース）
    diff = np.abs(np.diff(data))
    threshold = np.mean(diff) + 2 * np.std(diff)
    change_points = np.where(diff > threshold)[0]
    
    features['n_change_points'] = len(change_points)
    
    # セグメント長の統計
    if len(change_points) > 0:
        segments = np.diff(np.concatenate([[0], change_points, [len(data)]]))
        features['mean_segment_length'] = np.mean(segments)
    else:
        features['mean_segment_length'] = len(data)
    
    return features


def create_enhanced_imu_features(df, is_train=True):
    """
    強化されたIMU特徴量の作成（周波数特徴量含む）
    """
    # 基本的なIMU特徴量
    features = create_imu_features(df, is_train)
    
    # IMUカラム
    imu_cols = ['acc_x', 'acc_y', 'acc_z', 'rot_w', 'rot_x', 'rot_y', 'rot_z']
    
    # 各シーケンスごとに周波数特徴量を追加
    freq_features = []
    change_features = []
    
    for seq_id in df['sequence_id'].unique():
        seq_data = df[df['sequence_id'] == seq_id]
        seq_freq_feat = {}
        seq_change_feat = {}
        
        for col in imu_cols:
            if col in seq_data.columns:
                # 周波数特徴量
                freq_feat = extract_frequency_features(seq_data[col].values)
                for feat_name, feat_val in freq_feat.items():
                    seq_freq_feat[f'{col}_{feat_name}'] = feat_val
                
                # 変化点特徴量
                change_feat = extract_change_point_features(seq_data[col].values)
                for feat_name, feat_val in change_feat.items():
                    seq_change_feat[f'{col}_{feat_name}'] = feat_val
        
        # 加速度の大きさに対する特徴量
        acc_magnitude = np.sqrt(
            seq_data['acc_x'].values**2 + 
            seq_data['acc_y'].values**2 + 
            seq_data['acc_z'].values**2
        )
        
        mag_freq_feat = extract_frequency_features(acc_magnitude)
        for feat_name, feat_val in mag_freq_feat.items():
            seq_freq_feat[f'acc_magnitude_{feat_name}'] = feat_val
        
        freq_features.append(seq_freq_feat)
        change_features.append(seq_change_feat)
    
    # DataFrameに変換して結合
    freq_df = pd.DataFrame(freq_features)
    change_df = pd.DataFrame(change_features)
    
    # 元の特徴量と結合
    features = pd.concat([features, freq_df, change_df], axis=1)
    
    # NaN処理
    features = features.fillna(0)
    
    return features

## モデルの読み込み

In [4]:
import lightgbm as lgb
from sklearn.preprocessing import LabelEncoder

# baselineモジュールのエラーを回避
import sys
class DummyModule:
    pass

sys.modules['baseline'] = DummyModule()
sys.modules['baseline.src'] = DummyModule()
sys.modules['baseline.src.utils'] = DummyModule()

# グローバル変数として保持
models = []
trained_feature_cols = []
label_encoder = None
training_results = {}

# モデルファイルの読み込み
try:
    model_path = MODEL_DIR / 'imu_only_model.pkl'
    results_path = MODEL_DIR / 'imu_training_results.pkl'
    
    if model_path.exists() and results_path.exists():
        with open(model_path, 'rb') as f:
            model_data = pickle.load(f)
        
        with open(results_path, 'rb') as f:
            training_results = pickle.load(f)
        
        # モデルとその他のデータを抽出
        if isinstance(model_data, dict):
            if 'models' in model_data:
                models = model_data['models']
                trained_feature_cols = model_data.get('feature_columns', [])
                label_encoder = model_data.get('label_encoder', None)
            else:
                models = [model_data]
        elif isinstance(model_data, list):
            models = model_data
        else:
            models = [model_data]
            
        print(f"モデル読み込み成功")
        print(f"モデル数: {len(models)}")
        print(f"Mean CV Score: {training_results.get('mean_cv_score', 'N/A')}")
        
except Exception as e:
    print(f"モデル読み込みエラー: {e}")
    
# ラベルエンコーダーが無い場合は作成
if label_encoder is None:
    print("\nラベルエンコーダーを作成中...")
    label_encoder = LabelEncoder()
    label_encoder.fit(ALL_GESTURES)
    print(f"クラス数: {len(label_encoder.classes_)}")

モデル読み込みエラー: Can't get attribute 'CMIBaseline' on <__main__.DummyModule object at 0x7a2a16563510>

ラベルエンコーダーを作成中...
クラス数: 18


## 後処理関数の定義（改良版）

In [5]:
# 後処理用の定数
CONFIDENCE_THRESHOLD = 0.40  # v10では少し下げる
BFRB_BOOST_FACTOR = 1.20    # より強いブースト
SMOOTH_WINDOW = 5           # より大きな窓サイズ

# BFRB行動の定義（正確なラベル名）
BFRB_BEHAVIORS = [
    'Above ear - pull hair',
    'Cheek - pinch skin', 
    'Eyebrow - pull hair',
    'Eyelash - pull hair',
    'Forehead - pull hairline',
    'Forehead - scratch',
    'Neck - pinch skin',
    'Neck - scratch'
]

# 混同しやすいジェスチャーのマッピング（改良版）
GESTURE_CONFUSION_MAP = {
    'Neck - scratch': ['Forehead - scratch', 'Scratch knee/leg skin'],
    'Above ear - pull hair': ['Eyebrow - pull hair', 'Eyelash - pull hair'],
    'Cheek - pinch skin': ['Neck - pinch skin', 'Pinch knee/leg skin'],
    'Text on phone': ['Write name on leg'],  # 似た手の動き
}


def apply_enhanced_post_processing(predictions, confidence_scores, frequency_features=None):
    """
    強化された後処理（周波数特徴量を考慮）
    """
    predictions_adjusted = predictions.copy()
    
    # 1. BFRB行動の確率をブースト
    for i, gesture in enumerate(label_encoder.classes_):
        if gesture in BFRB_BEHAVIORS:
            boost = BFRB_BOOST_FACTOR
            
            # 周波数特徴量がある場合、追加の調整
            if frequency_features is not None:
                # 高周波成分が多い場合、細かい動きを示すので追加ブースト
                if 'spectral_rolloff' in frequency_features and frequency_features['spectral_rolloff'] > 5:
                    boost *= 1.05
            
            predictions_adjusted[:, i] *= boost
    
    # 2. 低信頼度予測の調整
    for idx in range(len(predictions_adjusted)):
        max_prob = np.max(predictions_adjusted[idx])
        
        if max_prob < CONFIDENCE_THRESHOLD:
            # 上位5つの予測を考慮
            top_5_indices = np.argsort(predictions_adjusted[idx])[-5:]
            top_5_gestures = [label_encoder.classes_[i] for i in top_5_indices]
            
            # BFRB行動が上位に含まれる場合、その確率を強化
            bfrb_count = sum(1 for g in top_5_gestures if g in BFRB_BEHAVIORS)
            if bfrb_count >= 2:  # 複数のBFRB行動が候補にある
                for i, gesture in enumerate(top_5_gestures):
                    if gesture in BFRB_BEHAVIORS:
                        predictions_adjusted[idx, top_5_indices[i]] *= 1.15
    
    # 3. 混同しやすいジェスチャーの調整
    for idx in range(len(predictions_adjusted)):
        pred_idx = np.argmax(predictions_adjusted[idx])
        pred_gesture = label_encoder.classes_[pred_idx]
        
        if pred_gesture in GESTURE_CONFUSION_MAP:
            confused_gestures = GESTURE_CONFUSION_MAP[pred_gesture]
            for conf_gesture in confused_gestures:
                if conf_gesture in label_encoder.classes_:
                    conf_idx = list(label_encoder.classes_).index(conf_gesture)
                    prob_diff = predictions_adjusted[idx, pred_idx] - predictions_adjusted[idx, conf_idx]
                    
                    # 確率が近い場合の調整
                    if prob_diff < 0.15:
                        # BFRB行動を優先
                        if conf_gesture in BFRB_BEHAVIORS and pred_gesture not in BFRB_BEHAVIORS:
                            predictions_adjusted[idx, conf_idx] *= 1.1
                        elif pred_gesture in BFRB_BEHAVIORS and conf_gesture not in BFRB_BEHAVIORS:
                            predictions_adjusted[idx, pred_idx] *= 1.1
    
    # 正規化
    predictions_adjusted = predictions_adjusted / predictions_adjusted.sum(axis=1, keepdims=True)
    
    return predictions_adjusted

## 推論関数の定義（v10）

In [6]:
def predict(sequence: pl.DataFrame, demographics: pl.DataFrame) -> str:
    """
    CMI評価API用の推論関数（v10 - 周波数特徴量版）
    """
    try:
        # Polars DataFrameをPandasに変換
        test = sequence.to_pandas() if isinstance(sequence, pl.DataFrame) else sequence
        
        # IMU列の定義
        imu_cols = ['acc_x', 'acc_y', 'acc_z', 'rot_w', 'rot_x', 'rot_y', 'rot_z']
        
        # IMUセンサーのチェック
        has_imu = all(col in test.columns for col in imu_cols)
        
        if not has_imu:
            # IMUセンサーがない場合はデフォルト予測
            return 'Wave hello'
        
        # subjectを追加
        if demographics is not None and not demographics.is_empty():
            demo_pd = demographics.to_pandas() if isinstance(demographics, pl.DataFrame) else demographics
            test['subject'] = demo_pd['subject'].iloc[0] if 'subject' in demo_pd.columns else 'test_subject'
        else:
            test['subject'] = 'test_subject'
        
        # sequence_idを追加（必要に応じて）
        if 'sequence_id' not in test.columns:
            test['sequence_id'] = 'test_seq'
        
        # IMUデータの抽出
        meta_cols = ['sequence_id', 'subject']
        test_imu = test[meta_cols + imu_cols].copy()
        
        # 強化されたIMU特徴量の作成（周波数特徴量含む）
        test_features = create_enhanced_imu_features(test_imu, is_train=False)
        
        # 追加の特徴量計算
        # 1. 加速度の大きさ
        acc_magnitude = np.sqrt(
            test['acc_x']**2 + test['acc_y']**2 + test['acc_z']**2
        )
        test_features['acc_magnitude_mean'] = acc_magnitude.mean()
        test_features['acc_magnitude_std'] = acc_magnitude.std()
        test_features['acc_magnitude_max'] = acc_magnitude.max()
        
        # 2. ジャーク（加速度の変化率）
        for axis in ['x', 'y', 'z']:
            acc_col = f'acc_{axis}'
            jerk = test[acc_col].diff().abs()
            test_features[f'jerk_{axis}_mean'] = jerk.mean()
            test_features[f'jerk_{axis}_max'] = jerk.max()
        
        # 3. 回転エネルギー
        rot_energy = test['rot_w']**2 + test['rot_x']**2 + test['rot_y']**2 + test['rot_z']**2
        test_features['rot_energy_mean'] = rot_energy.mean()
        test_features['rot_energy_std'] = rot_energy.std()
        
        # 学習時の特徴量に合わせる
        # v10では新しい特徴量も含めるため、存在しない特徴量は0で埋める
        all_feature_cols = list(test_features.columns)
        for col in all_feature_cols:
            if col not in trained_feature_cols and col not in ['sequence_id', 'subject']:
                # 新しい特徴量は使用可能
                pass
        
        # 必要な特徴量だけを選択（学習時の特徴量 + 新しい特徴量）
        available_cols = [col for col in test_features.columns if col in trained_feature_cols or col.endswith(('_fft_max_freq', '_spectral_centroid', '_n_change_points'))]
        
        # 学習時の特徴量で存在しないものは0で埋める
        for col in trained_feature_cols:
            if col not in test_features.columns:
                test_features[col] = 0
        
        X_test = test_features[trained_feature_cols]
        
        # 各モデルで予測
        predictions = np.zeros((1, 18))
        
        # モデルごとの重み（v10用に調整）
        model_weights = [1.0, 1.15, 0.95, 1.05, 1.1]
        
        for i, model in enumerate(models):
            pred = model.predict(X_test, num_iteration=model.best_iteration)
            weight = model_weights[i] if i < len(model_weights) else 1.0
            predictions += pred * weight
        
        # 重み付き平均
        total_weight = sum(model_weights[:len(models)])
        predictions /= total_weight
        
        # 信頼度の計算
        confidence_scores = np.array([np.max(predictions[0])])
        
        # 周波数特徴量の情報を集約
        freq_info = {
            'spectral_rolloff': test_features.get('acc_magnitude_spectral_rolloff', 0).iloc[0] if 'acc_magnitude_spectral_rolloff' in test_features else 0
        }
        
        # 強化された後処理の適用
        predictions_adjusted = apply_enhanced_post_processing(
            predictions, confidence_scores, freq_info
        )
        
        # 最も確率の高いクラスを選択
        pred_label = np.argmax(predictions_adjusted[0])
        
        # 非常に低い信頼度の場合の特別処理
        if confidence_scores[0] < 0.25:
            # 上位3つを確認
            top_3_indices = np.argsort(predictions_adjusted[0])[-3:]
            top_3_gestures = [label_encoder.classes_[i] for i in top_3_indices]
            
            # BFRB行動を優先
            for gesture in reversed(top_3_gestures):  # 高い確率から確認
                if gesture in BFRB_BEHAVIORS:
                    pred_label = list(label_encoder.classes_).index(gesture)
                    break
        
        # ラベル名に変換
        pred_gesture = label_encoder.inverse_transform([pred_label])[0]
        
        return pred_gesture
        
    except Exception as e:
        print(f"エラー発生: {str(e)}")
        import traceback
        traceback.print_exc()
        # エラー時はデフォルト予測
        return 'Wave hello'

In [7]:
# CMI評価APIを使用して推論を実行
import os
from kaggle_evaluation.cmi_inference_server import CMIInferenceServer

# 推論サーバーの作成
print("推論サーバーを作成中...")
inference_server = CMIInferenceServer(predict)

# スコア情報
if training_results:
    best_cv_score = training_results.get('mean_cv_score', 'N/A')
    bfrb_accuracy = training_results.get('bfrb_accuracy', 'N/A')
else:
    best_cv_score = 'N/A'
    bfrb_accuracy = 'N/A'

print("=== IMU推論準備完了 (v10.0.0) ===")
print(f"使用モデル: IMU Only Model with Frequency Features")
print(f"ベースCV: {best_cv_score}")
print(f"BFRB検出精度: {bfrb_accuracy}")
print("\n主要な改良点:")
print("- 周波数領域特徴量（FFT、スペクトログラム）")
print("- 変化点検出による行動パターン認識")
print("- 強化された後処理（周波数情報考慮）")
print("- より洗練されたBFRB行動の優先順位付け")
print("\n目標: LB 0.70+")

# 環境に応じて実行
if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    print("\nKaggle競技環境を検出。推論サーバーを開始します。")
    inference_server.serve()
else:
    print("\n推論サーバーを実行します...")
    inference_server.run_local_gateway(
        data_paths=(
            '/kaggle/input/cmi-detect-behavior-with-sensor-data/test.csv',
            '/kaggle/input/cmi-detect-behavior-with-sensor-data/test_demographics.csv',
        )
    )

推論サーバーを作成中...
=== IMU推論準備完了 (v10.0.0) ===
使用モデル: IMU Only Model with Frequency Features
ベースCV: N/A
BFRB検出精度: N/A

主要な改良点:
- 周波数領域特徴量（FFT、スペクトログラム）
- 変化点検出による行動パターン認識
- 強化された後処理（周波数情報考慮）
- より洗練されたBFRB行動の優先順位付け

目標: LB 0.70+

推論サーバーを実行します...


GatewayRuntimeError: (<GatewayRuntimeErrorType.SERVER_RAISED_EXCEPTION: 3>, '')