In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import Dataset , DataLoader
import math
from tqdm import tqdm

In [None]:
# Path of the raw training corpus (Colab-style location).
filename = '/content/shakespeare_data.txt'

# Read the whole corpus into memory. The encoding is pinned so the resulting
# character vocabulary does not depend on the platform's default locale.
with open(filename, encoding='utf-8') as corpus_file:
  text = corpus_file.read()

In [None]:
# Sanity check: show the first 500 characters of the loaded corpus.
print(text[:500])

In [None]:
# Character-level vocabulary: every distinct character in the corpus, sorted
# so that ids are assigned deterministically.
vocab = sorted(set(text))
print(''.join(vocab))

# Lookup tables in both directions: character -> id and id -> character.
chartoidx = dict(zip(vocab, range(len(vocab))))
idxtochar = dict(enumerate(vocab))
print(chartoidx)
print(idxtochar)

In [None]:
def encode_text(string):
  """Return the list of integer ids for the characters of `string`.

  Each character is looked up in the module-level `chartoidx` table; a
  character missing from that table raises KeyError.
  """
  return [chartoidx[char] for char in string]


def decode(integer):
  """Return the string spelled by the iterable of ids `integer`.

  Each id is looked up in the module-level `idxtochar` table; an id missing
  from that table raises KeyError.
  """
  return ''.join(idxtochar[idx] for idx in integer)

In [None]:
# Round-trip check: encoding "Hello" and decoding the ids should print the
# id list followed by the original string.
print(encode_text("Hello"))
print(decode(encode_text("Hello")))

In [None]:
# Encode the entire corpus as a 1-D int64 tensor of character ids.
data = torch.tensor(encode_text(text),dtype=torch.long)

In [None]:
# Hold out the final 10% of the encoded corpus for validation. The split point
# is computed from `data`, the tensor that is actually being sliced.
n = int(0.9*len(data))
train_data = data[:n]
eval_data = data[n:]

# Each element of `data` is one character id, so these are character counts.
print(f"Number of training characters: {len(train_data)}")
print(f"Number of validation characters: {len(eval_data)}")

In [None]:
class CustomeDataset(Dataset):
  """Non-overlapping fixed-length windows for next-token prediction.

  The encoded corpus is cut into consecutive chunks of `block_size` ids.
  For every chunk the target is the same window shifted one position to the
  right, so `target[t]` is the id that follows `source[t]`.

  Args:
    data: 1-D sequence (e.g. a tensor) of token ids supporting slicing.
    block_size: number of ids in every source/target window.
  """

  def __init__(self , data , block_size):

    self.data = data
    self.block_size = block_size
    self.source_lines , self.target_lines = self.create_data()

  def __len__(self):
    # Report exactly the number of windows that were built, so that every
    # sample is reachable and the length can never be negative.
    return len(self.source_lines)

  def create_data(self):
    """Build the parallel lists of source and target windows.

    A window starting at `i` is kept only if `block_size + 1` ids are
    available from `i`, which guarantees that source and target both have
    exactly `block_size` elements (required for batching).
    """
    source_lines = []
    target_lines = []
    # Largest start index that still leaves room for the shifted target.
    last_start = len(self.data) - self.block_size - 1
    for i in range(0, last_start + 1, self.block_size):
      source_lines.append(self.data[i:i+self.block_size])
      target_lines.append(self.data[i+1:i+self.block_size+1])

    return source_lines , target_lines

  def __getitem__(self,idx):

    return self.source_lines[idx] , self.target_lines[idx]



In [None]:
# train_dataset = CustomeDataset(train_data,block_size=256)
# val_dataset = CustomeDataset(eval_data,block_size=256)
# train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
# val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=True)

