In [1]:
import torch
import numpy as np
import pandas as pd
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from sklearn.preprocessing import LabelEncoder
from tqdm import tqdm
from google.colab import drive
from PIL import Image

# check if CUDA is available
train_on_gpu = torch.cuda.is_available()

if not train_on_gpu:
  print('CUDA is not available.  Training on CPU ...')
else:
  print('CUDA is available!  Training on GPU ...')

drive.mount('/content/drive')
%cd /content/drive/MyDrive/Colab Notebooks/Image and Video recognition/dataset
root = "/content/drive/MyDrive/Colab Notebooks/Image and Video recognition/dataset/"
# root = "dataset/"

class CustomDatasetFromCSV(Dataset):
    def __init__(self, csv_path, transform = None):
        self.data = pd.read_csv(root + csv_path)
        self.contents = np.asarray(self.data['content'])
        self.fontencoder = LabelEncoder()
        self.fonts = self.fontencoder.fit_transform(np.asarray(self.data['font']))
        self.authors = np.asarray(self.data['author'])
        self.len = len(self.contents)
        self.images = np.asarray([ np.array(Image.open(root + self.data['word_path'][i])) / 255. for i in tqdm(range(self.len)) ])

    def __getitem__(self, index):
        image = self.images[index]
        content = self.contents[index]
        font = self.fonts[index]
        author = self.authors[index]
        return image, content, font, author

    def __len__(self):
        return self.len

CUDA is available!  Training on GPU ...
Mounted at /content/drive
/content/drive/MyDrive/Colab Notebooks/Image and Video recognition/dataset


In [2]:
def split_dataset(dataset, batch_size, split_size, method = "all", shuffle_dataset = True):
  if method == "all":
    indices = list(range(len(dataset)))
    if shuffle_dataset :
        np.random.seed(0)
        np.random.shuffle(indices)

    split_1 = int(np.floor(split_size[0] * len(dataset)))
    split_2 = int(np.floor((split_size[0] + split_size[1]) * len(dataset)))
    train_indices, val_indices, test_indices = indices[:split_1], indices[split_1:split_2], indices[split_2:]
  else:
    print("wrong method!!")

  # Creating PT data samplers and loaders:
  train_sampler = SubsetRandomSampler(train_indices)
  valid_sampler = SubsetRandomSampler(val_indices)
  test_sampler = SubsetRandomSampler(test_indices)
  train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
  validation_loader = DataLoader(dataset, batch_size=batch_size, sampler=valid_sampler)
  test_loader = DataLoader(dataset, batch_size=batch_size, sampler=test_sampler)

  return train_loader, validation_loader, test_loader

In [3]:
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class BasicBlock(nn.Module):
  expansion = 1
  def __init__(self, in_planes, planes, stride=1):
    super(BasicBlock, self).__init__()
    self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
    self.bn1 = nn.BatchNorm2d(planes)
    self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
    self.bn2 = nn.BatchNorm2d(planes)

    self.shortcut = nn.Sequential()
    if stride != 1 or in_planes != self.expansion*planes:
      self.shortcut = nn.Sequential(
          nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
          nn.BatchNorm2d(self.expansion*planes)
      )
  
  def forward(self, x):
    out = F.relu(self.bn1(self.conv1(x)))
    out = self.bn2(self.conv2(out))
    out += self.shortcut(x)
    out = F.relu(out)
    return out

class BottleNeck(nn.Module):
  expansion = 4

  def __init__(self, in_planes, planes, stride=1):
    super(BottleNeck, self).__init__()
    self.conv1 = nn.Conv2d(in_planes , planes, kernel_size=1, bias=False)
    self.bn1 = nn.BatchNorm2d(planes)
    self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
    self.bn2 = nn.BatchNorm2d(planes)
    self.conv3 = nn.Conv2d(planes, self.expansion*planes, kernel_size=1, bias=False)
    self.bn3 = nn.BatchNorm2d(self.expansion*planes)

    self.shortcut = nn.Sequential()
    if stride != 1 or in_planes != self.expansion*planes :
      self.shortcut = nn.Sequential(
          nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
          nn.BatchNorm2d(self.expansion*planes)
      )

  def forward(self, x):
    out = F.relu(self.bn1(self.conv1(x)))
    out = F.relu(self.bn2(self.conv2(out)))
    out = self.bn3(self.conv3(out))
    out += self.shortcut(x)
    out = F.relu(out)
    return out

