In [41]:
import os
import torch
import torch.nn as nn
from torch.nn import functional as F
import unicodedata
import pickle

#load decoding dictionaries
# `itos` is indexed by integer token id in decode() below, and len(itos) is
# used as the model's vocab_size when the model is built.
# presumably each entry is a BPE text piece (going by the file name) — TODO confirm
# NOTE(review): pickle.load can execute arbitrary code while unpickling; only
# load an 'itos_BPE.pkl' that you produced yourself or otherwise trust.
with open('itos_BPE.pkl', 'rb') as file:
    itos = pickle.load(file)


#define function that decodes numbers to texts
def decode(ids):
    """Turn an iterable of token ids into a single string.

    Each id is looked up in the module-level `itos` table and the resulting
    string pieces are concatenated with no separator.
    """
    pieces = [itos[token_id] for token_id in ids]
    return "".join(pieces)


In [42]:
## Define the transformer structure



# Pick the compute backend in order of preference: CUDA, then Apple MPS, then CPU.
# The MPS check is only evaluated when CUDA is unavailable.
if torch.cuda.is_available():
    device = torch.device('cuda')
elif torch.backends.mps.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")




def get_batch(split):
    """Sample a random batch of (input, target) windows from one data split.

    Args:
        split: 'train' selects the module-level `train_data`; any other value
            selects `val_data`.

    Returns:
        A tuple (x, y) of stacked windows moved to `device`. Each row of `x`
        is `block_size` consecutive items of the data; the matching row of
        `y` is the same window shifted one position to the right.

    Reads the module-level `block_size`, `batch_size` and `device`.
    """
    if split == 'train':
        source = train_data
    else:
        source = val_data
    # One random start offset per batch row; the upper bound leaves room for
    # the shifted target window.
    starts = torch.randint(len(source) - block_size, (batch_size,))
    inputs = torch.stack([source[s:s + block_size] for s in starts])
    targets = torch.stack([source[s + 1:s + block_size + 1] for s in starts])
    return inputs.to(device), targets.to(device)
    
@torch.no_grad()
def estimate_loss(model):
    """Estimate the mean loss on the 'train' and 'val' splits.

    For each split, `eval_iters` batches are drawn with `get_batch` and the
    per-batch losses are averaged.

    Args:
        model: module called as `model(X, Y)` and returning `(logits, loss)`.

    Returns:
        dict mapping 'train' and 'val' to a 0-d tensor holding the mean loss.

    The model is put in eval mode for the measurement and afterwards restored
    to whichever mode it was in on entry (previously it was always left in
    train mode, which silently re-enabled dropout for callers that had
    already switched to eval).
    """
    was_training = model.training
    model.eval()
    out = {}
    try:
        for split in ('train', 'val'):
            losses = torch.zeros(eval_iters)
            for k in range(eval_iters):
                # get_batch already returns tensors on `device`.
                X, Y = get_batch(split)
                _, loss = model(X, Y)
                losses[k] = loss.item()
            out[split] = losses.mean()
    finally:
        # Restore the caller's mode even if a batch raises.
        model.train(was_training)
    return out

@torch.no_grad()
def estimate_val_loss(model):
    """Estimate the mean loss on the 'val' split only.

    Draws `eval_iters` validation batches with `get_batch` and averages the
    per-batch losses.

    Args:
        model: module called as `model(X, Y)` and returning `(logits, loss)`.

    Returns:
        0-d tensor holding the mean validation loss.

    The model is put in eval mode for the measurement and afterwards restored
    to whichever mode it was in on entry (previously it was always left in
    train mode, which silently re-enabled dropout for callers that had
    already switched to eval).
    """
    was_training = model.training
    model.eval()
    try:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            # get_batch already returns tensors on `device`.
            X, Y = get_batch('val')
            _, loss = model(X, Y)
            losses[k] = loss.item()
        val_loss = losses.mean()
    finally:
        # Restore the caller's mode even if a batch raises.
        model.train(was_training)
    return val_loss
            
# class Head(nn.Module):
#     def __init__(self, head_size):
#         super().__init__()
#         self.key = nn.Linear(n_embed, head_size, bias = False)
#         self.query = nn.Linear(n_embed, head_size, bias = False)
#         self.value = nn.Linear(n_embed, head_size, bias = False)

#         self.register_buffer('tril',torch.tril(torch.ones(block_size, block_size)))
#         self.dropout= nn.Dropout(dropout)

#     def forward(self, x):
#         B,T,C = x.shape
            
#         k = self.key(x) #(B,T, head_size)
#         q = self.query(x)
#         v = self.value(x)
        
