In [None]:
# Google Colabでドライブのデータを使う
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# --- 組み込みモジュール ---
import sys
import pickle

# --- パスの設定（ローカルモジュールを読み込むための設定） ---
sys.path.append('/content/drive/Shareddrives/MuraolabDocument/技術/機械学習/Transformer/')

# --- ローカルモジュール ---
import Transformer_Encoder as te

# --- サードパーティモジュール ---
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import Model, Sequential, load_model
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

In [None]:
# === 1. データの読み込み ===
def load_signals(file_paths):
    """
    複数の信号ファイルを読み込み、各サンプルごとに信号をスタックして3次元配列にする
    """
    signals = [np.loadtxt(fp) for fp in file_paths]  # 各配列の shape: (サンプル数, 128)
    return np.stack(signals, axis=-1)  # 結果の shape: (サンプル数, 128, 信号数)

# データセットのディレクトリ
data_dir = "/content/drive/Shareddrives/MuraolabDocument/技術/機械学習/Transformer/UCI HAR Dataset"

# --- 訓練データ ---
train_signals_dir = f'{data_dir}/train/Inertial Signals/'
signal_files_train = [
    'body_acc_x_train.txt',
    'body_acc_y_train.txt',
    'body_acc_z_train.txt',
    'body_gyro_x_train.txt',
    'body_gyro_y_train.txt',
    'body_gyro_z_train.txt',
    'total_acc_x_train.txt',
    'total_acc_y_train.txt',
    'total_acc_z_train.txt'
]
train_file_paths = [train_signals_dir + fname for fname in signal_files_train]
X_train = load_signals(train_file_paths)  # shape: (7352, 128, 9)

# --- テストデータ ---
test_signals_dir = f'{data_dir}/test/Inertial Signals/'
signal_files_test = [fname.replace('_train', '_test') for fname in signal_files_train]
test_file_paths = [test_signals_dir + fname for fname in signal_files_test]
X_test = load_signals(test_file_paths)  # shape: (2947, 128, 9)

# --- ラベルの読み込み ---
y_train = np.loadtxt(f'{data_dir}/train/y_train.txt')
y_test = np.loadtxt(f'{data_dir}/test/y_test.txt')

# ラベルを 0〜5 に変換（もともとは 1〜6）
y_train = y_train.astype(np.int32) - 1
y_test = y_test.astype(np.int32) - 1

In [None]:
# === 2. 標準化 ===
# UCI HARデータセットはすでに標準化されているため、以下の標準化処理はデフォルトではコメントアウトしている。
# 別のデータセットを使用する場合は、コメントアウトを外して利用できる。

"""
n_train, timesteps, n_features = X_train.shape
scaler = StandardScaler()
# 各チャネルごとに2次元に変形して標準化
X_train_reshaped = X_train.reshape(-1, n_features)  # shape: (n_train * timesteps, n_features)
X_train_scaled = scaler.fit_transform(X_train_reshaped).reshape(n_train, timesteps, n_features)

n_test = X_test.shape[0]
X_test_scaled = scaler.transform(X_test.reshape(-1, n_features)).reshape(n_test, timesteps, n_features)
"""
X_train_scaled = X_train
X_test_scaled = X_test

In [None]:
# === 3. 最大データ長に合わせてパディング ===
# UCI HARデータセットはすでに長さが統一されているため意味なし
PAD = 0.0
max_length_train = max(len(seq) for seq in X_train_scaled)
max_length_test = max(len(seq) for seq in X_test_scaled)
max_length = max(max_length_train, max_length_test)

#シーケンスのパディング
X_train_padded = pad_sequences(X_train_scaled, maxlen=max_length, padding='post', value=PAD, dtype='float32')
X_test_padded = pad_sequences(X_test_scaled, maxlen=max_length, padding='post', value=PAD, dtype='float32')

#パディング後のデータに NaN が含まれていないか確認
print('NaN in X_train_padded:', np.isnan(X_train_padded).any())
print('NaN in X_test_padded:', np.isnan(X_test_padded).any())

