In [None]:
!pip install -q datasets transformers

import tensorflow as tf
from datasets import load_dataset
from transformers import AutoTokenizer
from tensorflow.keras.mixed_precision import set_global_policy

# Set the global Keras dtype policy to "mixed_float16". This runs before any
# layer is created so that the layers built later pick the policy up.
set_global_policy("mixed_float16")

In [None]:
# Load the SST-2 dataset ("stanfordnlp/sst2") and the tokenizer that belongs
# to the "bert-base-uncased" checkpoint. Only the tokenizer (vocabulary and
# text preprocessing) is used; the model itself is built from scratch below.
dataset = load_dataset("stanfordnlp/sst2")
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

In [None]:
def tokenize_fn(example):
    """Tokenize the ``sentence`` field of a dataset example (or batch).

    The text is passed to the notebook-level ``tokenizer`` with truncation
    enabled and padding to a fixed length of 128 tokens, so every encoded
    sequence has the same length.

    Args:
        example: Mapping with a ``"sentence"`` entry holding the raw text.

    Returns:
        Whatever ``tokenizer`` returns for that text with these options.
    """
    fixed_length_options = {
        "truncation": True,
        "padding": "max_length",
        "max_length": 128,
    }
    return tokenizer(example["sentence"], **fixed_length_options)

# Tokenize every split in batches, then make the dataset return only the two
# model inputs and the label, formatted as TensorFlow tensors.
tokenized = dataset.map(tokenize_fn, batched=True)
tokenized.set_format(type="tensorflow", columns=["input_ids", "attention_mask", "label"])


In [None]:
def to_tf(ds, batch_size=32, shuffle=True):
    """Wrap a tokenized dataset split in a batched ``tf.data.Dataset``.

    Args:
        ds: Iterable of examples; each example is a mapping with
            ``"input_ids"``, ``"attention_mask"`` and ``"label"`` entries.
        batch_size: Number of examples per emitted batch.
        shuffle: When true, shuffle with a 1000-element buffer before batching.

    Returns:
        A prefetched ``tf.data.Dataset`` of ``(features, label)`` batches.
        ``features`` is a dict of int32 tensors of shape ``(batch, 128)``
        keyed by ``"input_ids"`` and ``"attention_mask"``; ``label`` is an
        int64 tensor of shape ``(batch,)``.
    """
    feature_spec = {
        "input_ids": tf.TensorSpec(shape=(128,), dtype=tf.int32),
        "attention_mask": tf.TensorSpec(shape=(128,), dtype=tf.int32),
    }
    label_spec = tf.TensorSpec(shape=(), dtype=tf.int64)

    def generate_examples():
        for example in ds:
            # Cast explicitly: `from_generator` does not convert between
            # integer dtypes, and the source tensors may be int64 while the
            # signature (and the model inputs) are int32.
            features = {
                name: tf.cast(example[name], spec.dtype)
                for name, spec in feature_spec.items()
            }
            yield features, tf.cast(example["label"], label_spec.dtype)

    tf_dataset = tf.data.Dataset.from_generator(
        generate_examples,
        output_signature=(feature_spec, label_spec),
    )
    if shuffle:
        tf_dataset = tf_dataset.shuffle(1000)
    return tf_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# Build one input pipeline per split; only the training split is shuffled.
# NOTE(review): the GLUE SST-2 test split is believed to ship without gold
# labels (label == -1) — confirm before computing metrics on `test_ds`.
train_ds = to_tf(tokenized["train"])
val_ds = to_tf(tokenized["validation"], shuffle=False)
test_ds = to_tf(tokenized["test"], shuffle=False)

In [None]:
class PositionalEmbedding(tf.keras.layers.Layer):
    """Learned token embedding plus learned position embedding.

    Args:
        vocab_size: Number of rows in the token embedding table.
        embed_dim: Width of both embedding tables.
        max_length: Number of rows in the position embedding table; input
            sequences must not be longer than this.
        **kwargs: Standard ``Layer`` arguments (``name``, ``dtype``,
            ``trainable``). Accepting them is required so the layer can be
            re-created from its config when a saved model is loaded.
    """

    def __init__(self, vocab_size, embed_dim, max_length, **kwargs):
        super().__init__(**kwargs)
        # Keep the constructor arguments so get_config() can report them.
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.max_length = max_length
        self.token_emb = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = tf.keras.layers.Embedding(input_dim=max_length, output_dim=embed_dim)

    def call(self, x):
        """Embed token ids ``x`` and add the embedding of each position.

        Positions are ``0 .. last_dim(x) - 1``; the position embeddings are
        broadcast over the leading dimensions of the token embeddings.
        """
        positions = tf.range(start=0, limit=tf.shape(x)[-1], delta=1)
        return self.token_emb(x) + self.pos_emb(positions)

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update(
            {
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
                "max_length": self.max_length,
            }
        )
        return config

