### Statistical Learning for Data Science 2 (229352)
#### Instructor: Donlapark Ponnoprat

#### [Course website](https://donlapark.pages.dev/229352/)

## Lab #12

### Note: Debug with CPU first, then run the whole notebook with GPU

In [None]:
# Notebook setup (IPython shell commands): install the Thai tokenizer, then
# download and unpack the lyrics archive.
# NOTE(review): the archive presumably contains `thai_lyrics.csv`, which is read below — confirm.
!pip install pythainlp
!wget http://www.donlapark.cmustat.com/229352/thai_lyrics.tar.xz
!tar xf thai_lyrics.tar.xz

In [None]:
from collections import Counter
import csv
from itertools import chain
import numpy as np
import pandas as pd

import math
import torch
import torch.nn as nn
from torch.nn import functional as F
import torch.optim as optim
from torch.utils.data import DataLoader

from pythainlp import word_tokenize

# GPT for song lyrics generation

In [None]:
# Load the lyrics table; its 'lyrics' column is tokenized in a later cell.
df = pd.read_csv('thai_lyrics.csv', engine='python')
# Display the last rows as a quick sanity check of the loaded data.
df.tail()

### Set hyperparameters

In [None]:
# Hyperparameters
# The active values are small so the notebook can be debugged on CPU;
# the "... for GPU" remarks give the values suggested for the full GPU run.
LEARNING_RATE = 0.0003  # Learning rate passed to the Adam optimizer
BATCH_SIZE = 12  # 128 for GPU
NUM_EPOCHS = 5  # Number of train-then-generate rounds in the final loop
max_len = 64  # 128 for GPU  # Max sequence length (also the training window size)
d_model = 32  # 128  # Model dimensionality (embedding size)
num_heads = 4       # Number of attention heads (head_dim = d_model // num_heads)
num_layers = 4  # 6 for GPU  # Number of transformer blocks
hidden_dim = 128  # 512 for GPU  # Hidden dimension in feedforward network
dropout_rate = 0.1  # Dropout probability used throughout the model
device = 'cuda' if torch.cuda.is_available() else 'cpu'  # Use the GPU when one is available

### Convert from words to numbers

In [None]:
#[[song , number , one],[song , number , two]] -> [song , number , one , song , number , two]
def flatten(ls):
    """
    Concatenate the inner sequences of `ls` into a single list (one level deep).
    """
    return [item for inner in ls for item in inner]

#[song , number ,one, number, two] -> [1,2,3,2,4] and [1,2,3] -> [song , number , one]
def create_lookup_dict(tokenized_lyrics, n_min=None):
    """
    Create lookup dictionaries from a list of words (lyrics).

    Words are numbered from 0 in order of decreasing frequency (ties keep
    their first-appearance order), so index 0 is always the most frequent
    word. This ordering holds whether or not `n_min` is given.

    Args:
        tokenized_lyrics: iterable of hashable tokens.
        n_min: if not None, only words occurring at least `n_min` times
            are kept in the vocabulary.

    Returns:
        (vocab_to_int, int_to_vocab): word -> index and index -> word dicts.
    """
    word_counts = Counter(tokenized_lyrics)
    sorted_vocab = sorted(word_counts, key=word_counts.get, reverse=True)
    if n_min is not None:
        # Filter the *sorted* list so the frequency ordering is preserved.
        sorted_vocab = [word for word in sorted_vocab if word_counts[word] >= n_min]
    vocab_to_int = {word: i for i, word in enumerate(sorted_vocab)}
    int_to_vocab = {i: word for word, i in vocab_to_int.items()}
    return (vocab_to_int, int_to_vocab)

