In [1]:
class Config:
    """Hyper-parameters and data limits shared by the cells of this notebook."""

    def __init__(self):
        # Training loop.
        self.BATCH_SIZE = 100
        self.EPOCHS = 15
        self.NUM_OF_INPUTS = 3_000  # number of sentences taken from the CSV

        # Depth of each encoder stack.
        self.NUM_CHARACTER_LEVEL_LAYERS = 4
        self.NUM_WORD_LEVEL_LAYERS = 12

        # Embedding widths.
        self.CHARACTER_LEVEL_D_MODEL = 64
        self.WORD_LEVEL_D_MODEL = 256

        # Attention / feed-forward sizes, passed to both encoders.
        self.NUM_HEADS = 3
        self.DFF = 256

        # Padding / truncation limits.
        self.MAX_WORD_LENGTH = 16  # characters per word
        self.MAX_SENTENCE_LENGTH = 64  # words per sentence

        # Tokenizer vocabulary caps.
        self.VOCAB_SIZE = 20_000
        self.CHARACTER_VOCAB_SIZE = 3_000


config = Config()

In [2]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

2024-04-18 12:09:59.691906: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-04-18 12:10:00.016227: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


# Global Attention and FeedForward

In [3]:
class BaseAttention(tf.keras.layers.Layer):
    """Holds the sub-layers of an attention block: multi-head attention, residual add, layer norm.

    Subclasses provide ``call``. Every keyword argument is forwarded unchanged to
    ``tf.keras.layers.MultiHeadAttention``; none of them is passed to the base
    ``Layer`` constructor.
    """

    def __init__(self, **kwargs):
        super().__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
        # Residual add comes before normalisation when the block is applied.
        self.add = tf.keras.layers.Add()
        self.layernorm = tf.keras.layers.LayerNormalization()

In [4]:
class GlobalSelfAttention(BaseAttention):
    """Self-attention block: the same sequence is used as query, key and value."""

    def call(self, inputs):
        """Run masked self-attention, then residual add and layer normalisation.

        Args:
            inputs: pair ``(x, mask)``. ``x`` is the sequence to attend over and
                ``mask`` is handed to the attention layer as ``attention_mask``.

        Returns:
            The layer-normalised sum of ``x`` and its attention output.
        """
        sequence, attention_mask = inputs
        attended = self.mha(query=sequence, value=sequence, key=sequence, attention_mask=attention_mask)
        return self.layernorm(self.add([sequence, attended]))

In [5]:
class FeedForward(tf.keras.layers.Layer):
    """Feed-forward block (Dense-ReLU, Dense, Dropout) with a residual connection and layer norm."""

    def __init__(self, d_model, dff, dropout_rate=0.1):
        """
        Args:
            d_model: width of the output projection (must match the input width
                for the residual add in ``call``).
            dff: width of the hidden ReLU layer.
            dropout_rate: dropout rate applied after the output projection.
        """
        super().__init__()
        hidden = tf.keras.layers.Dense(dff, activation='relu')
        projection = tf.keras.layers.Dense(d_model)
        dropout = tf.keras.layers.Dropout(dropout_rate)
        self.seq = tf.keras.Sequential([hidden, projection, dropout])
        self.add = tf.keras.layers.Add()
        self.layer_norm = tf.keras.layers.LayerNormalization()

    def call(self, x):
        """Return ``layer_norm(x + seq(x))``."""
        transformed = self.seq(x)
        residual_sum = self.add([x, transformed])
        return self.layer_norm(residual_sum)

# Positional Embedding layer

In [6]:
def positional_encoding(length, depth):
    """Build a sinusoidal position table as a float32 tensor.

    The first half of the last axis holds sines and the second half cosines
    (concatenated, not interleaved).

    Args:
        length: number of positions (rows).
        depth: requested encoding width; half of it is used for the sine
            columns and half for the cosine columns.

    Returns:
        ``tf.Tensor`` of dtype float32 with ``length`` rows.
    """
    half_depth = depth / 2

    position_column = np.arange(length)[:, np.newaxis]
    scaled_depth_row = np.arange(half_depth)[np.newaxis, :] / half_depth

    # Broadcasting (length, 1) * (1, k) gives one angle per (position, frequency).
    angles = position_column * (1 / (10000 ** scaled_depth_row))

    table = np.concatenate([np.sin(angles), np.cos(angles)], axis=-1)
    return tf.cast(table, dtype=tf.float32)


