## 1. GPU를 사용할 수 있는 환경이라면 CUDA 관련 세팅을 처리 후 사용 패키지를 로드합니다.

In [1]:
import os
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"   # see issue #152
os.environ["CUDA_VISIBLE_DEVICES"]="0"

In [2]:
import torch
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cv2
import torch
import torch.nn as nn
import transformers
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm 
from PIL import Image
import json

torch.backends.cudnn.benchmark = True
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## 2. DATA 전처리에 사용할 Pytorch 데이터세트 코드를 선언합니다.

In [3]:
class VQADataset(torch.utils.data.Dataset):
    def __init__(self, tokenizer, data, answer_list, max_token, transform=None):
        
        self.tokenizer = tokenizer
        self.data = data
        self.max_token = max_token
        self.answer_list = answer_list        
        self.transform = transform
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        
        question = self.data['question'][index] #질문
        answer = self.data['answer'][index]  #응답
        img_loc = self.data['image'][index] #사진파일
        
        #BERT기반의 Tokenizer로 질문을 tokenize한다.
        tokenized = self.tokenizer.encode_plus("".join(question),
                                     None,
                                     add_special_tokens=True,
                                     max_length = self.max_token,
                                     truncation=True,
                                     pad_to_max_length = True)
        
        
        #BERT기반의 Tokenize한 질문의 결과를 변수에 저장
        ids = tokenized['input_ids']
        mask = tokenized['attention_mask']
        image = Image.open(img_loc).convert('RGB')  #이미지 데이터를 RGB형태로 읽음 질문을 tokenize한다.
        image = self.transform(image)  #이미지 데이터의 크기 및 각도등을 변경
        
        answer_ids = self.answer_list[self.answer_list['answer']==answer].index #응답을 숫자 index로 변경, e.g.) "예"-->0 "아니요" --> 1
        if len(answer_ids)==0:
            answer_ids = self.answer_list[self.answer_list['answer']=="예"].index

        #전처리가 끝난 질의, 응답, 이미지 데이터를 반환
        return {'ids': torch.tensor(ids, dtype=torch.long), 
                'mask': torch.tensor(mask, dtype=torch.long),
                'answer': torch.tensor(answer_ids, dtype=torch.long),
                'image': image}

## 3. 학습모델의 질문 및 이미지 처리에 대한 처리 모델 코드를 선언합니다.

In [4]:
class VQAModel(nn.Module):
    def __init__(self, num_target, dim_i, dim_q, dim_h=1024):
        super(VQAModel, self).__init__()
        
        #The BERT model: 질문 --> Vector 처리를 위한 XLM-Roberta모델 활용
        self.bert = transformers.XLMRobertaModel.from_pretrained('xlm-roberta-base')
        
        #Backbone: 이미지 --> Vector 처리를 위해 ResNet50을 활용
        self.resnet = models.resnet50(pretrained=True)
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, dim_i)
        self.i_relu = nn.ReLU()
        self.i_drop = nn.Dropout(0.2)
        
        #classfier: MLP기반의 분류기를 생성
        self.linear1 = nn.Linear(dim_i, dim_h)
        self.q_relu = nn.ReLU()
        self.linear2 = nn.Linear(dim_h, num_target)
        self.q_drop = nn.Dropout(0.2)
        
        
    def forward(self, idx, mask, image):
        
        _, q_f = self.bert(idx, mask) #질문을 Bert를 활용해 Vector화
        i_f = self.i_drop(self.resnet(image)) # 이미지를 resnet을 활용해 Vector화
        
        uni_f = i_f*q_f #이미지와 질문 vector를 point-wise연산을 통해 통합 vector생성

        return self.linear2(self.q_relu(self.linear1(uni_f))) #MLP classfier로 답변 예측

## 4. 학습과, 테스트를 수행하기 위한 코드를 선언합니다.

In [5]:
def train_fn(model, train_loader, criterion, optimizer, device):
    
    total_count_correct = 0
    total_num_example = 0
    total_loss = []

    model.train()
    for idx, batch in tqdm(enumerate(train_loader), total=len(train_loader), leave=False): #학습 데이터를 batch size만큼씩 읽어옴
        optimizer.zero_grad()
        
        imgs = batch['image'].to(device)  #이미지
        q_bert_ids = batch['ids'].to(device) #질문
        q_bert_mask = batch['mask'].to(device) 
        answers = batch['answer'].to(device) #응답
        answers = answers.squeeze()
        
        outputs = model(q_bert_ids, q_bert_mask, imgs) #모델에 이미지, 질문, 응답을 넣음
        loss = criterion(outputs, answers) #예측된 답변과 실제 정답과 비교하여 lossr계산

        loss.backward(loss)
        optimizer.step()
        
        predicted = torch.argmax(outputs, dim=1)
        count_correct = np.count_nonzero((np.array(predicted.cpu())==np.array(answers.cpu())) == True) #정답갯수를 계산
        total_count_correct += count_correct
        total_num_example += answers.size(0)
        total_loss.append(loss.item())    
    print("TRAIN LOSS:", str(sum(total_loss)/total_num_example) + " Accuracy: " + str(total_count_correct/total_num_example))
    
