In [1]:
import numpy as np
import torch 
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torchvision.datasets import CIFAR10

In [2]:
class bottleneck(nn.Module):
    def __init__(self,in_channel,out_channel,same_shape=True):         #same_shape指x与F（x）规模是否一致，不一致要变成一致
        super(bottleneck,self).__init__()
        self.same_shape=same_shape
        stride=1 if self.same_shape else 2
        
        self.conv1=nn.Conv2d(in_channel,out_channel,3,stride,padding=1,bias=False)
        self.bn1=nn.BatchNorm2d(out_channel)
        
        self.conv2=nn.Conv2d(out_channel,out_channel,3,1,padding=1,bias=False)
        self.bn2=nn.BatchNorm2d(out_channel)
        
        if not self.same_shape:
            self.conv3=nn.Conv2d(in_channel,out_channel,1,stride,padding=0,bias=False)
            
    def forward(self,x):
        out=self.conv1(x)
        out=F.relu(self.bn1(out),True)
        out=self.conv2(out)
        out=F.relu(self.bn2(out),True)
        if not self.same_shape:
            x=self.conv3(x)
        return F.relu(x+out,True)

In [3]:
test_net=bottleneck(32,64,False)                             #做完一个板块就测试一个板块是否符合要求
test_x=Variable(torch.rand(1,32,96,96))
print(test_x.size())
test_y=test_net(test_x)
print(test_y.size())

torch.Size([1, 32, 96, 96])
torch.Size([1, 64, 48, 48])


In [4]:
class resnet(nn.Module):
    def __init__(self,in_channel,num_classes,verbose=False):
        super(resnet,self).__init__()
        self.verbose=verbose
        
        self.block1=nn.Conv2d(in_channel,64,7,2)
        
        self.block2=nn.Sequential(
            nn.MaxPool2d(3,2),
            bottleneck(64,64),
            bottleneck(64,64)
        )
        
        self.block3=nn.Sequential(
            bottleneck(64,128,False),
            bottleneck(128,128),
            bottleneck(128,128),
        )
        
        self.block4=nn.Sequential(
            bottleneck(128,256,False),
            bottleneck(256,256),
            bottleneck(256,256),
        )
        
        self.block5=nn.Sequential(
            bottleneck(256,512,False),
            bottleneck(512,512),
            nn.AvgPool2d(3)
        )
        
        self.classfy=nn.Linear(512,num_classes)
        
    def forward(self,x):
        x=self.block1(x)
        if self.verbose:
            print('block 1 output:{}'.format(x.shape))
        x=self.block2(x)
        if self.verbose:
            print('block 2 output:{}'.format(x.shape))
        x=self.block3(x)
        if self.verbose:
            print('block 3 output:{}'.format(x.shape))
        x=self.block4(x)
        if self.verbose:
            print('block 4 output:{}'.format(x.shape))
        x=self.block5(x)
        if self.verbose:
            print('block 5 output:{}'.format(x.shape))
        x=x.view(x.size(0),-1)
        x=self.classfy(x)
        if self.verbose:
            print('class output:{}'.format(x.shape))
        return x

In [5]:
test_net=resnet(3,10,True)                             #做完一个板块就测试一个板块是否符合要求
test_x=Variable(torch.rand(1,3,96,96))
print(test_x.size())
test_y=test_net(test_x)

torch.Size([1, 3, 96, 96])
block 1 output:torch.Size([1, 64, 45, 45])
block 2 output:torch.Size([1, 64, 22, 22])
block 3 output:torch.Size([1, 128, 11, 11])
block 4 output:torch.Size([1, 256, 6, 6])
block 5 output:torch.Size([1, 512, 1, 1])
class output:torch.Size([1, 10])


In [6]:
def data_tf(x):
    x=x.resize((96,96),2)
    x=np.array(x,dtype='float32')/32
    x=(x-0.5)/0.5
    x=x.transpose((2,0,1))
    x=torch.from_numpy(x)
    return x

In [7]:
from torch.utils.data import DataLoader
train_set=CIFAR10('./data',train=True,transform=data_tf,download=True)
test_set=CIFAR10('./data',train=False,transform=data_tf,download=True)
train_data=DataLoader(train_set,batch_size=64,shuffle=True)
test_data=DataLoader(train_set,batch_size=64,shuffle=False)

Files already downloaded and verified
Files already downloaded and verified


In [8]:
from datetime import datetime
def get_acc(output, label):

    total = output.shape[0]

    _, pred_label = output.max(1)

    num_correct = (pred_label == label).sum().item()

    return num_correct / total

def train(net, train_data, valid_data, num_epochs, optimizer, criterion):

    if torch.cuda.is_available():

        net = net.cuda()

    prev_time = datetime.now()

    for epoch in range(num_epochs):

        train_loss = 0

        train_acc = 0

        net = net.train()

        for im, label in train_data:

            if torch.cuda.is_available():

                im = Variable(im.cuda())  # (bs, 3, h, w)

                label = Variable(label.cuda())  # (bs, h, w)

            else:

                im = Variable(im)

                label = Variable(label)

            # forward

            output = net(im)

            loss = criterion(output, label)

            # backward

            optimizer.zero_grad()

            loss.backward()

            optimizer.step()



            train_loss += loss.item()

            train_acc += get_acc(output, label)



        cur_time = datetime.now()

        h, remainder = divmod((cur_time - prev_time).seconds, 3600)

        m, s = divmod(remainder, 60)

        time_str = "Time %02d:%02d:%02d" % (h, m, s)

        if valid_data is not None:

            valid_loss = 0

            valid_acc = 0

            net = net.eval()

            for im, label in valid_data:

                if torch.cuda.is_available():

                    im = Variable(im.cuda(), volatile=True)

                    label = Variable(label.cuda(), volatile=True)

                else:

                    im = Variable(im, volatile=True)

                    label = Variable(label, volatile=True)

                output = net(im)

                loss = criterion(output, label)

                valid_loss += loss.item()

                valid_acc += get_acc(output, label)

            epoch_str = (

                "Epoch %d. Train Loss: %f, Train Acc: %f, Valid Loss: %f, Valid Acc: %f, "

                % (epoch, train_loss / len(train_data),

                   train_acc / len(train_data), valid_loss / len(valid_data),

                   valid_acc / len(valid_data)))

        else:

            epoch_str = ("Epoch %d. Train Loss: %f, Train Acc: %f, " %

                         (epoch, train_loss / len(train_data),

                          train_acc / len(train_data)))

        prev_time = cur_time

        print(epoch_str + time_str)

In [9]:
net=resnet(3,10)
optimizer=torch.optim.SGD(net.parameters(),lr=0.01)
criterion=nn.CrossEntropyLoss()

In [None]:
train(net,train_data,test_data,20,optimizer,criterion)



Epoch 0. Train Loss: 1.355932, Train Acc: 0.507373, Valid Loss: 2.123440, Valid Acc: 0.269881, Time 00:02:09
Epoch 1. Train Loss: 0.919471, Train Acc: 0.675991, Valid Loss: 0.986277, Valid Acc: 0.647838, Time 00:02:46
Epoch 2. Train Loss: 0.688440, Train Acc: 0.760110, Valid Loss: 0.731440, Valid Acc: 0.734635, Time 00:02:47


In [10]:
if torch.cuda.is_available():
    print('Ture')

Ture
