In [1]:
import torch
import torch.nn as nn
import torch.nn.functional  as F
from tqdm import tqdm

In [2]:
# Load the whole training corpus into memory as one string.
# NOTE(review): this is an absolute, machine-specific Windows path, so the
# notebook only runs where sample.txt exists at exactly this location.
with open("E:\\DL-Scratch\\sample.txt" , 'r',encoding='utf-8') as f:
    txt = f.read()
# Preview the first 100 characters as the cell's displayed output.
txt[:100]

'We let go of our worries and embrace the world.\n\nThe quick brown fox jumps over the lazy dog.\nA penn'

In [3]:
# Hyperparameters and derived sizes, read as globals by the cells below.
tokens = len(txt)  # corpus length in characters (the model is character-level)
seq = 25  # context window: characters per training sequence and number of position embeddings
batch = 25  # sequences sampled per training step
vocab_size = len(set(txt))  # number of distinct characters in the corpus
d_model = 128  # embedding width used throughout the model
n_blocks = 8  # number of stacked transformer blocks
n_heads = 4  # attention heads per block
head_size = d_model // n_heads  # per-head width, so the concatenated heads are d_model wide


# Device on which get_batch() creates the training tensors.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [4]:
# Character <-> integer-id lookup tables over the sorted set of corpus characters.
stoi = {ch: idx for idx, ch in enumerate(sorted(set(txt)))}
itos = dict(enumerate(sorted(set(txt))))

In [5]:
def encoder(text):
    """Return the list of integer ids that ``stoi`` assigns to each character of ``text``."""
    return [stoi[s] for s in text]


def decoder(token):
    """Return the string formed by joining the characters ``itos`` assigns to the ids in ``token``."""
    return "".join(itos[i] for i in token)

In [6]:
def get_batch():
    """Sample a random batch of (input, target) character-id sequences.

    Reads the module-level ``txt``, ``tokens``, ``seq``, ``batch``, ``stoi``
    and ``device``.

    Returns:
        A pair ``(batch_x, batch_y)`` of integer tensors of shape
        ``(batch, seq)`` on ``device``. ``batch_y`` is ``batch_x`` shifted one
        character to the right, i.e. the next-character targets.

    Raises:
        ValueError: if the corpus is too short to hold one window plus its
            final target character.
    """
    if tokens <= seq:
        raise ValueError(
            f"corpus has {tokens} characters but at least {seq + 1} are needed for seq={seq}"
        )

    # torch.randint's upper bound is exclusive. The last valid window starts at
    # tokens - seq - 1 (its final target is the last character of the corpus),
    # so the bound is tokens - seq.
    # Convert to plain ints once so they can be used directly as slice bounds.
    window_starts = torch.randint(0, tokens - seq, (batch,)).tolist()

    batch_x = torch.tensor(
        [[stoi[ch] for ch in txt[s:s + seq]] for s in window_starts],
        device=device,
    )
    batch_y = torch.tensor(
        [[stoi[ch] for ch in txt[s + 1:s + seq + 1]] for s in window_starts],
        device=device,
    )
    return batch_x, batch_y
    
# Sanity check: decode the first input sequence of a freshly sampled batch back to text.
decoder((get_batch()[0][0]).tolist())

'for future generations.\nT'

