In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.autograd import Variable
from torchvision import datasets, transforms
import matplotlib.pyplot as plt

In [None]:
# Pick the compute device once for the whole notebook: the GPU when PyTorch can
# see one, the CPU otherwise. `CUDA` is kept as a plain flag for later cells.
CUDA = torch.cuda.is_available()
device = torch.device("cuda") if CUDA else torch.device("cpu")
print(device)

In [None]:
class basic_layer(nn.Module):
    """Plain dense layer: BatchNorm -> ReLU -> 3x3 convolution, then concatenation.

    The convolution output (``out_channels`` feature maps) is concatenated in
    front of the unchanged input along the channel dimension, so the result
    carries ``out_channels + in_channels`` channels. The 3x3 convolution uses
    padding 1, so the spatial size is preserved.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        norm = nn.BatchNorm2d(in_channels)
        activation = nn.ReLU(inplace=True)
        conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False)
        self.basic = nn.Sequential(norm, activation, conv)

    def forward(self, x):
        # New features first, original input after them, joined on the channel axis.
        return torch.cat((self.basic(x), x), dim=1)

In [None]:
class bottleneck_layer(nn.Module):
    """Bottleneck dense layer: BN -> ReLU -> 1x1 conv -> BN -> ReLU -> 3x3 conv.

    The 1x1 convolution first maps the input to ``bottleneck_size * growth_rate``
    channels so that the 3x3 convolution operates on a fixed, smaller channel
    count; this keeps the parameter count down as the concatenated input grows.
    The 3x3 convolution emits ``growth_rate`` new feature maps, which are
    concatenated in front of the unchanged input along the channel dimension.

    Args:
        in_channels: number of channels of the incoming tensor.
        bottleneck_size: multiplier applied to ``growth_rate`` to get the
            width of the intermediate 1x1 convolution.
        growth_rate: number of new feature maps produced by this layer.
        drop_rate: dropout probability applied to the new feature maps;
            dropout is skipped entirely when it is not greater than 0.
    """

    def __init__(self, in_channels, bottleneck_size, growth_rate, drop_rate):
        super(bottleneck_layer, self).__init__()
        inter_channels = bottleneck_size * growth_rate
        self.bottleneck = nn.Sequential(
            nn.BatchNorm2d(in_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=in_channels, out_channels=inter_channels, kernel_size=1, padding=0, bias=False),
            nn.BatchNorm2d(inter_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=inter_channels, out_channels=growth_rate, kernel_size=3, padding=1, bias=False),
        )

        self.drop_rate = drop_rate
        self.dropout = nn.Dropout(p=self.drop_rate)

    def forward(self, x):
        out = self.bottleneck(x)
        # The dropout call belongs inside the guard; it is only applied when enabled.
        if self.drop_rate > 0:
            out = self.dropout(out)

        # Output has growth_rate + in_channels channels: new features, then the input.
        return torch.cat([out, x], dim=1)

In [None]:
class DenseNet(nn.Module):
    """DenseNet classifier built from bottleneck dense blocks and transition layers.

    Layout: 7x7 stride-2 stem convolution -> 3x3 stride-2 max pooling ->
    dense blocks separated by transition layers (each transition halves both
    the channel count and the spatial size) -> BatchNorm -> ReLU -> global
    average pooling -> fully connected layer producing ``num_classes`` logits.

    Args:
        out_channels: number of channels produced by the stem convolution.
        growth_rate: number of feature maps each bottleneck layer adds.
        num_layers: sequence with the number of bottleneck layers per dense block.
        num_classes: size of the output logit vector.
        bottleneck_size: multiplier for the 1x1 convolution width inside each
            bottleneck layer (default 4, the value previously hard-coded).
        drop_rate: dropout probability inside each bottleneck layer
            (default 0, the value previously hard-coded).
    """

    def __init__(self, out_channels, growth_rate, num_layers, num_classes,
                 bottleneck_size=4, drop_rate=0):
        super(DenseNet, self).__init__()

        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=out_channels, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

        self.maxpooling = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        layers = []
        block_in_channels = out_channels
        for i, num_layer in enumerate(num_layers):
            layers.append(self.dense_block(block_in_channels, bottleneck_size, growth_rate, drop_rate, num_layer))
            # Every bottleneck layer in the block appended growth_rate channels.
            block_in_channels += num_layer * growth_rate

            # A transition follows every dense block except the last one.
            if i != len(num_layers) - 1:
                layers.append(self.transition_layer(block_in_channels, block_in_channels // 2))
                block_in_channels = block_in_channels // 2

        self.blocks = nn.Sequential(*layers)
        self.bn = nn.BatchNorm2d(block_in_channels)
        self.relu = nn.ReLU(inplace=True)
        # Adaptive pooling to 1x1 averages over the whole feature map. For a 7x7
        # map (224x224 input with four blocks) this matches a fixed 7x7 average
        # pool, and it keeps the classifier usable for other input sizes.
        self.GAP_pooling = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(block_in_channels, num_classes)

    def dense_block(self, in_channels, bottleneck_size, growth_rate, drop_rate, num_layers):
        """Stack ``num_layers`` bottleneck layers; layer ``i`` sees ``in_channels + i * growth_rate`` channels."""
        block = []
        for i in range(num_layers):
            block.append(bottleneck_layer(in_channels + i * growth_rate, bottleneck_size, growth_rate, drop_rate))

        return nn.Sequential(*block)

    def transition_layer(self, in_channels, out_channels):
        """BN -> ReLU -> 1x1 conv to ``out_channels`` -> 2x2 stride-2 average pooling."""
        transition = nn.Sequential(
            nn.BatchNorm2d(in_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1, padding=0, bias=False),
            nn.AvgPool2d(kernel_size=2, stride=2),
        )

        return transition

    def forward(self, x):
        x = self.conv1(x)
        x = self.maxpooling(x)
        x = self.blocks(x)
        x = self.bn(x)
        x = self.relu(x)
        x = self.GAP_pooling(x)
        # Collapse (N, C, 1, 1) to (N, C) for the linear classifier.
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

In [None]:
def DenseNet_n(num_layers, n_classes=None):
    """Build one of the predefined DenseNet variants by its depth.

    Args:
        num_layers: depth identifier; one of 121, 169, 201 or 264. The key 101
            is still accepted as an alias of 264 because earlier versions of
            this notebook exposed that layout under 101.
        n_classes: number of output classes. When omitted, the notebook-level
            ``num_classes`` variable is used, as before.

    Returns:
        A freshly constructed ``DenseNet`` instance.

    Raises:
        ValueError: if ``num_layers`` is not a supported depth. Previously this
            case printed "error" and returned ``None``, which only failed later
            when the caller used the missing model.
    """
    # depth -> (stem channels, growth rate, bottleneck layers per dense block)
    configs = {
        121: (64, 32, [6, 12, 24, 16]),
        169: (64, 32, [6, 12, 32, 32]),
        201: (64, 32, [6, 12, 48, 32]),
        264: (64, 32, [6, 12, 64, 48]),
        # Legacy key: the [6, 12, 64, 48] layout is the DenseNet-264 one.
        101: (64, 32, [6, 12, 64, 48]),
    }

    if num_layers not in configs:
        supported = sorted(configs)
        raise ValueError(
            "Unsupported DenseNet depth {!r}; expected one of {}".format(num_layers, supported))

    if n_classes is None:
        # Fall back to the notebook-level setting so one-argument calls keep working.
        n_classes = num_classes

    stem_channels, growth_rate, block_layers = configs[num_layers]
    return DenseNet(stem_channels, growth_rate, block_layers, n_classes)


In [None]:
# Training hyperparameters
batch_size = 64
num_epochs = 5
lr = 0.001

# Size of the classifier output
num_classes = 2

# Build DenseNet169 through the factory. The equivalent direct construction
# for DenseNet121 would be: DenseNet(64, 32, [6, 12, 24, 16], num_classes)
model = DenseNet_n(169)
if CUDA:
    model = model.cuda()

# Loss first, then the optimizer over the (possibly GPU-resident) parameters.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

In [None]:
import json
import zipfile
import os

# kaggle api
api_token = {"username":"aaa","key":"kkk"}
 
if not os.path.exists("/root/.kaggle"):
    os.makedirs("/root/.kaggle")

with open('/root/.kaggle/kaggle.json', 'w') as file:
    json.dump(api_token, file)
!chmod 600 /root/.kaggle/kaggle.json
 
if not os.path.exists("/kaggle"):
    os.makedirs("/kaggle")
os.chdir('/kaggle')
!kaggle datasets download -d chetankv/dogs-cats-images --force
 
!ls /kaggle

In [None]:
!unzip dogs-cats-images.zip

In [None]:
# Training transform: resize, crop to 224x224, apply random augmentation
# (rotation up to 20 degrees, horizontal flip), convert to a tensor and
# normalize with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(size=(227, 227)),
        transforms.CenterCrop(224),
        transforms.RandomRotation(20),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ]
)

# Validation transform: same resizing, cropping and normalization, but without
# the random augmentation, so validation metrics are deterministic and
# comparable between epochs.
valid_transform = transforms.Compose(
    [
        transforms.Resize(size=(227, 227)),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ]
)

# Datasets are read from class-named sub-folders; only the training loader shuffles.
train_dataset = datasets.ImageFolder(root='/kaggle/dataset/training_set', transform=transform)
valid_dataset = datasets.ImageFolder(root='/kaggle/dataset/test_set', transform=valid_transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

In [None]:
def train(train_loader, model, criterion, optimizer, epoch, total_epochs=None):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        train_loader: iterable of ``(data, target)`` batches that supports ``len()``.
        model: the network to optimize; batches are moved to the device of its
            parameters before the forward pass.
        criterion: loss function called as ``criterion(output, target)``.
            Assumed to return the mean loss over the batch (the default
            reduction of ``nn.CrossEntropyLoss``) - confirm if a different
            reduction is configured.
        optimizer: optimizer holding the model parameters.
        epoch: zero-based epoch index, used only for the progress message.
        total_epochs: total number of epochs shown in the progress message;
            defaults to the notebook-level ``num_epochs``.

    Returns:
        ``(accuracy, loss)`` as Python floats: accuracy in percent and the
        mean loss per sample over the epoch. Both are 0.0 for an empty loader.
    """
    if total_epochs is None:
        # Keep existing five-argument calls working with the notebook setting.
        total_epochs = num_epochs

    model.train()

    # Follow the model's own device instead of relying on a global CUDA flag.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    total_train = 0
    correct_train = 0
    train_loss = 0.0

    for batch_idx, (data, target) in enumerate(train_loader):
        if target_device is not None:
            data, target = data.to(target_device), target.to(target_device)

        # clear gradient
        optimizer.zero_grad()

        # Forward propagation
        output = model(data)
        loss = criterion(output, target)

        # Calculate gradients
        loss.backward()

        # Update parameters
        optimizer.step()

        predicted = output.detach().argmax(dim=1)
        batch_len = len(target)
        total_train += batch_len
        # Tensor-side sum plus .item() keeps the counter a plain Python int.
        correct_train += (predicted == target).sum().item()
        # Weight the batch mean by the batch size so that dividing by the
        # sample count yields a true per-sample average, even for a short
        # final batch.
        train_loss += loss.item() * batch_len

        if batch_idx % 100 == 0:
            seen = max(total_train, 1)
            print("Train Epoch: {}/{} [iter： {}/{}], acc： {:.6f}, loss： {:.6f}".format(
                epoch + 1, total_epochs, batch_idx + 1, len(train_loader),
                correct_train / seen,
                train_loss / seen))

    if total_train == 0:
        return 0.0, 0.0

    train_acc_ = 100.0 * correct_train / total_train
    train_loss_ = train_loss / total_train

    return train_acc_, train_loss_

