In [2]:
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader

from pathlib import Path
from typing import Dict, List

In [3]:
# Data
# !curl https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt > tinyshakespeare.txt

In [4]:
if torch.backends.mps.is_available() or torch.backends.mps.is_built():
    device = "mps"
else:
    device = "cpu"

In [5]:
def load_text_from_file(file_path: Path) -> List[str]:
    assert file_path.exists()

    with open(file_path, "r", encoding="utf-8") as f:
        return f.read()

tinyshakespear_file = Path("tinyshakespeare.txt")
tinyshakespeare_text = load_text_from_file(tinyshakespear_file)
len(tinyshakespeare_text), tinyshakespeare_text[:50]


(1115394, 'First Citizen:\nBefore we proceed any further, hear')

In [6]:
class CharTokenizer():
    def __init__(self, doc: str) -> None:
        assert doc
        self.vocabulary = sorted(list(set(doc)))
        self.stoi = {c: i for i, c in enumerate(self.vocabulary)}
        self.itos = {i: c  for i, c in enumerate(self.vocabulary)}

    def encode(self, input: str) -> torch.tensor:
        return torch.tensor([self.stoi[c] for c in input], dtype=torch.long, device=device)
    
    def decode(self, input: torch.tensor) -> str:
        return "".join([self.itos[i] for i in input.tolist()])
  
c_tokenizer = CharTokenizer(tinyshakespeare_text)
print(c_tokenizer.encode("First Citizen:\nBefore"))
print(c_tokenizer.decode(c_tokenizer.encode("First Citizen:\nBefore")))
print(c_tokenizer.encode(tinyshakespeare_text)[:100])

tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 14, 43, 44,
        53, 56, 43], device='mps:0')
First Citizen:
Before
tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 14, 43, 44,
        53, 56, 43,  1, 61, 43,  1, 54, 56, 53, 41, 43, 43, 42,  1, 39, 52, 63,
         1, 44, 59, 56, 58, 46, 43, 56,  6,  1, 46, 43, 39, 56,  1, 51, 43,  1,
        57, 54, 43, 39, 49,  8,  0,  0, 13, 50, 50, 10,  0, 31, 54, 43, 39, 49,
         6,  1, 57, 54, 43, 39, 49,  8,  0,  0, 18, 47, 56, 57, 58,  1, 15, 47,
        58, 47, 64, 43, 52, 10,  0, 37, 53, 59], device='mps:0')


In [7]:
    
BATCH_SIZE = 10
SEQUENCE_LEN = 10
CHUNKS = len(tinyshakespeare_text)//SEQUENCE_LEN
print(BATCH_SIZE, SEQUENCE_LEN, CHUNKS)


10 10 111539


In [8]:
class TSDataset(Dataset):
    def __init__(self, doc: str, c_tokenizer: CharTokenizer, sequence_len: int):
        assert doc
        assert c_tokenizer is not None
        assert sequence_len > 0
        self.doc = doc
        self.tokenizer = c_tokenizer
        self.sequence_len = sequence_len

    def __len__(self):
        return len(self.doc) - self.sequence_len - 1
    
    def __getitem__(self, index: int) -> torch.tensor:
        """
        Extract the index-th feature and label.
            - x: feature tensor w/ shape of [C]
            - y: label tensor w/ shape of [1]
        """
        index = index % len(self)
        sub_str = self.doc[index:(index + self.sequence_len + 1)]
        assert len(sub_str) == self.sequence_len+1, f"sub_str: {len(sub_str)}, expected length: {self.sequence_len+1}"

        encoded_data = self.tokenizer.encode(sub_str)
        x = encoded_data[:-1]
        y = encoded_data[-1]
        return x, y
        
train_val_split = int(0.9*len(tinyshakespeare_text))
train_doc, val_doc = tinyshakespeare_text[:train_val_split], tinyshakespeare_text[train_val_split:]
train_dataset = TSDataset(doc=train_doc, c_tokenizer=c_tokenizer, sequence_len=SEQUENCE_LEN)
val_dataset = TSDataset(doc=val_doc, c_tokenizer=c_tokenizer, sequence_len=SEQUENCE_LEN)
print(len(train_dataset), len(val_dataset))

