<a href="https://colab.research.google.com/github/haturusinghe/cnn-lp-detection/blob/main/LRPNET_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Mount Drive

In [None]:
# Mount Google Drive at /content/drive (Colab-only API); the project paths
# configured in this notebook live underneath that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Project root on the mounted Drive. The trailing slash matters: paths derived
# from it are built by plain string concatenation.
BASE_DIR = '/content/drive/MyDrive/npr/'
# LPRNet working folder inside the project root (also ends with a slash).
LPRNET_DIR = f'{BASE_DIR}LPRNET/'

# Imports

In [None]:
from google.colab.patches import cv2_imshow

In [None]:
import tensorflow as tf
from tensorflow import keras
import os
import time
import argparse
import math
import numpy as np
from keras import backend as K

In [None]:
from keras import Sequential, Model
from keras.layers import Activation,Conv2D, BatchNormalization, MaxPool2D,Softmax,Dropout, Input, ReLU, \
    Concatenate, Dense, Flatten
from keras.models import load_model

In [None]:
import cv2
import random

In [None]:
import editdistance

In [None]:
from data_aug import data_augmentation

In [None]:
from time import time

# Variables

In [None]:
# Alphabet of plate characters: Latin capitals and digits first, followed by the
# code points that make up the Sinhala plate text.
CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
CHARS_SIN = "ගුවන්යුහනාහශ්‍රී"
# CHARS_SIN repeats several code points. Keep one entry per distinct character
# (first occurrence wins, order preserved) so that len(CHARS) equals the number
# of character classes and CHARS can be indexed by class id.
CHARS = "".join(dict.fromkeys(CHARS + CHARS_SIN))
# One class more than there are distinct characters (presumably the CTC blank,
# which tf's ctc_loss places at index num_classes - 1).
NUM_CLASS = len(set(CHARS))+1
tf.compat.v1.enable_eager_execution()

In [None]:
# Image folders used by the training loop; both sit directly inside LPRNET_DIR,
# which already ends with a path separator.
train_dir = f"{LPRNET_DIR}train"  # training images
val_dir = f"{LPRNET_DIR}valid"    # validation images

In [None]:
# --- Training schedule ---
train_epochs = 100      # number of training epochs (default: 1000)
batch_size = 2          # training batch size (default: 8)
val_batch_size = 2      # validation batch size (default: 4)

# --- Learning-rate schedule (passed to ExponentialDecay in train()) ---
lr = 1e-3               # initial learning rate (default: 1e-3)
decay_steps = 500       # optimizer steps per decay period (default: 500)
decay_rate = 0.995      # multiplicative decay factor (default: 0.995)
# ExponentialDecay expects a boolean here. The previous value "smooth" is a
# non-empty string and therefore truthy, which silently selected stepwise decay.
staircase = False       # False = smooth (continuous) decay, True = stepwise decay

# --- Checkpoints ---
pretrained = None       # pretrained model location
saved_dir = "saved_models"  # folder for saving models (default: "saved_models")

# Utils

## Util Variables

In [None]:
IMG_SIZE = [94, 24]  # network input size as [width, height]
CH_NUM = 3           # colour channels per image

# Build both lookup tables from one ordered, duplicate-free alphabet.
# Iterating set(CHARS) directly is not stable: string hashing is randomised per
# interpreter run, so class ids would change between sessions and weights
# trained in one session could not be decoded in another.
# dict.fromkeys keeps the first-seen order of the characters.
_ALPHABET = list(dict.fromkeys(CHARS))
CHARS_DICT = {char: i for i, char in enumerate(_ALPHABET)}   # character -> class id
DECODE_DICT = {i: char for i, char in enumerate(_ALPHABET)}  # class id -> character

## Helper Functions

In [None]:
def encode_label(label, char_dict):
    """Translate a label into a list of class indices.

    Args:
        label: iterable of characters (typically a string).
        char_dict: mapping from character to integer index.

    Returns:
        A list holding ``char_dict[c]`` for every character ``c`` of ``label``,
        in the original order.

    Raises:
        KeyError: if a character is not present in ``char_dict``.
    """
    indices = []
    for ch in label:
        indices.append(char_dict[ch])
    return indices