In [None]:
def validate(valid_loader, model, criterion, epoch, total_epochs=None):
    """Evaluate ``model`` on ``valid_loader`` without updating any parameters.

    Args:
        valid_loader: iterable of ``(data, target)`` batches that supports ``len()``.
        model: the network to evaluate; it is switched to eval mode and batches
            are moved to the device of its parameters.
        criterion: loss function called as ``criterion(output, target)``.
            Assumed to return the mean loss over the batch (the default
            reduction of ``nn.CrossEntropyLoss``) - confirm if a different
            reduction is configured.
        epoch: zero-based epoch index, used only for the progress message.
        total_epochs: total number of epochs shown in the progress message;
            defaults to the notebook-level ``num_epochs``.

    Returns:
        ``(accuracy, loss)`` as Python floats: accuracy in percent and the
        mean loss per sample. Both are 0.0 for an empty loader.
    """
    if total_epochs is None:
        # Keep existing four-argument calls working with the notebook setting.
        total_epochs = num_epochs

    model.eval()

    # Follow the model's own device instead of relying on a global CUDA flag.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    total_valid = 0
    correct_valid = 0
    valid_loss = 0.0

    # Disable autograd here so the function is safe even when the caller
    # forgets to wrap it; nesting inside an outer no_grad block is harmless.
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(valid_loader):
            if target_device is not None:
                data, target = data.to(target_device), target.to(target_device)

            output = model(data)
            loss = criterion(output, target)

            predicted = output.argmax(dim=1)
            batch_len = len(target)
            total_valid += batch_len
            # Tensor-side sum plus .item() keeps the counter a plain Python int.
            correct_valid += (predicted == target).sum().item()
            # Weight the batch mean by the batch size so that dividing by the
            # sample count yields a true per-sample average.
            valid_loss += loss.item() * batch_len

            if batch_idx % 100 == 0:
                seen = max(total_valid, 1)
                print("Valid Epoch: {}/{} [iter： {}/{}], acc： {:.6f}, loss： {:.6f}".format(
                    epoch + 1, total_epochs, batch_idx + 1, len(valid_loader),
                    correct_valid / seen,
                    valid_loss / seen))

    if total_valid == 0:
        return 0.0, 0.0

    valid_acc_ = 100.0 * correct_valid / total_valid
    valid_loss_ = valid_loss / total_valid

    return valid_acc_, valid_loss_

