In [2]:
import json
import re
import os
from typing import List, Dict, Tuple
from multiprocessing import Pool
from tqdm import tqdm

In [3]:
class BytePairTokenizer:
    def __init__(self, data_path:str=None) -> None:
        """
        BytePairTokenizer object
        """
        if data_path:
            self.load_model(data_path)
            return
        
        self.special_tokens:Dict[str, int] = {
            '<BOT>': 0,  # Beginning of Text
            '<EOT>': 1,   # End of Text
            '</w>': 2     # end of word
        }
        self.inv_special_tokens:Dict[int, str] = {i: t for t, i in self.special_tokens.items()}

        self.token_map: Dict[str, int] = self.special_tokens.copy()
        self.inv_map: Dict[int, str] = self.inv_special_tokens.copy()
        self.bpe_codes: Dict[Tuple[str, str], int] = {}
    
    def train(self, corpus: List[str], num_merges: int, verbose:bool = False) -> None:
        """
        Train the Byte Pair Tokenizer to process sentences.
        """
        # Build the vocabulary: map token sequences to their frequencies
        vocab = {}
        if verbose:
            print("Building vocabulary...")
        for sentence in tqdm(corpus):
            # Split sentence into words with leading whitespace preserved
            words = re.findall(r'\s*\S+|\s+', sentence)
            for word in words:
                # Skip special tokens
                if word in self.special_tokens.keys():
                    continue
                chars = list(word) + ['</w>']
                word_tuple = tuple(chars)
                vocab[word_tuple] = vocab.get(word_tuple, 0) + 1
        
        if verbose:
            print("Vocabulary built.\nTraining BPE...")
        token_id = len(self.token_map)  # Starting token ID
        symbols = set()
        for word_tuple in vocab.keys():
            symbols.update(word_tuple)
        for symbol in symbols:
            if symbol not in self.token_map:
                self.token_map[symbol] = token_id
                token_id += 1
        self.inv_map = {i: t for t, i in self.token_map.items()}
        
        if verbose:
            print("Token map built.\nMerging tokens...")
        # Perform BPE merges
        for i in tqdm(range(num_merges)):
            pairs = self._get_pair_counts(vocab)
            if not pairs:
                break
            best_pair = max(pairs, key=pairs.get)
            vocab = self._merge_vocab(best_pair, vocab)
            self.bpe_codes[best_pair] = i # Record the BPE merge rule
            new_symbol = ''.join(best_pair)
            if new_symbol not in self.token_map:
                self.token_map[new_symbol] = token_id
                token_id += 1
                self.inv_map[self.token_map[new_symbol]] = new_symbol
    
    def _get_pair_counts(self, vocab: Dict[Tuple[str], int]) -> Dict[Tuple[str, str], int]:
        """
        Get counts of symbol pairs in the vocabulary
        """
        pairs = {}
        for word, freq in vocab.items():
            symbols = word
            for i in range(len(symbols) - 1):
                pair = (symbols[i], symbols[i + 1])
                pairs[pair] = pairs.get(pair, 0) + freq
        return pairs
    
    def _merge_vocab_single(self, pair: Tuple[str, str], vocab: Dict[Tuple[str], int]) -> Dict[Tuple[str], int]:
        """
        Merge all occurrences of the given pair in the vocabulary
        """
        new_vocab = {}
        bigram = ''.join(pair)
        for word, freq in vocab.items():
            w = []
            i = 0
            while i < len(word):
                # Merge the pair if found
                if i < len(word) - 1 and word[i] == pair[0] and word[i + 1] == pair[1]:
                    w.append(bigram)
                    i += 2
                else:
                    w.append(word[i])
                    i += 1
            new_vocab[tuple(w)] = freq
        return new_vocab

    @staticmethod
    def _process_word(args):
        pair, word_freq = args
        word, freq = word_freq
        bigram = ''.join(pair)
        w = []
        i = 0
        while i < len(word):
            if i < len(word) - 1 and word[i] == pair[0] and word[i + 1] == pair[1]:
                w.append(bigram)
                i += 2
            else:
                w.append(word[i])
                i += 1
        return tuple(w), freq
    
    def _merge_vocab(self, pair: Tuple[str, str], vocab: Dict[Tuple[str], int]) -> Dict[Tuple[str], int]:
        """
        Parallel merge of all occurrences of the given pair in the vocabulary using multiprocessing.
        """
        with Pool() as pool:
            results = pool.map(self._process_word, [(pair, word_freq) for word_freq in vocab.items()])

        new_vocab = {word: freq for word, freq in results}
        return new_vocab
    
    def _get_pairs(self, word: List[str]) -> set:
        """
        Return a set of symbol pairs in a word
        """
        pairs = set()
        for i in range(len(word) - 1):
            pairs.add((word[i], word[i + 1]))
        return pairs
    
    def _apply_bpe(self, word: List[str]) -> List[str]:
        """
        Apply BPE to a list of symbols (a word)
        """
        word = word.copy()
        pairs = self._get_pairs(word)
        while True:
            if not pairs:
                break
            # Find the highest priority pair to merge
            min_pair = None
            min_rank = float('inf')
            for pair in pairs:
                if pair in self.bpe_codes:
                    rank = self.bpe_codes[pair]
                    if rank < min_rank:
                        min_rank = rank
                        min_pair = pair
            if min_pair is None:
                break
            # Merge the best pair
            new_symbol = ''.join(min_pair)
            i = 0
            while i < len(word) - 1:
                if word[i] == min_pair[0] and word[i + 1] == min_pair[1]:
                    word[i:i + 2] = [new_symbol]
                    i = max(i - 1, 0)  # Restart from the previous position after a merge
                else:
                    i += 1
            pairs = self._get_pairs(word)
        return word
    
    def split_text(self, text: str) -> List[str]:
        """
        Split text into BPE tokens with leading whitespace preserved
        """
        tokens = []
        words = re.findall(r'\s*\S+|\s+', text)
        for word in words:
            chars = list(word) + ['</w>']
            bpe_word = self._apply_bpe(chars)
            tokens.extend(bpe_word)
        return tokens
    
    def encode(self, data: str) -> List[int]:
        """
        Encode text data into a list of token IDs
        """
        str_list = self.split_text(data)
        token_list = [self.token_map[tok] for tok in str_list]
        return token_list
    
    def decode(self, data: List[int]) -> str:
        """
        Decode a list of token IDs back into text
        """
        tokens = [self.inv_map[i] for i in data]
        text = ''
        for token in tokens:
            if token != '</w>':
                text += token.replace('</w>', '')
        return text

    def save_model(self, target_path:str) -> None:
        """
        Save the model to a file as json file
        the json will look like
        {
            token_map : {...},
            bpe_codes : {...}
        }
        The special tokens are not necessary for simple encoding/decoding
        hence it is omitted from the model
        """
        with open(target_path, 'w', encoding="UTF-8") as f:
            json.dump({
                'token_map': self.token_map,
                'bpe_codes': {json.dumps(list(k)): v for k, v in self.bpe_codes.items()}
            }, f,
             indent=4,
              ensure_ascii=False)
    
    def load_model(self, model_path:str, encoding="UTF-8") -> None:
        """
        Load the model from a json file
        JSON doesn't allow tuple object as key
        hence the tuple keys are converted to string before saving
        and converted back to tuple when loading
        """
        with open(model_path, 'r') as f:
            model = json.load(f)
        self.token_map = model['token_map']
        self.inv_map = {i: t for t, i in self.token_map.items()}
        self.bpe_codes = {tuple(json.loads(k)): v for k, v in model['bpe_codes'].items()}

