In [None]:
import torch
import numpy as np

In [None]:
# Download the Tiny Shakespeare corpus; wget saves it as input.txt.
!wget 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'

--2024-10-14 08:21:00--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt’


2024-10-14 08:21:00 (29.6 MB/s) - ‘input.txt’ saved [1115394/1115394]



In [None]:
# Read the downloaded corpus into one string.
# The path is relative because wget saved input.txt into the working directory,
# so the notebook also runs outside Colab's /content. The explicit encoding
# makes the read independent of the platform's default text encoding.
with open("input.txt", encoding="utf-8") as corpus_file:
  data = corpus_file.read()

In [None]:
# Preview only the start of the corpus; a short excerpt is enough to check
# that the file loaded and keeps the cell output small.
print(data[:1000])

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.
Is't a verdict?

All:
No more talking on't; let it be done: away, away!

Second Citizen:
One word, good citizens.

First Citizen:
We are accounted poor citizens, the patricians good.
What authority surfeits on would relieve us: if they
would yield us but the superfluity, while it were
wholesome, we might guess they relieved us humanely;
but they think we are too dear: the leanness that
afflicts us, the object of our misery, is as an
inventory to particularise their abundance; our
sufferance is a gain to them Let us revenge this with
our pikes, ere we become rakes: for the gods know I
speak this in hunger for bread, not in thirst for revenge.



In [None]:
# 80% of the corpus length: the character index where the train/validation
# split falls. Derived from the data instead of a typed-in file size.
len(data) * 0.8

892315.2000000001

In [None]:
# Keep the first 80% of the characters for training and hold out the rest for
# validation. The split index is computed from the corpus length so it stays
# correct if the input file changes.
split_idx = int(len(data) * 0.8)
train = data[:split_idx]
val = data[split_idx:]

In [None]:
# Vocabulary: every distinct character in the corpus, in sorted order.
chars = sorted(set(data))
vocab_size = len(chars)

# Lookup tables between a character and its integer id (its position in `chars`).
ctoi = dict(zip(chars, range(vocab_size)))
itoc = dict(enumerate(chars))

In [None]:
def split_batch(batch_size, group, sample_size):
  """Sample a random batch of next-character prediction pairs.

  Args:
    batch_size: number of sequences to draw.
    group: "train" draws from the training text; any other value draws from
      the validation text.
    sample_size: number of characters in each sequence.

  Returns:
    A pair of integer numpy arrays (X, Y), each of shape
    (batch_size, sample_size). Y is X shifted by one character, so Y[b, t] is
    the character that follows X[b, t] in the text.

  Raises:
    ValueError: if the selected text is too short to hold one sample plus its
      shifted target.
  """
  # Slice the text selected by `group`. Slicing `train` unconditionally would
  # silently turn every "val" batch into training data.
  text = train if group == "train" else val
  if len(text) <= sample_size:
    raise ValueError(
        f"need more than {sample_size} characters to sample from, got {len(text)}")

  # The largest start is len(text) - sample_size - 1, which leaves room for the
  # one-character shift used to build Y.
  starts = torch.randint(0, len(text) - sample_size, (batch_size,)).tolist()
  X = [[ctoi[ch] for ch in text[s:s + sample_size]] for s in starts]
  Y = [[ctoi[ch] for ch in text[s + 1:s + sample_size + 1]] for s in starts]
  return np.array(X), np.array(Y)

# Smoke test: draw a single 8-character training pair and display both arrays.
X, Y = split_batch(batch_size=1, group='train', sample_size=8)
(X, Y)

(array([[47, 53, 52, 11,  1, 50, 39, 57]]),
 array([[53, 52, 11,  1, 50, 39, 57, 58]]))

# PyTorch