In [None]:
# Keep only the first rows of the table so that CPU debugging stays fast.
df = df.iloc[:10, :]  # df.iloc[:1000, :] for GPU
# Tokenize every song, then merge all songs into one long token list.
tokenized_lyrics = df['lyrics'].map(word_tokenize)
tokenized_lyrics = flatten(tokenized_lyrics)
# Replace newline tokens with a single space token.
tokenized_lyrics = [token if token != '\n' else ' ' for token in tokenized_lyrics]
# NOTE(review): `word_counts` is not used by any later cell in this notebook
# (create_lookup_dict builds its own Counter).
word_counts = Counter(tokenized_lyrics)
vocab_to_int, int_to_vocab = create_lookup_dict(tokenized_lyrics, n_min=None)
vocab_size = len(vocab_to_int)  # number of distinct words in lyrics corpus
print(vocab_size)

### Create Features (windows of `max_len` consecutive words) and Targets (the same windows shifted one word ahead)

In [None]:
# Map every token to its integer id (tokens missing from the vocabulary map to 0).
tokenized_indices = [vocab_to_int.get(token, 0) for token in tokenized_lyrics]

# One training example per start offset: the input is a window of `max_len`
# ids and the target is the same window shifted one position to the right,
# so the output length equals the input length.
window_starts = range(len(tokenized_indices) - max_len)
X = np.array(
    [np.array(tokenized_indices[start: start + max_len]) for start in window_starts]
)
target = np.array(
    [tokenized_indices[start + 1: start + max_len + 1] for start in window_starts]
)

In [None]:
class MyDataSet(torch.utils.data.Dataset):
    """Map-style dataset pairing each row of `X` with the matching entry of `y`."""

    def __init__(self, X, y):
        super().__init__()
        self._X, self._y = X, y

    def __len__(self):
        # The number of examples is the size of the first axis of X.
        n_examples = self._X.shape[0]
        return n_examples

    def __getitem__(self, index):
        return self._X[index], self._y[index]

In [None]:
# Wrap the (input window, shifted target window) arrays in a Dataset.
dataset = MyDataSet(X, target)

# Serve shuffled mini-batches of BATCH_SIZE examples to the training loop.
trainloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

## Transformer decoder

<center><img src="https://donlapark.pages.dev/229352/Full-GPT-arch.png" alt="GPT" width="700"/></center>

### Exercise: Fill in the code blocks marked with the `TODO` tag in order to complete the GPT model.

### Positional encoding

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to a batch of embeddings.

    Args:
        max_len: largest sequence length the module can encode.
        d_model: embedding size (may be even or odd).
    """

    def __init__(self, max_len, d_model):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)  # Apply sin to even indices (2i)
        # For odd d_model there is one fewer odd column than even columns,
        # so only the first d_model // 2 frequencies are used for cos.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])  # Apply cos to odd indices (2i+1)
        # A buffer moves with .to(device) and is saved in state_dict, but is not trained.
        self.register_buffer('pe', pe.unsqueeze(0))  # Shape: (1, max_len, d_model)

    def forward(self, x):
        """Return `x` plus the encodings of its first `x.size(1)` positions.

        Args:
            x: tensor of shape (batch, seq_len, d_model) with seq_len <= max_len.

        Raises:
            ValueError: if seq_len exceeds the `max_len` given at construction.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(1)}"
            )
        return x + self.pe[:, :seq_len, :]

### Self-Attention (`TODO:` Complete the self-attention block)

<center><img src="https://donlapark.pages.dev/229352/self-attention-matrix-calculation.png" alt="GPT" width="600"/></center>


#### Masked attention

<center><img src="https://donlapark.pages.dev/229352/masked-attention.png" alt="GPT" width="500"/></center>

In [None]:
T = 5  # example sequence length

# Add this to your QKᵀ matrix *before* Softmax
# The result is a (T, T) matrix with -inf strictly above the main diagonal and
# 0 elsewhere, so after Softmax each position puts zero weight on later positions.
torch.triu(torch.full((T, T), float("-inf")).to(device), diagonal=1)