class PositionalEmbedding(tf.keras.layers.Layer):
    """Token embedding (with ``mask_zero=True``) scaled by sqrt(d_model) plus a sinusoidal position table."""

    def __init__(self, vocab_size, d_model):
        """
        Args:
            vocab_size: size of the embedding table.
            d_model: embedding width; also the depth of the position table.
        """
        super().__init__()
        self.d_model = d_model
        self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True)
        # Table precomputed for 2048 positions; ``call`` slices the first ``length`` rows.
        self.pos_encoding = positional_encoding(length=2048, depth=d_model)

    def compute_mask(self, *args, **kwargs):
        """Delegate mask computation to the wrapped embedding layer."""
        return self.embedding.compute_mask(*args, **kwargs)

    def call(self, x):
        """Embed ``x`` and add the position table along axis 1.

        Args:
            x: token ids; axis 1 is taken as the sequence axis.

        Returns:
            Scaled embeddings plus the positional encoding for the first
            ``tf.shape(x)[1]`` positions.
        """
        sequence_length = tf.shape(x)[1]
        embedded = self.embedding(x)

        scale = tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        return embedded * scale + self.pos_encoding[tf.newaxis, :sequence_length, :]

# Encoder layers

In [7]:
class EncoderLayer(tf.keras.layers.Layer):
    """One encoder block: masked global self-attention followed by a feed-forward block."""

    def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1, name='EncoderLayer'):
        """
        Args:
            d_model: model width; used as the attention ``key_dim`` and as the
                feed-forward output width.
            num_heads: number of attention heads.
            dff: hidden width of the feed-forward block.
            dropout_rate: dropout rate for both the attention layer and the
                feed-forward block.
            name: forwarded (via ``GlobalSelfAttention``) to the underlying
                ``MultiHeadAttention`` layer.
        """
        super().__init__()

        self.self_attention = GlobalSelfAttention(
            num_heads=num_heads, key_dim=d_model, dropout=dropout_rate, name=name
        )

        # Forward dropout_rate as well: previously the feed-forward block silently
        # kept its own default regardless of what the caller asked for.
        self.ffn = FeedForward(d_model, dff, dropout_rate)

    def call(self, inputs):
        """Apply attention then feed-forward.

        Args:
            inputs: pair ``(x, mask)`` as expected by ``GlobalSelfAttention``.

        Returns:
            The transformed sequence.
        """
        x = self.self_attention(inputs)
        x = self.ffn(x)
        return x


In [8]:
def generate_sequence(actual_length, MAX_LENGTH):
    """Build a randomly ordered 0/1 vector for one item, zero-padded to ``MAX_LENGTH``.

    A vector of ``actual_length`` entries is created with the first
    ``int(actual_length * 0.85)`` entries set to 1, shuffled, and then padded on
    the right with zeros.

    Args:
        actual_length: scalar length of the un-padded item.
        MAX_LENGTH: total length of the returned vector.

    Returns:
        int64 tensor of length ``MAX_LENGTH``.
    """
    keep_count = tf.cast(tf.cast(actual_length, tf.float32) * 0.85, tf.int64)

    ordered_flags = tf.sequence_mask(keep_count, maxlen=actual_length, dtype=tf.int64)
    shuffled_flags = tf.random.shuffle(ordered_flags)

    tail_padding = MAX_LENGTH - tf.size(shuffled_flags)
    padded_sequence = tf.pad(shuffled_flags, paddings=[[0, tail_padding]], mode='CONSTANT',
                             constant_values=0.0)
    print("output", padded_sequence.shape)
    return padded_sequence