In [None]:
class Bigram_model(torch.nn.Module):
  """Bigram language model backed by a single embedding table.

  Each token id looks up a row of size `vocab_size`; that row is used directly
  as the logits for the next token.
  """

  def __init__(self, vocab_size):
    """Create the (vocab_size, vocab_size) lookup table."""
    super().__init__()
    self.embedding_table = torch.nn.Embedding(vocab_size, vocab_size)

  def forward_pass(self, input, target):
    """Return the mean cross-entropy of the next-token predictions.

    Args:
      input: integer token ids of shape (batch_size, sample_size), given as a
        torch tensor or anything `torch.as_tensor` accepts (e.g. numpy array).
      target: integer token ids with the same number of elements as `input`.

    Returns:
      A scalar tensor holding the loss.
    """
    # Batches may arrive as numpy arrays, which nn.Embedding rejects, so
    # normalise both arguments to long tensors on the model's device.
    device = self.embedding_table.weight.device
    input = torch.as_tensor(input, dtype=torch.long, device=device)
    target = torch.as_tensor(target, dtype=torch.long, device=device)

    batch_size, sample_size = input.shape
    logits = self.embedding_table(input)  # (batch_size, sample_size, vocab_size)

    # cross_entropy expects (N, classes) logits and (N,) class ids.
    logits = logits.view(batch_size * sample_size, -1)
    target = target.reshape(batch_size * sample_size)

    return torch.nn.functional.cross_entropy(logits, target)

  def forward(self, input, target):
    """Alias of `forward_pass` so the module can be called as `model(x, y)`."""
    return self.forward_pass(input, target)

In [None]:
# Instantiate the bigram model; its embedding table is vocab_size x vocab_size.
model = Bigram_model(vocab_size)

In [None]:
# Start from a fresh model so re-running this cell restarts training.
model = Bigram_model(vocab_size)

# Each "epoch" here is one optimisation step on a freshly sampled mini-batch.
epochs = 10000
batch_size = 32
sample_size = 8

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

for i in range(epochs):
  # split_batch returns numpy arrays; the torch module needs long tensors.
  X, Y = (torch.as_tensor(arr, dtype=torch.long)
          for arr in split_batch(batch_size, 'train', sample_size))

  # Forward pass
  loss = model.forward_pass(X, Y)

  # Clear stale gradients, backpropagate, then update the weights.
  optimizer.zero_grad(set_to_none=True)
  loss.backward()
  optimizer.step()

  if i % 1000 == 0:
    print(f"{loss.item()} loss at ith epoch {i}")

4.6785888671875 loss at ith epoch 0
3.74942684173584 loss at ith epoch 1000
3.0493662357330322 loss at ith epoch 2000
2.7686564922332764 loss at ith epoch 3000
2.594926118850708 loss at ith epoch 4000
2.565297842025757 loss at ith epoch 5000
2.4505538940429688 loss at ith epoch 6000
2.5507395267486572 loss at ith epoch 7000
2.409308671951294 loss at ith epoch 8000
2.5696308612823486 loss at ith epoch 9000


# Self-Attention in Keras


In [None]:
import keras.backend as K
import tensorflow as tf
import keras
from keras import backend as K

class Positional_embeds(keras.layers.Layer):
    """Adds a trainable positional table to its input.

    The table is created lazily in `build` with shape
    (input_shape[1], input_shape[2]) and is summed with the input in `call`.
    """

    def __init__(self):
        super().__init__()

    def build(self, input_shape):
        """Create the trainable table from axes 1 and 2 of the input shape."""
        table_shape = (input_shape[1], input_shape[2])
        self.pos_embed = self.add_weight(
            name='positional_embeddings',
            shape=table_shape,
            initializer='glorot_uniform',
            trainable=True,
        )

    def call(self, inp_embed):
        """Return the input with the positional table added element-wise."""
        with_positions = inp_embed + self.pos_embed
        return with_positions

