<a href="https://colab.research.google.com/github/BATiger/Classic/blob/main/DenseNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Optional (Colab only): uncomment to mount Google Drive at /content/drive
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import math
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets,transforms
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt

In [None]:
class DenseNet(nn.Module):
    """Empty placeholder module: it registers no layers and defines no forward pass."""

    def __init__(self):
        # Only the base nn.Module bookkeeping is initialised here.
        super().__init__()
        
    
# A single dense layer: the basic unit that dense blocks are built from
class Single(nn.Module):
    """One dense layer: transforms the input and concatenates the result onto it.

    Args:
        input_channel: number of channels of the incoming feature map.
        output_channel: number of new channels produced by the layer; the
            forward output has ``input_channel + output_channel`` channels.
        drop_out: dropout probability applied after the last convolution.
        type: ``'A'`` builds BN -> ReLU -> Conv3x3 -> Dropout; ``'B'`` builds the
            bottleneck variant BN -> ReLU -> Conv1x1 (to ``4 * output_channel``
            channels) -> BN -> ReLU -> Conv3x3 -> Dropout.

    Raises:
        ValueError: if ``type`` is neither ``'A'`` nor ``'B'``.
    """

    def __init__(self, input_channel, output_channel, drop_out=0.2, type='A'):
        super(Single, self).__init__()
        if type == 'A':
            self.block = nn.Sequential(
                # BatchNorm2d is sized by the number of channels it normalises
                nn.BatchNorm2d(input_channel),
                nn.ReLU(),
                # padding=1 keeps the spatial size unchanged for the 3x3 kernel
                nn.Conv2d(input_channel, output_channel, kernel_size=3,padding=1),
                nn.Dropout(p=drop_out),
            )

        elif type == 'B':
            self.block = nn.Sequential(
                nn.BatchNorm2d(input_channel),
                nn.ReLU(),
                # 1x1 bottleneck convolution to 4x the number of new channels
                nn.Conv2d(input_channel, output_channel*4, kernel_size=1),

                nn.BatchNorm2d(output_channel*4),
                nn.ReLU(),
                nn.Conv2d(output_channel*4, output_channel, kernel_size=3,padding=1),
                nn.Dropout(p=drop_out)
            )
        else:
            # ValueError (still an Exception subclass) names the offending value
            raise ValueError("Undefined Type: {!r} (expected 'A' or 'B')".format(type))

    def forward(self,x):
        """Return ``x`` concatenated with the newly computed features along dim 1."""
        out = self.block(x)
        # dim 1 is the channel axis of the (N, C, H, W) tensors Conv2d works on
        return torch.cat([x,out],1)

class Transition(nn.Module):
    """Transition layer: BatchNorm -> 1x1 conv -> dropout -> 2x2 average pooling.

    When ``output_channel`` is smaller than ``input_channel`` the layer compresses
    the feature maps, which is what the file calls DenseNet-C.

    Args:
        input_channel: channels of the incoming feature map.
        output_channel: channels produced by the 1x1 convolution.
        drop_out: dropout probability applied after the convolution.
    """

    def __init__(self, input_channel, output_channel,drop_out=0):
        super().__init__()
        normalise = nn.BatchNorm2d(input_channel)
        # the 1x1 kernel changes only the channel count
        project = nn.Conv2d(input_channel, output_channel, kernel_size=1)
        regularise = nn.Dropout(drop_out)
        # 2x2 window with the default stride (= kernel size)
        downsample = nn.AvgPool2d(kernel_size=2)
        self.block = nn.Sequential(normalise, project, regularise, downsample)

    def forward(self, x):
        """Apply the four stages in order and return the pooled result."""
        return self.block(x)

class DenseBlock(nn.Module):
    """A sequential stack of ``num_layers`` ``Single`` layers.

    Layer ``i`` is created with ``input_channel + i * grow_rate`` input channels
    and ``grow_rate`` new channels, so the block's output has
    ``input_channel + num_layers * grow_rate`` channels.
    """

    def __init__(self, num_layers,input_channel, grow_rate,drop_out=0.2,type='A'):
        super().__init__()
        self.layer = self._make_layer(num_layers, input_channel, grow_rate, drop_out, type)

    def _make_layer(self,num_layers,input_channel,grow_rate,drop_out,type):
        """Build the stacked layers, widening each layer's input by ``grow_rate``."""
        stacked = [
            Single(input_channel + idx * grow_rate, grow_rate, drop_out, type)
            for idx in range(num_layers)
        ]
        return nn.Sequential(*stacked)

    def forward(self, x):
        """Run ``x`` through every stacked layer in order."""
        features = self.layer(x)
        return features

