In [1]:
from PIL import ImageFont, ImageDraw, Image
from fontTools.ttLib import TTFont
from macrotoolchain import Data, Graph, plot 

import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split

import time

In [2]:
class Glyph(object):
    """Render single characters into normalised grayscale bitmaps.

    Several font files can be supplied; each character is drawn with the first
    font whose Unicode cmap contains its codepoint. Rendered bitmaps are cached
    per character.
    """

    def __init__(self, fonts, size=64):
        """Load the fonts and index the codepoints each one covers.

        Args:
            fonts: sequence of font file paths, in order of preference.
            size: side length in pixels of the square output bitmap. Glyphs
                are drawn at 80% of this size; the remainder is padding.
        """
        # One *independent* codepoint set per font. `[set()] * n` would repeat
        # a single shared set n times, merging every font's coverage and making
        # draw() always select the first font.
        self.codepoints = [set() for _ in fonts]
        self.size = int(size * 0.8)
        self.size_img = size
        self.pad = (size - self.size) // 2
        self.fonts = [ImageFont.truetype(f, self.size) for f in fonts]
        # Cache rendered bitmaps so repeated characters are only drawn once.
        self.cache = {}
        for cp, path in zip(self.codepoints, fonts):
            ttfont = TTFont(path)
            try:
                # Collect the codepoints of every Unicode cmap subtable.
                for cmap in ttfont['cmap'].tables:
                    if cmap.isUnicode():
                        cp.update(cmap.cmap)
            finally:
                # Release the font file handle once the cmap has been read.
                ttfont.close()

    def draw(self, ch):
        """Return the bitmap of `ch`, or None if no loaded font covers it.

        Args:
            ch: a single character.

        Returns:
            A float32 array of shape (size_img, size_img) with values in
            [0, 1], or None when the codepoint is in none of the fonts.
        """
        if ch in self.cache:
            return self.cache[ch]
        # Search the fonts in order and use the first one covering the codepoint.
        codepoint = ord(ch)
        font = None
        for index, covered in enumerate(self.codepoints):
            if codepoint in covered:
                font = self.fonts[index]
                break
        if font is None:
            return None

        img = Image.new('L', (self.size_img, self.size_img), 0)
        draw = ImageDraw.Draw(img)
        # Only the offsets are needed to position the glyph inside the padding.
        _, (offset_x, offset_y) = font.font.getsize(ch)
        # NOTE(review): the +4 looks like a hand-tuned vertical nudge for these
        # fonts at the default size - confirm before changing `size`.
        draw.text((self.pad - offset_x, self.pad - offset_y + 4), ch, font=font, fill=255, stroke_fill=255)
        img_array = np.array(img.getdata(), dtype='float32').reshape((self.size_img, self.size_img)) / 255
        self.cache[ch] = img_array

        return img_array

In [3]:
# Shared glyph renderer. Fonts are listed in order of preference: a character
# is drawn with the first font whose cmap contains it.
glyphbook = Glyph(['data/fonts/HanaMinA.otf', 'data/fonts/HanaMinB.otf'])

In [4]:
# Load the tab-separated character -> Cangjie code table (no header row).
# keep_default_na=False stops pandas from turning fields such as "NA" or empty
# strings into NaN, so every cell stays a plain string.
code_chart = pd.read_csv('data/cangjie6.txt', delimiter='\t', header=None, names=['Char', 'Code'], 
                        keep_default_na=False)

In [5]:
def preprocess_chart(chart):
    """Render every chart entry and keep only the characters that can be drawn.

    Args:
        chart: table whose `.values` rows unpack to (character, code).

    Returns:
        A pair `(glyphs, codes)`: the stacked bitmaps with a trailing channel
        axis added, and the array of codes for the characters that were kept.
    """
    # Pair each code with its rendered bitmap (None when the shared glyphbook
    # has no font covering the character).
    rendered = ((glyphbook.draw(char), code) for char, code in chart.values)
    drawable = [(bitmap, code) for bitmap, code in rendered if bitmap is not None]

    bitmaps = [bitmap for bitmap, _ in drawable]
    kept_codes = [code for _, code in drawable]
    return np.expand_dims(np.array(bitmaps), -1), np.array(kept_codes)

