In [None]:
# TUGAS IF5250 - MINI TRANSFORMER FROM SCRATCH

In [1]:
from google.colab import drive
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import string
import re
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from nltk.translate.bleu_score import corpus_bleu

In [2]:
# Load & Split Dataset
drive.mount('/content/drive')
df = pd.read_csv('/content/drive/MyDrive/Deep_learning_Tubes/headline_data.csv')
train_texts, test_texts = train_test_split(df['headline'].values, test_size=0.2, random_state=42)

Mounted at /content/drive


In [5]:
df

Unnamed: 0,headline
0,New energy law promises to revolutionize the e...
1,Climate change continues to be a global threat
2,Investors seek opportunities in renewable energy
3,Demand for electric vehicles increases
4,COVID-19 vaccines: When will we all be protected?
...,...
1320,The potential of renewable energy storage solu...
1321,The benefits of eco-friendly landscaping pract...
1322,The influence of digital platforms in advancin...
1323,The role of sustainable packaging innovations ...


In [6]:
df.describe()

Unnamed: 0,headline
count,1325
unique,1202
top,Threats and Challenges in a Hyperconnected World
freq,6


In [8]:
len(df)

1325

In [11]:
len(train_texts)

1060

In [10]:

len(test_texts)

265

In [None]:
#  Text preprocessing
def custom_standardization(input_text):
    lowercase = tf.strings.lower(input_text)
    stripped_html = tf.strings.regex_replace(lowercase, '<br />', ' ')
    cleaned_punct = tf.strings.regex_replace(stripped_html, f"[{re.escape(string.punctuation)}]", '')
    return cleaned_punct

In [None]:
#  Tokenization Layer
raw_vocab = set(" ".join(train_texts).lower().split())
custom_corpus = list(train_texts) + list(raw_vocab)

vocab_size = len(raw_vocab) + 2
sequence_length = 64
embedding_dim = 128

vectorize_layer = layers.TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)


In [None]:
vectorize_layer.adapt(tf.data.Dataset.from_tensor_slices(custom_corpus).batch(128))

In [None]:
#  Data Preparation
def prepare_lm_data(text):
    tokenized = vectorize_layer(text)
    return tokenized[:, :-1], tokenized[:, 1:]

def make_dataset(text_array):
    ds = tf.data.Dataset.from_tensor_slices(text_array)
    ds = ds.batch(64).map(prepare_lm_data)
    return ds.prefetch(tf.data.AUTOTUNE)

train_ds = make_dataset(train_texts)
test_ds = make_dataset(test_texts)

