In [1]:
import torch
import torch.nn as nn
from torch.nn import Transformer
from torch.optim import Adam
from tqdm import tqdm
from torch.optim.lr_scheduler import ReduceLROnPlateau
import numpy as np
from torch.utils.data import DataLoader

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
# 定義 Transforme rModel
class TransformerModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, num_heads):
        super(TransformerModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.embedding = nn.Linear(input_dim, hidden_dim)
        self.lstm = nn.LSTM(input_dim, hidden_dim, 1, batch_first=True)
        self.transformer = Transformer(
            d_model=hidden_dim,
            #d_model=input_dim,
            nhead=num_heads,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers
        )
        self.fc = nn.Linear(hidden_dim, output_dim)
        #self.fc = nn.Linear(BATCH_SIZE*input_dim, output_dim)

    def forward(self, src):
    
        # batch_size, _ = src.shape[0], src.shape[1]
        # h_0 = torch.randn(1, batch_size,  self.hidden_dim).to(device)
        # c_0 = torch.randn(1, batch_size,  self.hidden_dim).to(device)
        # src, _ = self.lstm(src, (h_0, c_0))
        src = self.embedding(src)
        # #print('lstm後的大小:',src.shape)
        output = self.transformer(src, src)
        #print('T後的大小:',output.shape)
        output = self.fc(output)
        #output = self.fc(output)
        #print('最終大小:',output.shape)
        output = output[:, -1, :]
        return output

In [4]:
# 模型參數
input_dim = 10 # 輸入詞彙表大小
hidden_dim = 32  # 隱藏層維度
output_dim = 1  # 輸出維度，輸出維度為1，代表預測的下一個數
num_layers = 4  # Transformer Encoder/Decoder 層數
num_heads = 8  # Attention heads 



# 創建 Transformer
model = TransformerModel(input_dim, hidden_dim, output_dim, num_layers, num_heads)
model.to(device)

# 定義損失函數和優化器
criterion = nn.MSELoss()
optimizer = Adam(model.parameters(), lr=1e-2, weight_decay=1e-6)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=20, verbose=True)

In [5]:
def batch_data(final_seq, batch_size):
    """
    Batches the sequence data using PyTorch DataLoader.

    Args:
        final_seq: A list of tuples containing sequence input data and target data.
        batch_size: The desired batch size.

    Returns:
        A DataLoader object containing the batched sequence data.

    """
    final_seq = DataLoader(dataset=final_seq, batch_size=batch_size, shuffle=False, num_workers=0, drop_last=False)

    return final_seq

In [6]:
# 生成 l * w * h 維度的資料
# l : batch_size
# w : sequence length
# h : amounts of features 
# 以下以 30 * 6 * 1 做示範
def data_generated(beg, num):
    final_seq = []
    for i in range(num):
        end = beg + 30
        q_tensors = [torch.tensor([i]*10, dtype=torch.float32) for i in range(beg, end)]
        q_tensors = torch.stack(q_tensors)
        a_tensors = torch.tensor([end], dtype=torch.float32) 
        final_seq.append((q_tensors, a_tensors))
        beg += 1
    print('final_seq : 資料類型={}, 列數={}'.format(type(final_seq), len(final_seq)))
    print('final_seq[0] : 資料類型={}, 列數={}'.format(type(final_seq[0]), len(final_seq[0])))
    print('final_seq[0][0] : 資料類型={}, 內容數={}'.format(type(final_seq[0][0]), final_seq[0][0].shape))
    print('final_seq[0][1] : 資料類型={}, 內容數={}'.format(type(final_seq[0][1]), final_seq[0][1].shape))
    print(final_seq[0][0])
    print(final_seq[0][1])
    data = batch_data(final_seq, 30)

    return data


In [7]:
data = data_generated(0, 571)


