In [41]:
import math
import os
import random
from collections import OrderedDict

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader, random_split, SubsetRandomSampler, WeightedRandomSampler

In [42]:
# Seed every random number generator up front so the experiment can be reproduced.
SEED = 42


def deterministic(seed):
    """
    Setup execution state so that we can reproduce multiple executions.
    Make the execution "as deterministic" as possible.

    Seeds Python's `random`, NumPy and PyTorch (CPU and, when available,
    every CUDA device) and configures cuDNN for deterministic behaviour.

    seed: integer seed used to feed torch, numpy and python random
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

    # These are plain flags, so setting them is harmless on CPU-only machines.
    torch.backends.cudnn.deterministic = True
    # The cuDNN autotuner may select a different convolution algorithm from
    # run to run; disable it so the deterministic flag above is effective.
    torch.backends.cudnn.benchmark = False


deterministic(SEED)

In [43]:
# Prefer the GPU for computation whenever one is available.
def set_device_cuda():
    """Return 'cuda' when a CUDA device is usable, otherwise 'cpu'.

    On the CUDA path the default tensor type is additionally set to
    "torch.FloatTensor" before returning.
    """
    if not torch.cuda.is_available():
        return 'cpu'
    torch.set_default_tensor_type("torch.FloatTensor")
    return 'cuda'


device = set_device_cuda()
print(device)

cuda


In [44]:
# DenseNet model
"""
DenseNet for cifar with pytorch

