In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data

import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models

from transformers import BertTokenizer, BertModel, BertConfig
from torch.nn.utils.rnn import pack_padded_sequence
from torchvision.models import resnet101

In [8]:
# 특성 추출기 layer

# vgg_config = [64, 'N', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M']

def get_vgg_layer(config, batch_norm):
    layers = []
    in_channels = 3

    for c in config:
        assert c == 'M' or isinstance(c, int)
        
        if c == 'M': # c가 M이면 MaxPooling
            layers += [nn.MaxPool2d(kernel_size= 2)]

        else: # c가 int면 Convolution
            conv2d = nn.Conv2d(in_channels, c, kernel_size= 3, padding= 1)

            # batch normalization 적용
            if batch_norm:
                layers += [conv2d, nn.BatchNorm2d(c), nn.ReLU(inplace= True)]
            else:
                layers += [conv2d, nn.ReLU(inplace= True)]

            in_channels = c # 다음 layer의 in_channels로 사용

    return nn.Sequential(*layers)

In [9]:
# Encoder
class Encoder(nn.Module):
    # attribute -> enc_hidden_size, dec_hidden_size, dec_num_layer, pixel_size, resnet, pooling, relu
    # method -> relu, hidden_dim_changer

    def __init__(self, config):
        super(Encoder, self).__init__()
        self.enc_hidden_size = config.enc_hidden_size   # encoding된 이미지 feature의 width, height
        self.dec_hidden_size = config.dec_hidden_size   # LSTM decoder의 hidden dimension
        self.dec_num_layers = config.dec_num_layers     # LSTM decoder의 레이어 수
        self.pixel_size = self.enc_hidden_size * self.enc_hidden_size   # encoding된 이미지 feature의 전체 픽셀수 -> width * height

        base_model = resnet101(pretrained= True, progress= False)   # Encoder의 backbone model은 resnet101
        base_model = list(base_model.children())[:-2]               # 마지막 2개의 layer는 제거 (추론기)
        self.resnet = nn.Sequential(*base_model)                    # resnet은 Sequential 객체
        self.pooling = nn.AdaptiveAvgPool2d((self.enc_hidden_size, self.enc_hidden_size))   # encoding된 feature의 가로, 세로 크기를 맞춰주기 위한 pooling layer
        # output_size = enc_hidden_size * enc_hidden_size

        self.relu = nn.ReLU() # 렐루는렐루~
        self.hidden_dim_changer = nn.Sequential(
            nn.Linear(in_features= self.pixel_size,
                      out_features= self.dec_hidden_size),
            nn.ReLU()
        )
        
        # Encoding된 이미지의 feature를 LSTM decoder의 hidden, cell state로 만들기 위한 레이어
        self.h_mlp = nn.Linear(in_features= 2048,
                               out_features= self.dec_hidden_size)
        self.c_mlp = nn.Linear(in_features= 2048,
                               out_features= self.dec_hidden_size)

        self.fine_tune(True)

    # fine tuning 하는 함수
    def fine_tune(self, fine_tune= True):
        for p in self.resnet.parameters():
            p.requires_grad = False

        # pretrained 모델의 첫 5개를 제외한 레이어만 학습
        for c in list(self.resnet.children())[5:]:
            for p in c.parameters():
                p.requires_grad = fine_tune

    # 이미지 학습
    def forward(self, x):
        batch_size = x.size(0)

        x = self.resnet(x)
        x = self.pooling(x)
        x = x.view(batch_size, 2048, -1)

        # decoder의 layer수가 1이 아니 경우 -> hidden, cell state의 차원을 맞춤
        if self.dec_num_layers != 1:
            tmp = self.hidden_dim_changer(self.relu(x))
        else:
            tmp = torch.mean(x, dim= 2, keepdim= True)
        tmp = torch.permute(tmp, (2, 0, 1))
        h0 = self.h_mlp(tmp)
        c0 = self.c_mlp(tmp)

        return x, (h0, c0)

