自定義 Dataset 模組

In [None]:
import torch
from torch.utils.data import Dataset


class StockDataset(Dataset):
    def __init__(self, data, seq_length):
        # 這個方法是用來初始化資料集。
        # 參數data是輸入資料，參數seq_length是設定每個樣本特徵的序列長度。
        # 舉例來說，如果想使用前3天的開高低收量來預測隔天的收盤價，
        # 就可以將 seq_length 設為 3。
        self.data = data
        self.seq_length = seq_length

    def __len__(self):
        # 這個方法是回傳資料集的樣本數。
        return len(self.data) - self.seq_length

    def __getitem__(self, idx):
        # 這個方法是用來根據索引回傳資料。把要回傳的資料放在 return 後面。
        # self.data 第一個位置索引對應的是時間軸
        # self.data 第二個位置索引對應的是欄位（0:開,1:高,2:低,3:收,4:量）
        x = self.data[idx : (idx + self.seq_length), [0, 1, 2, 3, 4]]
        y = self.data[idx + self.seq_length, 3]
        return (
            torch.tensor(x, dtype=torch.float32),
            torch.tensor(y, dtype=torch.float32),
        )


# 創建資料集實例
train_dataset = StockDataset(data=train_data, seq_length=1)

使用 DataLoader

In [None]:
import torch
from torch.utils.data import DataLoader

# 創建數據加載器
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# 可以透過 for 迴圈來迭代顯示數據加載器內的資料
for batch_data in train_loader:
    x, y = batch_data
    print(x)
    print(y)

均方誤差（Mean Squared Error；MSE

In [1]:
import torch
import torch.nn as nn

# 定義均方誤差損失函式
mse_loss = nn.MSELoss()
# 假設有一些預測值 predictions 和真實值 targets
predictions = torch.tensor([2.5, 0.0, 2.1, 7.8], dtype=torch.float32)
targets = torch.tensor([3.0, -0.5, 2.0, 7.0], dtype=torch.float32)
# 計算均方誤差
loss = mse_loss(predictions, targets)
print(f"Mean Squared Error: {loss.item()}")
# Mean Squared Error: 0.2875000834465027
print(f"Mean Squared Error: {loss}")
# Mean Squared Error: 0.2875000834465027

Mean Squared Error: 0.2875000834465027
Mean Squared Error: 0.2875000834465027


二元交叉熵損失（Binary Cross Entropy Loss）

In [2]:
import torch
import torch.nn as nn

# 定義二元交叉熵損失函式
bce_loss = nn.BCELoss()
# 假設有一些預測機率 predictions（值域是 0 到 1） 和真實值 targets（0 或 1）
predictions = torch.tensor([0.5, 0.3, 0.9, 0.7], dtype=torch.float32)
targets = torch.tensor([1, 0, 1, 1], dtype=torch.float32)
# 計算二元交叉熵損失
loss = bce_loss(predictions, targets)
print(f"Binary Cross Entropy Loss: {loss.item()}")
# Binary Cross-Entropy Loss: 0.3779643774032593

Binary Cross Entropy Loss: 0.3779643774032593


交叉熵損失（Cross Entropy Loss）

In [3]:
import torch
import torch.nn as nn

# 定義交叉熵損失函式
cross_entropy_loss = nn.CrossEntropyLoss()
# 假設有一些預測機率 predictions（範圍是 0 到 1） 和真實值 targets（四類）
predictions = torch.tensor(
    [[0.25, 0.25, 0.25, 0.25], [0.1, 0.2, 0.3, 0.4]], dtype=torch.float32
)
targets = torch.tensor([1, 3], dtype=torch.long)
# 計算交叉熵損失
loss = cross_entropy_loss(predictions, targets)
print(f"Cross Entropy Loss: {loss.item()}")
# Cross-Entropy Loss: 1.3144149780273438

Cross Entropy Loss: 1.3144149780273438


One-Hot Encoding

In [4]:
import torch

# 假設有 5 個類別，類別標籤為 0, 1, 2, 3, 4
labels = torch.tensor([0, 1, 2, 3, 4])
# 將類別標籤轉換為 One-Hot Encoding
one_hot_encoded = torch.nn.functional.one_hot(labels, num_classes=5)
print(one_hot_encoded)
type(one_hot_encoded)  # <class 'torch.Tensor'>

tensor([[1, 0, 0, 0, 0],
        [0, 1, 0, 0, 0],
        [0, 0, 1, 0, 0],
        [0, 0, 0, 1, 0],
        [0, 0, 0, 0, 1]])


