In [1]:
import torch
from torch import nn

In [2]:
def vgg_blk(num_convs, in_channels, out_channels):
    """Build one VGG block: `num_convs` conv+ReLU pairs followed by a 2x2 max-pool.

    Every convolution uses a 3x3 kernel with padding 1. The first convolution
    maps `in_channels` to `out_channels`; the remaining ones map
    `out_channels` to `out_channels`. The closing pooling layer uses
    kernel size 2 and stride 2.

    Args:
        num_convs: Number of convolution layers in the block.
        in_channels: Channel count of the block's input.
        out_channels: Channel count produced by every convolution.

    Returns:
        An `nn.Sequential` holding the conv/ReLU pairs and the final max-pool.
    """
    modules = []
    for conv_idx in range(num_convs):
        # Only the first convolution sees the block's input width.
        width_in = in_channels if conv_idx == 0 else out_channels
        modules += [
            nn.Conv2d(width_in, out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
        ]
    modules.append(nn.MaxPool2d(kernel_size=2, stride=2))

    return nn.Sequential(*modules)

In [3]:
# One (number of conv layers, output channels) pair per VGG block, in network order.
conv_arch = (
    (1, 64),
    (1, 128),
    (2, 256),
    (2, 256),
    (2, 512),
)

In [4]:
def vgg(conv_arch, in_channels=1, num_classes=10, input_size=None):
    """Assemble a VGG-style classifier from a block specification.

    The network is a stack of blocks built by `vgg_blk`, a flatten layer, and
    a three-layer fully connected head (4096 -> 4096 -> `num_classes`) with
    ReLU and dropout (p=0.5) after the first two linear layers.

    Args:
        conv_arch: Iterable of `(num_convs, out_channels)` pairs, one per block.
        in_channels: Channel count of the input images. Defaults to 1.
        num_classes: Size of the final linear layer's output. Defaults to 10.
        input_size: Height/width of the (square) input images. When given, the
            flattened feature size is derived from it, assuming every block
            halves the spatial size. When `None` (default), a 7x7 final
            feature map is assumed, which is what a 224x224 input produces
            after five halving blocks.

    Returns:
        An `nn.Sequential` containing the conv blocks and the classifier head.

    Raises:
        ValueError: If `conv_arch` is empty, or `input_size` is too small to
            survive the pooling layers.
    """
    conv_arch = tuple(conv_arch)  # allow any iterable, and make len() available
    if not conv_arch:
        raise ValueError("conv_arch must contain at least one (num_convs, out_channels) pair")

    conv_blks = []
    for num_conv, out_channels in conv_arch:
        conv_blks.append(vgg_blk(num_conv, in_channels, out_channels))
        in_channels = out_channels

    if input_size is None:
        feature_hw = 7  # legacy default, see docstring
    else:
        # Each block ends in a stride-2 pooling layer, i.e. one floor-halving per block.
        feature_hw = input_size // 2 ** len(conv_arch)
        if feature_hw < 1:
            raise ValueError(
                f"input_size={input_size} is too small for {len(conv_arch)} pooling stages")

    return nn.Sequential(*conv_blks, nn.Flatten(1),
                        nn.Linear(out_channels * feature_hw * feature_hw, 4096), nn.ReLU(), nn.Dropout(0.5),
                        nn.Linear(4096, 4096), nn.ReLU(), nn.Dropout(0.5),
                        nn.Linear(4096, num_classes))

In [5]:
# Instantiate the VGG network described by conv_arch.
model = vgg(conv_arch)

In [6]:
# Shape check: feed one random single-channel 224x224 image through the network,
# one top-level module at a time, and print the output shape after each module.
X = torch.randn(size=(1, 1, 224, 224))
for blk in model:
    X = blk(X)
    print(f"blk:{blk.__class__.__name__}\t output shape:{X.shape}")

blk:Sequential	 output shape:torch.Size([1, 64, 112, 112])
blk:Sequential	 output shape:torch.Size([1, 128, 56, 56])
blk:Sequential	 output shape:torch.Size([1, 256, 28, 28])
blk:Sequential	 output shape:torch.Size([1, 256, 14, 14])
blk:Sequential	 output shape:torch.Size([1, 512, 7, 7])
blk:Flatten	 output shape:torch.Size([1, 25088])
blk:Linear	 output shape:torch.Size([1, 4096])
blk:ReLU	 output shape:torch.Size([1, 4096])
blk:Dropout	 output shape:torch.Size([1, 4096])
blk:Linear	 output shape:torch.Size([1, 4096])
blk:ReLU	 output shape:torch.Size([1, 4096])
blk:Dropout	 output shape:torch.Size([1, 4096])
blk:Linear	 output shape:torch.Size([1, 10])


In [7]:
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

In [8]:
# Preprocessing applied to every image: upscale to the 224x224 resolution the
# network was sized for, convert to a float tensor, then standardize.
# The previous constants (0.1307, 0.3081) are the MNIST statistics; the values
# below are the commonly cited mean/std of the FashionMNIST training set.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.2860,), (0.3530,)),
])

In [9]:
# Load both FashionMNIST splits from the local ../data directory (no download
# is attempted); the training split is constructed first, then the test split.
train_data, test_data = (
    torchvision.datasets.FashionMNIST(
        root="../data", train=is_train, download=False, transform=transform)
    for is_train in (True, False)
)

In [10]:
# Mini-batch size shared by both loaders.
BATCH_SIZE = 128

# Training batches are reshuffled every epoch; evaluation batches keep the
# dataset order, since shuffling has no benefit when only measuring accuracy
# and a fixed order keeps evaluation runs reproducible.
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)