In [None]:
# === 4. 訓練データの中から検証データを分割 ===
X_train_final, X_val, y_train_final, y_val = train_test_split(
    X_train_padded, y_train, test_size=0.2, random_state=42
)

In [None]:
# === 5. Transformer モデルの構築・コンパイル・学習 ===
config = te.TransformerConfig(
    max_length=max_length,               # シーケンスの長さ
    d_model=X_train_final.shape[2],      # 特徴量の次元数（ここでは 9）
    key_dim=16,                           # 各アテンションヘッドの次元数
    num_heads=8,                         # アテンションヘッドの数
    ff_dim=X_train_final.shape[2] * 4,   # FeedForwardネットワークの中間次元数（9×4=36）
    num_transformer_blocks=1,            # トランスフォーマーブロックの数
    dropout=0.1,                         # ドロップアウト率
    l2_lambda=1e-4,                      # L2正則化係数
    pad=0.0,                             # パディングの値
    pooling='average',                   # プーリングの方法 ('average' または 'max')
    task='multiclass',                   # タスクの種類（今回は多クラス分類）
    num_classes=6                        # クラス数（6クラス）
)

# モデルの構築
model = te.build_transformer_model(config)

# モデルのコンパイル（多クラス分類用損失関数）
model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# モデル概要の表示
model.summary()

# Early Stoppingの実装
# early_stop = EarlyStopping(
#     monitor='val_loss',      # 検証損失を監視する
#     patience=5,              # 5エポック改善が見られなければ学習を停止する
#     restore_best_weights=True  # 学習中の最良の重みを復元する
# )

# モデルの学習（検証データとして X_val, y_val を利用）
history = model.fit(
    X_train_final, y_train_final,
    epochs=50,
    batch_size=16,
    validation_data=(X_val, y_val)
    #validation_split=0.2,     #データの何割を検証用に使用するか，本コードでは手順４番で明示的に検証用データを切り出しているので不要
    #callbacks=[early_stop]     ＃設定したEarly Stoppingに沿って途中で学習を停止する
)

In [None]:
# === 5. テストデータでモデルの性能を評価 ===
test_loss, test_accuracy = model.evaluate(X_test_padded, y_test)
print("Test Loss:", test_loss)
print("Test Accuracy:", test_accuracy)

In [None]:
# -----------------------------
# 学習曲線のプロット
# -----------------------------
# lossの推移をプロット
plt.figure(figsize=(10, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Loss Curve')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

# accuracy の推移をプロット
plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Accuracy Curve')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
# -----------------------------
# 混同行列の出力
# -----------------------------
# テストデータで予測
y_pred_probs = model.predict(X_test_padded)
# sparse_categorical_crossentropy を使用している場合、予測結果は各クラスの確率となるため、
# argmaxを用いてクラスラベルを取得する
y_pred = np.argmax(y_pred_probs, axis=1)

# 混同行列の計算
cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.show()

In [None]:
# 予測済みラベルは、既に y_pred に格納されている前提
target_names = ["WALKING", "WALKING_UPSTAIRS", "WALKING_DOWNSTAIRS", "SITTING", "STANDING", "LAYING"]
report = classification_report(y_test, y_pred, target_names=target_names)
print(report)


In [None]:
# scaler を保存
with open(f'/content/drive/Shareddrives/MuraolabDocument/技術/機械学習/Transformer/scaler.pkl', 'wb') as f:
    pickle.dump(scaler, f)

# 保存したscalerを読み込む
with open('/content/drive/Shareddrives/MuraolabDocument/技術/機械学習/Transformer/scaler.pkl', "rb") as f:
    spring_final_index_dict = pickle.load(f)

# modelを保存する
model.save(f'/content/drive/Shareddrives/MuraolabDocument/技術/機械学習/Transformer/model.keras')
# 保存したmodelを読み込む
model = load_model('/content/drive/Shareddrives/MuraolabDocument/技術/機械学習/Transformer/model.keras')