def load_tokenizer(path:str = None) -> BytePairTokenizer:
    """
    Load the BytePairTokenizer model from the model folder
    """
    if path is None:
        model_path:str = os.path.join(os.getcwd(), 'model', 'tokenizer.json')
    else:
        model_path:str = path
    tokenizer = BytePairTokenizer(model_path)
    # tokenizer.load_model(model_path)
    return tokenizer

In [66]:
# Test the BytePairTokenizer
tokenizer = load_tokenizer()
text = 'Sean Bean has a hard time leaving his role as Eddard Stark . He vows to get revenge against those that assisted in his execution , starting with George R. R. Martin'
encoded = tokenizer.encode(text)
print(f"Encoded: {encoded}")
decoded = tokenizer.decode(encoded)
print(f"Decoded: {decoded}")

Encoded: [1777, 4313, 2964, 4313, 3279, 2804, 3914, 3066, 4889, 2871, 10120, 2896, 3070, 3399, 3182, 3474, 5091, 2765, 2963, 3001, 3580, 2796, 3181, 10557, 3698, 3496, 2854, 3874, 4855, 2837, 2871, 7153, 5263, 2772, 5468, 2893, 7311, 3175, 2764, 3175, 2764, 10580]
Decoded: Sean Bean has a hard time leaving his role as Eddard Stark . He vows to get revenge against those that assisted in his execution , starting with George R. R. Martin


