In [None]:
# Google Colabでライブラリをアップロードする
from google.colab import files
uploaded = files.upload()

In [None]:
# Google Colabでドライブのデータを使う
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Google Colabでライブラリをインストールする
!pip install japanize_matplotlib bottleneck tslearn

In [1]:
# 自作関数
import MasterResearchFunction as mr

# 基本ライブラリ
import csv
import math
import os
import pickle
import random
import re
import statistics
from datetime import datetime, timedelta
from decimal import Decimal

# 数値計算とデータ処理
import bottleneck as bn
import numpy as np
import pandas as pd

# 機械学習ライブラリ
from sklearn import preprocessing
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    roc_curve,
    roc_auc_score,
    auc,
)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler

# ディープラーニングライブラリ
import tensorflow as tf
from tensorflow.keras.layers import (
    Activation,
    Add,
    Conv1D,
    Dense,
    Dropout,
    Flatten,
    GlobalAveragePooling1D,
    Input,
    LayerNormalization,
    LSTM,
    Masking,
    MaxPooling1D,
    MultiHeadAttention,
)
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical

# プロットと可視化
import japanize_matplotlib
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import seaborn as sns

# その他のライブラリ
from fastdtw import fastdtw
from scipy import signal, stats
from scipy.interpolate import interp1d, UnivariateSpline
from scipy.signal import savgol_filter
from scipy.spatial.distance import euclidean
from scipy.cluster.hierarchy import fcluster, linkage
from tslearn.metrics import dtw_path
import openpyxl
from natsort import natsorted

In [2]:
# 定数の定義
NAMES = ['sakamoto', 'watabe', 'kotera', 'nakazawa', 'mizuno', 'todaka', 'takagawa', 'uchida']
LABELS = ['circle', 'cross', 'tri', 'check']
COLUMNS_TO_DROP = [
    'Sensor', 'Participant name', 'Event', 'Event value',
    'Eye movement type', 'Eye movement type index', 'Ungrouped',
    'Validity left', 'Validity right', 'Gaze event duration', 'Gaze2D_Distance',
    'Fixation_Distance', 'Gaze3D_Distance', 'Pupil_Diameter_Change',
    'GazeDirection_Distance', 'PupilPosition_Distance'
]
BASE_DIR = "/Users/hinase/CodeChord/MasterResearch/datasets/new"
Hz = 100
NON_GESTURE = 160
#各被験者がジェスチャを行った順番(NAMES順)
ROUTINE_LABELS = [
    [1, 2, 3, 4],
    [2, 4, 3, 1],
    [4, 3, 1, 2],
    [2, 4, 3, 1],
    [1, 4, 3, 2],
    [3, 2, 1, 4],
    [3, 1, 2, 4],
    [2, 3, 1, 4]
]
# 数字からラベルへの変換マッピング
label_map = {1: 'circle', 2: 'cross', 3: 'tri', 4: 'check'}
# ラベル辞書を作成
routine_label_dict = {name: [label_map[num] for num in labels] for name, labels in zip(NAMES, ROUTINE_LABELS)}

In [None]:
#ジェスチャの教師データの読み込み
trimmed_data_dict = mr.load_trimmed_gesture_data(NAMES, BASE_DIR)
first_gesture_data_dict = mr.load_trimmed_first_gesture_data(NAMES, BASE_DIR)
#教師データの最小の長さと最大の長さを格納
length_extremes_dict = mr.calculate_length_extremes(trimmed_data_dict)

In [None]:
#実験ルーティンのデータの読み込み
routine_data_dict = {}
eye_routine_data_dict = {}
for name in NAMES:
    print(f"Processing {name}...")
    # データの読み込みと前処理
    routine_data_dict[name] = mr.process_apple_watch_csv(f'{BASE_DIR}/{name}/motion/{name}_new_routine.csv')
    eye_routine_data_dict[name] = mr.process_tobii_csv(f'{BASE_DIR}/{name}/eye/{name}_eye_new_routine.csv')

In [None]:
#ルーティン中の各ジェスチャの実行部分のインデックスの抽出
grouped_intervals = {}
group_label_mapping = {}

