In [1]:
# All required python standard libraries
import os
import time

In [2]:
# All torch related imports 
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import transforms
from torch import nn, optim
import torchvision

In [3]:
# using cv2 to read an image
import cv2

In [4]:
# Data-handling imports (pandas, numpy)
import pandas as pd
import numpy as np

In [5]:
from tqdm.notebook import tqdm as tq

In [6]:
# Target height and width, in pixels, used wherever images are resized in this notebook.
IMAGE_DIMS = 28

In [7]:
def accuracy_finder(predictions, labels):
    """Return the fraction of rows whose highest-scoring class matches the label.

    Args:
        predictions: tensor of per-class scores; the maximum is taken along dim 1.
        labels: tensor of class indices, compared element-wise with the
            predicted indices.

    Returns:
        Tensor holding the number of matches divided by the size of the
        first dimension of the predicted indices.
    """
    predicted_classes = predictions.max(dim=1).indices
    num_correct = (predicted_classes == labels).sum()
    return num_correct / predicted_classes.size(0)

In [8]:
def csv_preprocessor(base_dir:str, directory:str):
    """Join ``directory`` onto ``base_dir`` and return the path with forward slashes only.

    Args:
        base_dir: leading part of the path.
        directory: trailing part of the path; may contain backslashes.

    Returns:
        str: the joined path with every backslash replaced by ``/``.
    """
    joined = os.path.join(base_dir, directory)
    # Splitting on the backslash and re-joining with "/" swaps every separator.
    return "/".join(joined.split("\\"))

In [9]:
def return_all_image_list_from_processed_csv(csv_file):
    """Load every image referenced by the second column of ``csv_file`` into memory.

    Each path is read with OpenCV as a 3-channel image, converted from
    OpenCV's BGR channel order to RGB, and resized to
    ``IMAGE_DIMS`` x ``IMAGE_DIMS``.

    Args:
        csv_file: DataFrame whose second column (position 1) holds image paths.

    Returns:
        list: one resized RGB image array per row, in row order.

    Raises:
        FileNotFoundError: if OpenCV cannot read one of the paths.
    """
    image_paths = csv_file.iloc[:, 1]
    all_images = []
    start = time.time()
    # Passing the total lets the progress bar show completion instead of a bare counter.
    for path in tq(image_paths, total=len(image_paths)):
        # The second argument of cv2.imread is an IMREAD_* flag, not a colour
        # conversion code, so the BGR -> RGB swap has to be a separate step.
        image = cv2.imread(path, cv2.IMREAD_COLOR)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise FileNotFoundError("Could not read image: {}".format(path))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        all_images.append(cv2.resize(image, (IMAGE_DIMS, IMAGE_DIMS)))

    print("Tt took us approximately {} seconds".format(time.time()-start))
    return all_images

In [10]:
def get_one_hot_encoded_labels(input_data_frame):
    """Return the ``labels`` column with 1 subtracted from every value.

    Despite the name, no one-hot encoding is performed (an earlier
    ``pd.get_dummies`` variant was disabled); the result is a Series of
    shifted labels, presumably mapping 1-based CSV labels to 0-based class
    indices.

    The input frame is left untouched, so calling this function repeatedly
    on the same frame always yields the same result.

    Args:
        input_data_frame: DataFrame with a numeric ``labels`` column.

    Returns:
        pandas.Series: a new Series, indexed like the input, holding ``labels - 1``.
    """
    return input_data_frame["labels"] - 1

In [11]:
# PyTorch run configuration.
# Number of samples per batch handed to the DataLoader.
BATCH_SIZE = 256
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [12]:
# Show the selected device as the cell output.
DEVICE

device(type='cuda')

In [13]:
# Build the CSV path from separate components so the separator matches the
# host OS; backslashes embedded in a single component only resolve on Windows.
train_csv_directory = os.path.join(os.getcwd(), 'guides', 'isolated-dataset-csv', 'IsolatedTrain.csv')

In [14]:
# Read the "labels" and "directory" columns of the training CSV into a DataFrame.
# NOTE(review): TRAIN_DIR is not referenced by any other cell shown here — confirm it is still needed.
TRAIN_DIR = pd.read_csv(train_csv_directory, usecols=["labels","directory"])

