## Initialization

In [0]:
# from google.colab import drive
# drive.mount('/content/drive')

In [0]:
import os
import random
import codecs
import copy
import argparse
import cv2
import pandas
import numpy as np
import struct
from PIL import Image
from PIL import ImageFilter
import pickle
import zipfile

import tensorflow as tf
from keras.utils.np_utils import to_categorical
from keras.models import Model
from keras.layers import Input, Flatten, Dense, ZeroPadding2D, Conv2D
from keras.layers import Activation, MaxPooling2D, BatchNormalization
from keras.layers.advanced_activations import LeakyReLU
from keras import backend

In [0]:
data_dir = './'
extension = '.zip'

def extract_archives(directory, suffix):
  """Unpack every archive in `directory` whose name ends with `suffix`.

  Each archive `<name><suffix>` is extracted into a sibling directory
  `<name>` and the archive file is then deleted, so re-running the cell
  does not extract the same data twice.

  Args:
    directory: folder that is scanned (non-recursively) for archives.
    suffix: file-name suffix identifying archives, e.g. '.zip'.

  Returns:
    List of absolute paths of the directories that were created/filled.
  """
  extracted = []
  for item in os.listdir(directory):
    if not item.endswith(suffix):
      continue
    # Resolve relative to the scanned directory, not the current working dir.
    file_name = os.path.abspath(os.path.join(directory, item))
    # Strip only the trailing suffix; other occurrences in the path stay intact.
    new_dir = file_name[:len(file_name) - len(suffix)]

    # The context manager closes the archive even if extraction fails,
    # and the archive is only removed after a successful extraction.
    with zipfile.ZipFile(file_name) as zip_ref:
      zip_ref.extractall(new_dir)
    os.remove(file_name)
    extracted.append(new_dir)
  return extracted

extracted_dirs = extract_archives(data_dir, extension)

## Read data