In [None]:
class InputEmbeddings(nn.Module):
  """Token embedding plus learned absolute position embedding.

  Args:
    d_embed: embedding width.
    vocab_size: number of rows in the token embedding table.
    block_size: maximum sequence length (rows in the position table).
  """

  def __init__(self , d_embed:int , vocab_size:int , block_size:int):
    super().__init__()
    self.d_embed = d_embed
    self.vocab_size = vocab_size
    self.block_size = block_size

    self.embedding = nn.Embedding(vocab_size,d_embed)
    self.position_embedding = nn.Embedding(block_size, d_embed)

    # Pre-computed [1, block_size] row of position indices. Registered as a
    # non-persistent buffer: it follows the module across devices but is not
    # written to the state_dict.
    self.register_buffer(
        "position_ids",
        torch.arange(self.block_size).expand((1,-1)),
        persistent=False,
    )


  def forward(self,x):
    """Embed token ids `x` whose last dimension is the sequence length.

    Sequences shorter than `block_size` are supported by using only the
    first `seq_length` position ids.

    Raises:
      ValueError: if the sequence is longer than `block_size`.
    """
    seq_length = x.shape[-1]
    if seq_length > self.block_size:
      raise ValueError(
          f"sequence length {seq_length} exceeds block_size {self.block_size}"
      )
    # Slice the position ids to the actual length so the sum broadcasts.
    position_ids = self.position_ids[:, :seq_length]
    return (self.position_embedding(position_ids)+self.embedding(x)) * math.sqrt(self.d_embed)

In [None]:
class MultiHeadAttentionBlock(nn.Module):
  """Multi-head scaled dot-product attention.

  Args:
    d_model: width of the input/output features.
    n_head: number of attention heads; must divide `d_model` evenly.
    dropout: dropout probability applied to the attention weights.

  Raises:
    ValueError: if `d_model` is not divisible by `n_head`.
  """

  def __init__(self , d_model, n_head , dropout:float):
    super().__init__()

    # Fail early with a clear message; otherwise the per-head width is
    # silently truncated and the reshape in forward() fails later.
    if d_model % n_head != 0:
      raise ValueError(
          f"d_model ({d_model}) must be divisible by n_head ({n_head})"
      )

    self.d_model = d_model
    self.n_head = n_head

    # NOTE: the attribute name `qurey` (sic) is kept as-is because it is part
    # of the state_dict keys of previously saved checkpoints.
    self.qurey = nn.Linear(d_model , d_model)
    self.key = nn.Linear(d_model,d_model)
    self.value = nn.Linear(d_model,d_model)

    # Per-head feature width.
    self.d_embed = d_model // n_head

    self.out = nn.Linear(d_model , d_model)
    self.dropout = nn.Dropout(dropout)

  @staticmethod
  def attention(query,key,value,mask,dropout:nn.Dropout):
    """Scaled dot-product attention over already-split heads.

    `mask`, when given, must broadcast against the score matrix; positions
    where `mask == 0` are blocked (filled with -1e9 before the softmax).
    `dropout`, when given, is applied to the attention weights.
    """
    d_embed = query.shape[-1]

    #[batch_size,n_head,seq_length,d_embed] @ [batch_size,n_head,d_embed,seq_length]
    #[batch_size,n_head,seq_length,seq_length]
    attention_scores = (query @ key.transpose(-2,-1))/math.sqrt(d_embed)
    if mask is not None:
      attention_scores.masked_fill_(mask==0,-1e9)
    attention_scores = attention_scores.softmax(dim=-1)

    if dropout is not None:
      attention_scores = dropout(attention_scores)

    #[batch_size,n_head,seq_length,seq_length] @ [batch_size,n_head,seq_length,d_embed]
    #[batch_size,n_head,seq_length,d_embed]
    return (attention_scores @ value)


  def forward(self,query,key,value,mask):
    """Project the inputs, attend per head, merge heads and project out."""

    #query,key,value = [batch_size , seq_length , n_embed]
    query = self.qurey(query)
    key = self.key(key)
    value = self.value(value)

    #query,key,value = [batch_size,seq_length,n_head,d_embed] -> [batch_size,n_head,seq_length,d_embed]
    query = query.view(query.shape[0],query.shape[1],self.n_head,self.d_embed).transpose(1,2)
    key = key.view(key.shape[0],key.shape[1],self.n_head,self.d_embed).transpose(1,2)
    value = value.view(value.shape[0],value.shape[1],self.n_head,self.d_embed).transpose(1,2)

    #[batch_size,n_head,seq_length,d_embed]
    x = MultiHeadAttentionBlock.attention(query,key,value,mask,self.dropout)

    #[batch_size,seq_length,n_head,d_embed] -> [batch_size,seq_length,n_head*d_embed]
    x = x.transpose(1,2).contiguous().view(x.shape[0],-1,self.n_head*self.d_embed)

    return self.out(x)