In [6]:
def tokenizer(code_table, max_length=5):
    """Convert Cangjie code strings into fixed-width integer token rows.

    Each row has `max_length + 1` entries: a leading start token 0, then one
    token per letter ('a' -> 1 ... 'z' -> 26), then 27 as the end/padding
    token up to the row width.

    Args:
        code_table: sequence (e.g. 1-D array) of code strings.
        max_length: maximum number of letters in a code (default 5).

    Returns:
        int64 array of shape (len(code_table), max_length + 1).

    Raises:
        ValueError: if a code is longer than `max_length`. Previously such
            input either failed with an obscure NumPy error or silently
            produced rows that were too wide.
    """
    codes = list(code_table)
    # Start from an all-padding table, so empty input is handled too.
    tokens = np.full((len(codes), max_length + 1), 27, dtype='int64')
    tokens[:, 0] = 0
    for row, code in enumerate(codes):
        if len(code) > max_length:
            raise ValueError(
                f'Code {code!r} is longer than the maximum length of {max_length}.')
        if code:
            tokens[row, 1:len(code) + 1] = [ord(letter) - 96 for letter in code]
    return tokens

In [7]:
# Render every drawable character in the chart to a bitmap, paired with its code.
glyphs, codes = preprocess_chart(code_chart)
# The raw table is not used after this point; drop the reference.
del code_chart

In [8]:
# Hold out 10% of the glyphs for validation. The split is seeded so that it is
# identical across kernel restarts: training resumes from saved checkpoints,
# and a different split on each run would leak previously-trained glyphs into
# the validation set.
train_glyphs, validation_glyphs, train_tokens, validation_tokens = train_test_split(
    glyphs, tokenizer(codes), test_size=0.1, random_state=42)
# The full arrays are not used after the split; drop the references.
del glyphs, codes

In [9]:
class CNN_Encoder(tf.keras.Model):
    """Convolutional encoder turning a glyph bitmap into a sequence of features.

    Three 5x5 convolutions (8, 32 and 128 filters) with 2x2 max-pooling after
    the first two, followed by a dense projection applied to every spatial
    position and a ReLU.
    """

    def __init__(self, embedding_dim):
        """Create the layers.

        Args:
            embedding_dim: size of the feature vector produced per position.
        """
        super(CNN_Encoder, self).__init__()
        self.cnn1 = tf.keras.layers.Convolution2D(8, (5, 5), input_shape=(64, 64, 1))
        self.pool1 = tf.keras.layers.MaxPool2D((2, 2))
        self.cnn2 = tf.keras.layers.Convolution2D(32, (5, 5))
        self.pool2 = tf.keras.layers.MaxPool2D((2, 2))
        self.cnn3 = tf.keras.layers.Convolution2D(128, (5, 5))
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        """Encode a batch of glyphs.

        The shape comments below assume the (64, 64, 1) input declared on cnn1.

        Args:
            x: batch of glyph bitmaps, (batch_size, 64, 64, 1).

        Returns:
            Tensor of shape (batch_size, 81, embedding_dim).
        """
        # x shape after cnn1 == (batch_size, 60, 60, 8)
        x = self.cnn1(x)
        # x shape after pool1 == (batch_size, 30, 30, 8)
        x = self.pool1(x)
        # x shape after cnn2 == (batch_size, 26, 26, 32)
        x = self.cnn2(x)
        # x shape after pool2 == (batch_size, 13, 13, 32)
        x = self.pool2(x)
        # x shape after cnn3 == (batch_size, 9, 9, 128)
        x = self.cnn3(x)
        # Flatten the spatial grid: (batch_size, 9, 9, 128) -> (batch_size, 81, 128).
        # The batch size is read dynamically because the static x.shape[0] is
        # None when the model is traced with an unknown batch dimension.
        x = tf.reshape(x, [tf.shape(x)[0], -1, x.shape[-1]])
        # x shape after fc == (batch_size, 81, embedding_dim)
        x = self.fc(x)
        x = tf.nn.relu(x)
        return x