In [0]:
class ReadData:
  """Reads `.gnt` sample files and converts the samples to 64x64x1 arrays.

  A `.gnt` file, as parsed here, is a sequence of records. Each record has a
  10-byte header followed by `width*height` bytes of 8-bit pixels:

    bytes 0-3  total record size (little-endian)
    bytes 4-5  tag code (high byte first)
    bytes 6-7  image width (little-endian)
    bytes 8-9  image height (little-endian)

  Attributes:
    path: directory scanned for `.gnt` files by `read_one_gnt_file`.
    use_filter: when True, `load_next_file` adds a filtered copy per sample.
    use_rotation: when True, `load_next_file` adds a rotated copy per sample.
    file_count: number of files processed so far by `load_next_file`.
    is_train, iter_index, char_dict: stored but not used by this class.
  """

  def __init__(self, path, is_train=True, char_dict=None):
    self.file_count = 0
    self.is_train = is_train
    self.iter_index = 0
    self.path = path
    self.char_dict = char_dict
    self.use_filter = True
    self.use_rotation = True

  # Read data

  def read_one_file(self, f):
    """Yield `(image, tagcode)` for every complete record in an open file.

    Args:
      f: file object opened in binary mode, positioned at a record boundary.

    Yields:
      image: uint8 array of shape (height, width).
      tagcode: the record's two-byte tag code as a Python int.

    Reading stops at end of file, at a header whose size field disagrees
    with its width/height, or at a truncated record.
    """
    header_size = 10
    while True:
      header = np.fromfile(f, dtype='uint8', count=header_size)
      # An empty or partial header means there is no further complete record.
      if header.size < header_size:
        break
      # Decode with struct so the fields are plain Python ints; shifting
      # uint8 scalars can overflow depending on NumPy's promotion rules.
      raw = header.tobytes()
      sample_size = struct.unpack('<I', raw[0:4])[0]
      tagcode = struct.unpack('>H', raw[4:6])[0]
      width, height = struct.unpack('<HH', raw[6:10])
      pixel_count = width * height
      if header_size + pixel_count != sample_size:
        break
      pixels = np.fromfile(f, dtype='uint8', count=pixel_count)
      if pixels.size != pixel_count:
        # Truncated record: report its label and stop instead of yielding
        # a stale or undefined image.
        print (struct.pack('>H', tagcode).decode('gb2312', errors='replace'))
        break
      yield pixels.reshape((height, width)), tagcode

  def read_from_dir(self, gnt_dir):
    """Yield `(image, tagcode)` for every record of every `.gnt` file in `gnt_dir`."""
    for file_name in os.listdir(gnt_dir):
      if file_name.endswith('.gnt'):
        file_path = os.path.join(gnt_dir, file_name)
        with open(file_path, 'rb') as f:
          for image, tagcode in self.read_one_file(f):
            yield image, tagcode

  def read_one_gnt_file(self):
    """Yield `(images, tagcodes)` lists, one pair per `.gnt` file in `self.path`."""
    for file_name in os.listdir(self.path):
      if file_name.endswith('.gnt'):
        file_path = os.path.join(self.path, file_name)
        with open(file_path, 'rb') as f:
          x = []
          y = []
          for image, tagcode in self.read_one_file(f):
            x.append(image)
            y.append(tagcode)
        yield x, y

  def load_next_file(self):
    """Yield `(x, y)` arrays per file, including the enabled augmentations.

    For each raw sample the resized image is emitted first, followed by a
    filtered copy (if `use_filter`) and a rotated copy (if `use_rotation`),
    all carrying the same label.
    """
    for x_one, y_one in self.read_one_gnt_file():
      result_x = []
      result_y = []
      for image, tagcode in zip(x_one, y_one):
        result_x.append(self.read_convert_image(image))
        result_y.append(tagcode)
        if self.use_filter:
          result_x.append(self.apply_filter(image))
          result_y.append(tagcode)
        if self.use_rotation:
          result_x.append(self.rotate(image))
          result_y.append(tagcode)
      x = np.array(result_x)
      y = np.array(result_y)
      self.file_count += 1
      print ('Loaded files ', self.file_count)
      yield x, y

  def load_all(self):
    """Load every file via `load_next_file` and return concatenated `(x, y)` arrays."""
    x = []
    y = []
    for temp_x, temp_y in self.load_next_file():
      x.extend(temp_x)
      y.extend(temp_y)
    return np.array(x), np.array(y)

  def _to_model_input(self, im):
    """Resize a PIL image to 64x64 and return it as a (64, 64, 1) array."""
    im = im.resize((64, 64))
    new_image = np.asarray(im)
    return new_image.reshape(new_image.shape[0], new_image.shape[1], 1)

  def rotate(self, image):
    """Return `image` rotated by a random 10-20 degrees, resized to 64x64x1."""
    im = Image.fromarray(image)
    # Image.rotate returns a new image; the result must be kept.
    im = im.rotate(random.randint(10,20))
    return self._to_model_input(im)

  def apply_filter(self,image):
    """Return `image` after a randomly chosen blur/contour/emboss filter, resized to 64x64x1."""
    im = Image.fromarray(image)
    filters = [ImageFilter.BLUR, ImageFilter.CONTOUR, ImageFilter.EMBOSS]
    # Image.filter returns a new image; the result must be kept.
    im = im.filter(random.choice(filters))
    return self._to_model_input(im)

  def read_convert_image(self, image):
    """Return `image` resized to 64x64x1 without any augmentation."""
    return self._to_model_input(Image.fromarray(image))

In [0]:
class GetCharList:
  """Holds the train/test readers and builds the cached list of tag codes."""

  def __init__(self):
    # One reader per dataset split; both point at fixed relative directories.
    self.train = ReadData('./HWDB1.1trn_gnt/', is_train=True)
    self.test = ReadData('./HWDB1.1tst_gnt/', is_train=False)

  def generate_char_list(self):
    """Return the tag codes of all training samples, using a pickle cache.

    If a file named 'char_list' exists in the working directory it is
    unpickled and returned. Otherwise every record under the training
    directory is scanned, the tag codes are collected in read order
    (one entry per sample), pickled to 'char_list', and returned.
    """
    cache_file = 'char_list'
    if not os.path.isfile(cache_file):
      samples = self.train.read_from_dir(gnt_dir='./HWDB1.1trn_gnt/')
      tagcodes = [tagcode for _, tagcode in samples]
      with open(cache_file, 'wb') as sink:
        pickle.dump(tagcodes, sink)
        print ('Char list generated')
      return tagcodes

    # NOTE: pickle.load executes arbitrary code from the file; only use a
    # 'char_list' file that this notebook produced itself.
    with open(cache_file, 'rb') as source:
      cached = pickle.load(source)
      print ('Char list loaded')
      return cached

## VGG model

