<a href="https://colab.research.google.com/github/Diwakar1997/Word-Spotting-in-DCT-Compressed-Domain/blob/main/PHOCNetProject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import keras
import tensorflow_addons
# from keras.utils import multi_gpu_model
 
from keras.models import Sequential, model_from_json
from keras.layers import (Conv2D, MaxPooling2D, Dense, Dropout, Flatten,LeakyReLU, Activation)
 
from keras.optimizers import SGD
from keras import losses
from keras.callbacks import TensorBoard
from tensorflow_addons.layers import SpatialPyramidPooling2D


In [None]:
# Installs tensorflow_addons, which the import cell needs for
# SpatialPyramidPooling2D. On a fresh runtime, run this cell before the imports.
!pip install tensorflow_addons

In [None]:
# Mount Google Drive so the dataset paths under '/content/drive/My Drive/...'
# used by the data-loading cell are readable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
'''This code takes an input word as a string and
outputs the PHOC label of the word. The PHOC label is a
binary vector of length 604.
'''
 
# Characters represented by the 36 unigram bins, in bin order.
_PHOC_UNIGRAMS = '0123456789abcdefghijklmnopqrstuvwxyz'
_PHOC_UNIGRAM_INDEX = {ch: pos for pos, ch in enumerate(_PHOC_UNIGRAMS)}


def generate_36(word):
  '''Return the 36-bin binary unigram vector of ``word``.

  The bins stand for:
  [0123456789abcdefghijklmnopqrstuvwxyz]

  A bin is 1 when the corresponding character occurs at least once in
  ``word``. Letters are matched case-insensitively. Any character outside
  the 36-symbol alphabet (punctuation, accented letters, non-ASCII digits)
  is ignored rather than being mapped to a wrong bin or raising IndexError.

  Args:
    word: string to encode; may be empty.
  Returns:
    list of 36 ints, each 0 or 1.
  '''
  vector_36 = [0] * len(_PHOC_UNIGRAMS)
  for char in word:
    # Explicit lookup instead of ord() arithmetic: ord-based indexing wraps
    # around for uppercase letters and overflows for non-ASCII characters.
    position = _PHOC_UNIGRAM_INDEX.get(char.lower())
    if position is not None:
      vector_36[position] = 1

  return vector_36
 
def generate_50(word):
  '''Return the 50-bin binary bigram vector of ``word``.

  Each bin flags whether one of the 50 frequent bigrams listed below occurs
  in ``word`` as two adjacent characters (presence only, not a count).
  Matching is case-insensitive.

  Args:
    word: string to encode; strings shorter than two characters yield an
      all-zero vector.
  Returns:
    list of 50 ints, each 0 or 1, in the order of the ``bigram`` list.
  '''

  bigram = ['th', 'he', 'in', 'er', 'an', 're', 'es', 'on', 'st', 'nt', 'en',
  'at', 'ed', 'nd', 'to', 'or', 'ea', 'ti', 'ar', 'te', 'ng', 'al',
  'it', 'as', 'is', 'ha', 'et', 'se', 'ou', 'of', 'le', 'sa', 've',
  'ro', 'ra', 'hi', 'ne', 'me', 'de', 'co', 'ta', 'ec', 'si', 'll',
  'so', 'na', 'li', 'la', 'el', 'ma']
  bigram_index = {pair: pos for pos, pair in enumerate(bigram)}

  vector_50 = [0] * len(bigram)
  lowered = word.lower()
  # Slide a two-character window over the word; looking up single characters
  # (as the earlier version did) can never match a bigram.
  for first, second in zip(lowered, lowered[1:]):
    position = bigram_index.get(first + second)
    if position is not None:
      vector_50[position] = 1

  return vector_50
 
def generate_label(word):
  '''Build the full PHOC label for ``word``.

  The lower-cased word is cut into 2, 3, 4 and 5 consecutive regions. Every
  region is ``len(word) // level`` characters wide, except the last one,
  which also takes the remainder. Each region contributes a 36-bin unigram
  vector via generate_36 (14 regions in total). The two halves of the word
  then each contribute a 50-bin bigram vector via generate_50.

  Args:
    word: transcription to encode.
  Returns:
    flat list of 14 * 36 + 2 * 50 = 604 values.
  '''
  lowered = word.lower()
  length = len(lowered)
  label = []

  for level in (2, 3, 4, 5):
    width = length // level
    # Region boundaries: evenly spaced starts, with the final region
    # stretched to the end of the word.
    cuts = [k * width for k in range(level)] + [length]
    for begin, end in zip(cuts, cuts[1:]):
      label.extend(generate_36(lowered[begin:end]))

  middle = length // 2
  label.extend(generate_50(lowered[:middle]))
  label.extend(generate_50(lowered[middle:]))

  return label