for name in NAMES:
    print(f"Processing {name}...")
    # データの読み込みと前処理
    a = mr.find_true_intervals(routine_data_dict[name])

    # インターバルの開始インデックスを取得
    start_indices = np.array([interval[0] for interval in a]).reshape(-1, 1)

    # 階層的クラスタリング
    Z = linkage(start_indices, method='ward')
    cluster_labels = fcluster(Z, t=4, criterion='maxclust')  # クラスタ数を4に固定

    # クラスタごとにインターバルをグループ化
    interval_groups = {i: [] for i in range(1, 5)}
    for interval, label in zip(a, cluster_labels):
        interval_groups[label].append(interval)

    # グループの順序をインターバルの開始インデックスに基づいてソート
    sorted_groups = sorted(interval_groups.items(), key=lambda x: min(interval[0] for interval in x[1]))
    sorted_interval_groups = {i + 1: group for i, (_, group) in enumerate(sorted_groups)}

    # グループを保存
    grouped_intervals[name] = sorted_interval_groups

    # グループとラベルを関連付け
    group_label_mapping[name] = {label: sorted_interval_groups[group_id]
                                for group_id, label in enumerate(routine_label_dict[name], start=1)}

    # 結果を出力
    print(f"\n{name} のグループとラベル対応:")
    for label, intervals in group_label_mapping[name].items():
        print(f"  {label}: {intervals}")

# 'nakazawa' の 'check' に対応する最初のインデックスの組を削除
if 'nakazawa' in group_label_mapping and 'check' in group_label_mapping['nakazawa']:
    if len(group_label_mapping['nakazawa']['check']) > 0:
        removed_interval = group_label_mapping['nakazawa']['check'].pop(0)
        print(f"'nakazawa' の 'check' から削除されたインターバル: {removed_interval}")
    else:
        print("'nakazawa' の 'check' に削除するインターバルがありません。")
else:
    print("'nakazawa' または 'check' に対応するデータが見つかりません。")

In [57]:
#ジェスチャ実行時の-5秒から+5秒の範囲を抜き出す
extracted_data_dict = {}

for name in NAMES:
    print(f"Processing {name}...")
    extracted_data_dict[name] = {}

    for label in LABELS:
        print(f"Processing label: {label}...")

        # 該当するインターバルを取得
        intervals = group_label_mapping[name][label]

        # 最初のインデックスと最後のインデックスを計算
        start_idx = intervals[0][0] - 500
        end_idx = intervals[-1][1] + 500

        # インデックスの範囲を調整（負のインデックス防止）
        start_idx = max(0, start_idx)
        end_idx = min(len(routine_data_dict[name]), end_idx)

        # データを抜き出し
        extracted_data = routine_data_dict[name].iloc[start_idx:end_idx].reset_index(drop=True)

        # 抜き出したデータを辞書に格納
        extracted_data_dict[name][label] = extracted_data

        print(f"Extracted data for {name} - {label}: start={start_idx}, end={end_idx}, length={len(extracted_data)}")

In [58]:
# 初期化
first_gesture = {}
# 各被験者とジェスチャに対して最初のデータを抽出
for name in NAMES:
    first_gesture[name] = {}  # 各被験者の辞書を初期化

    for label in LABELS:
        print(f"Processing {name} - {label}...")
        try:
            # インデックス範囲を取得
            start_idx = group_label_mapping[name][label][0][0]
            end_idx = group_label_mapping[name][label][0][1]

            # データを抽出
            extracted_data = routine_data_dict[name].iloc[start_idx:end_idx].reset_index(drop=True)

            # 辞書に格納
            first_gesture[name][label] = extracted_data
            print(f"Extracted data for {name} - {label} (Index range: {start_idx}-{end_idx})")
        except KeyError as e:
            print(f"KeyError: {e}. Skipping {name} - {label}.")
        except IndexError as e:
            print(f"IndexError: {e}. Skipping {name} - {label}.")


In [76]:
c = mr.filter_and_combine_segments(b, Hz, min_time=length_extremes_dict[NAMES[namae]][LABELS[jesutya]]['min_length']*0.5/Hz, max_time=length_extremes_dict[NAMES[namae]][LABELS[jesutya]]['max_length']*2.5/Hz, overlap_count=2)

In [126]:
with open('/Users/hinase/Downloads/spring_results/watabe_spring_results.pkl', 'rb') as f:
 = pickle.load(f)

In [None]:
# 1. データの読み込みと前処理
X_filled, y_labels, true_labels, data_dict, label_dict, true_label_dict = mr.preprocess_gesture_data(
    NAMES,
    LABELS,
    COLUMNS_TO_DROP,
    NON_GESTURE
)