In [7]:
class Head(nn.Module):
    """A single head of causal (masked) scaled dot-product self-attention.

    Args:
        head_size: Output width of this head's key, query and value
            projections. Their input width is the module-level ``d_model``.
    """

    def __init__(self,head_size):
        super().__init__()
        self.key = nn.Linear(d_model,head_size,bias=False)
        self.query = nn.Linear(d_model,head_size,bias=False)
        self.value = nn.Linear(d_model,head_size,bias=False)

    def forward(self, x):
        """Attend each position to itself and to earlier positions only.

        Args:
            x: Tensor whose last dimension matches the projections' input width
                and whose second-to-last dimension indexes sequence position.

        Returns:
            Tensor shaped like ``x`` except that the last dimension is this
            head's width.
        """
        K = self.key(x)
        Q = self.query(x)
        V = self.value(x)

        # Scale by this head's own width, taken from the projection output, so
        # the scaling stays right even if the head was built with a size other
        # than the global default.
        wei = Q @ K.transpose(-2, -1) / (K.shape[-1] ** 0.5)

        # Build the causal mask from positions, not from the score values: a
        # score that is exactly 0 at an allowed position must stay visible,
        # otherwise a row can end up entirely -inf and softmax yields NaN.
        T = wei.shape[-1]
        future = torch.tril(torch.ones(T, T, device=wei.device)) == 0
        wei = wei.masked_fill(future, float('-inf'))
        wei = F.softmax(wei, dim=-1)
        return wei @ V

In [8]:
class Multihead_attention(nn.Module):
    """Runs ``n_heads`` attention heads on the same input and mixes their outputs.

    Head count and sizes come from the module-level ``n_heads``, ``head_size``
    and ``d_model``.
    """

    def __init__(self):
        super().__init__()
        head_modules = [Head(head_size) for _ in range(n_heads)]
        self.heads = nn.ModuleList(head_modules)
        self.projection = nn.Linear(n_heads * head_size, d_model)

    def forward(self, x):
        """Concatenate every head's output along the last dimension, then project it."""
        per_head = []
        for head in self.heads:
            per_head.append(head(x))
        return self.projection(torch.cat(per_head, dim=-1))

In [9]:
class Feedforward(nn.Module):
    """Position-wise two-layer MLP: expand to ``4*d_model``, ReLU, dropout, project back."""

    def __init__(self):
        super().__init__()
        hidden_width = 4 * d_model
        self.layer1 = nn.Linear(d_model, hidden_width)
        self.layer2 = nn.Linear(hidden_width, d_model)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        """Apply layer1 -> ReLU -> dropout -> layer2 to ``x``."""
        hidden = F.relu(self.layer1(x))
        return self.layer2(self.dropout(hidden))

In [10]:
class Block(nn.Module):
    """One pre-norm transformer block: attention and feed-forward, each with a residual add."""

    def __init__(self):
        super().__init__()
        self.multi_head = Multihead_attention()
        self.ffn = Feedforward()
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        """Add the attention output, then the feed-forward output, to the running input.

        Each sub-layer sees a layer-normalised copy of its input; the residual
        path itself is not normalised.
        """
        attended = self.multi_head(self.layer_norm1(x))
        after_attention = x + attended
        fed_forward = self.ffn(self.layer_norm2(after_attention))
        return after_attention + fed_forward

In [11]:
class Transformer_decoder(nn.Module):
    """Character-level decoder-only transformer producing next-token logits.

    All sizes come from the module-level ``vocab_size``, ``d_model``, ``seq``
    and ``n_blocks``.
    """

    def __init__(self):
        super().__init__()
        self.input_embedding = nn.Embedding(vocab_size,d_model) # (v_s , d_model)
        # One learned vector per position; inputs may be at most `seq` long.
        self.position_embedding = nn.Embedding(seq,d_model)
        self.blocks = nn.Sequential(*[Block() for _ in range(n_blocks)])
        self.linear = nn.Linear(d_model,vocab_size)

    def forward(self,x):
        """Compute logits for every position of a batch of token-id sequences.

        Args:
            x: Integer tensor of token ids, shape ``(B, T)``, with ``T`` no
                larger than the number of position embeddings.

        Returns:
            Tensor of shape ``(B, T, vocab)`` holding unnormalised
            next-token scores.
        """
        inp = self.input_embedding(x)
        _, T, _ = inp.shape
        # Build the position indices on the same device as the embeddings so
        # the module keeps working after it has been moved off the CPU.
        positions = torch.arange(T, device=inp.device)
        pos = self.position_embedding(positions)
        x = inp + pos  # (T, C) broadcasts over the batch dimension
        x = self.blocks(x)
        logits = self.linear(x)

        return logits

