<a href="https://colab.research.google.com/github/Johnny1033/JohnFacts/blob/master/LLM0_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

John's LLM

In [None]:
# Download the Tiny Shakespeare corpus into the working directory as input.txt.
# NOTE(review): the training cell reads `file_path` (a Google Drive file), not
# input.txt — confirm whether this download is still needed.
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
# Install TensorFlow into the notebook runtime.
!pip install tensorflow

In [None]:
!pip install keras

In [None]:
!pip install tensorflow-datasets

In [None]:
# Mount the user's Google Drive so the training corpus can be read from it.
from google.colab import drive
drive.mount('/content/gdrive')
# Path of the text corpus; opened later when the tokenizer and datasets are built.
file_path = "/content/gdrive/MyDrive/ColabNotebooks/combined_file.txt"

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.keras import layers
import numpy as np

# Detect TPU. TPUClusterResolver raises ValueError when no TPU is available,
# in which case we fall back to the default strategy below.
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPU detection
except ValueError:
    tpu = None

# Select appropriate distribution strategy
if tpu:
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.TPUStrategy(tpu)
    # Only report the TPU address when a TPU was found: `tpu` is None on the
    # CPU/GPU path and calling `.master()` on it would raise AttributeError.
    print('Running on TPU ', tpu.master())
else:
    strategy = tf.distribute.get_strategy() # default strategy that works on CPU and single GPU
    print('No TPU found; using the default distribution strategy')

print(f'Running on {strategy.num_replicas_in_sync} replicas')

# Hyperparameters (module-level; read by the data, model and training code in this cell).
batch_size = 32  # number of sequences sampled per call to get_batch
block_size = 64  # number of tokens in each sampled sequence
max_iters = 5000  # number of iterations of the training loop
eval_interval = 100  # estimate_loss is run every `eval_interval` iterations
learning_rate = 0.1  # passed to Adam; NOTE(review): looks very large for Adam — confirm
eval_iters = 200  # number of batches averaged per split inside estimate_loss
n_embd = 256  # embedding width; also the unit count of every Dense layer in the blocks
n_head = 16  # NOTE(review): passed to TransformerModel, but the model code never reads it
n_layer = 16  # number of TransformerBlock layers stacked in the model
dropout = 0.0  # rate given to the Dropout layer of each TransformerBlock

# Seed NumPy's global RNG, which get_batch draws its random offsets from.
np.random.seed(1337)

# Read the whole training corpus into memory.
with open(file_path, 'r', encoding='utf-8') as f:
    text = f.read()

# Building the subwords tokenizer.
# The corpus generator has to yield pieces of running text. Iterating over
# `text` itself yields one character at a time, so the builder would never see
# a multi-character word to derive subwords from; feed it line by line instead.
tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
    (line for line in text.splitlines(keepends=True)), target_vocab_size=2**13)

# Size of the token id space; used for the embedding table and the output layer.
vocab_size = tokenizer.vocab_size

# The tokenizer already has the conversion functions; expose them directly
# instead of wrapping them in pass-through lambdas.
encode = tokenizer.encode
decode = tokenizer.decode

# Train and test splits
data = np.array(encode(text), dtype=np.int32)
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]

# data loading
def get_batch(split):
    """Sample a random batch of input/target windows from one data split.

    Args:
        split: 'train' selects ``train_data``; any other value selects
            ``val_data``.

    Returns:
        A tuple ``(x, y)`` of arrays with shape ``(batch_size, block_size)``.
        ``y`` holds the same windows as ``x`` shifted one token to the right,
        i.e. ``y[b, t]`` is the token that follows ``x[b, t]`` in the data.
    """
    source = val_data if split != 'train' else train_data
    # One random start offset per sequence; the upper bound leaves room for
    # the extra token needed by the shifted targets.
    starts = np.random.randint(0, len(source) - block_size, batch_size)
    # Broadcast the start offsets against 0..block_size-1 to gather every
    # window with a single indexing operation.
    window = starts[:, None] + np.arange(block_size)
    inputs = source[window]
    targets = source[window + 1]
    return inputs, targets

