In [13]:
#importing libraries
import pandas as pd
import glob
import cv2
from PIL import Image
import os
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import string
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import CTCLoss
import torch.optim as optim

%matplotlib inline

In [14]:
# Folder holding the cropped licence-plate JPEGs that the next cell previews.
# Note: `dataset_path` is reassigned to the archive root further down, before
# the dataset object is built.
dataset_path = "/home/maximus1/Downloads/archive (2)/cropped_lps/cropped_lps"

In [15]:
# Preview the first two readable images in the directory.
# OpenCV decodes images in BGR channel order while matplotlib expects RGB, so
# the channels are converted before plotting (otherwise red and blue swap).
disp_counter=0
for file_name in glob.glob(dataset_path+"/*.jpg"):
    img = cv2.imread(file_name)
    if img is None:
        # cv2.imread signals an unreadable file by returning None, not by raising
        continue
    plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    plt.show()
    disp_counter+=1
    if disp_counter==2:
        break

In [16]:
# creating dataset
class VehicleDataset(Dataset):
    """Licence-plate image dataset whose labels come from ``lpr.csv``.

    Images are collected from ``<dataset_path>/cropped_lps/cropped_lps/*.jpg``
    (in sorted order, so indices are reproducible) and each one is paired with
    the ``labels`` value of the row in ``<dataset_path>/lpr.csv`` whose
    ``images`` column equals the image's file name. All images are loaded
    into memory at construction time.

    Args:
        dataset_path: Root directory containing ``lpr.csv`` and the image folder.
        transform: Optional callable applied to the image in ``__getitem__``.

    Raises:
        KeyError: If an image file has no matching row in ``lpr.csv``.
    """

    def __init__(self,dataset_path, transform=None):
        self.dataset_path = dataset_path
        self.transform = transform
        self.images,self.labels = [],[]

        pattern = os.path.join(dataset_path, "cropped_lps", "cropped_lps", "*.jpg")
        image_paths = sorted(glob.glob(pattern))
        if not image_paths:
            # Nothing to label, so the CSV is not needed (and may not exist).
            return

        # Read the label table once instead of once per image.
        labels_mapper = pd.read_csv(os.path.join(dataset_path, "lpr.csv"))
        # Keep the first row per file name, matching a "first match wins" lookup.
        label_by_name = (
            labels_mapper.drop_duplicates(subset='images')
            .set_index('images')['labels']
            .to_dict()
        )

        for file_name in image_paths:
            label_id = os.path.basename(file_name)
            if label_id not in label_by_name:
                raise KeyError(
                    "no label for image '{}' in lpr.csv".format(label_id))
            # Image.open keeps the file open lazily; copy the pixels and close
            # the handle so large datasets do not exhaust file descriptors.
            with Image.open(file_name) as opened:
                self.images.append(opened.copy())
            self.labels.append(label_by_name[label_id])

    def __len__(self):
        """Number of (image, label) pairs."""
        return len(self.labels)

    def __getitem__(self,index):
        """Return ``{'img': image, 'label': label}`` for ``index``.

        The image is passed through ``self.transform`` when one was given.
        """
        img = self.images[index]
        label = self.labels[index]
        if self.transform is not None:
            img = self.transform(img)

        item = {'img':img, 'label':label}
        return item

In [17]:
# Collate function: merges the per-sample dicts produced by the dataset into one batch dict
class Collater(object):
    """Callable ``collate_fn`` that turns a list of samples into a batch.

    Each sample is a dict with keys ``'img'`` and ``'label'``. The returned
    dict has the same keys: ``'label'`` is the list of labels in sample
    order, and ``'img'`` is the images stacked along a new leading batch
    dimension when they are all tensors (they must then share one shape),
    or the plain list of images otherwise.

    Args:
        arg: Stored on the instance as ``self.arg``; not used by ``__call__``.
    """

    def __init__(self,arg=None):
        self.arg = arg

    def __call__(self, batch):
        """Collate ``batch`` (a non-empty sequence of sample dicts).

        Raises:
            ValueError: If ``batch`` is empty.
        """
        if len(batch) == 0:
            raise ValueError("Collater received an empty batch")

        # Keep every sample, not just the last one seen.
        imgs = [sample['img'] for sample in batch]
        labels = [sample['label'] for sample in batch]
        if all(torch.is_tensor(img) for img in imgs):
            imgs = torch.stack(imgs, dim=0)

        item = {}
        item['img'] = imgs
        item['label'] = labels
        return item

