In [None]:
import re
import torch
import torch.nn as nn
import numpy as np
import math
from dataclasses import dataclass
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from transformers import AutoTokenizer

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
@dataclass(frozen=True)
class Config:
    text_path = 'dataset/sherlock-holm.es_stories_plain-text_advs.txt'
    max_length = 128
    stride = 4
    batch_size = 128

cfg = Config()

In [5]:
# 전처리 및 토크나이즈
with open(cfg.text_path, 'r', encoding='utf-8') as f:
    raw_text = f.read()

# 줄 앞뒤 공백 제거 및 연속된 빈 줄 1개로 정리
raw_text = re.sub(r'\s+\n', '\n', raw_text)
raw_text = re.sub(r'^\s+', '', raw_text, flags=re.MULTILINE)
raw_text = re.sub(r'\n{2,}', '\n\n', raw_text)

# 제목 삭제
raw_text = re.sub(r'THE ADVEN.*\n', '', raw_text, flags=re.MULTILINE)

# 목차나 저자 등 메타데이터 제거
raw_text = re.sub(r'Arthur Conan Doyle', '', raw_text, flags=re.IGNORECASE)
raw_text = re.sub(r'\bTable of contents\b.*?(?=CHAPTER I)', '', raw_text, flags=re.DOTALL|re.IGNORECASE)
raw_text = re.sub(r'chapter ([0~9]|[ivx])', '', raw_text, flags=re.IGNORECASE)
raw_text = re.sub(r'----.*', '', raw_text, flags=re.DOTALL)


# 특수문자, 과도한 빈칸 정리
raw_text = re.sub(r'[“”]', '"', raw_text)
raw_text = re.sub(r"[‘’]", "'", raw_text)
raw_text = re.sub(r' +', ' ', raw_text)

# 앞뒤 전체 공백 제거
raw_text = raw_text.strip()

# 전처리된 텍스트 결과
preprocessed_text = raw_text
print(f"preprocessed text length: {len(preprocessed_text)}, words: {len(preprocessed_text.split())}")

# 토크나이저 초기화
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')

# 토크나이즈
tokenized_text = tokenizer.encode(preprocessed_text)
print(len(tokenized_text))

preprocessed text length: 558453, words: 104321


Token indices sequence length is longer than the specified maximum sequence length for this model (133741 > 512). Running this sequence through the model will result in indexing errors


133741


In [6]:
class MyDataset(Dataset) :
    def __init__(self, tokenized_text, cfg) :
        self.tokenized_text = tokenized_text
        self.cfg = cfg
        
    def __len__(self) :
        return len(self.tokenized_text)
    
    def __getitem__(self, idx) :
        input_ids = []
        labels = []
        
        for i in range(0, len(self.tokenized_text) - self.cfg.max_length, self.cfg.stride) :
            input_ids.append(self.tokenized_text[i:i+self.cfg.max_length])
            labels.append(self.tokenized_text[i+1:i+self.cfg.max_length+1])
        
        input_ids = torch.tensor(input_ids)
        labels = torch.tensor(labels)
        
        return input_ids, labels
    
train_dataset = MyDataset(tokenized_text, cfg)
train_loader = DataLoader(train_dataset, batch_size=cfg.batch_size, shuffle=True)

모델 참조 : https://www.manning.com/books/build-a-large-language-model-from-scratch

In [9]:
VOCAB_SIZE = len(tokenizer.vocab)
print(VOCAB_SIZE)
#VOCAB_SIZE = len(tokenizer) # AutoTokenizer
CONTEXT_LENGTH = 128  # Shortened context length (orig: 1024)
E = 768  # Embedding dimension
H = 12  # Number of attention heads
NUM_LAYERS = 12  # Number of layers
DROP_RATE = 0.1  # Dropout rate

30522


In [None]:
class MultiheadAttention(nn.Module) :
    def __init__(self, din, dout) :
        super().__init__()
        
        assert dout % H == 0, "dout must be devided by NUM_HEADS"
        
        self.din = din
        self.dout = dout
        self.D = dout // H
        
        self.W_Q = nn.Linear(din, dout)
        self.W_K = nn.Linear(din, dout)
        self.W_V = nn.Linear(din, dout)
        self.out_proj = nn.Linear(dout, dout)
        self.dropout = nn.Dropout(p=DROP_RATE)
        self.register_buffer('mask', torch.triu(torch.ones(CONTEXT_LENGTH, CONTEXT_LENGTH), diagonal=1)) # Causal Attn을 위한 Mask
        
    def forward(self, x) :
        B, L, E = x.shape
        D = self.D
        
        # Query, Key, Value Matrix 생성 (B, L, E)
        Q = self.W_Q(x)
        K = self.W_K(x)
        V = self.W_V(x)
        
        # Multihead 반영 (B, L, E) -> (B, L, H, D) -> (B, H, L, D)
        Q = Q.view(B, L, H, D).transpose(1, 2)
        K = K.view(B, L, H, D).transpose(1, 2)
        V = V.view(B, L, H ,D).transpose(1, 2)
        
        # Attention Score 구하기
        attn_scores = Q @ K.transpose(2, 3) # (B, H, L, L)
        attn_scores.masked_fill(self.mask[:L, :L] == 1, float('-inf'))
        attn_scores = attn_scores / (D ** 0.5)
        
        # Attention Weight 구하기
        attn_weights = torch.softmax(attn_scores, dim=-1)
        attn_weights = self.dropout(attn_weights)
        
        # Context Vector 구하기
        CV = attn_weights @ V # (B, H, L, D)
        CV = CV.reshape(B, L, E)
        CV = self.out_proj(CV)
        
        return CV