def create_random_mask(input_tokens, actual_lengths):
    """Build one random mask row per item via ``generate_sequence``.

    Args:
        input_tokens: tensor whose static ``shape[1]`` is the padded length of
            each mask row.
        actual_lengths: 1-D tensor with the un-padded length of every item.

    Returns:
        int64 tensor: the stacked mask rows with a singleton axis inserted at
        position 1.
    """
    padded_len = input_tokens.shape[1]

    def _mask_for(length):
        return generate_sequence(length, padded_len)

    per_item_masks = tf.map_fn(_mask_for, actual_lengths, dtype=tf.int64)
    return tf.expand_dims(per_item_masks, axis=1)


def create_padding_mask(input_tokens, actual_lengths):
    """Build a float mask that is 1 for the first ``actual_lengths[i]`` positions of item ``i``.

    Args:
        input_tokens: tensor whose dynamic ``shape[1]`` is the padded length.
        actual_lengths: un-padded length of every item.

    Returns:
        float32 tensor: the sequence mask with a singleton axis inserted at
        position 1.
    """
    padded_len = tf.shape(input_tokens)[1]
    valid_positions = tf.sequence_mask(actual_lengths, maxlen=padded_len, dtype=tf.float32)
    return tf.expand_dims(valid_positions, axis=1)


class Encoder(tf.keras.layers.Layer):
    """Stack of ``EncoderLayer`` blocks applied to an already-embedded sequence.

    The attention mask is derived from the item lengths: a random mask
    (``create_random_mask``) when ``masked`` is True, otherwise a padding mask
    (``create_padding_mask``). The same mask is used by every layer.
    """

    def __init__(
            self, *, num_layers, d_model, num_heads, dff, vocab_size, dropout_rate=0.1, masked=False,
            name='Encoder'
    ):
        """
        Args:
            num_layers: number of stacked ``EncoderLayer`` blocks.
            d_model: model width passed to every layer.
            num_heads: attention heads per layer.
            dff: feed-forward hidden width per layer.
            vocab_size: accepted for interface compatibility; not used here.
            dropout_rate: dropout rate passed to every layer.
            masked: select the random mask instead of the padding mask.
            name: Keras layer name.
        """
        # Forward the name; it used to be accepted but silently ignored.
        super().__init__(name=name)

        self.masked = masked
        self.d_model = d_model
        self.num_layers = num_layers
        self.enc_layers = [
            EncoderLayer(
                d_model=d_model, num_heads=num_heads, dff=dff,
                dropout_rate=dropout_rate,
                name=f'encoder_layer_{i + 1}'
            )
            for i in range(num_layers)
        ]

    def call(self, inputs):
        """Run the sequence through every encoder layer in order.

        Args:
            inputs: pair ``(x, lengths)``: the embedded sequence and the
                un-padded length of every item in it.

        Returns:
            Output of the last encoder layer (same shape as ``x``).
        """
        x, lengths = inputs

        if self.masked is True:
            mask = create_random_mask(x, lengths)
            print("Random Mask shape: ", mask.shape)
        else:
            mask = create_padding_mask(x, lengths)
            print(f"Padding Mask shape: ", mask.shape)

        # Chain the layers: each one consumes the previous layer's output.
        # The earlier loop fed the original `x` to every layer and kept only the
        # last result, so all layers but the final one had no effect.
        for enc_layer in self.enc_layers:
            x = enc_layer((x, mask))

        return x  # Shape (batch_size, seq_len, d_model).

# Hierarchical Transformer Encoder model

