In [14]:
import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, Dense, LayerNormalization, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.layers import TextVectorization
import numpy as np

def custom_standardization(text):
    text = tf.strings.lower(text)
    text = tf.strings.regex_replace(text, r"[^a-z0-9\s\[\]]", "")
    return text
def load_and_prepare_data(input_file, label_file, max_samples=5000, max_len=40):
    with open(input_file, 'r', encoding='utf-8') as f:
        input_texts = f.readlines()
    with open(label_file, 'r', encoding='utf-8') as f:
        label_texts = f.readlines()

    input_texts = [f"[sos] {line.strip()} [eos]" for line in input_texts[:max_samples]]
    label_texts = [f"[sos] {line.strip()} [eos]" for line in label_texts[:max_samples]]
    return input_texts, label_texts

def create_text_vectorizer(texts, max_tokens=10000, max_len=40):
    vectorizer = TextVectorization(
        max_tokens=max_tokens,
        output_sequence_length=max_len,
        standardize=custom_standardization
    )
    vectorizer.adapt(texts)
    return vectorizer

def create_padding_mask(seq):
    mask = tf.cast(tf.math.equal(seq, 0), tf.float32)
    return mask[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(size):
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return mask

class PositionalEncoding(Layer):
    def __init__(self, max_len, d_model):
        super().__init__()
        self.pos_encoding = self.positional_encoding(max_len, d_model)

    def get_angles(self, position, i, d_model):
        angles = 1 / tf.pow(10000.0, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
        return position * angles

    def positional_encoding(self, max_len, d_model):
        angle_rads = self.get_angles(
            tf.range(max_len, dtype=tf.float32)[:, tf.newaxis],
            tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
            d_model
        )
        sines = tf.math.sin(angle_rads[:, 0::2])
        cosines = tf.math.cos(angle_rads[:, 1::2])
        pos_encoding = tf.concat([sines, cosines], axis=-1)
        pos_encoding = pos_encoding[tf.newaxis, ...]
        return tf.cast(pos_encoding, tf.float32)

    def call(self, inputs):
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]

class MultiHeadAttentionLayer(Layer):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0
        self.num_heads = num_heads
        self.d_model = d_model
        self.depth = d_model // num_heads

        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)
        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def scaled_dot_product_attention(self, q, k, v, mask):
        matmul_qk = tf.matmul(q, k, transpose_b=True)
        dk = tf.cast(tf.shape(k)[-1], tf.float32)
        scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
        if mask is not None:
            scaled_attention_logits += (mask * -1e9)
        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
        output = tf.matmul(attention_weights, v)
        return output, attention_weights

    def call(self, v, k, q, mask=None):
        batch_size = tf.shape(q)[0]
        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)
        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)
        scaled_attention, attention_weights = self.scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0,2,1,3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))
        output = self.dense(concat_attention)
        return output, attention_weights

class EncoderLayer(Layer):
    def __init__(self, d_model, num_heads, ff_dim, dropout_rate=0.1):
        super().__init__()
        self.mha = MultiHeadAttentionLayer(d_model, num_heads)
        self.ffn = tf.keras.Sequential([Dense(ff_dim, activation='relu'), Dense(d_model)])
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(dropout_rate)
        self.dropout2 = Dropout(dropout_rate)

    def call(self, x, mask=None, training=False):
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)
        return out2

