<a href="https://colab.research.google.com/github/Duc7111/Beatender/blob/main/model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# GPT-NeoX implemented with tensorflow

In [9]:
!pip install -q --upgrade keras-nlp
!pip install -q --upgrade keras  # Upgrade to Keras 3.

# 0. Import libraries

In [10]:
import tensorflow as tf
import numpy as np
import pandas as pd
from tensorflow.keras import layers, activations, models, optimizers, losses
import keras_nlp
import tensorflow.data as tf_data
import tensorflow.strings as tf_strings
import os
import keras

## 1. Implementing model

In [11]:
class GPTNeoX:
    def __init__(self, config):
        self.config = config
        self.model = self.build_model()
        self.model.compile(
            optimizer=optimizers.Adam(learning_rate=self.config["learning_rate"]),
            loss=losses.SparseCategoricalCrossentropy(from_logits=True),
            metrics=["accuracy"]
        )

    def build_model(self):

        def parallel_gpt2_block(embeddings, embed_dim, num_heads, ff_dim):
            # attn_output = attn(layernorm(x))
            layerNorm1 = layers.LayerNormalization(epsilon=1e-6)(embeddings)
            attention_output = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim//num_heads)(layerNorm1, layerNorm1)
            # feed_forward_output = feed_forward(layernorm(x))
            layerNorm2 = layers.LayerNormalization(epsilon=1e-6)(embeddings)
            feed_forward = layers.Dense(ff_dim, activation="relu")(layerNorm2)
            feed_forward = layers.Dense(embed_dim, activation="relu")(feed_forward)
            # output = x + attn_output + feed_forward_output
            return embeddings + attention_output + feed_forward

        input = layers.Input(shape=(None,), dtype=tf.int32)
        x = layers.Embedding(input_dim=self.config["vocab_size"], output_dim=self.config["embed_dim"])(input)
        rotary = keras_nlp.layers.RotaryEmbedding()(x)
        x = x + rotary
        for _ in range(self.config["num_layers"]):
            x = parallel_gpt2_block(x, self.config["embed_dim"], self.config["num_heads"], self.config["ff_dim"])
        output = layers.Dense(self.config["vocab_size"], activation = "softmax")(x)
        model = models.Model(input, output)
        return model

    def train(self, x, y, epochs=1):
        self.model.fit(x, y, epochs=epochs)

    def predict(self, x):
        return self.model.predict(x)

## 2. Test

In [12]:
# Data
BATCH_SIZE = 64
MIN_STRING_LEN = 512  # Strings shorter than this will be discarded
SEQ_LEN = 128  # Length of training sequences, in tokens

# Model
EMBED_DIM = 256
FEED_FORWARD_DIM = 128
NUM_HEADS = 3
NUM_LAYERS = 2
VOCAB_SIZE = 5000  # Limits parameters in model.

# Training
EPOCHS = 5

# Inference
NUM_TOKENS_TO_GENERATE = 80


In [13]:
keras.utils.get_file(
    origin="https://dldata-public.s3.us-east-2.amazonaws.com/simplebooks.zip",
    extract=True,
)
dir = os.path.expanduser("~/.keras/datasets/simplebooks/")

# Load simplebooks-92 train set and filter out short lines.
raw_train_ds = (
    tf_data.TextLineDataset(dir + "simplebooks-92-raw/train.txt")
    .filter(lambda x: tf_strings.length(x) > MIN_STRING_LEN)
    .batch(BATCH_SIZE)
    .shuffle(buffer_size=256)
)

# Load simplebooks-92 validation set and filter out short lines.
raw_val_ds = (
    tf_data.TextLineDataset(dir + "simplebooks-92-raw/valid.txt")
    .filter(lambda x: tf_strings.length(x) > MIN_STRING_LEN)
    .batch(BATCH_SIZE)
)