In [9]:
class HierarchicalTransformerEncoder(tf.keras.models.Model):
    """Two-level encoder: per-word character encodings are fused with word embeddings.

    Pipeline implemented by ``call``:
      1. embed the word ids;
      2. for every sentence, embed the character ids of its words and run the
         character-level encoder (random mask);
      3. flatten each word's character encoding and project it to
         ``word_level_d_model``;
      4. concatenate with the word embeddings and run the word-level encoder
         (padding mask);
      5. project to a softmax over the word vocabulary.
    """

    def __init__(self, *, num_character_level_layers, num_word_level_layers,
                 character_level_d_model, word_level_d_model, num_heads, dff,
                 max_word_length, max_sentence_length,
                 vocab_size, character_vocab_size, dropout_rate=0.1):
        """
        Args:
            num_character_level_layers: depth of the character-level encoder.
            num_word_level_layers: depth of the word-level encoder.
            character_level_d_model: character embedding / encoder width.
            word_level_d_model: word embedding width; the word-level encoder
                runs at twice this width because of the concatenation.
            num_heads: attention heads, shared by both encoders.
            dff: feed-forward hidden width, shared by both encoders.
            max_word_length: padded number of characters per word.
            max_sentence_length: accepted but not used by this constructor.
            vocab_size: word vocabulary size (embedding table and output layer).
            character_vocab_size: character vocabulary size.
            dropout_rate: dropout rate passed to both encoders.
        """
        super().__init__()

        self.word_pos_embedding = PositionalEmbedding(vocab_size=vocab_size,
                                                      d_model=word_level_d_model)
        self.character_pos_embedding = PositionalEmbedding(vocab_size=character_vocab_size,
                                                           d_model=character_level_d_model)

        self.character_level_encoder = Encoder(num_layers=num_character_level_layers,
                                               d_model=character_level_d_model,
                                               num_heads=num_heads, dff=dff,
                                               vocab_size=character_vocab_size,
                                               dropout_rate=dropout_rate,
                                               name='character_level_encoder', masked=True)

        self.flatten_layer = tf.keras.layers.Flatten(input_shape=(max_word_length, character_level_d_model),
                                                     name='flatten_character_level_encoders')
        self.linear_layer = tf.keras.layers.Dense(units=word_level_d_model, activation=None,
                                                  name='linear_character_level_encoders')

        self.combined_layer = tf.keras.layers.Concatenate(axis=-1, name='combined_character_and_word_level_encoders')

        self.word_level_encoder = Encoder(num_layers=num_word_level_layers,
                                          d_model=(word_level_d_model * 2),
                                          num_heads=num_heads, dff=dff,
                                          vocab_size=vocab_size,
                                          dropout_rate=dropout_rate,
                                          name='word_level_encoder')

        self.correction_layer = tf.keras.layers.Dense(vocab_size, activation='softmax', name='correction_layer', )

    def call(self, inputs):
        """Compute per-position word probabilities.

        Args:
            inputs: ``((word_level_inputs, sentences_lengths),
                (character_level_inputs, words_lengths))``: padded word ids
                with sentence lengths, and padded character ids with per-word
                lengths.

        Returns:
            Softmax output of shape (batch_size, max_sen_len, vocab_size).
        """
        print("Start")
        (word_level_inputs, sentences_lengths), (character_level_inputs, words_lengths) = inputs

        word_embedding_outputs = self.word_pos_embedding(word_level_inputs)  # (batch_size, max_len, word_level_d_model)
        print("Shape của word_embedding_outputs:", word_embedding_outputs.shape)

        # One map_fn step per sentence: its words act as the "batch" of the
        # character-level encoder. `fn_output_signature` replaces the deprecated
        # `dtype` argument of tf.map_fn.
        character_level_encoder_outputs = tf.map_fn(
            lambda sentence: self.character_level_encoder(
                (self.character_pos_embedding(sentence[0]), sentence[1])),
            (character_level_inputs, words_lengths),
            fn_output_signature=tf.float32
        )
        # (batch_size, max_sen_len, max_word_length, character_level_d_model)

        # Collapse every word's (max_word_length, character_level_d_model) block
        # into one vector and project it to word_level_d_model.
        character_level_encoder_outputs = tf.map_fn(
            lambda sentence: tf.map_fn(
                lambda word: tf.squeeze(self.linear_layer(self.flatten_layer(tf.expand_dims(word, axis=0))), axis=0),
                sentence,
                fn_output_signature=tf.float32
            ),
            character_level_encoder_outputs,
            fn_output_signature=tf.float32)
        # (batch_size, max_sen_len, word_level_d_model)
        print("Shape của character_level_encoder_outputs:", character_level_encoder_outputs.shape)

        concat_output = self.combined_layer([word_embedding_outputs, character_level_encoder_outputs])
        # (batch_size, max_sen_len, word_level_d_model * 2)
        print("Shape của concat_output:", concat_output.shape)

        word_level_output = self.word_level_encoder((concat_output, sentences_lengths))
        # (batch_size, max_sen_len, word_level_d_model * 2)
        print("Shape của word_level_output:", word_level_output.shape)

        correction_output = self.correction_layer(word_level_output)  # (batch_size, max_sen_len, vocab_size)
        print("Shape của correction_output:", correction_output.shape)

        # detection_output = tf.squeeze(self.detection_layer(word_level_output), axis=-1)  # (batch_size, max_sen_len)
        # print("Shape của detection_output:", detection_output.shape)
        print("End")
        return correction_output


