In [17]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset

# GPUの使用可否を確認
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [18]:
# 1000日のVIXデータを仮定
np.random.seed(42)
data = np.random.rand(1000, 1)  # VIXサンプルデータ

# 時間ウィンドウのサイズを定義
window_size = 12

# 入力シーケンスxとターゲットyを生成
x = []
y = []

for i in range(len(data) - window_size):
    x.append(data[i:i + window_size])
    y.append(data[i + window_size, 0])  # VIXが唯一の特徴量

x = np.array(x)
y = np.array(y)

# データセットを時間順に分割
train_size = int(0.7 * len(x))
val_size = int(0.2 * len(x))

X_train, y_train = x[:train_size], y[:train_size]
X_val, y_val = x[train_size:train_size + val_size], y[train_size:train_size + val_size]
X_test, y_test = x[train_size + val_size:], y[train_size + val_size:]

In [19]:
# Time2Vecモジュールを定義
class Time2Vec(nn.Module):
    def __init__(self, kernel_size):
        super(Time2Vec, self).__init__()
        self.k = kernel_size
        self.w = nn.Parameter(torch.randn(kernel_size))
        self.b = nn.Parameter(torch.randn(kernel_size))
        self.w0 = nn.Parameter(torch.randn(1))
        self.b0 = nn.Parameter(torch.randn(1))

    def forward(self, x):
        v1 = self.w0 * x + self.b0  # 線形部分
        v2 = torch.sin(self.w * x + self.b)  # 正弦部分
        return torch.cat([v1, v2], dim=-1)  # 線形部分と正弦部分を連結

# Time2Vecを初期化し、データに適用
kernel_size = 2
time2vec = Time2Vec(kernel_size=kernel_size).to(device)

def apply_time2vec(x):
    batch_size, seq_len, num_features = x.shape
    time_features = time2vec(torch.tensor(x, dtype=torch.float32).to(device)).detach()
    return torch.cat([torch.tensor(x, dtype=torch.float32).to(device), time_features], dim=-1)

X_train = apply_time2vec(X_train)
X_val = apply_time2vec(X_val)
X_test = apply_time2vec(X_test)

# カスタムモデルを定義
class CustomModel(nn.Module):
    def __init__(self, input_dim, d_model, nhead, num_layers, lstm_hidden_size, dropout=0.1):
        super(CustomModel, self).__init__()
        self.input_projection = nn.Linear(input_dim, d_model)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead)
        self.lstm = nn.LSTM(d_model, lstm_hidden_size, batch_first=True)
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)
        self.dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(lstm_hidden_size, 128)
        self.fc2 = nn.Linear(128, 1)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(lstm_hidden_size)

    def forward(self, x):
        # 入力プロジェクション
        x = self.input_projection(x)
        x = x.permute(1, 0, 2)  # 次元を調整してMultiheadAttentionの入力に合わせる
        # MultiheadAttention
        attn_output, _ = self.multihead_attn(x, x, x)
        x = x + attn_output
        x = self.layer_norm1(x)
        x = x.permute(1, 0, 2)  # 次元を元の形式に戻す
        # LSTM層
        lstm_output, _ = self.lstm(x)
        lstm_output = self.layer_norm2(lstm_output)
        # グローバル平均プーリング
        avg_pool_output = self.global_avg_pool(lstm_output.permute(0, 2, 1)).squeeze(-1)
        # 全結合層とDropout
        x = self.dropout(nn.ReLU()(self.fc1(avg_pool_output)))
        output = self.fc2(x)
        return output



In [21]:
input_dim = X_train.shape[-1]
d_model = 64
nhead = 8
num_layers = 3
lstm_hidden_size = 256
dropout = 0.1