class one_head_self_attention(tf.keras.layers.Layer):
    """One head of causal (masked) scaled dot-product self-attention.

    Args:
        size: output width of the query, key and value projections.
    """

    def __init__(self, size):
        super().__init__()
        # Bias-free linear projections of the input.
        self.query = keras.layers.Dense(units=size, activation='linear', use_bias=False)
        self.key = keras.layers.Dense(units=size, activation='linear', use_bias=False)
        self.value = keras.layers.Dense(units=size, activation='linear', use_bias=False)

    def call(self, x):
        """Attend over a rank-3 input and return the mixed values.

        The key transpose swaps the last two axes, so `x` is treated as
        (batch, positions, features); the result is (batch, positions, size).
        """
        queries = self.query(x)
        keys = self.key(x)
        values = self.value(x)

        # Scale by 1/sqrt(d_k), where d_k is the projection width.
        dk = queries.shape[-1]

        # How strongly each position's query matches every position's key.
        scaled_similarity = tf.linalg.matmul(queries, keys, transpose_b=True) * dk ** -0.5

        # Lower-triangular mask: a position may look at itself and earlier
        # positions only.
        mask = tf.linalg.band_part(tf.ones_like(scaled_similarity), -1, 0)

        # Masked entries become -inf so softmax gives them zero weight. The
        # fill value follows the score dtype instead of assuming float32.
        neg_inf = tf.constant(float('-inf'), dtype=scaled_similarity.dtype)
        masked_similarity = tf.where(mask > 0, scaled_similarity, neg_inf)

        # Attention weights; a plain op avoids building a new Softmax layer on
        # every call.
        simi_probs = tf.nn.softmax(masked_similarity, axis=-1)

        # Weighted sum of the values.
        return tf.linalg.matmul(simi_probs, values)

class MultiHead_attention(tf.keras.layers.Layer):
    """Runs `n_heads` attention heads on the same input and joins their outputs.

    Each head is a `one_head_self_attention(size)`; the head outputs are
    concatenated along the last axis.
    """

    def __init__(self, size, n_heads):
        super().__init__()
        heads = []
        for _ in range(n_heads):
            heads.append(one_head_self_attention(size))
        self.heads = heads

    def call(self, x):
        """Apply every head to `x` and concatenate the results."""
        per_head = []
        for head in self.heads:
            per_head.append(head(x))
        # The concatenated tensor is also stored on the instance as `self.out`.
        self.out = tf.concat(per_head, axis=-1)
        return self.out

class Feedforward(tf.keras.layers.Layer):
    """Position-wise feed-forward network: Dense(3*n_embed) -> Dense(n_embed).

    Both Dense layers use ReLU, and dropout is applied after each of them.

    Args:
        n_embed: width of the block's output; the hidden layer is 3x wider.
        dropout_rate: dropout probability. When omitted, the notebook-level
            `dropout` setting is read at construction time, so that global
            must already be defined.
    """

    def __init__(self, n_embed, dropout_rate=None):
        super().__init__()
        if dropout_rate is None:
            dropout_rate = dropout  # notebook-level hyperparameter
        self.d1 = keras.layers.Dense(units=3 * n_embed, activation='relu', use_bias=True)
        self.d2 = keras.layers.Dense(units=n_embed, activation='relu', use_bias=True)
        # Dropout must live on the layer and be applied to a tensor; merely
        # constructing keras.layers.Dropout(...) inside call() does nothing.
        self.drop1 = keras.layers.Dropout(dropout_rate)
        self.drop2 = keras.layers.Dropout(dropout_rate)

    def call(self, x, training=None):
        """Apply both Dense layers, with dropout active only while training."""
        x = self.d1(x)
        x = self.drop1(x, training=training)
        x = self.d2(x)
        x = self.drop2(x, training=training)
        return x

class Attention_Block(tf.keras.layers.Layer):
    """Transformer block: multi-head attention then feed-forward.

    Each sub-layer is wrapped in a residual connection followed by layer
    normalisation.

    Args:
        embed_size: width of the block's input and output.
        n_heads: number of attention heads; each gets embed_size // n_heads
            features.

    Raises:
        ValueError: if `embed_size` is not a multiple of `n_heads`. The
            concatenated heads would then be narrower than the input and the
            residual addition could not work.
    """

    def __init__(self, embed_size, n_heads):
        super().__init__()
        if embed_size % n_heads != 0:
            raise ValueError(
                f"embed_size ({embed_size}) must be divisible by n_heads ({n_heads})")
        head_size = embed_size // n_heads
        self.multi_attention = MultiHead_attention(head_size, n_heads)
        self.ff = Feedforward(embed_size)
        # The normalisation layers are kept as attributes and applied in
        # call(); constructing them there without calling them had no effect.
        self.norm_attention = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.norm_ff = tf.keras.layers.LayerNormalization(epsilon=1e-6)

    def call(self, x):
        """Apply attention and feed-forward, each as residual + layer norm."""
        x = self.norm_attention(x + self.multi_attention(x))
        x = self.norm_ff(x + self.ff(x))
        return x