@tf.function
def training_step(model, optimizer, x, y):
    """Run one optimisation step and return the loss.

    Args:
        model: callable model; ``model(x)`` yields the correction output.
        optimizer: optimizer used to apply the gradients.
        x: model inputs for the batch.
        y: targets for ``correction_loss``.

    Returns:
        The correction loss of the batch (the only active loss term).
    """
    with tf.GradientTape() as tape:
        predictions = model(x)
        # Only the correction objective is active; the detection loss is disabled.
        total_loss = correction_loss(y, predictions)

    trainable = model.trainable_variables
    grads = tape.gradient(total_loss, trainable)
    optimizer.apply_gradients(zip(grads, trainable))

    return total_loss


def correction_loss(true_outputs, pred_outputs):
    """Sparse categorical cross-entropy; ``pred_outputs`` are probabilities (``from_logits=False``)."""
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
    return loss_fn(true_outputs, pred_outputs)


def detection_loss(true_detection_infos, pred_detection_infos):
    """Binary cross-entropy; ``pred_detection_infos`` are probabilities (``from_logits=False``)."""
    loss_fn = tf.keras.losses.BinaryCrossentropy(from_logits=False)
    return loss_fn(true_detection_infos, pred_detection_infos)

# Data preprocessing

In [10]:
# Load the corpus; the `correct_text` column holds the sentences used below.
data = pd.read_csv('data/vi_processed.csv')

# Take the first NUM_OF_INPUTS sentences in file order. A column slice replaces
# the row-by-row `iterrows()` loop, which builds a Series per row just to read
# one field.
input_sentences = data['correct_text'].head(config.NUM_OF_INPUTS).tolist()

correct_texts = input_sentences[:config.NUM_OF_INPUTS]

In [11]:
# Word-level tokenizer: lower-cases, splits on spaces, keeps at most VOCAB_SIZE
# words and maps everything else to the '<UNK>' token.
word_level_tokenizer = Tokenizer(
    num_words=config.VOCAB_SIZE,
    oov_token='<UNK>',
    lower=True,
    split=' ',
)

# Character-level tokenizer: lower-cases and treats every character as a token.
# No OOV token is configured for it.
character_level_tokenizer = Tokenizer(
    num_words=config.CHARACTER_VOCAB_SIZE,
    lower=True,
    char_level=True,
)

In [12]:
# Build both vocabularies from the same training sentences.
word_level_tokenizer.fit_on_texts(input_sentences)
character_level_tokenizer.fit_on_texts(input_sentences)

In [13]:
# Word ids for every sentence (ragged until padded further down).
input_sequences = word_level_tokenizer.texts_to_sequences(input_sentences)

# Per sentence: the character count of each word, clipped to MAX_WORD_LENGTH.
input_words_lengths = []

# Per sentence: a (num_words, MAX_WORD_LENGTH) array of character ids.
character_level_input_sequences = []