#         wei  = q @ k.transpose(-2,-1) * C**-0.5 #transpose along the last two dimensions, i.e. T and head_size 
#                                                         #(dot product sums over head_size indices)
#                                         # (B,T, head_size) @  (B, head_size, T) -> (B,T, T)
#         tril = torch.tril(torch.ones(T,T))
#         wei = wei.masked_fill(tril == 0, float('-inf') )
#         wei = F.softmax(wei, dim=-1)
#         wei = self.dropout(wei)
#         out = wei @ v
        
#         return out
        
class Head(nn.Module):  # the causal mask is a registered buffer, so it moves with the module across devices
    """One causal self-attention head.

    Projects the input to keys, queries and values of width `head_size`,
    masks out future positions, and returns the attention-weighted values.
    Reads the module-level `n_embed`, `block_size` and `dropout` when built.
    """

    def __init__(self, head_size):
        super().__init__()
        # Attribute names and creation order are kept so saved state_dicts
        # still match.
        self.key = nn.Linear(n_embed, head_size, bias=False)
        self.query = nn.Linear(n_embed, head_size, bias=False)
        self.value = nn.Linear(n_embed, head_size, bias=False)
        causal_mask = torch.tril(torch.ones(block_size, block_size))
        self.register_buffer('tril', causal_mask)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply masked attention to `x` of shape (B, T, C); returns (B, T, head_size)."""
        _, T, C = x.shape
        keys, queries, values = self.key(x), self.query(x), self.value(x)

        # NOTE(review): the scale uses C (the input width) rather than
        # head_size, which is what standard scaled dot-product attention
        # divides by. Left unchanged on purpose: weights trained with this
        # scale would behave differently if it were altered.
        scale = C ** -0.5
        scores = (queries @ keys.transpose(-2, -1)) * scale  # (B, T, T)

        # Positions above the diagonal are in the future; give them zero weight.
        is_future = self.tril[:T, :T] == 0
        scores = scores.masked_fill(is_future, float('-inf'))
        weights = self.dropout(F.softmax(scores, dim=-1))
        return weights @ values
class MultiHeadAttention(nn.Module):
    """Several attention heads run in parallel, concatenated and projected.

    Args:
        num_heads: number of `Head` modules to create.
        head_size: output width of each head.

    Reads the module-level `n_embed` and `dropout` when built.
    """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        # The concatenated heads have width num_heads * head_size. Size the
        # projection from that instead of assuming it equals n_embed, so a
        # head_size obtained by floor division still yields matching shapes.
        # When n_embed is divisible by num_heads the layer shape is unchanged.
        self.proj = nn.Linear(num_heads * head_size, n_embed)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return the projected concatenation of all head outputs for `x`."""
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.proj(out)
        out = self.dropout(out)
        return out