In [15]:
class IsolatedCharacterDataset(Dataset):
    """Dataset that reads a labels/directory CSV and preloads every image into memory.

    Attributes set by the constructor:
        dataset_csv: DataFrame with the ``labels`` and ``directory`` columns,
            where ``directory`` has been expanded relative to the current
            working directory.
        labels: value returned by ``get_one_hot_encoded_labels`` for that frame.
        dataset_csv_numpy: NumPy copy of ``dataset_csv``; only its row count is used.
        ALL_IMAGES: list of images returned by
            ``return_all_image_list_from_processed_csv``.
        transforms: optional callable applied to an image on access.
    """

    def __init__(self, csv_dir_path, transforms=None, custom_transform=None):
        """Read the CSV, resolve the image paths and load all images.

        Args:
            csv_dir_path: path of the CSV file to read.
            transforms: optional callable applied to each image in ``__getitem__``.
            custom_transform: accepted but not used anywhere in this class.
        """
        # Only the two columns the dataset needs are read from the file.
        self.dataset_csv = pd.read_csv(csv_dir_path, usecols=["labels", "directory"])

        # Labels exactly as produced by the helper for this frame.
        self.labels = get_one_hot_encoded_labels(self.dataset_csv)

        # Expand every stored path relative to the current working directory.
        working_dir = str(os.getcwd())

        def to_full_path(relative_path):
            return csv_preprocessor(base_dir=working_dir, directory=str(relative_path))

        self.dataset_csv["directory"] = self.dataset_csv["directory"].map(to_full_path)
        self.dataset_csv_numpy = self.dataset_csv.to_numpy()

        # NOTE(review): the loader reads paths from the second column of the
        # frame, so "directory" presumably has to follow "labels" in the CSV
        # file — confirm against the data.
        self.ALL_IMAGES = return_all_image_list_from_processed_csv(csv_file=self.dataset_csv)

        # Transformations to apply to images on access.
        self.transforms = transforms

    def __getitem__(self, index):
        """Return the ``(image, label)`` pair for ``index``.

        The label is returned as a ``long`` tensor. The image is passed through
        ``self.transforms`` and cast to ``float`` only when transforms are set;
        otherwise the preloaded image object is returned unchanged.
        """
        # The label is looked up before the image, so a bad index fails on the label access first.
        label = torch.tensor(self.labels[index]).long()
        image = self.ALL_IMAGES[index]
        if self.transforms:
            image = self.transforms(image).float()
        return image, label

    def __len__(self):
        """Return the number of rows in the CSV-backed table."""
        return self.dataset_csv_numpy.shape[0]

In [16]:
# Per-image preprocessing pipeline, applied in this order:
#   1. ToTensor   - convert the loaded image array to a tensor
#   2. Normalize  - per-channel normalisation with the mean/std below
#   3. Resize     - resize to IMAGE_DIMS x IMAGE_DIMS
# NOTE(review): the mean/std values look like the commonly used ImageNet
# channel statistics — confirm they suit this dataset.
DATA_NORMALIZER = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
        transforms.Resize((IMAGE_DIMS, IMAGE_DIMS)),
    ]
)

In [17]:
# Build the training dataset; the constructor loads every image listed in the CSV into memory.
TRAIN_DATASET = IsolatedCharacterDataset(csv_dir_path=train_csv_directory,transforms=DATA_NORMALIZER)