for sequence in input_sequences:
    character_level_input_sequence = []
    words_lengths = []
    for word_token in sequence:
        # Go back from the word id to its text so it can be split into characters.
        word = word_level_tokenizer.index_word[word_token]
        # Passing a str makes the tokenizer treat each character as its own
        # text, so this is one (possibly empty) id list per character.
        per_character_ids = character_level_tokenizer.texts_to_sequences(word)
        # Skip characters the character tokenizer has no id for; taking `[0]`
        # of their empty list used to raise IndexError.
        word_chars = [ids[0] for ids in per_character_ids if ids]
        character_level_input_sequence.append(word_chars)
        words_lengths.append(min(len(word_chars), config.MAX_WORD_LENGTH))

    # Pad / truncate every word to MAX_WORD_LENGTH characters.
    character_level_input_sequence = pad_sequences(character_level_input_sequence, maxlen=config.MAX_WORD_LENGTH,
                                                   padding='post', truncating='post')

    character_level_input_sequences.append(character_level_input_sequence)

    input_words_lengths.append(words_lengths)

# Sentence lengths in words, clipped to MAX_SENTENCE_LENGTH. Must be computed
# before `input_sequences` is padded below.
input_sentences_lengths = [min(len(sequence), config.MAX_SENTENCE_LENGTH) for sequence in input_sequences]

# Pad / truncate every per-sentence structure to MAX_SENTENCE_LENGTH words.
input_sequences = pad_sequences(input_sequences, maxlen=config.MAX_SENTENCE_LENGTH, padding='post', truncating='post')

character_level_input_sequences = pad_sequences(character_level_input_sequences, maxlen=config.MAX_SENTENCE_LENGTH,
                                                padding='post', truncating='post')
input_words_lengths = pad_sequences(input_words_lengths, maxlen=config.MAX_SENTENCE_LENGTH, padding='post',
                                    truncating='post')

input_sequences_np = np.array(input_sequences)
character_level_input_sequences_np = np.array(character_level_input_sequences)

input_words_lengths_np = np.array(input_words_lengths)
input_sentences_lengths_np = np.array(input_sentences_lengths)

In [14]:
# Sanity check on the padded character tensor; the recorded output is (3000, 64, 16),
# matching (NUM_OF_INPUTS, MAX_SENTENCE_LENGTH, MAX_WORD_LENGTH) from the config.
character_level_input_sequences_np.shape

(3000, 64, 16)

# Model building

In [15]:
# Instantiate the model; every hyper-parameter comes from the shared config.
model = HierarchicalTransformerEncoder(
    num_character_level_layers=config.NUM_CHARACTER_LEVEL_LAYERS,
    num_word_level_layers=config.NUM_WORD_LEVEL_LAYERS,
    character_level_d_model=config.CHARACTER_LEVEL_D_MODEL,
    word_level_d_model=config.WORD_LEVEL_D_MODEL,
    num_heads=config.NUM_HEADS,
    dff=config.DFF,
    max_word_length=config.MAX_WORD_LENGTH,
    max_sentence_length=config.MAX_SENTENCE_LENGTH,
    vocab_size=config.VOCAB_SIZE,
    character_vocab_size=config.CHARACTER_VOCAB_SIZE,
)

  super().__init__(**kwargs)


In [16]:
from CustomSchedule import CustomSchedule

# Learning-rate schedule from the local CustomSchedule module, parameterised
# by the word-level model width.
learning_rate = CustomSchedule(config.WORD_LEVEL_D_MODEL)

optimizer = tf.keras.optimizers.Adam(
    learning_rate,
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9,
)

In [None]:
from tqdm import tqdm

# Manual training loop: one progress bar per epoch, one `training_step` per batch.
for e in range(config.EPOCHS):
    pbar = tqdm(range(0, len(input_sequences_np), config.BATCH_SIZE))

    for i in pbar:
        batch_rows = slice(i, i + config.BATCH_SIZE)

        # ((word ids, sentence lengths), (character ids, word lengths))
        word_part = [input_sequences_np[batch_rows], input_sentences_lengths_np[batch_rows]]
        character_part = [character_level_input_sequences_np[batch_rows], input_words_lengths_np[batch_rows]]
        input_batch = [word_part, character_part]

        # Targets are the word ids of the same sentences.
        output_batch = input_sequences_np[batch_rows]

        total_loss = training_step(model, optimizer, input_batch, output_batch)

        pbar.set_description(f'Epoch {e + 1}/{config.EPOCHS}')
        pbar.set_postfix_str(f'loss: {total_loss:.4f}')
        pbar.refresh()


  0%|          | 0/30 [00:00<?, ?it/s]

