In [1]:
import torch
import torch.nn as nn

# 恒等写像
def identity(x):
    return x # pytorchのテンソルをそのまま処理可能

# 入力層
class Input(nn.Module):
    def __init__(self, N_u, N_x, input_scale, seed=0):
        '''
        param N_u: 入力次元
        param N_x: リザバーのノード数
        param input_scale: 入力スケーリング
        '''
        super(Input, self).__init__()
        # 一様分布に従う乱数
        torch.manual_seed(seed)
        self.Win = torch.empty((N_x, N_u)).uniform_(-input_scale, input_scale)
        self.Win.requires_grad = False

        return self.Win




In [11]:
import torch
import torch.nn as nn
import numpy as np

class Reservoir(nn.Module):
    def __init__(self, N_x, density, rho, activation_func, leaking_rate, seed):
        '''
        param N_x: リザバーのノード数
        param density: ネットワークの結合密度
        param rho: リカレント結合重み行列のスペクトル半径
        param activation_func: ノードの活性化関数 (torch.nn.functional)
        param leaking_rate: leaky integratorモデルのリーク率
        param seed: 乱数の種
        '''
        super(Reservoir, self).__init__()
        self.seed = seed
        self.N_x = N_x
        self.W = self.make_connection_pytorch(N_x, density, rho, seed)
        self.W = nn.Parameter(self.W)  # Wをnn.Parameterとしてラップ
        self.x = torch.zeros(N_x, dtype=torch.float32)  # リザバー状態ベクトルの初期化
        self.activation_func = activation_func
        self.alpha = leaking_rate
        self.requires_grad = False  # リザバーの重みは訓練しない

    @staticmethod
    def make_connection_pytorch(N_x, density, rho, seed):
        torch.manual_seed(seed)

        # ランダムな接続行列の生成
        connection = torch.bernoulli(torch.full((N_x, N_x), density))
        rec_scale = 1.0
        W = connection * (torch.rand((N_x, N_x)) * 2 * rec_scale - rec_scale)

        # Wが空でないことを確認
        if W.numel() == 0:
            raise ValueError("Weight matrix W is empty. Please check the matrix generation process.")

        # スペクトル半径を計算
        eigenvalues = torch.linalg.eigvals(W)
        if eigenvalues.numel() == 0:
            raise ValueError("Eigenvalues are empty. Check the matrix W.")

        sp_radius = torch.max(torch.abs(eigenvalues.real))

        # スペクトル半径をrhoにスケーリング
        W *= rho / sp_radius

        return W

    def forward(self, x_in):
      '''
      param x: N_x次元のベクトル (torch.Tensor)
      return: N_x次元のベクトル (torch.Tensor)
      '''
      # グラフを切り離す
      self.x = (1.0 - self.alpha) * self.x.detach() + self.alpha * self.activation_func(
          torch.matmul(self.W, self.x) + x_in
      )
      return self.x


In [3]:
import torch

class Output(nn.Module):
    def __init__(self, N_x, N_y, seed=0):
        '''
        param N_x: リザバーのノード数
        param N_y: 出力次元
        param seed: 乱数の種
        '''
        super(Output, self).__init__()
        # 正規分布に従う乱数で初期化
        torch.manual_seed(seed)
        self.Wout = torch.randn(N_y, N_x, dtype=torch.float32)

    def forward(self, x):
        '''
        param x: N_x次元のベクトル (torch.Tensor)
        return: N_y次元のベクトル (torch.Tensor)
        '''
        return torch.matmul(self.Wout, x)

    def setweight(self, Wout_opt):
        '''
        学習済みの出力結合重み行列を設定
        param Wout_opt: 新しい重み行列 (torch.Tensor)
        '''
        if not isinstance(Wout_opt, torch.Tensor):
            raise ValueError("Wout_opt must be a torch.Tensor.")
        self.Wout = Wout_opt


In [4]:
import torch

class Feedback:
    def __init__(self, N_y, N_x, fb_scale, seed=0):
        '''
        param N_y: 出力次元
        param N_x: リザバーのノード数
        param fb_scale: フィードバックスケーリング
        param seed: 乱数の種
        '''
        # 一様分布に従う乱数で初期化
        torch.manual_seed(seed)
        self.Wfb = (torch.rand(N_x, N_y, dtype=torch.float32) * 2 * fb_scale - fb_scale)

    def __call__(self, y):
        '''
        param y: N_y次元のベクトル (torch.Tensor)
        return: N_x次元のベクトル (torch.Tensor)
        '''
        return torch.matmul(self.Wfb, y)


