In [59]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
import random
from tqdm import tqdm
from torch import randint

import os
from torch.utils.tensorboard.writer import SummaryWriter
import numpy as np
import time
import math

input_path = ''

with open(input_path,'r') as f  :
    text_dataset = f.read()

num_train = int(0.8*len(text_dataset))
train_dataset = text_dataset[:num_train]
valid_dataset = text_dataset[num_train:]

In [None]:
vocab =  set(text_dataset)
char_to_id = { c:i for i, c in enumerate(vocab) }
id_to_char = { i:c for i, c in enumerate(vocab) }

vocab_size = len(vocab)
# encode = lambda txt : torch.Tensor([char_to_id[c] for c in txt]).long()
encode = lambda txt : np.array([char_to_id[c] for c in txt])
decode = lambda  ids : ''.join([id_to_char[id] for id in ids])
encoded_train = encode(train_dataset)
encoded_valid = encode(valid_dataset)

def get_batch_of_text(encoded_dataset , block_size , batch_size) :

    start_ids = torch.randint(low = 0, high = len(encoded_dataset) - block_size, size =(batch_size,) )

    inputs = np.stack([encoded_dataset[start_id: start_id + block_size] for start_id in start_ids])
    targets = np.stack([encoded_dataset[start_id + 1: start_id + block_size+1] for start_id in start_ids])

    return torch.Tensor(inputs).long(), torch.Tensor(targets).long() # (batch_size, block_size) # (batch_size,) long idx

# a, b = get_batch_of_text(encoded_train , block_size = 8 , batch_size = 5)


In [3]:
torch.tril(torch.ones(1,1,5,5))

tensor([[[[1., 0., 0., 0., 0.],
          [1., 1., 0., 0., 0.],
          [1., 1., 1., 0., 0.],
          [1., 1., 1., 1., 0.],
          [1., 1., 1., 1., 1.]]]])

In [None]:
class MaskedMultiHeadAttention(nn.Module) :
    def __init__(self , embedding_dim , nhead = 4, dropout = 0.1) : #, dim_feedforward=2048, activation = F.relu) :
        super(MaskedMultiHeadAttention,self).__init__()
        assert embedding_dim % nhead == 0
        self.K_v = nn.Linear(in_features=embedding_dim , out_features=embedding_dim) # d_model --> d_model
        self.Q_v = nn.Linear(in_features=embedding_dim , out_features=embedding_dim) # d_model --> d_model
        self.V_v = nn.Linear(in_features= embedding_dim , out_features=embedding_dim) # d_model --> d_model
        self.dropout =nn.Dropout(dropout)

        self.d_model = embedding_dim//nhead
        self.embedding_dim = embedding_dim
        self.nhead = nhead

        self.max_tokens = 1000
        self.register_buffer('mask', torch.tril(torch.ones(1,1,self.max_tokens, self.max_tokens)))
    
    def forward(self , x):      # ( batch_size, seq_len, embedding_dim)
        batch_size, seq_len, _ = x.shape
        query = self.Q_v(x).reshape(batch_size,seq_len,self.nhead, self.d_model).transpose(1,2)   # ( batch_size,  nhead ,seq_len ,  d_model)
        key = self.K_v(x).reshape(batch_size,seq_len,self.nhead, self.d_model).transpose(1,2)    # ( batch_size,  nhead ,seq_len ,  d_model)
        value = self.V_v(x).reshape(batch_size,seq_len,self.nhead, self.d_model).transpose(1,2)   # ( batch_size,  nhead ,seq_len ,  d_model)


        scores = torch.matmul( query, key.transpose(-1,-2))/self.d_model**0.5    # ( batch_size, nhead,  seq_len ,  seq_len)

        Mask = self.mask[:,:,:seq_len,:seq_len]
        scores = scores.masked_fill(Mask == 0,float('-inf'))                    # ( batch_size, nhead,  seq_len ,  seq_len)
        scores = F.softmax(scores, dim=-1)
        scores = self.dropout(scores)                                            # ( batch_size, nhead,  seq_len ,  seq_len)

        result = torch.matmul(scores, value)                                     # ( batch_size, nhead,  seq_len , d_model)

        # result = result.transpose(1,2).reshape(batch_size,seq_len, self.embedding_dim)              # slower and doesn't need contiguous
        result = result.transpose(1, 2).contiguous().view(batch_size, seq_len, self.embedding_dim) # faster and doesn't need contiguous

        return result 
    
class Block(nn.Module) :
    def __init__(self , embedding_dim , nhead = 4, dropout = 0.1, dim_feedforward=2048,activation = nn.GELU) :
        super(Block,self).__init__()
        assert embedding_dim % nhead == 0

        self.att_n = MaskedMultiHeadAttention(embedding_dim , nhead = nhead, dropout = dropout)
        self.dropout = nn.Dropout(dropout)

        self.layer_norm1 = nn.LayerNorm(embedding_dim)
        self.layer_norm2 = nn.LayerNorm(embedding_dim)

        self.feed_fw = nn.Sequential(
                            nn.Linear(embedding_dim,dim_feedforward),
                            activation() ,
                            nn.Linear(dim_feedforward, embedding_dim),
                                    )
    def forward( self, x) : #  ( batch_size, seq_len, embedding_dim)
        x = self.layer_norm1(x) #  ( batch_size, seq_len, embedding_dim)
        x =  x + self.att_n(x)  #  ( batch_size, seq_len, embedding_dim)
        x = self.layer_norm2(x) #  ( batch_size, seq_len, embedding_dim)
        x = x + self.feed_fw(x) #  ( batch_size, seq_len, embedding_dim)

        return x