class FeedForward(nn.Module):
    """Position-wise MLP: expand to 4x width, ReLU, project back, dropout.

    Args:
        n_embed: input and output width of the MLP.

    Reads the module-level `dropout` when built.
    """

    def __init__(self, n_embed):
        super().__init__()
        hidden_width = 4 * n_embed
        # Layer order fixes the Sequential indices used in state_dict keys.
        layers = [
            nn.Linear(n_embed, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, n_embed),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the MLP to `x`."""
        return self.net(x)

class Block(nn.Module):
    """Transformer block: self-attention then feed-forward, each with a residual.

    LayerNorm is applied to the input of each sub-layer (pre-norm), and the
    sub-layer output is added back onto its un-normalised input.

    Args:
        n_embed: width of the residual stream.
        num_heads: number of attention heads; each gets n_embed // num_heads.
    """

    def __init__(self, n_embed, num_heads):
        super().__init__()
        self.sa = MultiHeadAttention(num_heads, n_embed // num_heads)  # self-attention
        self.ffwd = FeedForward(n_embed)
        # LayerNorm carries trainable scale and shift parameters too.
        self.ln1 = nn.LayerNorm(n_embed)
        self.ln2 = nn.LayerNorm(n_embed)

    def forward(self, x):
        """Run both residual sub-layers on `x` and return the result."""
        after_attention = x + self.sa(self.ln1(x))
        return after_attention + self.ffwd(self.ln2(after_attention))


class BigramLanguageModel(nn.Module):
    """Transformer language model over integer token ids.

    The architecture is token + position embeddings, a stack of `Block`s
    followed by a LayerNorm, one extra `FeedForward`, and a linear head that
    produces per-token logits. Layer names and order are kept exactly as they
    were so existing saved state_dicts still load.

    Reads the module-level `vocab_size`, `n_embed`, `block_size`, `num_heads`
    and `n_layers` when built.
    """

    def __init__(self):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
        self.position_embedding_table = nn.Embedding(block_size, n_embed)
        self.blocks = nn.Sequential(
            *[Block(n_embed, num_heads) for _ in range(n_layers)],
            nn.LayerNorm(n_embed),
        )
        self.ffwd = FeedForward(n_embed)
        self.lm_head = nn.Linear(n_embed, vocab_size)

    def forward(self, idx, targets=None):
        """Compute logits, and the cross-entropy loss when targets are given.

        Args:
            idx: (B, T) tensor of token ids.
            targets: optional (B, T) tensor of target ids.

        Returns:
            (logits, loss). Without targets, logits is (B, T, vocab) and loss
            is None. With targets, logits is flattened to (B*T, vocab) and
            loss is the cross-entropy against the flattened targets.
        """
        B, T = idx.shape
        tok_emd = self.token_embedding_table(idx)
        # Build positions on the input's own device rather than a global one,
        # so the model works wherever it and its inputs have been moved.
        pos_emd = self.position_embedding_table(torch.arange(T, device=idx.device))
        x = tok_emd + pos_emd
        x = self.blocks(x)
        x = self.ffwd(x)
        logits = self.lm_head(x)

        if targets is None:
            return logits, None

        B, T, C = logits.shape
        logits = logits.view(B * T, C)
        targets = targets.view(B * T)
        loss = F.cross_entropy(logits, targets)
        return logits, loss

    def _sample_next(self, idx):
        """Sample one next-token id per row of `idx`; returns a (B, 1) tensor."""
        # Crop to the longest context the position table can index.
        context_len = self.position_embedding_table.num_embeddings
        logits, _ = self(idx[:, -context_len:])
        probs = F.softmax(logits[:, -1, :], dim=-1)
        return torch.multinomial(probs, num_samples=1)

    @torch.no_grad()
    def generate(self, idx, max_new_tokens):
        """Append `max_new_tokens` sampled tokens to `idx` and return it.

        Runs without gradient tracking: sampling is not differentiable, and
        tracking would keep every step's graph alive.
        """
        for _ in range(max_new_tokens):
            idx = torch.cat((idx, self._sample_next(idx)), dim=1)
        return idx

    @torch.no_grad()
    def generate_one_poem(self, max_new_tokens=None, start_token=0, end_token=1):
        """Sample tokens from `start_token` until `end_token` is produced.

        Args:
            max_new_tokens: optional cap on sampled tokens. The default None
                keeps the original unbounded behaviour, which does not
                terminate if `end_token` is never sampled.
            start_token: id the sequence is seeded with (default 0).
            end_token: id that stops generation (default 1).
                # presumably ids 0 and 1 are the '<' and '>' markers that
                # wrap each decoded sample — confirm against itos

        Returns:
            (1, L) long tensor: the seed, the sampled ids, and the end token
            if it was reached.
        """
        # Seed on the model's own device instead of relying on a global.
        model_device = self.lm_head.weight.device
        idx = torch.full((1, 1), start_token, dtype=torch.long, device=model_device)
        sampled = 0
        while max_new_tokens is None or sampled < max_new_tokens:
            idx_next = self._sample_next(idx)
            idx = torch.cat((idx, idx_next), dim=1)
            sampled += 1
            if idx_next.item() == end_token:
                break
        return idx


def count_parameters(model):
    """Return the total number of elements across the model's trainable parameters.

    Parameters with `requires_grad` set to False are not counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total


Using device: mps


In [43]:



# Hyperparameters. The model classes read these module-level names when they
# are constructed, so they must match the checkpoint chosen in `model_path`.
vocab_size = len(itos)
block_size = 500
n_embed = 216
num_heads = 6
n_layers= 6
# The module constructors read `dropout`, but no visible cell defines it, so a
# fresh kernel would raise NameError. Dropout layers are inactive in eval mode,
# so this value does not affect generation.
dropout = 0.0
m = BigramLanguageModel().to(device)

#model_path = 'nano_tang_poem_layer6_context40_nebd64_nhead4.pt' 
# model_path = 'nano_tang_poem_layer6_context80_nebd64_nhead4.pt' 
# model_path = 'nano_tang_poem_layer8_context80_nebd64_nhead4.pt' # 1423780 trainable parameters
# model_path = 'nano_tang_poem_layer10_context80_nebd64_nhead4.pt' # 1523364 trainable parameters.
#model_path = 'nano_tang_poem_layer10_context80_nebd96_nhead8.pt' #2674436 trainable parameters

#model_path = 'nano_tang_poem_BPE_layer10_context80_nebd96_nhead8.pt' #2741600 trainable parameters
# model_path = 'nano_tang_poem_BPE_layer14_context80_nebd96_nhead8.pt' #3187808 trainable parameters # locally this costs 2.9GB MEM, 97%GPU and 87%CPU
# model_path = 'nano_tang_poem_BPE_layer10_context500_nebd180_nhead6.pt' #7,144,460 trainable parameters 
# model_path = 'nano_tang_poem_BPE_layer4_context500_nebd252_nhead6.pt' #7,734,068 trainable parameters 
# model_path = 'nano_tang_poem_BPE_layer5_context500_nebd300_nhead6.pt' #11,095,100 trainable parameters 
# model_path = 'nano_tang_poem_BPE_layer4_context700_nebd384_nhead6.pt' #14,696,384 trainable parameters 
model_path = 'nano_tang_poem_BPE_layer6_context500_nebd216_nhead6.pt' #7,318,952   trainable parameters 


# Load trained weights when the checkpoint exists; otherwise the model keeps
# its random initial weights.
if os.path.exists(model_path):
    m.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))
    print("Load existing model complete.")
