# RCNN

**Reference:** https://www.youtube.com/watch?v=IcLEJB2pY2Y&t=2055s

In [None]:
%config Completer.use_jedi = False    # for autocompletion

In [1]:
import albumentations
import glob
import json
import matplotlib.pyplot as plt
import numpy as np
import os

import torch
import torch.nn as nn
import torch.nn.functional as F

from sklearn import preprocessing
from sklearn import model_selection
from sklearn import metrics

from PIL import Image
from PIL import ImageFile
from pprint import pprint
from tqdm import tqdm

In [3]:
# constants: configurations
if os.path.exists('gen-plate-dataset'):
    DATA_DIR = 'gen-plate-dataset'
else:
    DATA_DIR = os.path.join('..', 'datasets', 'gen-plate-dataset')
BATCH_SIZE = 4
IMAGE_WIDTH = 230
IMAGE_HEIGHT = 50
NUM_WORKERS = 2
DEVICE = 'cpu'    # cpu / cuda
EPOCHS = 50       # in actual initialized 200 but trained till 100

In [4]:
if os.sys.platform=='linux' and os.path.exists('../input/indmhnumberplate/gen-plate-dataset'):
    DATA_DIR = '../input/indmhnumberplate/gen-plate-dataset'

In [5]:
DATA_DIR

'../datasets/gen-plate-dataset'

# Dataset

In [6]:
# dataset creations

ImageFile.LOAD_TRUNCATED_IMAGES = True

class ClassificationDataset:
    def __init__(self, img_paths, targets, resize = None):
        self.img_paths = img_paths
        self.targets = targets
        self.resize = resize
        self.aug = albumentations.Compose(
            [albumentations.Normalize(always_apply=True)]
        )
        
    def __len__(self):
        return len(self.img_paths)
    
    def __getitem__(self, item_index):
        img = Image.open(self.img_paths[item_index])
        targets = self.targets[item_index]
        
        if self.resize is not None:
            img = img.resize((self.resize[0], self.resize[1]), resample= Image.BILINEAR)
        
        img = np.array(img)
        augmented = self.aug(image = img)
        img = augmented['image']
        img = np.transpose(img, (2, 1, 0)).astype(np.float32)
        return {
            'imgs': torch.tensor(img, dtype=torch.float),
            'targets': torch.tensor(targets, dtype=torch.long)
        }

# Engine

In [7]:
# engine

def train_fn(model, data_loader, optimizer):
    model.train()
    fin_loss = 0
    tk = tqdm(data_loader, total=len(data_loader))
    for data in tk:
        for k, v in data.items():
            data[k] = v.to(DEVICE)
        optimizer.zero_grad()
        _, loss = model(**data)
        loss.backward()
        optimizer.step()
        fin_loss += loss.item()
        
    return fin_loss / len(data_loader)

def eval_fn(model, data_loader, optimizer):
    model.eval()
    fin_loss = 0
    fin_preds = []
    with torch.no_grad():
        tk = tqdm(data_loader, total=len(data_loader))
        for data in tk:
            for k, v in data.items():
                data[k] = v.to(DEVICE)
            batch_preds, loss = model(**data)
            
            fin_loss += loss.item()
            fin_preds.append(batch_preds)
        
        fin_preds = torch.cat(fin_preds, dim=0)
        return fin_preds, fin_loss / len(data_loader)

# Model

In [8]:
# model

class PlateModel(nn.Module):
    def __init__(self, num_chars):
        super(PlateModel, self).__init__()
        self.conv1 = nn.Conv2d(3, 128, kernel_size=(3,3), padding=(1,1))
        self.max_pool_1 = nn.MaxPool2d(kernel_size=(2,2))
        self.conv2 = nn.Conv2d(128, 64, kernel_size=(3,3), padding=(1,1))
        self.max_pool_2 = nn.MaxPool2d(kernel_size=(2,2))
        
        self.linear1 = nn.Linear(768, 512)
        self.linear2 = nn.Linear(512, 64)
        self.drop = nn.Dropout(0.2)   # doesn't change size
        
        self.gru = nn.GRU(64, 32, bidirectional=True, num_layers=2, dropout=0.25)
        self.output = nn.Linear(64, num_chars + 1)
        
    def forward(self, imgs, targets=None):
        bs, c, w, h = imgs.size()
        # print(bs, c, w, h)    # for debugging
        x = F.relu(self.conv1(imgs))
        # print('Conv1', x.size())
        x = self.max_pool_1(x)
        # print('MaxPool', x.size())
        x = F.relu(self.conv2(x))
        # print('Conv2', x.size())
        x = self.max_pool_2(x) # 1, 64, 212, 64
        # print('MaxPool', x.size())
        
        # to brind width first but in our case it's properly arranged
        # x = x.permute(0, 3, 1, 2) # 1, 75, 64, 18
        x = x.view(bs, x.size(2), -1)
        # print('View', x.size())
        
        x = self.linear1(x)
        # print('Linear1', x.size())
        x = self.linear2(x)
        x = self.drop(x)
        # print('Linear2', x.size())
        
        x, _ = self.gru(x)
        # print('GRU', x.size())
        
        x = self.output(x)
        # print('output', x.size())
        
        x = x.permute(1, 0, 2)
        if targets is not None:
            # CTC
            log_softmax_values = F.log_softmax(x, 2)
            input_lengths = torch.full(
                size=(bs,), fill_value = log_softmax_values.size(0), dtype=torch.int32
            )
            # print('input lengths', input_lengths)
            target_lengths = torch.full(
                size=(bs,), fill_value = log_softmax_values.size(1), dtype=torch.int32
            )
            # print('target lengths', target_lengths)
            loss = nn.CTCLoss(blank=0)(
                log_softmax_values, targets, input_lengths, target_lengths
            )
            return x, loss
        
        return x, None


