In [2]:
import os
import sys
import json
import random
import transformers
import albumentations as A
from albumentations.pytorch import ToTensorV2

sys.path.insert(0, '../')
from networks.Attention import *
from dataset import dataset_loader
from flags import Flags
from utils import get_network

def get_train_transforms(height, width):
    """Build the training-time albumentations pipeline.

    Steps, in order: resize to ``height`` x ``width``; an inner ``Compose``
    (probability 0.5) holding a horizontal flip and a vertical flip, each
    with ``p=1``; conversion to a tensor via ``ToTensorV2``.
    """
    resize = A.Resize(height, width)
    # The flips live in their own Compose so the p=0.5 gate covers the pair.
    double_flip = A.Compose([A.HorizontalFlip(p=1), A.VerticalFlip(p=1)], p=0.5)
    to_tensor = ToTensorV2(p=1.0)
    return A.Compose([resize, double_flip, to_tensor], p=1.0)


def get_valid_transforms(height, width):
    """Build the validation pipeline: resize to ``height`` x ``width``, then ``ToTensorV2``."""
    steps = [
        A.Resize(height, width),
        ToTensorV2(p=1.0),
    ]
    return A.Compose(steps)

# Device string handed to `.to(device)` when decoder state tensors are created below.
device = 'cpu'

In [3]:
# Path of the YAML config passed to Flags; `options` is read below for
# input size, data settings and the Attention hyper-parameters.
CONFIG_PATH = "../configs/Attention-jupyter.yaml"
options = Flags(CONFIG_PATH).get()

In [4]:
# Build the train/validation loaders and datasets; both transform pipelines
# are sized from options.input_size.
train_data_loader, validation_data_loader, train_dataset, valid_dataset = dataset_loader(
    options=options,
    train_transform=get_train_transforms(options.input_size.height, options.input_size.width),
    valid_transform=get_valid_transforms(options.input_size.height, options.input_size.width),
)

In [5]:
# --- Hyper-parameters read from the config and the training dataset ---
nc = options.data.rgb
num_classes = len(train_dataset.id_to_token)  # number of tokens
src_dim = options.Attention.src_dim
embedding_dim = options.Attention.embedding_dim
hidden_dim = options.Attention.hidden_dim
pad_id = train_dataset.token_to_id[PAD]  # pad token id
st_id = train_dataset.token_to_id[START]  # start token id
num_layers = options.Attention.layer_num  # number of RNN layers inside AttentionCell
cell_type = options.Attention.cell_type

# --- Encoder ---
encoder = CNN(nc=nc)

# --- Decoder pieces (AttentionDecoder, unrolled by hand) ---
# NOTE(review): the table has num_classes + 1 rows; the reason for the extra
# row is not visible in this notebook - confirm against networks.Attention.
embedding = nn.Embedding(num_embeddings=num_classes + 1, embedding_dim=embedding_dim)
attention_cell = AttentionCell(src_dim, hidden_dim, embedding_dim, num_layers, cell_type)
# Projects a decoder hidden state onto per-token scores.
generator = nn.Linear(in_features=hidden_dim, out_features=num_classes)

In [6]:
# Pull a single batch to prototype beam search against.
for batch in train_data_loader:
    break
# NOTE: `input` shadows the builtin; the name is kept because later cells read it.
input = batch['image'].float()
curr_batch_size = input.size(0)
expected = batch['truth']['encoded']
# Replace the -1 placeholders with the PAD token id (in place).
expected[expected.eq(-1)] = train_data_loader.dataset.token_to_id[PAD]

- Attention Input for forwarding
    - `input`(torch.Tensor): 전처리와 collate_fn을 거쳐 생성된 이미지 텐서
    - `expected`(torch.Tensor): Ground Truth 수식 텍스트
    - `is_train`(bool)
    - `teacher_forcing`(float)

- Encoder Input
    - input: 전처리와 collate_fn을 거쳐 생성된 이미지 텐서

- Decoder Input
    - `src`: 인코더로부터 얻은 feature map. 모델 내부에서 다음을 거친 `out`을 받음
        ```python
        out = self.encoder(input)
        b, c, h, w = out.size()
        out = out.view(b, c, h * w).transpose(1, 2)  # [b, h x w, c]
        ```
    - `text`: 생성할 수식의 Ground Truth. 모델 내부에서 `expected`로 받음
    - `is_train`:
    - `teacher_forcing`: 
    - `batch_max_length`: 생성할 수식의 최대 길이. 모델 내부에서 `expected.size(1)`로 받음

