<a href="https://colab.research.google.com/github/Brownwang0426/transformer-from-scratch/blob/main/train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 手刻簡單的文字生成 transformer

示範如何純粹用 pytorch 手刻一個簡單的文字生成 transformer \
並強化自己對於 transformer 的理解以及操作能力 \
如果有機會，未來會再增加 numpy 版本，並且使用 numpy 手刻 error back-propagation



# For colab

In [None]:
# Colab only: download the repository into the current working directory.
!git clone https://github.com/Brownwang0426/transformer-from-scratch.git

In [None]:
import os
# Colab only: make the cloned repository the working directory
# (presumably so the local `model` module imported further down can be found).
os.chdir('/content/transformer-from-scratch')

In [None]:
!pip install torch dill datasets tqdm numpy IPython

# For local
CUDA Toolkit 11.8 \
cuDNN 8.9.x \
pip install torch==2.0.1 --extra-index-url https://download.pytorch.org/whl/cu118  
其餘套件可自行下載

# 導入官方套件

In [None]:
import numpy as np
import math
import re

import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.utils.rnn as rnn_utils
from torch.utils.data import DataLoader, TensorDataset, Subset

import csv

import multiprocessing as mp
import os
import sys
import copy
import random
import gc
import time
from tqdm import tqdm
from collections import defaultdict

import itertools

import dill

import warnings
warnings.filterwarnings('ignore')

from datasets import load_dataset

import torch
from transformers import BertTokenizer, BertModel, AutoTokenizer, AutoModel, AutoModelForCausalLM
import numpy as np

from IPython.display import display, clear_output
import gc

# 導入客製化套件

In [None]:
from model import *

# 確認有無讀取到 cuda

In [None]:
# Pick the compute device: the first CUDA GPU when one is visible, else the CPU.
if not torch.cuda.is_available():
    device = torch.device("cpu")
    print('using cpu...')
else:
    # List every visible GPU before settling on index 0.
    gpu_names = [torch.cuda.get_device_name(n)
                 for n in range(torch.cuda.device_count())]
    for i, gpu_name in enumerate(gpu_names):
        print(f"Device {i}: {gpu_name}")
    device_index = 0
    device = torch.device(f"cuda:{device_index}")
    print('using cuda...')

# Turn cuDNN on and let it benchmark kernels.
torch.backends.cudnn.enabled = True
torch.backends.cudnn.benchmark = True

# 參數區域

In [None]:
# Dataset identifier passed to `load_dataset`.
# Other datasets that can be used: squad, natural_questions, persona_chat, daily_dialog.
source =  "knkarthick/samsum"

In [None]:

# Load the pretrained GPT-2 tokenizer and language model (no BERT is involved here).
tokenizer  = AutoTokenizer.from_pretrained("gpt2")
# Reuse the end-of-text token as the padding token.
tokenizer.pad_token = tokenizer.eos_token
# GPT-2 with hidden-state output enabled, moved to the selected device;
# its last hidden state is what the notebook feeds into `model`.
vectorizer = AutoModelForCausalLM.from_pretrained("gpt2", output_hidden_states=True).to(device)

# Number of tokens every sample is padded/truncated to.
max_length = 512





In [None]:

# Number of training rounds to run.
num_epochs = 10000000

# Number of sentences taken from the shuffled corpus for each round.
sample_size = 100

# Mini-batch size used by the training DataLoader.
batch_size = 1




In [None]:

# Whether to restore previously saved model/optimizer state before training.
retrain = True

# Model hyperparameters (all forwarded to `build_model`).
sequence_size = max_length                    # sequence length equals the tokenizer padding length
feature_size = vectorizer.config.hidden_size  # input feature width = GPT-2 hidden size
output_size = tokenizer.vocab_size            # one logit per vocabulary entry
num_layers = 6
num_heads = 8
hidden_activation = 'gelu'
output_activation = None
initializer = "xavier_normal"
optimizer = 'adamw'
loss = 'crossentropy'
bias = True
drop_rate = 0.1
alpha = 0.00001  # NOTE(review): presumably the learning rate - confirm in model.py