model = CustomModel(input_dim, d_model, nhead, num_layers, lstm_hidden_size, dropout).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# データローダーを作成
batch_size = 32
train_loader = DataLoader(TensorDataset(X_train, torch.tensor(y_train, dtype=torch.float32).to(device)), batch_size=batch_size, shuffle=False)
val_loader = DataLoader(TensorDataset(X_val, torch.tensor(y_val, dtype=torch.float32).to(device)), batch_size=batch_size, shuffle=False)
test_loader = DataLoader(TensorDataset(X_test, torch.tensor(y_test, dtype=torch.float32).to(device)), batch_size=batch_size, shuffle=False)

# モデルを訓練
num_epochs = 100
best_val_loss = float('inf')
best_model_wts = model.state_dict()

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    for x_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(x_batch)
        loss = criterion(output, y_batch.unsqueeze(1))
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * x_batch.size(0)
    train_loss /= len(train_loader.dataset)

    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for x_batch, y_batch in val_loader:
            output = model(x_batch)
            loss = criterion(output, y_batch.unsqueeze(1))
            val_loss += loss.item() * x_batch.size(0)
    val_loss /= len(val_loader.dataset)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_model_wts = model.state_dict()

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

model.load_state_dict(best_model_wts)

Epoch [10/100], Train Loss: 0.0887, Val Loss: 0.0892
Epoch [20/100], Train Loss: 0.0947, Val Loss: 0.0842
Epoch [30/100], Train Loss: 0.0928, Val Loss: 0.0847
Epoch [40/100], Train Loss: 0.0929, Val Loss: 0.0844
Epoch [50/100], Train Loss: 0.0903, Val Loss: 0.0841
Epoch [60/100], Train Loss: 0.0916, Val Loss: 0.0840
Epoch [70/100], Train Loss: 0.0905, Val Loss: 0.0840
Epoch [80/100], Train Loss: 0.0909, Val Loss: 0.0841
Epoch [90/100], Train Loss: 0.0920, Val Loss: 0.0843
Epoch [100/100], Train Loss: 0.0906, Val Loss: 0.0844


<All keys matched successfully>

In [22]:
# 訓練済みのモデルを使用して100日間のボラティリティ予測を生成
model.eval()
predictions = []

input_seq = X_test[:1].clone()  # 最初のテストサンプルを初期入力として選択し、元データを変更しないようにコピー
for _ in range(100):
    with torch.no_grad():
        pred = model(input_seq)
        predictions.append(pred.item())
        # 入力シーケンスを更新し、最初の時間ステップを削除し、新しい予測値を追加
        pred_expanded = pred.unsqueeze(-1).repeat(1, 1, input_seq.size(-1))
        new_feature = torch.cat([input_seq[:, 1:, :], pred_expanded], dim=1)
        input_seq = new_feature

print(predictions)


[0.5203465223312378, 0.5144959688186646, 0.5079623460769653, 0.5045025944709778, 0.5016416311264038, 0.5001329779624939, 0.4978712201118469, 0.4983901381492615, 0.497533917427063, 0.5012999773025513, 0.5081307291984558, 0.5041447877883911, 0.4876466989517212, 0.48803505301475525, 0.4883384108543396, 0.4885469675064087, 0.4887157380580902, 0.48884105682373047, 0.48892539739608765, 0.48898839950561523, 0.489069402217865, 0.48911625146865845, 0.4892856478691101, 0.4895988702774048, 0.48991596698760986, 0.48988139629364014, 0.48986101150512695, 0.48984280228614807, 0.4898262023925781, 0.4898117184638977, 0.4897993803024292, 0.48978957533836365, 0.4897770583629608, 0.4897612929344177, 0.4897497296333313, 0.48973673582077026, 0.4897347092628479, 0.48973798751831055, 0.48974186182022095, 0.489744633436203, 0.4897457957267761, 0.4897478520870209, 0.4897497594356537, 0.48975247144699097, 0.4897555708885193, 0.4897586703300476, 0.48975953459739685, 0.4897586405277252, 0.48975837230682373, 0.4897