In [12]:
class ESN(nn.Module):
    def __init__(self, N_u, N_y, N_x, density=0.05, input_scale=1.0, rho=0.95, activation_func=torch.tanh, fb_scale=None, fb_seed=0, noise_level=None, leaking_rate=0.9, output_func=None, classification=False, average_window=None, seed = 40):
        super(ESN, self).__init__()

        # モジュールの初期化
        self.Input = Input(N_u, N_x, input_scale)
        self.Reservoir = Reservoir(N_x, density, rho, activation_func, leaking_rate, seed)
        self.Output = Output(N_x, N_y)
        self.N_u = N_u
        self.N_y = N_y
        self.N_x = N_x
        self.output_func = output_func if output_func is not None else lambda x: x
        self.classification = classification
        self.alpha = leaking_rate
        self.Input.requires_grad = False
        self.Reservoir.requires_grad = False

        # 出力層からのリザバーへのフィードバックの有無
        if fb_scale is None:
            self.Feedback = None
        else:
            self.Feedback = Feedback(N_y, N_x, fb_scale, fb_seed)

        # リザバーの状態更新にノイズを加えるか
        if noise_level is None:
            self.noise = None
        else:
            self.noise = torch.rand(N_x, 1) * noise_level * 2 - noise_level  # -noise_level to +noise_level

        # 分類問題の場合の設定
        if classification:
            if average_window is None:
                raise ValueError('Window for time average is not given!')
            else:
                self.window = torch.zeros((average_window, self.N_x))

        # リザバー状態の初期化
        self.reset_reservoir_state()

    def forward(self, u):
        '''
        param u: N_u次元のベクトル (torch.Tensor)
        return: N_y次元のベクトル (torch.Tensor)
        '''
        train_len = len(u)
        Y = []

        for n in range(train_len):
          x_in = self.Input(u[n])
        # リザバー状態ベクトル
          x = self.Reservoir(x_in)

          y = self.Output(x)
          Y.append(self.output_func(y))

        # フィードバックがある場合の処理
        if self.Feedback is not None:
            self.x += self.Feedback(self.x)

        # ノイズの追加
        if self.noise is not None:
            self.x += self.noise

        # 学習前のモデル出力

        return torch.stack(Y)

    def reset_reservoir_state(self):
        '''リザバー状態ベクトルの初期化'''
        self.x = torch.zeros(self.N_x)

In [13]:
 # test
# 複雑な波形データを生成（複数のサイン波を合成）
# 複雑な波形データを生成（複数のサイン波を合成）
def generate_complex_wave(seq_length=100, n_features=1, freqs=[0.1, 0.3, 0.5], amplitudes=[1, 0.5, 0.2]):
    t = np.linspace(0, seq_length * np.pi, seq_length)
    data = np.zeros_like(t)

    # 複数の周波数成分のサイン波を合成
    for f, a in zip(freqs, amplitudes):
        data += a * np.sin(t * f)

    return data.reshape(-1, n_features)

import matplotlib.pyplot as plt
import torch.optim as optim
import numpy as np
import torch.nn.functional as F

# 訓練データの準備
# seq_length = 50
# n_features = 1  # 単一の特徴量（単一のチャネル）
# freqs = [0.1, 0.3, 0.5]
# amplitudes = [1, 0.5, 0.2]
# train_data = torch.tensor(generate_complex_wave(500, n_features=1), dtype=torch.float32)
# train_data = generate_complex_wave(seq_length=seq_length, n_features=n_features, freqs=freqs, amplitudes=amplitudes)
# train_data = torch.tensor(train_data, dtype=torch.float32)
# target_data = torch.tensor(generate_complex_wave(seq_length=seq_length, n_features=n_features, freqs=freqs, amplitudes=amplitudes), dtype=torch.float32)
# target_data = torch.tensor(target_data, dtype=torch.float32)



# 訓練データの準備（修正なしの部分）
seq_length = 50
n_features = 1
freqs = [0.1, 0.3, 0.5]
amplitudes = [1, 0.5, 0.2]

train_data = torch.tensor(generate_complex_wave(seq_length=seq_length, n_features=n_features, freqs=freqs, amplitudes=amplitudes), dtype=torch.float32)
target_data = train_data.clone().detach()  # 修正: tensorのコピーを使用

# モデル初期化
N_u, N_y, N_x = 1, 1, 200
esn_model = ESN(N_u, N_y, N_x)

# 最適化設定
criterion = nn.MSELoss()
optimizer = optim.Adam(esn_model.parameters(), lr=1e-3)

# 訓練ループ
train_len = len(train_data)
for epoch in range(10):  # エポック数を指定
    esn_model.reset_reservoir_state()  # リザバー状態をリセット
    optimizer.zero_grad()  # 勾配を初期化

    # モデルの順伝播
    predictions = esn_model(train_data)

    # ロス計算
    loss = criterion(predictions, target_data)

    # 誤差逆伝播
    loss.backward()  # retain_graph=Trueを外す
    optimizer.step()

    print(f"Epoch {epoch+1}, Loss: {loss.item()}")


# 訓練後のモデルで予測
model.reset_reservoir_state()
predicted_output = model(train_data).detach().numpy()
predicted_output = predicted_output.transpose(0,1)
print(len(predicted_output))

# 結果の描画
plt.plot(train_data.numpy(), label='True Waveform')
plt.plot(predicted_output, label='Predicted Waveform', linestyle='--')
plt.legend()
plt.title('True vs Predicted Waveform')
plt.xlabel('Time Steps')
plt.ylabel('Amplitude')
plt.show()


Epoch 1, Loss: 748.73876953125


  File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib/python3.10/dist-packages/colab_kernel_launcher.py", line 37, in <module>
    ColabKernelApp.launch_instance()
  File "/usr/local/lib/python3.10/dist-packages/traitlets/config/application.py", line 992, in launch_instance
    app.start()
  File "/usr/local/lib/python3.10/dist-packages/ipykernel/kernelapp.py", line 619, in start
    self.io_loop.start()
  File "/usr/local/lib/python3.10/dist-packages/tornado/platform/asyncio.py", line 195, in start
    self.asyncio_loop.run_forever()
  File "/usr/lib/python3.10/asyncio/base_events.py", line 603, in run_forever
    self._run_once()
  File "/usr/lib/python3.10/asyncio/base_events.py", line 1909, in _run_once
    handle._run()
  File "/usr/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(s

RuntimeError: Trying to backward through the graph a second time (or directly access saved tensors after they have already been freed). Saved intermediate values of the graph are freed when you call .backward() or autograd.grad(). Specify retain_graph=True if you need to backward through the graph a second time or if you need to access saved tensors after calling backward.