In [None]:
def training_loop(model, criterion, optimizer, train_loader, valid_loader):
    """Alternate one training pass and one validation pass for ``num_epochs`` epochs.

    Per-epoch metrics returned by ``train`` and ``validate`` are collected and
    a summary line is printed after every epoch.

    Returns:
        Four lists with one entry per epoch, in this order: training loss,
        validation loss, training accuracy, validation accuracy.
    """
    train_losses, valid_losses = [], []
    train_accuracies, valid_accuracies = [], []
    separator = '=========================================================================='

    for epoch in range(num_epochs):
        # Optimization pass over the training data.
        train_acc_, train_loss_ = train(train_loader, model, criterion, optimizer, epoch)
        train_losses.append(train_loss_)
        train_accuracies.append(train_acc_)

        # Evaluation pass; gradients are not needed here.
        with torch.no_grad():
            valid_acc_, valid_loss_ = validate(valid_loader, model, criterion, epoch)
        valid_losses.append(valid_loss_)
        valid_accuracies.append(valid_acc_)

        print(separator)
        print("Epoch: {}/{}， Train acc： {:.6f}， Train loss： {:.6f}， Valid acc： {:.6f}， Valid loss： {:.6f}".format(
            epoch + 1, num_epochs,
            train_acc_, train_loss_,
            valid_acc_, valid_loss_))
        print(separator)

    print("====== END ==========")

    return train_losses, valid_losses, train_accuracies, valid_accuracies