In [18]:
dataset_path = "/home/maximus1/Downloads/archive (2)/"
# torchvision's Resize takes (height, width). The CRNN in this notebook is built
# with imgH=32 and asserts that the convolutional feature map has height 1, so
# plates must be resized to 32 px high and 100 px wide (not 100 high x 32 wide).
transform = transforms.Compose([transforms.ToTensor(),transforms.Resize((32,100)), transforms.Grayscale(num_output_channels=1)])
# Alternatively, the transform could be handed to the collate function instead of the dataset.
train_data = VehicleDataset(dataset_path=dataset_path,transform=transform)

In [19]:
# Sanity-check the first three samples: report the tensor's shape/dtype and the
# label (rather than dumping every pixel value), then display the image.
for i,sample in enumerate(train_data):
    print(sample['img'].shape, sample['img'].dtype, sample['label'])
    # channel 0 of the single-channel tensor, drawn with a grayscale colour map
    plt.imshow(sample['img'][0], cmap='gray')
    plt.show()
    if i==2:
        break

In [None]:
# Create the data loader; the batch size is defined once and reused
batch_size=8
dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=4, collate_fn=Collater())

In [None]:
# Peek at the first batch only: show the image tensor's shape and the label(s)
# instead of printing the raw pixel values.
for batch in dataloader:
    print(batch['img'].shape, batch['label'])
    break

In [None]:
# Character set recognised on the plates: upper-case letters followed by digits
alphabets = "".join([string.ascii_uppercase, string.digits])
print(alphabets)

In [None]:
# Util functions for converting string label
class strLabelConverter(object):
    """Convert between plate strings and the integer labels used with CTC.

    The i-th character of ``alphabet`` is encoded as ``i + 1``; index 0 is
    left free for the blank symbol. A ``'-'`` is appended to the stored
    alphabet so that raw decoding renders index 0 (looked up as
    ``self.alphabet[0 - 1]``) as ``'-'``.

    Args:
        alphabet (str): Characters that can appear in a label.
        ignore_case (bool): When True, a character missing from the alphabet
            is also looked up in its upper- and lower-case form.
    """
    def __init__(self,alphabet, ignore_case=True):
        self.ignore_case =ignore_case

        self.alphabet = alphabet + '-' # at last index
        self.dict = {}
        for i, char in enumerate(alphabet):
            self.dict[char]=i+1

    def _index_of(self, char):
        """Return the label index of ``char``; raise ``KeyError`` if unknown."""
        if char in self.dict:
            return self.dict[char]
        if self.ignore_case:
            for variant in (char.upper(), char.lower()):
                if variant in self.dict:
                    return self.dict[variant]
        raise KeyError(char)

    #encoding
    def encode(self,text):
        """Encode one string or an iterable of strings.

        Args:
            text (str or iterable of str): Label(s) to encode. A single
                string is treated as one label, not as a list of characters.

        Returns:
            tuple: ``(labels, lengths)`` where ``labels`` is a LongTensor of
            shape ``[n, max_len]`` right-padded with 0 and ``lengths`` is a
            LongTensor of shape ``[n]`` holding each label's true length.

        Raises:
            KeyError: If a character is not in the alphabet.
        """
        if isinstance(text, str):
            text = [text]

        length = []
        result = []
        for item in text:
            # convert each character to its index
            indices = [self._index_of(char) for char in item]
            length.append(len(indices))
            result.append(indices)

        max_len = max(length, default=0)
        padded = [r + [0] * (max_len - len(r)) for r in result]

        # the loss expects long tensors of encoded text and sequence lengths
        return (torch.LongTensor(padded), torch.LongTensor(length))

    #decoding
    def decode(self, t, length, raw=False):
        '''
        Decode encoded texts back into strs

        Args:
            t (Tensor): Encoded indices; for several texts they are
                concatenated back to back.
            length (Tensor): Length of each text. A single element means
                ``t`` holds exactly one text.
            raw (bool): When True every index is rendered (0 as '-').
                Otherwise zeros and consecutive repeats are removed.

        Returns:
            str for a single text, list of str for several.
        '''
        flat = t.reshape(-1)
        if length.numel() == 1:
            # plain int so it can be used for range() and comparisons
            length = int(length.reshape(-1)[0])
            assert flat.numel() == length, "text with length: {} does not match declared length: {}".format(flat.numel(), length)
            codes = flat.tolist()
            if raw:
                return ''.join([self.alphabet[i - 1] for i in codes])
            else:
                char_list = []
                for i in range(length):
                    if codes[i] != 0 and (not (i > 0 and codes[i - 1] == codes[i])):
                        char_list.append(self.alphabet[codes[i] - 1])
                return ''.join(char_list)
        else:
            # batch mode
            total = int(length.sum())
            assert flat.numel() == total, "texts with length: {} does not match declared length: {}".format(flat.numel(), total)
            texts = []
            index = 0
            for l in length.reshape(-1).tolist():
                texts.append(
                    self.decode(
                        flat[index:index + l], torch.LongTensor([l]), raw=raw))
                index += l
            return texts