In [10]:
class Bahdanau_Attention(tf.keras.Model):
    """Additive attention over the encoder's feature positions."""

    def __init__(self, hidden_size):
        """Create the two projection layers and the scoring layer.

        Args:
            hidden_size: width of the W1/W2 projections.
        """
        super().__init__()
        self.W1 = tf.keras.layers.Dense(hidden_size)
        self.W2 = tf.keras.layers.Dense(hidden_size)
        self.V = tf.keras.layers.Dense(1)

    def call(self, features, hidden):
        """Weight the encoder features by their relevance to `hidden`.

        Args:
            features: encoder output; the caller passes
                (batch_size, positions, embedding_dim).
            hidden: decoder state, (batch_size, hidden_size).

        Returns:
            `(context_vector, attention_weights)`: the weighted sum of the
            features over axis 1, and the softmax weights (last axis of size 1).
        """
        # Insert an axis at position 1 so the state broadcasts across positions.
        query = tf.expand_dims(hidden, 1)

        # Additive score: project features and state, sum them, squash with tanh.
        energy = tf.nn.tanh(self.W1(features) + self.W2(query))

        # V reduces the last axis to 1; softmax normalises across positions (axis 1).
        attention_weights = tf.nn.softmax(self.V(energy), axis=1)

        # Weighted sum over positions -> (batch_size, embedding_dim).
        context_vector = tf.reduce_sum(attention_weights * features, axis=1)

        return context_vector, attention_weights

In [11]:
class RNN_Decoder(tf.keras.Model):
    """GRU decoder with attention that predicts one code token per call."""

    def __init__(self, embedding_dim, hidden_size, vocab_size):
        """Create the layers.

        Args:
            embedding_dim: size of the token embeddings.
            hidden_size: number of GRU units (and attention projection width).
            vocab_size: number of distinct tokens; size of the output logits.
        """
        super(RNN_Decoder, self).__init__()
        self.hidden_size = hidden_size

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.hidden_size,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc = tf.keras.layers.Dense(vocab_size)

        self.attention = Bahdanau_Attention(hidden_size)

    def call(self, x, features, hidden):
        """Run one decoding step.

        Args:
            x: previous token ids, (batch_size, 1).
            features: encoder output, (batch_size, positions, embedding_dim).
            hidden: previous decoder state, (batch_size, hidden_size).

        Returns:
            `(logits, state, attention_weights)`: logits of shape
            (batch_size, vocab_size), the new GRU state of shape
            (batch_size, hidden_size), and the attention weights.
        """
        # Attention is defined as a separate model.
        context_vector, attention_weights = self.attention(features, hidden)

        # x shape after passing through embedding == (batch_size, 1, embedding_dim)
        x = self.embedding(x)
        # context_vector shape is (batch_size, embedding_dim)
        # x shape after concatenation == (batch_size, 1, embedding_dim + embedding_dim)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

        # Feed the previous state into the GRU so the recurrence actually
        # carries information between steps (it previously restarted from
        # zeros on every call).
        # output shape is (batch_size, 1, hidden_size); state is the new hidden.
        output, state = self.gru(x, initial_state=hidden)
        # Project the GRU *output* (not its input, as before) to the logits.
        # output shape == (batch_size, hidden_size)
        output = tf.reshape(output, (-1, output.shape[2]))
        # logits shape == (batch_size, vocab_size)
        logits = self.fc(output)

        return logits, state, attention_weights

    def reset_state(self, batch_size):
        """Return an all-zero initial hidden state of shape (batch_size, hidden_size)."""
        return tf.zeros((batch_size, self.hidden_size))

In [12]:
# Build the two halves of the model. vocab_size=28 covers token ids 0-27:
# 0 = start, 1-26 = a-z, 27 = end/padding.
encoder = CNN_Encoder(embedding_dim=128)
decoder = RNN_Decoder(embedding_dim=128, hidden_size=128, vocab_size=28)

In [13]:
# Adam with default hyper-parameters.
optimizer = tf.keras.optimizers.Adam()
# The decoder emits raw logits, hence from_logits=True. reduction='none' keeps
# the per-example losses so that loss_function can mask them before averaging.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
    """Mean cross-entropy for one decoding step, ignoring targets equal to 0.

    Args:
        real: target token ids for this step.
        pred: predicted logits for this step.

    Returns:
        Scalar loss. Entries whose target is 0 contribute zero loss but still
        count towards the mean's denominator.
    """
    per_example = loss_object(real, pred)
    # 1.0 where the target is non-zero, 0.0 where it is 0.
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_example.dtype)
    return tf.reduce_mean(per_example * keep)

def accuracy_function(real, pred):
    """Fraction of rows in which every token of `pred` equals `real`.

    A row only counts as correct when the whole sequence matches.
    """
    exact_match = tf.math.reduce_all(pred == real, axis=1)
    as_float = tf.cast(exact_match, tf.float32)
    return tf.math.reduce_mean(as_float)

