# CNN model with custom dataset

- Data preprocessing
- Defining the CNN class
- Training and testing the model
- Make inference

In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
from torchvision.transforms import transforms
from torch.utils.data import DataLoader
from torch.optim import Adam, lr_scheduler
from torch.autograd import Variable
import torchvision
import pathlib
import glob
import random
import shutil
!pip install patool
import patoolib
! pip install -q torchview
! pip install -q -U graphviz
import graphviz
from torchview import draw_graph



In [None]:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


In [None]:
patoolib.extract_archive('30by30 images.zip')

INFO patool: Extracting 30by30 images.zip ...
INFO:patool:Extracting 30by30 images.zip ...
INFO patool: running /usr/bin/7z x -o./Unpack_aeopalh0 -- "30by30 images.zip"
INFO:patool:running /usr/bin/7z x -o./Unpack_aeopalh0 -- "30by30 images.zip"
INFO patool:     with input=''
INFO:patool:    with input=''
INFO patool: ... 30by30 images.zip extracted to `30by30 images1' (local file exists).
INFO:patool:... 30by30 images.zip extracted to `30by30 images1' (local file exists).


'30by30 images1'

#### Splitting the dataset into training and testing sets

In [None]:
# Extracting the dataset and creating the train and test folders
data_dir = pathlib.Path('30by30 images')
classes = os.listdir(data_dir)
print('classes: ',classes)

# Creating the train and test folders
if not os.path.exists('./custom_cnn_dataset'):
    os.makedirs('./custom_cnn_dataset/train')
    os.makedirs('./custom_cnn_dataset/test')

    # Creating the class folders in train and test folders
    for i in classes:
        os.makedirs('./custom_cnn_dataset/train/' + i)
        os.makedirs('./custom_cnn_dataset/test/' + i)

random.seed(0)
# Splitting the dataset into train and test sets
for i in classes:
    src = "./30by30 images/" + i # Folder to copy images from
    allFileNames = os.listdir(src)
    np.random.shuffle(allFileNames)
    train_FileNames, test_FileNames = np.split(np.array(allFileNames),
                                                              [int(len(allFileNames)*0.8)])
    train_FileNames = [src+'/'+ name for name in train_FileNames.tolist()]
    test_FileNames = [src+'/' + name for name in test_FileNames.tolist()]
    print('Total images: ', len(allFileNames))
    print('Training: ', len(train_FileNames))
    print('Testing: ', len(test_FileNames))
    # Copy-pasting images
    for name in train_FileNames:
        shutil.copy(name, "./custom_cnn_dataset/train/" + i)
    for name in test_FileNames:
        shutil.copy(name, "./custom_cnn_dataset/test/" + i)

classes:  ['1', '3', '2']
Total images:  596
Training:  476
Testing:  120
Total images:  626
Training:  500
Testing:  126
Total images:  588
Training:  470
Testing:  118


#### Data preprocessing

In [None]:
transforms = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

In [None]:
train_dir = './custom_cnn_dataset/train'
train_dataloader = DataLoader(
    torchvision.datasets.ImageFolder(train_dir, transform=transforms),
    batch_size=64, shuffle=True
)

test_dir = './custom_cnn_dataset/test'
test_dataloader = DataLoader(
    torchvision.datasets.ImageFolder(test_dir, transform=transforms),
    batch_size=64, shuffle=True
)

#### CNN class defination

In [None]:
class ConvNet(nn.Module):
    def __init__(self, num_classes=3):
        super(ConvNet, self).__init__()

        # Input shape = (64, 3, 30, 30)
        # Output size after convolutional layer = (w-f+2p)/s + 1 = (30-3+2)/1 + 1 = 30
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=12, kernel_size=3, stride=1, padding=1)
        #Input shape = (64, 12, 30, 30)
        self.bn1 = nn.BatchNorm2d(num_features=12)
        self.relu1 = nn.ReLU()

        # Input shape = (64, 12, 30, 30)
        # Output size after max pooling = 30/2 = 15
        self.maxpool1 = nn.MaxPool2d(kernel_size=2)

        # Input shape = (64, 12, 15, 15)
        self.conv2 = nn.Conv2d(in_channels=12, out_channels=20, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()

        self.conv3 = nn.Conv2d(in_channels=20, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(num_features=32)
        self.relu3 = nn.ReLU()
        #shape = (64, 32, 15, 15)

        self.fc = nn.Linear(in_features=15*15*32, out_features=num_classes)

    def forward(self, input):
        output = self.conv1(input)
        output = self.bn1(output)
        output = self.relu1(output)

        output = self.maxpool1(output)

        output = self.conv2(output)
        output = self.relu2(output)

        output = self.conv3(output)
        output = self.bn3(output)
        output = self.relu3(output)

        #reshaping the output to feed into the fully connected layer
        output = output.view(-1, 15*15*32)

        output = self.fc(output)

        return output

In [None]:
model = ConvNet(num_classes=3).to(device)

In [None]:
graphviz.set_jupyter_format('png')
model_graph1 = draw_graph(model, input_size=(64,3,30,30))
model_graph1.visual_graph

RuntimeError: ignored

In [None]:
# Loss and optimizer
optimizer = Adam(model.parameters(), lr=0.001, weight_decay=0.0001)
loss_fn = nn.CrossEntropyLoss()
scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

In [None]:
#train and test count
train_count = len(glob.glob(train_dir + '/**/*.png'))
test_count = len(glob.glob(test_dir + '/**/*.png'))
print('train_count: ', train_count)
print('test_count: ', test_count)

In [None]:
#training the model and saving best model
num_epochs = 10
best_accuracy = 0.0

for epoch in range(num_epochs):
    #Evaluation and training on training dataset
    model.train()
    train_accuracy = 0.0
    train_loss = 0.0

    for i, (images, labels) in enumerate(train_dataloader):
        if torch.cuda.is_available():
            images = Variable(images.cuda())
            labels = Variable(labels.cuda())

        optimizer.zero_grad()

        outputs = model(images)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.cpu().data*images.size(0)
        _, prediction = torch.max(outputs.data, 1)

        train_accuracy += int(torch.sum(prediction==labels.data))

    train_accuracy = train_accuracy/train_count
    train_loss = train_loss/train_count

    #Evaluation on testing dataset
    model.eval()
    test_accuracy = 0.0
    for i, (images, labels) in enumerate(test_dataloader):
        if torch.cuda.is_available():
            images = Variable(images.cuda())
            labels = Variable(labels.cuda())

        outputs = model(images)
        _, prediction = torch.max(outputs.data, 1)
        test_accuracy += int(torch.sum(prediction==labels.data))

    test_accuracy = test_accuracy/test_count

    print('Epoch: '+str(epoch)+' Train Loss: '+str(train_loss)+' Train Accuracy: '+str(train_accuracy)+' Test Accuracy: '+str(test_accuracy))

    #Save the best model
    if test_accuracy > best_accuracy:
        torch.save(model.state_dict(), 'best_checkpoint.model')
        best_accuracy = test_accuracy

    scheduler.step()