class ResNet(nn.Module):
  def __init__(self, block, num_blocks, num_classes=10):
    super(ResNet, self).__init__()
    self.in_planes = 64

    self.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)
    self.bn1 = nn.BatchNorm2d(64)
    self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
    self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
    self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
    self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
    self.linear = nn.Linear(512*block.expansion, num_classes)

  def _make_layer(self, block, planes, num_blocks, stride):
    strides = [stride] + [1]*(num_blocks-1)
    layers = []
    for stride in strides:
      layers.append(block(self.in_planes, planes, stride))
      self.in_planes = planes * block.expansion      
    return nn.Sequential(*layers)

  def forward(self, x):
    out = F.relu(self.bn1(self.conv1(x)))
    out = self.layer1(out)
    out = self.layer2(out)
    out = self.layer3(out)
    out = self.layer4(out)
    out = F.avg_pool2d(out, 4)
    out = out.view(out.size(0), -1)
    out = self.linear(out)
    return out

In [6]:
annotated_file = "annotated_merged_deleted_x_2.csv"
dataset = CustomDatasetFromCSV(annotated_file)

batch_size = 32
split_size = [0.8, 0.1, 0.1]
train_loader, valid_loader, test_loader = split_dataset(dataset, batch_size, split_size, method = "all", shuffle_dataset = True)

modelname = "ResNet18"
ResNet18 = ResNet(BasicBlock, [2,2,2,2], num_classes = len(dataset.fontencoder.classes_))
#ResNet34 = ResNet(BasicBlock, [3,4,6,3], num_classes = len(dataset.fontencoder.classes_)
#ResNet50 = ResNet(BottleNeck, [3,4,6,3], num_classes = len(dataset.fontencoder.classes_)
#ResNet101 = ResNet(BottleNeck, [3,4,23,3], num_classes = len(dataset.fontencoder.classes_)
#ResNet152 = ResNet(BottleNeck, [3,8,36,3], num_classes = len(dataset.fontencoder.classes_)
ResNet = ResNet18
# print(ResNet)

if train_on_gpu:
  ResNet = torch.nn.DataParallel(ResNet)
  cudnn.benchmark = True

# specify loss function (categorical cross-entropy)
criterion = nn.CrossEntropyLoss()

# specify optimizer
optimizer = optim.SGD(ResNet.parameters(), lr=0.01, momentum=0.9, weight_decay=0.0001)

100%|██████████| 2896/2896 [00:03<00:00, 765.04it/s]


In [7]:
# number of epochs to train the model
n_epochs = 100
valid_loss_min = np.Inf # track change in validation loss

for epoch in range(1, n_epochs+1):
  # keep track of training and validation loss
  train_loss = 0.0
  valid_loss = 0.0
    
  ###################
  # train the model #
  ###################
  ResNet.train()
  for batch_idx, (images, contents, fonts, authors) in enumerate(train_loader):
    resize = transforms.Compose([transforms.Resize(32)])
    data = resize(images.unsqueeze(1)).float()
    target = fonts.long()
    # move tensors to GPU if CUDA is available
    if train_on_gpu:
      data, target = data.cuda(), target.cuda()
    # clear the gradients of all optimized variables
    optimizer.zero_grad()
    # forward pass: compute predicted outputs by passing inputs to the model
    output = ResNet(data)
    # calculate the batch loss
    loss = criterion(output, target)
    # backward pass: compute gradient of the loss with respect to model parameters
    loss.backward()
    # perform a single optimization step (parameter update)
    optimizer.step()
    # update training loss
    train_loss += loss.item()*data.size(0)
        
  ######################    
  # validate the model #
  ######################
  ResNet.eval()
  for batch_idx, (images, contents, fonts, authors) in enumerate(valid_loader):
    resize = transforms.Compose([transforms.Resize(32)])
    data = resize(images.unsqueeze(1)).float()
    target = fonts.long()
    # move tensors to GPU if CUDA is available
    if train_on_gpu:
      data, target = data.cuda(), target.cuda()
    # forward pass: compute predicted outputs by passing inputs to the model
    output = ResNet(data)
    # calculate the batch loss
    loss = criterion(output, target)
    # update average validation loss 
    valid_loss += loss.item()*data.size(0)
    
  # calculate average losses
  train_loss = train_loss/len(train_loader.sampler)
  valid_loss = valid_loss/len(valid_loader.sampler)
        
  # print training/validation statistics 
  print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(
      epoch, train_loss, valid_loss))
    
  # save model if validation loss has decreased
  if valid_loss <= valid_loss_min:
    print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
    valid_loss_min,
    valid_loss))
    torch.save(ResNet.state_dict(), modelname + '.pt')
    valid_loss_min = valid_loss