In [None]:
def sparse_tuple_from(sequences, dtype=np.int32):
    """
    Create a sparse representation of a batch of sequences.

    Args:
        sequences: a list of lists of type dtype where each element is a sequence
        dtype: numpy dtype of the returned values array
    Returns:
        A tuple with (indices, values, shape):
            indices: int64 array of shape (total_items, 2) with
                (sequence number, position inside the sequence) pairs
            values: flat array with every sequence item, in order
            shape: int64 array [number of sequences, longest sequence length]
    """
    indices = []
    values = []

    for n, seq in enumerate(sequences):
        indices.extend((n, pos) for pos in range(len(seq)))
        values.extend(seq)

    # reshape keeps the (N, 2) layout even when there are no items at all;
    # asarray of an empty list alone would be 1-D.
    indices = np.asarray(indices, dtype=np.int64).reshape(-1, 2)
    values = np.asarray(values, dtype=dtype)
    # Longest sequence length. Identical to "largest position index + 1" for
    # non-empty input, but also defined for an empty batch or a batch made only
    # of empty sequences, where taking max() over the indices would raise.
    max_len = max((len(seq) for seq in sequences), default=0)
    shape = np.asarray([len(sequences), max_len], dtype=np.int64)

    return indices, values, shape

## Data Iterator Class for Training