In [11]:
def accuracy(y_hat, y):
    """Count how many predictions in `y_hat` match the labels in `y`.

    When `y_hat` has more than one dimension and more than one column, it is
    reduced to class indices with an argmax over axis 1; otherwise it is used
    as-is. Predictions are cast to the dtype of `y` before comparison.

    Args:
        y_hat: Tensor of per-class scores or of already-chosen class indices.
        y: Tensor of target labels.

    Returns:
        The number of matching entries, as a Python float (a count, not a ratio).
    """
    is_score_matrix = len(y_hat.shape) > 1 and y_hat.shape[1] > 1
    predictions = y_hat.argmax(axis=1) if is_score_matrix else y_hat

    hits = predictions.type(y.dtype) == y
    num_correct = hits.type(y.dtype).sum()
    return float(num_correct)

In [12]:
# Sanity check: take the first batch from train_loader and print the shape of
# its image tensor.
for X, y in train_loader:
    print(X.shape)
    break  # one batch is enough

torch.Size([128, 1, 224, 224])


In [13]:
class Accumulator:
    """Keeps a fixed number of running float totals that are updated together."""

    def __init__(self, num):
        """Create `num` totals, all starting at 0.0."""
        self.data = [0.0 for _ in range(num)]

    def add(self, *args):
        """Add each positional value (converted with `float`) to the total in the same slot.

        Values are paired with totals positionally; slots beyond the shorter
        of the two sequences are dropped, as with `zip`.
        """
        updated = []
        for total, increment in zip(self.data, args):
            updated.append(total + float(increment))
        self.data = updated

    def reset(self):
        """Set every total back to 0.0, keeping the current number of slots."""
        self.data = [0.0 for _ in self.data]

    def __getitem__(self, idx):
        """Return the total stored at position `idx`."""
        return self.data[idx]

In [14]:
def eval_acc_gpu(model, test_loader, device="cuda"):
    """Compute the classification accuracy of `model` over every batch in `test_loader`.

    Intended to be called from the training loop: batches are moved to the
    device that holds the model's parameters, so model and data always agree.

    Args:
        model: Callable mapping a batch of inputs to per-class scores. When it
            is an `nn.Module`, it is switched to evaluation mode (and left in it).
        test_loader: Iterable yielding `(X, y)` batches; `X` may be a tensor or
            a list of tensors.
        device: Device to move batches to when it cannot be read from the model
            (the model is not an `nn.Module`, or it has no parameters).

    Returns:
        Fraction of correctly classified samples, or NaN when `test_loader`
        yields no samples.
    """
    if isinstance(model, nn.Module):
        model.eval()  # evaluation mode: disables dropout etc.
        first_param = next(model.parameters(), None)
        if first_param is not None:
            device = first_param.device

    # Slot 0: number of correct predictions, slot 1: number of samples seen.
    accumulator = Accumulator(2)

    with torch.no_grad():
        for X, y in test_loader:
            if isinstance(X, list):
                X = [x.to(device) for x in X]
            else:
                X = X.to(device)
            # The prediction and bookkeeping must run for every batch, not
            # only once after the loop has finished.
            y = y.to(device)
            y_hat = model(X)
            # Count samples from y so list-valued inputs are supported as well.
            accumulator.add(accuracy(y_hat, y), y.shape[0])

    if accumulator[1] == 0:
        return float("nan")
    return accumulator[0] / accumulator[1]

In [15]:
def train(model, train_loader, test_loader, num_epochs=10, lr=0.05, device='cuda'):
    """Train `model` with SGD and cross-entropy loss, printing metrics after every epoch.

    Args:
        model: The `nn.Module` to optimize; it is moved to `device` in place.
        train_loader: Iterable yielding `(X, y)` training batches.
        test_loader: Iterable yielding `(X, y)` batches used for the per-epoch
            accuracy evaluation.
        num_epochs: Number of passes over `train_loader`.
        lr: Learning rate passed to `torch.optim.SGD`.
        device: Device the model and every batch are moved to.

    Returns:
        None. Progress, training loss/accuracy and test accuracy are printed.
    """
    # Batches are always moved to `device`, so the model must be as well,
    # whatever spelling of the device was passed ("cuda", "cuda:0", "cpu", ...).
    model.to(device)
    print(f"model running in: {next(model.parameters()).device}")

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        print(f"Epoch{epoch+1} start")
        model.train()
        # Fresh totals every epoch so the reported metrics describe this epoch
        # only: slot 0 = summed loss, slot 1 = correct predictions, slot 2 = samples.
        accumulator = Accumulator(3)

        for X, y in train_loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()

            y_hat = model(X)
            # Argument order matters: predictions first, targets second.
            batch_loss = loss_fn(y_hat, y)
            batch_loss.backward()
            optimizer.step()
            print(".", end="")

            with torch.no_grad():
                # The loss is a batch mean; scale by the batch size to get the batch total.
                accumulator.add(float(batch_loss) * X.shape[0], accuracy(y_hat, y), X.shape[0])

        # Epoch averages, computed once per epoch; NaN if the loader was empty.
        num_seen = accumulator[2]
        train_l = accumulator[0] / num_seen if num_seen else float("nan")
        train_acc = accumulator[1] / num_seen if num_seen else float("nan")

        # Evaluate on the held-out data, on the same device as training.
        test_acc = eval_acc_gpu(model, test_loader, device)

        print(f"Epoch{epoch+1}/{num_epochs}\t train_loss:{train_l}\t train_acc:{train_acc}")
        print(f"test_acc:{test_acc}")


In [None]:
# Start training with train()'s default num_epochs, lr and device.
train(model, train_loader, test_loader)

model running in: cuda:0
Epoch1 start
.....................................................................................