<a href="https://colab.research.google.com/github/MangoDingo33/Study/blob/main/%ED%8A%B9%EA%B0%95/0927MultiModal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
from google.colab import drive
drive.mount('/content/drive')

%cd /content/drive/MyDrive/특강

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/특강


In [21]:
!pip install transformers
!pip install tqdm



In [22]:
import os
import json
import pandas as pd

from tqdm.auto import tqdm

root_dir = 'data/'

train_meta = 'data/train.jsonl'
dev_meta = 'data/dev.jsonl'

def json2dataframe(metadata):
    dataframe = pd.DataFrame(columns=['image_dir', 'text', 'label'])

    with open(metadata, 'r') as file:
        json_data = file.readlines()

    with tqdm(total=len(json_data)) as pbar:
        for line in json_data:
            data = json.loads(line)
            dataframe.loc[len(dataframe)] = {'image_dir': data['img'], 'text': data['text'], 'label': data['label']}
            pbar.update(1)

    return dataframe

dev_df = json2dataframe(dev_meta)

  0%|          | 0/500 [00:00<?, ?it/s]

In [19]:
print(dev_df)

         image_dir                                               text  label
0    img/08291.png              white people is this a shooting range      1
1    img/46971.png                              bravery at its finest      1
2    img/03745.png  your order comes to $37.50 and your white priv...      1
3    img/83745.png  it is time.. to send these parasites back to t...      1
4    img/80243.png                             mississippi wind chime      1
..             ...                                                ...    ...
495  img/83675.png                   i'm gonna be like phelps one day      0
496  img/37198.png  when you're so relaxed you can feel yourself g...      0
497  img/48670.png  look at this sandwich maker club i found on wi...      0
498  img/09863.png                             diverse group of women      0
499  img/97320.png  "when your dishwasher is broken so you take it...      0

[500 rows x 3 columns]


In [23]:
import cv2

from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from transformers import RobertaTokenizer

tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

class MultimodalDataset(Dataset):
    def __init__(self, dataframe, transform=transforms):
        self.labels = [label for label in dataframe['label']]
        self.image_fname = [root_dir + image_dir for image_dir in dataframe['image_dir']]
        self.transform = transform
        self.texts = [tokenizer(
            text,
            max_length=197,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        ) for text in dataframe['text']]

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        labels = self.labels[idx]
        images = cv2.imread(self.image_fname[idx])
        images = cv2.cvtColor(images, cv2.COLOR_BGR2RGB)
        images = Image.fromarray(images)
        images = self.transform(images)
        texts = self.texts[idx]
        return images, texts, labels

In [18]:
import torch
import torch.nn as nn
from transformers import ViTModel, RobertaModel