In [None]:
class FeedForwardBlock(nn.Module):
  """Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear.

  The hidden layer is four times as wide as the input; the output has the
  same width as the input.
  """

  def __init__(self,n_embed , dropout:float):
    super().__init__()

    hidden_size = 4*n_embed
    self.linear_1 = nn.Linear(n_embed,hidden_size)
    self.dropout = nn.Dropout(dropout)
    self.linear_2 = nn.Linear(hidden_size,n_embed)

  def forward(self , x):
    # Expand, apply the non-linearity, regularise, then project back.
    hidden = torch.relu(self.linear_1(x))
    hidden = self.dropout(hidden)
    return self.linear_2(hidden)


In [None]:
class LayerNormalization(nn.Module):
  """Normalise the last dimension to zero mean and unit standard deviation.

  A single learnable scalar gain (`alpha`) and scalar offset (`bias`) are
  applied to the normalised values. `eps` is added to the standard deviation
  to avoid division by zero.
  """

  def __init__(self,eps:float = 10**-6):
    super().__init__()

    self.eps = eps
    self.alpha = nn.Parameter(torch.ones(1))
    self.bias = nn.Parameter(torch.zeros(1))

  def forward(self,x):
    # Statistics are taken over the feature (last) axis and kept broadcastable.
    centered = x - x.mean(dim=-1,keepdim=True)
    spread = x.std(dim=-1,keepdim=True)
    return self.alpha * centered / (spread + self.eps) + self.bias

In [None]:
class ResidualConnection(nn.Module):
  """Pre-norm residual wrapper: `x + dropout(sublayer(norm(x)))`."""

  def __init__(self,dropout:float):
    super().__init__()

    self.dropout = nn.Dropout(dropout)
    self.norm = LayerNormalization()

  def forward(self,x,sublayer):
    # Normalise first, run the wrapped sublayer, then add the skip path.
    normalized = self.norm(x)
    branch = self.dropout(sublayer(normalized))
    return x + branch

In [None]:
class EncoderBlock(nn.Module):
  """One transformer layer: self-attention then feed-forward.

  Each of the two sublayers is wrapped in its own ResidualConnection.
  """

  def __init__(self,self_attention_block:MultiHeadAttentionBlock , feed_forward_block:FeedForwardBlock , dropout:float):
    super().__init__()

    self.self_attention_block = self_attention_block
    self.feed_forward_block = feed_forward_block
    self.residual_connections = nn.ModuleList([ResidualConnection(dropout) for _ in range(2)])


  def forward(self,x ,mask):
    attention_residual, feed_forward_residual = self.residual_connections

    def self_attend(normalized):
      # Query, key and value all come from the same (normalised) input.
      return self.self_attention_block(normalized,normalized,normalized,mask)

    x = attention_residual(x,self_attend)
    return feed_forward_residual(x,self.feed_forward_block)

In [None]:
class ProjectionLayer(nn.Module):
  """Linear map from model features (`d_model`) to vocabulary logits."""

  def __init__(self,d_model:int , vocab_size:int):
    super().__init__()

    self.proj = nn.Linear(d_model,vocab_size)

  def forward(self,x):
    # No activation or softmax here: the raw scores are returned.
    logits = self.proj(x)
    return logits

In [None]:
class Encoder(nn.Module):
  """Stack of layers applied in order, followed by a final normalisation."""

  def __init__(self,layers:nn.ModuleList):
    super().__init__()

    self.layers = layers
    self.norm = LayerNormalization()

  def forward(self,x,mask):
    # Thread the activations through every layer; each receives the same mask.
    hidden = x
    for block in self.layers:
      hidden = block(hidden,mask)
    return self.norm(hidden)

In [None]:
class LanguageModeling(nn.Module):
  """Full model: input embeddings -> encoder stack -> vocabulary projection.

  `encode` and `project` expose the two stages separately; `forward` chains
  them so the module can also be called directly as `model(source, mask)`.
  """

  def __init__(self, encoder:Encoder , projection_layer: ProjectionLayer , source_embed:InputEmbeddings):
    super().__init__()

    self.encoder = encoder
    self.projection_layer = projection_layer
    self.source_embed = source_embed

  def encode(self,source,mask):
    """Embed the token ids in `source` and run them through the encoder."""
    source = self.source_embed(source)
    return self.encoder(source,mask)

  def project(self, x):
    """Map encoder output to vocabulary logits."""
    # (batch, seq_len, vocab_size)
    return self.projection_layer(x)

  def forward(self, source, mask=None):
    """Return vocabulary logits for `source`; equals project(encode(...))."""
    return self.project(self.encode(source, mask))