In [9]:
# for debugging model

cm = PlateModel(19)
img = torch.rand(5, 3, IMAGE_WIDTH, IMAGE_HEIGHT)
targets = torch.randint(1, 6, (5, 5))
x, loss = cm(img, targets)

del(cm)



# Train

In [10]:
# generate img & target list

def get_img_label():
    '''Returns tuple of img filename list and target_label list.'''
    img_files = glob.glob(os.path.join(DATA_DIR, '*.png'))
    targets_orig = [x.split('/')[-1][ : -4] for x in img_files]
    return img_files, targets_orig


In [11]:
# target preprocessing

def get_target_list(target_orig):
    targets = [[c for c in x] for x in targets_orig]
    targets_flat = [c for clist in targets for c in clist]
    return targets, targets_flat


In [12]:
# target encoding

def encode_labels(targets, targets_flat):
    lbl_enc = preprocessing.LabelEncoder()
    lbl_enc.fit(targets_flat)
    targets_enc = [lbl_enc.transform(x) for x in targets]
    targets_enc = np.array(targets_enc) + 1
    return lbl_enc, targets_enc

In [26]:
# decode

def decode_predictions(preds, encoder, collapse_repeated=True):
    ''' Decodes CTC String to normal string'''
    preds = preds.permute(1, 0, 2)
    preds = torch.softmax(preds, 2)
    preds = torch.argmax(preds, 2)
    preds = preds.detach().cpu().numpy() # change cpu to cuda if training on gpu
    cap_preds = []
    for j in range(preds.shape[0]):
        temp = ''
        prev_char = None
        for k in preds[j]:
            k = k-1
            # k = -1 mean a empty value
            if (k == -1) or (collapse_repeated and k==prev_char):
                continue
            else:
                # print(encoder.inverse_transform([k]), k)
                temp += encoder.inverse_transform([k])[0]
            prev_char = k
        tp = "" + temp
        # print(preds[j], targets[j], tp)
        cap_preds.append(tp)
    return cap_preds

In [14]:
# checks if all characters of targets are in predictions

def char_accuracy(preds, targets):
    sum_accuracy = 0
    total_preds = 0
    for index in range(len(preds)):
        correct_char = 0
        for char in preds[index]:
            if char in targets[index]:
                correct_char += 1
        accuracy = correct_char / len(preds[index])
        sum_accuracy += accuracy
        total_preds += 1
    return sum_accuracy / total_preds
                

In [15]:
# checks if all predictions are same as labels

def label_accuracy(preds, targets):
    correct_labels = 0
    for index in range(len(preds)):
        if preds[index] == targets[index]:
            correct_labels += 1
    return correct_labels / len(preds)

In [16]:
# data preprocessing & encoding

img_files, targets_orig = get_img_label()

targets, targets_flat = get_target_list(targets_orig)
lbl_enc, targets_enc = encode_labels(targets, targets_flat)

In [17]:
# data splitting

( train_imgs, test_imgs,
 train_targets,
 test_targets,
 train_orig_targets,
 test_orig_targets 
) = model_selection.train_test_split(
    img_files, targets_enc, targets_orig, test_size=0.1, random_state=42
)

In [18]:
# train dataset & loader

train_dataset = ClassificationDataset(
                    img_paths=train_imgs, 
                    targets=train_targets,
                    resize=(IMAGE_WIDTH, IMAGE_HEIGHT)
                )