In [None]:
class SelfAttention(nn.Module):
    """Multi-head masked (causal) self-attention.

    Args:
        d_model: embedding size; must be divisible by `num_heads`.
        num_heads: number of attention heads.
        dropout_rate: dropout probability applied to the attention weights.

    Raises:
        ValueError: if `d_model` is not divisible by `num_heads`.
    """

    def __init__(self, d_model, num_heads, dropout_rate):
        super(SelfAttention, self).__init__()
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        # Linear projections for query, key, and value
        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)

        self.att_dropout = nn.Dropout(dropout_rate)
        self.out_proj = nn.Linear(d_model, d_model)

    def _split_heads(self, t):
        """Reshape (B, T, C) into (B, num_heads, T, head_dim)."""
        B, T, _ = t.size()
        return t.view(B, T, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, x):
        """Apply causal self-attention.

        Args:
            x: tensor of shape (B, T, d_model).

        Returns:
            Tensor of shape (B, T, d_model). The output at position t depends
            only on inputs at positions <= t.
        """
        B, T, C = x.size()  # Batch size, Time (seq length), Embedding size (d_model)

        # Linear projections, then split into heads (C = num_heads * head_dim)
        q = self._split_heads(self.query(x))  # [B, num_heads, T, head_dim]
        k = self._split_heads(self.key(x))
        v = self._split_heads(self.value(x))

        # 1-2. Scaled dot product: (B, nh, T, hd) @ (B, nh, hd, T) -> (B, nh, T, T)
        scores = (q @ k.transpose(-2, -1)) / (self.head_dim ** 0.5)

        # 3. Causal mask: -inf strictly above the diagonal hides future positions.
        #    It is built on the input's device so the module works on CPU and GPU.
        mask = torch.triu(
            torch.full((T, T), float("-inf"), device=x.device, dtype=scores.dtype),
            diagonal=1,
        )
        scores = scores + mask

        # 4-5. Softmax over the key axis, then dropout on the attention weights
        weights = self.att_dropout(F.softmax(scores, dim=-1))

        # 6. Weighted sum of values: (B, nh, T, T) @ (B, nh, T, hd) -> (B, nh, T, hd)
        out = weights @ v

        # 7. Merge heads back; .contiguous() is required before .view after a transpose
        out = out.transpose(1, 2).contiguous().view(B, T, C)

        # 8. Final linear layer
        return self.out_proj(out)

### Feedforward block

<center><img src="https://donlapark.pages.dev/229352/Feedforward-block.png" alt="GPT" width="400"/></center>

In [None]:
class FeedForward(nn.Module):
    """Position-wise feedforward network: Linear -> GELU -> Linear."""

    def __init__(self, d_model, hidden_dim):
        super().__init__()
        self.fc1 = nn.Linear(d_model, hidden_dim)  # expand to the hidden width
        self.fc2 = nn.Linear(hidden_dim, d_model)  # project back to d_model
        self.gelu = nn.GELU()  # Activation function

    def forward(self, x):
        hidden = self.fc1(x)
        activated = self.gelu(hidden)
        return self.fc2(activated)

### Transformer Decoder block (`TODO:` Fill in code)

<center><img src="https://donlapark.pages.dev/229352/GPT-Decoder.png" alt="GPT" width="400"/></center>

In [None]:
class TransformerBlock(nn.Module):
    """One transformer decoder block: masked self-attention plus feedforward,
    each wrapped in a residual connection with dropout.

    Args:
        d_model: embedding size.
        num_heads: number of attention heads.
        hidden_dim: hidden width of the feedforward network.
        dropout_rate: dropout probability for attention and both sub-layer outputs.
    """

    def __init__(self, d_model, num_heads, hidden_dim, dropout_rate):
        super(TransformerBlock, self).__init__()
        self.attention = SelfAttention(d_model, num_heads, dropout_rate)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.ln1 = nn.LayerNorm(d_model)
        self.feedforward = FeedForward(d_model, hidden_dim)
        self.dropout2 = nn.Dropout(dropout_rate)
        self.ln2 = nn.LayerNorm(d_model)

    def forward(self, x):
        """Transform `x` of shape (B, T, d_model) into a tensor of the same shape.

        NOTE(review): LayerNorm is applied *before* each sub-layer (pre-LN),
        which pairs with a final LayerNorm after the last block — confirm this
        ordering against the course's decoder diagram.
        """
        # Attention sub-layer with residual connection
        x = x + self.dropout1(self.attention(self.ln1(x)))
        # Feedforward sub-layer with residual connection
        out = x + self.dropout2(self.feedforward(self.ln2(x)))
        return out