final_seq : 資料類型=<class 'list'>, 列數=571
final_seq[0] : 資料類型=<class 'tuple'>, 列數=2
final_seq[0][0] : 資料類型=<class 'torch.Tensor'>, 內容數=torch.Size([30, 10])
final_seq[0][1] : 資料類型=<class 'torch.Tensor'>, 內容數=torch.Size([1])
tensor([[ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
        [ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.],
        [ 2.,  2.,  2.,  2.,  2.,  2.,  2.,  2.,  2.,  2.],
        [ 3.,  3.,  3.,  3.,  3.,  3.,  3.,  3.,  3.,  3.],
        [ 4.,  4.,  4.,  4.,  4.,  4.,  4.,  4.,  4.,  4.],
        [ 5.,  5.,  5.,  5.,  5.,  5.,  5.,  5.,  5.,  5.],
        [ 6.,  6.,  6.,  6.,  6.,  6.,  6.,  6.,  6.,  6.],
        [ 7.,  7.,  7.,  7.,  7.,  7.,  7.,  7.,  7.,  7.],
        [ 8.,  8.,  8.,  8.,  8.,  8.,  8.,  8.,  8.,  8.],
        [ 9.,  9.,  9.,  9.,  9.,  9.,  9.,  9.,  9.,  9.],
        [10., 10., 10., 10., 10., 10., 10., 10., 10., 10.],
        [11., 11., 11., 11., 11., 11., 11., 11., 11., 11.],
        [12., 12., 12., 12., 12., 12., 12., 12., 12., 12.],

In [8]:
# 生成訓練資料
train_data = data_generated(0, 6000)

# 生成驗證資料
valid_data = data_generated(600, 60)

# 生成測試資料
test_data = data_generated(700, 60)

final_seq : 資料類型=<class 'list'>, 列數=6000
final_seq[0] : 資料類型=<class 'tuple'>, 列數=2
final_seq[0][0] : 資料類型=<class 'torch.Tensor'>, 內容數=torch.Size([30, 10])
final_seq[0][1] : 資料類型=<class 'torch.Tensor'>, 內容數=torch.Size([1])
tensor([[ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
        [ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.],
        [ 2.,  2.,  2.,  2.,  2.,  2.,  2.,  2.,  2.,  2.],
        [ 3.,  3.,  3.,  3.,  3.,  3.,  3.,  3.,  3.,  3.],
        [ 4.,  4.,  4.,  4.,  4.,  4.,  4.,  4.,  4.,  4.],
        [ 5.,  5.,  5.,  5.,  5.,  5.,  5.,  5.,  5.,  5.],
        [ 6.,  6.,  6.,  6.,  6.,  6.,  6.,  6.,  6.,  6.],
        [ 7.,  7.,  7.,  7.,  7.,  7.,  7.,  7.,  7.,  7.],
        [ 8.,  8.,  8.,  8.,  8.,  8.,  8.,  8.,  8.,  8.],
        [ 9.,  9.,  9.,  9.,  9.,  9.,  9.,  9.,  9.,  9.],
        [10., 10., 10., 10., 10., 10., 10., 10., 10., 10.],
        [11., 11., 11., 11., 11., 11., 11., 11., 11., 11.],
        [12., 12., 12., 12., 12., 12., 12., 12., 12., 12.]

In [9]:
# # 生成訓練資料
# train_data = []
# for i in range(1000): 
#     sequence = torch.arange(i, i+6).float() # 輸入等差數列
#     target = torch.tensor([sequence[-1] + 1])  # 預測下一個值，等於最後一個+1
#     sequence = sequence.unsqueeze(0)
#     sequence = torch.cat([sequence, sequence], dim=0)
#     train_data.append((sequence, target))

# # 生成驗證資料
# valid_data = []
# for i in range(1000,1100):
#     sequence = torch.arange(i, i+6).float()  # 輸入等差數列
#     target = torch.tensor([sequence[-1] + 1])  # 預測下一個值，等於最後一個+1
#     sequence = sequence.unsqueeze(0)
#     sequence = torch.cat([sequence, sequence], dim=0)
#     valid_data.append((sequence, target))

# # 生成測試資料
# test_data = []
# for i in range(1100,1200):
#     sequence = torch.arange(i, i+6).float()  # 輸入等差數列
#     target = torch.tensor([sequence[-1] + 1])  # 預測下一個值，等於最後一個+1
#     sequence = sequence.unsqueeze(0)
#     sequence = torch.cat([sequence, sequence], dim=0)
#     test_data.append((sequence, target))

In [10]:
# final_seq = train_data
# print('final_seq : 資料類型={}, 列數={}'.format(type(final_seq), len(final_seq)))
# print('final_seq[0] : 資料類型={}, 列數={}'.format(type(final_seq[0]), len(final_seq[0])))
# print('final_seq[0][0] : 資料類型={}, 內容數={}'.format(type(final_seq[0][0]), final_seq[0][0].shape))
# print('final_seq[0][1] : 資料類型={}, 內容數={}'.format(type(final_seq[0][1]), final_seq[0][1].shape))

In [11]:
def get_val_loss(model, val_data, loss_function):
    """
    Computes the average validation loss for a given model and validation data.

    Args:
        model: The model for which to compute the validation loss.
        val_data: The validation data (a DataLoader object).
        loss_function: The loss function to compute the loss.

    Returns:
        The average validation loss.

    """

    model.eval()
    val_loss = []
    with torch.no_grad():
        for seq, label in val_data:
            try:
                seq, label = seq.to(device), label.to(device)
                y_pred = model(seq)
                loss = loss_function(y_pred[0].view(-1), label)
                val_loss.append(loss.item())
            except:
                seq, label = seq.to(device), label.to(device)
                print(seq.shape)
                print(seq.dtype)
                print(seq.type())
                y_pred = model(seq, seq)
                loss = loss_function(y_pred[0].view(-1), label)
                val_loss.append(loss.item())

    return np.mean(val_loss)

In [12]:
# 模型訓練

#for epoch in tqdm(range(1000)):
for epoch in range(10000):
    model.train()
    total_loss = 0.0
    time = 0
    for sequence, target in train_data:

        optimizer.zero_grad()
        sequence = sequence.to(device)
        # print(sequence.shape)
        # print(sequence.dtype)
        # print(sequence.type())
        target = target.to(device)
        # print(target.shape)
        # print(target.dtype)
        # print(target.type())
        output = model(sequence)
        #print('model :', output.view(-1)[0])
        #print('true :', target.view(-1)[0])
        
        # 計算損失
        loss = criterion(output.view(-1), target.view(-1)).to(device)

        # 反向傳播與參數更新
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        time += 1
    #print('結束1')  
    val_loss = get_val_loss(model, valid_data, criterion)
    #print('結束2')  
    print('epoch {:03d} train_loss {:.8f} val_loss {:.8f}'.format(epoch, total_loss/time, val_loss))
    scheduler.step(val_loss)
    

# 模型測試
model.eval()
correct_predictions = 0
ans = 730
with torch.no_grad():
    for sequence, target in test_data:
        sequence = sequence.to(device)
        target = target.to(device)
        output = model(sequence)
        for i in output:
            print('預測 :', i, '正解 :', ans)
            ans += 1
        loss = criterion(output, target).to(device)
    
        predicted_value = output.view(-1).item()
       

accuracy = correct_predictions / len(test_data)
print(f"Test Accuracy: {accuracy}")

  return F.mse_loss(input, target, reduction=self.reduction)


epoch 000 train_loss 9890255.43455444 val_loss 32619.92382812


KeyboardInterrupt: 