In [None]:
'''This loads data in accordance with the standards mentioned in the IAM database.'''

from glob import glob
import cv2
import math
from xml.etree import ElementTree as ET
from skimage import transform
import numpy as np
from datetime import datetime
 
# Dataset locations on the mounted Google Drive.
WORD_IMAGE_DIR = '/content/drive/My Drive/Project CNN/words/'
XML_DIR = '/content/drive/My Drive/Project CNN/xml/'

transcripts = {}

# Side length, in pixels, of the square blocks the DCT is applied to.
block_size = 8

# 8x8 table each DCT block is divided by element-wise in apply_dct.
# The values match the standard JPEG luminance quantization table.
QUANTIZATION_TABLE = np.array([
    [16, 11, 10, 16,  24,  40,  51,  61],
    [12, 12, 14, 19,  26,  58,  60,  55],
    [14, 13, 16, 24,  40,  57,  69,  56],
    [14, 17, 22, 29,  51,  87,  80,  62],
    [18, 22, 37, 56,  68, 109, 103,  77],
    [24, 35, 55, 64,  81, 104, 113,  92],
    [49, 64, 78, 87, 103, 121, 120, 101],
    [72, 92, 95, 98, 112, 100, 103,  99],
])


def apply_dct(img):
    """Return the block-wise quantised DCT of a 50x100 image.

    The image is processed in ``block_size`` x ``block_size`` tiles. Each tile
    is converted to float32, transformed with ``cv2.dct``, divided
    element-wise by ``QUANTIZATION_TABLE``, truncated to integers and
    doubled. Only whole tiles are processed, so with the fixed 50x100 frame
    and 8-pixel blocks the last 2 rows and last 4 columns of the output stay
    zero.

    Args:
        img: 2-D array indexed as [row, column]; the top-left 48x96 region
            is read.
    Returns:
        float array of shape (50, 100) holding the quantised coefficients.
    """
    height, width = 50, 100
    covered_rows = (height // block_size) * block_size
    covered_cols = (width // block_size) * block_size

    coefficients = np.zeros((height, width))
    for top in range(0, covered_rows, block_size):
        bottom = top + block_size
        for left in range(0, covered_cols, block_size):
            right = left + block_size

            tile = np.float32(img[top:bottom, left:right])
            tile_dct = cv2.dct(tile)
            quantised = np.divide(tile_dct, QUANTIZATION_TABLE).astype(int) * 2

            coefficients[top:bottom, left:right] = quantised

    return coefficients
 
def rule():
    """Read the IAM split definition files.

    Each file is read line by line and every line is stripped of
    surrounding whitespace. The validation list is the concatenation of
    validationset1.txt followed by validationset2.txt.

    Returns:
        (train_rule, valid_rule, test_rule): three lists of stripped lines.
    """
    def _read_lines(path):
        # One entry per line, without the trailing newline.
        with open(path, 'r') as fp:
            return [entry.strip() for entry in fp.readlines()]

    train_rule = _read_lines('/content/drive/My Drive/Project CNN/rules/trainset.txt')

    valid_rule = _read_lines('/content/drive/My Drive/Project CNN/rules/validationset1.txt')
    valid_rule += _read_lines('/content/drive/My Drive/Project CNN/rules/validationset2.txt')

    test_rule = _read_lines('/content/drive/My Drive/Project CNN/rules/testset.txt')

    return train_rule, valid_rule, test_rule
 
 
def append_data(x, y, transcript, data):
    """Distribute one sample over three parallel lists, in place.

    Args:
        x: list receiving ``data[0]``.
        y: list receiving ``data[1]``.
        transcript: list receiving ``data[2]``.
        data: indexable sample with at least three entries.
    """
    for sink, position in ((x, 0), (y, 1), (transcript, 2)):
        sink.append(data[position])
 
 
def load_data():
    """Load word images, PHOC labels and transcripts from the IAM files.

    XML files under ``XML_DIR`` are visited in sorted order, and reading
    stops before the 150th file. Each ``word`` element whose PNG can be read
    is turned into one sample:

    * the transcript is lower-cased and encoded with ``generate_label``; if
      the label is all zeros the transcript is replaced by ``''``;
    * the image is read as greyscale, resized to 100x50 (width x height),
      passed through ``apply_dct``, thresholded (1 where the value is below
      200, else 0) and given a trailing channel axis, i.e. shape (50, 100, 1).

    Samples are split by the 1-based position of their XML file: files
    1-100 go to training, 101-125 to validation, and the rest to test.

    Returns:
        Tuple of nine NumPy arrays:
        (x_train, y_train, train_transcript,
         x_valid, y_valid, valid_transcript,
         x_test, y_test, test_transcript).
    """
    time_start = datetime.now()

    # NOTE: the official IAM split lists are loaded but not applied here;
    # samples are assigned by XML-file position (see below) instead.
    train_rule, valid_rule, test_rule = rule()

    # glob returns paths in arbitrary order; sort so that the position-based
    # split is reproducible between runs.
    xml_files = sorted(glob(XML_DIR+'*.xml'))

    x_train, y_train, train_transcript = [], [], []
    x_valid, y_valid, valid_transcript = [], [], []
    x_test, y_test, test_transcript = [], [], []

    for br, xml_file in enumerate(xml_files, start=1):
        if br == 150:
            break

        print("Read Iteration = {}, time = {}".format(br, datetime.now() - time_start))
        root = ET.parse(xml_file).getroot()

        # 'a01-000u.xml' -> '<WORD_IMAGE_DIR>a01/a01-000u/'
        image_dir = xml_file.split('/')[-1].split('.')[0].split('-')
        image_dir = image_dir[0] + '/' + image_dir[0]+'-'+image_dir[1]+ '/'
        image_dir = WORD_IMAGE_DIR + image_dir

        for word in root.iter('word'):
            img_id = word.get('id')
            img_name = image_dir+img_id+'.png'
            img_transcript = word.get('text').lower()

            img = cv2.imread(img_name, 0)
            if img is None:
                # Missing or unreadable image: skip this word.
                continue
            target = generate_label(img_transcript)

            if sum(target) == 0:
                img_transcript = ''

            img = cv2.resize(img, (100, 50))
            img = apply_dct(img)

            img = cv2.resize(img, (100, 50))
            img = np.where(img<200, 1, 0)
            img = img[:, :, np.newaxis]
            data = [img, target, img_transcript]

            # Contiguous ranges: the previous `br > 101` test silently
            # discarded every sample that came from file 101.
            if br <= 100:
                append_data(x_train, y_train, train_transcript, data)
            elif br <= 125:
                append_data(x_valid, y_valid, valid_transcript, data)
            else:
                append_data(x_test, y_test, test_transcript, data)

    x_train = np.array(x_train)
    y_train = np.array(y_train)
    train_transcript = np.array(train_transcript)

    x_valid = np.array(x_valid)
    y_valid = np.array(y_valid)
    valid_transcript = np.array(valid_transcript)

    x_test = np.array(x_test)
    y_test = np.array(y_test)
    test_transcript = np.array(test_transcript)

    print ("Time to fetch data: ", datetime.now() - time_start)

    return (x_train, y_train, train_transcript,
            x_valid, y_valid, valid_transcript,
            x_test, y_test, test_transcript)

In [None]:
def test_model(model, x_test, y_test, transcripts):
    """Evaluate word spotting by nearest-neighbour lookup over the test labels.

    Each image's predicted vector is thresholded at 0.5 and compared with
    every ground-truth label in ``y_test`` using the sum of absolute
    differences. The transcript of the closest label (first one on ties) is
    taken as the predicted word. A prediction counts as correct when that
    transcript equals the image's own transcript.

    Args:
        model: object with a ``predict(x_test)`` method returning one score
            vector per sample.
        x_test: model inputs, one entry per sample.
        y_test: ground-truth label vectors, aligned with ``x_test``.
        transcripts: ground-truth words, aligned with ``x_test``.
    Returns:
        Fraction of samples whose predicted word matches the ground truth
        (0.0 when there are no samples). The value is also printed.
    """
    start = datetime.now()
    n = len(x_test)
    if n == 0:
        # Nothing to score; avoid calling the model and dividing by zero.
        print("No test samples to evaluate")
        print("Precision = ", 0.0)
        return 0.0

    y_pred = model.predict(x_test)
    y_pred = np.where(y_pred<0.5, 0, 1)
    print("Time taken to predict ", datetime.now()-start)

    labels = np.asarray(y_test)
    count = 0
    for i in range(n):
        pred = y_pred[i]
        # Distance from this prediction to EVERY candidate label. Reducing to
        # a single scalar first would make argmin always return 0.
        distances = np.sum(np.abs(labels - pred), axis=1)
        tmp = int(np.argmin(distances))
        if transcripts[tmp] == transcripts[i]:
            count += 1
        print("Word = ", transcripts[i])
        print("predicted phoc vector ")
        print(pred)
        print("predicted word = ",transcripts[tmp])
        print()
    precision = count/n
    print("Precision = ", precision)

    print("Total time taken = ", datetime.now()-start)
    return precision


# The name starts with `test_`, so mark it explicitly as a non-test to keep
# pytest-style collectors from trying to run it as a test case.
test_model.__test__ = False

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import keras
 
from keras.models import Sequential, model_from_json
from keras.layers import (Conv2D, MaxPooling2D, Dense, Dropout, Flatten, LeakyReLU, Activation)
from keras.optimizers import SGD
from keras import losses
from keras.callbacks import TensorBoard
from datetime import datetime
 
def create_model():
  """Build and compile the convolutional network used for PHOC prediction.

  The stack is: two 3x3 convolutions (64 and 128 filters), a 2x2 max-pool,
  six 3x3 convolutions with 256 filters, three with 512 filters, spatial
  pyramid pooling with bins [1, 2, 4], then two 4096-unit dense layers with
  0.5 dropout and a 604-unit sigmoid output. It is compiled with binary
  cross-entropy and SGD, and the summary is printed.

  Args:
    None.
  Return:
    model: compiled instance of the Sequential class, expecting inputs of
      shape (50, 100, 1).
  """
  time_start = datetime.now()

  def conv(filters, **extra):
    # Every convolution shares kernel size, padding and activation.
    return Conv2D(filters, (3, 3), padding='same', activation='relu', **extra)

  layers = [
      conv(64, input_shape=(50, 100, 1)),
      conv(128),
      MaxPooling2D(pool_size=(2, 2), strides=2),
  ]
  layers += [conv(256) for _ in range(6)]
  layers += [conv(512) for _ in range(3)]
  layers += [
      SpatialPyramidPooling2D([1, 2, 4]),
      Flatten(),
      Dense(4096, activation='relu'),
      Dropout(0.5),
      Dense(4096, activation='relu'),
      Dropout(0.5),
      Dense(604, activation='sigmoid'),
  ]
  model = Sequential(layers)

  model.compile(
      loss=losses.binary_crossentropy,
      optimizer=SGD(lr=1e-4, momentum=.9, decay=5e-5),
      metrics=['accuracy'],
  )
  model.summary()
  print ("Time taken to create model: ", datetime.now()-time_start)

  return model
 
 
def train():
  """Create the model, load the data, fit for 10 epochs and evaluate.

  Uses the training and validation arrays returned by ``load_data`` for
  ``model.fit`` (batch size 10), then runs ``test_model`` on the test
  arrays. Elapsed times are printed; the "train" time is measured from the
  start of this function, so it includes model creation and data loading.
  """
  time_start = datetime.now()
  model = create_model()

  data = load_data()
  print("data_loaded\n")
  x_train, y_train = data[0], data[1]
  x_valid, y_valid = data[3], data[4]
  x_test, y_test = data[6], data[7]
  test_transcripts = data[8]

  model.fit(x_train,y_train,batch_size=10,epochs = 10,validation_data=(x_valid, y_valid))
  print ("Time taken to train the model: ", datetime.now()-time_start)

  # Time the evaluation from its own start. Subtracting the training
  # *duration* from the current time would print a timestamp, not a duration.
  test_start = datetime.now()
  test_model(model, x_test, y_test, test_transcripts)

  print ("Time taken to test the model: ", datetime.now()-test_start)

In [None]:
# Entry point of the notebook: build the model, load the data, train and evaluate.
train()