train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=True)


1003843 111529


In [9]:
token_encoding, label_encoding = train_dataset[0]
token_encoding = token_encoding.unsqueeze(0)
label_encoding = label_encoding.unsqueeze(0)
print(token_encoding.size(), label_encoding.size())

torch.Size([1, 10]) torch.Size([1])


In [21]:
"""
 Attention Module
  - Input: B, T, C
  - Config:
    - H: heads count
      - HS: head size
    - Emb: Embedding size
  - Output: B, T, C
  - Processes:
    - embeding((B, T)) -> (B, T, Emb)
    - Key: (B, T, Emb) -> Linear(Emb, HS) -> (B, T, HS)
    - Query: (B, T, Emb) -> Linear(Emb, HS) -> (B, T, HS)
    - Val: (B, T, Emb) -> Linear(Emb, HS) -> (B, T, HS)
"""

def apply_causal_mask(x: torch.Tensor) -> torch.Tensor:
    """
    x: (B, T, T)
     - each t-th only depending on the itself and before position
     - softmax => normalize to prob
    """
    B, T, T  = x.size()
    tri = torch.tril(torch.ones((T,T))).to(device=device)
    wei = x.masked_fill(tri == 0, float('-inf'))
    wei = F.softmax(wei, dim=-1)
    return wei


class SingleHeadSelfAttention(nn.Module):
    def __init__(self, emb: int, hs: int):
        super().__init__()
        self.hs = hs
        self.key = nn.Linear(emb, hs)
        self.query = nn.Linear(emb, hs)
        self.val = nn.Linear(emb, hs)

    def forward(self, x):
        # x: (B, T, Emb)
        key = self.key(x)     # (B, T, HS)
        query = self.query(x) # (B, T, HS)
        val = self.val(x)     # (B, T, HS)

        attention = key @ query.transpose(-2, -1) # (B, T, HS) @ (B, HS, T) -> (B, T, T)
        wei = apply_causal_mask(attention) # (B, T, T)
        return wei @ val # (B, T, HS)


class MultiHeadSelfAttention(nn.Module):
    def __init__(self, emb: int, hc: int):
        """
        - emb: embendding length
        - hc: head count
        """
        super().__init__()
        assert emb % hc == 0, f"invalid config: emb: {emb}, hc: {hc}"

        self.emb = emb
        self.hc = hc
        self.hs = emb // hc
        self.attention_heads = [SingleHeadSelfAttention(emb, self.hs) for _ in range(self.hc)]

    def forward(self, x: torch.tensor) -> torch.tensor:
        """
        x: (B, T, Emb)
        """
        B, T , Emb = x.size()
        x = torch.concat([attention_head(x) for attention_head in self.attention_heads], dim=-1) # (B, T, Emb)
        return x
        
class SelfAttentionBlck(nn.Module):
    def __init__(self, emb: int, hc: int):
        super().__init__()
        self.emb = emb
        self.hc = hc
        self.normal = nn.LayerNorm((emb))

        self.mult_head_att = MultiHeadSelfAttention(emb = emb, hc = hc)
        self.act = nn.ReLU()
        

    def forward(self, x: torch.tensor) -> torch.tensor:
        """
            x: (B, T, Emb)
        """
        x_copy = x.clone()
        x = self.normal(x)
        x = self.mult_head_att(x)
        x = x + x_copy
        x = self.act(x)
        return x
        



In [26]:
batch_size = 2
t = 10
emb = 6
hc = 2
hs = emb // hc
x = torch.randn((batch_size, t, emb), device=device)

# Test SingleHeadSelfAttention
s_att = SingleHeadSelfAttention(emb, hs)
s_att = s_att.to(device)
xs = s_att(x)
assert xs.size() == (batch_size, t, hs)