# Checkpoint file paths.
model_directory = f'model.pth'
optimizer_directory = f'optimizer.pth'

In [None]:
# Maximum number of tokens the model generates per reply
# (the cap that applies when no end marker shows up first).
response_length = 50

# 建立模型

In [None]:

# Build the model from the hyperparameters defined above.
model = build_model(sequence_size,
                    feature_size,
                    output_size,
                    num_layers,
                    num_heads,
                    hidden_activation,
                    output_activation,
                    initializer,
                    optimizer,
                    loss,
                    bias,
                    drop_rate,
                    alpha)

# Move the model to the selected device.
model = model.to(device)

# Optionally restore a previous checkpoint. Restoring is best-effort: any
# failure falls back to the freshly initialised model, but the reason is
# reported instead of being silently discarded.
if retrain:
    try:
        # Read both files before touching the model, so that a missing optimizer
        # file cannot leave restored weights paired with a "not loaded" message.
        # map_location lets a checkpoint saved on GPU be opened on a CPU-only host.
        model_dict = torch.load(model_directory, map_location=device)
        optimizer_dict = torch.load(optimizer_directory, map_location=device)
        model.load_state_dict(model_dict)
        model.optimizer.load_state_dict(optimizer_dict)
        print('Model loaded.')
    except Exception as err:  # deliberately broad: loading is optional
        print('Model not loaded. Now using new model.')
        print(f'Reason: {type(err).__name__}: {err}')

In [None]:
# Count every scalar weight in the model and report it in millions.
total_params = 0
for parameter in model.parameters():
    total_params += parameter.numel()
print(f"Total params: {total_params / 1e6:.2f} million")

# 準備資料

In [None]:
def create_dataset(input_vectors, input_ids, attention_masks, device, eos_token_id=50256):
    """Build next-token-prediction inputs and labels for a batch of sequences.

    The label at position ``t`` is the token id at position ``t + 1``; the
    last position of every sequence is labelled with ``eos_token_id``.

    Args:
        input_vectors: tensor whose first dimension indexes the samples; it is
            returned unchanged apart from being moved to ``device``.
        input_ids: 2-D integer tensor of token ids, one row per sample.
        attention_masks: accepted for interface compatibility; currently unused,
            so padded positions are labelled like any other position.
        device: device the returned tensors are placed on.
        eos_token_id: id appended as the final label of each row. Defaults to
            50256, the GPT-2 end-of-text token.

    Returns:
        ``(final_input, final_label)`` where ``final_label`` has the same shape
        as ``input_ids``.
    """
    final_input = input_vectors.to(device)

    # Shift every row left by one token and close it with EOS. Doing this on the
    # whole batch at once also handles an empty batch, which the former
    # per-sample torch.stack could not.
    eos_column = input_ids.new_full((input_ids.size(0), 1), eos_token_id)
    final_label = torch.cat((input_ids[:, 1:], eos_column), dim=1).to(device)

    return final_input, final_label

In [None]:

dataset = load_dataset(source, trust_remote_code=True)

# Matches a leading "Name:" speaker tag (one capitalised word followed by a colon).
# Compiled once because it is applied to every sampled line.
speaker_prefix = re.compile(r"^\s*[A-Z][a-z]+:\s*")

# Read the non-empty dialogues a single time instead of re-iterating the
# dataset on each of the 100 passes below.
dialogues = [d["dialogue"] for d in dataset["train"] if d["dialogue"]]

# Each pass draws one random pair of consecutive lines from every dialogue,
# so the corpus ends up with 100 samples per dialogue.
sentences = []
for _ in tqdm(range(100)):
    for dialogue in dialogues:
        # splitlines() also accepts "\r\n" separators, so no stray "\r"
        # is left at the end of a line.
        lines = dialogue.splitlines()
        # np.random.randint(0) raises ValueError, so a one-line dialogue
        # always starts at index 0 and contributes that single line.
        random_start = np.random.randint(max(len(lines) - 1, 1))
        pair = lines[random_start:random_start + 2]
        cleaned = [speaker_prefix.sub("", line) for line in pair]
        dialog = " <|endoftext|> ".join(cleaned)
        # Training text: the sampled lines separated and terminated by the end-of-text marker.
        text = f"{dialog}  <|endoftext|>"
        sentences.append(text)

