## Tokenization

In [1]:
# Split the sentence into tokens on whitespace.
input_text = "나는 최근 파리 여행을 다녀왔다"
input_text_list = input_text.split()
print("input_text_list: ", input_text_list)

# Build the token -> id and id -> token dictionaries from the unique tokens,
# in first-seen order. Enumerating the raw token list instead would give a
# repeated word several ids: str2idx would keep only the last one, idx2str
# would keep all of them, and len(str2idx) would be smaller than the largest id.
vocab = list(dict.fromkeys(input_text_list))
str2idx = {word: idx for idx, word in enumerate(vocab)}
idx2str = {idx: word for idx, word in enumerate(vocab)}
print("str2idx: ", str2idx)
print("idx2str: ", idx2str)

# Convert each token to its token id.
input_ids = [str2idx[word] for word in input_text_list]
print("input_ids: ", input_ids)

# Expected output
# input_text_list:  ['나는', '최근', '파리', '여행을', '다녀왔다']
# str2idx:  {'나는': 0, '최근': 1, '파리': 2, '여행을': 3, '다녀왔다': 4}
# idx2str:  {0: '나는', 1: '최근', 2: '파리', 3: '여행을', 4: '다녀왔다'}
# input_ids:  [0, 1, 2, 3, 4]

input_text_list:  ['나는', '최근', '파리', '여행을', '다녀왔다']
str2idx:  {'나는': 0, '최근': 1, '파리': 2, '여행을': 3, '다녀왔다': 4}
idx2str:  {0: '나는', 1: '최근', 2: '파리', 3: '여행을', 4: '다녀왔다'}
input_ids:  [0, 1, 2, 3, 4]


## Embedding

In [2]:
import torch
from torch import nn

# Width of every token embedding vector.
embedding_dim = 16
# Lookup table with one row per entry of str2idx.
embed_layer = nn.Embedding(num_embeddings=len(str2idx), embedding_dim=embedding_dim)

# Look up the embedding of each token id and add a leading batch axis of size 1.
input_embeddings = embed_layer(torch.tensor(input_ids)).unsqueeze(0)
input_embeddings.shape

# Expected output
# torch.Size([1, 5, 16])

torch.Size([1, 5, 16])

## Positional Encoding

In [3]:
embedding_dim = 16
max_position = 12  # number of rows in the position lookup table

# Two lookup tables of the same width: one indexed by token id, one by position.
embed_layer = nn.Embedding(num_embeddings=len(str2idx), embedding_dim=embedding_dim)
position_embed_layer = nn.Embedding(num_embeddings=max_position, embedding_dim=embedding_dim)

# Position ids 0 .. len(input_ids) - 1 with a leading batch axis.
position_ids = torch.arange(len(input_ids), dtype=torch.long)[None, :]
position_encodings = position_embed_layer(position_ids)

# Token embeddings, also with a leading batch axis.
token_embeddings = embed_layer(torch.tensor(input_ids)).unsqueeze(0)

# The model input is the element-wise sum of token and position embeddings.
input_embeddings = token_embeddings + position_encodings
input_embeddings.shape

# Expected output
# torch.Size([1, 5, 16])

torch.Size([1, 5, 16])

## Attention

In [4]:
head_dim = 16

# Linear maps that turn the input embeddings into queries, keys and values.
# The generator is consumed left to right, so the layers are created in
# q, k, v order.
weight_q, weight_k, weight_v = (
    nn.Linear(embedding_dim, head_dim) for _ in range(3)
)

# Apply the three projections to the same input; each result is (1, 5, 16).
queries, keys, values = (
    projection(input_embeddings) for projection in (weight_q, weight_k, weight_v)
)

In [5]:
from math import sqrt
import torch.nn.functional as F