In [0]:
def convLayer(x, kHeight, kWidth, strideX, strideY,
              featureNum, name, padding = "SAME"):
  """2-D convolution + bias + ReLU inside its own variable scope.

  Args:
    x: input tensor; its last dimension is used as the input channel count.
    kHeight, kWidth: kernel size.
    strideX, strideY: horizontal / vertical stride.
    featureNum: number of output feature maps.
    name: variable-scope name; variables are created as "<name>/w" and "<name>/b".
    padding: padding mode forwarded to `tf.nn.conv2d`.

  Returns:
    The ReLU-activated feature map, named after the scope.
  """
  channel = int(x.get_shape()[-1])
  with tf.variable_scope(name) as scope:
    w = tf.get_variable("w", shape = [kHeight, kWidth, channel, featureNum])
    b = tf.get_variable("b", shape = [featureNum])
    featureMap = tf.nn.conv2d(x, w, strides = [1, strideY, strideX, 1], padding = padding)
    out = tf.nn.bias_add(featureMap, b)
    # bias_add keeps the feature map's shape, so no reshape is needed; the
    # former reshape to the static shape failed when the batch size was None.
    return tf.nn.relu(out, name = scope.name)

def fcLayer(x, inputD, outputD, reluFlag, name):
  """Fully connected layer: `x @ w + b`, optionally followed by ReLU.

  Args:
    x: 2-D input tensor whose second dimension is `inputD`.
    inputD: number of input features.
    outputD: number of output features.
    reluFlag: when truthy, apply ReLU to the affine output.
    name: variable-scope name holding the "w" and "b" variables.

  Returns:
    The affine output, passed through ReLU if `reluFlag` is set.
  """
  with tf.variable_scope(name) as scope:
    weights = tf.get_variable("w", shape = [inputD, outputD], dtype = "float")
    biases = tf.get_variable("b", [outputD], dtype = "float")
    affine = tf.nn.xw_plus_b(x, weights, biases, name = scope.name)
    return tf.nn.relu(affine) if reluFlag else affine

def maxPoolLayer(x, kHeight, kWidth, strideX, strideY, name, padding = "SAME"):
  """Max-pooling layer.

  Args:
    x: input tensor.
    kHeight, kWidth: pooling window size.
    strideX, strideY: horizontal / vertical stride.
    name: name of the pooling op.
    padding: padding mode forwarded to `tf.nn.max_pool`.

  Returns:
    The pooled tensor.
  """
  # ksize is given height-first, so strides must be height-first as well
  # (vertical stride before horizontal), matching convLayer's ordering.
  return tf.nn.max_pool(x, ksize = [1, kHeight, kWidth, 1],
                        strides = [1, strideY, strideX, 1],
                        padding = padding, name = name)

def dropout(x, keepPro, name = None):
  """Apply dropout to `x`, keeping each element with probability `keepPro`.

  Args:
    x: input tensor.
    keepPro: keep probability (scalar or tensor).
    name: optional op name.

  Returns:
    Tensor of the same shape as `x` with dropout applied.
  """
  # `name` must be passed by keyword: the third positional parameter of
  # tf.nn.dropout is `noise_shape`, not `name`.
  return tf.nn.dropout(x, keepPro, name = name)