### GPT

<center><img src="https://donlapark.pages.dev/229352/GPT.png" alt="GPT" width="150"/></center>

**Note:** The `CrossEntropyLoss` requires output shape = `(batch_size, vocab_size, seq_length)`. Make sure that your output matches this shape!

In [None]:
class GPT(nn.Module):
    """Decoder-only transformer language model.

    Args:
        vocab_size: number of distinct tokens.
        max_len: maximum sequence length supported by the positional encoding.
        d_model: embedding size.
        num_heads: attention heads per block.
        num_layers: number of transformer blocks.
        hidden_dim: hidden width of each feedforward network.
        dropout_rate: dropout probability.
    """

    def __init__(self, vocab_size, max_len, d_model, num_heads, num_layers, hidden_dim, dropout_rate):
        super(GPT, self).__init__()
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(max_len, d_model)
        self.dropout = nn.Dropout(dropout_rate)
        self.blocks = nn.ModuleList([
            TransformerBlock(d_model, num_heads, hidden_dim, dropout_rate) for _ in range(num_layers)
        ])
        self.ln_f = nn.LayerNorm(d_model)  # Final layer normalization
        self.head = nn.Linear(d_model, vocab_size, bias=False)  # Output layer (vocab_size classes)

    def forward(self, x):
        """Compute next-token logits for every position.

        Args:
            x: integer token ids of shape (batch_size, seq_length).

        Returns:
            Logits of shape (batch_size, vocab_size, seq_length), the layout
            `nn.CrossEntropyLoss` expects for sequence targets of shape
            (batch_size, seq_length).
        """
        h = self.token_embedding(x)      # (B, T, d_model)
        h = self.positional_encoding(h)  # add position information
        h = self.dropout(h)
        for block in self.blocks:
            h = block(h)
        h = self.ln_f(h)
        logits = self.head(h)            # (B, T, vocab_size)
        # Move the class axis to dim 1: (B, T, vocab_size) -> (B, vocab_size, T)
        out = logits.transpose(1, 2)
        return out

In [None]:
# Re-select the device (same expression as in the hyperparameter cell).
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Build the GPT model from the hyperparameters and move it to the device.
model = GPT(
    vocab_size,
    max_len,
    d_model,
    num_heads,
    num_layers,
    hidden_dim,
    dropout_rate
    ).to(device)
# Cross-entropy between the predicted logits and the target token ids.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

### Exercise 2: fill in the code below

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer):
    """Train `model` for one pass over `dataloader`.

    Args:
        dataloader: yields (X, y) batches of inputs and targets.
        model: module to train; each batch is moved to the device of its parameters.
        loss_fn: callable as `loss_fn(pred, y)` returning a scalar loss tensor.
        optimizer: optimizer over `model.parameters()`.

    Prints the loss every 100 batches. Leaves the model in training mode.
    """
    size = len(dataloader.dataset)
    # Use the model's own device rather than a module-level global, so the
    # batches always land where the parameters are.
    device = next(model.parameters()).device
    # Ensure dropout is active, even if a previous call switched the model to eval mode.
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        X = X.to(device)
        y = y.to(device)
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss_value, current = loss.item(), batch * len(X)
            print(f"loss: {loss_value:>7f}  [{current:>5d}/{size:>5d}]")