In [7]:
# Run the encoder and flatten the spatial grid into a sequence.
encoder_output = encoder(input)  # (B, C, H, W) <- result of conv & pooling
b, c, h, w = encoder_output.shape
out = encoder_output.view(b, c, h * w).permute(0, 2, 1)  # [b, h x w, c]

src = out  # decoder input
batch_max_length = expected.shape[1]  # maximum decoder generation length
text = expected  # (B, MAX_LEN)

In [9]:
# AttentionDecoder forwarding: allocate the per-step output buffer and the
# initial recurrent state.
batch_size = src.size(0)
num_steps = batch_max_length - 1  # total number of generation steps

# Buffer that receives one hidden vector per step. It is created on `device`
# like the state tensors below (previously hard-coded to 'cpu').
output_hiddens = torch.zeros(
    batch_size, num_steps, hidden_dim, dtype=torch.float32, device=device
)
print('Output hidden vector size:', output_hiddens.size())

# Initial LSTM state: two zero tensors of shape (batch_size, hidden_dim).
hidden = (
    torch.zeros(batch_size, hidden_dim, dtype=torch.float32, device=device),  # hidden state
    torch.zeros(batch_size, hidden_dim, dtype=torch.float32, device=device),  # cell state
)

Output hidden vector size: torch.Size([13, 52, 128])


## Beam Search

In [10]:
# Second dimension of the ground-truth token tensor.
max_len = expected.size(1)

In [11]:
# Decoder inputs: out (image feature map) is [B, WxH, C]; expected (tokenised GT) is [B, MAX_LEN].

# Zero-initialised pair of state tensors for the single-layer case.
hidden = tuple(
    torch.zeros(batch_size, hidden_dim, dtype=torch.float32, device=device)
    for _ in range(2)
)


여기는 문장 단위로 일어나네

In [12]:
# Unpack both dimensions of the ground-truth token tensor at once.
batch_size, max_len = expected.size()

In [13]:
# per character (token)
# for idx in range(max_len):

# per sentence
# for idx in range(batch_size):

In [44]:
# Beam-search settings; no cell visible in this notebook consumes them yet.
beam_width = 10
topk = 1
decoded_batch = []

In [46]:
# Inspect the flattened encoder features fed to the decoder.
src.size() # [B, WxH, C]

torch.Size([13, 231, 512])

In [29]:
class BeamSearchNode(object):
    """Node of a beam-search tree: stores a state, a parent link and scoring fields."""

    def __init__(self, hiddenstate, prevnode, wordid, logprob, length):
        # Plain field storage; nothing is validated or copied.
        self.h, self.prevNode = hiddenstate, prevnode
        self.wordid, self.logp, self.len = wordid, logprob, length

    def eval(self, alpha=1.0):
        """Return ``logp / (len - 1 + 1e-6) + alpha * reward``.

        ``reward`` is a placeholder fixed at 0, so ``alpha`` currently has no
        effect on the result.
        """
        # Placeholder for a reward-shaping term.
        reward = 0
        normaliser = float(self.len - 1 + 1e-6)
        return self.logp / normaliser + alpha * reward

## Greedy Decoding

In [47]:
# Sanity check: ground-truth tensor shape and the number of decoding steps.
print('(BATCH_SIZE, MAX_LEN) of Text(ground truth)', text.size()) # (BATCH_SIZE, MAX_LEN)
print('NUMBER OF STEPS', num_steps)

(BATCH_SIZE, MAX_LEN) of Text(ground truth) torch.Size([13, 34])
NUMBER OF STEPS 33


In [49]:
# Sizes for the step-by-step decode.
batch_size = src.shape[0]
num_steps = batch_max_length - 1  # maximum number of steps (= maximum number of generations)

# Zero buffer that will hold one hidden vector per step.
output_hiddens = torch.zeros(
    batch_size, num_steps, hidden_dim, dtype=torch.float32, device=device
)

In [37]:
# Single decoder step. NOTE: `hidden` is both input and output, so re-running
# this cell advances the state again.
embedd = embedding(text[:, 0]) # token at index 0 of every sequence
hidden, alpha = attention_cell(hidden, src, embedd)

In [None]:
# Inspect the per-step hidden buffer. The cell originally held only the
# truncated name `output_hid`, which is undefined and fails on Run All.
output_hiddens.size()

In [39]:
# Shape of the first element of the state pair.
hidden[0].size()

