In [1]:
import torch
import mmap
import random 

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = "cpu"
print(device)

# device = "cpu"


cuda


In [2]:
# Placeholder so the name is bound even if reading the corpus fails below.
chars = ""
with open("shakespeare.txt", mode='r', encoding='utf-8') as f:
    text = f.read()

# Vocabulary: every distinct character of the corpus, in sorted order.
chars = sorted(set(text))
vocab_size = len(chars)

print(vocab_size)

62


In [3]:
# Lookup tables between characters and their integer token ids.
string_to_int = {ch: i for i, ch in enumerate(chars)}
int_to_string = dict(enumerate(chars))


def encode(s):
    """Map a string to a list of token ids, one id per character.

    Raises:
        KeyError: if ``s`` contains a character missing from ``string_to_int``.
    """
    return [string_to_int[c] for c in s]


def decode(l):
    """Map an iterable of token ids back to the string they spell.

    Raises:
        KeyError: if an id is missing from ``int_to_string``.
    """
    return ''.join(int_to_string[i] for i in l)

In [4]:
# Encode the whole corpus as a single 1-D tensor of token ids.
data = torch.tensor(encode(text), dtype=torch.long)

# The first 80% of the tokens are used for training, the rest for validation.
n = int(0.8 * len(data))
train_data, val_data = data[:n], data[n:]

In [5]:
# block_size = 8

# x = train_data[:block_size]
# y = train_data[1:block_size+1]

# for t in range(block_size):
#     context = x[: t+1]
#     target = y[t]
#     print("When input is", context," Target is", target)

In [6]:
import torch.nn.functional as F
import torch.nn as nn

# --- Hyperparameters -------------------------------------------------------
lr = 0.0005          # optimizer learning rate

block_size = 64      # context length: size of the position table and of each sampled window
batch_size = 128     # number of windows drawn per batch
max_iters = 1000     # number of training steps
n_embd = 150         # embedding width
# NOTE: `voacb_size` is a misspelling of `vocab_size`; the spelling is kept
# because other cells refer to this exact name.
voacb_size = len(chars)
n_layer = 4          # number of transformer blocks
n_head = 4           # attention heads per block
dropout = 0.3        # probability handed to every nn.Dropout
eval_iters = 50      # batches averaged per split when estimating the loss

class Head(nn.Module):
    """A single causal self-attention head.

    The input is projected to keys, queries and values of width ``head_size``;
    values are then mixed with masked, scaled dot-product attention weights.
    """

    def __init__(self, head_size):
        super().__init__()
        # Bias-free projections from the embedding width to the head width.
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        # Lower-triangular matrix of ones, stored as a buffer (not a parameter).
        lower_triangle = torch.tril(torch.ones(block_size, block_size))
        self.register_buffer('tril', lower_triangle)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Attend over ``x`` of shape (B, T, C); returns (B, T, head_size)."""
        _, seq_len, _ = x.shape
        keys = self.key(x)        # (B, T, hs)
        queries = self.query(x)   # (B, T, hs)

        # Raw affinities, scaled by 1/sqrt(head width): (B, T, T).
        scale = keys.shape[-1] ** -0.5
        scores = queries @ keys.transpose(-2, -1) * scale

        # Positions above the diagonal (the future) get -inf so that softmax
        # assigns them zero weight.
        is_future = self.tril[:seq_len, :seq_len] == 0
        scores = scores.masked_fill(is_future, float('-inf'))
        weights = self.dropout(F.softmax(scores, dim=-1))

        # Weighted sum of the values: (B, T, T) @ (B, T, hs) -> (B, T, hs).
        values = self.value(x)
        return weights @ values


class MultiHeadAttention(nn.Module):
    """Several attention heads run side by side, then merged by a linear layer."""

    def __init__(self, num_heads, head_size):
        super().__init__()
        head_list = [Head(head_size) for _ in range(num_heads)]
        self.heads = nn.ModuleList(head_list)
        # Maps the concatenated head outputs back to the embedding width.
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply every head to ``x``, concatenate on the last axis, project, drop out."""
        per_head = [head(x) for head in self.heads]
        # Each head contributes a contiguous slice of the last dimension.
        merged = torch.cat(per_head, dim=-1)
        projected = self.proj(merged)
        return self.dropout(projected)

