In [59]:
from torch import  nn
import torch as t
from torch.nn import  functional as F
from PIL import Image
from torchvision.transforms import ToTensor,ToPILImage
# to_tensor = ToTensor() # img -> tensor
# to_pil = ToPILImage()
# lena = Image.open('/content/sample_data/lena.png')
# lena = to_tensor(lena).unsqueeze(0)
# conv1 = nn.Conv2d(1,1,7,2,3)
# output = conv1(lena)
# output.size()

In [60]:
# Basicblock的实现
class BasicBlock(nn.Module):
  def __init__(self, inchannel, outchannel, stride=1, shortcut=None):
    super().__init__()
    self.layer = nn.Sequential(
            nn.Conv2d(inchannel,outchannel,3,stride, 1,bias=False),
            nn.BatchNorm2d(outchannel),
            nn.ReLU(inplace=True),
            nn.Conv2d(outchannel,outchannel,3,1,1,bias=False),
            nn.BatchNorm2d(outchannel) )
    self.right = shortcut

  def forward(self,x):
    out = self.layer(x)
    residual = x if self.right is None else self.right(x)
    out+=residual
    return F.relu(out)


In [61]:
# resnet18
class Resnet18(nn.Module):
  def __init__(self,num_classes=10):
    super().__init__()
    #预处理块
    self.pre=nn.Sequential(
        nn.Conv2d(3, 64, 7, 2, 3, bias=False),
        nn.BatchNorm2d(64),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(3,2,1)
    )
    #各个块
    self.layer1=self._maker_layer(64,64,2)
    self.layer2=self._maker_layer(64,128,2,stride=2)
    self.layer3=self._maker_layer(128,256,2,stride=2)
    self.layer4=self._maker_layer(256,512,2,stride=2)
    #全连接
    self.fc = nn.Linear(512,num_classes)

  def _maker_layer(self,inchannel, outchannel, block_num, stride=1):
    shortcut=nn.Sequential(
          nn.Conv2d(inchannel,outchannel,1,stride, bias=False),
          nn.BatchNorm2d(outchannel)
      )
    layers=[]
    layers.append(BasicBlock(inchannel,outchannel,stride,shortcut))

    for i in range(1,block_num):
      layers.append(BasicBlock(outchannel, outchannel))
    return nn.Sequential(*layers)

  def forward(self,x):
    x=self.pre(x)

    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    x = self.layer4(x)

    x = F.avg_pool2d(x, 7)
    x = x.view(x.size(0), -1)
    return self.fc(x)
# resnet34
class Resnet34(nn.Module):
  def __init__(self,num_classes=10):
    super().__init__()
    #预处理块
    self.pre=nn.Sequential(
        nn.Conv2d(3, 64, 7, 2, 3, bias=False),
        nn.BatchNorm2d(64),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(3,2,1)
    )
    #各个块
    self.layer1=self._maker_layer(64,64,3)
    self.layer2=self._maker_layer(64,128,4,stride=2)
    self.layer3=self._maker_layer(128,256,6,stride=2)
    self.layer4=self._maker_layer(256,512,3,stride=2)
    #全连接
    self.fc = nn.Linear(512,num_classes)

  def _maker_layer(self,inchannel, outchannel, block_num, stride=1):
    shortcut=nn.Sequential(
          nn.Conv2d(inchannel,outchannel,1,stride, bias=False),
          nn.BatchNorm2d(outchannel)
      )
    layers=[]
    layers.append(BasicBlock(inchannel,outchannel,stride,shortcut))

    for i in range(1,block_num):
      layers.append(BasicBlock(outchannel, outchannel))
    return nn.Sequential(*layers)

  def forward(self,x):
    x=self.pre(x)

    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    x = self.layer4(x)

    x = F.avg_pool2d(x, 7)
    x = x.view(x.size(0), -1)
    return self.fc(x)