def compute_attention(queries, keys, values, is_causal=False):
    """Scaled dot-product attention.

    Args:
        queries: tensor whose last two axes are (query_length, dim).
        keys: tensor whose last two axes are (key_length, dim).
        values: tensor whose last two axes are (key_length, value_dim).
        is_causal: when True, query position i may only attend to key
            positions j <= i; later positions get zero weight.

    Returns:
        Tensor whose last two axes are (query_length, value_dim): for every
        query position, the attention-weighted sum of the value vectors.
    """
    dim_k = queries.size(-1)
    # Scale by sqrt(dim) before the softmax.
    scores = queries @ keys.transpose(-2, -1) / sqrt(dim_k)
    if is_causal:
        # Lower-triangular mask over the (query, key) axes of the score matrix.
        query_length, key_length = scores.shape[-2:]
        causal_mask = torch.ones(
            query_length, key_length, dtype=torch.bool, device=scores.device
        ).tril(diagonal=0)
        # -inf scores become exactly zero weight after the softmax.
        scores = scores.masked_fill(~causal_mask, float("-inf"))
    weights = F.softmax(scores, dim=-1)
    return weights @ values

# Shape of the embeddings before attention.
print("원본 입력 형태: ", input_embeddings.shape)

# Run attention over the projected queries, keys and values.
after_attention_embeddings = compute_attention(
    queries=queries, keys=keys, values=values
)

# Shape of the embeddings after attention.
print("어텐션 적용 후 형태: ", after_attention_embeddings.shape)

# Expected output
# 원본 입력 형태:  torch.Size([1, 5, 16])
# 어텐션 적용 후 형태:  torch.Size([1, 5, 16])

원본 입력 형태:  torch.Size([1, 5, 16])
어텐션 적용 후 형태:  torch.Size([1, 5, 16])


In [16]:
class AttentionHead(nn.Module):
    """A single attention head.

    Projects the given tensors to queries, keys and values with three linear
    layers and applies scaled dot-product attention to the projections.

    Args:
        token_embed_dim: size of the last axis of the input tensors.
        head_dim: size of the last axis of the projected queries/keys/values
            and of the output.
        is_causal: when True, each query position only attends to key
            positions at or before it.
    """

    def __init__(self, token_embed_dim, head_dim, is_causal=False):
        super().__init__()
        self.is_causal = is_causal
        self.weight_q = nn.Linear(token_embed_dim, head_dim)
        self.weight_k = nn.Linear(token_embed_dim, head_dim)
        self.weight_v = nn.Linear(token_embed_dim, head_dim)

    def forward(self, queries, keys, values):
        """Project the inputs and return the attended values.

        Args:
            queries: tensor with last two axes (query_length, token_embed_dim).
            keys: tensor with last two axes (key_length, token_embed_dim).
            values: tensor with last two axes (key_length, token_embed_dim).

        Returns:
            Tensor with last two axes (query_length, head_dim).
        """
        outputs = self._compute_attention(
            self.weight_q(queries),
            self.weight_k(keys),
            self.weight_v(values),
            is_causal=self.is_causal,
        )
        return outputs

    def _compute_attention(self, queries, keys, values, is_causal=False):
        """Scaled dot-product attention with an optional causal mask."""
        dim_k = queries.size(-1)
        scores = queries @ keys.transpose(-2, -1) / sqrt(dim_k)
        if is_causal:
            # Read the lengths from the score matrix itself. Axis 2 of the
            # (batch, length, dim) tensors handled here is the feature axis,
            # not the sequence axis.
            query_length, key_length = scores.shape[-2:]
            causal_mask = torch.ones(
                query_length, key_length, dtype=torch.bool, device=scores.device
            ).tril(diagonal=0)
            # -inf scores become exactly zero weight after the softmax.
            scores = scores.masked_fill(~causal_mask, float("-inf"))
        weights = F.softmax(scores, dim=-1)
        return weights @ values

In [17]:
# Self-attention: the same tensor is the source of queries, keys and values.
attention_head = AttentionHead(token_embed_dim=embedding_dim, head_dim=embedding_dim)
after_attention_embeddings = attention_head(
    queries=input_embeddings,
    keys=input_embeddings,
    values=input_embeddings,
)