In [6]:
mr.check_timestamp_gaps(NAMES, data_dict)

In [None]:
# 2. データのフィルタリングとスケーリング
# 2. 前処理とフィルタリングの適用
filtered_X, filtered_y, filtered_true = mr.preprocess_and_filter_sequences(
    X_filled,
    y_labels,
    true_labels,
    COLUMNS_TO_DROP,
    sampling_rate=50.0,
    max_gap_seconds=0.1,
    max_nan_seconds=5.0
)

In [None]:
filtered_X[0].columns

In [None]:
# 最終的なデータの確認
print(f"処理後のデータ数: {len(filtered_X)}")
print(f"ラベル配列の長さ（y_labels_scaled）: {len(filtered_y)}")
print(f"真偽ラベル配列の長さ（true_labels_scaled）: {len(filtered_true)}")

In [10]:
X_scaled = []
scaler = StandardScaler()
for i in range(len(filtered_X)):
    X_scaled.append(scaler.fit_transform(filtered_X[i]))

In [11]:
# 4. ラベルのエンコーディング（2値分類用に修正）
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(filtered_true)  # 0または1にエンコード

# 5. データの分割（パディングの前に行う）
X_train_raw, X_test_raw, y_train, y_test = train_test_split(
    X_scaled, y_encoded, test_size=0.3, random_state=42
)

In [None]:
for i, seq in enumerate(X_scaled):
    print(f"Sequence {i}: shape {seq.shape}")

In [None]:
# 6. 訓練データとテストデータでそれぞれ最大シーケンス長を計算
max_length_train = max(len(seq) for seq in X_train_raw)
max_length_test = max(len(seq) for seq in X_test_raw)
max_length = max(max_length_train, max_length_test)

# 7. シーケンスのパディング
X_train_padded = pad_sequences(X_train_raw, maxlen=max_length, padding='post', value=0.0, dtype='float32')
X_test_padded = pad_sequences(X_test_raw, maxlen=max_length, padding='post', value=0.0, dtype='float32')

# 8. パディング後のデータに NaN が含まれていないか確認
print('NaN in X_train_padded:', np.isnan(X_train_padded).any())
print('NaN in X_test_padded:', np.isnan(X_test_padded).any())

# 必要に応じて NaN を 0 で置換
if np.isnan(X_train_padded).any():
    X_train_padded = np.nan_to_num(X_train_padded, nan=0.0)
if np.isnan(X_test_padded).any():
    X_test_padded = np.nan_to_num(X_test_padded, nan=0.0)

In [14]:
def get_positional_encoding(max_len, d_model):
    """
    固定のサイン・コサイン位置エンコーディングを生成する関数。

    Parameters:
    - max_len (int): シーケンスの最大長
    - d_model (int): 埋め込み次元数

    Returns:
    - pos_encoding (np.array): 位置エンコーディング行列
    """
    angle_rads = np.arange(max_len)[:, np.newaxis] / np.power(10000, (2 * (np.arange(d_model)[np.newaxis, :]//2)) / np.float32(d_model))

    # 偶数次元はsin、奇数次元はcos
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])

    pos_encoding = angle_rads[np.newaxis, ...]  # (1, max_len, d_model)
    return tf.cast(pos_encoding, dtype=tf.float32)

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """
    Transformerエンコーダーブロックを構築する関数。
    """
    # 自己注意機構（マスクは自動的に適用される）
    x = MultiHeadAttention(key_dim=head_size, num_heads=num_heads, dropout=dropout)(inputs, inputs)
    x = Dropout(dropout)(x)
    x = LayerNormalization(epsilon=1e-6)(inputs + x)

    # フィードフォワードネットワーク
    x_ff = Dense(ff_dim, activation='relu')(x)
    x_ff = Dense(inputs.shape[-1])(x_ff)
    x_ff = Dropout(dropout)(x_ff)
    out = LayerNormalization(epsilon=1e-6)(x + x_ff)
    return out

def build_transformer_model(max_length, num_features, head_size, num_heads, ff_dim, num_transformer_blocks, dropout):
    """
    Transformerベースの2値分類モデルを構築する関数。
    """
    inputs = Input(shape=(max_length, num_features))

    # マスキングレイヤーの追加（パディング値が0の場合）
    x = Masking(mask_value=0.0)(inputs)

    # 位置エンコーディングの追加
    pos_encoding = get_positional_encoding(max_length, num_features)
    x = x + pos_encoding[:,:max_length, :]

    # Transformerエンコーダーブロックの追加
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

    # グローバルプーリング
    x = GlobalAveragePooling1D()(x)
    x = Dropout(dropout)(x)

    # 出力層
    outputs = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=inputs, outputs=outputs)
    return model