In [None]:
# Run the full training schedule and keep the per-epoch metric histories.
(
    total_train_loss,
    total_valid_loss,
    total_train_accuracy,
    total_valid_accuracy,
) = training_loop(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    train_loader=train_loader,
    valid_loader=valid_loader,
)

In [None]:
def plot_result(total_train, total_valid, label):
    """Plot a training curve and a validation curve of one metric on shared axes.

    Args:
        total_train: per-epoch training values of the metric.
        total_valid: per-epoch validation values of the metric.
        label: metric name used in the title, the y-axis label and the legend.
    """
    # Coerce every entry to a Python float. The values may be plain numbers or
    # 0-d tensors (possibly living on the GPU); float() handles both, whereas
    # handing GPU tensors straight to matplotlib fails.
    train_values = [float(value) for value in total_train]
    valid_values = [float(value) for value in total_valid]

    # Take the x-axis from the data itself so the plot does not depend on a
    # global epoch count matching the list lengths.
    plt.plot(range(len(train_values)), train_values, 'b-', label=f'Training_{label}')
    plt.plot(range(len(valid_values)), valid_values, 'g-', label=f'validation_{label}')
    plt.title(f'Training & Validation {label}')
    plt.xlabel('Number of epochs')
    plt.ylabel(f'{label}')
    plt.legend()
    plt.show()

In [None]:
# Loss curves for the training and validation sets.
plot_result(total_train=total_train_loss, total_valid=total_valid_loss, label='loss')

In [None]:
# Accuracy curves for the training and validation sets.
plot_result(total_train=total_train_accuracy, total_valid=total_valid_accuracy, label='accuracy')