In [1]:
import os
import glob
import random 
import itertools

import numpy as np
import pandas as pd

from pathlib import Path
from scipy import signal

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader, ConcatDataset, random_split, Subset

from sklearn.svm import SVC
from sklearn import preprocessing
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler, OneHotEncoder

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
label_encoder = preprocessing.LabelEncoder()

In [2]:
import wandb
# WandBの設定
WANDB_API_KEY = "2d996a98ef8dddefa91d675f85b5efd96fb911ae"  # あなたのWandB APIキーをここに入力してください

wandb.login(key = WANDB_API_KEY)

[34m[1mwandb[0m: Currently logged in as: [33mnekodaisuki169[0m ([33mdoctor_thesis_material[0m). Use [1m`wandb login --relogin`[0m to force relogin
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /Users/sota/.netrc


True

In [3]:


class TactileSequenceDataset(Dataset):
    """
    触覚センサ npy データ用 Dataset クラス

    - 入力ファイル: shape (T, n_taxels, 3)
    - seq_start〜seq_end ステップを切り出し
    - (seq_len, n_taxels, 3) -> (seq_len, n_taxels*3) にフラット化
    - ESN 用に (1, seq_len, feature_dim) に reshape し、バッチ化で
      (batch_size, 1, seq_len, feature_dim) になるようにする
    """

    def __init__(self, root_dir, dataset_params):
        """
        Args:
            root_dir: クラスディレクトリが並んでいるルートパス
                      例: dataset_root/
                            paper_A4/
                              xxx.npy
                            cloth/
                              yyy.npy
            dataset_params:
                - "seq_start": 切り出し開始インデックス（含む）
                - "seq_end"  : 切り出し終了インデックス（含まない）
        """
        super().__init__()
        self.root_dir = Path(root_dir)
        self.seq_start = int(dataset_params["seq_start"])
        self.seq_end   = int(dataset_params["seq_end"])
        self.dtype     = torch.float32  # ← カンマ削除

        # クラスディレクトリを列挙
        self.class_names = sorted(
            [d.name for d in self.root_dir.iterdir() if d.is_dir()]
        )
        if not self.class_names:
            raise RuntimeError(f"No class directories under {self.root_dir}")

        self.class_to_idx = {c: i for i, c in enumerate(self.class_names)}
        self.num_classes  = len(self.class_names)  # ★ one-hot 用

        # 各 npy ファイルのパスとラベルを列挙
        self.samples = []  # list of (path, label_idx)
        for class_name in self.class_names:
            class_dir = self.root_dir / class_name
            for npy_path in sorted(class_dir.glob("*.npy")):
                self.samples.append((npy_path, self.class_to_idx[class_name]))

        if not self.samples:
            raise RuntimeError(f"No .npy files found under {self.root_dir}")

        # シーケンス長を確認 (最初のサンプルでチェック)
        arr0 = np.load(self.samples[0][0])
        T, n_taxels, axes = arr0.shape
        assert axes == 3, f"Last dimension must be 3 (x,y,z), got {axes}"
        if self.seq_end > T:
            raise ValueError(
                f"seq_end({self.seq_end}) is larger than T({T}). "
                "Please adjust seq_start/seq_end."
            )

        self.original_T = T
        self.n_taxels   = n_taxels
        self.seq_len    = self.seq_end - self.seq_start
        self.feature_dim = self.n_taxels * 3  # x, y, z をまとめた次元

        print("=== TactileSequenceDataset initialized ===")
        print(f"root_dir     : {self.root_dir}")
        print(f"num_classes  : {self.num_classes}")
        print(f"num_samples  : {len(self.samples)}")
        print(f"original T   : {self.original_T}")
        print(f"seq range    : [{self.seq_start}, {self.seq_end}) -> seq_len={self.seq_len}")
        print(f"n_taxels     : {self.n_taxels}")
        print(f"feature_dim  : {self.feature_dim} (= n_taxels * 3)")
        print("class_to_idx : ", self.class_to_idx)
        print("=========================================")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        npy_path, label_idx = self.samples[idx]
        arr = np.load(npy_path)  # shape (T, n_taxels, 3)

        # 指定区間を切り出し
        seq = arr[self.seq_start:self.seq_end]  # (seq_len, n_taxels, 3)

        # (seq_len, n_taxels, 3) -> (seq_len, n_taxels * 3)
        seq = seq.reshape(self.seq_len, self.feature_dim)

        # torch.Tensor に変換 (seq_len, feature_dim)
        x = torch.from_numpy(seq).to(self.dtype)

        # ESN が期待する (batch, channel_size, seq_len, input_size) に合わせて
        # channel_size = 1 として (1, seq_len, feature_dim) に変換
        x = x.unsqueeze(0)  # (1, seq_len, feature_dim)

        # ラベルを one-hot ベクトルに変換
        label_idx_tensor = torch.tensor(label_idx, dtype=torch.long)
        y = F.one_hot(label_idx_tensor, num_classes=self.num_classes).to(self.dtype)
        # y.shape -> (num_classes,)  バッチ化後: (batch_size, num_classes)

        return x, y


In [4]:
class EchoStateNetwork(nn.Module):
    def __init__(self, model_params, dataset_params):
        super(EchoStateNetwork, self).__init__()
        
        self.reservoir_size = int(model_params["reservoir_size"])
        self.reservoir_weights_scale = float(model_params["reservoir_weights_scale"])
        
        self.input_size = int(model_params["input_size"])
        self.channel_size = int(model_params["channel_size"])
        self.input_weights_scale = float(model_params["input_weights_scale"])
        self.spectral_radius = float(model_params["spectral_radius"])
        self.density = float(model_params["reservoir_density"])
        self.leak_rate = float(model_params["leak_rate"])
        
        self.sequence_length = int(int(dataset_params["sequence_length"]) / int(dataset_params["slicing_size"]))
        # self.sequence_length = int(dataset_params["seq_end"]) - int(dataset_params["seq_start"])

        # リザバー結合行列 (ランダムに初期化)
        self.register_parameter("reservoir_weights", torch.nn.Parameter(torch.empty((self.reservoir_size, self.reservoir_size)).uniform_(-self.reservoir_weights_scale, self.reservoir_weights_scale).to(device), requires_grad=False))
        
        # リザバー入力行列 (ランダムに初期化)
        self.register_parameter("input_weights", torch.nn.Parameter(torch.empty((self.reservoir_size, self.input_size * self.channel_size)).uniform_(-self.input_weights_scale, self.input_weights_scale).to(device), requires_grad=False))
        
        

        #リザバー結合のスパース処理
        self.reservoir_weights_mask = torch.empty((self.reservoir_size, self.reservoir_size)).uniform_(0, 1)
        self.reservoir_weights_mask = torch.where(self.reservoir_weights_mask < self.density, torch.tensor(1), torch.tensor(0)).to(device)
        self.reservoir_weights *= self.reservoir_weights_mask
        
        #スペクトル半径の処理
        _, singular_values, _ = torch.svd(self.reservoir_weights)
        rho_reservoir_weights = torch.max(singular_values).item()
        self.reservoir_weights *= self.spectral_radius / rho_reservoir_weights
        
       #最終時刻における，リザバー状態ベクトル
        self.last_reservoir_state_matrix = torch.zeros(self.channel_size, self.reservoir_size).to(device)
    

    def forward(self, x):
        # print(x.shape)
        batch_size = x.size(0)
        sequence_length = x.size(2)
        # x_seq = x.view(batch_size, self.channel_size, self.input_size, sequence_length).to(device)

        # if self.last_reservoir_state_matrix is None or self.last_reservoir_state_matrix.size(0) != batch_size:
        #     self.last_reservoir_state_matrix = torch.zeros(
        #         batch_size, self.channel_size, self.reservoir_size, device=device
        #     )
        
        # 各シーケンスのバッチに対してリザバー状態を初期化
        self.reservoir_state_matrix = torch.zeros(batch_size, self.channel_size,  sequence_length, self.reservoir_size).to(device)

        self.last_reservoir_state_matrix = torch.zeros(
                batch_size, self.channel_size, self.reservoir_size, device=device
            )
        
        for t in range(sequence_length):
            input_at_t = torch.matmul(x[:, :, t, :].view(batch_size, -1), self.input_weights.t())
            input_at_t = input_at_t.unsqueeze(1)

            if t == 0:
                state_update = torch.matmul(self.last_reservoir_state_matrix, self.reservoir_weights)
            else:
                # print("11111")
                state_update = torch.matmul(self.reservoir_state_matrix[:, :, t-1, :], self.reservoir_weights)
                # print(f"state_update.shape {state_update.shape}")
            self.reservoir_state_matrix[:, :, t, :] = self.leak_rate * torch.tanh(input_at_t + state_update) + \
                                                    (1 - self.leak_rate) * self.reservoir_state_matrix[:, :, t-1, :]

        

        self.last_reservoir_state_matrix = self.reservoir_state_matrix[:, :, -1, :]
        return self.reservoir_state_matrix

    def reset_hidden_state(self):
        self.last_reservoir_state_matrix = torch.zeros(self.channel_size, self.reservoir_size, device=device)
        # pass
        # print("内部状態がリセットされました")
    
#リードアウト層
class ReadOut(nn.Module):
    def __init__(self, model_params, dataset_params):
        super(ReadOut, self).__init__()
        # self.reservoir_state_matrix_size = int(model_params["reservoir_size"]) + int(model_params["input_size"]) + 1
        self.reservoir_state_matrix_size = int(model_params["reservoir_size"])
        self.output_size = int(model_params["ReadOut_output_size"])
        self.batch_training = model_params["Batch_Training"]
        self.channel_size = int(model_params["channel_size"])
        
        self.sequence_length = int(int(dataset_params["sequence_length"]) / int(dataset_params["slicing_size"]))
        # self.sequence_length = int(dataset_params["seq_end"]) - int(dataset_params["seq_start"])
        
        self.readout_dense = nn.Linear(self.reservoir_state_matrix_size, self.output_size, bias=False)
        
        #線形回帰におけるバッチ学習を行うならば，リードアウト層を最急降下法による学習対象にしない
        if self.batch_training == True:
            self.readout_dense.weight.requires_grad = False
        else:
            None
            
        nn.init.xavier_uniform_(self.readout_dense.weight)
        
    def forward(self, x):
        # x shape: [batch_size, channel_size, input_size, sequence_length]
        # batch_size, channel_size, input_size, sequence_length = x.size()
        
        # Reshape x to apply the dense layer to each time step and channel independently
        # New shape: [batch_size * channel_size * sequence_length, input_size]
        # x_reshaped = x.permute(0, 1, 3, 2).contiguous().view(-1, input_size)
        
        # Apply the dense layer
        # Output shape: [batch_size * channel_size * sequence_length, output_size]
        # print(f"readout x shape{x.shape}")
        output = self.readout_dense(x)
        
        # Reshape the output back to the original form
        # Output shape: [batch_size, channel_size, output_size, sequence_length]
        # output = output.view(batch_size, channel_size, sequence_length, -1).permute(0, 1, 3, 2).contiguous()
        return output
    
    # リッジ回帰によるリードアウトの導出
    @staticmethod
    def ridge_regression(X, Y, alpha):
        # データ行列 X の形状を取得
        n, p = X.shape

        # 正則化項の行列を作成
        ridge_matrix = (alpha * torch.eye(n)).float().to(device)
        X = X.float()
        Y = Y.float()

        # リッジ回帰の係数を計算
        coefficients = torch.linalg.solve(torch.matmul(X, X.T) + ridge_matrix, torch.matmul(X, Y.T)).T

        return coefficients

    @staticmethod
    def ridge_regression_update(outputs, targets, model, alpha=0):
        with torch.no_grad():
            # リッジ回帰を用いて重みを求める
            # モデルのパラメータを更新
            new_weights = ReadOut.ridge_regression(outputs.squeeze(), targets.squeeze(), alpha)

            # モデルのパラメータを更新
            model.ReadOut.readout_dense.weight.copy_(new_weights)
        
        return None
    #それぞれのモデルパラメータ候補を辞書に格納する
    @staticmethod
    def model_params_candinate(model_params):
        model_params_combinations = list(itertools.product(*model_params.values()))
        param_dicts = [dict(zip(model_params.keys(), combination)) for combination in model_params_combinations]
        
        return param_dicts

    #モデル構造を辞書型に格納
    @staticmethod
    def model_sturcture_dict(model):
        layers_dict = {}
        for name, module in model.named_modules():
            layers_dict[name] = {
                'type': type(module).__name__,
                'parameters': {p: getattr(module, p) for p in module.__dict__ if not p.startswith('_')}
            }
        
        #モデル名と初期の引数は削除
        del(layers_dict[''])
        
        return layers_dict

class ESN(nn.Module):
    def __init__(self, model_params, training_params, dataset_params):
        super(ESN, self).__init__()
        self.ESN = EchoStateNetwork(model_params, dataset_params)
        self.ReadOut = ReadOut(model_params, dataset_params)
    
    def forward(self, x):
        self.Reservoir_State_Matrix = self.ESN(x)
        # print(f"resever state matrix {self.Reservoir_State_Matrix.shape}")
        self.ReadOut_Reservoir = self.ReadOut(self.Reservoir_State_Matrix)
        # print("重み:", self.ReadOut.readout_dense.weight)
        # print(self.ReadOut_Reservoir)

        #channle_size次元はいらないので，減らす
        self.ReadOut_Reservoir = self.ReadOut_Reservoir.squeeze(1)
        # print(f"output matrix {self.ReadOut_Reservoir.shape}")

        return self.ReadOut_Reservoir

    def reset_hidden_state(self):
        self.ESN.reset_hidden_state()

In [5]:
# K分割交差検証とデータローダーの生成
def create_cross_validation_dataloaders(dataset, dataset_params, traing_params):
    n_splits = int(traing_params["n_splits"])
    batch_size = int(dataset_params["batch_size"])
    augmentation_factor = int(dataset_params["augmentation_factor"])
    
    kf = KFold(n_splits=n_splits)
    split_datasets = []

    for train_idx, val_idx in kf.split(range(len(dataset))):
        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)
        
        # 学習用サブセットにのみ拡張を適用
        # train_subset = AugmentedDataset(train_subset, dataset_params)

        train_loader = DataLoader(train_subset, len(train_subset), shuffle=True)
        val_loader = DataLoader(val_subset, len(val_subset), shuffle=False)

        split_datasets.append((train_loader, val_loader))
    
    return split_datasets

def prepare_datasets(dataset_params, traing_params, data_dir):
    testdata_ratio = float(traing_params["testdata_ratio"])
    batch_size = int(dataset_params["batch_size"])
    
    # データセットの準備
    dataset = TactileSequenceDataset(data_dir, dataset_params)  # 拡張はここでは適用しない

    # データセットを学習用、テスト用に分割する
    test_size = int(len(dataset) * testdata_ratio)
    train_size = len(dataset) - test_size
    crossval_dataset, test_dataset = random_split(dataset, [train_size, test_size])

    # 交差検証のためのデータローダーを準備
    cross_val_loaders = create_cross_validation_dataloaders(crossval_dataset, dataset_params, traing_params)

    # テストデータローダーの準備
    test_loader = DataLoader(test_dataset, batch_size, shuffle=False)

    return cross_val_loaders, test_loader

In [6]:
def train_model(model, criterion, optimizer, train_loader, dataset_params, model_params, traing_params):
    model.train()  # Set model to training mode
    num_epochs = int(traing_params["num_epochs"])   
    reservoir_size = int(model_params["reservoir_size"])   
    input_size = int(model_params["input_size"])
    ReadOut_output_size = int(model_params["ReadOut_output_size"])
    sequence_length = int(int(dataset_params["sequence_length"]) / int(dataset_params["slicing_size"]))
    batch_training = model_params["Batch_Training"]
    Regularization_L2 = model_params["Regularization_L2"]

    # ★ 時間方向ストライド（無指定なら 1）
    time_stride = int(dataset_params.get("time_stride", 1))

    for epoch in range(num_epochs):
        running_loss = 0.
        for inputs, labels in train_loader:
            inputs = inputs.float().to(device)
            labels = labels.to(device)
            optimizer.zero_grad()

            # ESN & Readout
            outputs_ESN = model.ESN(inputs)          # 例: [B, 1, T, H] or [B, T, H]
            outputs = model.ReadOut(outputs_ESN)     # 例: [B, 1, T, C] or [B, T, C]
            outputs = outputs.squeeze()              # 期待形状: [B, T, C]

            # ===== 形状を統一: reservoir_states -> [B, T, H] =====
            if outputs_ESN.dim() == 4:
                # [B, 1, T, H] を想定
                B, ch, T_full, H = outputs_ESN.shape
                reservoir_states = outputs_ESN[:, 0]        # [B, T_full, H]
            elif outputs_ESN.dim() == 3:
                # [B, T, H]
                B, T_full, H = outputs_ESN.shape
                reservoir_states = outputs_ESN
            else:
                raise ValueError(f"Unexpected ESN output shape: {outputs_ESN.shape}")

            if outputs.dim() != 3:
                raise ValueError(f"Unexpected ReadOut output shape: {outputs.shape}")

            _, T_out, C = outputs.shape
            assert T_out == T_full, "ESN と ReadOut の時間長が一致していません"

            # ===== 時間方向のストライドサンプリング =====
            if time_stride > 1:
                reservoir_states = reservoir_states[:, ::time_stride, :]  # [B, T_eff, H]
                outputs = outputs[:, ::time_stride, :]                    # [B, T_eff, C]

            B_eff, T_eff, H_eff = reservoir_states.shape
            _, T_eff_out, C = outputs.shape
            assert T_eff == T_eff_out

            # ===== flatten の整合性を取る =====
            # リザバー: [B, T_eff, H] -> [H, B*T_eff]
            outputs_ESN_flatten = (
                reservoir_states.permute(2, 0, 1).contiguous().view(H_eff, -1)
            )

            # 出力: [B, T_eff, C] -> [C, B*T_eff]
            outputs_flatten = (
                outputs.permute(2, 0, 1).contiguous().view(C, -1)
            )

            # ラベル: [B, C] を T_eff 回繰り返し → [B, T_eff, C] → [C, B*T_eff]
            labels_rep = labels.unsqueeze(1).repeat(1, T_eff, 1)          # [B, T_eff, C]
            labels_flatten = (
                labels_rep.permute(2, 0, 1).contiguous().view(C, -1)      # [C, B*T_eff]
            )

            loss = criterion(outputs_flatten, labels_flatten)
            
            if batch_training == True:
                # ★ 時間ストライド後のリザバー状態でリッジ更新
                model.ReadOut.ridge_regression_update(
                    outputs_ESN_flatten, labels_flatten, model, Regularization_L2
                )
            else:
                loss.backward()
                optimizer.step()
            
            running_loss += loss.item() 

        epoch_loss = running_loss / len(train_loader.dataset)
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')
        wandb.log({"loss": epoch_loss})
    
    print('Training complete')



def validate_model(model, val_loader, dataset_params, model_params):
    model.eval()  # Set model to evaluation mode
    val_running_loss = 0.0

    reservoir_size = int(model_params["reservoir_size"])   
    input_size = int(model_params["input_size"])
    ReadOut_output_size = int(model_params["ReadOut_output_size"])
    sequence_length = int(int(dataset_params["sequence_length"]) / int(dataset_params["slicing_size"]))

    # ★ 学習時と同じ time_stride を使う
    time_stride = int(dataset_params.get("time_stride", 1))

    last_outputs_flatten = None
    last_labels_flatten = None
    last_T_eff = None

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.float().to(device)
            labels = labels.to(device)

            outputs_ESN = model.ESN(inputs)              # [B, 1, T, H] or [B, T, H]
            outputs = model.ReadOut(outputs_ESN).squeeze(1)  # [B, T, C]

            # ===== 形状を統一: reservoir_states -> [B, T, H] =====
            if outputs_ESN.dim() == 4:
                B, ch, T_full, H = outputs_ESN.shape
                reservoir_states = outputs_ESN[:, 0]        # [B, T_full, H]
            elif outputs_ESN.dim() == 3:
                B, T_full, H = outputs_ESN.shape
                reservoir_states = outputs_ESN
            else:
                raise ValueError(f"Unexpected ESN output shape: {outputs_ESN.shape}")

            if outputs.dim() != 3:
                raise ValueError(f"Unexpected ReadOut output shape: {outputs.shape}")

            _, T_out, C = outputs.shape
            assert T_out == T_full

            # ===== 時間方向のストライドサンプリング =====
            if time_stride > 1:
                reservoir_states = reservoir_states[:, ::time_stride, :]
                outputs = outputs[:, ::time_stride, :]

            B_eff, T_eff, H_eff = reservoir_states.shape
            _, T_eff_out, C = outputs.shape
            assert T_eff == T_eff_out
            last_T_eff = T_eff  # compute_accuracy 用に覚えておく

            # ===== flatten =====
            outputs_flatten = (
                outputs.permute(2, 0, 1).contiguous().view(C, -1)
            )
            labels_rep = labels.unsqueeze(1).repeat(1, T_eff, 1)
            labels_flatten = (
                labels_rep.permute(2, 0, 1).contiguous().view(C, -1)
            )

            loss = criterion(outputs_flatten, labels_flatten)
            val_running_loss += loss.item() * B_eff

            last_outputs_flatten = outputs_flatten
            last_labels_flatten = labels_flatten

    val_loss = val_running_loss / len(val_loader.dataset)

    # ★ compute_accuracy にはストライド後の T_eff を渡す
    if last_outputs_flatten is not None and last_labels_flatten is not None and last_T_eff is not None:
        val_accuracy = compute_accuracy(last_outputs_flatten, last_labels_flatten, last_T_eff)
    else:
        val_accuracy = 0.0

    print(f"Accuracy: {val_accuracy * 100:.2f}%")
    print(f'Validation Loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}')
    wandb.log({"val_accuracy": val_accuracy, "val_loss": val_loss})

    
def test_model(model, val_loader):
    model.eval()  # Set model to evaluation mode
    val_running_loss = 0.0
    val_running_corrects = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.float().to(device)
            labels = labels.squeeze().to(device)
            
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            _, preds = torch.max(outputs, 1)
#             _, label_preds = torch.max(labels, 1)
            label_preds = labels

            val_running_loss += loss.item() * inputs.size(0)
            print(loss.item())
            val_running_corrects += torch.sum(preds == label_preds)

    val_loss = val_running_loss / len(val_loader.dataset)
    val_accuracy = val_running_corrects.double() / len(val_loader.dataset)
    
    print(f'Test Loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}')
    wandb.log({"test_accuracy": val_accuracy, "test_loss": val_loss})

def compute_accuracy(model_output, target, n_taus):
    # モデルの出力と教師ラベルを各データに分割
    split_model_output = torch.split(model_output.squeeze(), n_taus, dim=-1)
    split_target = torch.split(target.squeeze(), n_taus, dim=-1)
    # print("aaaa")
    # print(split_model_output[0].shape)
    # print(split_target[0].shape)
    correct = 0
    total = 0
    
    for pred, true_label in zip(split_model_output, split_target):
        # 最も確率が高いラベルを予測ラベルとして取得
        # print(pred.shape)
        # print(pred)
        # print(true_label.shape)
        # print(true_label)
        count_ones = (true_label == 1).sum().item()
        # print(count_ones)
        histgram_predict = torch.bincount(torch.max(pred, 0)[1])
        _, predicted = torch.max(histgram_predict, 0)
    
        histgram_true_label_idx = torch.bincount(torch.max(true_label, 0)[1])
        _, true_label_idx = torch.max(histgram_true_label_idx, 0)

        # 正解数をカウント
        correct += (predicted == true_label_idx).sum().item()
        total += 1

    # 精度を算出
    accuracy = correct / total
    return accuracy

def model_params_candinate(model_params):
    model_params_combinations = list(itertools.product(*model_params.values()))
    param_dicts = [dict(zip(model_params.keys(), combination)) for combination in model_params_combinations]
    return param_dicts

# モデル構造を辞書型に格納
def model_sturcture_dict(model):
    layers_dict = {}
    for name, module in model.named_modules():
        layers_dict[name] = {
            'type': type(module).__name__,
            'parameters': {p: getattr(module, p) for p in module.__dict__ if not p.startswith('_')}
        }
    # モデル名と初期の引数は削除
    del(layers_dict[''])
    return layers_dict
    

In [17]:


#各種のパラメータ設定
dataset_params = {"seq_start" : 400, "seq_end" : 1200, "sequence_length": 800, "slicing_size" : 1, "augmentation_factor": 0, "batch_size" : 32, "Onehot_Encoding" : None, "augmentation_mu" : 0, "augmentation_sigma" : 0, "augmentation_shift" : 1, "time_stride":1}
model_params = {"reservoir_size" : [300],"input_size" : [48], "channel_size" : [1],  "reservoir_weights_scale" : [1], "input_weights_scale" : [10000], "spectral_radius" : [1.5],"reservoir_density" : [0.02], "leak_rate" : [0.1], "Batch_Training" : [True], "ReadOut_output_size" : [25], "Regularization_L2" : [0.01]}
training_params = {"num_epochs" : 1, "learning_rate" : 0.01, "weight_decay" : 1e-2, "testdata_ratio" : 0, "n_splits" : 5}

#それぞれのモデルパラメータ候補を辞書に格納する
model_params = model_params_candinate(model_params)

#学習データセットの設定
data_dir="./All_materials/"
cross_val_loaders, test_loader = prepare_datasets(dataset_params, training_params, data_dir)

# wandb.init(project="uskin_test_ESN", config=config_dictionary)

#モデルパラメータの候補ごとに，総当たりしてパラメータを探索する

for each_model_params in model_params:
    
#     model = LSTMModel(each_model_params).to(device)
    model = ESN(each_model_params, training_params, dataset_params).to(device)
    model_sturcture = model_sturcture_dict(model)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr = float(training_params["learning_rate"]), weight_decay = float(training_params["weight_decay"]))
    
    config_dictionary = {
    "dataset": data_dir,
    "dataset_params" : dataset_params,
    "architecture": model.__class__.__name__,
    "model_params" : each_model_params,
    "model_sturcture" : model_sturcture,
    "traing_params" : training_params,
    "criterion" : str(criterion),
    "optimizer" : str(optimizer),
    }

    wandb.init(project="uskin_test_ESN", config=config_dictionary)

    # 4. k-fold交差検証のループ
    for fold, (train_loader, val_loader) in enumerate(cross_val_loaders):
        print(f'FOLD {fold}')
        print('--------------------------------')

        train_model(model, criterion, optimizer, train_loader, dataset_params, each_model_params, training_params)
        validate_model(model, val_loader,dataset_params, each_model_params)
        model.__init__(each_model_params, training_params, dataset_params)

    print('CrossVaridation Finished')
    print('--------------------------------')
    #テストデータによる評価
    # テストデータローダーの準備

#     test_model(model, test_loader)
    
    wandb.finish()

=== TactileSequenceDataset initialized ===
root_dir     : All_materials
num_classes  : 25
num_samples  : 250
original T   : 1255
seq range    : [400, 1200) -> seq_len=800
n_taxels     : 16
feature_dim  : 48 (= n_taxels * 3)
class_to_idx :  {'01_table_cover': 0, '02_fur_scarf': 1, '03_washing_towel': 2, '04_carpet1': 3, '05_bubble_wrap': 4, '06_fleece_scarf': 5, '07_knit_hat1': 6, '08_body_towel1': 7, '09_body_towel2': 8, '10_carpet2': 9, '11_work_gloves': 10, '12_knit_hat2': 11, '13_toilet_mat1': 12, '14_floor_mat': 13, '15_sponge1': 14, '16_printed_tatami': 15, '17_cushion1': 16, '18_mop': 17, '19_toilet_mat2': 18, '20_fleece_sock': 19, '21_cushion': 20, '22_carpet3': 21, '23_fleece_mat': 22, '24_carpet4': 23, '25_sponge2': 24}


FOLD 0
--------------------------------
Epoch 1/1, Loss: 0.0095
Training complete
Accuracy: 66.00%
Validation Loss: 0.0271, Accuracy: 0.6600
FOLD 1
--------------------------------
Epoch 1/1, Loss: 0.0091
Training complete
Accuracy: 86.00%
Validation Loss: 0.0242, Accuracy: 0.8600
FOLD 2
--------------------------------
Epoch 1/1, Loss: 0.0103
Training complete
Accuracy: 82.00%
Validation Loss: 0.0238, Accuracy: 0.8200
FOLD 3
--------------------------------
Epoch 1/1, Loss: 0.0084
Training complete
Accuracy: 70.00%
Validation Loss: 0.0251, Accuracy: 0.7000
FOLD 4
--------------------------------
Epoch 1/1, Loss: 0.0152
Training complete
Accuracy: 88.00%
Validation Loss: 0.0235, Accuracy: 0.8800
CrossVaridation Finished
--------------------------------


VBox(children=(Label(value='0.001 MB of 0.001 MB uploaded\r'), FloatProgress(value=1.0, max=1.0)))

0,1
loss,▂▂▃▁█
val_accuracy,▁▇▆▂█
val_loss,█▂▁▄▁

0,1
loss,0.01522
val_accuracy,0.88
val_loss,0.02354


In [8]:
def print_trainable_parameters(model: nn.Module):
    print("\n===== Model Parameters =====")
    for name, param in model.named_parameters():
        print(
            f"{name:40s} | shape={tuple(param.shape)} | requires_grad={param.requires_grad}"
        )

def print_esn_layer_details(model: ESN):
    print("\n===== ESN Layer Details =====")

    # --- Echo State Network ---
    esn = model.ESN
    print("\n--- EchoStateNetwork ---")
    print(f"reservoir_size        : {esn.reservoir_size}")
    print(f"input_size            : {esn.input_size}")
    print(f"channel_size          : {esn.channel_size}")
    print(f"sequence_length       : {esn.sequence_length}")
    print(f"leak_rate             : {esn.leak_rate}")
    print(f"spectral_radius       : {esn.spectral_radius}")
    print(f"reservoir_density     : {esn.density}")
    print(f"input_weights_scale   : {esn.input_weights_scale}")
    print(f"reservoir_weights_scale: {esn.reservoir_weights_scale}")

    print("\n[Reservoir Weights]")
    print(f"shape         : {tuple(esn.reservoir_weights.shape)}")
    print(f"requires_grad : {esn.reservoir_weights.requires_grad}")

    print("\n[Input Weights]")
    print(f"shape         : {tuple(esn.input_weights.shape)}")
    print(f"requires_grad : {esn.input_weights.requires_grad}")

    print("\n[Last Reservoir State]")
    print(f"shape         : {tuple(esn.last_reservoir_state_matrix.shape)}")

    # --- ReadOut ---
    readout = model.ReadOut
    print("\n--- ReadOut ---")
    print(f"reservoir_state_matrix_size : {readout.reservoir_state_matrix_size}")
    print(f"output_size                 : {readout.output_size}")
    print(f"channel_size                : {readout.channel_size}")
    print(f"sequence_length             : {readout.sequence_length}")
    print(f"batch_training              : {readout.batch_training}")

    print("\n[ReadOut Dense Layer]")
    print(f"weight shape    : {tuple(readout.readout_dense.weight.shape)}")
    print(f"requires_grad   : {readout.readout_dense.weight.requires_grad}")
    print(f"bias            : {readout.readout_dense.bias}")

def print_model_structure(model: nn.Module):
    print("===== Model Structure (Hierarchy) =====")
    for name, module in model.named_modules():
        if name == "":
            continue
        print(f"[{name}] -> {module.__class__.__name__}")


In [9]:
model = ESN(each_model_params, training_params, dataset_params).to(device)
print_model_structure(model)
print_esn_layer_details(model)

===== Model Structure (Hierarchy) =====
[ESN] -> EchoStateNetwork
[ReadOut] -> ReadOut
[ReadOut.readout_dense] -> Linear

===== ESN Layer Details =====

--- EchoStateNetwork ---
reservoir_size        : 100
input_size            : 48
channel_size          : 1
sequence_length       : 800
leak_rate             : 0.7
spectral_radius       : 0.9
reservoir_density     : 0.02
input_weights_scale   : 0.5
reservoir_weights_scale: 1.0

[Reservoir Weights]
shape         : (100, 100)
requires_grad : False

[Input Weights]
shape         : (100, 48)
requires_grad : False

[Last Reservoir State]
shape         : (1, 100)

--- ReadOut ---
reservoir_state_matrix_size : 100
output_size                 : 25
channel_size                : 1
sequence_length             : 800
batch_training              : True

[ReadOut Dense Layer]
weight shape    : (25, 100)
requires_grad   : False
bias            : None