class GPT2(nn.Module):
    def __init__(self , vocab_size, embedding_dim , block_size, nhead, num_layers, dropout = 0.1 , dim_feedforward= 256, activation = nn.GELU) :
        super(GPT2,self).__init__()
        self.block_size =  block_size
        self.token_embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim) # vocab_size --> embedding_dim
        self.embedding_to_vocab = nn.Linear(embedding_dim,vocab_size)
        self.embedding_to_vocab.weight = self.token_embedding.weight # The Same matrix for input and output
        self.positional_encoding = nn.Parameter(torch.randn(self.block_size, embedding_dim) )  # vocab_size --> embedding_dim
        self.Blocks =  nn.ModuleList([Block(embedding_dim , nhead = nhead, dropout = dropout, dim_feedforward=dim_feedforward, activation = activation) for _ in range(num_layers)])


    def forward( self , input, target = None) : # (batch_size, seq_len : long) 
        batch_size, seq_len = input.shape
        input = self.token_embedding(input)                   # (batch_size, seq_len , embedding_dim )

        positional_encoding = self.positional_encoding[ None, :seq_len, ] # (batch_size, seq_len , embedding_dim )
        
        input = input + positional_encoding # (batch_size, seq_len , embedding_dim )
        for block in self.Blocks:
            input = block(input)                             # (batch_size, seq_len , embedding_dim )
        input = self.embedding_to_vocab(input)                # (batch_size, seq_len ,vocab_size)
        input = F.softmax(input , dim = -1)                   # (batch_size, seq_len ,vocab_size)


        if target is None :
            loss = None
        else :

            input = input.view(-1, vocab_size)
            target = target.view(-1)
            
            loss =  F.cross_entropy(input, target)

        return input , loss                                # (batch_size, seq_len ,vocab_size) normalized
    
    def generate(self, idx,max_token = 300 ) : # (batch_size, seq_len) # max is block_size
        for i in range(max_token) :
            result,_ = self(idx[:,-self.block_size: ])    # (batch_size, seq_len , vocab_size) # max is block_size
            # I will use only the last token to predict the next one :
            probas = result[:,-1,:]
            prediction = torch.multinomial(probas,1) # (batch_size, 1 )
            idx = torch.cat((idx, prediction),dim = -1)  # add a new element

        return idx 

In [69]:
block_size , batch_size = 8, 4
model =  GPT2(vocab_size=vocab_size,embedding_dim=64,num_layers = 4 ,block_size=8,nhead=4)

inputs, targets = get_batch_of_text(encoded_dataset = encoded_train , block_size = block_size , batch_size = batch_size)


i,loss = model(inputs, targets)

print('loss',loss.item())
idx =  torch.randint(0,5, (1,1)).long()

results = model.generate(idx,max_token= 50)
generation = results[0].cpu().numpy().tolist()

print('generation' , generation)
print('Generation', decode(generation))

inputs torch.Size([4, 8, 65])
targets torch.Size([4, 8])
inputs torch.Size([32, 65])
targets torch.Size([32])
loss 4.199715614318848
generation [4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4]
Generation lllllllllllllllllllllllllllllllllllllllllllllllllll


In [None]:
class GPT2_dummy(nn.Module):
    def __init__(self , vocab_size, embedding_dim , block_size, nhead, dropout = 0.1 , dim_feedforward= 256, activation = nn.ReLU) :
        super(GPT2_dummy,self).__init__()
        self.block_size =  block_size
        self.token_embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=vocab_size) # vocab_size --> embedding_dim
    

    def forward( self , input, target = None) : # (batch_size, seq_len : long) 
        batch_size, seq_len = input.shape
        input = self.token_embedding(input)                   # (batch_size, seq_len , embedding_dim )
        input = F.softmax(input , dim = -1)              
    
        if target is None :
            loss = None
        else :
            loss =  F.cross_entropy(input, target)

        return input , loss                                # (batch_size, seq_len ,vocab_size) normalized
    
    def generate(self, idx, max_token = 1 ) : # (batch_size, seq_len) # max is block_size
        for i in range(max_token) :
            result,_ = self(idx[:,-self.block_size: ])    # (batch_size, seq_len , vocab_size) # max is block_size
            # I will use only the last token to predict the next one :
            probas = result[:,-1,:]
            prediction = torch.multinomial(probas,1) # (batch_size, )
            idx = torch.cat((idx, prediction), dim = -1)  # add a new element

        return idx # (batch_size, max_token) # ids long
model =  GPT2_dummy(vocab_size=vocab_size,embedding_dim=64 ,block_size=8,nhead=4)
tensor1 = torch.randint(0,32, (4,6))

i,l = model(tensor1)

idx =  torch.randint(0,5, (1,2)).long()

results = model.generate(idx,max_token= 20)
generation = results[0].cpu().numpy().tolist()

print('generation' , generation)
print('Generation', decode(generation))


generation [1, 3, 35, 5, 43, 53, 61, 28, 53, 0, 22, 45, 26, 35, 49, 56, 50, 33, 24, 38, 22, 14]
Generation mg'!:bYvb jtw'kRf
OXjH
