In [None]:
import gc
import time

import numpy as np
import torch
import torch.nn as nn # All neural network modules, nn.Linear, nn.Conv2d, BatchNorm, Loss functions
from torchvision import datasets # Has standard datasets we can import in a nice way
from torchvision import transforms # Transformations we can perform on our dataset
from torch.utils.data.sampler import SubsetRandomSampler # Sampling the dataset

# Run on the first GPU when CUDA is usable, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [None]:
### Loading Dataset from torchvision.datasets

def data_loader(data_dir, batch_size,random_seed = 42,valid_size = 0.1, shuffle = True, test = False):
    """Build CIFAR-10 ``DataLoader`` objects.

    Every image is resized to 224x224, converted to a tensor and normalized
    per channel.

    Args:
        data_dir: Root directory of the dataset; it is downloaded there if missing.
        batch_size: Number of samples per batch for every returned loader.
        random_seed: Seed of the permutation that decides the train/validation
            split. Only used when ``shuffle`` is true and ``test`` is false.
        valid_size: Fraction (0 to 1) of the training split held out for
            validation.
        shuffle: In test mode, forwarded to ``DataLoader(shuffle=...)``. In
            train/validation mode, whether the indices are permuted before
            they are split.
        test: When true, return a single loader over the test split.

    Returns:
        A single ``DataLoader`` when ``test`` is true, otherwise the tuple
        ``(train_loader, valid_loader)``.

    Raises:
        ValueError: If ``valid_size`` is outside ``[0, 1]`` (train/validation
            mode only).
    """
    # Per-channel CIFAR-10 statistics; mean and std must describe the same dataset
    normalize = transforms.Normalize(
        mean=[0.4914, 0.4822, 0.4465],
        std=[0.2023, 0.1994, 0.2010],
    )

    transform = transforms.Compose([
        transforms.Resize((224, 224)),  # upscale to the resolution the network is built for
        transforms.ToTensor(),
        normalize,
    ])

    if test:
        test_dataset = datasets.CIFAR10(
            root=data_dir,
            train=False,
            download=True,
            transform=transform,
        )
        return torch.utils.data.DataLoader(
            test_dataset,
            batch_size=batch_size,
            shuffle=shuffle,
        )

    if not 0 <= valid_size <= 1:
        raise ValueError(f"valid_size must be in [0, 1], got {valid_size!r}")

    # Training and validation use the same transform, so a single dataset
    # object backs both loaders; the samplers keep the two subsets disjoint.
    train_dataset = datasets.CIFAR10(
        root=data_dir,
        train=True,
        download=True,
        transform=transform,
    )

    num_train = len(train_dataset)
    indices = list(range(num_train))
    split = int(np.floor(valid_size * num_train))  # number of validation samples

    if shuffle:
        # A private generator yields the same permutation as seeding the global
        # NumPy RNG would, without resetting that global state for the caller.
        np.random.RandomState(random_seed).shuffle(indices)

    train_idx, valid_idx = indices[split:], indices[:split]
    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)

    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        sampler=train_sampler,
    )
    valid_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        sampler=valid_sampler,
    )

    return (train_loader, valid_loader)

# Training and validation loaders drawn from the training split
train_loader, valid_loader = data_loader(
    data_dir='./data',
    batch_size=32,
)

# Loader over the held-out test split
test_loader = data_loader(
    data_dir='./data',
    batch_size=32,
    test=True,
)

In [None]:
### Residual Block
class ResidualBlock(nn.Module):
    """Residual unit made of two 3x3 convolutions plus a shortcut connection.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution (the second always uses 1).
        downsample: Optional module applied to the input so that the shortcut
            matches the shape of the convolutional branch; ``None`` means the
            input is added unchanged.
    """

    def __init__(self, in_channels, out_channels, stride = 1, downsample = None):
        super().__init__()

        # conv -> batch norm -> ReLU; this stage may change resolution and width
        first_conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        first_norm = nn.BatchNorm2d(out_channels)
        self.conv1 = nn.Sequential(first_conv, first_norm, nn.ReLU())

        # conv -> batch norm; the activation is applied after the shortcut is added
        second_conv = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        second_norm = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Sequential(second_conv, second_norm)

        self.downsample = downsample
        self.relu = nn.ReLU()
        self.out_channels = out_channels

    def forward(self, x):
        """Return ``relu(conv2(conv1(x)) + shortcut(x))``."""
        branch = self.conv2(self.conv1(x))
        shortcut = x if self.downsample is None else self.downsample(x)
        branch += shortcut
        return self.relu(branch)