torch.Tensor

In [5]:
import numpy as np

# 假設有 5 個類別，類別標籤為 0, 1, 2, 3, 4
labels = np.array([0, 1, 2, 3, 4])
# 將類別標籤轉換為 One-Hot Encoding
one_hot_encoded = np.eye(5)[labels]
print(one_hot_encoded)
type(one_hot_encoded)  # <class 'numpy.ndarray'>

[[1. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0.]
 [0. 0. 1. 0. 0.]
 [0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 1.]]


numpy.ndarray

隨機梯度下降（Stochastic Gradient Descent；SGD）

In [6]:
import torch
import torch.optim as optim

# 定義一個簡單的線性回歸模型
# 假設輸入數據有兩個特徵，輸出是一個目標值
model = torch.nn.Linear(2, 1)
# 定義損失函數
# 這裡使用均方誤差（MSE）作為損失函數，適合回歸問題
criterion = torch.nn.MSELoss()
# 定義優化器
# 使用隨機梯度下降（SGD）優化算法，學習率設為 0.01
optimizer = optim.SGD(model.parameters(), lr=0.01)
# 準備數據
# 輸入數據包含兩個樣本，每個樣本有兩個特徵
data = torch.tensor([[1000, 3], [1500, 4]], dtype=torch.float32)
# 目標值（真實標籤），每個樣本對應一個目標值
targets = torch.tensor([[320000], [430000]], dtype=torch.float32)
# 清除上一輪計算中累積的梯度訊息
# 這一步很重要，因為 PyTorch 中的梯度是累積的，如果不清零會導致梯度被疊加
optimizer.zero_grad()
# 模型預測
# 將輸入數據傳入模型，得到模型的預測輸出
outputs = model(data)
# 計算損失值
# 使用定義的損失函數比較模型輸出（outputs）和目標值（targets）
# 得到當前模型的損失值，用於衡量預測值與真實值的差異
loss = criterion(outputs, targets)
# 根據損失值對模型的參數計算梯度，用於指導參數的更新方向
loss.backward()
# 更新模型參數
# 根據計算出的梯度，使用優化器更新模型的參數，從而減小損失值
optimizer.step()

自適應矩估計（Adaptive Moment Estimation；Adam）

In [7]:
import torch
import torch.optim as optim

# 定義一個簡單的線性回歸模型
# 假設輸入數據包含兩個特徵，模型輸出是一個目標值
# 模型表達式為 y = Wx + b，其中 W 是權重，b 是偏置
model = torch.nn.Linear(2, 1)
# 定義損失函數
# 使用均方誤差（MSE）作為損失函數，用於衡量模型預測值與真實目標值的差異
criterion = torch.nn.MSELoss()
# 定義優化器
# 使用 Adam 優化器，它是一種基於自適應學習率的優化算法，比標準的 SGD 收斂更快且穩定
# 設定學習率 lr=0.01
optimizer = optim.Adam(model.parameters(), lr=0.01)
# 準備數據
# 定義輸入數據和目標值
# data 是輸入特徵的數據張量，每行是一個樣本，每列是一個特徵
data = torch.tensor([[1000, 3], [1500, 4]], dtype=torch.float32)
# targets 是輸入數據對應的真實目標值
# 每個樣本對應一個目標值，表示模型需要學習的結果
targets = torch.tensor([[320000], [430000]], dtype=torch.float32)
# 訓練模型
# 使用一個簡單的循環（epoch）來訓練模型，共進行 100 次迭代
for epoch in range(100):
    # 清除上一輪計算中累積的梯度
    # PyTorch 中的梯度是累積的，因此在每次反向傳播前需要先清零
    optimizer.zero_grad()
    # 模型預測
    # 將輸入數據傳入模型，得到模型的預測輸出
    outputs = model(data)
    # 計算損失值
    # 使用 MSE 損失函數比較模型預測值（outputs）與目標值（targets）
    # 損失值用於衡量當前模型的預測誤差大小
    loss = criterion(outputs, targets)
    # 計算梯度
    # 根據損失值對模型參數計算梯度，用於指導模型參數更新的方向
    loss.backward()
    # 更新模型參數
    # 使用 Adam 優化器根據梯度和學習率，更新模型的權重和偏置
    optimizer.step()
    # 用於觀察模型的訓練進度
    if epoch % 10 == 0:
        print(f"Epoch {epoch+1}/100, Loss: {loss.item()}")