m_att = MultiHeadSelfAttention(emb, hc)
m_att = m_att.to(device=device)
xm = m_att(x)
assert xm.size() == (batch_size, t, emb)

# att_block = SelfAttentionBlck(emb, hc)
# xab = att_block(x)
# assert xab.size() == (batch_size, t, emb)

RuntimeError: Placeholder storage has not been allocated on MPS device!

In [11]:
t1 = torch.tensor([-1.5771,  1.7811,  0.3050, -0.9967,  0.0606,  0.4517], device=device)
t1m = x.mean(dim=-1, keepdim=True)
print(t1m.size())
t1var = x.var(dim=-1, keepdim=True)
print(t1var.size())
t2 = (x - t1m) / t1var**0.5
# x.size(), t1m.size(), t1var.size()
t2[0]


torch.Size([2, 10, 1])
torch.Size([2, 10, 1])


tensor([[ 0.5780, -0.4525,  1.4154, -1.2104,  0.5196, -0.8501],
        [ 0.1810, -1.4530,  1.4347, -0.1899,  0.6305, -0.6033],
        [ 1.1909, -0.3786, -0.9093, -1.2719,  0.8526,  0.5163],
        [-1.1317, -0.7515,  0.1696,  1.7515, -0.1886,  0.1507],
        [-1.6268,  0.7785, -0.4537,  0.2571, -0.1593,  1.2042],
        [-1.9338,  0.2833, -0.1019,  0.8543,  0.5845,  0.3135],
        [ 0.8787,  0.6413,  0.6532, -0.0726, -1.8179, -0.2827],
        [-1.7279,  0.7102, -0.4727,  0.4691,  1.0326, -0.0113],
        [-0.3499, -1.3411,  1.6147, -0.4759,  0.4916,  0.0606],
        [ 0.7046, -0.5334, -0.5810,  1.4024,  0.3470, -1.3395]])

In [12]:
class TinyGPT(nn.Module):
    """
    TinyGPT is a language model.
      - use multi-heads self-attention transformer encoding as backbone
      - Inputs
        - x: (B, T): encoded string token
        
      - Outputs:
        - y: (B, 1): string token to be decoded to string
      - Configs:
        - num_embeddings: vocabulary size
        - embedding_dim: embedding dimention
        - t: the length of the time sequence / context
        - mult_heads: number of the multi-heads
        - att_blocks: number of the attention blocks
        - multi_head_count: the multi-head count

    """
    def __init__(self, num_embeddings: int, embedding_dim: int, t: int, att_blocks: int, multi_head_count: int):
        super().__init__()
        self.embeddings = nn.Embedding(num_embeddings=num_embeddings, embedding_dim=embedding_dim)
        self.backbone = nn.Sequential(*[SelfAttentionBlck(emb=embedding_dim, hc=multi_head_count) for _ in range(att_blocks)])
        self.linear1 = nn.Linear(embedding_dim, 1024)
        self.act1 = nn.ReLU()
        self.linear2 = nn.Linear(1024, num_embeddings)

        self.loss_func = nn.NLLLoss()
        

    def forward(self, x: torch.tensor, y: torch.tensor=None) -> torch.tensor:
      """
        x: (B, T)
      """
      x = self.embeddings(x)  # (B, T) -> (B, T, Emb)
      x = self.backbone(x)    # (B, T, Emb) -> (B, T, Emb)
      x = self.linear1(x)     # (B, T, Emb) -> (B, T, 1024)
      x = self.act1(x)        # (B, T, 1024) -> (B, T, 1024)
      x = self.linear2(x)     # (B, T, 1024) -> (B, T, num_embeddings)

      if y is None:
        loss = None
      else: 
        # x = F.softmax(x, dim=-1)
        # print(f"x: {x.size()}")
        # print(f"y: {y.size()}")
        # loss = self.loss_func(x[:, -1, :], y)
        loss = F.cross_entropy(x[:, -1, :], y)

      return x, loss 