In [None]:
#  Causal Attention Mask
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
    i = tf.range(n_dest)[:, None]
    j = tf.range(n_src)
    m = i >= j - n_src + n_dest
    mask = tf.cast(m, dtype)
    mask = tf.reshape(mask, [1, n_dest, n_src])
    mult = tf.concat([tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0)
    return tf.tile(mask, mult)

In [None]:
#  Transformer Block
class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential([
            layers.Dense(ff_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=None, mask=None):
        batch_size = tf.shape(inputs)[0]
        seq_len = tf.shape(inputs)[1]
        causal_mask = causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)
        if mask is not None:
            padding_mask = tf.cast(tf.expand_dims(mask, axis=1), tf.bool)
            combined_mask = tf.logical_and(padding_mask, causal_mask)
        else:
            combined_mask = causal_mask
        attn_output = self.att(inputs, inputs, attention_mask=combined_mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

In [None]:
#  Positional Embedding Layer
class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim, mask_zero=True)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        positions = tf.range(start=0, limit=tf.shape(x)[-1], delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

In [None]:
#  Transformer Model
def build_model():
    inputs = layers.Input(shape=(None,), dtype=tf.int32)
    embedding_layer = TokenAndPositionEmbedding(sequence_length, vocab_size, embedding_dim)
    x = embedding_layer(inputs)
    mask = embedding_layer.token_emb.compute_mask(inputs)
    x = TransformerBlock(embedding_dim, num_heads=2, ff_dim=128)(x, mask=mask)
    x = TransformerBlock(embedding_dim, num_heads=2, ff_dim=128)(x, mask=mask)
    x = layers.Dense(vocab_size)(x)
    return keras.Model(inputs=inputs, outputs=x)

In [None]:
model = build_model()
model.compile(loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              optimizer=keras.optimizers.Adam(), metrics=["accuracy"])
model.summary()

In [None]:
#  Text Generator Callback
class TextGenerator(keras.callbacks.Callback):
    def __init__(self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1):
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.print_every = print_every
        self.k = top_k

    def sample_from(self, logits):
        logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        return self.index_to_word[number]

    def on_epoch_end(self, epoch, logs=None):
        start_tokens = [_ for _ in self.start_tokens]
        if (epoch + 1) % self.print_every != 0:
            return
        num_tokens_generated = 0
        tokens_generated = []
        while num_tokens_generated <= self.max_tokens:
            x = np.array([start_tokens + [0] * (sequence_length - len(start_tokens))])
            y = self.model.predict(x)
            next_token = self.sample_from(y[0, len(start_tokens) - 1])
            tokens_generated.append(next_token)
            start_tokens.append(next_token)
            num_tokens_generated = len(tokens_generated)
        txt = " ".join([self.detokenize(_) for _ in self.start_tokens + tokens_generated])
        print(f"\nGenerated text (epoch {epoch+1}):\n{txt}\n")

In [None]:
# === Callback Setup ===
vocab = vectorize_layer.get_vocabulary()
word_to_index = {w: i for i, w in enumerate(vocab)}
start_prompt = "blockchain"
start_tokens = [word_to_index.get(w, 1) for w in start_prompt.split()]
text_gen_callback = TextGenerator(max_tokens=40, start_tokens=start_tokens, index_to_word=vocab)


In [None]:
# === Train Model ===
history = model.fit(train_ds, validation_data=test_ds, epochs=20, callbacks=[text_gen_callback])


Epoch 1/20
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 876ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 34ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m 

In [None]:
#  Evaluation: Accuracy, Loss
loss, acc = model.evaluate(test_ds)
print(f"\nFinal Loss: {loss:.4f} | Final Accuracy: {acc:.4f}")

[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - accuracy: 0.9309 - loss: 0.4444

Final Loss: 0.4549 | Final Accuracy: 0.9285


In [None]:
#  TextPredict Class
class TextPredict():
    def __init__(self, model, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1):
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.print_every = print_every
        self.k = top_k
        self.model = model

    def sample_from(self, logits):
        logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        return self.index_to_word[number]

    def generate(self, logs=None):
        start_tokens = [_ for _ in self.start_tokens]
        num_tokens_generated = 0
        tokens_generated = []
        while num_tokens_generated <= self.max_tokens:
            pad_len = sequence_length - len(start_tokens)
            sample_index = len(start_tokens) - 1
            if pad_len < 0:
                x = start_tokens[:sequence_length]
                sample_index = sequence_length - 1
            elif pad_len > 0:
                x = start_tokens + [0] * pad_len
            else:
                x = start_tokens
            x = np.array([x])
            y = self.model.predict(x)
            sample_token = self.sample_from(y[0][sample_index])
            tokens_generated.append(sample_token)
            start_tokens.append(sample_token)
            num_tokens_generated = len(tokens_generated)
        txt = " ".join([self.detokenize(_) for _ in self.start_tokens + tokens_generated])
        print(f"\nGenerated text:\n{txt}\n")

In [None]:
# === Manual Generation ===
vocab = vectorize_layer.get_vocabulary()
word_to_index = {w: i for i, w in enumerate(vocab)}

num_tokens_generated = 20

def generateHeadling(start_prompt):
    tokens = start_prompt.lower().split()
    clean_tokens = [w if w in word_to_index else "[UNK]" for w in tokens]
    print("Prompt tokens:", clean_tokens)
    start_tokens = [word_to_index.get(w, 1) for w in tokens]
    text_predict = TextPredict(model, num_tokens_generated, start_tokens, vocab)
    text_predict.generate()


In [None]:
# penggunaan:
generateHeadling("intelligence")

Prompt tokens: ['intelligence']
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 33ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 35ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━

In [None]:
# Text generation
generateHeadling("industry")

Prompt tokens: ['industry']
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━

In [None]:
# Text generation
generateHeadling("government")

Prompt tokens: ['[UNK]']
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 33ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[