class TransformerBlock(tf.keras.layers.Layer):
    """Transformer encoder block: self-attention followed by a feed-forward net.

    Each sub-layer output goes through dropout, is added to its input
    (residual connection) and is then layer-normalized.

    Args:
        embed_dim: Width of the block's input and output.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        dropout_rate: Dropout rate applied to both sub-layer outputs.
        **kwargs: Standard ``Layer`` arguments (``name``, ``dtype``,
            ``trainable``). Accepting them is required so the layer can be
            re-created from its config when a saved model is loaded.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Keep the constructor arguments so get_config() can report them.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.dropout_rate = dropout_rate
        # NOTE(review): `key_dim` is the size of *each* head, so every head is
        # `embed_dim` wide here; `embed_dim // num_heads` is the more common
        # choice. Left unchanged to keep the model architecture as-is.
        self.att = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential([
            tf.keras.layers.Dense(ff_dim, activation='relu'),
            tf.keras.layers.Dense(embed_dim),
        ])
        self.layernorm1 = tf.keras.layers.LayerNormalization()
        self.layernorm2 = tf.keras.layers.LayerNormalization()
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)

    def call(self, inputs, training=None):
        """Apply the block to ``inputs``.

        Args:
            inputs: Sequence tensor used as both query and value of the
                self-attention.
            training: Forwarded to the dropout layers; ``None`` lets Keras
                decide from the calling context.

        Returns:
            Tensor with the same shape as ``inputs``.
        """
        attn_output = self.att(inputs, inputs)
        out1 = self.layernorm1(inputs + self.dropout1(attn_output, training=training))
        ffn_output = self.ffn(out1)
        return self.layernorm2(out1 + self.dropout2(ffn_output, training=training))

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "num_heads": self.num_heads,
                "ff_dim": self.ff_dim,
                "dropout_rate": self.dropout_rate,
            }
        )
        return config

In [None]:
def build_transformer_model(vocab_size, max_len=128, embed_dim=512, num_heads=8, ff_dim=2048, num_layers=8):
    """Assemble the Transformer-encoder sentence classifier.

    Architecture: token + position embedding, ``num_layers`` encoder blocks,
    average pooling over the sequence axis, a 128-unit ReLU layer with
    dropout before and after it, and a 2-way float32 logits layer.

    Args:
        vocab_size: Size of the token vocabulary.
        max_len: Fixed input sequence length.
        embed_dim: Embedding / hidden width.
        num_heads: Attention heads per encoder block.
        ff_dim: Feed-forward hidden width per encoder block.
        num_layers: Number of stacked encoder blocks.

    Returns:
        A ``tf.keras.Model`` taking a dict with ``"input_ids"`` and
        ``"attention_mask"`` and returning logits of shape ``(batch, 2)``.

    Note:
        ``attention_mask`` is declared as a model input but is not consumed
        by any layer, so padded positions take part in attention and pooling.
    """
    model_inputs = {
        "input_ids": tf.keras.Input(shape=(max_len,), dtype=tf.int32, name="input_ids"),
        "attention_mask": tf.keras.Input(shape=(max_len,), dtype=tf.int32, name="attention_mask"),
    }

    hidden = PositionalEmbedding(vocab_size, embed_dim, max_len)(model_inputs["input_ids"])

    # Stack the encoder blocks, creating and applying them one at a time.
    remaining = num_layers
    while remaining > 0:
        hidden = TransformerBlock(embed_dim, num_heads, ff_dim)(hidden)
        remaining -= 1

    # Classification head: pool over the sequence, then a small MLP.
    head_layers = (
        tf.keras.layers.GlobalAveragePooling1D(),
        tf.keras.layers.Dropout(0.1),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.1),
    )
    for layer in head_layers:
        hidden = layer(hidden)

    # Logits are kept in float32 regardless of the global dtype policy.
    logits = tf.keras.layers.Dense(2, dtype='float32')(hidden)

    return tf.keras.Model(inputs=model_inputs, outputs=logits)


In [None]:
# Size the embedding table from the tokenizer's vocabulary and build the
# classifier with the default architecture hyperparameters.
vocab_size = tokenizer.vocab_size
model = build_transformer_model(vocab_size=vocab_size)

class WarmUpLinearDecay(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Linear warm-up to ``base_lr`` followed by linear decay to zero.

    For ``step < warmup_steps`` the rate rises linearly from 0 to ``base_lr``;
    afterwards it falls linearly and reaches 0 at ``total_steps``. Past
    ``total_steps`` the rate stays at 0 instead of becoming negative.

    Args:
        base_lr: Peak learning rate reached at the end of warm-up.
        warmup_steps: Number of warm-up steps.
        total_steps: Step at which the learning rate reaches zero.
    """

    def __init__(self, base_lr, warmup_steps, total_steps):
        super().__init__()
        self.base_lr = base_lr
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps

    def __call__(self, step):
        """Return the learning rate (float32 scalar tensor) for ``step``."""
        step = tf.cast(step, tf.float32)
        warmup_steps = tf.cast(self.warmup_steps, tf.float32)
        total_steps = tf.cast(self.total_steps, tf.float32)

        # Clamp the denominators so warmup_steps == 0 or
        # total_steps == warmup_steps cannot divide by zero.
        warmup_lr = self.base_lr * (step / tf.maximum(warmup_steps, 1.0))
        decay_span = tf.maximum(total_steps - warmup_steps, 1.0)
        remaining = 1.0 - (step - warmup_steps) / decay_span
        # Clamp at zero: training past `total_steps` must never yield a
        # negative learning rate (which would turn descent into ascent).
        decay_lr = self.base_lr * tf.maximum(remaining, 0.0)

        return tf.where(step < warmup_steps, warmup_lr, decay_lr)

    def get_config(self):
        """Return the constructor arguments.

        Needed so an optimizer using this schedule can be serialized when
        the model is saved.
        """
        return {
            "base_lr": self.base_lr,
            "warmup_steps": self.warmup_steps,
            "total_steps": self.total_steps,
        }