In [None]:
import keras.backend as K
import tensorflow as tf
import keras
from keras import backend as K

# Hyperparameters for the attention model.
batch_size = 16
sample_size = 64   # context length in characters
n_embd = 128
n_head = 8
n_layer = 4
dropout = 0.2
input_dim = sample_size

# One integer token id per position. `shape` is written as a real 1-tuple.
inputs = keras.Input(shape=(sample_size,))

# Character embeddings
inp_embed = keras.layers.Embedding(vocab_size, n_embd)(inputs) # (N, sample_size, n_embed)

# Positional information
pos_embed = Positional_embeds() # (N, sample_size, n_embed)

# Add (not concatenate) the positional table to the character embeddings.
x = pos_embed(inp_embed)

# Self Attention
attention_blocks = [Attention_Block(n_embd, n_head) for _ in range(n_layer)]
for block in attention_blocks:
  x = block(x)

# Final normalisation. The layer has to be called on x; constructing it alone
# leaves the graph unchanged.
x = tf.keras.layers.LayerNormalization(epsilon=1e-6)(x)
print("Done with self attention")

# Dense layer: per-position probability distribution over the vocabulary.
output = keras.layers.Dense(units=vocab_size, activation='softmax', use_bias=True)(x)

atten_model = keras.Model(inputs, output)

# Categorical cross-entropy expects one-hot targets.
atten_model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001), loss=keras.losses.CategoricalCrossentropy())

atten_model.summary()

Done with self attention
Model: "model_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_4 (InputLayer)        [(None, 64)]              0         
                                                                 
 embedding_3 (Embedding)     (None, 64, 128)           8320      
                                                                 
 positional_embeds_3 (Posit  (None, 64, 128)           8192      
 ional_embeds)                                                   
                                                                 
 attention__block_10 (Atten  (None, 64, 128)           147968    
 tion_Block)                                                     
                                                                 
 attention__block_11 (Atten  (None, 64, 128)           147968    
 tion_Block)                                                     
                                  

In [None]:
# Number of optimisation steps; each step uses one freshly sampled mini-batch.
epoches = 10000

for i in range(epoches):
  X, Y = split_batch(batch_size, 'train', sample_size)
  # The model is compiled with categorical cross-entropy, so targets are one-hot.
  Y = tf.one_hot(Y, depth=vocab_size)
  loss = atten_model.train_on_batch(X, Y)

  # Every 1000 steps, and on the last one, score one validation batch.
  # Separate names keep the training batch intact.
  if i % 1000 == 0 or i == epoches - 1:
    X_val, Y_val = split_batch(batch_size, 'val', sample_size)
    Y_val = tf.one_hot(Y_val, depth=vocab_size)
    val_loss = atten_model.test_on_batch(X_val, Y_val)
    print(f"Loss at {i}th iteration for Train is {loss} at val is {val_loss}")