else:
    print("Creat new model weights file")

# Compute the count here instead of relying on a `num_params` left over from
# an earlier kernel session.
num_params = count_parameters(m)
print(f"The model has {num_params} trainable parameters.")
print(f"Embeding dimension = {n_embed},\nContext length = {block_size},\nnumber of heads per layer = {num_heads},\nnumber of layers = {n_layers}")

Load existing model complete.
The model has 7318952 trainable parameters.
Embeding dimension = 216,
Context length = 500,
number of heads per layer = 6,
number of layers = 6


In [44]:
# Sample a fixed number of poems from the loaded model with a fixed seed.
seed = 0
num_poems = 10
torch.manual_seed(seed)
print(f'Seed = {seed}')
print(f"Using model:  {model_path}")
m.eval()  # turn dropout off for sampling
# Sampling needs no gradients; disabling autograd avoids keeping a graph alive
# for every generated token. The sampled values are unchanged.
with torch.no_grad():
    for _ in range(num_poems):
        print(decode(m.generate_one_poem()[0].tolist()))

Seed = 0
Using model:  nano_tang_poem_BPE_layer6_context500_nebd216_nhead6.pt
<相裏題樂宮|席上天中布錦筵，銀瓶外上白鹽蓮。笙歌旋入新腰曲，鸞鳳高飛暫步前。洛水春生小齋院，日星鉤在小亭筵。五更朝夕仙郎樂，相次斜橋下荔筵。>
<早春看花|雲重遍游長路過，蕭索春風意更多。百花紅滴魂難盡，一雁飛多語未多。莫道將何長薄俗，援毫深紫到三河。>
<還舊居答元渚鏡|皎皎水榭蝶，萎妍春姿豔。高無嶧陽孫，留著西廂閣。丹桂羞兩疏，青藍擲持贈。客方複明朝，蘭蕙春蘭地。自言霜海材，衛惠徒有致。>
<宣文玉京吟|陶公雖稱久，舊學國人亡。棄置十洲側，嘗對千山香。陸郎饒女貌，自美漁陽鄉。憶昔獻可移，手種生城荒。驪魚釁未掇，采貨檀已荒。仁義雖如此，黃公豈不忘。於賢寂相賴，盡室堪結芳。>
<朱藤杖，誦詩以寄魯望|風霜秋節勁，雲雪未重輪。爵識床頭倚，心關漉酒巾。風消無睡出，雪積算心因。怪得從夫學，崖花更問身。>
<雜曲歌辭：古別離|少婦怨春罷，去年妝鏡早。尚有別離心，相思還自語。>
<和韋令狐相公同幸韋嗣後庭送蜀郡省中諸公|萬里東遊問界遙，桂吟詩詠月為台。江山已斷將雕鶚，雲雨沾沾綴發蹄。城上綠林留賦罷，面前紅樹落新栽。由來即道情多阻，已有禪衣重上才。>
<讀謝甫李|風起泬寥天，蒼蒼竹雪然。彩霏吟澗水，玉爽在池泉。竹護西齋靜，梧來北巷閑。豈能忘傲吏，隱幾遠囂然。>
<謝室|度嶺勢似削，辭流面天涯。夜梅初識雪，舊酒不辭花。產業舊交舊，名高廢廟奢。蘆洲發夜浪，竹箭發晴霞。徒覺題詩處，不勞雙鬢華。>
<贈竇三文|自從明代欲分麾，正在明廷似事稀。卻喜永嘉人勸好，讀書須薦我垂衣。等閒命好無名利，不敢吟先得示機。人伴先生知興善，坐看營上玉山歸。>