class FeedForward(nn.Module):
    """Per-position MLP: widen to 4x, ReLU, project back, then dropout."""

    def __init__(self, n_embd):
        super().__init__()
        hidden_width = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, n_embd),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the MLP; the last dimension keeps its size."""
        return self.net(x)

class Block(nn.Module):
    """Transformer block: self-attention, then a feed-forward MLP.

    Each sub-layer's output is added to its input and the sum is
    layer-normalised (normalisation happens after the residual addition).
    """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding width; n_head: number of attention heads.
        super().__init__()
        # Split the embedding width evenly across the heads (integer division).
        self.sa = MultiHeadAttention(n_head, n_embd // n_head)
        self.ffwd = FeedForward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        """Apply attention then the MLP, each with residual add + LayerNorm."""
        attended = self.ln1(x + self.sa(x))
        return self.ln2(attended + self.ffwd(attended))

class BigramLanguageModel(nn.Module):
    """Character-level transformer language model.

    Token and learned position embeddings are summed, passed through
    ``n_layer`` transformer blocks and a final LayerNorm, and projected to
    logits over the vocabulary.

    Args:
        vocab_size: number of distinct tokens; sizes both the token embedding
            table and the output projection.
        embd_size: width of the embeddings.
    """

    def __init__(self, vocab_size, embd_size):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, embd_size)
        self.position_embedding_table = nn.Embedding(block_size, embd_size)

        self.blocks = nn.Sequential(*[Block(embd_size, n_head=n_head) for _ in range(n_layer)])

        self.ln_f = nn.LayerNorm(embd_size)
        # Size the output layer from the constructor argument rather than a
        # notebook global, so the model honours the vocab_size it was given.
        self.lm_head = nn.Linear(embd_size, vocab_size)

        self.apply(self.__init_weights)

    def __init_weights(self, module):
        """Initialise Linear and Embedding weights from N(0, 0.02); zero Linear biases."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        # This branch must be a sibling of the Linear check; nested under it,
        # it could never match an Embedding.
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, index, targets=None):
        """Compute logits and, when ``targets`` is given, the cross-entropy loss.

        Args:
            index: LongTensor of token ids with shape (B, T).
            targets: optional LongTensor of shape (B, T) with next-token ids.

        Returns:
            ``(logits, loss)``. Without targets, logits have shape (B, T, C)
            and loss is ``None``. With targets, logits are flattened to
            (B*T, C) and loss is a scalar tensor.
        """
        B, T = index.shape

        tok_emb = self.token_embedding_table(index)
        # Build positions on the same device as the input so the model does not
        # depend on a global `device` variable.
        positions = torch.arange(T, device=index.device)
        pos_emb = self.position_embedding_table(positions)
        x = tok_emb + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    @torch.no_grad()
    def generate(self, index, max_new_tokens):
        """Sample ``max_new_tokens`` tokens autoregressively after ``index``.

        Sampling runs in eval mode (dropout disabled) without gradient
        tracking; the module's previous train/eval mode is restored afterwards.

        Args:
            index: LongTensor of prompt token ids with shape (B, T).
            max_new_tokens: number of tokens to append.

        Returns:
            LongTensor of shape (B, T + max_new_tokens): the prompt followed
            by the sampled tokens.
        """
        # The position table bounds the longest sequence forward() can embed,
        # so crop the context to that length instead of a hard-coded 64.
        context_len = self.position_embedding_table.num_embeddings
        was_training = self.training
        self.eval()
        try:
            for _ in range(max_new_tokens):
                window = index[:, -context_len:]
                logits, _ = self(window)
                # Only the distribution at the last position is sampled from.
                probs = F.softmax(logits[:, -1, :], dim=-1)
                index_next = torch.multinomial(probs, num_samples=1)
                index = torch.cat((index, index_next), dim=1)
        finally:
            self.train(was_training)

        return index

    def newGenerate(self, index, max_new_tokens):
        """Placeholder for an alternative sampler; not implemented, returns None."""
        pass
        

In [7]:
# yo = BigramLanguageModel(76)
# yo.to(device)

# context = torch.zeros((1,1), dtype=torch.long, device=device)
# generated_chars = decode(yo.generate(context, max_new_tokens=500)[0].tolist())

# print(generated_chars)

In [8]:
# Build the model and move its parameters to the selected device.
# nn.Module.to() modifies the module in place and returns it, so `m` and
# `model` refer to the same object.
model = BigramLanguageModel(voacb_size, n_embd)
model.to(device)
m = model