HBox(children=(HTML(value=''), FloatProgress(value=1.0, bar_style='info', layout=Layout(width='20px'), max=1.0…


Tt took us approximately 4.680847883224487 seconds


In [18]:
# Serve the training dataset in shuffled batches of BATCH_SIZE samples.
TRAIN_LOADER = DataLoader(dataset=TRAIN_DATASET,batch_size=BATCH_SIZE,shuffle=True)

In [19]:
class block(nn.Module):
    """Bottleneck residual block: 1x1 -> 3x3 -> 1x1 convolutions plus a skip connection.

    The last convolution widens the output to
    ``intermediate_channels * expansion`` channels (``expansion`` is 4).

    Args:
        in_channels: number of channels of the input tensor.
        intermediate_channels: channel width of the first two convolutions.
        identity_downsample: optional module applied to the input before it is
            added back, for when the main path changes the tensor shape.
        stride: stride of the 3x3 convolution.
    """

    def __init__(self, in_channels, intermediate_channels, identity_downsample=None, stride=1):
        super().__init__()
        self.expansion = 4
        self.conv1 = nn.Conv2d(in_channels, intermediate_channels, kernel_size=1, stride=1, padding=0)
        self.bn1 = nn.BatchNorm2d(intermediate_channels)
        self.conv2 = nn.Conv2d(intermediate_channels, intermediate_channels, kernel_size=3, stride=stride, padding=1)
        self.bn2 = nn.BatchNorm2d(intermediate_channels)
        self.conv3 = nn.Conv2d(intermediate_channels, intermediate_channels * self.expansion, kernel_size=1, stride=1, padding=0)
        self.bn3 = nn.BatchNorm2d(intermediate_channels * self.expansion)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample
        self.stride = stride

    def forward(self, x):
        """Run the three conv/batch-norm stages and add the skip connection.

        Args:
            x: input tensor with ``in_channels`` channels.

        Returns:
            Tensor with ``intermediate_channels * expansion`` channels after
            the final ReLU.
        """
        # The input is never modified in place below, so it can serve as the
        # skip tensor directly; cloning it would only copy the activation.
        identity = x

        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        # No activation here: the ReLU is applied after the residual addition.
        out = self.bn3(self.conv3(out))

        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        out += identity
        return self.relu(out)

In [20]:
class ResNet(nn.Module):
    """Residual network: a convolutional stem, four stages of residual blocks and a linear classifier.

    Args:
        block: module class used for every residual block; it is called as
            ``block(in_channels, intermediate_channels, identity_downsample, stride)``.
        layers: sequence whose first four entries give the number of blocks
            in each stage.
        image_channels: channel count of the input images.
        num_classes: number of outputs of the final linear layer.
    """

    def __init__(self, block, layers, image_channels, num_classes):
        super().__init__()
        # Channel count fed into the next block; _make_layer keeps it up to date.
        self.in_channels = 64

        # Stem: 7x7 stride-2 convolution, batch norm, ReLU and a stride-2 max pool.
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four residual stages, registered in order as layer1 .. layer4.
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for position, (width, stage_stride) in enumerate(stage_specs):
            stage = self._make_layer(
                block, layers[position], intermediate_channels=width, stride=stage_stride
            )
            setattr(self, "layer{}".format(position + 1), stage)

        # Classifier head: global average pooling followed by a linear layer.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * 4, num_classes)

    def forward(self, x):
        """Return the classifier outputs for a batch of images ``x``."""
        features = self.maxpool(self.relu(self.bn1(self.conv1(x))))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            features = stage(features)

        pooled = self.avgpool(features)
        # Flatten everything but the batch dimension before the linear layer.
        flattened = pooled.reshape(pooled.shape[0], -1)
        return self.fc(flattened)

    def _make_layer(self, block, num_residual_blocks, intermediate_channels, stride):
        """Build one stage of ``num_residual_blocks`` blocks as an ``nn.Sequential``.

        The first block receives ``stride`` and, when the stride is not 1 or
        the channel count changes, a 1x1 convolution + batch norm that adapts
        the skip connection. The remaining blocks use the block's defaults.
        ``self.in_channels`` is updated to the stage's output width.
        """
        # Output width of every block in this stage (expansion factor of 4).
        expanded_channels = intermediate_channels * 4

        # The skip connection must be adapted whenever the main path changes
        # the spatial size (stride != 1) or the number of channels.
        identity_downsample = None
        if stride != 1 or self.in_channels != expanded_channels:
            identity_downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, expanded_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(expanded_channels),
            )

        first_block = block(self.in_channels, intermediate_channels, identity_downsample, stride)

        # Every following block receives the expanded channel count as input.
        self.in_channels = expanded_channels

        # Input and output shapes already match for these blocks, so no
        # downsample module and the default stride are used.
        following_blocks = [
            block(self.in_channels, intermediate_channels)
            for _ in range(num_residual_blocks - 1)
        ]

        return nn.Sequential(first_block, *following_blocks)

In [21]:
def ResNet50(img_channel=3, num_classes=171):
    """Build a ``ResNet`` made of ``block`` modules with stage depths [2, 3, 5, 2].

    Args:
        img_channel: number of channels of the input images.
        num_classes: number of outputs of the final linear layer.

    NOTE(review): the reference ResNet-50 uses stage depths [3, 4, 6, 3];
    confirm that [2, 3, 5, 2] is intentional given the function name.
    """
    return ResNet(block, [2, 3, 5, 2], img_channel, num_classes)
# Instantiate the network with the default arguments (3 input channels, 171 classes).
net = ResNet50()

In [22]:
# Move the model to the target device before creating the optimizer, as the
# torch.optim documentation recommends, so the optimizer is constructed over
# the parameters that will actually be trained.
net = net.to(DEVICE)

# NOTE(review): lr=0.07 is far above Adam's default of 1e-3 — confirm this
# value is intentional.
optimizer = optim.Adam(net.parameters(), lr=0.07)  # learning rate

# Loss function, placed on the same device as the model.
criterion = nn.CrossEntropyLoss().to(DEVICE)

In [23]:
from tqdm import tqdm 

In [24]:
def training(epochs:int):
    """Train the global ``net`` on ``TRAIN_LOADER`` for ``epochs`` epochs.

    Uses the module-level ``optimizer``, ``criterion`` and ``DEVICE``. For
    every batch the loss is back-propagated and the optimizer stepped; the
    batch loss and the batch accuracy (via ``accuracy_finder``) are
    accumulated and averaged over the number of batches at the end of each
    epoch.

    Args:
        epochs: number of passes over ``TRAIN_LOADER``.

    Returns:
        tuple: ``(all_training_accuracy, all_training_losses)``, two lists
        with one averaged value per epoch.
    """
    all_training_losses = []
    all_training_accuracy = []
    num_batches = len(TRAIN_LOADER)
    for epoch in tq(range(epochs)):
        total_epoch_loss = 0
        total_accuracy_epoch = 0
        # Wrap the loader itself (not the enumerate object) so the progress
        # bar knows how many batches to expect.
        for i, (image, label) in enumerate(tqdm(TRAIN_LOADER)):
            optimizer.zero_grad(set_to_none=True)

            label = label.to(DEVICE)
            image = image.to(DEVICE)
            output = net(image)

            loss = criterion(output, label)
            loss.backward()
            optimizer.step()

            # Metric bookkeeping must not be tracked by autograd.
            with torch.no_grad():
                total_epoch_loss += loss
                batches_training_accuracy = accuracy_finder(predictions=output, labels=label)
                total_accuracy_epoch = total_accuracy_epoch + batches_training_accuracy

            # Parentheses are required: `i+1 % 1000` parses as `i + (1 % 1000)`,
            # which is never 0, so the progress line was never printed.
            if (i + 1) % 1000 == 0:
                print("Batch : {}/{}".format(i + 1, num_batches))

        # Average the accumulated loss and accuracy over the batches of this epoch.
        total_epoch_loss = total_epoch_loss / num_batches
        total_accuracy_epoch = total_accuracy_epoch / num_batches

        # display the epoch training loss
        print("epoch : {}/{}, loss = {:.8f}, acc = {:.8f}".format(epoch + 1, epochs, total_epoch_loss, total_accuracy_epoch ))
        all_training_losses.append(total_epoch_loss)
        all_training_accuracy.append(total_accuracy_epoch)

    print("Training completed")
    return all_training_accuracy, all_training_losses

In [25]:
# Run one training epoch and keep the per-epoch accuracy and loss lists.
t_acc, t_loss = training(1)

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=1.0), HTML(value='')))

171it [01:16,  2.25it/s]





KeyboardInterrupt: 

In [35]:
# Show the selected device again.
# NOTE(review): this cell's execution count (35) does not follow the previous
# cell (25) — confirm the notebook still runs top to bottom on a fresh kernel.
DEVICE

device(type='cuda')