In [5]:
import torch
from torch import tensor, Tensor

if torch.cuda.is_available():
    device = torch.device('cuda')
    torch.set_default_device(device)
    print(f"Using {torch.cuda.get_device_name()}")
else:
    device = torch.device('cpu')
    torch.set_default_device(device)
    print("Using CPU")

Using CPU


In [67]:
vocab_size:int = len(tokenizer.token_map)
embedding_dim:int = 1536

print(f"Vocab size: {vocab_size}")

Vocab size: 10948


In [7]:
class SimpleLinear:
    def __init__(self, input_size: int, output_size: int) -> None:
        """
        Args:
            input_size (int): 입력 피처의 크기
            output_size (int): 출력 피처의 크기
        """
        self.input_size: int = input_size
        self.output_size: int = output_size
        self.weights: Tensor = torch.rand(input_size, output_size) # 가중치 랜덤 초기화

    def forward(self, inputs: Tensor) -> Tensor:
        """
        입력에 가중치를 단순 행렬곱하여 출력

        Args:
            inputs (Tensor): 입력 텐서 [batch_size, input_size]

        Returns:
            Tensor: 출력 텐서 [batch_size, output_size]
        """
        self.inputs: Tensor = inputs
        self.output: Tensor = torch.mm(inputs, self.weights) # 단순 행렬곱
        return self.output

    def backward(self, grad_output: Tensor) -> Tensor:
        """
        손실 함수 그래디언트 이전 층으로 전달 및 가중치 그래디언트 계산

        Args:
            grad_output (Tensor): 상위 레이어로부터 전달된 그래디언트 [batch_size, output_size]

        Returns:
            Tensor: 하위 레이어로 전달할 그래디언트 [batch_size, input_size]
        """

        grad_input: Tensor = torch.mm(grad_output, self.weights.t()) # 단순 행렬곱
        self.grad_weights: Tensor = torch.mm(self.inputs.t(), grad_output)
        return grad_input

class Embedding:
    def __init__(self, input_dim: int, output_dim: int) -> None:
        """
        Custom Embedding 레이어 초기화

        Args:
            input_dim (int): 임베딩할 인덱스의 개수 (예: 단어 집합의 크기)
            output_dim (int): 임베딩 벡터의 차원
        """
        self.input_dim = input_dim
        self.output_dim = output_dim
        # 임베딩 매트릭스를 학습 가능한 파라미터로 초기화
        self.weights: Tensor = torch.randn(input_dim, output_dim) * 0.01
        self.grad_weights: Tensor = torch.zeros_like(self.weights)

    def forward(self, input_indices: Tensor) -> Tensor:
        """
        순전파 과정

        Args:
            input_indices (Tensor): 정수 인덱스 텐서 (예: [batch_size, sequence_length])

        Returns:
            Tensor: 임베딩된 벡터 텐서 (예: [batch_size, sequence_length, output_dim])
        """
        self.input_indices = input_indices
        # 인덱스를 사용하여 임베딩 벡터 선택
        self.output = self.weights[input_indices]
        return self.output

    def backward(self, grad_output: Tensor) -> Tensor:
        """
        역전파 과정

        Args:
            grad_output (Tensor): 상위 레이어로부터 전달된 그래디언트 (예: [batch_size, sequence_length, output_dim])

        Returns:
            Tensor: 하위 레이어로 전달할 그래디언트 (임베딩 레이어의 경우 없음)
        """
        # grad_output의 형태: [batch_size, sequence_length, output_dim]
        # 이를 [batch_size * sequence_length, output_dim]로 평탄화
        grad_flat = grad_output.view(-1, self.output_dim)
        # input_indices를 평탄화하여 [batch_size * sequence_length] 형태로 
        input_flat = self.input_indices.view(-1)
        
        # 그래디언트를 초기화
        self.grad_weights.zero_()
        # 그래디언트 누적
        self.grad_weights.index_add_(0, input_flat, grad_flat)
        
        return None

    def __str__(self) -> str:
        return "CustomEmbedding"