In [62]:
#可以进行合并
# resnet
class Resnet(nn.Module):
  def __init__(self,layer_num,num_classes=10):
    super().__init__()
    #预处理块
    self.pre=nn.Sequential(
        nn.Conv2d(3, 64, 7, 2, 3, bias=False),
        nn.BatchNorm2d(64),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(3,2,1)
    )
    #各个块
    self.layer1=self._maker_layer(64,64,layer_num[0])
    self.layer2=self._maker_layer(64,128,layer_num[1],stride=2)
    self.layer3=self._maker_layer(128,256,layer_num[2],stride=2)
    self.layer4=self._maker_layer(256,512,layer_num[3],stride=2)
    #全连接
    self.fc = nn.Linear(512,num_classes)

  def _maker_layer(self,inchannel, outchannel, block_num, stride=1):
    shortcut=nn.Sequential(
          nn.Conv2d(inchannel,outchannel,1,stride, bias=False),
          nn.BatchNorm2d(outchannel)
      )
    layers=[]
    layers.append(BasicBlock(inchannel,outchannel,stride,shortcut))

    for i in range(1,block_num):
      layers.append(BasicBlock(outchannel, outchannel))
    return nn.Sequential(*layers)

  def forward(self,x):
    x=self.pre(x)

    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    x = self.layer4(x)

    x = F.avg_pool2d(x, 7)
    x = x.view(x.size(0), -1)
    return self.fc(x)

In [63]:
resnet18=Resnet(layer_num=[2,2,2,2])
print(resnet18)

