In [1]:
import math
import numpy as np
import random

import torch
import torch.nn as nn
import torch.nn.functional as F 
import torchtext

# Setup seeds
torch.manual_seed(1234)
np.random.seed(1234)
random.seed(1234)

## Embedder 모듈

In [2]:
class Embedder(nn.Module):
    def __init__(self, text_embedding_vectors):
        super(Embedder, self).__init__()

        self.embeddings = nn.Embedding.from_pretrained(embeddings=text_embedding_vectors, freeze=True)

    def forward(self, x):
        x_vec = self.embeddings(x)

        return x_vec

In [3]:
from utils.dataloader import get_IMDb_DataLoaders_and_TEXT
train_dl, val_dl, test_dl, TEXT = get_IMDb_DataLoaders_and_TEXT(max_length=256, batch_size=24)

batch = next(iter(train_dl))

net1 = Embedder(TEXT.vocab.vectors)

x = batch.Text[0]
x1 = net1(x)

print("입력 텐서 크기: ", x.shape)
print("출력 텐서 크기: ", x1.shape)

입력 텐서 크기:  torch.Size([24, 256])
출력 텐서 크기:  torch.Size([24, 256, 300])


## PositionalEncoder 모듈

In [4]:
class PositionalEncoder(nn.Module):
    def __init__(self, d_model=300, max_seq_len=256):
        super().__init__()

        self.d_model = d_model

        pe = torch.zeros(max_seq_len, d_model)

        # device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        # pe = pe.to(device)

        for pos in range(max_seq_len):
            for i in range(0, d_model,2):
                pe[pos,i] = math.sin(pos/ (10000 ** ((2*i)/d_model)))
                pe[pos,i+1] = math.cos(pos / (10000 ** ((2*i)/d_model)))

        self.pe = pe.unsqueeze(0)

        self.pe.requires_grad = False

    def forward(self, x):
        ret = math.sqrt(self.d_model) * x + self.pe
        return ret

In [5]:
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)

x = batch.Text[0]
x1 = net1(x)
x2 = net2(x1)

print("입력 텐서 크기: ", x1.shape)
print("출력 텐서 크기: ", x2.shape)

입력 텐서 크기:  torch.Size([24, 256, 300])
출력 텐서 크기:  torch.Size([24, 256, 300])


## Transformer Block 모듈

In [6]:
class Attention(nn.Module):
    def __init__(self, d_model=300):
        super().__init__()

        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)

        self.out = nn.Linear(d_model, d_model)

        self.d_k = d_model

    def forward(self, q, k, v, mask):
        k = self.k_linear(k)
        q = self.q_linear(q)
        v = self.v_linear(v)

        weights = torch.matmul(q, k.transpose(1,2) / math.sqrt(self.d_k))

        mask = mask.unsqueeze(1)

        weights = weights.masked_fill(mask ==0, -1e9)

        normalized_weights = F.softmax(weights, dim=-1)

        output = torch.matmul(normalized_weights, v)

        output = self.out(output)

        return output, normalized_weights

In [7]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=1024, dropout=0.1):
        super().__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        x = self.linear_1(x)
        x = self.dropout(F.relu(x))
        x = self.linear_2(x)

        return x

class TransformerBlock(nn.Module):
    def __init__(self, d_model, dropout=0.1):
        super().__init__()
        self.norm_1 = nn.LayerNorm(d_model)
        self.norm_2 = nn.LayerNorm(d_model)

        self.attn = Attention(d_model)
        self.ff = FeedForward(d_model)

        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        x_normalized = self.norm_1(x)
        output, normalized_weights = self.attn(x_normalized, x_normalized, x_normalized, mask)

        x2 = x + self.dropout_1(output)

        x_normalized2 = self.norm_2(x2)
        output = x2 + self.dropout_2(self.ff(x_normalized2))

        return output, normalized_weights
        



In [8]:
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)
net3 = TransformerBlock(d_model=300)

x = batch.Text[0]
input_pad = 1
input_mask = (x != input_pad)
print(input_mask[0])

x1 = net1(x)
x2 = net2(x1)
x3, normalized_weights = net3(x2, input_mask)

print("입력 텐서 크기: ", x2.shape)
print("출력 텐서 크기: ", x3.shape)
print("Attention 크기: ", normalized_weights)

tensor([True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, True, True, True, True, True, True, True,
        True, True, True, True, True, Tr

## ClassificationHead 모듈

In [9]:
class ClassificationHead(nn.Module):
    '''Transformer_Block의 출력을 사용하여, 마지막에 클래스 분류를 시킨다'''

    def __init__(self, d_model=300, output_dim=2):
        super().__init__()

        # 전결합층
        self.linear = nn.Linear(d_model, output_dim)  # output_dim은 음성, 양성의 두 가지

        # 가중치 초기화
        nn.init.normal_(self.linear.weight, std=0.02)
        nn.init.normal_(self.linear.bias, 0)

    def forward(self, x):
        x0 = x[:, 0, :]  # 각 미니 배치의 각 문장의 선두 단어의 특징량(300차원)을 꺼낸다
        out = self.linear(x0)

        return out


## Transformer 구현

In [10]:
class TransformerClassification(nn.Module):

    def __init__(self, text_embedding_vectors, d_model=300, max_seq_len=256, output_dim=2):
        super().__init__()

        self.net1 = Embedder(text_embedding_vectors)
        self.net2 = PositionalEncoder(d_model=d_model, max_seq_len=max_seq_len)
        self.net3_1 = TransformerBlock(d_model=d_model)
        self.net3_2 = TransformerBlock(d_model=d_model)
        self.net4 = ClassificationHead(output_dim=output_dim, d_model=d_model)

    def forward(self, x, mask):
        x1 = self.net1(x)
        x2 = self.net2(x1)
        x3_1, normalized_weights_1 = self.net3_1(x2, mask)
        x3_2, normalized_weights_2 = self.net3_2(x3_1, mask)
        x4 = self.net4(x3_2)

        return x4, normalized_weights_1, normalized_weights_2


In [11]:
batch = next(iter(train_dl))

net = TransformerClassification(text_embedding_vectors=TEXT.vocab.vectors, d_model=300, max_seq_len=256, output_dim=2)

x = batch.Text[0]
input_mask = (x != input_pad)
out, normalized_weights_1, normalized_weights_2 = net(x, input_mask)

print("출력 텐서 크기: ", out.shape)
print("출력 텐서의 sigmoid: ", F.softmax(out, dim=1))

출력 텐서 크기:  torch.Size([24, 2])
출력 텐서의 sigmoid:  tensor([[0.1473, 0.8527],
        [0.1597, 0.8403],
        [0.1337, 0.8663],
        [0.1585, 0.8415],
        [0.1592, 0.8408],
        [0.1511, 0.8489],
        [0.1548, 0.8452],
        [0.1749, 0.8251],
        [0.1539, 0.8461],
        [0.1497, 0.8503],
        [0.1511, 0.8489],
        [0.1515, 0.8485],
        [0.1455, 0.8545],
        [0.1440, 0.8560],
        [0.1420, 0.8580],
        [0.1330, 0.8670],
        [0.1553, 0.8447],
        [0.1740, 0.8260],
        [0.1638, 0.8362],
        [0.1584, 0.8416],
        [0.1300, 0.8700],
        [0.1471, 0.8529],
        [0.1316, 0.8684],
        [0.1569, 0.8431]], grad_fn=<SoftmaxBackward0>)
