In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import urllib.request
import random
import math
from tqdm import tqdm

torch.manual_seed(1337)

<torch._C.Generator at 0x202444e4110>

In [2]:
url =  'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'

filename = 'shakespeare.txt'
urllib.request.urlretrieve(url, filename)

with open('data6.txt', 'r', encoding='utf-8') as f:
    text = f.read()

In [4]:
chars = sorted(list(set(text)))
vocab_size = int(len(chars))
text_length = len(text) - 1
block_size = 32
batch_size = 16

device = 'cuda' if torch.cuda.is_available() else 'cpu'

print(vocab_size)
print(device)

103
cuda


In [5]:
n_embd = 64
n_head = 4
n_layer = 4
dropout = 0.0

head_size = n_embd // n_head

In [6]:
class Coding():
  def encode(self, string):
    string = list(string)
    for index in range(len(string)):
      string[index] = chars.index(string[index])
      
    return string
  
  def decode(self, integers):
    decoded = []
    for index in range(len(integers)):
      decoded.append(chars[int(integers[index])])
    return decoded

coding = Coding()
encoded_text = torch.tensor(coding.encode(text), dtype=torch.long)

In [7]:
class Dataset_Creation():
    def batch_creation(self):
        input_data = []
        validation_data = []

        for i in range(batch_size):
          indices = random.randint(0, text_length - block_size)
          isolated_batch = encoded_text[indices:indices+block_size]
          decoded_text = coding.decode(isolated_batch)

          following_batch = []
          for j in range(1, block_size+1):
            following_batch.append(encoded_text[indices + j])
          following_batch = torch.Tensor(following_batch)

          input_data.append(isolated_batch)
          validation_data.append(following_batch)


        input_data = torch.stack(input_data)
        ground_truth = torch.stack(validation_data)
        input_data = torch.tensor(input_data, dtype=torch.long)
        ground_truth = torch.tensor(ground_truth, dtype=torch.long)
        
        return input_data.to(device), ground_truth.to(device)

dataset_creation = Dataset_Creation()

In [8]:
class Transformer():
    class Head(nn.Module):
        """ one head of self-attention """

        def __init__(self, head_size):
            super().__init__()
            self.key = nn.Linear(n_embd, head_size, bias=False)
            self.query = nn.Linear(n_embd, head_size, bias=False)
            self.value = nn.Linear(n_embd, head_size, bias=False)
            self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

            self.dropout = nn.Dropout(dropout)

        def forward(self, x):
            B,T,C = x.shape
            k = self.key(x)   # (B,T,C)
            q = self.query(x) # (B,T,C)
            # compute attention scores ("affinities")
            wei = q @ k.transpose(-2,-1) * C**-0.5 # (B, T, C) @ (B, C, T) -> (B, T, T)
            wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
            wei = F.softmax(wei, dim=-1) # (B, T, T)
            wei = self.dropout(wei)
            # perform the weighted aggregation of the values
            v = self.value(x) # (B,T,C)
            out = wei @ v # (B, T, T) @ (B, T, C) -> (B, T, C)
            return out

    class MultiHeadAttention(nn.Module):
        """ multiple heads of self-attention in parallel """

        def __init__(self, num_heads, head_size):
            super().__init__()
            self.heads = nn.ModuleList([Transformer.Head(head_size) for _ in range(num_heads)])
            self.proj = nn.Linear(n_embd, n_embd)
            self.dropout = nn.Dropout(dropout)

        def forward(self, x):
            out = torch.cat([h(x) for h in self.heads], dim=-1)
            out = self.dropout(self.proj(out))
            return out

    class FeedFoward(nn.Module):
        """ a simple linear layer followed by a non-linearity """

        def __init__(self, n_embd):
            super().__init__()
            self.net = nn.Sequential(
                nn.Linear(n_embd, 4 * n_embd),
                nn.ReLU(),
                nn.Linear(4 * n_embd, n_embd),
                nn.Dropout(dropout),
            )

        def forward(self, x):
            return self.net(x)

    class Block(nn.Module):
        """ Transformer block: communication followed by computation """

        def __init__(self, n_embd, n_head):
            # n_embd: embedding dimension, n_head: the number of heads we'd like
            super().__init__()
            head_size = n_embd // n_head
            self.sa = Transformer.MultiHeadAttention(n_head, head_size)
            self.ffwd = Transformer.FeedFoward(n_embd)
            self.ln1 = nn.LayerNorm(n_embd)
            self.ln2 = nn.LayerNorm(n_embd)

        def forward(self, x):
            x = x + self.sa(self.ln1(x))
            x = x + self.ffwd(self.ln2(x))
            return x