In [0]:
class VGG19(object):
  """VGG19-shaped network: 16 conv layers in five pooled groups plus 3 FC layers.

  The graph is built in the constructor; the final layer is exposed as
  `self.fc8`.

  Args:
    x: input image tensor (channels last).
    keepPro: dropout keep probability used after fc6 and fc7.
    classNum: number of output classes (width of fc8).
    skip: layer names whose weights `load_model` must not restore.
    modelPath: path of the `.npy` weight file read by `load_model`.
  """

  def __init__(self, x, keepPro, classNum, skip, modelPath = "./vgg19.npy"):
    self.X = x
    self.KEEPPRO = keepPro
    self.CLASSNUM = classNum
    self.SKIP = skip
    self.MODELPATH = modelPath
    self.build_model()

  def build_model(self):
    """Build the network graph on `self.X` and store the output in `self.fc8`."""
    conv1_1 = convLayer(self.X, 3, 3, 1, 1, 128, "conv1_1" )
    conv1_2 = convLayer(conv1_1, 3, 3, 1, 1, 128, "conv1_2")
    pool1 = maxPoolLayer(conv1_2, 2, 2, 2, 2, "pool1")

    conv2_1 = convLayer(pool1, 3, 3, 1, 1, 64, "conv2_1")
    conv2_2 = convLayer(conv2_1, 3, 3, 1, 1, 64, "conv2_2")
    pool2 = maxPoolLayer(conv2_2, 2, 2, 2, 2, "pool2")

    conv3_1 = convLayer(pool2, 3, 3, 1, 1, 64, "conv3_1")
    conv3_2 = convLayer(conv3_1, 3, 3, 1, 1, 64, "conv3_2")
    conv3_3 = convLayer(conv3_2, 3, 3, 1, 1, 64, "conv3_3")
    conv3_4 = convLayer(conv3_3, 3, 3, 1, 1, 64, "conv3_4")
    pool3 = maxPoolLayer(conv3_4, 2, 2, 2, 2, "pool3")

    conv4_1 = convLayer(pool3, 3, 3, 1, 1, 64, "conv4_1")
    conv4_2 = convLayer(conv4_1, 3, 3, 1, 1, 64, "conv4_2")
    conv4_3 = convLayer(conv4_2, 3, 3, 1, 1, 64, "conv4_3")
    conv4_4 = convLayer(conv4_3, 3, 3, 1, 1, 64, "conv4_4")
    pool4 = maxPoolLayer(conv4_4, 2, 2, 2, 2, "pool4")

    conv5_1 = convLayer(pool4, 3, 3, 1, 1, 64, "conv5_1")
    conv5_2 = convLayer(conv5_1, 3, 3, 1, 1, 64, "conv5_2")
    conv5_3 = convLayer(conv5_2, 3, 3, 1, 1, 64, "conv5_3")
    conv5_4 = convLayer(conv5_3, 3, 3, 1, 1, 64, "conv5_4")
    pool5 = maxPoolLayer(conv5_4, 2, 2, 2, 2, "pool5")

    # Flatten using pool5's real per-sample size. A hard-coded 64*64*1 only
    # matches one input resolution; for any other size the reshape would
    # silently fold several samples into one row.
    poolDims = pool5.get_shape().as_list()[1:]
    if all(d is not None for d in poolDims):
      flatSize = int(np.prod(poolDims))
    else:
      # Static shape unknown: keep the historical hard-coded size.
      flatSize = 64*64*1
    fcIn = tf.reshape(pool5, [-1, flatSize])
    fc6 = fcLayer(fcIn, flatSize, 4096, True, "fc6")
    dropout1 = dropout(fc6, self.KEEPPRO)

    fc7 = fcLayer(dropout1, 4096, 4096, True, "fc7")
    dropout2 = dropout(fc7, self.KEEPPRO)

    # NOTE(review): fc8 is ReLU-activated (reluFlag=True), so its outputs are
    # non-negative; confirm this is intended before feeding it to a softmax loss.
    self.fc8 = fcLayer(dropout2, 4096, self.CLASSNUM, True, "fc8")

  def load_model(self, sess):
    """Assign weights from the `.npy` file at `self.MODELPATH` into the graph.

    The file is expected to hold a pickled dict mapping layer name to a
    sequence of arrays; 1-D arrays are assigned to the layer's "b" variable
    and all others to "w". Layers listed in `self.SKIP` are left untouched.

    Args:
      sess: session used to run the assign ops.
    """
    # The weight dict is stored as a pickled object array, which np.load
    # refuses to read unless allow_pickle=True. Pickle can execute arbitrary
    # code, so only load weight files from a trusted source.
    wDict = np.load(self.MODELPATH, encoding = "bytes", allow_pickle = True).item()
    for name in wDict:
      if name not in self.SKIP:
        with tf.variable_scope(name, reuse = True):
          for p in wDict[name]:
            # Bias / weight
            if len(p.shape) == 1:
              sess.run(tf.get_variable('b', trainable = False).assign(p))
            else:
              sess.run(tf.get_variable('w', trainable = False).assign(p))

## Train model

In [0]:
# Initialize

classNum = 3755
skip = []
chars = GetCharList()
# The test reader must return only the original samples, so both
# augmentations are switched off for it.
chars.test.use_rotation = False
chars.test.use_filter = False

# generate_char_list() loads the cached 'char_list' pickle when it exists and
# builds it from the training files otherwise, so this cell also runs on a
# fresh kernel where the cache has not been created yet.
char_list = chars.generate_char_list()

In [0]:
# build_model() is an instance method that VGG19's constructor already calls,
# so the network is created by instantiating the class with its graph inputs.
# The batch dimension is fixed so every layer sees a fully defined static shape;
# images are 64x64 single-channel arrays, as produced by ReadData.
batchSize = 64
inputImages = tf.placeholder(tf.float32, [batchSize, 64, 64, 1], name='inputImages')
keepProb = tf.placeholder(tf.float32, name='keepProb')
model = VGG19(inputImages, keepProb, classNum, skip)

with tf.Session() as sess:
  sess.run(tf.global_variables_initializer())
  # Restores weights from model.MODELPATH for every layer not listed in `skip`.
  model.load_model(sess)

## Test model