In [None]:
class DataIterator:
    """Serves batches of resized plate images together with their encoded labels.

    Images are read from ``img_dir``. The label of an image is the part of its
    file name before the first underscore (``[label]_[random number].jpg``),
    encoded through ``CHARS_DICT``.
    """

    def __init__(self, img_dir, batch_size, runtime_generate=False):
        """
        Args:
            img_dir: directory containing the labelled images.
            batch_size: number of samples per batch.
            runtime_generate: when True the directory is not indexed and
                ``self.generator`` is left as None; a generator object has to
                be assigned before ``next_gen_batch`` can be used.
        """
        self.img_dir = img_dir
        self.batch_size = batch_size
        self.channel_num = CH_NUM
        self.img_w, self.img_h = IMG_SIZE

        if runtime_generate:
            self.generator = None
        else:
            self.init()

    def init(self):
        """Index ``img_dir``: collect file names, encode labels, shuffle the order."""
        self.filenames = []
        labels = []
        for filename in os.listdir(self.img_dir):
            # Sub-directories cannot be samples; only regular files are indexed.
            if not os.path.isfile(os.path.join(self.img_dir, filename)):
                continue
            self.filenames.append(filename)
            label = filename.split('_')[0]  # format: [label]_[random number].jpg
            labels.append(encode_label(label, CHARS_DICT))
        self.sample_num = len(labels)
        self.labels = self._as_label_array(labels)
        self.random_index = list(range(self.sample_num))
        random.shuffle(self.random_index)
        self.cur_index = 0

    @staticmethod
    def _as_label_array(labels):
        """Pack encoded labels into an array; ragged labels give a 1-D object array."""
        if len({len(label) for label in labels}) <= 1:
            # Equal lengths (or no labels): regular array, as before.
            return np.array(labels)
        # np.array() refuses ragged nested lists on recent NumPy versions, so
        # the object array is filled element by element.
        ragged = np.empty(len(labels), dtype=object)
        for i, label in enumerate(labels):
            ragged[i] = label
        return ragged

    def _load_image(self, fname):
        """Read ``fname`` from ``img_dir`` and resize it to (img_w, img_h)."""
        path = os.path.join(self.img_dir, fname)
        img = cv2.imread(path)
        if img is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise IOError("Could not read image file: {}".format(path))
        return cv2.resize(img, (self.img_w, self.img_h))

    def next_sample_ind(self):
        """Return the next index of the shuffled order; reshuffle after a full pass."""
        ret = self.random_index[self.cur_index]
        self.cur_index += 1
        if self.cur_index >= self.sample_num:
            self.cur_index = 0
            random.shuffle(self.random_index)
        return ret

    def next_batch(self):
        """Return the next shuffled batch as (images, sparse_labels, labels).

        ``images`` has shape (batch_size, img_h, img_w, channel_num);
        ``sparse_labels`` is the tuple produced by ``sparse_tuple_from``;
        ``labels`` is the list of encoded labels of the batch.
        """
        batch_size = self.batch_size
        images = np.zeros([batch_size, self.img_h, self.img_w, self.channel_num])
        labels = []

        for i in range(batch_size):
            sample_ind = self.next_sample_ind()
            # Augmentation (data_augmentation) is currently not applied here.
            images[i] = self._load_image(self.filenames[sample_ind])
            labels.append(self.labels[sample_ind])

        sparse_labels = sparse_tuple_from(labels)

        return images, sparse_labels, labels

    def next_test_batch(self):
        """Yield consecutive batches in file order, visiting every sample once.

        Each item is (images, sparse_labels, labels). The final batch may hold
        fewer than ``batch_size`` samples; nothing is yielded when no samples
        were indexed.
        """
        for start in range(0, self.sample_num, self.batch_size):
            end = min(start + self.batch_size, self.sample_num)
            images = np.zeros([end - start, self.img_h, self.img_w, self.channel_num])

            for j, i in enumerate(range(start, end)):
                images[j, ...] = self._load_image(self.filenames[i])

            labels = self.labels[start:end, ...]
            sparse_labels = sparse_tuple_from(labels)

            yield images, sparse_labels, labels

    def next_gen_batch(self):
        """Return a batch of generated images as (images, sparse_labels, labels).

        Raises:
            RuntimeError: if no generator object has been assigned.
        """
        generator = getattr(self, "generator", None)
        if generator is None:
            raise RuntimeError(
                "next_gen_batch() needs self.generator to be set to an object "
                "providing generate_images(batch_size)"
            )

        batch_size = self.batch_size
        imgs, labels = generator.generate_images(batch_size)
        labels = [encode_label(label, CHARS_DICT) for label in labels]

        images = np.zeros([batch_size, self.img_h, self.img_w, self.channel_num])
        for i, img in enumerate(imgs):
            img = data_augmentation(img)
            img = cv2.resize(img, (self.img_w, self.img_h))
            images[i, ...] = img

        sparse_labels = sparse_tuple_from(labels)

        return images, sparse_labels, labels

# Model

In [None]:
def conv2D_batchnorm(*args, **kwargs):
    """Build a Conv2D -> BatchNormalization -> ReLU stack.

    All positional and keyword arguments are forwarded unchanged to ``Conv2D``.

    Returns:
        A ``Sequential`` model wrapping the three layers in that order.
    """
    stack = [
        Conv2D(*args, **kwargs),
        BatchNormalization(),
        ReLU(),
    ]
    return Sequential(stack)