In [9]:
class LanguageModel(nn.Module):
    def __init__(self):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Transformer().Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):        
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

In [10]:
model = LanguageModel()
model = model.to(device)

learning_rate = 1e-3
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
print(sum(p.numel() for p in model.parameters())/1e6, 'M parameters')

0.214631 M parameters


In [None]:
num_epochs = 10001
total_loss = 0
eval_iteration = 1000

for epoch in tqdm(range(num_epochs)):
    input_data, target_data = dataset_creation.batch_creation()
    
    logits, loss = model(input_data, target_data)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    total_loss += loss
    if epoch % eval_iteration == 0:
        print(total_loss/eval_iteration)
        
        PATH = "C:\\Users\\andrew\\Desktop\\m_storage"
        torch.save(model.state_dict(), PATH)
        model.load_state_dict(torch.load(PATH))
        total_loss = 0

  input_data = torch.tensor(input_data, dtype=torch.long)
  ground_truth = torch.tensor(ground_truth, dtype=torch.long)
  0%|                                                                                | 1/10001 [00:00<27:48,  5.99it/s]

tensor(0.0022, device='cuda:0', grad_fn=<DivBackward0>)


 10%|███████▊                                                                      | 999/10001 [01:00<08:55, 16.82it/s]

tensor(1.9332, device='cuda:0', grad_fn=<DivBackward0>)


 20%|███████████████▍                                                             | 1999/10001 [02:00<07:51, 16.97it/s]

tensor(1.7568, device='cuda:0', grad_fn=<DivBackward0>)


 30%|███████████████████████                                                      | 2999/10001 [03:00<06:56, 16.80it/s]

tensor(1.6690, device='cuda:0', grad_fn=<DivBackward0>)


 40%|██████████████████████████████▊                                              | 3999/10001 [04:01<05:56, 16.86it/s]

tensor(1.6137, device='cuda:0', grad_fn=<DivBackward0>)


 50%|██████████████████████████████████████▍                                      | 4999/10001 [05:01<04:52, 17.10it/s]

tensor(1.5749, device='cuda:0', grad_fn=<DivBackward0>)


 60%|██████████████████████████████████████████████▏                              | 5999/10001 [06:02<03:56, 16.94it/s]

tensor(1.5446, device='cuda:0', grad_fn=<DivBackward0>)


 70%|█████████████████████████████████████████████████████▉                       | 6999/10001 [07:02<02:53, 17.28it/s]

tensor(1.5264, device='cuda:0', grad_fn=<DivBackward0>)


 80%|█████████████████████████████████████████████████████████████▌               | 7999/10001 [08:03<01:58, 16.93it/s]

tensor(1.5066, device='cuda:0', grad_fn=<DivBackward0>)


 90%|█████████████████████████████████████████████████████████████████████▎       | 8999/10001 [09:05<01:07, 14.75it/s]

tensor(1.4972, device='cuda:0', grad_fn=<DivBackward0>)


 99%|███████████████████████████████████████████████████████████████████████████▉ | 9863/10001 [09:58<00:08, 16.00it/s]

In [None]:
context = torch.zeros((1, 1), dtype=torch.long, device=device)
new_text = ''.join(coding.decode(model.generate(context, max_new_tokens=500)[0].tolist()))
print(new_text)