In [None]:
# Report how many training sentences were generated.
print(f"Total samples: {len(sentences)}")

In [None]:
# Show the first five training sentences as a sanity check.
print(sentences[:5])

# 訓練模型

In [None]:



# Training loop: every round draws a fresh random subset of the corpus, turns it
# into GPT-2 hidden states, trains on it, then checkpoints.
#
# Local names are kept distinct from the `optimizer`, `loss` and `dataset`
# globals: the model-building cell passes `optimizer` and `loss` to build_model
# as strings, and overwriting them here would break re-running that cell.
train_optimizer = model.optimizer
loss_function = model.loss_function

for epoch in range(num_epochs):

    # Draw this round's sentences directly; shuffling the whole corpus just to
    # take its first `sample_size` items does far more work for the same result.
    round_sentences = random.sample(sentences, min(sample_size, len(sentences)))

    # Tokenize, padding/truncating every sentence to `max_length` tokens.
    tokenized_sentences = tokenizer(round_sentences,
                                    add_special_tokens=False,
                                    padding='max_length',
                                    max_length=max_length,
                                    truncation=True,
                                    return_tensors="pt")

    # Vectorize: the last GPT-2 hidden state is the input feature.
    input_ids           = tokenized_sentences['input_ids'].to(device)
    attention_masks     = tokenized_sentences['attention_mask'].to(device)
    with torch.no_grad():
        input_vectors   = vectorizer(input_ids).hidden_states[-1]

    # Build (input, next-token label) tensors.
    final_input, final_label = create_dataset(input_vectors, input_ids, attention_masks, device)

    # Wrap them in a DataLoader.
    train_dataset   = TensorDataset(final_input, final_label)
    dataloader      = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    # Train for one pass over the sampled data.
    model.train()

    running_loss = 0.0

    for batch_input, batch_label in tqdm(dataloader, total=len(dataloader), desc="Training Progress", ncols=100, unit="batch"):

        train_optimizer.zero_grad()

        output     = model(batch_input)
        # Flatten to (tokens, vocab) vs (tokens,) for the loss.
        batch_loss = loss_function(output.view(-1, output_size), batch_label.view(-1))
        batch_loss.backward()       # compute gradients

        train_optimizer.step()      # update parameters

        running_loss += batch_loss.item()

    # max(..., 1) keeps an empty round from dividing by zero.
    epoch_loss = running_loss / max(len(dataloader), 1)
    print(f"Epoch [{epoch+1}/{num_epochs}] finished. Average Loss: {epoch_loss:.4f}")

    # Checkpoint after every round.
    torch.save(model.state_dict(), model_directory)
    torch.save(train_optimizer.state_dict(), optimizer_directory)

    gc.collect()

# 來跟這個小模型用英文聊聊天吧

In [None]:
# The prompt to send to the model.
sentence = " Do you like me ? "
# Generation settings passed to `generate_response` below.
truncate = False
top_k = 500
temperature = 2.5
# Terminate the prompt with the end-of-text marker used as separator in the training text.
sentence += "<|endoftext|>"

In [None]:
def top_k_sampling(logits, k=50):
    """Sample one index from the ``k`` highest-scoring entries of ``logits``.

    Args:
        logits: tensor of unnormalised scores. The result is converted with
            ``.item()``, so it must describe a single distribution (1-D).
        k: number of top entries to sample from. Values larger than the number
            of entries are clamped rather than raising inside ``torch.topk``.

    Returns:
        The sampled position in ``logits`` as a Python int.

    Raises:
        ValueError: if ``k`` is smaller than 1 or ``logits`` has no entries.
    """
    vocab_size = logits.size(-1)
    if vocab_size == 0:
        raise ValueError("logits must contain at least one entry")
    if k < 1:
        raise ValueError(f"k must be >= 1, got {k}")
    k = min(k, vocab_size)

    # Keep the top-k scores at their original positions and push everything
    # else to -inf, so the softmax assigns it zero probability.
    top_k_values, top_k_indices = torch.topk(logits, k)
    top_k_logits = torch.full_like(logits, float('-inf'))
    top_k_logits.scatter_(dim=-1, index=top_k_indices, src=top_k_values)

    probs = F.softmax(top_k_logits, dim=-1)
    return torch.multinomial(probs, 1).item()


