In [1]:
#!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

import os
import urllib.request

# Tiny Shakespeare corpus used as training text for the character-level model.
url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
file_path = "input.txt"

# Fetch the corpus only when it is not already on disk, so re-running the
# notebook does not hit the network again. The download goes to a temporary
# name first and is renamed afterwards, so an interrupted transfer can never be
# mistaken for a complete file on the next run.
if not os.path.exists(file_path):
    partial_path = file_path + ".part"
    urllib.request.urlretrieve(url, partial_path)
    os.replace(partial_path, file_path)

('input.txt', <http.client.HTTPMessage at 0x72639d0>)

In [2]:
# Read the entire corpus into memory as a single string.
with open('input.txt', mode='r', encoding='utf-8') as corpus_file:
  text = corpus_file.read()

In [3]:
# Character-level vocabulary: every distinct character of the corpus, sorted.
chars = sorted(set(text))
vocab_size = len(chars)

# Lookup tables between a character and its integer id (its position in `chars`).
itos = dict(enumerate(chars))
stoi = {char: idx for idx, char in itos.items()}


def encode(s):
  """Translate each character of `s` into its integer id; returns a list of ints."""
  return [stoi[c] for c in s]


def decode(s):
  """Translate each integer id in `s` back into its character; returns a list of characters."""
  return [itos[i] for i in s]

In [4]:
import torch
import torch.nn as nn
# Seed PyTorch's global random number generator.
seed = 1234
torch.manual_seed(seed)
# Encode the whole corpus once into a 1-D tensor of integer character ids.
data = torch.tensor(encode(text), dtype = torch.long)

In [5]:
# Context length in characters: the window size handed to the dataset, the
# number of rows in the model's positional-embedding table, and the amount of
# trailing context that `generate` feeds back into the model.
BLOCK_SIZE = 8

In [6]:
from torch.utils.data import Dataset, DataLoader, random_split

class AutoRegressiveDataset(Dataset):
  """Sliding-window dataset for next-token prediction.

  Item `idx` is the pair `(X, y)` with `X = data[idx : idx + block_size]` and
  `y = data[idx + 1 : idx + block_size + 1]`, i.e. `y[t]` is the token that
  follows `X[t]` in the underlying sequence.

  Args:
    data: sliceable sequence of token ids (a 1-D tensor in this notebook).
    block_size: number of tokens in each window.
  """

  def __init__(self,data, block_size):
    self.data = data
    self.block_size = block_size

  def __len__(self):
    # The last valid start index must leave room for the shifted target
    # window. Clamp at zero so that data shorter than one window gives an
    # empty dataset rather than an invalid negative length.
    return max(0, len(self.data) - self.block_size)

  def __getitem__(self,idx):
    """Return the `(X, y)` window pair starting at `idx`.

    Negative indices count from the end. Raises `IndexError` when `idx` is out
    of range; without that, slicing would silently return truncated windows
    and plain iteration over the dataset would never terminate.
    """
    size = len(self)
    if idx < 0:
      # Rebind instead of `+=` so a tensor index passed by the caller is not
      # modified in place.
      idx = idx + size
    if not 0 <= idx < size:
      raise IndexError("AutoRegressiveDataset index out of range")
    stop = idx + self.block_size
    X = self.data[idx:stop]
    y = self.data[idx + 1:stop + 1]
    return X,y

In [7]:
dataset = AutoRegressiveDataset(data,BLOCK_SIZE)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=256, shuffle=False)