Resnet(
  (pre): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  )
  (layer1): Sequential(
    (0): BasicBlock(
      (layer): Sequential(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
        (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (right): Sequential(
        (0): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
     

In [64]:
# resnet50
# 残差结构不一样，需要重写
class Bottleneck(nn.Module):
    def __init__(self,in_channels,out_channels,stride=[1,1,1],padding=[0,1,0],first=False):
        super().__init__()
        self.bottleneck = nn.Sequential(
            nn.Conv2d(in_channels,out_channels,kernel_size=1,stride=stride[0],padding=padding[0],bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels,out_channels,kernel_size=3,stride=stride[1],padding=padding[1],bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels,out_channels*4,kernel_size=1,stride=stride[2],padding=padding[2],bias=False),
            nn.BatchNorm2d(out_channels*4)
        )

        # shortcut 部分
        self.shortcut = nn.Sequential()
        if first:
            self.shortcut = nn.Sequential(
                # 进行升降维
                # stride==2
                nn.Conv2d(in_channels, out_channels*4, kernel_size=1, stride=stride[1], bias=False),
                nn.BatchNorm2d(out_channels*4)
            )
    def forward(self, x):
      out = self.bottleneck(x)
      out += self.shortcut(x)
      out = F.relu(out)
      return out
class ResNet50(nn.Module):
    def __init__(self,Bottleneck, num_classes=10) -> None:
        super().__init__()
        self.in_channels = 64
        # 预处理
        self.conv1 = nn.Sequential(
            nn.Conv2d(3,64,kernel_size=7,stride=2,padding=3,bias=False),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        self.layer1 = self._make_layer(Bottleneck,64,[[1,1,1]]*3,[[0,1,0]]*3)

        self.layer2 = self._make_layer(Bottleneck,128,[[1,2,1]] + [[1,1,1]]*3,[[0,1,0]]*4)

        self.layer3 = self._make_layer(Bottleneck,256,[[1,2,1]] + [[1,1,1]]*5,[[0,1,0]]*6)

        self.layer4 = self._make_layer(Bottleneck,512,[[1,2,1]] + [[1,1,1]]*2,[[0,1,0]]*3)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(2048, num_classes)

    def _make_layer(self,block,out_channels,strides,paddings):
        layers = []
        # 第一层标志
        flag = True
        for i in range(0,len(strides)):
            layers.append(block(self.in_channels,out_channels,strides[i],paddings[i],first=flag))
            flag = False
            self.in_channels = out_channels * 4


        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)

        out = self.avgpool(out)
        out = out.reshape(x.shape[0], -1)
        out = self.fc(out)
        F.log_softmax(out,dim=1)
        return F.log_softmax(out,dim=1)



In [65]:
res50 = ResNet50(Bottleneck)
print(res50)

ResNet50(
  (conv1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  )
  (layer1): Sequential(
    (0): Bottleneck(
      (bottleneck): Sequential(
        (0): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
        (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (5): ReLU(inplace=True)
        (6): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (7): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (shortcut): Sequential(
   

In [66]:
# CIFAR10测试
from torch.utils.data import DataLoader
from torchvision import datasets, utils
import matplotlib.pyplot as plt
from PIL import Image
from torchvision.transforms import transforms
# 定义变化管道
transform = transforms.Compose([ToTensor(),
                                transforms.Normalize(
                                    mean=[0.5,0.5,0.5],
                                    std=[0.5,0.5,0.5]
                                ),
                                transforms.Resize((224, 224))
                               ])
# 数据集加载
training_data = datasets.CIFAR10(
    root="/content/sample_data",
    train=True,
    download=True,
    transform=transform,
)

testing_data = datasets.CIFAR10(
    root="/content/sample_data",
    train=False,
    download=True,
    transform=transform,
)
train_dataloader = DataLoader(batch_size=100,dataset=training_data,shuffle=True)
test_dataloader = DataLoader(batch_size=100,dataset=testing_data,shuffle=True)

Files already downloaded and verified
Files already downloaded and verified


In [67]:
#超参等
device = (
    "cuda"
    if t.cuda.is_available()
    else "cpu"
)
print(f"Using {device} device")
import torch.optim as optim
res50 = res50.to(device)
optimizer = optim.Adam(res50.parameters())
epoch =10

Using cuda device


In [68]:
train_losses = []
train_counter = []
test_losses = []
test_counter = [i * len(train_dataloader.dataset) for i in range(epoch+1)]
print(test_counter)
def train_loop(epoch):
  res50.train()
  for batch_idx, (data,target) in enumerate(train_dataloader):
    data, target = data.to(device), target.to(device)
    optimizer.zero_grad()
    outputs = res50(data)
    loss =  F.nll_loss(outputs, target)
    loss.backward()
    optimizer.step()
    if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data),
                                          len(train_dataloader.dataset),
                                          100. * batch_idx / len(train_dataloader),
                                          loss.item()))
    train_losses.append(loss.item())
    train_counter.append((batch_idx * 100) + ((epoch - 1) * len(train_dataloader.dataset)))
    t.save(res50.state_dict(), './model.pth')
    t.save(optimizer.state_dict(), './optimizer.pth')
def test():
  res50.eval()
  test_loss = 0
  correct = 0
  with t.no_grad():
    for data, target in test_dataloader:
      data, target = data.to(device), target.to(device)
      output = res50(data)
      test_loss += F.nll_loss(output, target, reduction='sum').item()
      pred = output.data.max(1, keepdim=True)[1]#返回索引
      correct += pred.eq(target.data.view_as(pred)).sum()#比较pred与target的函数，对应位置是否相同
  test_loss /= len(test_dataloader.dataset)
  test_losses.append(test_loss)
  print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_dataloader.dataset),
        100. * correct / len(test_dataloader.dataset)))


[0, 50000, 100000, 150000, 200000, 250000, 300000, 350000, 400000, 450000, 500000]


In [None]:
for epoch in range(1, epoch+1):
    train_loop(epoch)
    test()
fig = plt.figure()
plt.plot(train_counter,train_losses,color='red')
plt.scatter(test_counter[1:],test_losses,color='blue')
plt.legend(['Train Loss', 'Test Loss'], loc='upper right')
plt.xlabel('number of training examples seen')
plt.ylabel('negative log likelihood loss')




Test set: Avg. loss: 1.3789, Accuracy: 5057/10000 (51%)


Test set: Avg. loss: 1.2232, Accuracy: 5963/10000 (60%)


Test set: Avg. loss: 0.9523, Accuracy: 6670/10000 (67%)


Test set: Avg. loss: 0.8278, Accuracy: 7137/10000 (71%)


Test set: Avg. loss: 0.9721, Accuracy: 7366/10000 (74%)

