In [4]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
# Preprocessing for every image: convert to a tensor, then normalise the
# single channel with mean 0.5 and standard deviation 0.5.
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
)

# This is the MNIST dataset, stored under ./data (download=True).
# `trainset` is the training split and `testset` is the test split.
trainset = torchvision.datasets.MNIST(
    root='./data', train=True, download=True, transform=transform
)
# Training batches of 64 samples, reshuffled by the loader.
train_loader = torch.utils.data.DataLoader(
    trainset, batch_size=64, shuffle=True, num_workers=2
)
testset = torchvision.datasets.MNIST(
    root='./data', train=False, download=True, transform=transform
)
# Test batches of 64 samples, served in a fixed order.
test_loader = torch.utils.data.DataLoader(
    testset, batch_size=64, shuffle=False, num_workers=2
)

# Show the size of each split, one number per line (the recorded output is
# 60000 for the training split and 10000 for the test split).
print(len(trainset), len(testset), sep='\n')
# The training and test splits are two separate datasets, so the data used
# for evaluation stays independent of the data used for training.
# print(trainset)

60000
10000


In [6]:
def train(model, device, train_loader, optimizer, loss_function):
    """Run one optimisation pass over every batch in ``train_loader``.

    The model is switched to training mode. Each batch is moved to
    ``device``, gradients are cleared, the loss is back-propagated and the
    optimizer takes one step.

    Args:
        model: Module called as ``model(data)``; its output is reduced with
            ``argmax(dim=1)`` to obtain predictions.
        device: Target passed to ``.to(...)`` for inputs and targets.
        train_loader: Sized iterable of ``(data, target)`` batches.
        optimizer: Object providing ``zero_grad()`` and ``step()``.
        loss_function: Callable ``(output, target) -> scalar tensor``.

    Returns:
        tuple: ``(average_loss, accuracy)`` where ``average_loss`` is the sum
        of the per-batch loss values divided by ``len(train_loader)`` and
        ``accuracy`` is the percentage of samples predicted correctly.
    """
    model.train()
    running_loss, num_correct, num_seen = 0, 0, 0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Standard update: clear old gradients, forward, backward, step.
        optimizer.zero_grad()
        logits = model(inputs)
        batch_loss = loss_function(logits, labels)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
        predicted = logits.argmax(dim=1)
        num_correct += (predicted == labels.view_as(predicted)).sum().item()
        num_seen += labels.size(0)

    return running_loss / len(train_loader), 100. * num_correct / num_seen


def test(model, device, test_loader, loss_function):
    model.eval() # 将模型设置为评估模式
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = loss_function(output, target)
            test_loss += loss.item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            total += target.size(0)

    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / total
    return test_loss, accuracy

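# Write a loop that runs training and testing each epoch, e.g.:
# train(model, device, train_loader, optimizer, loss_function)
# test(model, device, test_loader, loss_function)

# Now wrap the training and testing code in a single function.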

def train_test_loop(model, train_loader, test_loader, optimizer, loss_function, num_epochs, device, save_path='model_checkpoint/', scheduler=None, checkpoint_filename=None, save_frequency=1):
    """Train and evaluate ``model`` once per epoch, printing the metrics.

    Each epoch calls ``train`` on ``train_loader`` and ``test`` on
    ``test_loader``, steps ``scheduler`` when one is supplied, and then
    prints a one-line summary of losses and accuracies.

    Args:
        model: Model handed to ``train`` and ``test``.
        train_loader: Batches used for the training pass.
        test_loader: Batches used for the evaluation pass.
        optimizer: Optimizer handed to ``train``.
        loss_function: Loss handed to both ``train`` and ``test``.
        num_epochs: Number of epochs to run (epochs start at 0).
        device: Device handed to ``train`` and ``test``.
        save_path: Accepted but not used by this function.
        scheduler: Optional object whose ``step()`` is called once per epoch
            after evaluation.
        checkpoint_filename: Accepted but not used by this function.
        save_frequency: Accepted but not used by this function.

    Returns:
        None. Results are only printed.
    """
    first_epoch = 0
    has_scheduler = scheduler is not None

    for epoch in range(first_epoch, num_epochs):
        train_loss, train_accuracy = train(
            model, device, train_loader, optimizer, loss_function
        )
        test_loss, test_accuracy = test(
            model, device, test_loader, loss_function
        )

        if has_scheduler:
            scheduler.step()

        report = f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%'
        print(report)



In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class AttentionLayer(nn.Module):
    """Single-head scaled dot-product attention with bias-free projections.

    Keys and queries are projected into a shared space of width ``key_size``
    so their dot product is defined; values are projected within
    ``value_size``.
    """

    def __init__(self, key_size, query_size, value_size):
        """
        Initialise the attention layer.

        :param key_size: dimension of the key vectors (also the width of the
            space in which keys and queries are compared)
        :param query_size: dimension of the query vectors
        :param value_size: dimension of the value vectors
        """
        super().__init__()
        self.key_size = key_size
        self.query_size = query_size
        self.value_size = value_size

        # Projection weights. The key projection takes inputs of width
        # key_size; using query_size there only worked when both were equal.
        self.key_layer = nn.Linear(key_size, key_size, bias=False)
        self.query_layer = nn.Linear(query_size, key_size, bias=False)
        self.value_layer = nn.Linear(value_size, value_size, bias=False)

    def forward(self, keys, queries, values, mask=None):
        """
        Forward pass.

        :param keys: key vectors, last dimension ``key_size``
        :param queries: query vectors, last dimension ``query_size``
        :param values: value vectors, last dimension ``value_size``
        :param mask: optional mask compared against the score matrix;
            positions where ``mask == 0`` are excluded from attention
        :return: ``(output, attention_weights)`` - the weighted sum of the
            projected values and the softmax weights over the key axis
        """
        # Project queries, keys and values.
        queries = self.query_layer(queries)
        keys = self.key_layer(keys)
        values = self.value_layer(values)

        # Attention scores, scaled by sqrt(key_size).
        scores = torch.matmul(queries, keys.transpose(-2, -1)) / self.key_size**0.5

        if mask is not None:
            # Masked positions get -inf so softmax assigns them zero weight.
            # NOTE: a row that is masked everywhere becomes all -inf and
            # softmax then yields NaN for that row.
            scores = scores.masked_fill(mask == 0, float('-inf'))

        # Normalise over the key axis.
        attention_weights = F.softmax(scores, dim=-1)

        # Weighted sum of the values.
        output = torch.matmul(attention_weights, values)
        return output, attention_weights

# Example usage: run the layer once on random inputs and inspect the shapes.
key_size, query_size = 64, 64
value_size = 128
batch_size, seq_length = 10, 20

# Build the layer.
attention = AttentionLayer(key_size, query_size, value_size)

# Simulated inputs, each created with shape (batch_size, seq_length, size).
keys = torch.rand(batch_size, seq_length, key_size)
queries = torch.rand(batch_size, seq_length, query_size)
values = torch.rand(batch_size, seq_length, value_size)

# Run the attention layer.
output, attention_weights = attention(keys, queries, values)

print("Output shape:", output.shape)
print("Attention weights shape:", attention_weights.shape)

        

Output shape: torch.Size([10, 20, 128])
Attention weights shape: torch.Size([10, 20, 20])


In [None]:
class my_transformer(nn.Module):
    """Minimal attention-based image classifier built on ``AttentionLayer``.

    The original stub had an empty ``forward`` (a syntax error) and no
    parameters, so neither the class nor an optimizer over it could be
    created. This implementation is a small, complete stand-in: each image
    row is treated as one token, a single self-attention block with a
    feed-forward layer processes the row sequence, and the mean over rows is
    classified.

    All constructor arguments have defaults, so ``my_transformer()`` still
    works. The defaults are chosen for 28x28 single-channel images with 10
    classes, as expected from the MNIST loaders in this notebook.
    """

    def __init__(self, image_size=28, embed_dim=64, hidden_dim=128, num_classes=10):
        """
        :param image_size: number of rows and of columns in the input image
        :param embed_dim: width of the token embeddings
        :param hidden_dim: width of the feed-forward hidden layer
        :param num_classes: number of output logits
        """
        super(my_transformer, self).__init__()
        self.image_size = image_size

        # Each image row (image_size pixels) becomes one embed_dim token.
        self.row_embedding = nn.Linear(image_size, embed_dim)
        # Learned per-row offset so the model can tell rows apart.
        self.position_embedding = nn.Parameter(torch.zeros(1, image_size, embed_dim))

        # Self-attention: keys, queries and values all have width embed_dim.
        self.attention = AttentionLayer(embed_dim, embed_dim, embed_dim)
        self.norm1 = nn.LayerNorm(embed_dim)

        self.feed_forward = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim),
        )
        self.norm2 = nn.LayerNorm(embed_dim)

        self.classifier = nn.Linear(embed_dim, num_classes)

    def forward(self, x):
        """
        :param x: image batch whose trailing dimension is ``image_size`` and
            which reshapes to ``(batch, image_size, image_size)``, e.g.
            ``(batch, 1, 28, 28)`` with the defaults
        :return: logits of shape ``(batch, num_classes)``
        """
        # Flatten any channel axis into the row axis: (batch, rows, cols).
        tokens = x.reshape(x.size(0), -1, self.image_size)
        tokens = self.row_embedding(tokens) + self.position_embedding

        # Residual self-attention block (attention weights are not needed).
        attended, _ = self.attention(tokens, tokens, tokens)
        tokens = self.norm1(tokens + attended)

        # Residual feed-forward block.
        tokens = self.norm2(tokens + self.feed_forward(tokens))

        # Average over the row tokens, then classify.
        return self.classifier(tokens.mean(dim=1))

In [None]:

# Define the loss function, the device, the model and the optimizer.
loss_function = nn.CrossEntropyLoss()

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

model = my_transformer()
model = model.to(device)

# Second, separate loss instance kept under the name `criterion`.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(params=model.parameters(), lr=0.01, momentum=0.9)

num_epochs = 4
# Training call, currently disabled:
# train_test_loop(model,train_loader, test_loader,
#                 optimizer,
#                 loss_function,
#                 num_epochs,device,
#                 )