Epoch 1/100, Loss: 143459155968.0
Epoch 11/100, Loss: 143362392064.0
Epoch 21/100, Loss: 143265660928.0
Epoch 31/100, Loss: 143168995328.0
Epoch 41/100, Loss: 143072346112.0
Epoch 51/100, Loss: 142975762432.0
Epoch 61/100, Loss: 142879227904.0
Epoch 71/100, Loss: 142782742528.0
Epoch 81/100, Loss: 142686306304.0
Epoch 91/100, Loss: 142589902848.0


線性層（Linear Layer）

In [8]:
import torch
import torch.nn as nn

# 定義模型
# 使用 PyTorch 的 nn.Linear 定義一個全連接線性層（Linear Layer）
# - `in_features=20` 表示輸入特徵的數量為 20
# - `out_features=30` 表示輸出的特徵數量為 30
# 這個層將實現一個線性變換：y = xW^T + b，其中 W 是權重，b 是偏置
model = nn.Linear(in_features=20, out_features=30)
# 創建輸入張量
# 使用 torch.randn 隨機生成一個 2 維輸入張量，大小為 128 x 20
# - 第一個維度（128）表示有 128 個樣本
# - 第二個維度（20）表示每個樣本有 20 個特徵
# 可以將這個張量想像成一組批量的輸入數據，每行是一個樣本，每列是一個特徵
input = torch.randn(128, 20)
type(input)  # <class 'torch.Tensor'>
input.size()  # torch.Size([128, 20])
# 將輸入張量傳入模型
# 將 input 作為輸入傳遞給線性層，得到輸出張量 output
# 線性層會對輸入張量進行線性變換，輸出張量的形狀會根據 out_features 調整
output = model(input)
type(output)  # <class 'torch.Tensor'>
output.size()  # torch.Size([128, 30])

torch.Size([128, 30])

卷積層（Convolutional Layer）

In [9]:
import torch
import torch.nn as nn

# 定義一個一維卷積層
model = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=2, stride=1)
# 創建輸入張量
# 使用 torch.randn 隨機生成一個一維輸入張量，大小為 1x10
# - 第一個維度（1）表示輸入的通道數
# - 第二個維度（10）表示輸入的特徵數
input = torch.randn(1, 10)
type(input)  # <class 'torch.Tensor'>
# 檢查輸入張量的尺寸
# 使用 PyTorch 的 size 方法，檢查 input 的形狀
# 結果為 torch.Size([1, 10])，表示輸入有 1 個通道，特徵數為 10
input.size()  # torch.Size([1, 10])
# 將輸入張量傳入模型
# 將 input 作為輸入傳遞給卷積層，得到輸出張量 output
# 輸出大小會因卷積核大小和步幅計算而有所變化
output = model(input)
type(output)  # <class 'torch.Tensor'>
output.size()  # torch.Size([1, 9])

# 將卷積層改為使用三個卷積核
model = nn.Conv1d(in_channels=1, out_channels=3, kernel_size=2, stride=1)
input = torch.randn(1, 10)
type(input)  # <class 'torch.Tensor'>
input.size()  # torch.Size([1, 10])
output = model(input)
type(output)  # <class 'torch.Tensor'>
output.size()  # torch.Size([3, 9])

torch.Size([3, 9])

循環層（Recurrent Layer）

In [10]:
import torch
import torch.nn as nn

# 定義 LSTM 層
model = nn.LSTM(input_size=10, hidden_size=32, num_layers=2)
# 創建一個3維輸入張量，大小為 3 x 5 x 10，
# 可以將這個張量想成有 3 個樣本，每個樣本是長度為 5 的時間序列，每個時間點下有 10 個特徵。
input = torch.randn(5, 3, 10)
type(input)  # <class 'torch.Tensor'>
input.size()  # torch.Size([5, 3, 10])
# 將張量input輸入模型，得到模型輸出張量output。
output, (h_n, c_n) = model(input)
type(output)  # <class 'torch.Tensor'>
output.size()  # torch.Size([5, 3, 32])

torch.Size([5, 3, 32])

注意力層（Attention Layer）

In [13]:
import torch
import torch.nn as nn