# memory map for using small snippets of text from a single file of any size
def get_random_chunk(split):
    """Read a random chunk of a split's text file and return it as token ids.

    The file ("output_train.txt" for ``split == 'train'``, otherwise
    "output_val.txt") is memory-mapped and ``block_size * batch_size - 1``
    bytes are read from a random offset. Files shorter than one chunk are
    read from the start.

    Args:
        split: ``'train'`` selects the training file; anything else selects
            the validation file.

    Returns:
        1-D ``torch.long`` tensor of encoded characters.
    """
    filename = "output_train.txt" if split == 'train' else "output_val.txt"
    chunk_bytes = block_size * batch_size
    with open(filename, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            file_size = len(mm)
            # Clamp the upper bound: random.randint raises ValueError on an
            # empty range, which happened for files shorter than one chunk.
            last_start = max(0, file_size - chunk_bytes)
            start_pos = random.randint(0, last_start)

            # Seek to the random position and read the block of bytes.
            mm.seek(start_pos)
            block = mm.read(chunk_bytes - 1)

            # The read can start or stop inside a multi-byte character, so
            # invalid byte sequences are dropped; carriage returns are removed.
            decoded_block = block.decode('utf-8', errors='ignore').replace('\r', '')

            data = torch.tensor(encode(decoded_block), dtype=torch.long)

    return data


def get_batch(split):
    """Draw a random batch of (input, target) windows from one split.

    ``'train'`` selects ``train_data``; any other value selects ``val_data``.
    Both returned tensors have shape (batch_size, block_size) and are moved to
    ``device``; each target window is its input window shifted one token right.
    """
    if split == 'train':
        data = train_data
    else:
        data = val_data

    # One random start offset per sequence in the batch.
    starts = torch.randint(len(data) - block_size, (batch_size,))

    inputs, targets = [], []
    for start in starts:
        inputs.append(data[start:start + block_size])
        targets.append(data[start + 1:start + block_size + 1])

    x = torch.stack(inputs).to(device)
    y = torch.stack(targets).to(device)
    return x, y

# def get_batch(split):
#     data = train_data if split == 'train' else val_data
#     ix = torch.randint(len(data) - block_size, (batch_size,))
#     x = torch.stack([data[i:i+block_size] for i in ix])
#     y = torch.stack([data[i+1:i+block_size+1] for i in ix])
#     x, y = x.to(device), y.to(device)
#     return x, y

# Smoke check: draw one training batch and display the (inputs, targets) pair.
get_batch("train")

(tensor([[43, 40,  1,  ..., 43, 50, 58],
         [59, 40, 49,  ..., 60, 36, 47],
         [55, 50,  1,  ...,  1, 58, 50],
         ...,
         [26, 47, 44,  ..., 40, 53,  1],
         [ 0, 44, 41,  ...,  1, 47, 40],
         [38, 50, 48,  ...,  1, 43, 40]], device='cuda:0'),
 tensor([[40,  1, 52,  ..., 50, 58,  1],
         [40, 49, 40,  ..., 36, 47,  0],
         [50,  1, 48,  ..., 58, 50, 53],
         ...,
         [47, 44, 57,  ..., 53,  1, 55],
         [44, 41,  1,  ..., 47, 40, 36],
         [50, 48, 41,  ..., 43, 40, 53]], device='cuda:0'))

In [9]:
@torch.no_grad()
def estimate_loss():
    """Estimate the mean loss on the train and validation splits.

    For each split, ``eval_iters`` random batches are evaluated with the model
    in eval mode and their losses averaged. The model's previous train/eval
    mode is restored on exit, even if evaluation raises.

    Returns:
        dict mapping ``'train'`` and ``'val'`` to a scalar tensor holding the
        mean loss for that split.
    """
    out = {}
    # Remember the current mode instead of unconditionally forcing train().
    was_training = model.training
    model.eval()
    try:
        for split in ['train', 'val']:
            losses = torch.zeros(eval_iters)
            for k in range(eval_iters):
                X, Y = get_batch(split)
                _, loss = model(X, Y)
                losses[k] = loss.item()
            out[split] = losses.mean()
    finally:
        model.train(was_training)
    return out

In [14]:
# Fine-tuning run: a lower learning rate and fewer steps than the defaults
# set in the hyperparameter cell.
lr = 0.00009
max_iters = 100

optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

# Make the training mode explicit rather than relying on a helper's side effect.
model.train()

# `step` replaces `iter` as the loop variable so the builtin iter() is not
# shadowed in the notebook namespace.
for step in range(max_iters):

    # Report train/val loss every `eval_iters` steps.
    if step % eval_iters == 0:
        losses = estimate_loss()
        print(f"Step: {step}, loss {losses}")

    xb, yb = get_batch("train")

    # Call the module itself (not .forward) so registered hooks run.
    logits, loss = model(xb, yb)

    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

print(loss.item())

Step: 0, loss {'train': tensor(1.1220), 'val': tensor(1.4662)}
Step: 50, loss {'train': tensor(1.1161), 'val': tensor(1.4664)}
1.2167868614196777


In [15]:
# Minimal prompt: one sequence holding the single token id 0, shape (1, 1).
context = torch.zeros((1,1), dtype=torch.long, device=device)

In [16]:
# Encode a short prompt and wrap it as a (1, T) batch on the target device.
# The earlier torch.zeros assignment was a dead store (overwritten right away),
# and torch.tensor replaces the legacy torch.LongTensor(...).to(device) form.
tp1 = encode("When")
context = torch.tensor([tp1], dtype=torch.long, device=device)
print(context, tp1)

tensor([[34, 43, 40, 49]], device='cuda:0') [34, 43, 40, 49]


In [17]:
# torch.save(model.state_dict(), 'model_weights_best.pth')
# model.load_state_dict(torch.load('model_weights_shakes_2_12.pth'))

# torch.save(model, 'shakes_new_1_26.pt')
# model = torch.load('./pretrained/shakes_1_20.pt')

In [20]:
# Encode a prose prompt, sample 250 more characters from the model, and print
# the prompt together with its continuation.
tp1 = encode("Cassio did as Iago advised him, and made application to the lady Desdemona, who was ")
context = torch.LongTensor([tp1]).to(device)

generated_ids = model.generate(context, max_new_tokens=250)[0].tolist()
generated_chars = decode(generated_ids)

print(generated_chars)

Cassio did as Iago advised him, and made application to the lady Desdemona, who was called to take his
change. Now then the king, assighted with the countess. So the innocence
in awter worth are as's, loved throught Veronath, and told him hi the
once a noble means; but being which blessings shrewised that she have true
whom they tim