Epoch: 1 	Training Loss: 1.209047 	Validation Loss: 3.698160
Validation loss decreased (inf --> 3.698160).  Saving model ...
Epoch: 2 	Training Loss: 0.806734 	Validation Loss: 0.936670
Validation loss decreased (3.698160 --> 0.936670).  Saving model ...
Epoch: 3 	Training Loss: 0.549401 	Validation Loss: 0.879035
Validation loss decreased (0.936670 --> 0.879035).  Saving model ...
Epoch: 4 	Training Loss: 0.402504 	Validation Loss: 1.102051
Epoch: 5 	Training Loss: 0.318199 	Validation Loss: 0.832776
Validation loss decreased (0.879035 --> 0.832776).  Saving model ...
Epoch: 6 	Training Loss: 0.220242 	Validation Loss: 0.917398
Epoch: 7 	Training Loss: 0.185919 	Validation Loss: 1.366909
Epoch: 8 	Training Loss: 0.129236 	Validation Loss: 1.446250
Epoch: 9 	Training Loss: 0.092591 	Validation Loss: 0.908262
Epoch: 10 	Training Loss: 0.075348 	Validation Loss: 1.439278
Epoch: 11 	Training Loss: 0.060601 	Validation Loss: 1.531068
Epoch: 12 	Training Loss: 0.049275 	Validation Loss: 1.6

In [16]:
# track test loss
test_loss = 0.0
class_correct = list(0. for i in range(10))
class_total = list(0. for i in range(10))

ResNet.eval()
# iterate over test data
for batch_idx, (images, contents, fonts, authors) in enumerate(test_loader):
  resize = transforms.Compose([transforms.Resize(32)])
  data = resize(images.unsqueeze(1)).float()
  target = fonts.long()
  # move tensors to GPU if CUDA is available
  if train_on_gpu:
    data, target = data.cuda(), target.cuda()
  # forward pass: compute predicted outputs by passing inputs to the model
  output = ResNet(data)
  # calculate the batch loss
  loss = criterion(output, target)
  # update test loss 
  test_loss += loss.item()*data.size(0)
  # convert output probabilities to predicted class
  _, pred = torch.max(output, 1)    
  # compare predictions to true label
  correct_tensor = pred.eq(target.data.view_as(pred))
  correct = np.squeeze(correct_tensor.numpy()) if not train_on_gpu else np.squeeze(correct_tensor.cpu().numpy())
  # calculate test accuracy for each object class
  for i in range(min(batch_size, len(target.data))):
    label = target.data[i]
    class_correct[label] += correct[i].item()
    class_total[label] += 1

# average test loss
test_loss = test_loss/len(test_loader.dataset)
print('Test Loss: {:.6f}\n'.format(test_loss))

for i in range(len(dataset.fontencoder.classes_)):
  if class_total[i] > 0:
    print('Test Accuracy of %5s: %2d%% (%2d/%2d)' % (
        dataset.fontencoder.inverse_transform([i]), 100 * class_correct[i] / class_total[i],
        np.sum(class_correct[i]), np.sum(class_total[i])))
  else:
    print('Test Accuracy of %5s: N/A (no training examples)' % (dataset.fontencoder.inverse_transform([i])))

print('\nTest Accuracy (Overall): %2d%% (%2d/%2d)' % (
    100. * np.sum(class_correct) / np.sum(class_total),
    np.sum(class_correct), np.sum(class_total)))

Test Loss: 0.098382

Test Accuracy of ['clerical']: 100% ( 5/ 5)
Test Accuracy of ['cursive']: 76% (45/59)
Test Accuracy of ['regular']: 80% (51/63)
Test Accuracy of ['seal']: 75% ( 9/12)
Test Accuracy of ['semi-cursive']: 83% (126/151)

Test Accuracy (Overall): 81% (236/290)


In [None]:
# hyper-parameters
batch_size = 16
split_size = [0.8, 0.1, 0.1]
num_epochs = 10

train_set, valid_set, test_set = split_dataset(dataset, batch_size, split_size, method = "all", shuffle_dataset = True)

# Usage Example:
for epoch in range(num_epochs):
    # Train:   
    for batch_index, (images, contents, fonts, authors) in enumerate(train_set):
      print(fonts)