class PositionalEncoding:
    def __init__(self, max_seq_len: int, embed_size: int):
        """
        위치 인코딩 초기화

        Args:
            max_seq_len (int): 최대 시퀀스 길이
            embed_size (int): 임베딩 차원
        """
        self.embed_size = embed_size
        self.pos_encoding = torch.zeros(max_seq_len, embed_size)

        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_size, 2) * (-torch.log(torch.tensor(10000.0)) / embed_size))
        self.pos_encoding[:, 0::2] = torch.sin(position * div_term)
        self.pos_encoding[:, 1::2] = torch.cos(position * div_term)
        self.pos_encoding = self.pos_encoding.unsqueeze(0)  # Add batch dimension

    def forward(self, x: Tensor) -> Tensor:
        """
        순전파 과정

        Args:
            x (Tensor): 임베딩된 입력 텐서 [seq_length, embed_size]

        Returns:
            Tensor: 위치 인코딩이 추가된 텐서 [seq_length, embed_size]
        """
        seq_length, embed_size = x.shape

        # Ensure positional encoding matches input size
        pos_encoding = self.pos_encoding[:, :seq_length, :]  # Slice for the current sequence length

        return x + pos_encoding.to(x.device)  # Add positional encoding to the input tensor

In [42]:
# create embedding and positional encoding layers
embedding = Embedding(vocab_size, embedding_dim)
pos_encoding = PositionalEncoding(len(encoded), embedding_dim)


In [68]:
# Perform forward pass
print(f"Input Tokens: {encoded}")
embedded = embedding.forward(tensor(encoded))
print(f"Embedded: {embedded.shape}")
pos_encoded = pos_encoding.forward(embedded)
print(f"Positional Encoding Applied: {pos_encoded.shape}")


Input Tokens: [1777, 4313, 2964, 4313, 3279, 2804, 3914, 3066, 4889, 2871, 10120, 2896, 3070, 3399, 3182, 3474, 5091, 2765, 2963, 3001, 3580, 2796, 3181, 10557, 3698, 3496, 2854, 3874, 4855, 2837, 2871, 7153, 5263, 2772, 5468, 2893, 7311, 3175, 2764, 3175, 2764, 10580]
Embedded: torch.Size([42, 1536])
Positional Encoding Applied: torch.Size([1, 42, 1536])


In [69]:
pos_encoded = pos_encoded[0, :, :]
pos_encoded.size()

torch.Size([42, 1536])

In [70]:
pos_encoded.size()

torch.Size([42, 1536])

In [71]:
W_q = SimpleLinear(embedding_dim, embedding_dim)    
W_k = SimpleLinear(embedding_dim, embedding_dim)
W_v = SimpleLinear(embedding_dim, embedding_dim)

Q = W_q.forward(pos_encoded)
K = W_k.forward(pos_encoded)
V = W_v.forward(pos_encoded)

print(f"Q: {Q.shape}")
print(f"K: {K.shape}")
print(f"V: {V.shape}")

Q: torch.Size([42, 1536])
K: torch.Size([42, 1536])
V: torch.Size([42, 1536])


In [72]:
score = torch.mm(Q, K.t())
print(f"Score: {score.shape}")
print(score)

