## A Simple Transformer Model and Its Application on a Small Dataset

Make sure PyTorch is installed in the current environment. Installation instructions: https://pytorch.org/get-started/locally/

In [1]:
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu

Looking in indexes: https://download.pytorch.org/whl/cpu
Collecting torch
  Downloading https://download.pytorch.org/whl/cpu/torch-2.5.1%2Bcpu-cp312-cp312-win_amd64.whl (205.4 MB)

In [1]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
import torch.nn.functional as F
import math

PyTorch offers domain-specific libraries such as
[TorchText](https://pytorch.org/text/stable/index.html),
[TorchVision](https://pytorch.org/vision/stable/index.html), and
[TorchAudio](https://pytorch.org/audio/stable/index.html), all of which
include datasets. For this tutorial, we will be using a TorchVision
dataset.

The `torchvision.datasets` module contains `Dataset` objects for many
real-world vision datasets such as CIFAR and COCO ([full list
here](https://pytorch.org/vision/stable/datasets.html)). In this
tutorial, we use the FashionMNIST dataset. Every TorchVision `Dataset`
accepts two arguments, `transform` and `target_transform`, to modify the
samples and labels respectively.


In [2]:
# Download training data from open datasets.
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# Download test data from open datasets.
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)

We pass the `Dataset` as an argument to `DataLoader`. This wraps an
iterable over our dataset, and supports automatic batching, sampling,
shuffling and multiprocess data loading. Here we define a batch size of
64, i.e. each element in the dataloader iterable will return a batch of
64 features and labels.


In [4]:
batch_size = 64

# Create data loaders.
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

# Here, we use the FashionMNIST dataset. Each image in the dataset is a 28x28 pixel square.
# The images are grayscale, hence the channel number (C) is 1.
# To simplify our description, we can treat one image as a sequence of 28 elements (its rows),
# where each element is a vector of length 28.

for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

Shape of X [N, C, H, W]: torch.Size([64, 1, 28, 28])
Shape of y: torch.Size([64]) torch.int64
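
The batching behaviour can be checked without downloading anything. The sketch below is an illustration only: it swaps FashionMNIST for a synthetic `TensorDataset` of random tensors (the names `fake_images`, `fake_labels`, and `fake_loader`, and the size of 200 samples, are assumptions made here). It shows that the last batch is smaller when the dataset size is not a multiple of `batch_size`, and that dropping the channel dimension gives the "sequence of 28 vectors of length 28" view described in the comments above.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Synthetic stand-in for FashionMNIST: 200 random "images" and labels (illustrative only).
fake_images = torch.rand(200, 1, 28, 28)
fake_labels = torch.randint(0, 10, (200,))
fake_loader = DataLoader(TensorDataset(fake_images, fake_labels), batch_size=64)

# 200 = 3 * 64 + 8, so we expect three full batches and one batch of 8.
batch_shapes = [tuple(X.shape) for X, _ in fake_loader]
print(batch_shapes)

# Dropping the channel dimension leaves 28 row vectors of length 28 per image.
first_X, first_y = next(iter(fake_loader))
as_sequence = first_X.squeeze(1)
print(as_sequence.shape)
```

If the smaller final batch is unwanted, `DataLoader` accepts `drop_last=True` to discard it.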


Read more about [loading data in PyTorch](data_tutorial.html).

Visualize your dataset

In [4]:
import matplotlib.pyplot as plt
import numpy as np
import torchvision

# Helper function to show a grid of images.


def imshow(img):
    # ToTensor() already scales pixel values to [0, 1], so no unnormalization is needed.
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))  # [C, H, W] -> [H, W, C]
    plt.show()


# Get the first batch of training images (the dataloader is not shuffled).
dataiter = iter(train_dataloader)
images, labels = next(dataiter)

# Show the images as a grid.
imshow(torchvision.utils.make_grid(images))


------------------------------------------------------------------------


Creating A Transformer Model
===============

To define a neural network in PyTorch, we create a class that inherits
from
[nn.Module](https://pytorch.org/docs/stable/generated/torch.nn.Module.html).
We define the layers of the network in the `__init__` function and
specify how data will pass through the network in the `forward`
function. To accelerate operations in the neural network, we move it to
the GPU if available.


In [5]:
# Get cpu or gpu for training.
device = (
    "cuda"
    if torch.cuda.is_available()
    else "cpu"
)
print(f"Using {device} device")

Using cpu device


In [6]:
class MultiHeadSelfAttention(nn.Module):
    def __init__(self, input_dim, emb_dim, num_heads):  # The initialization steps that will be done when we create a MultiHeadSelfAttention object.
        super().__init__()  # Initialize the nn.Module object.
        self.emb_dim = emb_dim  # Embedding dimension: the length of each embedding vector.
        self.num_heads = num_heads  # Number of attention heads.
        self.head_dim = emb_dim // num_heads  # Length of the embedding vector handled by each attention head.

        assert self.head_dim * num_heads == emb_dim, "Embedding dimension must be divisible by the number of heads"

        # nn.Linear(input_dim, emb_dim) is a linear layer with input dimension input_dim
        # and output dimension emb_dim.
        # It acts like a matrix of size input_dim (height) * emb_dim (width)
        # that maps an input vector of length input_dim to an embedding vector of length emb_dim.
        self.query = nn.Linear(input_dim, emb_dim)
        self.key = nn.Linear(input_dim, emb_dim)
        self.value = nn.Linear(input_dim, emb_dim)
        # self.out is a linear layer that maps each embedding vector to an output embedding vector of the same length.
        self.out = nn.Linear(emb_dim, emb_dim)

    def forward(self, x):  # The steps that will be done when we call the created MultiHeadSelfAttention object.
        # "x" holds the input data, a batch of images.
        # In this notebook, a training batch x has shape [batch_size, 1, 28, 28]: each image is treated as a sequence of 28 row vectors of length input_dim = 28.
        batch_size = x.shape[0]  # Number of samples in the input batch.


        # Fill in the code here
        ## Learn to write the attention mechanism in Python

        # Compute Q, K, V with the linear layers

        # Exercise line 1: Q has shape [batch_size, 1, sequence_length, emb_dim] (the 1 is the channel dimension)
        Q=self.query(x)
        # Exercise line 2: K has shape [batch_size, 1, sequence_length, emb_dim]
        K=self.key(x)
        # Exercise line 3: V has shape [batch_size, 1, sequence_length, emb_dim]
        V=self.value(x)
        # Exercise line 4: split by heads
        Q = Q.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        # We have seen that emb_dim = num_heads * head_dim,
        # so the last dimension of Q (of size emb_dim) can be split into two dimensions [num_heads, head_dim].
        # The line above reshapes Q from [batch_size, 1, sequence_length, emb_dim] to [batch_size, sequence_length, num_heads, head_dim],
        # then swaps dimensions 1 and 2 (counting from 0),
        # so the final shape of Q is [batch_size, num_heads, sequence_length, head_dim].

        # Exercise line 5
        K = K.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        # Exercise line 6
        V = V.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        # Exercise line 7: scaled dot-product attention
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)
        # In the line above, we transpose K from shape [batch_size, num_heads, sequence_length, head_dim] to shape [batch_size, num_heads, head_dim, sequence_length].
        # The last two dimensions of Q ([sequence_length, head_dim]) and of the transposed K ([head_dim, sequence_length]) can therefore be matrix-multiplied.
        # math.sqrt(self.head_dim) is a scalar value.
        # The result of torch.matmul (and therefore "scores") has shape [batch_size, num_heads, sequence_length, sequence_length].

        # Exercise line 8: apply the softmax function.
        attention = F.softmax(scores, dim=-1)
        # Exercise line 9: matrix-multiply the softmax result (the attention weights) with V.
        out = torch.matmul(attention, V)
        # Concatenate all heads and transform to the output space
        out = out.transpose(1, 2).contiguous().view(batch_size, -1, self.emb_dim)  # Merge the heads back: [batch_size, sequence_length, emb_dim].
        out = out.sum(dim=1) / out.shape[1]  # Use the mean of the embedding vectors of all sequence elements as the embedding vector of the whole sequence (the whole image).
        out = self.out(out)  # Apply a linear transformation to the embedding vector.
        
        return out
model = MultiHeadSelfAttention(28, 28, 4).to(device)
print(model)

MultiHeadSelfAttention(
  (query): Linear(in_features=28, out_features=28, bias=True)
  (key): Linear(in_features=28, out_features=28, bias=True)
  (value): Linear(in_features=28, out_features=28, bias=True)
  (out): Linear(in_features=28, out_features=28, bias=True)
)
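
To make the shape bookkeeping in `forward` concrete, here is a minimal sketch that repeats the head split and the scaled dot-product step on random tensors, using the same sizes as `MultiHeadSelfAttention(28, 28, 4)`. The tensor `projected` and the helper `split_heads` are assumptions introduced for illustration; they stand in for the outputs of the query/key/value layers. As a sanity check, the manual result is compared with `F.scaled_dot_product_attention`, which is available in PyTorch 2.0 and later.

```python
import math
import torch
import torch.nn.functional as F

torch.manual_seed(0)
batch_size, seq_len, num_heads, head_dim = 2, 28, 4, 7
emb_dim = num_heads * head_dim  # 28

# Stand-in for the output of a query/key/value layer applied to a [2, 1, 28, 28] batch.
projected = torch.randn(batch_size, 1, seq_len, emb_dim)


def split_heads(t):
    # [batch, 1, seq, emb] -> [batch, seq, heads, head_dim] -> [batch, heads, seq, head_dim]
    return t.view(batch_size, -1, num_heads, head_dim).transpose(1, 2)


Q = split_heads(projected)
K = split_heads(torch.randn_like(projected))
V = split_heads(torch.randn_like(projected))

scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(head_dim)
attention = F.softmax(scores, dim=-1)  # each row of attention weights sums to 1
manual = torch.matmul(attention, V)

# Built-in equivalent (default scale is 1 / sqrt(head_dim)).
builtin = F.scaled_dot_product_attention(Q, K, V)

print(Q.shape, scores.shape, manual.shape)
print(torch.allclose(manual, builtin, atol=1e-5))
```

The hand-written version is kept in the class above because the point of the exercise is to see each step; the built-in call is only used here as a cross-check.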


In [7]:
# Define model
class TransformerEncoder(nn.Module):
    def __init__(self, input_dim, emb_dim, num_heads):  # The initialization steps that will be done when we create a TransformerEncoder object.
        super().__init__()  # Initialize the nn.Module object.
        self.attention = MultiHeadSelfAttention(input_dim, emb_dim, num_heads)  # Create a MultiHeadSelfAttention object, which will be used in "def forward".
        self.feed_forward = nn.Sequential(  # Create a Sequential object, consisting of two Linear layers with a ReLU function between them.
            nn.Linear(emb_dim, emb_dim * 4),
            nn.ReLU(),
            nn.Linear(emb_dim * 4, 10)
        )
 
    def forward(self, x):  # The steps that will be done when we call the created TransformerEncoder object.
        # Attention
        x_att = self.attention(x)  # Call the MultiHeadSelfAttention object we created, with the image batch x as input; x_att has shape [batch_size, emb_dim].

        # Feed Forward
        out = self.feed_forward(x_att)  
        # The above line calls the Sequential object we created, with x_att as input, 
        # x_att will go through a Linear layer, a ReLU function and another Linear layer.
        return out

# Create the model, a TransformerEncoder object, using TransformerEncoder.__init__(self, input_dim=28, emb_dim=32, num_heads=4),
# and put the model to the device we set (cpu or gpu).
model = TransformerEncoder(28, 32, 4).to(device)  
# Print the information of the TransformerEncoder object we created.
print(model)

TransformerEncoder(
  (attention): MultiHeadSelfAttention(
    (query): Linear(in_features=28, out_features=32, bias=True)
    (key): Linear(in_features=28, out_features=32, bias=True)
    (value): Linear(in_features=28, out_features=32, bias=True)
    (out): Linear(in_features=32, out_features=32, bias=True)
  )
  (feed_forward): Sequential(
    (0): Linear(in_features=32, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=10, bias=True)
  )
)
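
The printed summary is enough to work out how small this model is. The following sketch rebuilds only the layer sizes shown above in an `nn.ModuleDict` (the keys `ff_in` and `ff_out` and the helper `count_parameters` are names assumed here, not part of the tutorial) and counts the trainable parameters. Each `nn.Linear(i, o)` contributes `i * o` weights plus `o` biases.

```python
from torch import nn

# Layers rebuilt from the printed summary (same in/out sizes as the model above).
layers = nn.ModuleDict({
    "query": nn.Linear(28, 32),
    "key": nn.Linear(28, 32),
    "value": nn.Linear(28, 32),
    "out": nn.Linear(32, 32),
    "ff_in": nn.Linear(32, 128),
    "ff_out": nn.Linear(128, 10),
})


def count_parameters(module):
    # Sum the number of elements over all trainable tensors (weights and biases).
    return sum(p.numel() for p in module.parameters() if p.requires_grad)


per_layer = {name: count_parameters(layer) for name, layer in layers.items()}
total = count_parameters(layers)
print(per_layer)
print(total)  # 3 * 928 + 1056 + 4224 + 1290 = 9354
```

Calling `count_parameters(model)` on the actual `TransformerEncoder` instance should give the same total, since it contains exactly these layers.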


Read more about [building neural networks in
PyTorch](buildmodel_tutorial.html).


------------------------------------------------------------------------


Optimizing the Model Parameters
===============================

To train a model, we need a [loss
function](https://pytorch.org/docs/stable/nn.html#loss-functions) and an
[optimizer](https://pytorch.org/docs/stable/optim.html).


In [8]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
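
As a rough illustration of what `nn.CrossEntropyLoss` computes, the sketch below uses made-up logits (`uniform_logits`, `targets`, and `logits` are assumptions for this example). It shows two things: a model that assigns equal scores to all 10 classes has a loss of ln(10) ≈ 2.3026, which is why the training log further down starts near 2.30, and the loss applies log-softmax internally, so the model should output raw logits.

```python
import math
import torch
from torch import nn
import torch.nn.functional as F

loss_fn = nn.CrossEntropyLoss()

# A model that cannot tell the classes apart yet outputs equal logits.
uniform_logits = torch.zeros(4, 10)  # 4 samples, 10 classes
targets = torch.tensor([0, 3, 7, 9])
uniform_loss = loss_fn(uniform_logits, targets).item()
print(uniform_loss, math.log(10))  # both are about 2.3026

# CrossEntropyLoss is log-softmax followed by negative log-likelihood.
torch.manual_seed(0)
logits = torch.randn(4, 10)
manual_loss = F.nll_loss(F.log_softmax(logits, dim=1), targets)
print(torch.allclose(loss_fn(logits, targets), manual_loss))
```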

In a single training loop, the model makes predictions on the training
dataset (fed to it in batches), and backpropagates the prediction error
to adjust the model's parameters.


In [9]:
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()  # Let the model enter the training mode.
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation, to update the weights (parameters) of the model.
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
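
The three calls `loss.backward()`, `optimizer.step()`, and `optimizer.zero_grad()` can be traced by hand on a toy problem. This is a minimal sketch with a single two-element parameter `w` and a learning rate of 0.1 (both chosen here for easy arithmetic, not taken from the tutorial): plain SGD updates each parameter as `w <- w - lr * grad`.

```python
import torch

w = torch.tensor([1.0, 2.0], requires_grad=True)
toy_optimizer = torch.optim.SGD([w], lr=0.1)

loss = (w ** 2).sum()              # d(loss)/dw = 2 * w = [2.0, 4.0]
loss.backward()                    # fills w.grad
grad_before_step = w.grad.clone()

toy_optimizer.step()               # w <- w - 0.1 * [2.0, 4.0] = [0.8, 1.6]
toy_optimizer.zero_grad()          # clear gradients so the next backward() does not accumulate

print(grad_before_step, w.detach())
```

Without the `zero_grad()` call, the next `backward()` would add its gradients to the old ones, and the following step would use the sum.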

We also check the model's performance against the test dataset to
ensure it is learning.


In [10]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()  # Let the model enter the evaluation mode.
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
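
The accuracy line in `test` can be checked on a hand-sized example. In this sketch the three rows of `pred` and the labels `y` are made-up values: `pred.argmax(1)` picks the highest-scoring class per row, and comparing it with `y` gives one boolean per sample.

```python
import torch

pred = torch.tensor([[2.0, 0.5, 0.1],   # argmax -> class 0
                     [0.2, 0.1, 3.0],   # argmax -> class 2
                     [0.3, 1.5, 0.2]])  # argmax -> class 1
y = torch.tensor([0, 2, 0])             # the last label disagrees with the prediction

correct = (pred.argmax(1) == y).type(torch.float).sum().item()
accuracy = correct / len(y)
print(correct, accuracy)  # 2 of 3 predictions match
```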

The training process is conducted over several iterations (*epochs*).
During each epoch, the model learns parameters to make better
predictions. We print the model's accuracy and loss at each epoch;
we'd like to see the accuracy increase and the loss decrease with every
epoch.


In [11]:
epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")

Epoch 1
-------------------------------
loss: 2.305506  [   64/60000]
loss: 2.302889  [ 6464/60000]
loss: 2.301923  [12864/60000]
loss: 2.293305  [19264/60000]
loss: 2.306347  [25664/60000]
loss: 2.293540  [32064/60000]
loss: 2.292616  [38464/60000]
loss: 2.289670  [44864/60000]
loss: 2.294953  [51264/60000]
loss: 2.296227  [57664/60000]
Test Error: 
 Accuracy: 18.9%, Avg loss: 2.290862 

Epoch 2
-------------------------------
loss: 2.296207  [   64/60000]
loss: 2.295927  [ 6464/60000]
loss: 2.292201  [12864/60000]
loss: 2.287051  [19264/60000]
loss: 2.296992  [25664/60000]
loss: 2.284914  [32064/60000]
loss: 2.286256  [38464/60000]
loss: 2.281190  [44864/60000]
loss: 2.286198  [51264/60000]
loss: 2.287723  [57664/60000]
Test Error: 
 Accuracy: 22.2%, Avg loss: 2.282607 

Epoch 3
-------------------------------
loss: 2.286937  [   64/60000]
loss: 2.288819  [ 6464/60000]
loss: 2.282034  [12864/60000]
loss: 2.279938  [19264/60000]
loss: 2.286767  [25664/60000]
loss: 2.275184  [32064/600

Read more about [Training your model](optimization_tutorial.html).


------------------------------------------------------------------------


This model can now be used to make predictions.


In [12]:
classes = [
    "T-shirt/top",
    "Trouser",
    "Pullover",
    "Dress",
    "Coat",
    "Sandal",
    "Shirt",
    "Sneaker",
    "Bag",
    "Ankle boot",
]

model.eval()  # Let the model enter the evaluation mode.
with torch.no_grad():
    counts = 0
    true_counts = 0
    for item in test_data:
        x, y = item[0], item[1]
        x = x.to(device)
        pred = model(x)  # x has shape [1, 28, 28], so the channel dimension acts as a batch of size 1. pred has shape [1, 10] and holds one raw score (logit) per class, not a probability.
        predicted, actual = classes[pred[0].argmax(0)], classes[y]  # pred[0].argmax(0) returns the index of the maximum value in pred[0].
        if predicted == actual:
            true_counts += 1
        counts += 1
        # print(f'Predicted: "{predicted}", Actual: "{actual}"')
    print()
    print(f"Test data size: {counts}")
    print(f"Accuracy: {true_counts/counts}")


Test data size: 10000
Accuracy: 0.3912
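
The model's final `nn.Linear` layer outputs raw scores (logits) rather than probabilities. If probabilities are wanted, a softmax over the class dimension converts them; the sketch below uses made-up logits for a single image (illustrative values, not real model output). Softmax preserves the ordering of the scores, so the predicted class is the same either way.

```python
import torch
import torch.nn.functional as F

classes = ["T-shirt/top", "Trouser", "Pullover", "Dress", "Coat",
           "Sandal", "Shirt", "Sneaker", "Bag", "Ankle boot"]

# Made-up logits for one image, shape [1, 10] (illustrative only).
logits = torch.tensor([[0.1, -1.2, 0.4, 0.0, 0.3, 2.5, 0.2, 1.1, -0.5, 0.9]])

probs = F.softmax(logits, dim=1)  # non-negative, each row sums to 1
predicted_index = probs[0].argmax(0).item()
print(classes[predicted_index], probs[0, predicted_index].item())
```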


Read more about [Saving & Loading your
model](saveloadrun_tutorial.html).