[[50  1 58 46 43  1]] tf.Tensor(
[[[0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
   0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
   0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
   0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
   0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
   0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
   1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
   0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0.
   0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
  [0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
   0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
   

In [None]:
# The argument is a checkpoint prefix, not a directory. A trailing slash gives
# an empty file stem and leaves nameless ".index" / ".data-*" files in the
# folder, so name the checkpoint explicitly inside the same directory.
atten_model.save_weights('/content/Atten_weights/Atten_weights')

In [None]:
# Recursively bundle the saved weights directory into a single zip archive.
!zip -r /content/attention_weights.zip /content/Atten_weights

  adding: content/Atten_weights/ (stored 0%)
  adding: content/Atten_weights/.data-00000-of-00001 (deflated 10%)
  adding: content/Atten_weights/Atten_weights.data-00000-of-00001 (deflated 10%)
  adding: content/Atten_weights/.index (deflated 81%)
  adding: content/Atten_weights/checkpoint (deflated 34%)
  adding: content/Atten_weights/Atten_weights.index (deflated 81%)


In [None]:
def encode(s):
    """Encoder: take a string, return the list of integer ids of its characters."""
    ids = []
    for c in s:
        ids.append(ctoi[c])
    return ids


def decode(l):
    """Decoder: take an iterable of integer ids, return the string they spell."""
    pieces = (itoc[i] for i in l)
    return ''.join(pieces)
from IPython.display import clear_output

def generate(idx, max_tokens, sample_size, model=None):
    """Sample `max_tokens` new tokens, appending each one to the sequence.

    Args:
        idx: integer token ids of shape (B, T) used as the prompt.
        max_tokens: number of tokens to sample.
        sample_size: number of trailing tokens fed to the model at each step.
        model: callable returning per-position probabilities of shape
            (B, T, vocab). Defaults to the notebook-level `atten_model`.

    Returns:
        An integer tensor of shape (B, T + max_tokens): the prompt followed by
        the sampled tokens.

    Side effects:
        After every step the cell output is replaced with the decoded text of
        the first sequence in the batch.
    """
    if model is None:
        model = atten_model

    idx = tf.convert_to_tensor(idx)
    for _ in range(max_tokens):
        # Condition on the most recent `sample_size` tokens only.
        idx_cond = idx[:, -sample_size:]  # (B, T)
        # Probabilities for the token after the last position.
        probs = model(idx_cond)[:, -1, :]
        # Sample inside TensorFlow rather than round-tripping through numpy and
        # torch. tf.random.categorical takes log-probabilities.
        idx_next = tf.random.categorical(tf.math.log(probs), num_samples=1)  # (B, 1)
        idx_next = tf.cast(idx_next, idx.dtype)
        # Append the sampled index to the running sequence.
        idx = tf.concat(axis=1, values=[idx, idx_next])  # (B, T+1)

        poem = decode(idx[0].numpy())
        # wait=True delays the clear until the new text arrives, avoiding flicker.
        clear_output(wait=True)
        print(poem)
    return idx

# Seed generation with one 64-character window from the validation text.
X, Y = split_batch(batch_size=1, group='val', sample_size=64)
print(X)

# generate() returns the prompt followed by the sampled ids; take the only
# sequence in the batch as a numpy array.
generated = generate(X, max_tokens=1000, sample_size=64)
gene = generated[0].numpy()

# Map every id back to its character; `s` is a list of single characters.
s = [itoc[token_id] for token_id in gene]

ps of women's rheum, which are
As cheap as lies, he sold the blown so chamber
As he servant return'd till your usure the
not?
Or truth, I know no is leave it anger o', if this deed
Hast town a king: male for their satise:
I had enemy here, answer spend must to better princess? wat a
so. Let's 'tis trust: who proof's wonder'd?

RATCLIFF:
Must have I bore no month of mine.

COMINIUS:
Nay, it was new-like, what brothers: I it will not my designant bloody, give,
And you some flight what you speak,
Marise, in'd upon the church and say'd,
For and malian, and way ran make our men,
That deserved mistrust Henry, know it issue:
To mue loving ribe, if thy giand.
Alack, and eprof thee might favour'd the king
And dolemn with our delay a most inform:
As if they how with as ill;
I wong the king, fruit he thus;
And, drink up him a grave, 'fir hear!
Nurse: I had I despite too for true seven.

QUEEN ELIZABETH:
Alas, call'd too regain'd there, to your life,
That you psteed hopes, and mighty crave,
Here i

In [None]:
# Print the generated characters as one continuous string.
print(*s, sep='')

In [None]:
# Shape check: a batch of 32 training samples, 8 characters each.
X, Y = split_batch(batch_size=32, group='train', sample_size=8)
print(*(batch.shape for batch in (X, Y)))

(32, 8) (32, 8)
