In [25]:
from torchvision.transforms.functional import pad
from torch.utils.data import Dataset
from PIL import Image

class MinstNumDataset(Dataset):
    def __init__(self, data_files, transform=None):
        """
        Args:
            data_files (list): List of file paths to the data batches.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        self.data = []
        self.labels = []
        self.transform = transform

        # Load data from all files
        for file in data_files:
            with open(file, 'r') as f:
                for line in f:
                    image_path, label = line.strip().split()
                    image_path = image_path.replace("\\", "/")
                    image_path = '/Users/mini4chaos/Developer/cifar-10/' + image_path

                    self.data.append(image_path)
                    self.labels.append(int(label))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image_path = self.data[idx]
        label = self.labels[idx]

        # 加载图片并转换为 RGB
        image = Image.open(image_path).convert("RGB")

        # 如果图片是 28x28，则补全到 32x32
        if image.size == (28, 28):
            image = pad(image, padding=2, fill=(0, 0, 0))  # 填充黑色边框

        if self.transform:
            image = self.transform(image)

        return image, label

In [26]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class LeNetMINSTNUM(nn.Module):
    def __init__(self, num_classes=10):
        super(LeNetMINSTNUM, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, kernel_size=5, stride=1, padding=2)  # 输入通道为3（RGB），输出通道为6
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)  # 池化层
        self.conv2 = nn.Conv2d(6, 16, kernel_size=3)  # 第二个卷积层
        self.fc1 = nn.Linear(16 * 7 * 7, 120)  # 全连接层1
        self.fc2 = nn.Linear(120, 84)  # 全连接层2
        self.fc3 = nn.Linear(84, num_classes)  # 输出层

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))  # 卷积1 -> ReLU -> 池化
        x = self.pool(F.relu(self.conv2(x)))  # 卷积2 -> ReLU -> 池化
        x = x.view(-1, 16 * 7 * 7)  # 展平操作
        x = F.relu(self.fc1(x))  # 全连接层1 -> ReLU
        x = F.relu(self.fc2(x))  # 全连接层2 -> ReLU
        x = self.fc3(x)  # 输出层
        return x

def evaluate(model, test_loader, device):
    """评估模型在测试集上的准确率"""
    model.eval()  # 确保模型处于评估模式
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model.forward(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    return accuracy

def train_model(model, train_loader, criterion, optimizer, num_epochs, device, test_loader=None):
    for epoch in range(num_epochs):
        model.train()  # 确保模型处于训练模式
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)  # 将数据移动到设备
            optimizer.zero_grad()
            outputs = model.forward(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
        
        # 每 10 个 epoch 评估一次
        if (epoch + 1) % 10 == 0 and test_loader is not None:
            print(f"Evaluating at epoch {epoch+1}...")
            accuracy = evaluate(model, test_loader, device)
            print(f'Accuracy at epoch {epoch+1}: {accuracy:.2f}%')

In [27]:
import torch
from torch.utils.data import DataLoader
import torch.optim as optim
from torchvision import transforms

train_dataset = MinstNumDataset(
    data_files=["/Users/mini4chaos/Developer/cifar-10/data/LEDNUM/train.txt"],
    transform=transforms.ToTensor()
)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataset = MinstNumDataset(
    data_files=["/Users/mini4chaos/Developer/cifar-10/data/LEDNUM/test.txt"],
    transform=transforms.ToTensor()
)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(f"Using device: {device}")

# 初始化模型、损失函数和优化器
model = LeNetMINSTNUM().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练模型
num_epochs = 20
train_model(model, train_loader, criterion, optimizer, num_epochs, device, test_loader)

Using device: mps
Epoch [1/20], Loss: 1.1372
Epoch [2/20], Loss: 0.1778
Epoch [3/20], Loss: 0.0063
Epoch [4/20], Loss: 0.0009
Epoch [5/20], Loss: 0.0004
Epoch [6/20], Loss: 0.0004
Epoch [7/20], Loss: 0.0009
Epoch [8/20], Loss: 0.0003
Epoch [9/20], Loss: 0.0004
Epoch [10/20], Loss: 0.0003
Evaluating at epoch 10...
Accuracy at epoch 10: 100.00%
Epoch [11/20], Loss: 0.0001
Epoch [12/20], Loss: 0.0001
Epoch [13/20], Loss: 0.0004
Epoch [14/20], Loss: 0.0001
Epoch [15/20], Loss: 0.0002
Epoch [16/20], Loss: 0.0002
Epoch [17/20], Loss: 0.0005
Epoch [18/20], Loss: 0.0000
Epoch [19/20], Loss: 0.0001
Epoch [20/20], Loss: 0.0001
Evaluating at epoch 20...
Accuracy at epoch 20: 100.00%