# Instantiate the model and move its parameters to `device`, the device on
# which get_batch() creates the training tensors; a model left on the CPU
# cannot consume batches that live on the GPU.
model = Transformer_decoder().to(device)

In [12]:

def generate(start = [0],max_tokens=100, seq = seq):
    """Autoregressively sample token ids from the global ``model``.

    Args:
        start: Sequence of token ids used as the initial context. It is only
            read, never mutated.
        max_tokens: Number of new tokens to sample.
        seq: Maximum context length fed to the model; only the most recent
            ``seq`` tokens are kept.

    Returns:
        List of the ``max_tokens`` sampled token ids (the prompt is not
        included).
    """
    # Create every tensor on the model's own device so sampling works wherever
    # the model currently lives.
    model_device = next(model.parameters()).device

    # Sample in eval mode so dropout is disabled, and put the model back into
    # its previous mode afterwards, even if sampling is interrupted.
    was_training = model.training
    model.eval()

    # Trim an over-long prompt up front so the first forward pass already fits
    # the context window.
    starts = torch.tensor(start, device=model_device).unsqueeze(0)[:, -seq:]
    output = []

    try:
        with torch.no_grad():
            for _ in range(max_tokens):
                logits = model(starts)

                # Only the last position predicts the next token.
                prob = F.softmax(logits[:, -1, :], dim=-1)
                new_token = torch.multinomial(prob, 1)  # shape (1, 1), on model_device
                output.append(new_token.item())

                # Append the sample and keep only the most recent `seq` tokens.
                starts = torch.cat((starts, new_token), dim=1)[:, -seq:]
    finally:
        if was_training:
            model.train()

    return output


In [13]:
model.train()

def train(epochs=10000,lr = 0.0003):
    """Optimise the global ``model`` with AdamW on randomly sampled batches.

    Runs ``epochs + 1`` optimisation steps, one batch from ``get_batch()``
    each. Every 1000th step (step 0 included) it prints the current loss and a
    50-token sample generated with the model temporarily in eval mode.

    Args:
        epochs: Index of the last optimisation step.
        lr: Learning rate passed to AdamW.
    """
    optimizer = torch.optim.AdamW(model.parameters(), lr)
    param_count = sum([p.numel() for p in model.parameters()])
    print('Total parameter:', param_count)

    for epoch in range(epochs+1):
        inputs, targets = get_batch()

        # Flatten (B, T, vocab) logits and (B, T) targets for cross-entropy.
        logits = model(inputs).view(-1, vocab_size).float()
        loss = F.cross_entropy(logits, targets.view(-1))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if epoch % 1000 != 0:
            continue

        # Periodic progress report plus a short sample.
        print(F"EPOCH : {epoch} || LOSS : {loss} ")
        model.eval()
        print(decoder(generate(max_tokens=50)))
        model.train()


train()

Total parameter: 1599668
EPOCH : 0 || LOSS : 4.238442897796631 
vCheb ewaFses aYagYx'zeCW,eK dReOtt .ij yb.rcSaAcD
EPOCH : 1000 || LOSS : 1.0585353374481201 
Whare grateful for and over un.
We beldow the most
EPOCH : 2000 || LOSS : 0.48775359988212585 
We seek to create a world of justice and equality.
EPOCH : 3000 || LOSS : 0.39927294850349426 
juvery dog has his day.
Familiarity breeds contemp


KeyboardInterrupt: 

In [None]:
# Sample first and only then open the file, so a failure or interrupt during
# generation does not leave behind a truncated, empty output.txt.
text = decoder(generate(max_tokens= 1000))
# Write explicitly as UTF-8: the corpus was read as UTF-8, and the platform's
# default encoding may be unable to represent some of its characters.
with open('output.txt', 'w', encoding='utf-8') as File:
    File.write(text)