Score: torch.Size([42, 42])
tensor([[2.2644e+08, 2.4381e+08, 2.4491e+08,  ..., 1.6132e+08, 1.6079e+08,
         1.5981e+08],
        [2.4381e+08, 2.6251e+08, 2.6369e+08,  ..., 1.7370e+08, 1.7313e+08,
         1.7207e+08],
        [2.4498e+08, 2.6377e+08, 2.6496e+08,  ..., 1.7454e+08, 1.7396e+08,
         1.7290e+08],
        ...,
        [1.6115e+08, 1.7351e+08, 1.7429e+08,  ..., 1.1481e+08, 1.1443e+08,
         1.1373e+08],
        [1.6056e+08, 1.7288e+08, 1.7366e+08,  ..., 1.1439e+08, 1.1401e+08,
         1.1332e+08],
        [1.5957e+08, 1.7181e+08, 1.7258e+08,  ..., 1.1368e+08, 1.1331e+08,
         1.1262e+08]])


In [73]:
d_k = Q.size(-1)  # Dimension of Key
scaled_scores = score / torch.sqrt(torch.tensor(d_k, dtype=torch.float32)) # 현재 scripts/attention.py 의 ScaledDotProductAttention과 같은 역할
attention_weights = torch.softmax(scaled_scores, dim=-1) # 나중에 전체 코드 통합할 때는 nn.object의 softmax를 사용해야 함함
print(attention_weights)
output = torch.mm(attention_weights, V)
print(f"Output: {output.shape}")

tensor([[0., 0., 1.,  ..., 0., 0., 0.],
        [0., 0., 1.,  ..., 0., 0., 0.],
        [0., 0., 1.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 1.,  ..., 0., 0., 0.],
        [0., 0., 1.,  ..., 0., 0., 0.],
        [0., 0., 1.,  ..., 0., 0., 0.]])
Output: torch.Size([42, 1536])


scripts에 있는 베이스 코드들은 배치 사이즈가 고려된 클래스들로 이번 작업은 2D 작업이기 때문에 아래의 정의된 클래스는 배치 사이즈가 고려가 되지 않음.
결합할 때 2D 임을 고려해야 함 ~

In [None]:
class MultiHeadAttention:
    def __init__(self, embed_size: int, heads: int):
        """
        Multi-Head Attention 초기화

        Args:
            embed_size (int): 임베딩 차원
            heads (int): 어텐션 헤드 수
        """
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads

        assert (
            self.head_dim * heads == embed_size
        ), "임베딩 차원(embed_size)은 헤드 수(heads)로 나누어 떨어져야 합니다."

        # Q, K, V 선형 변환을 위한 가중치 초기화
        self.W_Q = torch.randn(embed_size, embed_size) * (self.head_dim ** -0.5)
        self.W_K = torch.randn(embed_size, embed_size) * (self.head_dim ** -0.5)
        self.W_V = torch.randn(embed_size, embed_size) * (self.head_dim ** -0.5)
        self.W_O = torch.randn(embed_size, embed_size) * (self.head_dim ** -0.5)

    def forward(self, x: Tensor) -> Tensor:
        """
        순전파 과정

        Args:
            x (Tensor): 입력 텐서 [seq_length, embed_size]

        Returns:
            Tensor: Multi-Head Attention 출력 [seq_length, embed_size]
        """
        # Q, K, V 생성
        Q = torch.matmul(x, self.W_Q)  # [seq_length, embed_size]
        K = torch.matmul(x, self.W_K)  # [seq_length, embed_size]
        V = torch.matmul(x, self.W_V)  # [seq_length, embed_size]

        # 헤드 수에 맞게 분할
        seq_length, embed_size = x.size()
        Q = Q.view(seq_length, self.heads, self.head_dim).transpose(0, 1)  # [heads, seq_length, head_dim]
        K = K.view(seq_length, self.heads, self.head_dim).transpose(0, 1)  # [heads, seq_length, head_dim]
        V = V.view(seq_length, self.heads, self.head_dim).transpose(0, 1)  # [heads, seq_length, head_dim]

        # Attention 스코어 계산
        scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.head_dim, dtype=torch.float32))  # [heads, seq_length, seq_lengt]
        # 현재 계산된 scores는 마스크가 없이 계산됨.. 마스크 추가해야 함..
         
        attention_weights = torch.softmax(scores, dim=-1)  # [heads, seq_length, seq_length]

        # Attention 결과 계산
        attention_out = torch.matmul(attention_weights, V)  # [heads, seq_length, head_dim]

        # 헤드 결합
        attention_out = attention_out.transpose(0, 1).contiguous().view(seq_length, embed_size)  # [seq_length, embed_size]

        # 최종 선형 변환
        out = torch.matmul(attention_out, self.W_O)  # [seq_length, embed_size]
        return out