In [10]:
# Decoder
class Decoder(nn.Module):
    def __init__(self, config, tokenizer):
        super(Decoder, self).__init__()
        # attribute -> pixel_size, dec_hidden_size, dec_num_layers, dropout, is__attn,
        #              pad_token_id, vocab_size, input_size
        # method -> 
        self.pixel_size = config.enc_hidden_size * config.enc_hidden_size
        self.dec_hidden_size = config.dec_hidden_size
        self.dec_num_layers = config.dec_num_layers

        self.dropout = config.dropout
        self.is_attn = config.is_attn # Attention layer인지 여부

        self.pad_token_id = tokenizer.pad_token_id  # 토크나이저의 pad token id
        self.vocab_size = tokenizer.vocab_size      # 토크나이저의 vocab size

        # attention layer
        if self.is_attn:
            self.attention = Attention(self.dec_hidden_size)        
        # input_size는 attention layer인지 decoder인지에 따라 변경
        # attention을 사용하는 경우 decoder input의 차원은 2048(=Attention 결과) 증가
        self.input_size = self.dec_hidden_size + 2048 if self.is_attn else self.dec_hidden_size

        # 임베딩 레이어
        self.embedding = nn.Embedding(self.vocab_size, self.dec_hidden_size, padding_idx= self.pad_token_id)
        
        # lstm 레이어
        self.lstm = nn.LSTM(input_size=self.input_size,
                            hidden_size= self.dec_hidden_size,
                            num_layers= self.dec_num_layers,
                            batch_first= True)
        
        # dropout 레이어
        self.dropout_layer = nn.Dropout(self.dropout)

        self.relu = nn.ReLU()

        # beta 레이어
        self.beta = nn.Sequential(
            nn.ReLU(),
            nn.Linear(self.dec_hidden_size, 2048),
            nn.Sigmoid()
        )

        # 다음 단어를 예측하는 vocab size의 크기만큼 내어주는 fully connected 레이어
        self.fc = nn.Sequential(
            nn.ReLU(),
            nn.Dropout(self.dropout),
            nn.Linear(self.dec_hidden_size, self.vocab_size)
        )

        # init_weights 함수를 통해 모델 내부에 가중치를 적용
        self.embedding.apply(self.init_weights)
        self.fc.apply(self.init_weights)

    def init_weights(self, m):
        # 선형 레이어 -> bias는 0, data는 [-0.1, 0.1] 구간 랜덤 초기화
        if isinstance(m, nn.Linear):
            m.bias.data.fill_(0)
            m.weight.data.uniform_(-0.1, 0.1)

        # 임베딩 레이어 -> data [-0.1, 0.1] 구간 랜덤 초기화
        if isinstance(m, nn.Embedding):
            m.weight.data.uniform_(-0.1, 0.1)

    # target 캡션이 학습시 거치는 부분
    def forward(self, x, hidden, enc_output):
        x = self.embedding(x)
        score = None

        gate = self.beta(hidden[0][-1])
        if self.is_attn:
            enc_output, score = self.attention
            enc_output = gate * enc_output
            x = torch.cat((x, enc_output.unsqueeze(1)), dim= -1)
            x, hidden = self.lstm(x, hidden)
            x = self.fc(x)

        return x, hidden, score

In [11]:
# Attention
# Attention 모듈에 encoder의 output과 decoder의 이전 output 결과가 들어감
class Attention(nn.Module):
    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        # attribute -> hidden_size, enc_wts, dec_wts, score_wts
        # method -> tanh, relu

        # hidden_size는 입력
        self.hidden_size = hidden_size
        
        # encoder weights
        self.enc_wts = nn.Sequential(
            nn.Linear(2048, self.hidden_size),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size)
        )
        # decoder weights
        self.dec_wts = nn.Sequential(
            nn.Linear(self.hidden_size, self.hidden_size),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size)
        )

        # score weights
        self.score_wts = nn.Linear(self.hidden_size, 1)
        self.tanh = nn.Tanh()
        self.relu = nn.ReLU()

    # 순전파
    def forward(self, enc_output, dec_hidden): # encoder의 output과 decoder의 hidden_state를 입력
        # encoder output을 차원변환
        enc_output = torch.permute(enc_output, (0, 2, 1))

        # score는 
        score = self.tanh(self.enc_wts(enc_output) + self.dec_wts(dec_hidden).unsqueeze(1))
        score = self.score_wts(score)
        score = F.softmax(score, dimn= 1)

        enc_output = torch.permute(enc_output, (0, 2, 1))       # enc_output 텐서를 차원변경
        enc_output = torch.bmm(enc_output, score).squeeze(-1)   # 

        return enc_output, score