Start
Shape của word_embedding_outputs: (100, 64, 256)
Instructions for updating:
Use fn_output_signature instead
output (None,)
Random Mask shape:  (64, 1, None)




output (None,)
Random Mask shape:  (64, 1, None)
Shape của character_level_encoder_outputs: (100, 64, 256)
Shape của concat_output: (100, 64, 512)
Padding Mask shape:  (100, 1, 64)
Padding Mask shape:  (100, 1, 64)
Shape của word_level_output: (100, 64, 512)
Shape của correction_output: (100, 64, 20000)
End
Start
Shape của word_embedding_outputs: (100, 64, 256)
output (None,)
Random Mask shape:  (64, 1, None)
Shape của character_level_encoder_outputs: (100, 64, 256)
Shape của concat_output: (100, 64, 512)
Padding Mask shape:  (100, 1, 64)
Shape của word_level_output: (100, 64, 512)
Shape của correction_output: (100, 64, 20000)
End




Start
Shape của word_embedding_outputs: (100, 64, 256)
output (None,)
Random Mask shape:  (64, 1, None)
Shape của character_level_encoder_outputs: (100, 64, 256)
Shape của concat_output: (100, 64, 512)
Padding Mask shape:  (100, 1, 64)
Shape của word_level_output: (100, 64, 512)
Shape của correction_output: (100, 64, 20000)
End


Epoch 1/15:  33%|███▎      | 10/30 [01:08<01:44,  5.24s/it, loss: 9.6960]

In [None]:
# model.fit(
#     [[input_sequences_np, input_sentences_lengths_np], [character_level_input_sequences_np, input_words_lengths_np]],
#     [output_sequences_np, correct_infos_np], epochs=config.EPOCHS,
#     batch_size=config.BATCH_SIZE)

In [None]:
# NOTE(review): the three values below are prepared but not fed to the model in
# this cell; the prediction further down runs on the first 100 training sentences.
test_sentence_inputs = ['hom nay troi dep qua']
test_word_level_sequences = word_level_tokenizer.texts_to_sequences(test_sentence_inputs)
test_sentence_lengths = [min(len(sentence), config.MAX_SENTENCE_LENGTH)
                         for sentence in test_word_level_sequences]

# The model returns a single array of shape (batch, max_sen_len, vocab_size).
test_output = model.predict([[input_sequences_np[:100], input_sentences_lengths_np[:100]],
                             [character_level_input_sequences_np[:100], input_words_lengths_np[:100]]])

# Decode sentence 10 word by word: print each predicted id, then the joined text.
for sentence in test_output[10:11]:
    out = ''
    for word in sentence:
        index = tf.argmax(word, axis=0).numpy()
        print(index)
        word_str = word_level_tokenizer.index_word.get(index)
        if word_str is not None:
            out += word_str + ' '
        else:
            out += '<UNK> '
    print(out)

# Predicted id for sentence 10, word 5. The previous `test_output[0][10][5]`
# indexed down to a scalar (a leftover from a two-output model), on which
# `tf.argmax(..., axis=0)` fails.
print(tf.argmax(test_output[10][5], axis=0).numpy())


In [None]:
test_sentence_inputs = [
    'việc điều khiển xe sẽ trở nên rễ dàng hơn nếu các bánh xe được đặt theo một góc chính xác theo yêu cầu thiết kế, việc đặt saii góc sẽ tạo ra tính ổn định khi lái kém và khiến lốp xe nhanh mòn.']

# Word ids from the TRAINING vocabulary (unknown words become '<UNK>').
test_word_level_sequences = word_level_tokenizer.texts_to_sequences(test_sentence_inputs)
test_sentence_lengths = [min(len(sentence), config.MAX_SENTENCE_LENGTH)
                         for sentence in test_word_level_sequences]
test_word_level_sequences = pad_sequences(test_word_level_sequences, maxlen=config.MAX_SENTENCE_LENGTH, padding='post',
                                          truncating='post')