class ScoreFusionModel(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()

        self.image_model = ViTModel.from_pretrained('google/vit-base-patch16-224', num_labels=num_classes, num_hidden_layers=2)
        self.image_fc = nn.Linear(self.image_model.config.hidden_size, num_classes)

        self.text_model = RobertaModel.from_pretrained('roberta-base', num_labels=num_classes, num_hidden_layers=2)
        self.text_fc = nn.Linear(self.text_model.config.hidden_size, num_classes)

    def forward(self, image, text):
        image_feature = self.image_model(image).last_hidden_state[:, 0, :]
        image_output = self.image_fc(image_feature)

        text_feature = self.text_model(**text).last_hidden_state[:, 0, :]
        text_output = self.text_fc(text_feature)

        output = (image_output + text_output)/2
        return output

class FeatureFusionModel(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()

        self.image_model = ViTModel.from_pretrained('google/vit-base-patch16-224', num_labels=num_classes, num_hidden_layers=2)

        self.text_model = RobertaModel.from_pretrained('roberta-base', num_labels=num_classes, num_hidden_layers=2)

        self.multimodal_fc = nn.Linear(self.image_model.config.hidden_size + self.text_model.config.hidden_size, num_classes)

    def forward(self, image, text):
        image_feature = self.image_model(image).last_hidden_state[:, 0, :]
        text_feature = self.text_model(**text).last_hidden_state[:, 0, :]

        output = self.multimodal_fc(torch.cat([image_feature, text_feature], dim=1))

        return output

In [26]:
import pandas as pd
import numpy as np
import random
import os
import torch

import warnings
warnings.filterwarnings('ignore')

from torch import nn, optim
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import StepLR
from tqdm.auto import tqdm
from sklearn.metrics import recall_score, precision_score, f1_score
from sklearn.model_selection import train_test_split

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'torch.cuda.is_available() == True --> device : {device}')

batch_size = 10
epochs = 2
lr = 1e-5
gamma = 0.7
seed = 42

def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True

seed_everything(seed)

torch.cuda.is_available() == True --> device : cuda:0


In [27]:
from sklearn.model_selection import train_test_split

train_data, test_data = train_test_split(dev_df, test_size=0.2, random_state=seed)

train_loader = DataLoader(MultimodalDataset(train_data), shuffle=True, batch_size=batch_size, drop_last=True)
test_loader = DataLoader(MultimodalDataset(test_data), shuffle=True, batch_size=1, drop_last=True)

In [28]:
#FeatureFusionModel
model = FeatureFusionModel().to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = StepLR(optimizer, step_size=1, gamma=gamma)

best_acc = 0
best_epoch = 0
for epoch in range(epochs):
    epoch_acc = 0
    epoch_loss = 0

    model.train()
    with tqdm(train_loader, unit='batch') as train_epoch:
        for image, text, label in train_epoch:
            train_epoch.set_description(f'[TRAIN] EPOCH: {epoch + 1}|{epochs}')

            image = image.to(device)
            text = {k: v.squeeze(1).to(device) for k, v in text.items()}
            label = label.to(device)
            predict = model(image, text)

            loss = criterion(predict, label)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            acc = (predict.argmax(dim=1) == label).float().mean()
            epoch_acc += acc / len(train_loader)
            epoch_loss += loss / len(train_loader)

            train_epoch.set_postfix(loss=epoch_loss.item(), accuracy=epoch_acc.item())

    if epoch_acc >= best_acc:
          prev_model = f'{best_epoch}_{best_acc:0.4f}.pth'
          if os.path.exists(prev_model): os.remove(prev_model)

          best_acc = epoch_acc
          best_epoch = epoch + 1

          torch.save(model, f'{best_epoch}_{best_acc:0.4f}.pth')

model = torch.load(f'{best_epoch}_{best_acc:0.4f}.pth').to(device)

with torch.no_grad():
    model.eval()
    model_test_accuracy = 0
    model_test_loss = 0

    pred_list = []
    label_list = []

    for image, text, label in tqdm(test_loader, desc='model test'):
        image = image.to(device)
        text = {k: v.squeeze(1).to(device) for k, v in text.items()}
        label = label.to(device)
        predict = model(image, text)

        acc = (predict.argmax(dim=1) == label).float().mean()
        model_test_accuracy += acc / len(test_loader)

        predict = predict.argmax(dim=1).cpu().numpy()
        label = label.cpu().numpy()
        pred_list.append(predict)
        label_list.append(label)

precision = precision_score(label_list, pred_list, average='macro')
recall = recall_score(label_list, pred_list, average='macro')
f1 = f1_score(label_list, pred_list, average='macro')
accuracy = model_test_accuracy

print(f'precision_score: {precision:0.4f} | recall_score: {recall:0.4f} | f1_score: {f1:0.4f} --> accuracy: {accuracy:0.4f}')

Some weights of ViTModel were not initialized from the model checkpoint at google/vit-base-patch16-224 and are newly initialized: ['vit.pooler.dense.bias', 'vit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


  0%|          | 0/40 [00:00<?, ?batch/s]

  0%|          | 0/40 [00:00<?, ?batch/s]

model test:   0%|          | 0/100 [00:00<?, ?it/s]

precision_score: 0.5079 | recall_score: 0.5065 | f1_score: 0.4663 --> accuracy: 0.4800


In [29]:
#ScoreFusionModel
model = ScoreFusionModel().to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = StepLR(optimizer, step_size=1, gamma=gamma)

best_acc = 0
best_epoch = 0
for epoch in range(epochs):
    epoch_acc = 0
    epoch_loss = 0

    model.train()
    with tqdm(train_loader, unit='batch') as train_epoch:
        for image, text, label in train_epoch:
            train_epoch.set_description(f'[TRAIN] EPOCH: {epoch + 1}|{epochs}')

            image = image.to(device)
            text = {k: v.squeeze(1).to(device) for k, v in text.items()}
            label = label.to(device)
            predict = model(image, text)

            loss = criterion(predict, label)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            acc = (predict.argmax(dim=1) == label).float().mean()
            epoch_acc += acc / len(train_loader)
            epoch_loss += loss / len(train_loader)

            train_epoch.set_postfix(loss=epoch_loss.item(), accuracy=epoch_acc.item())

    if epoch_acc >= best_acc:
          prev_model = f'{best_epoch}_{best_acc:0.4f}.pth'
          if os.path.exists(prev_model): os.remove(prev_model)

          best_acc = epoch_acc
          best_epoch = epoch + 1

          torch.save(model, f'{best_epoch}_{best_acc:0.4f}.pth')

model = torch.load(f'{best_epoch}_{best_acc:0.4f}.pth').to(device)

with torch.no_grad():
    model.eval()
    model_test_accuracy = 0
    model_test_loss = 0

    pred_list = []
    label_list = []

    for image, text, label in tqdm(test_loader, desc='model test'):
        image = image.to(device)
        text = {k: v.squeeze(1).to(device) for k, v in text.items()}
        label = label.to(device)
        predict = model(image, text)

        acc = (predict.argmax(dim=1) == label).float().mean()
        model_test_accuracy += acc / len(test_loader)

        predict = predict.argmax(dim=1).cpu().numpy()
        label = label.cpu().numpy()
        pred_list.append(predict)
        label_list.append(label)

precision = precision_score(label_list, pred_list, average='macro')
recall = recall_score(label_list, pred_list, average='macro')
f1 = f1_score(label_list, pred_list, average='macro')
accuracy = model_test_accuracy

print(f'precision_score: {precision:0.4f} | recall_score: {recall:0.4f} | f1_score: {f1:0.4f} --> accuracy: {accuracy:0.4f}')

Some weights of ViTModel were not initialized from the model checkpoint at google/vit-base-patch16-224 and are newly initialized: ['vit.pooler.dense.bias', 'vit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


  0%|          | 0/40 [00:00<?, ?batch/s]

  0%|          | 0/40 [00:00<?, ?batch/s]

model test:   0%|          | 0/100 [00:00<?, ?it/s]

precision_score: 0.5482 | recall_score: 0.5235 | f1_score: 0.4286 --> accuracy: 0.4800
