In [1]:
import torch
from torch import nn
from torchvision import datasets, transforms


In [9]:

class VGG(nn.Module):
    def vgg_block(self, num_convs, in_channels, out_channels):
        layer = []

        # 卷积层重复
        for i in range(num_convs):
            layer.append( nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1 ))
            layer.append( nn.ReLU() )
            in_channels = out_channels

        # 添加池化层
        layer.append( nn.MaxPool2d(kernel_size=2, stride=2) )

        return nn.Sequential( * layer )


    def vgg_net( self, conv_arch ):

        block_list = []

        in_channels = 1
        for num_convs, out_channels in conv_arch:

            # 依次添加VGG块
            block_list.append( self.vgg_block(num_convs, in_channels, out_channels) )
            in_channels = out_channels

        # MLP 部分
        block_list.append(
            nn.Sequential(
                nn.Flatten(),
                nn.Linear(out_channels * 7 * 7, 4096), nn.ReLU(), nn.Dropout(0.5), 
                nn.Linear(4096, 4096), nn.ReLU(), nn.Dropout(0.5),\
                nn.Linear(4096, 10))
        )

        return nn.Sequential( * block_list )
    
    def __init__(self, conv_arch ) -> None:

        super().__init__()
        self.net = self.vgg_net(conv_arch)

    def forward(self, x):
        x = self.net(x)
        return x

    

In [3]:
import torch.utils
import torch.utils.data
from torch.utils.data import DataLoader

trans = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

# 加载数据集
train_dataset = datasets.FashionMNIST(root="./data", train=True,
                                                  transform=trans, download=True)
test_dataset = datasets.FashionMNIST(root="./data", train=False,
                                                 transform=trans, download=True)

print( f'train size = {len(train_dataset)}' )
print( f'test size = {len(test_dataset)}' )


batch_size = 128
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

train size = 60000
test size = 10000


In [8]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# device = 'cpu'

# (num_conv, out_channels), (num_conv, out_channels),(num_conv, out_channels) 
conv_arch = ((1, 16), (1, 32), (2, 64), (2, 128), (2, 128))

model = VGG(conv_arch).to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.003)

In [5]:
def training(dataloader, model, loss_fn, optimizer):
    
    model.train()

    loss, correct= 0.0, 0.0
    n = 0

    for X,y in dataloader:
    
        X,y = X.to(device), y.to(device)
        output = model(X)

        cur_loss = loss_fn(output, y)
    
        _, pred = torch.max(output, axis = 1)
    
        optimizer.zero_grad()
        cur_loss.backward()
        optimizer.step()

        loss += cur_loss * len(y)
        correct += torch.sum(pred==y).item()
        n += len(y)
    return loss / n, correct / n


In [6]:
def val(dataloader, model, loss_fn):
    
    model.eval()

    with torch.no_grad():

        loss, correct= 0.0, 0.0
        n = 0 

        for X,y in dataloader:

            X,y = X.to(device), y.to(device)
            output = model(X)
            _, pred = torch.max(output, axis = 1)
            
            cur_loss = loss_fn(output, y)
            correct += torch.sum(pred==y).item()
            loss += cur_loss * len(y)
            n += len(y)
            
    return loss/n, correct/n

In [7]:
epochs = 10
max_acc = 0

params = model.state_dict()

for epoch in range(epochs):

    loss, acc = training(train_loader, model, loss_fn, optimizer)
    print(f'epoch {epoch + 1}/{epochs}: loss = {loss} acc = {acc}')
    loss, acc = val(test_loader, model, loss_fn)
    print(f'Validation: loss = {loss} acc = {acc}\n')

    # if acc > max_acc:
    #     max_acc = acc
    #     params = model.state_dict()
    #     print('saved local best')

KeyboardInterrupt: 

: 