In [None]:
# モデルの構築
model = build_transformer_model(
    max_length=max_length,
    num_features=X_train_padded.shape[2],
    head_size=64,
    num_heads=8,
    ff_dim=128,
    num_transformer_blocks=1,
    dropout=0.1
)

# モデルのサマリー表示
model.summary()

# モデルのコンパイル
model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='binary_crossentropy',
    metrics=['accuracy']
)

# モデルの訓練
history = model.fit(
    X_train_padded, y_train,
    epochs=50,
    batch_size=32,
    validation_data=(X_test_padded, y_test),
)

In [None]:
# 最終的なデータの確認
print(f"処理後のデータ数: {len(filtered_X)}")
print(f"ラベル配列の長さ（y_labels_scaled）: {len(filtered_y)}")
print(f"真偽ラベル配列の長さ（true_labels_scaled）: {len(filtered_true)}")

In [None]:
# 学習曲線をプロット
def plot_learning_curve(history):
    # 損失をプロット
    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    # 精度をプロット
    plt.subplot(1, 2, 2)
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.show()

# 関数を呼び出してプロット
plot_learning_curve(history)


In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, roc_auc_score

# 予測確率を取得
y_pred_proba = model.predict(X_test_padded)

# ROC曲線を計算
fpr, tpr, thresholds = roc_curve(y_test, y_pred_proba)

# AUCスコアを計算
auc_score = roc_auc_score(y_test, y_pred_proba)

# ROC曲線をプロット
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {auc_score:.2f})', color='blue')
plt.plot([0, 1], [0, 1], 'k--', label='Random Guess', color='red')  # ランダムな分類の基準線
plt.xlabel('False Positive Rate (FPR)')
plt.ylabel('True Positive Rate (TPR)')
plt.title('ROC Curve')
plt.legend(loc='lower right')
plt.grid(alpha=0.3)
plt.show()

In [None]:
# 4. モデルの評価
loss, accuracy = model.evaluate(X_test_padded, y_test)
print(f'Test Loss: {loss}')
print(f'Test Accuracy: {accuracy}')

# 5. 予測と結果の表示
y_pred_prob = model.predict(X_test_padded)
y_pred_classes = (y_pred_prob > 0.5).astype(int).flatten()  # 閾値0.5でクラスを決定
y_true_classes = y_test

# ラベルを逆変換（元のクラス名に戻す場合）
y_pred_labels = label_encoder.inverse_transform(y_pred_classes)
y_true_labels = label_encoder.inverse_transform(y_true_classes)

# 分類レポートの表示
print(classification_report(y_true_labels, y_pred_labels))

In [22]:
# 定数の定義
NAMES = ['sakamoto', 'watabe', 'kotera', 'nakazawa']

In [None]:
#実験ルーティンのデータの読み込み
spring_index_dict = {}
for name in NAMES:
  print(f"Processing {name}...")
  spring_index_dict[name] = {}
  with open(f'/Users/hinase/CodeChord/MasterResearch/datasets/new/spring_results/{name}_spring_results.pkl', 'rb') as f:
    results = pickle.load(f)
  for label in LABELS:
    spring_index_dict[name][label] = []
    spring_index_dict[name][label] = results[label]

In [24]:
BASE_DIR = "/Users/hinase/CodeChord/MasterResearch/datasets/new"
SPRING_RESULTS_DIR = os.path.join(BASE_DIR, "spring_results")

In [None]:
# 各種結果を格納する辞書
spring_index_dict = {}
spring_final_index_dict = {}
extracted_eye_data_dict = {}