In [14]:
@tf.function
def predict(glyph_tensor, max_length):
    """Greedily decode `max_length` tokens for every glyph in the batch.

    Args:
        glyph_tensor: batch of glyph bitmaps accepted by the encoder.
        max_length: number of tokens to generate after the start token.

    Returns:
        int64 tensor of shape (batch_size, max_length + 1) whose first column
        is the start token 0.
    """
    batch_size = glyph_tensor.shape[0]
    features = encoder(glyph_tensor)
    hidden = decoder.reset_state(batch_size=batch_size)
    # Every sequence starts with token 0.
    sequence = tf.zeros((batch_size, 1), dtype='int64')
    # No teacher forcing here: each step is fed the previously predicted token.
    for step in range(max_length):
        previous_token = tf.expand_dims(sequence[:, step], 1)
        logits, hidden, _ = decoder(previous_token, features, hidden)
        # argmax keeps the result deterministic.
        next_token = tf.math.argmax(logits, axis=-1)
        sequence = tf.concat([sequence, tf.expand_dims(next_token, -1)], axis=1)

    return sequence

In [15]:
@tf.function
def train_step(glyph_tensor, target):
    """Run one optimisation step with teacher forcing.

    Args:
        glyph_tensor: batch of glyph bitmaps.
        target: token rows whose first column is the start token.

    Returns:
        `(loss, accuracy)`: the summed step losses divided by the number of
        decoded positions, and the whole-sequence accuracy of a greedy decode
        of this batch.
    """
    sequence_length = target.shape[1]
    loss = 0

    # The hidden state is re-initialised for every batch because the codes of
    # different glyphs are unrelated.
    hidden = decoder.reset_state(batch_size=target.shape[0])
    dec_input = tf.expand_dims(target[:, 0], 1)

    with tf.GradientTape() as tape:
        features = encoder(glyph_tensor)

        for position in range(1, sequence_length):
            step_logits, hidden, _ = decoder(dec_input, features, hidden)
            loss += loss_function(target[:, position], step_logits)
            # Teacher forcing: the next input is the ground-truth token.
            dec_input = tf.expand_dims(target[:, position], 1)

    # Accuracy is measured on the full greedy decode, outside the tape.
    decoded = predict(glyph_tensor, sequence_length - 1)
    accuracy = accuracy_function(decoded, target)

    variables = encoder.trainable_variables + decoder.trainable_variables
    optimizer.apply_gradients(zip(tape.gradient(loss, variables), variables))

    return loss / int(sequence_length - 1), accuracy

In [16]:
@tf.function
def validation_step(glyph_tensor, target):
    """Compute teacher-forced loss and greedy-decode accuracy without training.

    Args:
        glyph_tensor: batch of glyph bitmaps.
        target: token rows whose first column is the start token.

    Returns:
        `(val_loss, val_accuracy)`: the summed step losses divided by the
        number of decoded positions, and the whole-sequence accuracy.
    """
    sequence_length = target.shape[1]
    val_loss = 0
    hidden = decoder.reset_state(batch_size=target.shape[0])
    features = encoder(glyph_tensor)
    dec_input = tf.expand_dims(target[:, 0], 1)

    for position in range(1, sequence_length):
        step_logits, hidden, _ = decoder(dec_input, features, hidden)
        val_loss += loss_function(target[:, position], step_logits)
        # Teacher forcing: the next input is the ground-truth token.
        dec_input = tf.expand_dims(target[:, position], 1)

    # Accuracy is based on the whole predicted code string.
    decoded = predict(glyph_tensor, sequence_length - 1)
    val_accuracy = accuracy_function(decoded, target)

    return val_loss / int(sequence_length - 1), val_accuracy