In [None]:
#function to one hot
def one_hot(text, text_length, nc):
    """One-hot encode a batch of concatenated label sequences.

    Args:
        text: 1-D integer tensor with the labels of all samples concatenated
            back to back.
        text_length: 1-D integer tensor; ``text_length[i]`` is the number of
            labels belonging to sample ``i``.
        nc: Number of classes (size of the one-hot dimension).

    Returns:
        Float tensor of shape ``[batch, max(text_length), nc]``. Positions
        beyond a sample's own length are all zeros.
    """
    batch_size = text_length.size(0)
    # plain ints: tensor-valued sizes are not accepted by every constructor
    maxLength = int(text_length.max()) if batch_size > 0 else 0
    onehot = torch.zeros(batch_size, maxLength, nc,
                         dtype=torch.float32, device=text.device)
    acc = 0
    for i in range(batch_size):
        length = int(text_length[i])
        label = text[acc:acc+length].reshape(-1,1).long()
        # write into the result tensor (a view of row i), not the function object
        onehot[i,:length].scatter_(1,label,1.0)
        acc+=length

    return onehot

In [None]:
# CRNN model -https://github.com/meijieru/crnn.pytorch/blob/master/models/crnn.py
class BidirectionalLSTM(nn.Module):
    """Bidirectional LSTM followed by a linear projection of every time step.

    Args:
        nIn: Feature size of each input step.
        nHidden: Hidden size of the LSTM (per direction).
        nOut: Feature size of each output step.
    """

    def __init__(self, nIn, nHidden, nOut):
        super().__init__()

        self.rnn = nn.LSTM(nIn, nHidden, bidirectional=True)
        # both directions are concatenated, hence twice the hidden size
        self.embedding = nn.Linear(2 * nHidden, nOut)

    def forward(self, input):
        """Map a ``[T, b, nIn]`` sequence to a ``[T, b, nOut]`` sequence."""
        seq_features, _state = self.rnn(input)
        steps, batch, width = seq_features.size()

        # fold time and batch together so the linear layer sees 2-D input
        flat = seq_features.view(steps * batch, width)
        projected = self.embedding(flat)  # [T * b, nOut]

        return projected.view(steps, batch, -1)


class CRNN(nn.Module):
    """Convolutional-recurrent network: a seven-layer CNN feeding two stacked
    ``BidirectionalLSTM`` blocks, with a log-softmax over the class dimension.

    Args:
        imgH: Input image height; must be a multiple of 16.
        nc: Number of input image channels.
        nclass: Number of output classes per time step.
        nh: Hidden size of the recurrent layers.
        n_rnn: Accepted for interface compatibility; not used in this class.
        leakyRelu: Use ``LeakyReLU(0.2)`` instead of ``ReLU`` after each conv.
    """

    def __init__(self, imgH, nc, nclass, nh, n_rnn=2, leakyRelu=False):
        super().__init__()
        assert imgH % 16 == 0, 'imgH has to be a multiple of 16'

        # Per-layer convolution settings (index = conv layer number).
        kernel_sizes = [3, 3, 3, 3, 3, 3, 2]
        paddings = [1, 1, 1, 1, 1, 1, 0]
        strides = [1, 1, 1, 1, 1, 1, 1]
        out_channels = [64, 128, 256, 256, 512, 512, 512]
        # Conv layers that are followed by batch normalisation.
        with_batchnorm = (2, 4, 6)
        # Conv layer index -> positional arguments of the max-pool placed after
        # it. The first two pools halve height and width; the last two halve
        # only the height (stride 1 and padding 1 along the width).
        pool_after = {
            0: (2, 2),
            1: (2, 2),
            3: ((2, 2), (2, 1), (0, 1)),
            5: ((2, 2), (2, 1), (0, 1)),
        }

        cnn = nn.Sequential()
        pool_count = 0
        for i, n_out in enumerate(out_channels):
            n_in = nc if i == 0 else out_channels[i - 1]
            cnn.add_module(
                'conv{0}'.format(i),
                nn.Conv2d(n_in, n_out, kernel_sizes[i], strides[i], paddings[i]))
            if i in with_batchnorm:
                cnn.add_module('batchnorm{0}'.format(i), nn.BatchNorm2d(n_out))
            if leakyRelu:
                activation = nn.LeakyReLU(0.2, inplace=True)
            else:
                activation = nn.ReLU(True)
            cnn.add_module('relu{0}'.format(i), activation)
            if i in pool_after:
                cnn.add_module('pooling{0}'.format(pool_count),
                               nn.MaxPool2d(*pool_after[i]))
                pool_count += 1

        self.cnn = cnn
        self.rnn = nn.Sequential(
            BidirectionalLSTM(512, nh, nh),
            BidirectionalLSTM(nh, nh, nclass))


    def forward(self, input):
        """Return per-step log-probabilities of shape ``[w, b, nclass]``.

        ``input`` is a ``[b, nc, H, W]`` image batch whose CNN feature map
        must come out with height 1 (asserted).
        """
        # conv features
        features = self.cnn(input)
        _, _, height, _ = features.size()
        assert height == 1, "the height of conv must be 1"
        # drop the height axis and put width first: [b, c, 1, w] -> [w, b, c]
        sequence = features.squeeze(2).permute(2, 0, 1)

        # rnn features
        logits = self.rnn(sequence)

        # log_softmax over the class dimension
        return F.log_softmax(logits, dim=2)


    def backward_hook(self, module, grad_input, grad_output):
        """Zero NaN entries of every tensor in ``grad_input``, in place.

        NOTE(review): nothing in this class registers the hook — confirm
        whether it is meant to be attached.
        """
        for grad in grad_input:
            # NaN is the only value that compares unequal to itself
            nan_mask = grad != grad
            grad[nan_mask] = 0

