In [1]:
import torch
import torch.nn as nn

In [2]:
class config:
    """Notebook-wide hyperparameters read by the model and the data pipeline."""
    # Window length in tokens: sizes the causal mask built in Attention, the
    # prompt truncation in Transformer.generate, and the sample length passed
    # to the dataset.
    context_size = 10
    

In [3]:
class WordEmbedding(nn.Module):
    """Learned lookup table that maps integer token ids to dense vectors."""

    def __init__(self, vocab_size, embedding_dim):
        super().__init__()
        self.embed = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
        )

    def forward(self, x):
        """Return the embedding rows selected by the integer ids in ``x``."""
        vectors = self.embed(x)
        return vectors

In [4]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position codes to a batch of embeddings.

    A ``(1, max_len, embedding_dim)`` table is precomputed once and stored as
    the buffer ``pe``: even columns hold ``sin(pos * w_k)`` and odd columns hold
    ``cos(pos * w_k)`` with ``w_k = exp(-ln(10000) * 2k / embedding_dim)``.

    Args:
        embedding_dim: Size of the last dimension of the inputs. Odd sizes are
            supported (the final sine column simply has no cosine partner).
        max_len: Number of positions to precompute.
    """

    def __init__(self, embedding_dim, max_len=5000):
        super(PositionalEncoding, self).__init__()
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(
            torch.arange(0, embedding_dim, 2).float()
            * (-torch.log(torch.tensor(10000.0)) / embedding_dim)
        )
        angles = position * div_term  # (max_len, ceil(embedding_dim / 2))

        pe = torch.zeros(max_len, embedding_dim)
        pe[:, 0::2] = torch.sin(angles)
        # There are only floor(embedding_dim / 2) odd columns, so drop the last
        # frequency when embedding_dim is odd; otherwise the shapes mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : embedding_dim // 2])

        # Leading axis of size 1 lets the table broadcast over the batch axis.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the codes for its first ``x.size(1)`` positions.

        Raises:
            RuntimeError: If ``x.size(1)`` exceeds the precomputed ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise RuntimeError(
                f"sequence length {seq_len} exceeds the {self.pe.size(1)} "
                "positions precomputed by PositionalEncoding"
            )
        return x + self.pe[:, :seq_len, :]


In [5]:
class Attention(nn.Module):
    """Single-head causal self-attention with an output projection.

    Queries, keys and values are full-width linear projections of the input
    (no head splitting). An additive mask holding ``-inf`` strictly above the
    diagonal prevents every position from attending to later positions.

    Args:
        embedding_dim: Size of the last dimension of the inputs and outputs.
        context_size: Longest sequence the mask supports. ``None`` (default)
            falls back to ``config.context_size``, the original behaviour.
    """

    def __init__(self,embedding_dim, context_size=None):
        super(Attention, self).__init__()
        if context_size is None:
            context_size = config.context_size

        self.wq = nn.Linear(embedding_dim, embedding_dim)
        self.wk = nn.Linear(embedding_dim, embedding_dim)
        self.wv = nn.Linear(embedding_dim, embedding_dim)
        self.wo = nn.Linear(embedding_dim, embedding_dim)

        self.residual_dropout = nn.Dropout(0.1)
        self.attention_dropout = nn.Dropout(0.1)

        # triu(diagonal=1) keeps -inf only above the diagonal and zeroes the
        # rest, so adding the mask to the scores removes "future" keys.
        mask = torch.full((1, context_size, context_size), float("-inf"))
        mask = torch.triu(mask, diagonal=1)
        self.register_buffer("mask",mask)

    def forward(self, x):
        """Apply masked attention to ``x`` (indexed as batch, sequence, feature).

        Raises:
            RuntimeError: If the sequence is longer than the causal mask.
        """
        seq_len = x.size(1)
        if seq_len > self.mask.size(-1):
            raise RuntimeError(
                f"sequence length {seq_len} exceeds the attention context "
                f"size {self.mask.size(-1)}"
            )

        q = self.wq(x)
        k = self.wk(x)
        v = self.wv(x)

        # Scaled dot-product scores, then the causal mask cropped to seq_len.
        scores = torch.matmul(q, k.transpose(1, 2))
        scores = scores / (q.size(-1) ** 0.5)
        scores = scores + self.mask[:, :seq_len, :seq_len]
        scores = torch.softmax(scores, dim=-1).type_as(q)
        scores = self.attention_dropout(scores)
        attention = torch.matmul(scores, v)

        attention = self.wo(attention)
        return self.residual_dropout(attention)

In [6]:
class FeedForward(nn.Module):
    """Gated two-branch MLP: ``ff3(relu(ff1(x)) * ff2(x))`` followed by dropout."""

    def __init__(self, embedding_dim, hidden_dim):
        super().__init__()
        # Two parallel up-projections (gate and value) and one down-projection.
        self.ff1 = nn.Linear(in_features=embedding_dim, out_features=hidden_dim)
        self.ff2 = nn.Linear(in_features=embedding_dim, out_features=hidden_dim)
        self.ff3 = nn.Linear(in_features=hidden_dim, out_features=embedding_dim)
        self.dropout = nn.Dropout(p=0.1)

    def forward(self, x):
        """Return the gated projection of ``x`` with dropout applied."""
        gate = torch.relu(self.ff1(x))
        value = self.ff2(x)
        projected = self.ff3(gate * value)
        return self.dropout(projected)

In [7]:
class TransformerBlock(nn.Module):
    """Residual block: layer-norm -> attention, then layer-norm -> feed-forward.

    Each sub-layer receives a normalised copy of the running activation and its
    result is added back onto the un-normalised activation.
    """

    def __init__(self, embedding_dim, hidden_dim):
        super().__init__()
        self.attention = Attention(embedding_dim)
        self.feedforward = FeedForward(embedding_dim, hidden_dim)
        self.layer_norm1 = nn.LayerNorm(embedding_dim)
        self.layer_norm2 = nn.LayerNorm(embedding_dim)

    def forward(self, x):
        """Run both residual sub-layers and return the updated activation."""
        attended = self.attention(self.layer_norm1(x))
        after_attention = x + attended
        transformed = self.feedforward(self.layer_norm2(after_attention))
        return after_attention + transformed

In [8]:
class Transformer(nn.Module):
    """Decoder-only language model built from a stack of ``TransformerBlock``s.

    Pipeline: token embedding -> positional encoding -> ``num_layers`` blocks
    -> final layer norm -> bias-free projection onto the vocabulary.

    Args:
        vocab_size: Number of distinct token ids.
        embedding_dim: Width of the token representations.
        hidden_dim: Inner width of each block's feed-forward network.
        num_layers: Number of stacked blocks (default 6, the original value).
    """

    def __init__(self,vocab_size,embedding_dim,hidden_dim,num_layers=6):
        super(Transformer, self).__init__()
        self.word_embedding = WordEmbedding(vocab_size,embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim)
        self.layers = nn.ModuleList(
            [TransformerBlock(embedding_dim,hidden_dim) for _ in range(num_layers)]
        )
        self.layer_norm = nn.LayerNorm(embedding_dim)
        self.out = nn.Linear(embedding_dim, vocab_size,bias=False)

        self.criterion = nn.CrossEntropyLoss()

    def forward(self, x,target=None):
        """Compute logits for the token ids ``x``.

        Returns:
            ``(logits, loss)`` when ``target`` is given, where ``logits`` covers
            every position and ``loss`` is the cross-entropy against ``target``;
            otherwise only the logits of the last position, with the sequence
            axis kept (size 1).
        """
        x = self.word_embedding(x)
        x = self.positional_encoding(x)
        for layer in self.layers:
            x = layer(x)
        x = self.layer_norm(x)
        out = self.out(x)
        if target is not None:
            # reshape (not view) so non-contiguous logits/targets are accepted.
            loss = self.criterion(out.reshape(-1, out.size(-1)), target.reshape(-1))
            return out, loss
        return out[:,[-1],:]

    @torch.inference_mode()
    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
        """Autoregressively append ``max_new_tokens`` token ids to ``idx``.

        Args:
            idx: Integer tensor of prompt ids, indexed as (batch, sequence).
            max_new_tokens: Number of tokens to append.
            temperature: ``0.0`` selects the arg-max token; any other value
                divides the logits before sampling.
            top_k: If given, sample only among the ``top_k`` largest logits.

        Returns:
            ``idx`` with the generated ids concatenated along the sequence axis.
        """
        # inference_mode disables autograd but NOT dropout; switch to eval for
        # the duration of sampling and restore the caller's mode afterwards.
        was_training = self.training
        self.eval()
        try:
            for _ in range(max_new_tokens):
                # Keep only the most recent context_size tokens as the prompt.
                idx_cond = idx if idx.size(1) <= config.context_size else idx[:, -config.context_size:]
                logits = self(idx_cond)
                logits = logits[:, -1, :]
                if temperature == 0.0:
                    # Greedy decoding.
                    _, idx_next = torch.topk(logits, k=1, dim=-1)
                else:
                    logits = logits / temperature
                    if top_k is not None:
                        # Mask everything below the k-th largest logit.
                        v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                        logits[logits < v[:, [-1]]] = -float('Inf')
                    probs = torch.softmax(logits, dim=-1)
                    idx_next = torch.multinomial(probs, num_samples=1)

                idx = torch.cat((idx, idx_next), dim=1)
        finally:
            self.train(was_training)

        return idx

    def get_embedding_from_idx(self, idx):
        """Return the embedding of a single Python-int token id, shape (1, dim)."""
        # Build the index on the model's device so this also works off-CPU.
        device = next(self.parameters()).device
        return self.word_embedding(torch.tensor([idx], device=device))

    def get_embedding(self, x):
        """Return the embeddings for a tensor of token ids."""
        return self.word_embedding(x)

In [9]:
from torch.utils.data import IterableDataset ,DataLoader
import random
import numpy as np

In [10]:
class eli5Dataset(IterableDataset):
    """Stream shuffled next-token-prediction windows from a flat token file.

    The file at ``path`` is read as raw ``uint16`` token ids. It is cut into
    non-overlapping windows of ``max_seq_len`` tokens; each window yields
    ``(x, y)`` where ``y`` is ``x`` shifted one token to the right, so every
    window needs ``max_seq_len + 1`` tokens to be available.

    Args:
        max_seq_len: Number of tokens in each ``x`` / ``y`` tensor.
        path: Path of the binary token file.
    """

    def __init__(self, max_seq_len,path):
        self.max_seq_len = max_seq_len
        self.path = path

    def __iter__(self):
        """Yield every complete window once, in a fresh random order."""
        # A new seed per pass gives a different shuffle for every epoch.
        seed = random.randint(0, 2**32)
        rng = random.Random(seed)
        data = np.memmap(self.path, dtype=np.uint16, mode='r')

        # A window starting at k * L needs tokens up to index k * L + L, so the
        # number of complete windows is (len - 1) // L. The previous
        # `len // L - 1` dropped a valid final window whenever len % L >= 1.
        no_of_samples = max(0, (len(data) - 1) // self.max_seq_len)

        idxs = list(range(no_of_samples))
        rng.shuffle(idxs)
        for idx in idxs:
            start = idx*self.max_seq_len
            end = start+self.max_seq_len+1
            # astype copies out of the memmap and widens to int64 for torch.
            seq = torch.from_numpy(data[start:end].astype(np.int64))
            x = seq[:-1]
            y = seq[1:]
            yield x, y

In [11]:
from transformers import GPT2Tokenizer
# Load the pretrained "gpt2" tokenizer; it is used below both to encode the
# training sentences and to encode/decode generation prompts.
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token
import numpy as np
import os

  from .autonotebook import tqdm as notebook_tqdm


In [12]:
def read_data(filename):
    """Read ``filename`` and return its lines with each newline replaced by '<eos>'.

    Lines keep all other characters, including trailing spaces; a final line
    without a newline is returned unchanged.
    """
    with open(filename) as handle:
        return [raw_line.replace('\n', '<eos>') for raw_line in handle]

# Load the training corpus: one entry per line of tense.txt, each ending in '<eos>'.
sentences = read_data("tense.txt")
# NOTE(review): this prints the whole list; consider a slice such as
# sentences[:5] if the corpus grows.
print(sentences)
print('no of sentences:',len(sentences))

['I am eating breakfast                         <eos>', 'She will go to the park                       <eos>', 'They played soccer yesterday                   <eos>', 'I will be going to the concert                <eos>', 'She is eating lunch now                       <eos>', 'He watched a movie last night                  <eos>', 'We are going to the beach this weekend         <eos>', 'The sun rises in the east                      <eos>', 'I used to love chocolate                        <eos>', 'She will travel to Europe next year            <eos>', 'He reads books every day                      <eos>', 'They had a picnic last summer                   <eos>', 'I am taking a break now                      <eos>', 'She will call you later                        <eos>', 'He wrote a letter last week                     <eos>', 'We will visit the museum on Sunday             <eos>', 'The train leaves at 9 AM                      <eos>', 'I will eat dinner in an hour                   <eos

In [13]:
def preprocess_sentences(sentences,idx,path,filename):
    """Tokenize ``sentences`` and write the ids to ``<path>/<filename><idx>.bin``.

    Every sentence is encoded with the module-level ``tokenizer`` (truncated to
    1024 tokens) and all ids are concatenated into one flat ``uint16`` array
    that is written as raw bytes.

    Args:
        sentences: Iterable of strings to encode.
        idx: Suffix appended to ``filename`` (before ``.bin``).
        path: Directory in which the file is created.
        filename: Base name of the output file.

    Returns:
        The full path of the written file.

    Raises:
        ValueError: If a token id does not fit into ``uint16``.
        OSError: If the file cannot be written. Previously the error was only
            printed and the path of a missing/partial file was still returned.
    """
    all_tokens = []
    for sentence in sentences:
        encoded = tokenizer(sentence,max_length=1024,truncation=True,return_tensors="np")
        all_tokens.extend(encoded['input_ids'][0])

    token_ids = np.asarray(all_tokens, dtype=np.int64)
    # Guard the narrowing cast: an out-of-range id must not be stored wrapped.
    uint16_max = np.iinfo(np.uint16).max
    if token_ids.size and (token_ids.min() < 0 or token_ids.max() > uint16_max):
        raise ValueError(
            f"token ids must be within [0, {uint16_max}] to be stored as uint16"
        )
    token_ids = token_ids.astype(np.uint16)

    filename = os.path.join(path,f"{filename}{idx}.bin")
    with open(filename, "wb") as f:
        f.write(token_ids.tobytes())

    return filename

In [12]:
# Tokenize the corpus into "tense2.bin" inside the current working directory.
filename = preprocess_sentences(sentences,2,os.getcwd(),"tense")

In [13]:
# Stream (input, target) windows of config.context_size tokens from the token file.
dataset = eli5Dataset(config.context_size,filename)
# Group the windows into batches of 16; shuffling happens inside the dataset.
dataloader = DataLoader(dataset, batch_size=16)

In [27]:
# Positional arguments are (vocab_size, embedding_dim, hidden_dim); 50257 is
# presumably the vocabulary size of the "gpt2" tokenizer loaded above.
jarvis = Transformer(50257, 20, 100)

# weights_only=True restricts unpickling to tensors and plain containers, so a
# tampered checkpoint file cannot execute arbitrary code while loading.
jarvis.load_state_dict(torch.load("trained.pth", weights_only=True))

  jarvis.load_state_dict(torch.load("zero_jarvis.pth"))


<All keys matched successfully>

In [31]:
# Adam over all model parameters with a fixed learning rate of 1e-3.
optimizer = torch.optim.Adam(jarvis.parameters(), lr=0.001)

In [32]:
def train(epoch,dataloader):
    """Train the global ``jarvis`` model with the global ``optimizer``.

    Args:
        epoch: Number of full passes over ``dataloader``.
        dataloader: Iterable yielding ``(context, target)`` batches.

    Prints the summed (not averaged) batch loss after every pass.
    """
    # Make sure dropout is active even if the model was left in eval mode.
    jarvis.train()
    for _ in range(epoch):
        total_loss = 0
        for context, target in dataloader:
            optimizer.zero_grad()
            # The logits are not needed here; only the loss drives the update.
            _, loss = jarvis(context, target)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(total_loss)

In [33]:
# Run 50 passes over the data; each printed number is that pass's summed batch loss.
train(50,dataloader)

238.0363632440567
232.59695145487785
228.0803613960743
226.5715013742447
226.29302942752838
224.42836040258408
223.68238765001297
223.08714681863785
222.0045674443245
220.69811761379242
219.43875378370285
220.5499987900257
220.47581574320793
219.17034396529198
217.6453347504139
217.67704382538795
217.2032936513424
217.38749262690544
216.9470451772213
216.6159154176712
216.9316688477993
217.19985005259514
216.81081274151802
215.97504913806915
215.9630321264267
214.7787014245987
213.91631019115448
214.39188650250435
215.5464145541191
214.5444231927395
213.28483021259308
214.3852065205574
213.65612283349037
213.6024593412876
213.0466162264347
212.32838433980942
213.6411275267601
211.81871438026428
212.50788533687592
213.6609283387661
212.77472028136253
212.4835818707943
212.64521124958992
211.8107831776142
212.48073363304138
213.54900509119034
211.69637581706047
211.55222913622856
211.58165335655212
209.6954950094223


In [136]:
# Sample 10 new tokens after the prompt (default temperature 1.0, no top-k filter).
op = jarvis.generate(torch.tensor([tokenizer.encode("The baby is")]), 10)

In [137]:
# Turn the first (and only) generated sequence back into text.
tokenizer.decode(op[0])

'The baby is sleeping peacefully.       '

In [84]:
# Persist only the model's state dict (parameters and buffers).
# NOTE(review): the load cell above reads "trained.pth", not this file — confirm
# which checkpoint a fresh run is meant to start from.
torch.save(jarvis.state_dict(), "zero_jarvis.pth")

In [None]:
# 