# Make the schedule resolvable by name when a saved model (and its optimizer)
# is loaded. Plain dict assignment is idempotent, so re-running this cell is safe.
tf.keras.utils.get_custom_objects()["WarmUpLinearDecay"] = WarmUpLinearDecay


# --- Training configuration -------------------------------------------------
# Derive the number of optimizer steps per epoch from the data instead of
# hard-coding it, so the learning-rate schedule ends exactly when training
# does. The batch size must match the one used to build `train_ds`
# (the `to_tf` default).
TRAIN_BATCH_SIZE = 32
steps_per_epoch = -(-len(tokenized["train"]) // TRAIN_BATCH_SIZE)  # ceiling division
epochs = 10
total_steps = steps_per_epoch * epochs
warmup_steps = int(0.1 * total_steps)  # warm up over the first 10% of training

lr_schedule = WarmUpLinearDecay(base_lr=3e-5, warmup_steps=warmup_steps, total_steps=total_steps)

optimizer = tf.keras.optimizers.AdamW(learning_rate=lr_schedule, weight_decay=1e-4)

# The model outputs raw logits, hence `from_logits=True`.
model.compile(
    optimizer=optimizer,
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"]
)

# Stop after 2 epochs without validation-loss improvement (restoring the best
# weights) and keep the best model seen so far on disk.
callbacks = [
    tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=2, restore_best_weights=True),
    tf.keras.callbacks.ModelCheckpoint("best_model.keras", save_best_only=True, monitor="val_loss")
]

# Use the same `epochs` value the schedule was sized for.
model.fit(train_ds, validation_data=val_ds, epochs=epochs, callbacks=callbacks)


In [None]:
# Save the trained model under the path "saved_transformer_model".
# NOTE(review): the path has no file extension. Older tf.keras versions write
# a SavedModel directory in that case, while Keras 3 is understood to require
# a ".keras" or ".h5" suffix — confirm against the installed version (the
# load call must use the same path).
model.save("saved_transformer_model")

In [None]:
# Reload the saved model. Every custom class the saved model references must
# be resolvable by name: the two custom layers and the custom learning-rate
# schedule held by the compiled optimizer.
loaded_model = tf.keras.models.load_model("saved_transformer_model", custom_objects={
    "PositionalEmbedding": PositionalEmbedding,
    "TransformerBlock": TransformerBlock,
    "WarmUpLinearDecay": WarmUpLinearDecay,
})

In [None]:
# The GLUE SST-2 test split is distributed without gold labels (every label
# is -1), so loss/accuracy computed on `test_ds` are meaningless. Report
# metrics on the labelled validation split instead. Because that split was
# also used for early stopping, the numbers are slightly optimistic.
# The variable names are kept for compatibility with any later cells.
test_loss, test_acc = loaded_model.evaluate(val_ds)
print(f"Validation Accuracy: {test_acc:.4f} | Validation Loss: {test_loss:.4f}")


In [None]:
def predict_sentiment(text):
    """Classify ``text`` with the reloaded model and print the result.

    The text is tokenized exactly like the training data (truncated and
    padded to 128 tokens), run through ``loaded_model``, and the class with
    the highest logit is reported: class 1 is printed as "positive", any
    other class as "negative".

    Args:
        text: Raw input sentence.

    Returns:
        None; the prediction is printed.
    """
    encoded = tokenizer(
        text,
        return_tensors="tf",
        max_length=128,
        padding="max_length",
        truncation=True,
    )
    model_inputs = {key: encoded[key] for key in ("input_ids", "attention_mask")}
    scores = loaded_model(model_inputs)

    # Single input sentence, so take the first (only) row of the batch.
    pred = tf.argmax(scores, axis=1).numpy()[0]
    if pred == 1:
        label = "positive"
    else:
        label = "negative"
    print(f"Prediction: {label} (class {pred})")


In [None]:
# Smoke-test the inference helper on a single example sentence.
predict_sentiment("This movie was surprisingly great!")