In [None]:
def BuildModel(d_model:int , n_heads:int , vocab_size:int , N:int , dropout:float,block_size:int):
  """Assemble a LanguageModeling network.

  Args:
    d_model: feature width used throughout the model.
    n_heads: number of attention heads per layer.
    vocab_size: size of the token vocabulary (embedding rows / output logits).
    N: number of encoder blocks to stack.
    dropout: dropout probability passed to every sub-module.
    block_size: maximum sequence length for the position embedding.

  Returns:
    The assembled LanguageModeling module.
  """
  source_embed = InputEmbeddings(d_model, vocab_size,block_size)

  # Each layer gets its own attention and feed-forward modules (no sharing).
  encoder = Encoder(nn.ModuleList([
      EncoderBlock(
          MultiHeadAttentionBlock(d_model, n_heads, dropout),
          FeedForwardBlock(d_model, dropout),
          dropout,
      )
      for _ in range(N)
  ]))

  projection_layer = ProjectionLayer(d_model, vocab_size)

  return LanguageModeling(encoder,projection_layer,source_embed)





In [None]:
def count_parameters(model):
    """Return the total number of elements in `model`'s trainable parameters.

    Parameters with `requires_grad == False` are not counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

In [None]:
import os
def save_checkpoint(model, optimizer, epoch, path):
    """Save the model and optimizer state to `path` with `torch.save`.

    The stored dictionary has the keys 'epoch', 'model_state_dict' and
    'optimizer_state_dict'. Missing parent directories of `path` are created.

    Args:
        model: module whose `state_dict()` is saved.
        optimizer: optimizer whose `state_dict()` is saved.
        epoch: epoch number recorded in the checkpoint and in the log line.
        path: destination file; may be a bare file name.
    """
    directory = os.path.dirname(path)
    # dirname() is '' for a bare file name, and makedirs('') would raise.
    # exist_ok avoids the check-then-create race of a separate exists() test.
    if directory:
        os.makedirs(directory, exist_ok=True)

    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }

    torch.save(checkpoint, path)

    print(f"Checkpoint saved at epoch {epoch}")

In [None]:
# Demo: a 4x4 lower-triangular matrix whose zeros are replaced by -inf.
# Softmax over each row then gives uniform weights over columns <= row index
# and exactly zero elsewhere.
# Note: MultiHeadAttentionBlock.attention tests `mask == 0`, so it expects the
# plain 0/1 lower-triangular matrix rather than this -inf form.
mask = torch.ones(4, 4).tril()
mask = mask.masked_fill(mask.eq(0), -math.inf)
F.softmax(mask, dim=-1)

In [None]:
# ---- Hyperparameters -------------------------------------------------------
batch_size = 64
block_size = 256
n_epochs = 30
eval_interval = 500
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iter = 200
n_embed = 384
n_head = 6
n_layer = 6
dropout = 0.2
# Derived from the corpus instead of being hard-coded, so every character id
# is guaranteed to be a valid embedding index / output class.
vocab_size = len(vocab)

# ---- Data ------------------------------------------------------------------
train_dataset = CustomeDataset(train_data,block_size=block_size)
val_dataset = CustomeDataset(eval_data,block_size=block_size)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Shuffling has no effect on the averaged validation loss, so keep it ordered.
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# ---- Model / optimisation --------------------------------------------------
model = BuildModel(d_model=n_embed , n_heads=n_head , vocab_size=vocab_size , N=n_layer , dropout=dropout,block_size=block_size).to(device)
print(count_parameters(model))
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
loss_fn = nn.CrossEntropyLoss()

# Causal mask in the 0/1 convention used by MultiHeadAttentionBlock.attention,
# which blocks positions where `mask == 0`. It must NOT be converted to
# 1/-inf: that form contains no zeros, so nothing would be masked and every
# position could attend to the tokens it is supposed to predict.
mask = torch.tril(torch.ones(block_size, block_size)).to(device)

for epoch in range(n_epochs):

  model.train()
  train_loss = 0
  val_loss = 0

  for source,target in tqdm(train_dataloader):
    source ,target = source.to(device) , target.to(device)

    encode = model.encode(source,mask)
    logits = model.project(encode)

    # CrossEntropyLoss applies log-softmax internally, so it must be given
    # the raw logits; feeding it softmax output would normalise twice.
    logits = logits.view(-1, vocab_size)
    targets = target.view(-1)
    loss = loss_fn(logits, targets)

    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

    train_loss += loss.item()

  print(f"Epoch {epoch + 1}/{n_epochs}, Training Loss: {train_loss / len(train_dataloader)}")

  model.eval()
  with torch.no_grad():
    for source,target in tqdm(val_dataloader):
      source ,target = source.to(device) , target.to(device)

      encode = model.encode(source,mask)
      logits = model.project(encode)

      logits = logits.view(-1, vocab_size)
      targets = target.view(-1)
      loss = loss_fn(logits, targets)

      val_loss += loss.item()

  print(f"Epoch {epoch + 1}/{n_epochs},  Validation Loss: {val_loss / len(val_dataloader)}")
  # Persist the model after epochs 20 and 30.
  if epoch + 1 in (20, 30):
      save_checkpoint(model, optimizer, epoch + 1, f"/content/drive/MyDrive/checkpoint/CharacterLevelLanguageModeling_with_Attention__t_checkpoint_epoch_{epoch + 1}.pth")



In [None]:
# Load the epoch-30 checkpoint. `map_location` remaps the stored tensors onto
# the current `device`, so a checkpoint written on a GPU runtime can still be
# opened on a CPU-only runtime.
checkpoint = torch.load("/content/drive/MyDrive/checkpoint/CharacterLevelLanguageModeling_with_Attention__t_checkpoint_epoch_30.pth", map_location=device)

In [None]:
# Copy the saved weights (the 'model_state_dict' entry of the checkpoint) into
# the existing `model` instance.
model.load_state_dict(checkpoint['model_state_dict'])

In [None]:
def generate_text(model, start_sequence, char_to_idx, idx_to_char, block_size,length=100, temperature=1.0):
    """Sample `length` characters from `model`, continuing `start_sequence`.

    Args:
        model: object exposing `encode(ids, mask)` and `project(hidden)`.
        start_sequence: non-empty prompt string; every character must be a
            key of `char_to_idx`.
        char_to_idx: mapping from character to integer id.
        idx_to_char: mapping from integer id to character.
        block_size: maximum number of most-recent ids fed to the model.
        length: number of characters to generate.
        temperature: positive divisor applied to the logits before the
            softmax; values below 1 sharpen the distribution, values above 1
            flatten it.

    Returns:
        `start_sequence` followed by the generated characters.

    Raises:
        ValueError: if `start_sequence` is empty or `temperature` <= 0.
    """
    if not start_sequence:
        raise ValueError("start_sequence must contain at least one character")
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    model.eval()

    # Run on whatever device the model lives on instead of relying on a
    # notebook-level global; fall back to CPU for parameter-free models.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    generated_sequence = start_sequence
    # Use the mapping that was passed in rather than a global encoder.
    input_seq = torch.tensor(
        [[char_to_idx[char] for char in start_sequence]],
        dtype=torch.long,
        device=model_device,
    )

    with torch.no_grad():
        for _ in range(length):
            # Keep only the most recent `block_size` ids as context.
            input_seq = input_seq[:, -block_size:]
            seq_length = input_seq.shape[1]
            # 0/1 lower-triangular mask (0 = blocked), the convention that
            # MultiHeadAttentionBlock.attention checks with `mask == 0`.
            causal_mask = torch.tril(torch.ones(seq_length, seq_length, device=model_device))

            output = model.encode(input_seq, causal_mask)
            logits = model.project(output)
            # Only the last position predicts the next character.
            logits = logits[:, -1, :] / temperature
            probabilities = torch.nn.functional.softmax(logits, dim=-1).squeeze(0)
            # multinomial needs non-negative weights: sample from the
            # probabilities, never from the raw logits.
            predicted_idx = torch.multinomial(probabilities, 1).item()
            predicted_char = idx_to_char[predicted_idx]

            generated_sequence += predicted_char
            next_token = torch.tensor([[predicted_idx]], dtype=torch.long, device=model_device)
            input_seq = torch.cat([input_seq, next_token], dim=-1)

    return generated_sequence

# Example usage: seed the generator with the last `block_size` characters of
# the corpus and sample one additional character (length=1). The printed
# string is the prompt followed by that character.
start_sequence = text[-block_size:]
generated_text = generate_text(model, start_sequence, chartoidx, idxtochar, block_size,length=1, temperature=0.8)
print(generated_text)