Downloading data from https://dldata-public.s3.us-east-2.amazonaws.com/simplebooks.zip
[1m282386239/282386239[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 0us/step


In [14]:
# Train tokenizer vocabulary
vocab = keras_nlp.tokenizers.compute_word_piece_vocabulary(
    raw_train_ds,
    vocabulary_size=VOCAB_SIZE,
    lowercase=True,
    reserved_tokens=["[PAD]", "[UNK]", "[BOS]"],
)

In [15]:
tokenizer = keras_nlp.tokenizers.WordPieceTokenizer(
    vocabulary=vocab,
    sequence_length=SEQ_LEN,
    lowercase=True,
)

In [16]:
# packer adds a start token
start_packer = keras_nlp.layers.StartEndPacker(
    sequence_length=SEQ_LEN,
    start_value=tokenizer.token_to_id("[BOS]"),
)


def preprocess(inputs):
    outputs = tokenizer(inputs)
    features = start_packer(outputs)
    labels = outputs
    return features, labels


# Tokenize and split into train and label sequences.
train_ds = raw_train_ds.map(preprocess, num_parallel_calls=tf_data.AUTOTUNE).prefetch(
    tf_data.AUTOTUNE
)
val_ds = raw_val_ds.map(preprocess, num_parallel_calls=tf_data.AUTOTUNE).prefetch(
    tf_data.AUTOTUNE
)

In [24]:
model = GPTNeoX(
    {
        "seq_len": SEQ_LEN,
        "embed_dim": EMBED_DIM,
        "num_heads": NUM_HEADS,
        "ff_dim": FEED_FORWARD_DIM,
        "num_layers": NUM_LAYERS,
        "vocab_size": VOCAB_SIZE,
        "learning_rate": 0.001,
    }
).model
model.summary()

In [None]:
model.fit(train_ds, validation_data=val_ds, epochs=EPOCHS)

Epoch 1/5
     76/Unknown [1m294s[0m 4s/step - accuracy: 0.0928 - loss: 6.5019

In [21]:
# The "packer" layers adds the [BOS] token for us.
prompt_tokens = start_packer(tokenizer([""]))
prompt_tokens

<tf.Tensor: shape=(1, 128), dtype=int32, numpy=
array([[2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]],
      dtype=int32)>

In [22]:
def next(prompt, cache, index):
    logits = model(prompt)[:, index - 1, :]
    # Ignore hidden states for now; only needed for contrastive search.
    hidden_states = None
    return logits, hidden_states, cache

In [25]:
class TopKTextGenerator(keras.callbacks.Callback):
    """A callback to generate text from a trained model using top-k."""

    def __init__(self, k):
        self.sampler = keras_nlp.samplers.TopKSampler(k)

    def on_epoch_end(self, epoch, logs=None):
        output_tokens = self.sampler(
            next=next,
            prompt=prompt_tokens,
            index=1,
        )
        txt = tokenizer.detokenize(output_tokens)
        print(f"Top-K search generated text: \n{txt}\n")


text_generation_callback = TopKTextGenerator(k=10)
# Dummy training loop to demonstrate callback.
model.fit(train_ds.take(1), verbose=2, epochs=2, callbacks=[text_generation_callback])

Epoch 1/2


  output, from_logits = _get_logits(
  self.gen.throw(typ, value, traceback)


Top-K search generated text: 
[b'[BOS] departure love be , before trip sent urie upat repeated ere love f up please departure at repeated before filled evening bring the departed thinking thinkingaglowing colonel success bottomona successer successlect bravelyre ere girls eyes their it some bright gave game bench ,led two repeated departure , sight before fit love , girls under departure love indians beyond sing their love separate ,re nobles itar departure lady parton sightes indiansaton stonesomedened separateen repeatedcity negro departure it success two never sharpa4 but blanket successss gave ter ter bringecting voices u before how baggage departurere it girls dayre ere theirre , pale']

1/1 - 29s - 29s/step - accuracy: 0.0000e+00 - loss: 8.5356
Epoch 2/2
Top-K search generated text: 
[b"[BOS]re repeated u listening him but it success so , look help him the love thea but never ,on it light rose f ,ness the , look never de the the there , ,on f sing their there theare sight girls ,

<keras.src.callbacks.history.History at 0x79b6b0b66b30>