# The original paper has two hyperparameters: total depth L and growth rate k.
# Here `depth` means something different: the number of layers inside each dense block.
# This network has 3 dense blocks;
# to implement DenseNet for ImageNet, add one more dense block as described in the paper.
# Feature-map sizes across the three dense blocks (for 32x32 inputs): 32x32 -> 16x16 -> 8x8.
# `reduction` corresponds to the compression factor in the original paper.
class DenseNet3(nn.Module):
    """DenseNet with three dense blocks separated by two transition layers.

    Args:
        depth: number of layers inside each dense block (not the total depth).
        num_classes: number of output logits.
        growth_rate: channels added by every dense layer; the stem convolution
            produces ``2 * growth_rate`` channels.
        reduction: factor applied to the channel count at each transition
            (the result is floored to an int).
        type: layer variant forwarded to every ``DenseBlock``.
        drop_t: dropout probability used inside the transition layers.
        drop_d: dropout probability used inside the dense layers.
    """

    def __init__(self,depth,num_classes,growth_rate,reduction=0.5,type='A',drop_t=0, drop_d=0.2):
        super(DenseNet3, self).__init__()
        # stem: 3 input channels -> 2 * growth_rate feature maps
        self.conv1 = nn.Conv2d(3,2*growth_rate,kernel_size=3,padding=1)

        self.block1 = DenseBlock(depth, 2*growth_rate,growth_rate,drop_d,type)
        input_channel = int(2*growth_rate + depth*growth_rate)
        compressed = int(math.floor(input_channel*reduction))
        self.trans1 = Transition(input_channel,compressed,drop_t)
        input_channel = compressed

        self.block2 = DenseBlock(depth, input_channel,growth_rate,drop_d,type)
        input_channel = int(input_channel+depth*growth_rate)
        compressed = int(math.floor(input_channel*reduction))
        self.trans2 = Transition(input_channel,compressed,drop_t)
        input_channel = compressed

        self.block3 = DenseBlock(depth, input_channel,growth_rate,drop_d,type)
        input_channel = int(input_channel+depth*growth_rate)

        self.bn1 = nn.BatchNorm2d(input_channel)
        self.relu = nn.ReLU()

        # Global average pooling. On the 8x8 maps produced from 32x32 inputs this
        # equals the previous fixed AvgPool2d(kernel_size=8), but it also yields a
        # 1x1 map for other input resolutions instead of crashing or leaking
        # spatial positions into the batch dimension.
        self.pool = nn.AdaptiveAvgPool2d(1)

        self.fc = nn.Linear(input_channel, num_classes)
        # kept as an attribute: number of features fed to the classifier
        self.input_channel = input_channel

    def forward(self, x):
        """Return the ``(N, num_classes)`` logits for a batch of 3-channel images."""
        x = self.conv1(x)
        x = self.trans1(self.block1(x))
        x = self.trans2(self.block2(x))
        x = self.block3(x)
        x = self.relu(self.bn1(x))
        x = self.pool(x)
        # flatten everything but the batch dimension: (N, C, 1, 1) -> (N, C)
        x = torch.flatten(x, 1)
        return self.fc(x)

In [None]:
# Hyperparameters for the run
batch_size = 64
epochs = 20
learning_rate = 0.001
# set to False to force CPU execution even when CUDA is available
GPU = True

# choose device
print(torch.cuda.is_available())
# Always bind `device`: use CUDA only when it is both requested and available,
# otherwise fall back to the CPU so later `.to(device)` calls keep working.
device = torch.device("cuda" if GPU and torch.cuda.is_available() else "cpu")

True


In [None]:
# ToTensor is the only preprocessing step applied to the images
transform = transforms.Compose([transforms.ToTensor()])