# 一連の処理をループ内で実行
for name in NAMES:
    print(f"Processing {name}...")
    spring_index_dict[name] = {}
    spring_final_index_dict[name] = {}
    extracted_eye_data_dict[name] = {}

    # SPRING結果を読み込み
    spring_results_path = os.path.join(SPRING_RESULTS_DIR, f"{name}_spring_results.pkl")
    with open(spring_results_path, "rb") as f:
        results = pickle.load(f)

    for label in LABELS:
        # SPRING結果をフィルタリング
        spring_index_dict[name][label] = results[label]
        spring_final_index_dict[name][label] = mr.filter_and_combine_segments(
            spring_index_dict[name][label],
            Hz,
            min_time=length_extremes_dict[name][label]['min_length'] * 0.5 / Hz,
            max_time=length_extremes_dict[name][label]['max_length'] * 2.5 / Hz,
            overlap_count=2
        )

        # タイムスタンプを算出
        filtered_results = mr.extract_timestamp_from_overlap(
            routine_data_dict[name], spring_final_index_dict[name][label]
        )

        # 視線データを抽出
        extracted_eye_data = []
        for start_time, end_time in filtered_results:
            extracted_eye_data.append(
                eye_routine_data_dict[name][
                    (eye_routine_data_dict[name]['Timestamp'] >= start_time) &
                    (eye_routine_data_dict[name]['Timestamp'] <= end_time)
                ]
            )

        # 抽出した視線データを辞書に格納
        extracted_eye_data_dict[name][label] = extracted_eye_data

# 結果を確認
for name, labels_data in extracted_eye_data_dict.items():
    for label, eye_data in labels_data.items():
        print(f"{name} - {label}: {len(eye_data)} segments extracted")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# プロット関数
def plot_with_highlights(routine_data_dict, group_label_mapping, spring_final_index_dict, name, label):
    """
    特定の被験者とジェスチャーのルーティンデータをプロットし、
    group_label_mappingとspring_final_index_dictのインデックス範囲をハイライト。

    Parameters:
    - routine_data_dict (dict): 被験者ごとのルーティンデータの辞書。
    - group_label_mapping (dict): 被験者ごとのグループとラベルの対応辞書。
    - spring_final_index_dict (dict): SPRINGによるインデックスの辞書。
    - name (str): 被験者名。
    - label (str): ジェスチャーラベル。
    """
    data = routine_data_dict[name]

    # プロット
    plt.figure(figsize=(100, 8))
    plt.plot(data['EuclideanNorm'], label='EuclideanNorm', color='blue', alpha=0.9)

    # ラベルごとのインデックス範囲を黄色でハイライト (group_label_mapping)
    if label in group_label_mapping[name]:
        intervals = group_label_mapping[name][label]
        for start, end in intervals:
            plt.axvspan(start, end, color='yellow', alpha=0.3, label=f'{label} (manual)' if start == intervals[0][0] else "")

    # SPRINGのインデックス範囲を水色でハイライト (spring_final_index_dict)
    if label in spring_final_index_dict[name]:
        intervals = spring_final_index_dict[name][label]
        for start, end in intervals:
            plt.axvspan(start, end, color='cyan', alpha=0.4, label=f'{label} (SPRING)' if start == intervals[0][0] else "")

    # プロットの設定
    plt.title(f"{name}'s Routine Data ({label}) with Highlighted Gesture Segments", fontsize=16)
    plt.xlabel("Time Step", fontsize=14)
    plt.ylabel("Acceleration Value", fontsize=14)
    plt.legend(fontsize=12)
    plt.grid(True, which='both', linestyle='--', linewidth=0.5, alpha=0.7)
    plt.tight_layout()
    plt.show()

# 被験者ごと、ラベルごとのプロットを実行
for name in NAMES:
    for label in LABELS:
        print(f"Plotting {name}'s data for {label}...")
        plot_with_highlights(routine_data_dict, group_label_mapping, spring_final_index_dict, name, label)