In [None]:
def _draw_next_token(probs, candidate_ids, stop_tokens, allow_stop):
    """Sample one candidate token, optionally rejecting stop tokens.

    Returns ``(token_id, text)`` where ``text`` is the stripped decoding of the
    token. When ``allow_stop`` is False, a drawn stop token has its probability
    zeroed and the draw is repeated; ``(None, None)`` is returned once no
    candidate with positive probability is left.
    """
    remaining = probs.clone()
    while True:
        choice = torch.multinomial(remaining, 1)
        token_id = candidate_ids[choice].item()
        text = tokenizer.decode([token_id], clean_up_tokenization_spaces=True).strip()
        if allow_stop or text not in stop_tokens:
            return token_id, text
        remaining[choice] = 0.0
        if not torch.any(remaining > 0):
            return None, None


def generate_response(prompt, max_new_tokens=50, top_k=20, temperature=0.8,
                      stop_tokens=('<|endoftext|>',), truncate=True):
    """Generate a reply to ``prompt`` one token at a time with top-k sampling.

    Args:
        prompt: text the reply should continue.
        max_new_tokens: upper bound on the number of generated tokens.
        top_k: number of highest-scoring tokens to sample from at each step.
        temperature: logits are divided by this value (floored at 1e-5).
        stop_tokens: decoded token strings that mark the end of a reply.
        truncate: when True, generation ends at the first stop token. When
            False, stop tokens are rejected and redrawn, so the reply runs to
            ``max_new_tokens`` unless no other candidate remains.

    Returns:
        The generated reply, stripped of surrounding whitespace.
    """
    # Inference only: switch off dropout, which is still active if the
    # training loop ran last.
    model.eval()

    pad_id = tokenizer.pad_token_id
    if pad_id is None:
        pad_id = tokenizer.eos_token_id

    # Work on token ids throughout. Decoding each token, stripping it and
    # re-tokenising the joined text would turn subword pieces into separate words.
    context_ids = list(tokenizer(prompt, add_special_tokens=False)["input_ids"])
    generated_ids = []
    response = ""
    safe_temperature = max(temperature, 1e-5)

    for _ in range(max_new_tokens):
        # Keep only the most recent `max_length` tokens so the position read
        # below can never fall outside the padded sequence.
        window = context_ids[-max_length:]
        last_position = max(len(window), 1) - 1
        padded = window + [pad_id] * (max_length - len(window))
        input_ids = torch.tensor([padded], dtype=torch.long, device=device)

        with torch.no_grad():
            input_vector = vectorizer(input_ids).hidden_states[-1]
            logits = model(input_vector)[0, last_position]  # logits after the last real token

        # Sanitize logits, then apply temperature.
        logits = torch.nan_to_num(logits, nan=-1e20, posinf=1e20, neginf=-1e20)
        logits = logits / safe_temperature

        # Top-k candidates and their probabilities.
        k = max(1, min(top_k, logits.size(-1)))
        topk_vals, topk_ids = torch.topk(logits, k)
        probs = F.softmax(topk_vals, dim=-1)

        # Fall back to a uniform draw if the softmax degenerated.
        if torch.isnan(probs).any() or torch.isinf(probs).any():
            probs = torch.ones_like(probs) / probs.size(-1)

        next_token_id, next_word = _draw_next_token(
            probs, topk_ids, stop_tokens, allow_stop=truncate)
        if next_token_id is None or next_word in stop_tokens:
            break

        context_ids.append(next_token_id)
        generated_ids.append(next_token_id)
        response = tokenizer.decode(generated_ids, clean_up_tokenization_spaces=True)

        # Show progressive output.
        print(response, end="\r", flush=True)

    # Printed exactly once, whether generation stopped early or hit the cap.
    print("\n[END]")
    return response.strip()