class DecoderLayer(Layer):
    def __init__(self, d_model, num_heads, ff_dim, dropout_rate=0.1):
        super().__init__()
        self.mha1 = MultiHeadAttentionLayer(d_model, num_heads)
        self.mha2 = MultiHeadAttentionLayer(d_model, num_heads)
        self.ffn = tf.keras.Sequential([Dense(ff_dim, activation='relu'), Dense(d_model)])
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(dropout_rate)
        self.dropout2 = Dropout(dropout_rate)
        self.dropout3 = Dropout(dropout_rate)

    def call(self, x, enc_output, look_ahead_mask=None, padding_mask=None, training=False):
        attn1, _ = self.mha1(x, x, x, look_ahead_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(x + attn1)

        attn2, _ = self.mha2(enc_output, enc_output, out1, padding_mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(out1 + attn2)

        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(out2 + ffn_output)
        return out3
# TRANSFORMER MODEL
class Transformer(Model):
    def __init__(self, num_layers, d_model, num_heads, ff_dim, input_vocab_size, target_vocab_size, max_len, dropout_rate=0.1):
        super().__init__()
        self.enc_emb = Embedding(input_vocab_size, d_model)
        self.dec_emb = Embedding(target_vocab_size, d_model)
        self.pos_enc = PositionalEncoding(max_len, d_model)
        self.enc_layers = [EncoderLayer(d_model, num_heads, ff_dim, dropout_rate) for _ in range(num_layers)]
        self.dec_layers = [DecoderLayer(d_model, num_heads, ff_dim, dropout_rate) for _ in range(num_layers)]
        self.final_dense = Dense(target_vocab_size, activation='softmax')

    def call(self, inputs, training=False):
        enc_inp, dec_inp = inputs
        enc_padding_mask = create_padding_mask(enc_inp)
        look_ahead_mask = create_look_ahead_mask(tf.shape(dec_inp)[1])
        dec_padding_mask = create_padding_mask(enc_inp)

        enc_x = self.enc_emb(enc_inp)
        enc_x = self.pos_enc(enc_x)
        for layer in self.enc_layers:
            enc_x = layer(enc_x, mask=enc_padding_mask, training=training)

        dec_x = self.dec_emb(dec_inp)
        dec_x = self.pos_enc(dec_x)
        for layer in self.dec_layers:
            dec_x = layer(dec_x, enc_output=enc_x,
                           look_ahead_mask=look_ahead_mask,
                           padding_mask=dec_padding_mask,
                           training=training)

        return self.final_dense(dec_x)

# HÀM DỰ ĐOÁN
def decode_sequence(model, input_vectorizer, label_vectorizer, text, max_len=40):
    vocab = label_vectorizer.get_vocabulary()
    start_token = vocab.index('[sos]')
    end_token = vocab.index('[eos]')
    input_seq = input_vectorizer([f"[sos] {text.strip()} [eos]"])
    input_seq = tf.cast(input_seq, tf.int32)
    output_seq = tf.expand_dims([start_token], 0)
    for _ in range(max_len):
        predictions = model([input_seq, output_seq], training=False)
        predictions = predictions[:, -1:, :]
        predicted_id = tf.argmax(predictions, axis=-1)
        if predicted_id.numpy()[0][0] == end_token:
            break
        predicted_id = tf.cast(predicted_id, output_seq.dtype)
        output_seq = tf.concat([output_seq, predicted_id], axis=-1)
    predicted_sentence = [vocab[id] for id in output_seq.numpy()[0] if id < len(vocab)]
    return ' '.join(predicted_sentence).replace('[sos]', '').replace('[eos]', '').strip()

# CHẠY TRAINING
input_file = 'input_texts.txt'
label_file = 'label_texts.txt'
max_samples = 5000
max_len = 40
d_model = 128
num_heads = 4
ff_dim = 256
num_layers = 2
dropout_rate = 0.1
batch_size = 32
epochs = 10
input_texts, label_texts = load_and_prepare_data(input_file, label_file, max_samples, max_len)
input_vectorizer = create_text_vectorizer(input_texts, max_tokens=10000, max_len=max_len)
label_vectorizer = create_text_vectorizer(label_texts, max_tokens=10000, max_len=max_len)
input_data = tf.cast(input_vectorizer(input_texts), tf.int32)
label_data = tf.cast(label_vectorizer(label_texts), tf.int32)
label_input_data = label_data[:, :-1]
label_target_data = label_data[:, 1:]
input_vocab_size = len(input_vectorizer.get_vocabulary())
target_vocab_size = len(label_vectorizer.get_vocabulary())

transformer = Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    ff_dim=ff_dim,
    input_vocab_size=input_vocab_size,
    target_vocab_size=target_vocab_size,
    max_len=max_len,
    dropout_rate=dropout_rate
)
transformer.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
transformer.fit([input_data, label_input_data], label_target_data, batch_size=batch_size, epochs=epochs)
# TEST
test_text = "hi, how are you?"
response = decode_sequence(transformer, input_vectorizer, label_vectorizer, test_text)
print("Input:", test_text)
print("Response:", response)


Epoch 1/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 177ms/step - accuracy: 0.7299 - loss: 3.0697
Epoch 2/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 185ms/step - accuracy: 0.8311 - loss: 1.1171
Epoch 3/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 185ms/step - accuracy: 0.8365 - loss: 1.0475
Epoch 4/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 176ms/step - accuracy: 0.8376 - loss: 1.0016
Epoch 5/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 174ms/step - accuracy: 0.8395 - loss: 0.9636
Epoch 6/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 176ms/step - accuracy: 0.8419 - loss: 0.9230
Epoch 7/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 160ms/step - accuracy: 0.8452 - loss: 0.8799
Epoch 8/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 159ms/step - accuracy: 0.8481 - loss: 0.8388
Epoch 9/10
[1m1

In [None]:
import tensorflow as tf
# Hàm mất mát
def loss_function(y_true, y_pred):
    """
    y_true: (batch_size, seq_len) tensor int32
    y_pred: (batch_size, seq_len, vocab_size) tensor float32 (softmax output)
    """
    mask = tf.cast(tf.not_equal(y_true, 0), dtype=tf.float32)
    loss_ = tf.keras.losses.sparse_categorical_crossentropy(y_true, y_pred)
    loss_ *= mask
    return tf.reduce_sum(loss_) / tf.reduce_sum(mask)


In [None]:
transformer.compile(
    optimizer='adam',
    loss=loss_function,
    metrics=['accuracy']
)