In [26]:
def preprocess_and_prepare_input(extracted_eye_data_dict, scaler, max_length, COLUMNS_TO_DROP, sampling_rate=50.0, max_gap_seconds=5.0):
    """
    extracted_eye_data_dict に含まれるデータを前処理し、標準化およびパディングを適用する関数。

    Parameters:
    - extracted_eye_data_dict (dict): 視線データの辞書
    - scaler (StandardScaler): フィット済みの標準化スケーラー
    - max_length (int): パディングの最大長
    - COLUMNS_TO_DROP (list): 削除するカラムのリスト
    - sampling_rate (float): サンプリング周波数（Hz）
    - max_gap_seconds (float): 最大欠損許容時間（秒）

    Returns:
    - dict: 標準化およびパディング後のデータ辞書
    """
    X_padded = {}

    for name, labels_data in extracted_eye_data_dict.items():
        X_padded[name] = {}
        for label, data_list in labels_data.items():
            X_padded[name][label] = []
            for i, df in enumerate(data_list):
                try:
                    print(f"Processing {name} - {label} - Segment {i}...")
                    # データの前処理を適用
                    processed_df = mr.preprocess_sequence(df, COLUMNS_TO_DROP, sampling_rate=sampling_rate, max_gap_seconds=max_gap_seconds)
                    # タイムスタンプを削除
                    processed_df = processed_df.drop(columns=['Timestamp'], errors='ignore')
                    # 標準化
                    scaled_data = scaler.transform(processed_df.values)
                    # 結果をリストに追加
                    X_padded[name][label].append(scaled_data)
                except Exception as e:
                    print(f"Error processing {name} - {label} - Segment {i}: {e}")

            # パディング
            print(f"Padding {name} - {label}...")
            padded_data = pad_sequences(
                X_padded[name][label], maxlen=max_length, padding='post', value=0.0, dtype='float32'
            )
            # NaN の確認と置換
            if np.isnan(padded_data).any():
                print(f"NaN detected in {name} - {label}. Replacing with 0.")
                padded_data = np.nan_to_num(padded_data, nan=0.0)

            # パディング後のデータを保存
            X_padded[name][label] = padded_data

    return X_padded

In [None]:
# extracted_eye_data_dict に preprocess_and_prepare_input を適用
X_padded = preprocess_and_prepare_input(
    extracted_eye_data_dict, scaler, max_length, COLUMNS_TO_DROP, sampling_rate=50.0, max_gap_seconds=5.0
)

In [None]:
# 予測結果を格納する辞書
predictions_dict = {}

# 各データに対して予測を実行
for name, labels_data in X_padded.items():
    predictions_dict[name] = {}
    for label, data in labels_data.items():
        print(f"Predicting for {name} - {label}...")
        try:
            # model.predict を使用して予測を行う
            predictions = model.predict(data)
            # 予測結果を辞書に格納
            predictions_dict[name][label] = predictions
        except Exception as e:
            print(f"Error predicting {name} - {label}: {e}")

# 予測結果の出力
for name, labels_data in predictions_dict.items():
    for label, predictions in labels_data.items():
        print(f"{name} - {label}:")
        print(predictions)

In [None]:
# 予測結果を格納する辞書
predicted_labels_dict = {}

for name, labels_data in X_padded.items():
    predicted_labels_dict[name] = {}
    for label, data in labels_data.items():
        print(f"Predicting and classifying for {name} - {label}...")
        try:
            # モデルで予測
            y_pred_prob = model.predict(data)
            # 閾値0.5でクラスを決定
            y_pred_classes = (y_pred_prob > 0.5).astype(int).flatten()
            # クラスをラベルに逆変換
            y_pred_labels = label_encoder.inverse_transform(y_pred_classes)
            # 予測ラベルを辞書に格納
            predicted_labels_dict[name][label] = y_pred_labels
        except Exception as e:
            print(f"Error predicting for {name} - {label}: {e}")

# 分類結果の出力
for name, labels_data in predicted_labels_dict.items():
    for label, predictions in labels_data.items():
        print(f"{name} - {label}: Predicted Labels:")
        print(predictions)

In [None]:
# 'gesture' に対応するインデックスだけを抜き出す辞書
filtered_spring_index_dict = {}

for name, labels_data in spring_final_index_dict.items():
    filtered_spring_index_dict[name] = {}
    for label, intervals in labels_data.items():
        print(f"Filtering {name} - {label}...")

        # 'gesture' に対応するインデックスだけを抽出
        filtered_intervals = [
            interval for interval, prediction in zip(intervals, predicted_labels_dict[name][label])
            if prediction == 'gesture'
        ]

        # 抽出結果を新しい辞書に格納
        filtered_spring_index_dict[name][label] = filtered_intervals

# 結果の確認
for name, labels_data in filtered_spring_index_dict.items():
    for label, intervals in labels_data.items():
        print(f"{name} - {label}: {len(intervals)} intervals labeled as 'gesture'")


In [None]:
# 被験者ごと、ラベルごとのプロットを実行
for name in NAMES:
    for label in LABELS:
        print(f"Plotting {name}'s data for {label}...")
        plot_with_highlights(routine_data_dict, group_label_mapping, filtered_spring_index_dict, name, label)

In [None]:
# 混同行列の作成と可視化
conf_mat = confusion_matrix(y_true_classes, y_pred_classes)
plt.figure(figsize=(8, 6))
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues',
            xticklabels=label_encoder.classes_,
            yticklabels=label_encoder.classes_)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()

