In [64]:
import gan_transformer as transformer
import torch
import torch.nn as nn

In [65]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [66]:
h = 8
d_model = 512
d_ff = 2048
dropout = 0.1

In [86]:
class Params:
    def __init__(self, h=8, d_model=512, d_ff=2048, dropout=0.1, attn_type = 'softmax', N=6):
        self.h = h
        self.d_model = d_model
        self.d_ff = d_ff
        self.dropout = dropout
        self.attn_type = attn_type
        self.N = N

params = Params()


In [87]:
self_attn = transformer.MultiHeadedAttention(params)
src_attn = transformer.MultiHeadedAttention(params)


In [88]:
feed_forward = nn.Linear(512, 512)

In [89]:
encoder_layer = transformer.EncoderLayer(params, self_attn, feed_forward, dropout)
encoder_layer


EncoderLayer(
  (self_attn): MultiHeadedAttention(
    (linears): ModuleList(
      (0-3): 4 x Linear(in_features=512, out_features=512, bias=True)
    )
    (dropout): Dropout(p=0.2, inplace=False)
    (alpha_choser): AlphaChooser()
  )
  (feed_forward): Linear(in_features=512, out_features=512, bias=True)
  (sublayer): ModuleList(
    (0-1): 2 x SublayerConnection(
      (norm): LayerNorm()
      (dropout): Dropout(p=0.1, inplace=False)
    )
  )
)

In [90]:
encoder = transformer.Encoder(params, encoder_layer)
encoder

Encoder(
  (layers): ModuleList(
    (0-5): 6 x EncoderLayer(
      (self_attn): MultiHeadedAttention(
        (linears): ModuleList(
          (0-3): 4 x Linear(in_features=512, out_features=512, bias=True)
        )
        (dropout): Dropout(p=0.2, inplace=False)
        (alpha_choser): AlphaChooser()
      )
      (feed_forward): Linear(in_features=512, out_features=512, bias=True)
      (sublayer): ModuleList(
        (0-1): 2 x SublayerConnection(
          (norm): LayerNorm()
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
  )
  (norm): LayerNorm()
)

In [91]:
import torch

def generate_test_data(batch_size, sequence_length, hidden_size):
    return torch.rand(batch_size, sequence_length, hidden_size)

# 生成测试数据
batch_size = 2
sequence_length = 10
hidden_size = 512
test_data = generate_test_data(batch_size, sequence_length, hidden_size)

print("测试数据的形状:", test_data.shape)


测试数据的形状: torch.Size([2, 10, 512])


In [92]:
x = encoder_layer(test_data, None)
x.shape

torch.Size([2, 10, 512])

In [93]:
encoder = transformer.Encoder(params, encoder_layer)
encoder

Encoder(
  (layers): ModuleList(
    (0-5): 6 x EncoderLayer(
      (self_attn): MultiHeadedAttention(
        (linears): ModuleList(
          (0-3): 4 x Linear(in_features=512, out_features=512, bias=True)
        )
        (dropout): Dropout(p=0.2, inplace=False)
        (alpha_choser): AlphaChooser()
      )
      (feed_forward): Linear(in_features=512, out_features=512, bias=True)
      (sublayer): ModuleList(
        (0-1): 2 x SublayerConnection(
          (norm): LayerNorm()
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
  )
  (norm): LayerNorm()
)

In [94]:
encoder(test_data, None).shape

torch.Size([2, 10, 512])

In [95]:
decoder_layer = transformer.DecoderLayer(params, self_attn, src_attn, feed_forward, dropout)

In [96]:
decoder = transformer.Decoder(params, decoder_layer)

In [98]:

decoder(test_data, encoder(test_data, None), None, None).shape

torch.Size([2, 10, 512])

In [99]:
import torch
import torch.nn as nn

class TransformerModel(nn.Module):
    def __init__(self, input_dim, output_dim, num_layers, hidden_dim, num_heads, dropout):
        super(TransformerModel, self).__init__()
        self.transformer = nn.Transformer(d_model=input_dim, nhead=num_heads, num_encoder_layers=num_layers,
                                          num_decoder_layers=num_layers, dim_feedforward=hidden_dim, dropout=dropout)
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, src, tgt):
        memory = self.transformer(src, tgt)
        output = self.linear(memory[-1])  # 取最后一个时间步的输出
        return output

# 准备数据集（示例）
# 假设您有一个时间序列数据集，包括输入序列 x 和控制序列 u
x = torch.randn(10, 1, 1)  # 输入序列，形状为 (seq_len, batch_size, input_dim)
u = torch.randn(10, 1, 1)  # 控制序列，形状为 (seq_len, batch_size, input_dim)
y = torch.randn(10, 1, 1)  # 输出序列，形状为 (seq_len, batch_size, output_dim)

# 定义模型
input_dim = 2  # 输入维度（包括 x 和 u）
output_dim = 1  # 输出维度（预测的下一个时间步的 x）
num_layers = 2  # Transformer 的层数
hidden_dim = 128  # Transformer 中间层的维度
num_heads = 4  # 注意力头的数量
dropout = 0.1  # Dropout 概率
model = TransformerModel(input_dim, output_dim, num_layers, hidden_dim, num_heads, dropout)

# 训练模型（示例）
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
for epoch in range(100):
    optimizer.zero_grad()
    output = model(x, u)
    loss = criterion(output, y)
    loss.backward()
    optimizer.step()

# 使用模型进行预测
future_x = torch.randn(1, 1, 1)  # 未来的输入序列
future_u = torch.randn(1, 1, 1)  # 未来的控制序列
predicted_output = model(future_x, future_u)
print("Predicted next value:", predicted_output)


AssertionError: embed_dim must be divisible by num_heads