In [9]:
class MaskedMultiHeadAttention(nn.Module):
  """Causal (masked) multi-head self-attention.

  A single linear layer produces queries, keys and values for all heads at
  once. Every position attends only to itself and to earlier positions.

  Args:
    emd_dim: embedding width of the input and output; must be divisible by `heads`.
    heads: number of attention heads.
    dropout: dropout probability applied to the attention weights.
  """

  def __init__(self, emd_dim, heads=4, dropout = 0.2):
    super().__init__()
    assert emd_dim % heads == 0, "emd_dim must be divisible by heads"
    self.heads = heads
    self.head_dim = emd_dim//heads
    # 1/sqrt(head_dim) scaling of the dot products.
    self.scale = self.head_dim ** -0.5
    # Fused projection: the output is split into three equal chunks (q, k, v).
    self.multiHead = nn.Linear(emd_dim, emd_dim*3)
    self.output = nn.Linear(emd_dim,emd_dim)
    self.dropout = nn.Dropout(dropout)

  def _split_heads(self, t, B, T):
    """Reshape (B, T, C) into (B, heads, T, head_dim)."""
    return t.view(B, T, self.heads, self.head_dim).permute(0, 2, 1, 3)

  def forward(self, x):
    """Apply causal self-attention to `x` of shape (B, T, C); returns (B, T, C)."""
    B, T, C = x.shape
    q, k, v = torch.chunk(self.multiHead(x), 3, dim=-1)
    q = self._split_heads(q, B, T)
    k = self._split_heads(k, B, T)
    v = self._split_heads(v, B, T)

    # (B, heads, T, T) scaled dot-product scores.
    attn_scores = torch.matmul(q, k.transpose(-2, -1)) * self.scale

    # Build the mask on the same device as the input so the layer also works
    # once the model has been moved off the CPU. True marks the strictly
    # upper-triangular (future) positions that must not be attended to.
    future = torch.tril(torch.ones(T, T, device=x.device)) == 0
    attn_scores = attn_scores.masked_fill(future, float('-inf'))

    attn_probs = self.dropout(torch.softmax(attn_scores, dim=-1))
    attn_output = torch.matmul(attn_probs, v)
    # Merge the heads back: (B, heads, T, head_dim) -> (B, T, C).
    merged = attn_output.permute(0, 2, 1, 3).reshape(B, T, C)
    return self.output(merged)


In [10]:
class LayerNorm1D(nn.Module):
  """Layer normalisation over the last dimension with a learnable scale and shift.

  Each vector along the last axis is shifted to zero mean and scaled to unit
  (biased) variance, then multiplied by `gamma` and offset by `beta`.

  Args:
    dim: size of the last dimension of the input.
    eps: constant added to the variance before the square root.
  """

  def __init__(self, dim, eps=1e-5):
    super().__init__()
    self.eps = eps
    # Scale starts at one and shift at zero.
    self.gamma = nn.Parameter(torch.ones(dim))
    self.beta = nn.Parameter(torch.zeros(dim))

  def forward(self, x):
    """Normalise `x` along its last dimension; the output has the same shape."""
    mu = x.mean(dim=-1, keepdim=True)
    sigma_sq = x.var(-1, unbiased=False, keepdim=True)
    normalised = (x - mu) / torch.sqrt(sigma_sq + self.eps)
    return self.gamma * normalised + self.beta

In [11]:
class FeedForward(nn.Module):
  """Position-wise feed-forward network: Linear -> GELU -> Linear -> Dropout.

  Args:
    input_dim: width of the incoming features.
    hidden_dim: width of the hidden layer.
    output_dim: width of the returned features.
    dropout: dropout probability applied after the second linear layer.
  """

  def __init__(self, input_dim, hidden_dim, output_dim, dropout = 0.2):
    super().__init__()
    self.feed_forward_layer = nn.Sequential(
      nn.Linear(input_dim, hidden_dim),
      # The activation class is spelled `nn.GELU`; `nn.GeLU` does not exist
      # and raised AttributeError as soon as the module was constructed.
      nn.GELU(),
      nn.Linear(hidden_dim, output_dim),
      nn.Dropout(dropout)
    )

  def forward(self, x):
    """Apply the network to the last dimension of `x`."""
    return self.feed_forward_layer(x)


In [12]:
class Block(nn.Module):
  """One pre-norm transformer decoder block.

  Applies masked multi-head self-attention followed by a feed-forward network
  whose hidden layer is four times the embedding width. Each sub-layer is
  preceded by its own layer normalisation and wrapped in a residual connection.

  Args:
    embed_dim: embedding width of the input and output.
    heads: number of attention heads.
  """

  def __init__(self,embed_dim,heads=4):
    super().__init__()
    self.layer_norm1 = LayerNorm1D(embed_dim)
    self.layer_norm2 = LayerNorm1D(embed_dim)
    # Forward the caller's `heads`; it used to be hard-coded to 4, which
    # silently ignored the constructor argument.
    self.masked_multi_head_attn =  MaskedMultiHeadAttention(embed_dim, heads = heads)
    self.feed_forward_layer = FeedForward(embed_dim, embed_dim*4, embed_dim)

  def forward(self, x):
    """Run attention then feed-forward, each added back onto its input."""
    x = x + self.masked_multi_head_attn(self.layer_norm1(x))
    x = x + self.feed_forward_layer(self.layer_norm2(x))
    return x