Reference:
[1] G. Huang, Z. Liu, L. van der Maaten and K. Q. Weinberger. Densely connected convolutional networks. In CVPR, 2017
"""

class _DenseLayer(nn.Sequential):
    def __init__(self, num_input_features, growth_rate, bn_size, drop_rate):
        super(_DenseLayer, self).__init__()
        self.add_module('norm1', nn.BatchNorm2d(num_input_features)),
        self.add_module('relu1', nn.ReLU(inplace=True)),
        self.add_module('conv1', nn.Conv2d(num_input_features, bn_size *
                        growth_rate, kernel_size=1, stride=1, bias=False)),
        self.add_module('norm2', nn.BatchNorm2d(bn_size * growth_rate)),
        self.add_module('relu2', nn.ReLU(inplace=True)),
        self.add_module('conv2', nn.Conv2d(bn_size * growth_rate, growth_rate,
                        kernel_size=3, stride=1, padding=1, bias=False)),
        self.drop_rate = drop_rate

    def forward(self, x):
        new_features = super(_DenseLayer, self).forward(x)
        if self.drop_rate > 0:
            new_features = F.dropout(new_features, p=self.drop_rate, training=self.training)
        return torch.cat([x, new_features], 1)


class _DenseBlock(nn.Sequential):
    """Sequence of `num_layers` dense layers named denselayer1..denselayerN.

    Every layer appends `growth_rate` channels to its input, so each
    successive layer is built for `growth_rate` more input channels than
    the one before it.
    """

    def __init__(self, num_layers, num_input_features, bn_size, growth_rate, drop_rate):
        super().__init__()
        in_channels = num_input_features
        for position in range(1, num_layers + 1):
            self.add_module(
                'denselayer%d' % position,
                _DenseLayer(in_channels, growth_rate, bn_size, drop_rate),
            )
            in_channels += growth_rate


class _Transition(nn.Sequential):
    def __init__(self, num_input_features, num_output_features):
        super(_Transition, self).__init__()
        self.add_module('norm', nn.BatchNorm2d(num_input_features))
        self.add_module('relu', nn.ReLU(inplace=True))
        self.add_module('conv', nn.Conv2d(num_input_features, num_output_features,
                                          kernel_size=1, stride=1, bias=False))
        self.add_module('pool', nn.AvgPool2d(kernel_size=2, stride=2))


class DenseNet_Cifar(nn.Module):
    r"""Densenet-BC model class, based on
    `"Densely Connected Convolutional Networks" <https://arxiv.org/pdf/1608.06993.pdf>`_

    The network is a 3x3 stem convolution, followed by one dense block per
    entry of `block_config` (with a channel-halving, resolution-halving
    transition between consecutive blocks), a final batch norm, global
    average pooling and a linear classifier.

    Args:
        growth_rate (int) - how many filters to add each layer (`k` in paper)
        block_config (sequence of ints) - how many layers in each dense block
          (one entry per block; three blocks by default)
        num_init_features (int) - the number of filters to learn in the first convolution layer
        bn_size (int) - multiplicative factor for number of bottle neck layers
          (i.e. bn_size * k features in the bottleneck layer)
        drop_rate (float) - dropout rate after each dense layer
        num_classes (int) - number of classification classes
    """
    def __init__(self, growth_rate=12, block_config=(16, 16, 16),
                 num_init_features=24, bn_size=4, drop_rate=0, num_classes=10):

        super(DenseNet_Cifar, self).__init__()

        # First convolution
        self.features = nn.Sequential(OrderedDict([
            ('conv0', nn.Conv2d(3, num_init_features, kernel_size=3, stride=1, padding=1, bias=False)),
        ]))

        # Each denseblock
        num_features = num_init_features
        for i, num_layers in enumerate(block_config):
            block = _DenseBlock(num_layers=num_layers, num_input_features=num_features,
                                bn_size=bn_size, growth_rate=growth_rate, drop_rate=drop_rate)
            self.features.add_module('denseblock%d' % (i + 1), block)
            # Every layer of the block contributed `growth_rate` channels.
            num_features = num_features + num_layers * growth_rate
            # A transition follows every block except the last one.
            if i != len(block_config) - 1:
                trans = _Transition(num_input_features=num_features, num_output_features=num_features // 2)
                self.features.add_module('transition%d' % (i + 1), trans)
                num_features = num_features // 2

        # Final batch norm (the name 'norm5' is kept so saved state dicts stay loadable)
        self.features.add_module('norm5', nn.BatchNorm2d(num_features))

        # Linear layer
        self.classifier = nn.Linear(num_features, num_classes)

        # initialize conv and bn parameters
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                # Zero-mean normal with std sqrt(2 / (kernel area * out_channels)).
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

    def forward(self, x):
        """Return the class logits, one row of `num_classes` values per input sample."""
        features = self.features(x)
        out = F.relu(features, inplace=True)
        # Global average pooling down to a 1x1 map per channel. For 32x32
        # inputs with the default three-block configuration the feature map
        # is 8x8, so this matches a fixed 8x8 pooling window there, while
        # also working for other input sizes and block counts.
        out = F.adaptive_avg_pool2d(out, 1)
        out = torch.flatten(out, 1)
        out = self.classifier(out)
        return out


def densenet_BC_cifar(depth, k, **kwargs):
    """Build a `DenseNet_Cifar` from a total depth and a growth rate `k`.

    Each of the three dense blocks receives (depth - 4) // 6 layers and the
    first convolution produces 2 * k feature maps. Any remaining keyword
    arguments are forwarded to `DenseNet_Cifar`.
    """
    layers_per_block = (depth - 4) // 6
    return DenseNet_Cifar(
        growth_rate=k,
        block_config=[layers_per_block] * 3,
        num_init_features=2 * k,
        **kwargs
    )

In [45]:
# define a Dataloader for CIFAR100
def dataLoader(is_train=True, batch_size=64, shuffle=True):
    """Build CIFAR100 data loaders.

    Args:
        is_train: when True, build loaders from the CIFAR100 training split
            (all but 1000 images for training, the remaining 1000 for
            validation); when False, build a loader over the test split.
        batch_size: batch size used by every returned loader.
        shuffle: forwarded as the `shuffle` flag of every returned loader.

    Returns:
        dict with keys 'train_loader' and 'val_loader' when `is_train` is
        True, otherwise a dict with the single key 'test_loader'.
    """
    # Per-channel mean/std, given on a 0-255 scale and rescaled to the 0-1
    # range produced by ToTensor (presumably CIFAR-100 statistics).
    normalize = transforms.Normalize(
        mean=[n/255. for n in [129.3, 124.1, 112.4]],
        std=[n/255. for n in [68.2,  65.4,  70.4]])
    eval_transform = transforms.Compose([transforms.ToTensor(), normalize])

    if is_train:
        train_transform = transforms.Compose([
            transforms.RandomHorizontalFlip(),
            transforms.RandomCrop(32, padding=4),
            transforms.ToTensor(),
            normalize,
        ])
        train_set = datasets.CIFAR100('data', download=True, train=True, transform=train_transform)
        # Second view of the same images without augmentation: validation
        # samples must not be randomly flipped or cropped. The files were
        # already fetched by the call above, so no download is needed.
        val_set = datasets.CIFAR100('data', download=False, train=True, transform=eval_transform)

        train_dataset, val_split = random_split(train_set, (len(train_set) - 1000, 1000))
        # Reuse the held-out indices on the non-augmented view.
        val_dataset = torch.utils.data.Subset(val_set, val_split.indices)
        print(len(train_dataset), len(val_dataset))

        train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=batch_size, shuffle=shuffle)
        val_loader = torch.utils.data.DataLoader(
            val_dataset, batch_size=batch_size, shuffle=shuffle)
        return {'train_loader': train_loader, 'val_loader': val_loader}

    test_set = datasets.CIFAR100('data', download=True, train=False, transform=eval_transform)
    test_loader = torch.utils.data.DataLoader(
        test_set, batch_size=batch_size, shuffle=shuffle)
    return {'test_loader': test_loader}

In [46]:
# Build the CIFAR100 loaders: train/validation from the training split,
# test from the test split.
train = dataLoader(is_train=True, batch_size=64, shuffle=True)
test = dataLoader(is_train=False, batch_size=64, shuffle=True)

train_loader, val_loader = train['train_loader'], train['val_loader']
test_loader = test['test_loader']




Using downloaded and verified file: data/cifar-100-python.tar.gz
Extracting data/cifar-100-python.tar.gz to data
49000 1000
Files already downloaded and verified


In [47]:
def correct_samples(pred,label):
    """Count how many predictions match their labels.

    The predicted class of each sample is the arg-max of `pred` along
    dimension 1; the return value is the number of positions where that
    class equals `label`, as a Python int.
    """
    matches = torch.argmax(pred, dim=1) == label
    return int(matches.sum())

In [None]:
# Train a DenseNet on CIFAR100: report train/validation loss and accuracy
# after every epoch and checkpoint the weights periodically.
# (The model's conv and batch-norm parameters are initialized in its constructor.)
model = DenseNet_Cifar(block_config=(6, 6, 6), num_classes=100)
model = model.to(device)

loss_function = nn.CrossEntropyLoss().to(device)
opt = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

epoch_numbers = 200

# torch.save does not create missing directories; make sure the checkpoint
# folder exists before any training time is spent.
checkpoint_dir = './model'
os.makedirs(checkpoint_dir, exist_ok=True)


for epoch in range(epoch_numbers):
    # Per-epoch accumulators: correct predictions, summed batch losses and
    # number of samples seen, for both the training and validation passes.
    csamples_train = 0
    total_loss_train = 0
    csamples_val = 0
    total_loss_val = 0
    train_samples = 0
    val_samples = 0

    # ---- training pass ----
    model.train()
    for img,label in train_loader:
        img = img.to(device)
        label = label.to(device)
        train_samples = img.shape[0]+train_samples
        pred = model(img)
        loss = loss_function(pred,label)

        opt.zero_grad()
        loss.backward()
        opt.step()

        csamples_train = csamples_train + correct_samples(pred,label)
        total_loss_train = total_loss_train +loss.item()

    # The reported loss is the mean of the per-batch losses.
    print("epoch: ", epoch, " train_loss = ", total_loss_train/(len(train_loader)), 
          'train accuracy = ', csamples_train/(train_samples)
         )

    # ---- validation pass ----
    model.eval()
    # No gradients are needed for evaluation; disabling autograd avoids
    # building a graph for every validation batch.
    with torch.no_grad():
        for img,label in val_loader:
            img = img.to(device)
            label = label.to(device)
            pred = model(img)
            loss = loss_function(pred,label)
            val_samples = val_samples +img.shape[0]

            csamples_val = csamples_val + correct_samples(pred,label)
            total_loss_val = total_loss_val +loss.item()

    print("epoch: ", epoch, " val_loss = ", total_loss_val/(len(val_loader)), 
          'val accuracy = ', csamples_val/(val_samples)
         )

    # Checkpoint every 10th epoch, and also after the final epoch so the
    # fully trained weights are never lost.
    if epoch%10 == 0 or epoch == epoch_numbers - 1:
        PATH = checkpoint_dir + '/model_'+str(epoch)+'.pth'
        torch.save(model.state_dict(), PATH)
        

    
    
        
        