# Throwaway word tokenizer fitted only on the test sentences. It is used solely
# to recover the surface form of every word (including misspelled ones that the
# training vocabulary maps to '<UNK>') so the word can be split into characters.
# It is created here because no earlier cell defines it, which made this cell
# fail with NameError on a fresh kernel.
word_unk_level_tokenizer = Tokenizer(lower=True, split=' ')
word_unk_level_tokenizer.fit_on_texts(test_sentence_inputs)
test_unk_input_sequences = word_unk_level_tokenizer.texts_to_sequences(test_sentence_inputs)

# The character tokenizer is deliberately NOT re-fitted on the test sentences:
# `fit_on_texts` rebuilds its index from the updated counts, which can shift
# character ids away from the ones the model was trained with. Characters that
# were never seen in training are skipped instead.
test_character_level_input_sequences = []
test_words_lengths = []
for sequence in test_unk_input_sequences:
    character_level_input_sequence = []
    words_lengths = []
    for word_token in sequence:
        word = word_unk_level_tokenizer.index_word[word_token]
        # One (possibly empty) id list per character of the word.
        per_character_ids = character_level_tokenizer.texts_to_sequences(word)
        word_chars = [ids[0] for ids in per_character_ids if ids]
        character_level_input_sequence.append(word_chars)
        words_lengths.append(min(len(word_chars), config.MAX_WORD_LENGTH))

    # Pad / truncate every word to MAX_WORD_LENGTH characters.
    character_level_input_sequence = pad_sequences(character_level_input_sequence, maxlen=config.MAX_WORD_LENGTH,
                                                   padding='post', truncating='post')

    test_character_level_input_sequences.append(character_level_input_sequence)

    test_words_lengths.append(words_lengths)

# Pad / truncate every sentence to MAX_SENTENCE_LENGTH words.
test_character_level_input_sequences = pad_sequences(test_character_level_input_sequences,
                                                     maxlen=config.MAX_SENTENCE_LENGTH,
                                                     padding='post', truncating='post')
test_words_lengths = pad_sequences(test_words_lengths, maxlen=config.MAX_SENTENCE_LENGTH, padding='post',
                                   truncating='post')

In [None]:
# Convert the test inputs to NumPy arrays: word-level pair first, then character-level pair.
test_word_level_sequences = np.array(test_word_level_sequences)
test_sentence_lengths = np.array(test_sentence_lengths)
test_character_level_input_sequences = np.array(test_character_level_input_sequences)
test_words_lengths = np.array(test_words_lengths)

In [None]:
# Same nested input structure as in training:
# ((word ids, sentence lengths), (character ids, word lengths)).
test_outputs = model.predict([
    [test_word_level_sequences, test_sentence_lengths],
    [test_character_level_input_sequences, test_words_lengths],
])

In [None]:
# Decode every predicted sentence. `test_outputs` is a single array of shape
# (batch, max_sen_len, vocab_size), so iterate over it directly; iterating
# `test_outputs[0]` made `word` a scalar, on which `tf.argmax(..., axis=0)` fails.
for sentence in test_outputs:
    out = ''
    for word in sentence:
        # Most probable vocabulary id at this position.
        index = tf.argmax(word, axis=0).numpy()
        word_str = word_level_tokenizer.index_word.get(index)
        if word_str is not None:
            out += word_str + ' '
        else:
            # Ids without a vocabulary entry (e.g. the padding id 0).
            out += '<UNK> '
    print(out)

In [None]:
# NOTE(review): the model returns only the correction output and the test batch above
# holds a single sentence, so index 1 looks out of range here. Presumably a leftover
# from a version that also returned a detection output; confirm or remove.
test_outputs[1]

In [None]:
# NOTE(review): `output_sequences_np` is not assigned in any visible cell (it only appears
# in the commented-out `model.fit` call); this cell will raise NameError on a fresh kernel.
output_sequences_np[1]

In [None]:
# Padded word ids of the second training sentence, for manual comparison.
input_sequences_np[1]