torch.Size([13, 128])

In [14]:
for i in range(num_steps):
    # Embed the i-th ground-truth token of every sequence in the batch.
    embedd = embedding(text[:, i])
    # hidden: previous decoder state, src: encoder features, embedd: embedded token.
    hidden, alpha = attention_cell(hidden, src, embedd)
    # Store the hidden tensor (index 0 of the (hidden, cell) pair); with more
    # than one layer the pair is taken from the last entry of `hidden`.
    output_hiddens[:, i] = hidden[0] if num_layers == 1 else hidden[-1][0]
# Project every stored hidden vector through the generator layer.
probs = generator(output_hiddens)

- AttentionCell forwarding
    - `prev_hidden`(torch.Tensor): a

In [15]:
# T: number of tokens (= number of words)
# E: embedding size
# NOTE(review): the leading dimension printed below is 13, the same value as
# the batch size seen in other recorded outputs - confirm what "T" refers to.

i = 0
print(text[:, i], text[:, i].size())
print(embedding(text[:, i]).size()) # [T, E]

tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) torch.Size([13])
torch.Size([13, 128])


In [41]:
# Shape of the ground-truth token tensor.
text.size()

torch.Size([13, 34])

## Beam Search

In [58]:
from queue import PriorityQueue

In [59]:
batch_size = out.shape[0]
# NOTE(review): other cells use expected.size(1) - 1 here; confirm which is intended.
num_steps = expected.shape[1]

# Zero buffer for one hidden vector per step.
output_hiddens = torch.zeros(batch_size, num_steps, hidden_dim, dtype=torch.float32)

In [60]:
# Zero-initialised (hidden, cell) pair.
hidden = tuple(
    torch.zeros(batch_size, hidden_dim, dtype=torch.float32) for _ in range(2)
)
hidden[0].size()  # [B, HIDDEN]

torch.Size([13, 128])

In [61]:
# No teacher forcing: every sample starts from the start token.

targets = torch.zeros(batch_size, dtype=torch.long).fill_(st_id)
print(targets.size())  # [B] - start token for each sample

# [B, MAX_LEN, VOCAB_SIZE] - table that will receive the per-step generation probabilities
probs = torch.zeros(batch_size, num_steps, num_classes, dtype=torch.float32)
probs.size()

torch.Size([13])


torch.Size([13, 48, 245])

In [67]:
# First decoder step from the start tokens. NOTE: `hidden` is both input and
# output, so re-running this cell advances the state again.
embedded = embedding(targets)
print(embedded.size()) # [B, HIDDEN] - embedding of the start-token vector

hidden, alpha = attention_cell(hidden, out, embedded)
hidden[0]

torch.Size([13, 128])


tensor([[-0.1009,  0.2129, -0.0321,  ...,  0.3426, -0.2417,  0.3173],
        [ 0.1296,  0.3131, -0.0300,  ..., -0.1156, -0.0877, -0.0591],
        [-0.1152,  0.2518, -0.0503,  ...,  0.3893, -0.1282,  0.2493],
        ...,
        [-0.2687,  0.2140, -0.0347,  ...,  0.3321, -0.1570,  0.3576],
        [-0.1026,  0.2222, -0.0024,  ...,  0.4579, -0.2338,  0.3322],
        [-0.0978,  0.1827, -0.0255,  ...,  0.4020, -0.2308,  0.2959]],
       grad_fn=<MulBackward0>)

In [47]:
from queue import PriorityQueue

class BeamSearchNode:
    """A single hypothesis node in a beam-search tree.

    Attributes:
        h: decoder state stored with this node.
        prevNode: parent node, or ``None`` for a root.
        wordid: token id stored with this node.
        logp: log-probability stored with this node (numerator of the score).
        leng: length stored with this node (normalises the score).
    """

    def __init__(self, hiddenstate, previousNode, wordId, logProb, length):
        self.h = hiddenstate
        self.prevNode = previousNode
        self.wordid = wordId
        self.logp = logProb
        self.leng = length

    def eval(self, alpha=1.0):
        """Return ``logp / (leng - 1 + 1e-6) + alpha * reward``.

        ``reward`` is a placeholder fixed at 0, so ``alpha`` currently has no
        effect. The ``1e-6`` avoids a division by zero when ``leng == 1``.
        """
        reward = 0
        # Add here a function for shaping a reward
        return self.logp / float(self.leng - 1 + 1e-6) + alpha * reward

    def __lt__(self, other):
        """Tie-breaker so ``(score, node)`` tuples stay orderable.

        ``PriorityQueue`` compares whole tuples, so when two scores are equal
        it compares the nodes themselves. Without this method that comparison
        raises ``TypeError``. The node with the larger ``logp`` sorts first.
        """
        if not isinstance(other, BeamSearchNode):
            return NotImplemented
        # bool() collapses a single-element tensor comparison to a plain bool.
        return bool(self.logp > other.logp)