In [None]:
# one output class per alphabet character, plus one for the blank at index 0
n_class = 1 + len(alphabets)

# positional arguments: imgH=32, nc=1 (image channels), nclass, nh=256 (LSTM hidden size)
crnn = CRNN(32, 1, n_class, 256)

In [None]:
# initialising weights
def weights_init(m):
    """Re-initialise one module in place, chosen by its class name.

    * name contains ``Conv``: weights drawn from N(0, 0.02)
    * name contains ``BatchNorm``: weights drawn from N(1, 0.02), bias set to 0
    * anything else is left untouched

    NOTE(review): no visible cell applies this function; it has the shape of
    a callback for ``Module.apply`` — confirm whether that call is missing.
    """
    layer_kind = m.__class__.__name__
    if 'Conv' in layer_kind:
        m.weight.data.normal_(0.0, 0.02)
        return
    if 'BatchNorm' in layer_kind:
        m.weight.data.normal_(1.0, 0.02)
        m.bias.data.fill_(0)

In [None]:
# Prefer the GPU when PyTorch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

In [None]:
# Make sure every CRNN parameter is trainable
for weight in crnn.parameters():
    weight.requires_grad_(True)

In [None]:
# Converter between plate strings and integer label tensors
encoder_decoder = strLabelConverter(alphabets)

In [None]:
# CTC loss with its default settings written out (blank index 0, mean reduction)
criterion = CTCLoss(blank=0, reduction='mean')
# Adam over all CRNN parameters
optimizer = optim.Adam(params=crnn.parameters(), lr=0.01)

In [None]:
# training
crnn.to(device)  # the model must live on the same device as its inputs and targets
crnn.train()
num_epochs=10
for epoch in range(num_epochs):
    epoch_loss = 0.0
    num_batches = 0
    for i,batch in enumerate(dataloader):
        img,label = batch['img'],batch['label']
        # Accept either a stacked batch ([N, C, H, W] with a list of labels) or
        # a single sample ([C, H, W] with one label string).
        if img.dim() == 3:
            img = img.unsqueeze(0)
        if isinstance(label, str):
            label = [label]
        img = img.to(device)
        # Named n_samples so the notebook-level `batch_size` is not overwritten.
        n_samples = img.size(0)
        # Do not call the lengths `len`: that would shadow the builtin for every
        # cell of the notebook, including the encoder itself.
        text,text_len = encoder_decoder.encode(label)
        text = text.to(device)
        optimizer.zero_grad()
        pred = crnn(img)  # [T, N, n_class] log-probabilities
        # every sample uses the full output sequence length T
        preds_size = torch.LongTensor([pred.size(0)]* n_samples)
        # NOTE(review): with CTCLoss's default 'mean' reduction the loss is
        # already averaged over the batch; the extra division is kept so the
        # loss scale stays as before — confirm whether it is wanted.
        loss = criterion(pred,text, preds_size,text_len)/n_samples
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        num_batches += 1

    # report the mean batch loss of the epoch (loss is a tensor, not a callable)
    print("Epoch - {}, Loss - {}".format(epoch, epoch_loss / max(num_batches, 1)))