In [2]:
import torch.nn as nn
import torch

In [3]:
class BasicBlock(nn.Module):
    expansion = 1
    def __init__(self, in_channel, out_channel, stride=1, downsample=None, **kwargs):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=out_channel,
                               kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(in_channels=out_channel, out_channels=out_channel,
                               kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channel)
        self.downsample = downsample

    def forward(self, x):
        identity = x
        # 需要降采样的残差边
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out += identity
        out = self.relu(out)
        return out

In [4]:
class Bottleneck(nn.Module):
    expansion = 4
    def __init__(self, in_channel, out_channel, stride=1, downsample=None,
                groups=1, width_per_group=64):
        super(Bottleneck, self).__init__()

        width = int(out_channel * (width_per_group / 64.)) * groups

        self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=width,
                                kernel_size=1, stride=1, bias=False)  # squeeze channels
        self.bn1 = nn.BatchNorm2d(width)
        
        self.conv2 = nn.Conv2d(in_channels=width, out_channels=width, groups=groups,
                                kernel_size=3, stride=stride, bias=False, padding=1)
        self.bn2 = nn.BatchNorm2d(width)
        
        self.conv3 = nn.Conv2d(in_channels=width, out_channels=out_channel*self.expansion,
                                kernel_size=1, stride=1, bias=False)  # unsqueeze channels
        self.bn3 = nn.BatchNorm2d(out_channel*self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        identity = x
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        out += identity
        out = self.relu(out)
        return out

In [5]:
class ResNet(nn.Module):
    def __init__(self, block, blocks_num, in_dim, 
                 num_classes=1000, include_top=True, groups=1, width_per_group=64):
        super(ResNet, self).__init__()
        self.include_top = include_top
        self.in_channel = 64

        self.groups = groups
        self.width_per_group = width_per_group

        self.conv1 = nn.Conv2d(in_dim, self.in_channel, kernel_size=7, stride=2,
                               padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_channel)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, blocks_num[0])
        self.layer2 = self._make_layer(block, 128, blocks_num[1], stride=2)
        self.layer3 = self._make_layer(block, 256, blocks_num[2], stride=2)
        self.layer4 = self._make_layer(block, 512, blocks_num[3], stride=2)
        if self.include_top:
            self.avgpool = nn.AdaptiveAvgPool2d((1, 1))  # output size = (1, 1)
            self.fc = nn.Linear(512 * block.expansion, num_classes)

        # 卷积层参数初始化
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def _make_layer(self, block, channel, block_num, stride=1):
        downsample = None
        if stride != 1 or self.in_channel != channel * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channel, channel * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(channel * block.expansion))

        layers = []
        layers.append(block(self.in_channel, channel, downsample=downsample,
                            stride=stride, groups=self.groups, width_per_group=self.width_per_group))
        self.in_channel = channel * block.expansion

        for _ in range(1, block_num):
            layers.append(block(self.in_channel, channel, groups=self.groups, width_per_group=self.width_per_group))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        if self.include_top:
            x = self.avgpool(x)
            x = torch.flatten(x, 1)
            x = self.fc(x)

        return x


def resnet34(in_dim=3, num_classes=1000, include_top=True):
    return ResNet(BasicBlock, [3, 4, 6, 3], in_dim=in_dim, num_classes=num_classes, include_top=include_top)

In [6]:
from torchvision import datasets,transforms as T

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
type(device)

torch.device

In [7]:
# batch大小
batch_size = 64

# 训练集和测试集的数据增强
train_transform = T.Compose([
                    T.Resize((224,224)),
                    T.RandomHorizontalFlip(0.5),
                    T.ToTensor()
])
test_transform = T.Compose([
                    T.Resize((224,224)),
                    T.ToTensor()
])

# mnist数据集
train_dataset = datasets.MNIST(root=r'data\mnist', 
                                train = True, transform = train_transform, download = True)
test_dataset = datasets.MNIST(root=r'data\mnist', 
                                train = False, transform =test_transform)
train_loader = torch.utils.data.DataLoader(dataset = train_dataset, batch_size = batch_size, shuffle = True)
test_loader = torch.utils.data.DataLoader(dataset = test_dataset, batch_size = batch_size, shuffle = False)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to data\mnist\MNIST\raw\train-images-idx3-ubyte.gz


13.3%





KeyboardInterrupt: 

In [None]:
model = resnet34(in_dim=1, num_classes=10).to(device)

In [None]:
torch.manual_seed(1)
# 学习率
learning_rate = 1e-3
# 训练轮数
num_epochs = 10
# 优化算法Adam = RMSProp + Momentum (梯度、lr两方面优化下降更快更稳)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate) 
# 交叉熵损失函数
loss_fn = torch.nn.CrossEntropyLoss()  


def evaluate_accuracy(data_iter,model):
    '''
        模型预测精度
    '''
    total = 0
    correct = 0 
    with torch.no_grad():
        model.eval()
        for images,labels in data_iter:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _,predicts = torch.max(outputs.data, dim=1)
            total += labels.size(0)
            correct += (predicts == labels).cpu().sum().numpy()
    return  correct / total

In [None]:
def train(data_loader=train_loader, optimizer=optimizer, loss_fn=loss_fn, epochs=num_epochs, device=device,):
    for epoch in range(epochs):
        print('current epoch = {}'.format(epoch))
        for i,(images,labels) in enumerate(data_loader):
            train_accuracy_total = 0
            train_correct = 0
            train_loss_sum = 0
            model.train()
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = loss_fn(outputs,labels)   # 计算模型的损失
            optimizer.zero_grad()            # 在做反向传播前先清除网络状态
            loss.backward()                  # 损失值进行反向传播
            optimizer.step()                 # 参数迭代更新

            train_loss_sum += loss.item()    # item()返回的是tensor中的值，且只能返回单个值（标量），不能返回向量，使用返回loss等
            _,predicts = torch.max(outputs.data,dim=1)  # 输出10类中最大的那个值
            train_accuracy_total += labels.size(0)
            train_correct += (predicts == labels).cpu().sum().item()
        test_acc = evaluate_accuracy(test_loader,model)
        print('epoch:{0},   loss:{1:.4f},   train accuracy:{2:.3f},  test accuracy:{3:.3f}'.format(
                epoch, train_loss_sum/batch_size, train_correct/train_accuracy_total, test_acc))
    print('------------finish training-------------')