def estimate_loss(model):
    """Estimate the mean per-token loss on the train and validation splits.

    For each split, ``eval_iters`` random batches are drawn with ``get_batch``
    and run through the model under the active distribution ``strategy``.

    Args:
        model: a model that returns ``(logits, loss)`` when called as
            ``model(X, Y)``.

    Returns:
        A dict ``{'train': float, 'val': float}`` with the average loss per
        split.
    """
    @tf.function
    def compute_loss(X, Y):
        _, loss = model(X, Y)
        # The model returns one loss value per token. Average them so the
        # figure is on the same scale as the mean loss used by train_step
        # (summing would scale it with the batch and sequence length).
        return tf.reduce_mean(loss)

    out = {}
    model.trainable = False
    try:
        for split in ['train', 'val']:
            losses = np.zeros(eval_iters)
            for k in range(eval_iters):
                X, Y = get_batch(split)
                per_replica_losses = strategy.run(compute_loss, args=(X, Y))
                # Average the per-replica scalars into a single value.
                mean_loss = strategy.reduce(
                    tf.distribute.ReduceOp.MEAN, per_replica_losses, axis=None)
                losses[k] = float(mean_loss.numpy())
            out[split] = losses.mean()
    finally:
        # Always restore the flag, even if evaluation fails part-way through.
        model.trainable = True
    return out

class SelfAttention(layers.Layer):
    """Single head of causal, scaled dot-product self-attention.

    Each position attends only to itself and to earlier positions, so the
    output at position ``t`` cannot depend on tokens after ``t``. This is
    required for next-token training and for sampling to be consistent.

    Args:
        n_embd: number of units of the key, query and value projections.
    """

    def __init__(self, n_embd):
        super(SelfAttention, self).__init__()
        self.key = layers.Dense(n_embd)
        self.query = layers.Dense(n_embd)
        self.value = layers.Dense(n_embd)
        # 1/sqrt(d_k): keeps the dot products from growing with the
        # projection width, which would otherwise saturate the softmax.
        self.attn_scale = float(n_embd) ** -0.5

    def call(self, x):
        """Apply attention to ``x``.

        Assumes ``x`` has shape (batch, sequence, features), as produced by
        the embedding layer of the model; returns a tensor whose last
        dimension is ``n_embd``.
        """
        k = self.key(x)
        q = self.query(x)
        v = self.value(x)

        # (batch, seq, seq) similarity of every query with every key.
        logits = tf.matmul(q, k, transpose_b=True) * self.attn_scale

        # Lower-triangular mask: query position i may see key position j
        # only when j <= i.
        positions = tf.range(tf.shape(logits)[-1])
        causal_mask = positions[:, tf.newaxis] >= positions[tf.newaxis, :]
        # Use the dtype's most negative value rather than a literal so the
        # masked entries stay finite in reduced-precision dtypes.
        masked_value = tf.constant(logits.dtype.min, dtype=logits.dtype)
        logits = tf.where(causal_mask, logits, masked_value)

        attention = tf.nn.softmax(logits, axis=-1)
        return tf.matmul(attention, v)

class TransformerBlock(layers.Layer):
    """Self-attention followed by a two-layer MLP.

    Both sub-layers are wrapped in a residual connection, and layer
    normalisation is applied to the sum. Dropout is applied to the MLP
    output only.

    Args:
        n_embd: width of the attention projections and of both MLP layers.
        dropout: rate of the dropout applied to the MLP output.
    """

    def __init__(self, n_embd, dropout):
        super().__init__()
        self.attention = SelfAttention(n_embd)
        self.layer_norm1 = layers.LayerNormalization()
        # Feed-forward part: ReLU hidden layer followed by a linear layer.
        hidden = layers.Dense(n_embd, activation='relu')
        projection = layers.Dense(n_embd)
        self.mlp = tf.keras.Sequential([hidden, projection])
        self.layer_norm2 = layers.LayerNormalization()
        self.dropout = layers.Dropout(dropout)

    def call(self, x):
        # Attention sub-layer: residual add, then normalise.
        attended = self.layer_norm1(x + self.attention(x))
        # MLP sub-layer: dropout on the MLP output, residual add, normalise.
        feed_forward = self.dropout(self.mlp(attended))
        return self.layer_norm2(attended + feed_forward)