In [None]:
### ResNet
class ResNet(nn.Module):
    """ResNet-style classifier assembled from a caller-supplied residual block.

    The network is a 7x7 stride-2 stem convolution, a 3x3 stride-2 max pool,
    four stages of residual blocks (64, 128, 256 and 512 channels), global
    average pooling and one linear classification layer.

    Args:
        block: Class (or factory) called as
            ``block(in_channels, out_channels, stride, downsample)`` for the
            first block of a stage and ``block(in_channels, out_channels)``
            for the rest. It must return a module whose output has
            ``out_channels`` channels.
        layers: Sequence of four integers, the number of blocks in each stage.
        num_classes: Size of the output logits vector.
    """

    def __init__(self, block, layers, num_classes = 10):
        super(ResNet, self).__init__()
        self.inplanes = 64  # channel count entering the next stage; updated by _make_layer
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(),
        )

        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer0 = self._make_layer(block, 64, layers[0], stride=1)
        self.layer1 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer2 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer3 = self._make_layer(block, 512, layers[3], stride=2)

        # Global average pooling: gives a 1x1 map for any input resolution
        # (a fixed 7x7 window only works when the final feature map is 7x7,
        # i.e. for 224x224 inputs).
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, planes, blocks, stride = 1):
        """Build one stage of ``blocks`` residual blocks with ``planes`` channels.

        Only the first block applies ``stride`` and, when the shape of its
        input differs from its output, a 1x1 projection on the shortcut.
        Updates ``self.inplanes`` to ``planes`` as a side effect.
        """
        downsample = None
        if stride != 1 or self.inplanes != planes:
            # Projection shortcut so the residual matches the branch output shape
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes, kernel_size=1, stride=stride),
                nn.BatchNorm2d(planes),
            )

        # The stage is built whether or not a projection is needed; a stage
        # that keeps both resolution and width simply uses identity shortcuts.
        layers = [block(self.inplanes, planes, stride, downsample)]
        self.inplanes = planes
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Map a batch of 3-channel images to ``(batch, num_classes)`` logits."""
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.layer0(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.avgpool(x)

        x = x.view(x.size(0), -1)  # (batch, 512, 1, 1) -> (batch, 512)
        x = self.fc(x)

        return x
        

In [None]:
### Setting Hyperparameters
num_classes = 10
num_epochs = 20
# NOTE(review): batch_size is not consumed anywhere in this notebook; the data
# loaders are created with batch_size = 32. Confirm which value is intended.
batch_size = 16
learning_rate = 0.001

# Four stages with 3, 4, 6 and 3 residual blocks; the classifier width is taken
# from num_classes so the two cannot drift apart.
model = ResNet(ResidualBlock, [3,4,6,3], num_classes = num_classes).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=learning_rate,
    weight_decay=1e-3,
    momentum=0.9,
)

# Number of mini-batches per epoch
total_step = len(train_loader)

In [None]:
### Training the model
for epoch in range(num_epochs):
    # Training mode: BatchNorm normalizes with batch statistics and updates
    # its running estimates. Reset every epoch because validation below
    # switches the model to evaluation mode.
    model.train()
    for images, labels in train_loader:
        # Move tensors to the configured device
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        del images, labels, outputs

    # Collect garbage and release cached GPU memory once per epoch; doing
    # this after every step stalls training without freeing anything extra.
    gc.collect()
    torch.cuda.empty_cache()

    # Loss of the last mini-batch of the epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

    # Validation
    # Evaluation mode: BatchNorm uses its running statistics, so predictions
    # do not depend on batch composition and validation data does not leak
    # into the running estimates.
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in valid_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)  # index of the largest logit per sample
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            del images, labels, outputs

        # Report the number of images actually evaluated rather than a fixed count
        print(f"Accuracy of the network on the {total} validation images: {100*correct/total} ")



In [None]:
# Final evaluation on the held-out test split.
# Evaluation mode makes BatchNorm use its running statistics; without it the
# predictions depend on batch composition and the test data would update the
# running estimates.
model.eval()
with torch.no_grad():
    correct = 0
    total = 0

    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)  # index of the largest logit per sample
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        del images, labels, outputs

    # Report the number of images actually evaluated rather than a fixed count
    print(f"Accuracy of the network on the {total} test images: {100*correct/total} ")