def test_fn(model, test_loader, data_frame, device):
    total_count_correct = 0
    total_num_example = 0
    total_loss = []
    model.eval()
    
    for idx, batch in tqdm(enumerate(test_loader), total=len(test_loader), leave=False):
        optimizer.zero_grad()

        imgs = batch['image'].to(device)
        q_bert_ids = batch['ids'].to(device)
        q_bert_mask = batch['mask'].to(device)
        answers = batch['answer'].to(device)
        answers = answers.squeeze()

        outputs = model(q_bert_ids, q_bert_mask, imgs) #모델에 이미지, 질문, 응답을 넣음
        loss = criterion(outputs, answers) #예측된 답변과 실제 정답과 비교하여 lossr계산

        loss.backward(loss)
        optimizer.step()

        predicted = torch.argmax(outputs, dim=1)  #예측된 정답  
        count_correct = np.count_nonzero((np.array(predicted.cpu()) == np.array(answers.cpu())) == True)
        total_count_correct += count_correct
        total_num_example += answers.size(0)
        total_loss.append(loss.item())
            
    print("TEST LOSS:", str(sum(total_loss) / total_num_example) + " Accuracy: " + str(total_count_correct / total_num_example))
            
def answering(model, img_file, question, tokenizer, train_answer, device):
    transform = transforms.Compose(
        [
            transforms.Resize((356, 356)),
            transforms.RandomCrop((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]
    )

    model.eval()
    img = transform(Image.open(img_file).convert("RGB")).unsqueeze(0)
    img = img.to(device)
    encoded = tokenizer.encode_plus("".join(question),
                                    None,
                                    add_special_tokens=True,
                                    max_length=30,
                                    truncation=True,
                                    pad_to_max_length=True)

    ids, mask = encoded['input_ids'], encoded['attention_mask']
    ids = torch.tensor(ids, dtype=torch.long).unsqueeze(0).to(device)
    mask = torch.tensor(mask, dtype=torch.long).unsqueeze(0).to(device)
    output = model(ids, mask, img) #모델에 이미지, 질문, 응답을 넣음
    predicted = torch.argmax(output, dim=1).item()
    return train_answer['answer'].iloc[predicted]

## 5. 학습에 사용할 json 데이터를 로드하여 모델에 맞도록 가공 합니다.

In [6]:
#MODEL_FILE = './model.tar'
MODEL_FILE = None

if MODEL_FILE is not None:
    checkpoint = torch.load(MODEL_FILE)
    train_df = checkpoint["train_df"]
    answer_list = checkpoint["answer_list"]
    model = VQAModel(num_target=len(answer_list), dim_q=768, dim_i=768, dim_h=1024)
    model = torch.nn.DataParallel(model)
    model = model.to(DEVICE)    
    optimizer = optim.AdamW(model.parameters(), lr=0.00002)
    model.load_state_dict(checkpoint["state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer"])    
    
else:
    with open('dataset/v1_OpenEnded_NIA_trainset_images.json') as json_file:
        train_image_json = json.load(json_file)
    with open('dataset/v1_OpenEnded_NIA_trainset_question.json') as json_file:
        train_question_json = json.load(json_file)
    with open('dataset/v1_OpenEnded_NIA_trainset_annotation.json') as json_file:
        train_annotation_json = json.load(json_file)
    train_image_df = pd.DataFrame(train_image_json['images'])
    train_question_df = pd.DataFrame(train_question_json['questions'])
    train_annotation_df = pd.DataFrame(train_annotation_json['annotations'])
    
    train_df = pd.merge(train_image_df, train_question_df)
    train_df = pd.merge(train_df, train_annotation_df)
    train_df.rename(columns={'multiple_choice_answer': 'answer'}, inplace=True)
    train_df['image'] = train_df['image'].apply(lambda x : 'dataset/train_images/' + x)    
    
    train_answer = train_df['answer'].value_counts().reset_index()
    train_answer.columns=['answer', 'count']
    
    model = VQAModel(num_target=len(train_answer), dim_q=768, dim_i=768, dim_h=1024)
    model = torch.nn.DataParallel(model)
    model = model.to(DEVICE)
    optimizer = optim.AdamW(model.parameters(), lr=0.00002)

## 6. 테스트에 사용할 json 데이터를 로드합니다.

In [7]:
with open('dataset/v1_OpenEnded_NIA_testset_images.json') as json_file:
    test_image_json = json.load(json_file)
with open('dataset/v1_OpenEnded_NIA_testset_question.json') as json_file:
    test_question_json = json.load(json_file)
with open('dataset/v1_OpenEnded_NIA_testset_annotation.json') as json_file:
    test_annotation_json = json.load(json_file)
test_image_df = pd.DataFrame(test_image_json['images'])
test_question_df = pd.DataFrame(test_question_json['questions'])
test_annotation_df = pd.DataFrame(test_annotation_json['annotations'])

test_df = pd.merge(test_image_df, test_question_df)
test_df = pd.merge(test_df, test_annotation_df)
test_df.rename(columns={'multiple_choice_answer': 'answer'}, inplace=True)
test_df['image'] = test_df['image'].apply(lambda x : 'dataset/test_images/' + x)    
test_df = test_df[:10000]

## 7. 학습 및 테스트에 사용될 데이터를 확인합니다.

In [8]:
train_df.head(3)

Unnamed: 0,image_id,image,question_id,question,answer
0,1,dataset/train_images/NIA_dataset03_00000000000...,1000,이것은 무슨 용도입니까?,알 수 없음
1,1,dataset/train_images/NIA_dataset03_00000000000...,1001,테이블의 색깔은 무슨 색입니까?,예
2,1,dataset/train_images/NIA_dataset03_00000000000...,1002,그린 꽃은 몇 송이입니까?,1


In [9]:
test_df.head(3)

Unnamed: 0,image_id,image,question_id,question,answer
0,81001,dataset/test_images/NIA_dataset03_000000081001...,81001000,다이소는 이 건물 몇층에 위치하고 있습니까?,3
1,81001,dataset/test_images/NIA_dataset03_000000081001...,81001001,도로에 있는 차들은 몇 대입니까?,4
2,81001,dataset/test_images/NIA_dataset03_000000081001...,81001002,오른쪽 제일 앞에 보이는 차량의 색깔은 무엇입니까?,검정색


In [10]:
train_answer

Unnamed: 0,answer,count
0,알 수 없음,1855
1,예,1508
2,아니요,907
3,흰색,722
4,2,470
...,...,...
414,재킷,1
415,콘크리트,1
416,Wii,1
417,머리띠,1


## 8. 학습 및 테스트 데이터를 데이터로더 합니다.

In [11]:
tokenizer = transformers.XLMRobertaTokenizer.from_pretrained('xlm-roberta-base')  #질의를 처리할 BERT Tokenizer선언
#이미지 전처리를 위한 이미지 크기 변환 및 각도조정을 위한 transform 선언
transform = transforms.Compose(
    [
        transforms.Resize((356, 356)),
        transforms.RandomCrop((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)
train_dataset = VQADataset(tokenizer, train_df, train_answer, 30, transform) #학습데이터 전처리
train_loader = DataLoader(dataset=train_dataset, batch_size=100, num_workers=4, shuffle=True, pin_memory=True)
test_dataset = VQADataset(tokenizer, test_df, train_answer, 30, transform) #테스트데이터 전처리
test_loader = DataLoader(dataset=test_dataset, batch_size=100, num_workers=4, shuffle=False, pin_memory=True)
criterion = nn.CrossEntropyLoss()

## 9. 학습을 수행합니다.

In [12]:
for epoch in range(2):  #학습셋을 이용해 100번 학습
    train_fn(model, train_loader, criterion, optimizer, DEVICE)

                                                 

TRAIN LOSS: 0.04737940385341644 Accuracy: 0.1728


 79%|███████▉  | 79/100 [00:31<00:08,  2.53it/s]

KeyboardInterrupt: 

In [None]:
test_fn(model, test_loader, test_df, DEVICE)  #test데이터를 이용해 답변예측

In [None]:
def save_checkpoint(state, filename="model.tar"):
    model.module.bert.save_pretrained("./roberta-large-355M")
    tokenizer.save_pretrained("./roberta-large-355M")
    print("=> Saving checkpoint")
    torch.save(state, filename)

checkpoint = {
    "state_dict": model.state_dict(),
    "optimizer": optimizer.state_dict(),
    "train_df": train_df,
    "answer_list": train_answer
}
save_checkpoint(checkpoint)

In [None]:
test_image = test_df['image'].iloc[900]
test_question = test_df['question'].iloc[900]
test_answer = test_df['answer'].iloc[900]

In [None]:
from matplotlib.pyplot import imshow
imshow(np.asarray(Image.open(test_image)))
print(test_question, test_answer)

In [None]:
answering(model, test_image, test_question, tokenizer, train_answer, DEVICE)