In [13]:
class AutoRegressiveModel(nn.Module):
  """Decoder-only transformer for character-level language modelling.

  Token and learned positional embeddings are summed, passed through a stack
  of `Block`s, layer-normalised and projected onto the vocabulary.

  Args:
    embed_dim: embedding width used throughout the model.
    vocab_size: number of distinct tokens.
    block_size: number of rows in the positional-embedding table, i.e. the
      longest sequence the model can be given.
    heads: number of attention heads per block.
    num_layers: number of stacked blocks.
  """

  def __init__(self, embed_dim, vocab_size, block_size = BLOCK_SIZE, heads=4, num_layers=4):
    super().__init__()
    self.block = nn.Sequential(*[Block(embed_dim,heads) for _ in range(num_layers)])
    self.positional_embedding = nn.Embedding(block_size, embed_dim)
    self.embedding = nn.Embedding(vocab_size, embed_dim)
    self.final_layer_norm = LayerNorm1D(embed_dim)
    self.final_layer = nn.Linear(embed_dim, vocab_size)

  def forward(self, x, targets = None):
    """Return logits of shape (batch, T, vocab_size) for token ids `x` of shape (batch, T).

    `T` must not exceed the `block_size` the model was built with, because
    positions index the positional-embedding table. `targets` is accepted but
    not used by this method.
    """
    _, T = x.shape
    token_emb = self.embedding(x)
    # Create the position indices on the input's device so the lookup also
    # works after the model has been moved off the CPU.
    positions = torch.arange(T, device=x.device)
    hidden = token_emb + self.positional_embedding(positions)
    hidden = self.final_layer_norm(self.block(hidden))
    return self.final_layer(hidden)

In [14]:
# `os` is needed for the checkpoint check below and was never imported.
import os

# Weights file written at the end of the notebook by `torch.save`.
CHECKPOINT_PATH = "decoder_transformers_autoregressive_model.pth"

model = AutoRegressiveModel(embed_dim=128, vocab_size=vocab_size, block_size= BLOCK_SIZE, heads = 4)
# Resume from the weights left by a previous run, if there are any.
# `map_location` keeps the load working even when the checkpoint was saved
# from a device that is not available in this session.
if os.path.exists(CHECKPOINT_PATH):
    model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location="cpu"))
optimizer = torch.optim.Adam(model.parameters(), lr = 1e-3)
criterion = nn.CrossEntropyLoss()

In [15]:
def train(model: nn.Module, optimizer: torch.optim.Optimizer, criterion: nn.Module, dataloader: DataLoader, epochs: int):
  """Train `model` for `epochs` passes over `dataloader`, printing the mean loss per epoch.

  Args:
    model: maps a batch of inputs `X` to logits of shape (B, T, vocab).
    optimizer: optimizer that updates `model`'s parameters.
    criterion: loss called with logits flattened to (B*T, vocab) and targets
      flattened to (B*T,).
    dataloader: iterable of `(X, y)` batches.
    epochs: number of passes over `dataloader`.
  """
  for epoch in range(epochs):
    model.train()
    epoch_loss = 0.0
    # Count batches while iterating instead of calling len(dataloader), so
    # loaders without a length work and an empty loader cannot divide by zero.
    num_batches = 0
    for X,y in dataloader:
      optimizer.zero_grad()

      logits = model(X)
      B, T, _ = logits.shape
      # Merge batch and time so every position is one classification sample.
      loss = criterion(logits.reshape(B*T,-1),y.reshape(B*T))
      loss.backward()
      optimizer.step()

      epoch_loss += loss.item()
      num_batches += 1
    mean_loss = epoch_loss / num_batches if num_batches else float("nan")
    print(f"Epoch: {epoch + 1}/{epochs}, Loss: {mean_loss:.4f}")