class TransformerModel(tf.keras.Model):
    """Token embedding, a stack of TransformerBlocks and a vocabulary projection.

    Args:
        vocab_size: number of token ids; sizes the embedding table and the
            output layer.
        n_embd: embedding width, passed on to every TransformerBlock.
        n_head: accepted for interface compatibility; not used by this
            implementation.
        n_layer: number of TransformerBlocks.
        block_size: stored on the instance as ``self.block_size``; the forward
            pass itself does not read it.
        dropout: dropout rate passed on to every TransformerBlock.
    """

    def __init__(self, vocab_size, n_embd, n_head, n_layer, block_size, dropout):
        super(TransformerModel, self).__init__()
        self.embedding = layers.Embedding(vocab_size, n_embd)
        self.transformer_blocks = [TransformerBlock(n_embd, dropout) for _ in range(n_layer)]
        self.logits = layers.Dense(vocab_size)
        self.block_size = block_size

    def call(self, x, y=None):
        """Run the model.

        Returns the logits alone when ``y`` is None, otherwise the tuple
        ``(logits, loss)`` produced by ``training_call``.
        """
        if y is None:
            # Inference mode
            return self.inference(x)
        # Training mode
        return self.training_call(x, y)

    def _forward(self, x):
        """Map token ids to per-position vocabulary logits."""
        x = self.embedding(x)
        for transformer in self.transformer_blocks:
            x = transformer(x)
        return self.logits(x)

    def training_call(self, x, y):
        """Return ``(logits, loss)`` where ``loss`` has one entry per token.

        ``y[:, t]`` must be the token that follows ``x[:, t]``; this is what
        ``get_batch`` produces. The targets are therefore compared with the
        logits position by position. Shifting them again here would train the
        model to predict the token two steps ahead.
        """
        logits = self._forward(x)
        loss = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels=y)
        return logits, loss

    def inference(self, x):
        """Return the logits for every position of ``x``."""
        return self._forward(x)

# Create the model and optimizer under the strategy scope so that their
# variables are created for the selected distribution strategy, then train.
with strategy.scope():
    model = TransformerModel(vocab_size, n_embd, n_head, n_layer, block_size, dropout)
    optimizer = tf.keras.optimizers.Adam(learning_rate)

    @tf.function
    def train_step(X, Y):
        """Run one optimisation step on a batch and return its mean loss."""
        with tf.GradientTape() as tape:
            _, loss = model(X, Y)
            loss = tf.reduce_mean(loss)
        # Compute gradients after leaving the tape context: inside it, the
        # tape would also record the backward pass, which is only needed for
        # higher-order derivatives.
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        return loss

    for it in range(max_iters):
        # get a new batch of data
        X, Y = get_batch('train')

        # compute loss and gradients, and update parameters
        per_replica_losses = strategy.run(train_step, args=(X, Y))

        # every so often, compute the train and validation loss
        if it % eval_interval == 0:
            losses = estimate_loss(model)
            print(f"step {it}, train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")



In [None]:
def generate_text(prompt, model, length=100):
    """Sample ``length`` tokens from ``model`` as a continuation of ``prompt``.

    The prompt is tokenised with the notebook's ``encode`` function and the
    sampled ids are turned back into text with ``decode``, i.e. the same
    subword tokenizer the model was trained with.

    Args:
        prompt: text to condition the generation on.
        model: a model that returns logits of shape (batch, sequence, vocab)
            when called with a batch of token ids.
        length: number of tokens to sample.

    Returns:
        The generated continuation as a string (the prompt is not included).

    Raises:
        ValueError: if the prompt encodes to no tokens, since there would be
            no position to predict from.
    """
    context = list(encode(prompt))
    if not context:
        raise ValueError("prompt must encode to at least one token")

    # Feed at most `block_size` trailing tokens, the sequence length the model
    # was trained on; fall back to the full context if the attribute is absent.
    max_context = getattr(model, "block_size", None)
    generated_sequence = []

    for _ in range(length):
        window = context[-max_context:] if max_context else context
        x = tf.constant([window], dtype=tf.int32)  # add the batch dimension
        logits = model(x)[:, -1, :]  # only consider the last position
        # tf.random.categorical expects unnormalised log-probabilities, so it
        # is given the logits directly rather than softmax probabilities.
        sampled = tf.random.categorical(logits, num_samples=1)
        token = int(sampled[0, 0].numpy())
        generated_sequence.append(token)
        # Keep the sampled token in the context so that later steps are
        # conditioned on everything generated so far.
        context.append(token)

    # NOTE(review): id 0 is presumably the tokenizer's padding id and has no
    # text form; drop it before decoding — confirm against the tfds encoder.
    return decode([tok for tok in generated_sequence if tok != 0])

# Read a prompt from standard input, sample 200 tokens from the trained model
# as a continuation, and print the result.
prompt = input()
generated_text = generate_text(prompt, model, length=200)
print(generated_text)