In [76]:
class LayerNorm:
    def __init__(self, embed_size: int, eps: float = 1e-5):
        """
        레이어 정규화 초기화

        Args:
            embed_size (int): 임베딩 차원
            eps (float, optional): 안정성을 위한 작은 값. Defaults to 1e-5.
        """
        self.gamma = torch.ones(embed_size, requires_grad=False)
        self.beta = torch.zeros(embed_size, requires_grad=False)
        self.eps = eps

    def forward(self, x: Tensor) -> Tensor:
        """
        순전파 과정

        Args:
            x (Tensor): 입력 텐서 [batch_size, seq_length, embed_size]

        Returns:
            Tensor: 정규화된 텐서
        """
        self.mean = x.mean(dim=-1, keepdim=True)
        self.std = x.std(dim=-1, keepdim=True)
        self.normalized = (x - self.mean) / (self.std + self.eps)
        return self.gamma * self.normalized + self.beta

    def backward(self, grad_output: Tensor) -> Tensor:
        """
        역전파 과정

        Args:
            grad_output (Tensor): 상위 레이어로부터 전달된 그래디언트

        Returns:
            Tensor: 하위 레이어로 전달할 그래디언트
        """
        # 단순화를 위해 역전파는 gamma에 대한 기울기만 처리
        return grad_output * self.gamma


In [77]:
class AttentionBlock:
    def __init__(self, embed_size: int, heads: int):
        """
        Attention Block 초기화

        Args:
            embed_size (int): 임베딩 차원
            heads (int): Multi-Head Attention의 헤드 수
        """
        self.attention = MultiHeadAttention(embed_size, heads)
        self.layer_norm = LayerNorm(embed_size)

    def forward(self, x: Tensor) -> Tensor:
        """
        순전파 과정

        Args:
            x (Tensor): 입력 텐서 [seq_length, embed_size]

        Returns:
            Tensor: Attention Block 출력 [seq_length, embed_size]
        """
        # Multi-Head Attention 수행
        attention_out = self.attention.forward(x)

        # Residual Connection + LayerNorm
        out = self.layer_norm.forward(x + attention_out)
        return out


In [None]:
attention_block = AttentionBlock(embed_size=embedding_dim, heads=8) # MultiHeadAttention 수행..

# Attention Block 순전파 수행
attention_output = attention_block.forward(embedded)
print(f"AttentionBlock Output Shape: {attention_output.shape}")  # Expected: [seq_length, embedding_dim]


AttentionBlock Output Shape: torch.Size([42, 1536])


In [79]:
grad_output = torch.randn_like(attention_output)  # Same shape as attention_output

In [81]:
grad_output.size()

torch.Size([42, 1536])

In [None]:
# feed_forward 클래스
from nn_objects import Layer, Linear

class FeedForward:
    def __init__(self, embed_size: int, forward_expansion: int, activation):
        """
        피드포워드 네트워크 초기화

        Args:
            embed_size (int): 임베딩 차원
            forward_expansion (int): 피드포워드 네트워크의 확장 비율
            activation (Activation): 활성화 함수
        """
        self.fc1 = Layer(embed_size, embed_size * forward_expansion, activation) # 이번 예제는 relu 사용
        self.fc2 = Layer(embed_size * forward_expansion, embed_size, Linear())  # Activation.Linear() -> Linear()로 수정

    def forward(self, x: Tensor) -> Tensor:
        """
        순전파 과정

        Args:
            x (Tensor): 입력 텐서 [batch_size, seq_length, embed_size]

        Returns:
            Tensor: 피드포워드 네트워크 출력
        """
        out = self.fc1.forward(x)
        out = self.fc2.forward(out)
        return out

    def backward(self, grad_output: Tensor) -> Tensor:
        """
        역전파 과정

        Args:
            grad_output (Tensor): 상위 레이어로부터 전달된 그래디언트

        Returns:
            Tensor: 하위 레이어로 전달할 그래디언트
        """
        grad = self.fc2.backward(grad_output)
        grad = self.fc1.backward(grad)
        return grad