In [None]:
class LPRNet:
    """Convolutional plate recogniser wrapped around a Keras ``Model``.

    The Keras model is built in the constructor and exposed as ``self.model``;
    the remaining methods are thin helpers for prediction, CTC decoding and
    persistence.
    """

    def __init__(
        self,
        num_classes,
        pattern_size=128,
        dropout=0.5,
        input_shape=(24, 94, 3),
        include_STN=False,
    ):
        """
        Args:
            num_classes: number of output channels of the classification head.
            pattern_size: number of filters of the 1x1 "pattern" convolution
                that is concatenated with the class map.
            dropout: dropout rate used around the [4, 1] convolution.
            input_shape: shape of one input image passed to ``Input``.
            include_STN: accepted for interface compatibility; not used.
        """
        self.num_classes = num_classes
        self.pattern_size = pattern_size
        self.dropout = dropout
        self.input_shape = input_shape
        self.input_block = self.mixed_input_block
        self.basic_block = self.basic_blocks
        self.model = self._build()

    def _build(self):
        """Assemble and return the Keras model."""
        inputs = Input(self.input_shape)
        x = self.input_block()(inputs)
        x = self.basic_block(x.get_shape().as_list()[3], 256)(x)
        x = self.convolution_block(x.get_shape().as_list()[3], 256, 2)(x)

        x = Dropout(self.dropout)(x)
        x = conv2D_batchnorm(256, [4, 1])(x)
        x = Dropout(self.dropout)(x)

        classes = conv2D_batchnorm(self.num_classes, [1, 13], padding="same")(x)
        # Honour the constructor argument instead of a hard-coded 128
        # (128 is still the default, so existing callers get the same network).
        pattern = Conv2D(self.pattern_size, [1, 1])(classes)
        x = Concatenate()([classes, pattern])
        outs = conv2D_batchnorm(self.num_classes, [1, 1], padding="same")(x)

        return Model(inputs=inputs, outputs=outs)

    @staticmethod
    def basic_blocks(channel_in, channel_out):
        """Four stacked conv blocks (1x1, 3x1, 1x3, 1x1), each with channel_out // 4 filters.

        ``channel_in`` is accepted but not used.
        """
        return Sequential(
            [
                conv2D_batchnorm(channel_out // 4, [1, 1], padding="same"),
                conv2D_batchnorm(channel_out // 4, [3, 1], padding="same"),
                conv2D_batchnorm(channel_out // 4, [1, 3], padding="same"),
                conv2D_batchnorm(channel_out // 4, [1, 1], padding="same"),
            ]
        )

    def mixed_input_block(self):
        """Input stem: 3x3 conv, max-pool, one basic block, max-pool."""
        return Sequential(
            [
                conv2D_batchnorm(64, [3, 3], padding="same"),
                MaxPool2D([3, 3], strides=[1, 1]),
                self.basic_block(64, 128),
                MaxPool2D([3, 3], strides=[2, 1]),
            ]
        )

    # Convolution block for CNN
    def convolution_block(self, channel_in, channel_out, stride):
        """A basic block followed by a 3x3 max-pool with strides (stride, 1)."""
        return Sequential(
            [
                self.basic_block(channel_in, channel_out),
                MaxPool2D([3, 3], strides=(stride, 1)),
            ]
        )

    def train(self):
        """Not implemented on the class; training is done by the notebook's train()."""
        # NotImplemented is a comparison sentinel, not an exception; raising it
        # produces a TypeError. NotImplementedError is the intended exception.
        raise NotImplementedError

    def predict(self, x, classnames):
        """Return the raw model output for ``x``. ``classnames`` is not used."""
        pred = self.model.predict(x)
        return pred

    def decode_pred(self, pred, classnames):
        """Greedy CTC decoding of ``pred`` into UTF-8 encoded byte strings.

        Args:
            pred: array whose first two dimensions are (samples, time steps).
            classnames: index -> character lookup supporting ``classnames[idx]``.

        Returns:
            One ``bytes`` object per sample.
        """
        samples, times = pred.shape[:2]
        input_length = tf.convert_to_tensor([times] * samples)
        decodeds, logprobs = tf.keras.backend.ctc_decode(
            pred, input_length, greedy=True, beam_width=100, top_paths=1
        )
        decodeds = np.array(decodeds[0])

        results = []
        for d in decodeds:
            text = []
            for idx in d:
                # -1 marks the padding behind the decoded sequence.
                if idx == -1:
                    break
                text.append(classnames[idx])
            results.append("".join(text).encode("utf-8"))
        return results

    def save_weights(self, filepath):
        """Save only the model weights to ``filepath``."""
        self.model.save_weights(filepath)

    def load_weights(self, filepath):
        """Load model weights from ``filepath``."""
        self.model.load_weights(filepath)

    def save(self, filepath):
        """Save the complete Keras model to ``filepath``."""
        self.model.save(filepath)

    def summary(self):
        """Print the Keras model summary."""
        self.model.summary()


# Evaluate

In [None]:
class Evaluator:
    """Scores a network on validation batches: mean CTC loss, CER and WER."""

    def __init__(self,val_gen, net, class_names,val_batch_len,batch_size):
        """
        Args:
            val_gen: batch source providing ``next_batch()`` that returns
                (images, sparse_labels, labels).
            net: wrapper whose ``model`` attribute is the callable network.
            class_names: index -> character lookup supporting ``class_names[idx]``.
            val_batch_len: number of batches drawn per ``evaluate()`` call.
            batch_size: validation batch size (used for the sample-count report).
        """
        self.net = net
        self.val_gen = val_gen
        self.class_names = class_names
        self.batch_size = batch_size
        self.val_batch_len = val_batch_len

    def _average(self, values):
        """Mean of the per-batch values; NaN when no batch was evaluated."""
        if len(values) == 0:
            # Happens when val_batch_len is 0; avoids a division by zero.
            return float("nan")
        if len(values) == 1:
            return values[0]
        return np.sum(values) / len(values)

    def _indices_to_text(self, rows):
        """Map rows of class indices to UTF-8 byte strings, stopping at -1 padding."""
        results = []
        for row in rows:
            text = []
            for idx in row:
                if idx == -1:
                    break
                text.append(self.class_names[idx])
            results.append(''.join(text).encode('utf-8'))
        return results

    def _decode_label(self,labels):
        """Turn encoded ground-truth labels back into UTF-8 byte strings."""
        return self._indices_to_text(labels)

    def decode_pred(self,pred ):
        """Greedy CTC decoding of ``pred`` (samples, time steps, classes) to byte strings."""
        samples, times = pred.shape[:2]
        input_length = tf.convert_to_tensor([times] * samples)
        decodeds, logprobs = tf.keras.backend.ctc_decode(pred, input_length, greedy=True, beam_width=100, top_paths=1)
        return self._indices_to_text(np.array(decodeds[0]))

    @staticmethod
    def _as_text(value):
        """Decode UTF-8 bytes to str; leave other values untouched."""
        return value.decode('utf-8') if isinstance(value, bytes) else value

    def _calc_CER_and_WER(self, label_texts, decoded_texts):
        """Return (mean edit distance per sample, fraction of samples with any error).

        Also prints every label/prediction pair.
        """
        if len(label_texts) == 0:
            return 0.0, 0.0
        ed = []
        WER = 0
        for label, pred in zip(label_texts, decoded_texts):
            print("label \t {} \t prediction \t {}".format(label,pred))
            # Compare characters, not UTF-8 bytes: a single wrong multi-byte
            # (e.g. Sinhala) character would otherwise count as several errors.
            cer = editdistance.eval(self._as_text(label), self._as_text(pred))
            ed.append(cer)
            if cer != 0:
                WER += 1
        WER /= len(label_texts)
        CER = sum(ed) / len(label_texts)
        return CER, WER

    def _print_result(self, loss, CER, WER):
        """Print the evaluation summary."""
        print("Number of samples in test set: {}\n"
              "mean loss: {}\n"
              "mean CER: {}\n"
              "WER: {}\n".format(self.val_batch_len * self.batch_size,
                                 loss,
                                 CER,
                                 WER
                                 )
              )

    def evaluate(self):
        """Evaluate ``val_batch_len`` batches, print a summary and return the mean loss.

        The per-batch results are kept in ``self.losses``, ``self.CERs`` and
        ``self.WERs``.
        """
        self.losses, self.CERs, self.WERs = [],[],[]

        for _ in range(self.val_batch_len):
            val_inputs, val_targets, val_labels = self.val_gen.next_batch()
            val_inputs = val_inputs.astype('float32')
            val_targets = tf.SparseTensor(val_targets[0], val_targets[1], val_targets[2])
            logits = self.net.model(val_inputs, training = False)
            # Average away axis 1 so each sample is left with a 2-D map whose
            # axes are used below as (time steps, classes).
            logits = tf.reduce_mean(logits, axis = 1)
            decoded_texts = self.decode_pred(logits)
            label_texts = self._decode_label(val_labels)
            CER,WER = self._calc_CER_and_WER(label_texts, decoded_texts)

            logits_shape = tf.shape(logits)
            seq_len = tf.fill([logits_shape[0]],logits_shape[1])
            # ctc_loss is fed time-major input: (time steps, batch, classes).
            logits = tf.transpose(logits, (1,0,2))
            loss_value = tf.reduce_mean(tf.compat.v1.nn.ctc_loss(labels = val_targets, inputs = logits, sequence_length = seq_len ))
            self.losses.append(float(loss_value))
            self.CERs.append(CER)
            self.WERs.append(WER)
        loss = self._average(self.losses)
        cer = self._average(self.CERs)
        wer = self._average(self.WERs)
        self._print_result(loss,cer,wer)
        return(loss)

# Train

In [None]:
def train():
    """Train LPRNet with a custom CTC loop, validate periodically and save the model.

    Reads its configuration from the notebook-level settings (train_dir,
    val_dir, batch sizes, learning-rate schedule, saved_dir, pretrained).
    Every 25th epoch (except epoch 0) the validation set is evaluated and the
    weights are written to ``saved_dir/new_out_model_best.pb`` when the
    validation loss improved. The full model is saved to
    ``saved_dir/new_out_model_last.pb`` once training has finished.

    Raises:
        ValueError: if the training directory contains no files.
    """
    # The notebook binds the name `time` twice (`import time` and, later,
    # `from time import time`), so `time.time()` fails once the second import
    # has run. Take an unambiguous clock locally instead.
    from time import time as wall_clock

    # Initiate the neural network
    net = LPRNet(NUM_CLASS)

    # Initialize the custom data generators
    train_gen = DataIterator(img_dir=train_dir, batch_size=batch_size)
    val_gen = DataIterator(img_dir=val_dir, batch_size=val_batch_size)

    # Sample counts used to size the custom training loop
    train_len = len(next(os.walk(train_dir))[2])
    val_len = len(next(os.walk(val_dir))[2])
    print("Train Len is", train_len)
    if train_len == 0:
        raise ValueError("No training files found in {}".format(train_dir))

    # ceil() already yields train_len for batch_size == 1.
    BATCH_PER_EPOCH = int(math.ceil(train_len / batch_size))

    # Initialize tensorboard. The callback takes no batch_size argument in
    # current Keras, so it is not passed.
    tensorboard = keras.callbacks.TensorBoard(log_dir='tmp/my_tf_logs', histogram_freq=0,
                                              write_graph=True)

    val_batch_len = int(math.floor(val_len / val_batch_size))
    # Labels are encoded with CHARS_DICT, so they have to be decoded with its
    # inverse, DECODE_DICT, rather than by position in the CHARS string.
    evaluator = Evaluator(val_gen, net, DECODE_DICT, val_batch_len, val_batch_size)
    best_val_loss = float("inf")

    # If a pretrained model is configured, start from its weights
    if pretrained:
        net.load_weights(pretrained)

    model = net.model
    tensorboard.set_model(model)

    # Initialize the learning rate schedule
    learning_rate = keras.optimizers.schedules.ExponentialDecay(lr,
                                                                decay_steps=decay_steps,
                                                                decay_rate=decay_rate,
                                                                staircase=staircase)

    # Define the training optimizer
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

    # Make sure the checkpoint folder exists before anything is written to it.
    os.makedirs(saved_dir, exist_ok=True)

    print('Training ...')

    # Start the training loop
    for epoch in range(train_epochs):

        print("Start of epoch {} / {}".format(epoch,train_epochs))

        # Reset the accumulated loss so it is tracked per epoch
        train_loss = 0
        val_loss = 0
        start_time = wall_clock()

        for batch in range(BATCH_PER_EPOCH):
            print("batch {}/{}".format(batch, BATCH_PER_EPOCH))
            # Get a batch of images/labels; the labels go into a sparse tensor
            # because that is what ctc_loss takes.
            train_inputs, train_targets, train_labels = train_gen.next_batch()
            train_inputs = train_inputs.astype('float32')

            train_targets = tf.SparseTensor(train_targets[0], train_targets[1], train_targets[2])

            # Record the forward pass so it can be differentiated.
            with tf.GradientTape() as tape:
                # Get model outputs
                logits = model(train_inputs, training=True)

                # Average away axis 1, then hand the result to the CTC loss in
                # time-major layout (time steps, batch, classes).
                logits = tf.reduce_mean(logits, axis=1)
                logits_shape = tf.shape(logits)
                cur_batch_size = logits_shape[0]
                timesteps = logits_shape[1]
                seq_len = tf.fill([cur_batch_size], timesteps)
                logits = tf.transpose(logits, (1, 0, 2))
                ctc_loss = tf.compat.v1.nn.ctc_loss(labels=train_targets, inputs=logits, sequence_length=seq_len)
                loss_value = tf.reduce_mean(ctc_loss)

            # Differentiate the scalar batch mean (the value that is logged).
            # Passing the per-sample vector makes the tape sum its elements,
            # which scales the gradient with the batch size.
            grads = tape.gradient(loss_value, model.trainable_weights,
                                  unconnected_gradients=tf.UnconnectedGradients.NONE)
            optimizer.apply_gradients(zip(grads, model.trainable_weights))
            train_loss += float(loss_value)

        tim = wall_clock() - start_time

        print("Train loss {}, time {} \n".format(float(train_loss/BATCH_PER_EPOCH),tim))
        # Run a validation loop every 25 epochs
        if epoch != 0 and epoch % 25 == 0:
            val_loss = evaluator.evaluate()
            # Keep the weights when the validation loss improved
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                net.save_weights(os.path.join(saved_dir, "new_out_model_best.pb"))
                print("Weights updated in {}/{}".format(saved_dir,"new_out_model_best.pb"))
            else:
                print("Validation loss is greater than best_val_loss ")

    net.save(os.path.join(saved_dir, "new_out_model_last.pb"))
    print("Final Weights saved in {}/{}".format(saved_dir, "new_out_model_last.pb"))
    tensorboard.on_train_end(None)

# Test and Train Model

In [None]:
# Start training with the settings from the "Variables" section.
train()

Train Len is 6




Training ...
Start of epoch 0 / 100
batch 0/3
batch 1/3
batch 2/3
Train loss 327.82908121744794, time 0.7231934070587158 

Start of epoch 1 / 100
batch 0/3
batch 1/3
batch 2/3
Train loss 306.97023518880206, time 0.6387524604797363 

Start of epoch 2 / 100
batch 0/3
batch 1/3
batch 2/3
Train loss 300.85740152994794, time 0.7008249759674072 

Start of epoch 3 / 100
batch 0/3
batch 1/3
batch 2/3
Train loss 299.3066914876302, time 0.6992542743682861 

Start of epoch 4 / 100
batch 0/3
batch 1/3
batch 2/3
Train loss 293.1197509765625, time 0.6849119663238525 

Start of epoch 5 / 100
batch 0/3
batch 1/3
batch 2/3
Train loss 280.4199625651042, time 0.5874059200286865 

Start of epoch 6 / 100
batch 0/3
batch 1/3
batch 2/3
Train loss 277.3896077473958, time 0.5903067588806152 

Start of epoch 7 / 100
batch 0/3
batch 1/3
batch 2/3
Train loss 271.1135762532552, time 0.5882546901702881 

Start of epoch 8 / 100
batch 0/3
batch 1/3
batch 2/3
Train loss 268.7672526041667, time 0.5814926624298096 

Sta

# Other

In [None]:
# Pack the saved_models folder into a gzip-compressed tar archive.
!tar -zcvf archive-saved_models.tar.gz saved_models

saved_models/
saved_models/new_out_model_best.pb.index
saved_models/new_out_model_last.pb/
saved_models/new_out_model_last.pb/variables/
saved_models/new_out_model_last.pb/variables/variables.index
saved_models/new_out_model_last.pb/variables/variables.data-00000-of-00001
saved_models/new_out_model_last.pb/keras_metadata.pb
saved_models/new_out_model_last.pb/saved_model.pb
saved_models/new_out_model_last.pb/assets/
saved_models/new_out_model_best.pb.data-00000-of-00001
saved_models/checkpoint


In [None]:
% zip -r saved_models.zip saved_models/

UsageError: Line magic function `%zip` not found.


# Predict

## Helper Function

In [None]:
# Index -> character lookup handed to decode_pred. Characters are looked up
# through DECODE_DICT, the exact inverse of the CHARS_DICT table that labels
# are encoded with, rather than by position in the CHARS string.
classnames = DECODE_DICT

In [None]:
def decode_pred(pred,classnames):
    """Greedy CTC decoding of raw model output into UTF-8 encoded byte strings.

    Args:
        pred: model output; it is averaged over axis 1 before decoding, after
            which the first two dimensions are read as (samples, time steps).
        classnames: index -> character lookup supporting ``classnames[idx]``.

    Returns:
        A list with one ``bytes`` object per sample.
    """
    averaged = np.mean(pred, axis = 1)
    num_samples, num_steps = averaged.shape[:2]
    seq_lengths = tf.convert_to_tensor([num_steps] * num_samples)
    decoded_paths, _ = tf.keras.backend.ctc_decode(
        averaged, seq_lengths, greedy=True, beam_width=100, top_paths=1
    )
    best_paths = np.array(decoded_paths[0])

    results = []
    for path in best_paths:
        chars = []
        for idx in path:
            # -1 marks the padding behind the decoded sequence.
            if idx == -1:
                break
            chars.append(classnames[idx])
        encoded = ''.join(chars).encode('utf-8')
        results.append(encoded)
    return results

In [None]:
def run(model_path="./saved_models/new_out_model_last.pb", image_dir="./images"):
    """Run the saved model on every .jpg/.JPG file of a folder and print the results.

    For each image the text before the first underscore of the file name is
    printed as the actual label, next to the decoded prediction. Files that
    OpenCV cannot read are reported and skipped.

    Args:
        model_path: location of the saved Keras model.
        image_dir: folder containing the images to recognise.
    """
    # Import the clock locally: the notebook-level name `time` is bound by both
    # `import time` and `from time import time`, so its meaning depends on
    # which cell ran last.
    from time import time as wall_clock

    # Load the Keras model
    model = load_model(model_path)
    print("Loaded Weights successfully")
    print("Actual Label \t Predicted Label ")
    start_time = wall_clock()

    # Loop through all the files in the test folder
    for filename in os.listdir(image_dir):
        # Only image files are processed
        if not filename.endswith((".jpg", ".JPG")):
            continue

        # Read the file and preprocess it
        frame = cv2.imread(os.path.join(image_dir, filename))
        if frame is None:
            # cv2.imread returns None instead of raising when decoding fails.
            print("Skipping unreadable image:", filename)
            continue
        img = cv2.resize(frame, (94,24))  # (width, height)
        # Same dtype as the batches used for training (float32), plus a batch axis.
        img = np.expand_dims(img, axis = 0).astype('float32')

        # Get the output sequence
        pred = model.predict(img)

        # Decode the output sequence using keras ctc decode
        result_ctc = decode_pred(pred, classnames)
        original_label = filename.split("_")[0]

        # Print the original sequence and the decoded sequence
        print(original_label,"\t",result_ctc[0].decode('utf-8'))
    print("total time taken :", wall_clock()-start_time)



In [None]:
# Run inference over ./images using the model that train() saved as
# saved_models/new_out_model_last.pb.
run()

Loaded Weights successfully
Actual Label 	 Predicted Label 
img1.jpg 	 KNKNGNN
total time taken : 0.43308115005493164