In [None]:

# Generate a reply to `sentence` using the settings defined above.
response = generate_response(sentence, max_new_tokens=response_length, top_k=top_k, temperature=temperature, truncate=truncate)

#  test

In [None]:
# The prompt to send to the model.
sentence = " Do you want to love me ? "
# Settings read by the generation loop below.
truncate = False      # True: stop at the end-of-text token; False: redraw it and keep going
top_k = 150           # number of top-scoring tokens to sample from
temperature = 2.5
# Terminate the prompt with the end-of-text marker used as separator in the training text.
sentence += "<|endoftext|>"

In [None]:
def top_k_sampling(logits, k=50):
    """Sample one index from the ``k`` highest-scoring entries of ``logits``.

    Args:
        logits: tensor of unnormalised scores. The result is converted with
            ``.item()``, so it must describe a single distribution (1-D).
        k: number of top entries to sample from. Values larger than the number
            of entries are clamped rather than raising inside ``torch.topk``.

    Returns:
        The sampled position in ``logits`` as a Python int.

    Raises:
        ValueError: if ``k`` is smaller than 1 or ``logits`` has no entries.
    """
    vocab_size = logits.size(-1)
    if vocab_size == 0:
        raise ValueError("logits must contain at least one entry")
    if k < 1:
        raise ValueError(f"k must be >= 1, got {k}")
    k = min(k, vocab_size)

    # Keep the top-k scores at their original positions and push everything
    # else to -inf, so the softmax assigns it zero probability.
    top_k_values, top_k_indices = torch.topk(logits, k)
    top_k_logits = torch.full_like(logits, float('-inf'))
    top_k_logits.scatter_(dim=-1, index=top_k_indices, src=top_k_values)

    probs = F.softmax(top_k_logits, dim=-1)
    return torch.multinomial(probs, 1).item()


In [1325]:


# Token-by-token generation of a reply to `sentence`.

# The reply produced so far.
response = ''

# Inference only: switch off dropout once, before the loop.
model.eval()

pad_id = tokenizer.pad_token_id
if pad_id is None:
    pad_id = tokenizer.eos_token_id

# Work on token ids throughout. Decoding each token, stripping it and
# re-tokenising the joined text would turn subword pieces into separate words.
context_ids = list(tokenizer(sentence, add_special_tokens=False)['input_ids'])
generated_ids = []

for i in range(response_length):

    # Keep only the most recent `max_length` tokens so the position read below
    # can never fall outside the padded sequence.
    window = context_ids[-max_length:]
    last_position = max(len(window), 1) - 1
    padded = window + [pad_id] * (max_length - len(window))
    input_id = torch.tensor([padded], dtype=torch.long, device=device)

    # No gradients are needed while generating.
    with torch.no_grad():
        input_vector = vectorizer(input_id).hidden_states[-1]
        last_logits  = model(input_vector)[0, last_position].clone()

    # Apply the configured temperature (it was defined above but never used).
    last_logits = last_logits / max(temperature, 1e-5)

    # When replies must not be cut short, rule the end-of-text token out before
    # sampling. Redrawing until a different token appears could loop forever
    # if it were the only candidate.
    if not truncate:
        last_logits[tokenizer.eos_token_id] = float('-inf')

    # Sample the next token among the top_k candidates.
    most_probable_token_idx = top_k_sampling(last_logits, k=min(top_k, last_logits.size(-1)))
    word = tokenizer.decode([most_probable_token_idx]).strip()

    if truncate and word in ['<|endoftext|>']:
        print('[END]')
        break

    # Append the sampled token and refresh the displayed reply.
    context_ids.append(most_probable_token_idx)
    generated_ids.append(most_probable_token_idx)
    response = tokenizer.decode(generated_ids)
    clear_output(wait=True)  # Clear the previous output
    print(response, flush=False)
    time.sleep(0.1)


 i know anything ?                                 :          �   


In [1326]:
# from google.colab import drive
# drive.mount('/content/drive')
# model_save_path = '/content/drive/My Drive/model.pth'
# torch.save(model.state_dict(), model_save_path)