# 定義 MultiheadAttention 層
multihead_attn = nn.MultiheadAttention(embed_dim=10, num_heads=2)
# 創建隨機的輸入張量
query = torch.randn(5, 3, 10)  # seq_len=5, batch_size=3, embed_dim=10
key = torch.randn(5, 3, 10)  # seq_len=5, batch_size=3, embed_dim=10
value = torch.randn(5, 3, 10)  # seq_len=5, batch_size=3, embed_dim=10
# 應用 MultiheadAttention 層
attn_output, attn_output_weights = multihead_attn(query, key, value)
attn_output.shape  # torch.Size([5, 3, 10])
attn_output_weights.shape  # torch.Size([5, 3, 10])

torch.Size([3, 5, 5])

激活函式層

In [14]:
import torch
import torch.nn as nn

# 創建一個隨機的輸入張量
input_tensor = torch.randn(1, 3)
print(input_tensor)  # tensor([-1,  1,  2])

# 定義 Sigmoid 激活函式
sigmoid = nn.Sigmoid()
# 使用 Sigmoid 激活函式進行轉換
output_sigmoid = sigmoid(input_tensor)
print(output_sigmoid)  # tensor([0.2689, 0.7311, 0.8808])

# 定義 ReLU 激活函式
relu = nn.ReLU()
# 使用 ReLU 激活函式進行轉換
output_relu = relu(input_tensor)
print(output_relu)  # tensor([0, 1, 2])

# 定義 Tanh 激活函式
tanh = nn.Tanh()
# 使用 Tanh 激活函式進行轉換
output_tanh = tanh(input_tensor)
print(output_tanh)  # tensor([-0.7616,  0.7616,  0.9640])

tensor([[-1.0163,  0.1410,  2.9745]])
tensor([[0.2657, 0.5352, 0.9514]])
tensor([[0.0000, 0.1410, 2.9745]])
tensor([[-0.7684,  0.1400,  0.9948]])


訓練函式

In [None]:
import torch
import torch.nn as nn
from torch.utils.tensorboard import SummaryWriter

# 初始化 TensorBoard 紀錄器，指定紀錄內容的位置
writer = SummaryWriter(WRITER_PATH)
# 定義損失函式為均方誤差
loss_fn = nn.MSELoss()
# 定義一個 LSTM 模型
model = nn.LSTM(input_size=10, hidden_size=32, num_layers=2)
# 設定訓練週期的數量
for epoch in range(EPOCHS):
    model.train()  # 設置模型為訓練模式
    train_loss = 0.0  # 初始化訓練損失值
    # 迭代批次訓練集
    for inputs, targets in train_loader:  # 訓練集的資料
        # 透過 LSTM 模型進行輸入資料的預測
        outputs, _ = model(inputs)
        # 計算損失值
        loss = loss_fn(outputs, targets)
        # 清除模型參數的梯度
        optimizer.zero_grad()
        # 計算參數更新的方向
        loss.backward()
        # 根據計算出的更新方向來更新模型參數
        optimizer.step()
        # 累加損失值
        train_loss += loss.item()
    # 計算平均訓練損失值
    train_loss = train_loss / len(train_loader)
    # 使用 TensorBoard 紀錄平均訓練損失值
    writer.add_scalar(tag="Train Loss", scalar_value=train_loss, global_step=epoch)
    # 顯示每個 epoch 的平均訓練損失值
    print(f"Epoch [{epoch+1}/{EPOCHS}], Loss: {train_loss:.4f}")

評估函式

In [None]:
# 初始化 TensorBoard 紀錄器，指定紀錄內容的位置
writer = SummaryWriter(WRITER_PATH)
# 定義損失函式為均方誤差
loss_fn = nn.MSELoss()
# 定義一個 LSTM 模型
model = nn.LSTM(input_size=10, hidden_size=32, num_layers=2)
for epoch in range(EPOCHS):
    # … 省略訓練函式的部分 …
    # 使用 TensorBoard 紀錄平均訓練損失值
    model.eval()
    test_loss = 0.0
    # 評估時禁計算梯度
    with torch.no_grad():
        for inputs, targets in test_loader:  # 測試集的資料
            # 透過 LSTM 模型進行輸入資料的預測
            outputs = model(inputs)
            loss = loss_fn(outputs, targets.unsqueeze(-1))
            test_loss += loss.item()
    test_loss = test_loss / len(test_loader.dataset)
    # 使用 TensorBoard 紀錄平均測試損失值
    writer.add_scalar(tag="Test Loss", scalar_value=test_loss, global_step=epoch)
    # 儲存模型的參數
    torch.save(model.state_dict(), MODEL_PATH)