In [14]:
# Beam-search settings; no cell visible in this notebook consumes them yet.
beam_width = 10
topk = 1
decoded_batch = []

In [16]:
# Batch size from the encoder features; one fewer step than the ground-truth length.
batch_size = out.size(0)
num_steps = expected.size(1) - 1

In [21]:
# Pre-allocate the buffer that will hold one hidden vector per step.
output_hiddens = torch.zeros(batch_size, num_steps, hidden_dim, dtype=torch.float32)
output_hiddens.size()

torch.Size([13, 52, 128])

In [24]:
# Initialise the LSTM state: a single (hidden, cell) pair of zero tensors for
# one layer, or a list holding one such pair per layer otherwise.
if num_layers == 1:
    print('# layers = 1')
    hidden = (
        torch.zeros(batch_size, hidden_dim, dtype=torch.float32, device=device),  # hidden
        torch.zeros(batch_size, hidden_dim, dtype=torch.float32, device=device),  # cell
    )
else:
    print('# layers > 1')
    hidden = [
        (
            torch.zeros(batch_size, hidden_dim, dtype=torch.float32, device=device),  # hidden
            torch.zeros(batch_size, hidden_dim, dtype=torch.float32, device=device),  # cell
        )
        # Use the notebook-level `num_layers`; there is no `self` at cell scope,
        # so the previous `self.num_layers` raised NameError in this branch.
        for _ in range(num_layers)
    ]

# layers = 1


In [25]:
# Leaves i == 0 (the first sample) when batch_size > 0; a later cell indexes
# `hidden` with this `i`.
for i in range(batch_size): break
# sample = 

In [31]:
# Shape of the first element of the state pair.
hidden[0].size()

torch.Size([13, 128])

In [34]:
# Slice the state of sample `i` out of the batch, keeping a leading dim of 1.
if not isinstance(hidden, tuple):
    decoder_hidden = hidden[i].unsqueeze(0)
else:
    # LSTM case: slice both members of the (hidden, cell) pair.
    decoder_hidden = (
        hidden[0][i].unsqueeze(0),
        hidden[1][i].unsqueeze(0),
    )

In [122]:
# Draw one batch and push its images through the encoder.
for batch in train_data_loader:
    break
# NOTE: `input` shadows the builtin; the name is kept to match the other cells.
input = batch['image'].float()  # [B, C, H, W]
encoder_output = encoder(input)  # [B, C, H, W]
b, c, h, w = encoder_output.shape
src = encoder_output.view(b, c, h * w).permute(0, 2, 1)  # [b, h x w, c]

batch_size = input.size(0)
expected = batch['truth']['encoded']
# Replace the -1 placeholders with the PAD token id (in place).
expected[expected.eq(-1)] = train_data_loader.dataset.token_to_id[PAD]
text = expected  # [B, MAX_LEN]

Forwarding in Decoder

In [129]:
# Zero-initialised (hidden, cell) state pair for the LSTM.
hidden = tuple(
    torch.zeros(batch_size, hidden_dim, dtype=torch.float32) for _ in range(2)
)

In [124]:
# Output hidden-state tensor, one slot per step: [B, num_steps, HIDDEN]
output_hiddens = torch.zeros(batch_size, num_steps, hidden_dim, dtype=torch.float32)

Beam Search
- Greedy decoding이랑 달리 문장 단위로 생성

In [130]:
# Display the freshly initialised state pair.
hidden

(tensor([[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]]),
 tensor([[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]]))

In [112]:
# Shape of the raw encoder feature map, before flattening.
encoder_output.size()

torch.Size([13, 512, 7, 33])

In [94]:
# One-element tensor holding the start-of-sequence token id.
# NOTE(review): `st_id` earlier resolves the start token via the START
# constant, while this cell hard-codes '<SOS>' - confirm both agree.
SOS_TOKEN_ID = train_dataset.token_to_id['<SOS>']
decoder_input = torch.LongTensor([SOS_TOKEN_ID])
decoder_input.size()

torch.Size([1])