In [17]:
# Training input pipeline: shuffle with a 2048-element buffer, group into
# batches of BATCH_SIZE (the last, smaller batch is kept), and prefetch so
# that data preparation overlaps with training.
BATCH_SIZE = 128
dataset = tf.data.Dataset.from_tensor_slices((train_glyphs, train_tokens))
dataset = dataset.shuffle(2048).batch(BATCH_SIZE)
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [18]:
# Checkpoint the encoder, decoder and optimizer state together under
# ./checkpoints/train; the manager keeps only the 5 most recent checkpoints.
checkpoint_path = "./checkpoints/train"
ckpt = tf.train.Checkpoint(encoder=encoder, decoder=decoder, optimizer = optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

In [None]:
history = {'loss': [], 'accuracy': [], 'val_loss': [], 'val_accuracy': []}
EPOCHS = 200
# The dataset keeps its final partial batch, so round the step count up
# (and never let it reach zero, since it is used as a divisor).
num_steps = max(1, (len(train_glyphs) + BATCH_SIZE - 1) // BATCH_SIZE)

start_epoch = 0
if ckpt_manager.latest_checkpoint:
    # Load the saved weights and optimizer state; without this the run would
    # skip epochs but continue from freshly initialised models.
    ckpt.restore(ckpt_manager.latest_checkpoint)
    # Checkpoints written by this loop are numbered with the count of completed
    # epochs, so the suffix is the epoch index to resume from.
    # NOTE: checkpoints saved before this numbering was introduced used the
    # manager's own counter; resuming from one of those repeats some epochs.
    start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1])

for epoch in range(start_epoch, EPOCHS):
    start = time.time()
    total_loss = 0
    total_accuracy = 0

    for (batch, (glyph_tensor, target)) in enumerate(dataset):
        t_loss, t_accuracy = train_step(glyph_tensor, target)
        total_loss += t_loss
        total_accuracy += t_accuracy
        print(f'Epoch {epoch + 1}, progression {(batch + 1) / num_steps:.1%}, time elapsed {time.time() - start:.2f} sec', end='\r')

    if epoch % 5 == 0:
        # Number the checkpoint by completed epochs so a resume is exact.
        ckpt_manager.save(checkpoint_number=epoch + 1)

    val_loss, val_accuracy = validation_step(validation_glyphs, validation_tokens)

    # Epoch-level metrics as plain floats (mean over all batches), which makes
    # them easy to format and plot later.
    epoch_loss = float(total_loss / num_steps)
    epoch_accuracy = float(total_accuracy / num_steps)
    val_loss = float(val_loss)
    val_accuracy = float(val_accuracy)

    history['loss'].append(epoch_loss)
    history['accuracy'].append(epoch_accuracy)
    history['val_loss'].append(val_loss)
    history['val_accuracy'].append(val_accuracy)

    print (f'Epoch {epoch + 1}, Loss {epoch_loss:.6f}, Accuracy {epoch_accuracy:.2%}; Validation Loss {val_loss:.6f}, Validation Accuracy {val_accuracy:.2%}\n')


Epoch 1, progression 100.0%, time elapsed 343.30 sec

In [None]:
# Plot training vs. validation accuracy against the 1-based epoch number.
# NOTE(review): Data/Graph/plot come from the project-local `macrotoolchain`
# module; presumably Data takes (x, y, label) - confirm against that module.
accuracyData = Data(np.arange(1, len(history['accuracy'])+1), history['accuracy'], 'Training Accuracy')
valAccuracyData = Data(np.arange(1, len(history['val_accuracy'])+1), history['val_accuracy'],
                       'Validation Accuracy')
plot(Graph(accuracyData), Graph(valAccuracyData), ytickformat='.2%')

In [None]:
def evaluate(word):
    """Predict the Cangjie code of every character in `word`.

    Args:
        word: string of characters to decode.

    Returns:
        NumPy array of code strings, one per character, in input order.

    Raises:
        ValueError: if a character cannot be rendered by the glyph book.
    """
    if not word:
        # Nothing to render; avoid sending an empty batch through the model.
        return np.array([], dtype=str)

    test_input = []
    for char in word:
        glyph = glyphbook.draw(char)
        if glyph is None:
            raise ValueError(f'Character {char} unsupported.')
        test_input.append(glyph)
    test_input = tf.expand_dims(test_input, -1)
    test_result = predict(test_input, 5)

    def decode(indexes):
        """Turn one row of token ids back into a lowercase code string."""
        code = ''
        for i in indexes:
            if i <= 0:
                # Skip the start token.
                continue
            elif i >= 27:
                # End/padding token: the code is complete.
                break
            else:
                code += chr(i + 96)
        return code

    # Decode row by row. np.apply_along_axis would size the string dtype from
    # the first decoded code and silently truncate any longer code after it.
    return np.array([decode(row) for row in test_result.numpy()])

In [None]:
# Sanity check: decode three sample characters with the trained model.
evaluate('劉運操')

正確倉頡碼：hhcln, ybjj, qrrd