# CIFAR-10 training split, reshuffled on every pass
trainset = datasets.CIFAR10(
    root='data',
    train=True,
    download=True,
    transform=transform,
)
train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True)

# CIFAR-10 test split, iterated in a fixed order
testset = datasets.CIFAR10(
    root='data',
    train=False,
    download=True,
    transform=transform,
)
test_loader = DataLoader(testset, batch_size=batch_size, shuffle=False)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to data/cifar-10-python.tar.gz


  0%|          | 0/170498071 [00:00<?, ?it/s]

Extracting data/cifar-10-python.tar.gz to data
Files already downloaded and verified


In [None]:
# here we try dense blocks with depth 12 and growth rate 12; the default
# reduction=0.5 compresses channels at each transition (DenseNet-C)

model = DenseNet3(12, 10, 12).to(device)
optimizer = optim.Adam(model.parameters(),lr=learning_rate)
criterion = nn.CrossEntropyLoss()


for epoch in range(epochs):
    print('epoch {}'.format(epoch + 1))
    # training------------------------------------------
    # make sure dropout / batch-norm are in training mode for every epoch
    model.train()
    train_loss = 0
    train_acc = 0
    for x,y in train_loader:
        x,y = x.to(device), y.to(device)
        output = model(x)
        loss = criterion(output, y)

        # criterion returns the batch mean; weight it by the batch size so the
        # smaller final batch does not skew the epoch average
        train_loss += loss.item() * y.size(0)
        # torch.max returns (1) the max values and (2) their indices; the
        # index is the predicted class
        pred = torch.max(output,1)[1]
        train_acc += (pred == y).sum().item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # print the per-sample averages for this epoch
    print('Train Loss: {:.6f}, Acc: {:.6f}'.format(train_loss / len(trainset), train_acc / len(trainset)))

epoch 1
Train Loss: 1.405261, Acc: 0.481400
epoch 2
Train Loss: 1.010156, Acc: 0.637620
epoch 3
Train Loss: 0.870009, Acc: 0.689320
epoch 4
Train Loss: 0.771095, Acc: 0.726440
epoch 5
Train Loss: 0.693528, Acc: 0.754620
epoch 6
Train Loss: 0.620617, Acc: 0.783820
epoch 7
Train Loss: 0.570005, Acc: 0.800620
epoch 8
Train Loss: 0.517378, Acc: 0.818760
epoch 9
Train Loss: 0.486280, Acc: 0.829820
epoch 10
Train Loss: 0.455456, Acc: 0.841660
epoch 11
Train Loss: 0.422807, Acc: 0.851120
epoch 12
Train Loss: 0.401333, Acc: 0.859860
epoch 13
Train Loss: 0.376250, Acc: 0.868560
epoch 14
Train Loss: 0.357873, Acc: 0.874340
epoch 15
Train Loss: 0.339965, Acc: 0.882320
epoch 16
Train Loss: 0.318316, Acc: 0.888820
epoch 17
Train Loss: 0.301671, Acc: 0.894060
epoch 18
Train Loss: 0.287703, Acc: 0.898480
epoch 19
Train Loss: 0.277035, Acc: 0.900720
epoch 20
Train Loss: 0.261769, Acc: 0.907140


In [None]:
# Evaluate the trained model on the test split.
# eval() switches dropout / batch-norm to inference behaviour.
model.eval()
eval_loss = 0.
eval_acc = 0.

# no gradients are needed for evaluation
with torch.no_grad():
    for x, y in test_loader:
        x, y = x.to(device), y.to(device)
        out = model(x)
        loss = criterion(out, y)
        # criterion returns the batch mean; weight it by the batch size so the
        # smaller final batch does not skew the reported average
        eval_loss += loss.item() * y.size(0)
        # index of the largest logit is the predicted class
        pred = torch.max(out, 1)[1]
        eval_acc += (pred == y).sum().item()

# both metrics are per-sample averages over the whole test set
print('test_loss: ', eval_loss/len(testset))
print('test accuracy: ', eval_acc/len(testset))
# NOTE(review): the original carried a bare "#0.40" note here — presumably an
# earlier test-loss reading; confirm or drop.

test_loss:  0.451122565444108
test accuracy:  0.8551