In [95]:
# Attention Block과 Feed Forward를 결합하여 완전한 Encoder Block을 구성
class TransformerEncoderBlock:
    def __init__(self, embed_size: int, heads: int, ff_dim: int, activation=torch.relu):
        """
        Transformer Encoder Block 초기화

        Args:
            embed_size (int): 임베딩 차원
            heads (int): Attention 헤드 수
            ff_dim (int): Feed Forward 내부 차원
        """
        self.attention = AttentionBlock(embed_size, heads)
        self.feed_forward = FeedForward(embed_size, ff_dim, activation)
        self.layer_norm_1 = LayerNorm(embed_size)
        self.layer_norm_2 = LayerNorm(embed_size)

    def forward(self, x: Tensor) -> Tensor:
        """
        순전파 과정

        Args:
            x (Tensor): 입력 텐서 [seq_length, embed_size]

        Returns:
            Tensor: 출력 텐서 [seq_length, embed_size]
        """
        # Attention Block + Residual Connection
        attention_out = self.attention.forward(x)
        x = self.layer_norm_1.forward(x + attention_out)

        # Feed Forward + Residual Connection
        feed_forward_out = self.feed_forward.forward(x)
        out = self.layer_norm_2.forward(x + feed_forward_out)
        return out


In [None]:
ff_dim = 2048 # embedding_dim: 1536
encoder_block = TransformerEncoderBlock(embed_size=embedding_dim, heads=8, ff_dim=ff_dim)

# 순전파 수행
encoder_output = encoder_block.forward(pos_encoded)
print(f"Encoder Output Shape: {encoder_output.shape}")  # Expected: [seq_length, embedding_dim]

Encoder Output Shape: torch.Size([42, 1536])


In [97]:
# Projection to vocabulary size
class OutputProjection:
    def __init__(self, embed_size: int, vocab_size: int):
        """
        Output Projection Layer 초기화

        Args:
            embed_size (int): 임베딩 차원
            vocab_size (int): 어휘 크기
        """
        self.W = torch.randn(embed_size, vocab_size) * 0.01  # 가중치 초기화
        self.b = torch.zeros(vocab_size)  # 바이어스 초기화

    def forward(self, x: Tensor) -> Tensor:
        """
        순전파 과정

        Args:
            x (Tensor): 입력 텐서 [seq_length, embed_size]

        Returns:
            Tensor: 확률 분포를 위한 출력 [seq_length, vocab_size]
        """
        return torch.matmul(x, self.W) + self.b


In [113]:
# Output Projection Layer 초기화
output_projection = OutputProjection(embed_size=embedding_dim, vocab_size=vocab_size)

# encoder_output을 vocabulary 크기로 매핑
logits = output_projection.forward(encoder_output)  # [seq_length, vocab_size]

# Softmax를 통해 확률 계산
probabilities = torch.softmax(logits, dim=-1)  # [seq_length, vocab_size]


In [114]:
# 가장 높은 확률의 토큰 선택
predicted_token_ids = torch.argmax(probabilities, dim=-1)  # [seq_length]

# 토큰 ID를 텍스트로 변환
predicted_text = tokenizer.decode(predicted_token_ids.tolist())
print(f"Predicted Text: {predicted_text}")


Predicted Text: terstersters Ge Ge suggested Blackterstersizationization narr narr narr narr narr narr narr narr narr narr narrθθ leanedθθθ formＮmedmed sec secθnyny narr drawn Blackimesimes


In [None]:
#todo: test