@torch.no_grad()
def generate(model: TinyGPT, max_len: int) -> str:
  pred = []
  model.eval()

  context = torch.randint(low=0, high=len(c_tokenizer.vocabulary)-1, size=(1, T))
  for _ in range(max_len):
    x, _ = model(context[:, -T:]) # x: (1, T, Emb)
    prob = F.softmax(x[0], dim=1) # prob: (T, Emb)
    # print(f"prob: {prob.size()}")
    predict_encod = torch.multinomial(prob[-1], num_samples=1)
    context = torch.cat([context, torch.unsqueeze(predict_encod, 0)], dim=1)
    # print(f"predict_encod: {predict_encod}")
    
    pred.append(int(predict_encod))
    # print(pred)
  print(c_tokenizer.decode(torch.tensor(pred, device=device)))

  model.train()



In [13]:
EMB_DIM = 32
T = 10
HC = 4
ATT_BLOCKS = 1
model = TinyGPT(num_embeddings=len(c_tokenizer.vocabulary), embedding_dim=EMB_DIM, t=T, att_blocks=ATT_BLOCKS, multi_head_count=HC)
model = model.to(device=device)

# x = torch.randint(low=0, high=len(c_tokenizer.vocabulary)-1, size = (BATCH_SIZE, T))
# pred, loss = model(x)
# print(pred.size(), pred[0][0])
model

TinyGPT(
  (embeddings): Embedding(65, 32)
  (backbone): Sequential(
    (0): SelfAttentionBlck(
      (normal): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
      (mult_head_att): MultiHeadSelfAttention()
      (act): ReLU()
    )
  )
  (linear1): Linear(in_features=32, out_features=1024, bias=True)
  (act1): ReLU()
  (linear2): Linear(in_features=1024, out_features=65, bias=True)
  (loss_func): NLLLoss()
)

In [14]:
generate(model, max_len=500)

't
dy3
QTytL;Difsk3nyEjI!ayMAO.f
j$a!iLogy&e3bfGwe'GcE&krrEVgo
!R IEDB&XLTS:UrO
'MfOM.xxq$Unt.O;RMUCzeI$ZvFS3,3$;cB,CpWEZ-RDFUy?QaxnWMHUSP3yMNWbbt.C&Y $lJyLm.mtlUt'ZGghGa&ZJH?l!kdgjO;FQyIb!lm&GaKQzw:qI:tZY:uuFvJWBeEKWccDGzk'o.$.;V&ra
GrVdw
 $Olgfe yiFysmkYRTfqw&qxM,jxkFSdITqOdy:r'ldX:nSyLJ:ZrtxGTVuYSnWZsTsyEXG Dz'Ggj'YG-r$JKDkToyLRaphI?U?xBRQrswT-Tvy:lLDfOOHFaCqk3;XuG-UOPsM&:$KNyu-vSQ$bMO3ntNo:be'L.bGWmy'in!
Ec,bijcC&F
Vd3PH;AjrR?JycajsRNcul i3tPQ
XunHG.B?k sH-is:ltqpng3WYEdZA,,f-WBsnWUSfTjo.cod


In [17]:

opt = torch.optim.AdamW(model.parameters(), lr=1e-5)

EPOCS = 1
for epoc in range(EPOCS):
    for idx, batch_data in enumerate(train_dataloader):
        x_batch, y_batch = batch_data
        # print(x_batch.size(), y_batch.size())
        pred, loss = model(x=x_batch, y=y_batch)

        if idx % 1000 == 0:
            print(f"loss: {loss}")
        
        opt.zero_grad()
        loss.backward()
        opt.step()



# for i in range(BATCH_SIZE):
#     context = c_tokenizer.decode(x_batch[i])
#     label = c_tokenizer.decode(y_batch[i])
#     print(context, "->", label)

RuntimeError: Placeholder storage has not been allocated on MPS device!

In [295]:
generate(model, max_len=100)

AttributeError: 'Tensor' object has no attribute 'iterm'

In [33]:
losses = [torch.tensor(i, dtype=float) for i in range(10)]
torch.mean(torch.tensor(losses))

tensor(4.5000, dtype=torch.float64)