train_loader = torch.utils.data.DataLoader(
                    train_dataset,
                    batch_size = BATCH_SIZE,
                    num_workers = NUM_WORKERS,
                    shuffle = True
                )

# for debugging

# npimg = train_dataset[0]['imgs'].numpy()
# plt.imshow(np.transpose(npimg, (1, 2, 0)))

In [19]:
# test_dataset & loader

test_dataset = ClassificationDataset(
                    img_paths = test_imgs,
                    targets = test_targets,
                    resize = (IMAGE_WIDTH, IMAGE_HEIGHT)
                )
test_loader = torch.utils.data.DataLoader(
                test_dataset,
                batch_size = BATCH_SIZE,
                num_workers = NUM_WORKERS,
                shuffle = False
            )

In [20]:
# model, optimizer & schedular

model = PlateModel(num_chars=len(lbl_enc.classes_))
model.to(DEVICE)

optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)
schedular = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, factor=0.8, patience=5, verbose=True
)

In [22]:
# actual training

def run_training(model, train_loader, test_loader, optimizer, schedular, lbl_enc):
    loss = {'train': [], 'valid': []}
    accuracy = {'character': [], 'label': []}
    
    epoch_count = 0
    for epoch in range(EPOCHS):
        train_loss = train_fn(model, train_loader, optimizer)
        valid_preds, valid_loss = eval_fn(model, test_loader, optimizer)
        
        valid_cap_preds = []
        for vp in tqdm(valid_preds, total=len(valid_preds)):
            current_preds = decode_predictions(valid_preds, lbl_enc)
            valid_cap_preds.extend(current_preds)
            
        char_acc = char_accuracy(test_orig_targets, valid_cap_preds)
        label_acc = label_accuracy(test_orig_targets, valid_cap_preds)
            
        # calculate accuracy of model and log it   
        pprint(list(zip(test_orig_targets[6:15], valid_cap_preds))[6:15])
        print(f"Epoch:{epoch}, train_loss:{train_loss}, valid_loss={valid_loss}")
        print(f"char_accuracy:{char_acc}, label_accuracy:{label_acc}")
        
        loss['train'].append(train_loss)
        loss['valid'].append(valid_loss)
        accuracy['character'].append(char_acc)
        accuracy['label'].append(label_acc)
        
        epoch_count += 1
        
    return loss, accuracy, epoch_count

# Start Training 

In [28]:
loss, accuracy, epoch_count = run_training(model, train_loader, test_loader, optimizer, schedular, lbl_enc)

100%|██████████| 189/189 [00:22<00:00,  8.28it/s]
100%|██████████| 21/21 [00:01<00:00, 14.82it/s]
  2%|▏         | 29/1197 [00:00<00:17, 65.83it/s]


KeyboardInterrupt: 

## ENDGAME

### Weights

In [29]:
TRAIN_DIR = os.path.join('..','train')

try:
    os.mkdir(TRAIN_DIR)
except FileExistsError:
    pass

In [30]:
TRAINER_DIR = os.path.join(TRAIN_DIR,'harshad')    # change this

try:
    os.mkdir(TRAINER_DIR)
except FileExistsError:
    pass

In [31]:
# change this for every training
# or it will overwrite your previous data

VER_DIR = os.path.join(TRAINER_DIR, 'text_recognition ver-1.0')

try:
    os.mkdir(VER_DIR)
except FileExistsError:
    pass

In [32]:
# saving weights & optimizer

WT_PATH = os.path.join(VER_DIR, 'weights.pth')
torch.save(model.state_dict(), WT_PATH)

OPTIM_PATH = os.path.join(VER_DIR, 'optimizer.pth')
torch.save(model.state_dict(), OPTIM_PATH)

### Hyperparameters

In [33]:
# saving hyperparametes

HYP_PATH = os.path.join(VER_DIR, 'hyperparam.json')

hyper_dict = dict()
hyper_dict["INITIALIED EPOCH"] = EPOCHS
hyper_dict["ACTUAL EPOCH"] = epoch_count
hyper_dict["MODEL"] = str(model.parameters)
hyper_dict["LOSS"] = dict()
hyper_dict["LOSS"]["train"] = [float(train_loss) for train_loss in loss['train']]
hyper_dict["LOSS"]["valid"] = [float(valid_loss) for valid_loss in loss['valid']]
hyper_dict["ACCURACY"] = dict()
hyper_dict["ACCURACY"]["char"] = [float(train_loss) for train_loss in accuracy['character']]
hyper_dict["ACCURACY"]["label"] = [float(valid_loss) for valid_loss in accuracy['label']]


hyp_file = open(HYP_PATH, "w")
hyperparam_json = json.dump(hyper_dict, fp=hyp_file)
hyp_file.close()