## Multihead Attention

In [18]:
class MultiheadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    The inputs are projected to width ``d_model``, split into ``n_heads``
    heads, attended per head, merged back to ``d_model`` and passed through
    the output projection ``concat_layer``.

    Args:
        token_embed_dim: size of the last axis of the input tensors.
        d_model: width of the projected queries/keys/values and of the output.
        n_heads: number of heads; must divide ``d_model``.
        is_causal: default masking mode; when True, each query position only
            attends to key positions at or before it.

    Raises:
        ValueError: if ``d_model`` is not divisible by ``n_heads``.
    """

    def __init__(self, token_embed_dim, d_model, n_heads, is_causal=False):
        super().__init__()
        if d_model % n_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by n_heads ({n_heads})"
            )
        self.n_heads = n_heads
        self.is_causal = is_causal
        self.weight_q = nn.Linear(token_embed_dim, d_model)
        self.weight_k = nn.Linear(token_embed_dim, d_model)
        self.weight_v = nn.Linear(token_embed_dim, d_model)
        self.concat_layer = nn.Linear(d_model, d_model)

    def forward(self, queries, keys=None, values=None, is_causal=None):
        """Attend from ``queries`` to ``keys``/``values``.

        Args:
            queries: tensor of shape (batch, query_length, token_embed_dim).
            keys: tensor of shape (batch, key_length, token_embed_dim);
                defaults to ``queries`` (self-attention).
            values: tensor of shape (batch, key_length, token_embed_dim);
                defaults to ``keys``.
            is_causal: per-call override of the masking mode; ``None`` uses
                the value given to the constructor.

        Returns:
            Tensor of shape (batch, query_length, d_model).
        """
        if keys is None:
            keys = queries
        if values is None:
            values = keys
        if is_causal is None:
            is_causal = self.is_causal

        # Each tensor is split using its own length and the projected width,
        # so cross-attention over a different-length memory and
        # token_embed_dim != d_model both work.
        queries = self._split_heads(self.weight_q(queries))
        keys = self._split_heads(self.weight_k(keys))
        values = self._split_heads(self.weight_v(values))

        attention = self._compute_attention(queries, keys, values, is_causal)

        # (batch, heads, query_length, head_dim) -> (batch, query_length, d_model)
        batch_size, _, query_length, head_dim = attention.shape
        merged = attention.transpose(1, 2).contiguous().view(
            batch_size, query_length, self.n_heads * head_dim
        )
        # Mix the concatenated heads with the output projection.
        return self.concat_layer(merged)

    def _split_heads(self, x):
        """Reshape (batch, length, width) to (batch, n_heads, length, width // n_heads)."""
        batch_size, length, width = x.size()
        return x.view(batch_size, length, self.n_heads, width // self.n_heads).transpose(1, 2)

    def _compute_attention(self, queries, keys, values, is_causal=False):
        """Scaled dot-product attention with an optional causal mask."""
        dim_k = queries.size(-1)
        scores = queries @ keys.transpose(-2, -1) / sqrt(dim_k)
        if is_causal:
            query_length, key_length = scores.shape[-2:]
            causal_mask = torch.ones(
                query_length, key_length, dtype=torch.bool, device=scores.device
            ).tril(diagonal=0)
            # -inf scores become exactly zero weight after the softmax.
            scores = scores.masked_fill(~causal_mask, float("-inf"))
        weights = F.softmax(scores, dim=-1)
        return weights @ values

In [19]:
n_heads = 4
# Self-attention with the input width used as the model width.
multihead_attention = MultiheadAttention(
    token_embed_dim=embedding_dim,
    d_model=embedding_dim,
    n_heads=n_heads,
)
after_attention_embeddings = multihead_attention(
    queries=input_embeddings,
    keys=input_embeddings,
    values=input_embeddings,
)
after_attention_embeddings.shape

torch.Size([1, 5, 16])

## Layer Normalization

In [20]:
# Normalize every token vector over its last (embedding) axis.
norm = nn.LayerNorm(normalized_shape=embedding_dim)
norm_x = norm(input_embeddings)
norm_x.shape  # torch.Size([1, 5, 16])

# Per-token mean and standard deviation after normalization.
# The mean is ~0. The std shows as 1.0328 rather than 1 because LayerNorm
# divides by the biased standard deviation while Tensor.std() defaults to the
# unbiased one; with 16 features the ratio is sqrt(16 / 15) ~= 1.0328.
(norm_x.mean(dim=-1).data, norm_x.std(dim=-1).data)

# Example output (the mean values vary from run to run)
# tensor([[-1.4901e-08,  0.0000e+00,  1.4901e-08,  7.4506e-09,  2.9802e-08]])
# tensor([[1.0328, 1.0328, 1.0328, 1.0328, 1.0328]])

(tensor([[ 5.2154e-08,  1.4901e-08, -1.4901e-08, -1.3039e-08, -3.7253e-09]]),
 tensor([[1.0328, 1.0328, 1.0328, 1.0328, 1.0328]]))

## Feed Forward Layer

In [21]:
class PreLayerNormFeedForward(nn.Module):
    """Position-wise feed-forward block that normalizes its input first.

    The input is layer-normalized, expanded to ``feedforward_dim`` with a
    GELU activation and dropout, projected back to ``d_model``, added to the
    normalized input, and passed through a final dropout.

    Args:
        d_model: size of the last axis of the input and output.
        feedforward_dim: width of the hidden layer.
        dropout: drop probability used by both dropout layers.
    """

    def __init__(self, d_model, feedforward_dim, dropout):
        super().__init__()
        # Registration order is kept as-is so parameter ordering is unchanged.
        self.linear1 = nn.Linear(d_model, feedforward_dim)
        self.linear2 = nn.Linear(feedforward_dim, d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.activation = nn.GELU()
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x):
        """Return the feed-forward output; same shape as ``x``."""
        normed = self.norm(x)

        # Expand -> activate -> drop -> project back.
        hidden = self.linear1(normed)
        hidden = self.activation(hidden)
        hidden = self.dropout1(hidden)
        projected = self.linear2(hidden)

        # The residual is taken from the normalized input.
        return self.dropout2(normed + projected)

## Encoder

In [22]:
class EncoderLayer(nn.Module):
    """One encoder layer: pre-norm self-attention followed by a feed-forward block.

    Args:
        d_model: size of the last axis of the input and output.
        n_heads: number of attention heads.
        feedforward_dim: hidden width of the feed-forward block.
        dropout: drop probability for the attention output and the
            feed-forward block.
    """

    def __init__(self, d_model, n_heads, feedforward_dim, dropout):
        super().__init__()
        self.norm1 = nn.LayerNorm(d_model)
        self.attn = MultiheadAttention(d_model, d_model, n_heads)
        self.dropout1 = nn.Dropout(dropout)
        self.feedforward = PreLayerNormFeedForward(d_model, feedforward_dim, dropout)

    def forward(self, x):
        """Return the layer output; same shape as ``x``."""
        norm_x = self.norm1(x)
        # Self-attention: the normalized input is the source of queries, keys
        # and values. All three must be passed because
        # MultiheadAttention.forward has three required arguments.
        attn_output = self.attn(norm_x, norm_x, norm_x)
        # Residual connection around the attention sub-layer.
        output = x + self.dropout1(attn_output)

        output = self.feedforward(output)
        return output

In [28]:
import copy 

class TransformerEncoder(nn.Module):
    """A stack of identical encoder layers followed by a final LayerNorm.

    Args:
        encoder_layer: module to replicate; each layer in the stack is an
            independent deep copy of it.
        num_layers: number of copies in the stack.
        d_model: size of the last axis, used for the final LayerNorm.
    """

    def __init__(self, encoder_layer, num_layers, d_model):
        super().__init__()
        self.layers = self._get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x):
        """Run ``x`` through every layer in order, then normalize the result."""
        output = x
        for layer in self.layers:
            output = layer(output)
        # Apply the final normalization that __init__ creates. Without this
        # the registered LayerNorm parameters would never be used.
        return self.norm(output)

    def _get_clones(self, module, num_layers):
        """Return ``num_layers`` independent deep copies of ``module``."""
        return nn.ModuleList([copy.deepcopy(module) for _ in range(num_layers)])

## Decoder

In [27]:
class DecoderLayer(nn.Module):
    """One decoder layer: self-attention, cross-attention, then feed-forward.

    Args:
        d_model: size of the last axis of the inputs and output.
        n_heads: number of attention heads in both attention modules.
        feedforward_dim: hidden width of the feed-forward block.
        dropout: drop probability for both attention outputs and the
            feed-forward block.
    """

    def __init__(self, d_model, n_heads, feedforward_dim=2048, dropout=0.1):
        super().__init__()
        self.norm1 = nn.LayerNorm(d_model)
        # Causal by default, matching the default of forward(); forward()
        # re-applies the per-call flag before every use.
        self.self_attn = MultiheadAttention(d_model, d_model, n_heads, is_causal=True)
        self.dropout1 = nn.Dropout(dropout)

        self.norm2 = nn.LayerNorm(d_model)
        self.cross_attn = MultiheadAttention(d_model, d_model, n_heads)
        self.dropout2 = nn.Dropout(dropout)

        self.feedforward = PreLayerNormFeedForward(d_model, feedforward_dim, dropout)

    def forward(self, tgt, encoder_output, is_causal=True):
        """Return the layer output; same shape as ``tgt``.

        Args:
            tgt: decoder-side input tensor.
            encoder_output: encoder result, used as keys and values of the
                cross-attention.
            is_causal: whether the self-attention masks out later positions.
        """
        # Self-attention. The masking mode is set through the module's
        # is_causal attribute, which its forward() reads; passing is_causal
        # as a keyword to a forward(queries, keys, values) signature raises
        # TypeError.
        # NOTE(review): the residual below is added to the normalized tensor,
        # whereas EncoderLayer adds it to the un-normalized input - confirm
        # which is intended.
        x = self.norm1(tgt)
        self.self_attn.is_causal = is_causal
        x = x + self.dropout1(self.self_attn(x, x, x))
        # Cross-attention: queries come from the decoder, keys and values from
        # the encoder output.
        x = self.norm2(x)
        x = x + self.dropout2(self.cross_attn(x, encoder_output, encoder_output))
        # Feed-forward.
        x = self.feedforward(x)
        return x

In [29]:
class TransformerDecoder(nn.Module):
    """A stack of identical decoder layers.

    Args:
        decoder_layer: module to replicate; each layer in the stack is an
            independent deep copy of it. It is called as ``layer(tgt, x)``.
        num_layers: number of copies in the stack.
        d_mdoel: unused. The misspelled name is kept so existing positional
            and keyword callers keep working.
    """

    def __init__(self, decoder_layer, num_layers, d_mdoel):
        super().__init__()
        self.layers = self._get_clones(decoder_layer, num_layers)
        self.num_layers = num_layers

    def forward(self, tgt, x):
        """Run ``tgt`` through every layer in order.

        Args:
            tgt: decoder-side input tensor.
            x: encoder output, passed unchanged to every layer.

        Returns:
            The output of the last layer (``tgt`` itself when the stack is empty).
        """
        output = tgt
        for layer in self.layers:
            # Feed each layer the previous layer's output and keep the result.
            output = layer(output, x)
        return output

    def _get_clones(self, module, num_layers):
        """Return ``num_layers`` independent deep copies of ``module``."""
        return nn.ModuleList([copy.deepcopy(module) for _ in range(num_layers)])