def generate(model, start_word, pad_value=0, predict_len=200):
    """Sample `predict_len` new words from `model`, starting from `start_word`.

    Reads the module-level `vocab_to_int`, `int_to_vocab` and `max_len`.

    Args:
        model: language model mapping ids of shape (1, max_len) to logits of
            shape (1, vocab_size, max_len).
        start_word: prompt text; it is tokenized with `word_tokenize`.
        pad_value: token id used to left-pad the prompt up to `max_len`, and
            used for prompt words missing from the vocabulary.
        predict_len: number of words to generate.

    Returns:
        The prompt tokens followed by the generated words, joined into one string.
    """
    # Tokenize the input sentence; copy so the token list is not mutated below
    words = word_tokenize(start_word)
    predicted = list(words)

    # Words -> Integers (keep only the last max_len tokens of a long prompt)
    word_ids = [vocab_to_int.get(word, pad_value) for word in words][-max_len:]
    word_ids = np.asarray(word_ids, dtype=np.int64)

    # Left-pad with pad_value to exactly max_len ids: [28,15] -> [pad,pad,28,15]
    padded = np.pad(word_ids, (max_len - len(word_ids), 0), 'constant',
                    constant_values=pad_value)

    model_device = next(model.parameters()).device
    current_seq = torch.from_numpy(padded).unsqueeze(0).to(model_device)  # (1, max_len)

    # Sample with dropout disabled, then restore whatever mode the model was in.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for _ in range(predict_len):
                # Logits for the word following the last position: (1, vocab_size)
                logits = model(current_seq)[:, :, -1]
                p = torch.softmax(logits, dim=1)[0].cpu().numpy()
                # float32 probabilities may fail np.random.choice's sum-to-1
                # check, so renormalize in float64.
                p = p.astype(np.float64)
                p /= p.sum()

                # Sample from probability distribution p
                word_i = int(np.random.choice(p.shape[0], p=p))
                predicted.append(int_to_vocab[word_i])

                # Slide the window left by one and append the sampled word
                current_seq = torch.roll(current_seq, shifts=-1, dims=1)
                current_seq[0, -1] = word_i
    finally:
        model.train(was_training)

    gen_sentences = ''.join(predicted)
    return gen_sentences

### Exercise 3: use the `generate` function to generate new text after each training epoch (set `NUM_EPOCHS = 10` to train for 10 epochs).

In [None]:
# Use the id of the space token as padding for the generation prompt.
# (Raises KeyError if ' ' never occurs in the corpus.)
pad_int = vocab_to_int[' ']

# Alternate between one epoch of training and one sample of generated lyrics.
for t in range(NUM_EPOCHS):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(trainloader, model, loss_fn, optimizer)
    # No gradients are needed while sampling text.
    with torch.no_grad():
      print(generate(model, 'ฉันก็',
                     pad_value=pad_int, predict_len=200))
print("Done!")

## Extra: Using Transformers Library

Transformers documentation: https://huggingface.co/docs/transformers/index

### Sequence Classification

In [None]:
from transformers import pipeline

# Build a sentiment-analysis pipeline backed by the named DistilBERT checkpoint.
classifier = pipeline(task="sentiment-analysis",
                      model="distilbert-base-uncased-finetuned-sst-2-english")

In [None]:
# Classify one sentence; the cell displays the pipeline's result.
classifier("I love to hate you")

### A closer look: Tokenization + Classification

#### Load tokenizer

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Same checkpoint as the pipeline above; reused below for the tokenizer and the model.
model_name = "distilbert-base-uncased-finetuned-sst-2-english"

# Load the tokenizer that belongs to this checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_name)

#### Tokenize

In [None]:
text = "I love you"

# Split the text into the tokenizer's tokens.
tokens = tokenizer.tokenize(text)

# Display the tokens.
tokens

#### Convert tokens to ids

In [None]:
# Map each token to its integer id in the tokenizer's vocabulary.
sentence = tokenizer.convert_tokens_to_ids(tokens)

# Display the ids.
sentence

#### Convert from sentence to ids directly

In [None]:
# Encode the raw text in one call, returning PyTorch tensors ("pt").
# This rebinds `sentence`, replacing the id list from the previous cell.
sentence = tokenizer(text,  return_tensors="pt")

# Display the encoded inputs.
sentence

#### Use the model to classify on the input ids

In [None]:
# NOTE: this rebinds `model`, replacing the GPT model trained earlier in the notebook.
model = AutoModelForSequenceClassification.from_pretrained(model_name)

# Run the classifier on the encoded inputs and display its raw logits.
model(**sentence).logits