In [16]:
def val(model: nn.Module,dataloader: DataLoader, criterion: "nn.Module | None" = None):
  """Evaluate `model` on `dataloader` and print the mean per-batch loss.

  Args:
    model: maps a batch of inputs `X` to logits of shape (B, T, vocab).
    dataloader: iterable of `(X, y)` batches.
    criterion: loss called with logits flattened to (B*T, vocab) and targets
      flattened to (B*T,). Defaults to `nn.CrossEntropyLoss()`, the loss this
      notebook trains with, so the function no longer depends on a global
      variable named `criterion` being defined.
  """
  if criterion is None:
    criterion = nn.CrossEntropyLoss()
  model.eval()
  val_loss = 0.0
  # Count batches while iterating so an empty loader cannot divide by zero.
  num_batches = 0
  with torch.no_grad():
    for X,y in dataloader:
      logits = model(X)
      B, T, _ = logits.shape
      val_loss += criterion(logits.reshape(B*T,-1),y.reshape(B*T)).item()
      num_batches += 1
  mean_loss = val_loss / num_batches if num_batches else float("nan")
  print(f"Loss: {mean_loss:.4f}")

In [17]:
def generate(model: nn.Module, start_seq: str ="The", epochs = 100):
  """Sample characters from `model`, continuing `start_seq`.

  At every step the last `BLOCK_SIZE` characters are encoded and fed to the
  model, and the next character is drawn from the softmax over the logits of
  the final position.

  Args:
    model: maps a (1, T) tensor of character ids to (1, T, vocab) logits.
    start_seq: prompt text; must be non-empty when `epochs` is positive.
    epochs: number of characters to sample (despite the name, not training epochs).

  Returns:
    A list of single characters: those of `start_seq` followed by the samples.

  Raises:
    ValueError: if characters are requested but `start_seq` is empty.
  """
  if epochs > 0 and not start_seq:
    raise ValueError("start_seq must contain at least one character")

  content = list(start_seq)
  # Sample in eval mode so dropout is off, without tracking gradients, and
  # restore the caller's train/eval mode afterwards.
  was_training = model.training
  model.eval()
  try:
    with torch.no_grad():
      for _ in range(epochs):
        # Only the last BLOCK_SIZE characters fit the model's context.
        context = torch.tensor(encode(content[-BLOCK_SIZE:])).unsqueeze(0)
        logits = model(context).squeeze(0)
        probs = torch.softmax(logits[-1], dim=-1)
        sampled = torch.multinomial(probs,1).tolist()
        content.append(decode(sampled)[0])
  finally:
    model.train(was_training)
  return content

In [18]:
# Train on the training split for 10 epochs; the mean loss is printed after each epoch.
train(model, optimizer, criterion, train_loader, 10)

Epoch: 1/10, Loss: 1.8673
Epoch: 2/10, Loss: 1.7148
Epoch: 3/10, Loss: 1.6812
Epoch: 4/10, Loss: 1.6630
Epoch: 5/10, Loss: 1.6500
Epoch: 6/10, Loss: 1.6407
Epoch: 7/10, Loss: 1.6332
Epoch: 8/10, Loss: 1.6272
Epoch: 9/10, Loss: 1.6224
Epoch: 10/10, Loss: 1.6177


In [19]:
# Print the mean loss over the held-out validation split.
val(model,val_loader)

Loss: 1.5803


In [23]:
# Sample 10,000 characters after the default prompt; here `epochs` is the
# number of sampling steps. `generate` returns a list of characters, so join
# them into one string for display.
content = generate(model, epochs = 10000)
''.join(content)

"There's none dangerous queen is royal footward's heaven, cousin must ready.\nNothing do not a vessel you conlike away!\n\nKING RICHARD III:\nI will not put off you would bear with a\nmade seem, even consul,\nAnd here's brother\nMy father king his understands here in prince are not in you.\n\nSICINIUS:\nO, here of your cold,\nYou shall grace, good rather, sir? Your gracious our house.\nForthwith my heaven, what are interfeit.\nSee they last not hast servant, cravisitation?\n\nSecond Senator:\nWhich or thought trust of mind all the dukedom that I had not that a cause.\n\nKING EDWARD IV:\nHaste?\n\nANTIGONUS:\nKatharing sturb of slow you liverce a sword yet she reled entreat fellow-bed\nagainst him visit him not deposed for a custard thy rusted and spote this we.\n\nMENENIUS:\nNo, much\nIn any disgrim false is great names like that I stay not slander most cords.\nBut let him;\nSainty to deserve a word? When i' fail I\nThat charge\nTo hear it is my hell.\n\nPROSPERO:\nBut request, thus wi

In [21]:
# Persist the weights; the model-setup cell loads this file when it exists.
torch.save(model.state_dict(), "decoder_transformers_autoregressive_model.pth")