In [287]:
# モデルから予測値を取得する関数をSHAPに渡す
explainer = shap.Explainer(model, X_train_padded)

In [None]:
input_shape = (max_length, num_features)
inputs = Input(shape=input_shape)
x = Masking(mask_value=0.)(inputs)

# num_featuresを定義（入力データの特徴量の次元数）
num_features = input_shape[1]

# Transformerブロックの定義
def transformer_block(x, num_heads, key_dim, ff_dim, rate=0.1):
    # マルチヘッド注意機構
    attn_output = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)(x, x)
    attn_output = Dropout(rate)(attn_output)

    # 出力次元をnum_featuresに変換
    attn_output = Dense(num_features)(attn_output)

    out1 = Add()([x, attn_output])
    out1 = LayerNormalization(epsilon=1e-6)(out1)

    # フィードフォワードネットワーク
    ffn_output = Dense(ff_dim, activation='relu')(out1)

    # 出力次元をnum_featuresに変換
    ffn_output = Dense(num_features)(ffn_output)

    ffn_output = Dropout(rate)(ffn_output)
    out2 = Add()([out1, ffn_output])
    out2 = LayerNormalization(epsilon=1e-6)(out2)
    return out2

# Transformerブロックの適用
x = transformer_block(x, num_heads=4, key_dim=64, ff_dim=128)

# プーリングと出力層
x = GlobalAveragePooling1D()(x)
x = Dropout(0.1)(x)
outputs = Dense(num_classes, activation='sigmoid')(x)

# モデルの作成
model = Model(inputs=inputs, outputs=outputs)

In [None]:
# 5. モデルのコンパイルと学習
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
history = model.fit(
    X_train_padded, y_train,
    epochs=50,
    batch_size=32,
    validation_data=(X_test_padded, y_test)
)


In [None]:
# 6. モデルの評価
loss, accuracy = model.evaluate(X_test_padded, y_test)
print(f'Test Loss: {loss}')
print(f'Test Accuracy: {accuracy}')

# 7. 予測と結果の表示
y_pred = model.predict(X_test_padded)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true_classes = np.argmax(y_test, axis=1)
y_pred_labels = label_encoder.inverse_transform(y_pred_classes)
y_true_labels = label_encoder.inverse_transform(y_true_classes)
print(classification_report(y_true_labels, y_pred_labels))

In [None]:
import numpy as np
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt

# 8. 特徴量の重要度評価（Permutation Feature Importance）

# 8.1 ベースラインの性能を計算
# テストデータでの予測
y_pred = model.predict(X_test_padded)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true_classes = np.argmax(y_test, axis=1)

# ベースラインのAccuracyを計算
baseline_accuracy = accuracy_score(y_true_classes, y_pred_classes)
print(f'Baseline Accuracy: {baseline_accuracy:.4f}')

# 8.2 特徴量ごとの重要度を計算
num_features = X_test_padded.shape[2]
feature_importances = []

for feature_idx in range(num_features):
    # テストデータをコピー
    X_test_permuted = X_test_padded.copy()

    # 特徴量をランダムにシャッフル（サンプル間でシャッフル）
    permuted_feature = X_test_permuted[:, :, feature_idx].reshape(X_test_permuted.shape[0], -1)
    np.random.shuffle(permuted_feature)
    X_test_permuted[:, :, feature_idx] = permuted_feature.reshape(X_test_permuted.shape[0], X_test_permuted.shape[1])

    # シャッフル後のデータで予測
    y_pred_permuted = model.predict(X_test_permuted)
    y_pred_classes_permuted = np.argmax(y_pred_permuted, axis=1)

    # シャッフル後のAccuracyを計算
    permuted_accuracy = accuracy_score(y_true_classes, y_pred_classes_permuted)

    # 性能の差を計算（重要度）
    importance = baseline_accuracy - permuted_accuracy
    feature_importances.append(importance)
    print(f'Feature {feature_idx + 1} Importance: {importance:.4f}')

# 8.3 特徴量重要度の可視化
feature_names = [f'Feature {i+1}' for i in range(num_features)]

plt.figure(figsize=(10, 6))
plt.barh(X[7].columns, feature_importances)
plt.xlabel('Decrease in Accuracy')
plt.title('Permutation Feature Importance')
plt.gca().invert_yaxis()
plt.show()