In [44]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [45]:
!pip install transformers datasets

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [61]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from datasets import (Dataset, DatasetDict)
from transformers import (AutoTokenizer,
        AutoModel,
        DefaultDataCollator,
        Trainer,
        TrainingArguments,
        AutoConfig)
from sklearn.model_selection import train_test_split
import torchvision.models as models
from PIL import Image

In [47]:
# Path
my_PATH = "/content/drive/MyDrive/ybigta"
os.chdir(my_PATH)
# show file list in the current directory
!dir

AllLabeledData.csv


In [30]:
ch = "AllLabeledData"
t_size = 0.2
v_size = 0.5

In [49]:
df = pd.read_csv(f"{ch}.csv", usecols= ['Title', 'label'], index_col=0).dropna()

In [50]:
#For loading data
def load_data(df,t_size, v_size):  
  train_df, test_df = train_test_split(df, test_size = t_size)
  val_df, test_df = train_test_split(test_df, test_size=v_size)

  raw_ds = DatasetDict({"train": Dataset.from_pandas(train_df),
       "validation": Dataset.from_pandas(val_df),
       "test": Dataset.from_pandas(test_df)})
  
  return raw_ds

In [51]:
raw_ds = load_data(df, t_size, v_size)
raw_ds

DatasetDict({
    train: Dataset({
        features: ['label', 'Title'],
        num_rows: 20317
    })
    validation: Dataset({
        features: ['label', 'Title'],
        num_rows: 2540
    })
    test: Dataset({
        features: ['label', 'Title'],
        num_rows: 2540
    })
})

In [56]:
#getting the tokenizer
checkpoint = "klue/roberta-base"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)

In [57]:
#tokenizing data
def tokenize_function(batch):
  return tokenizer(batch['Title'], truncation = True, max_length= 100, add_special_tokens=True, padding='max_length')

tokenized =raw_ds.map(tokenize_function, batched = True)
tokenized

  0%|          | 0/21 [00:00<?, ?ba/s]

  0%|          | 0/3 [00:00<?, ?ba/s]

  0%|          | 0/3 [00:00<?, ?ba/s]

DatasetDict({
    train: Dataset({
        features: ['label', 'Title', 'input_ids', 'token_type_ids', 'attention_mask'],
        num_rows: 20317
    })
    validation: Dataset({
        features: ['label', 'Title', 'input_ids', 'token_type_ids', 'attention_mask'],
        num_rows: 2540
    })
    test: Dataset({
        features: ['label', 'Title', 'input_ids', 'token_type_ids', 'attention_mask'],
        num_rows: 2540
    })
})

In [62]:
#set format of tokenized data for pytorch and get data collator
tokenized.set_format("torch", columns = ["input_ids", "attention_mask", "label"])
data_collator = DefaultDataCollator("pt")

In [63]:
class LanguageModel(nn.Module):
  def __init__(self, checkpoint):
    super(LanguageModel, self).__init__()

    # Load model with given checkpoint and get the body
    self.transformer = AutoModel.from_pretrained(checkpoint,
                                                   config = AutoConfig.from_pretrained(
                                                       checkpoint,
                                                       output_attentions = True, 
                                                       output_hidden_states = True))
    self.bidir_LSTM = nn.LSTM(768, 50, bidirectional= True) 
    self.flatten = nn.Flatten() 
    self.dense_50 = nn.Linear(100, 50)
  def forward(self, input_ids=None, attention_mask=None,labels=None):
    #Extract outputs from the body
    outputs = self.transformer(input_ids=input_ids, attention_mask=attention_mask)
    #Add custom layers
    LSTM_out = self.bidir_LSTM(outputs.last_hidden_states)
    max_pool_out = torch.max(LSTM_out,1)
    output = F.relu(self.dense_50(max_pool_out))
    return output

  

In [64]:
class ImgModel(nn.Module):
    def __init__(self, num_classes):
        super(ImgModel, self).__init__()
        
        self.base_model = models.densenet121(pretrained=True)
        self.base_model.features.conv0 = nn.Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        self.base_model.classifier = nn.Identity()
        
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.batch_norm1 = nn.BatchNorm2d(1024)
        self.dropout1 = nn.Dropout(p=0.5)
        self.fc1 = nn.Linear(1024, 1024)
        self.fc2 = nn.Linear(1024, num_classes)
        self.batch_norm2 = nn.BatchNorm1d(256)
        self.dropout2 = nn.Dropout(p=0.5)
        
    def forward(self, x):
        x = self.base_model.features(x)
        x = self.avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.batch_norm1(x)
        x = F.relu(self.fc1(x))
        x = self.dropout1(x)
        x = F.relu(self.fc2(x))
        x = self.batch_norm2(x)
        x = self.dropout2(x)
        x = torch.sigmoid(x)
        return x

In [65]:
class ConcatModel(nn.Module):
    def __init__(self):
        super(ConcatModel, self).__init__()
        self.nlp_model = LanguageModel(checkpoint=checkpoint)
        self.img_model = ImgModel(num_classes=2)
        
        # Define context gating layer
        self.context_gate = nn.Sequential(
            nn.Linear(258, 258),
            nn.Sigmoid()
        )
        
        # Define dense layer with 20 units
        self.dense_layer = nn.Linear(258, 20)
        
        # Define dropout layer with 0.2 dropout rate
        self.dropout_layer = nn.Dropout(0.2)
        
        # Define final dense layer with softmax activation
        self.softmax_layer = nn.Softmax(dim=1)
        
    def forward(self, nlp_input, img_input):
        nlp_output = self.nlp_model(nlp_input)
        img_output = self.img_model(img_input)
        
        # Concatenate the outputs of the two models
        concat_output = torch.cat((nlp_output, img_output), dim=1)
        
        # Apply context gating
        gated_output = self.context_gate(concat_output) * concat_output
        
        # Apply dense layer, dropout layer, and softmax layer
        dense_output = self.dense_layer(gated_output)
        dropout_output = self.dropout_layer(dense_output)
        softmax_output = self.softmax_layer(dropout_output)
        
        return softmax_output

In [66]:
model = ConcatModel()

Some weights of the model checkpoint at klue/roberta-base were not used when initializing RobertaModel: ['lm_head.dense.bias', 'lm_head.layer_norm.bias', 'lm_head.decoder.weight', 'lm_head.layer_norm.weight', 'lm_head.bias', 'lm_head.decoder.bias', 'lm_head.dense.weight']
- This IS expected if you are initializing RobertaModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of RobertaModel were not initialized from the model checkpoint at klue/roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for

In [68]:
from torch.utils.data import DataLoader
from torchvision import transforms
import torch.utils.data as data

class CustomDataset(data.Dataset):
    def __init__(self, text_data, image_data, label_data, img_transform=None):
        self.text_data = text_data
        self.image_data = image_data
        self.label_data = label_data
        self.img_transform = img_transform
        
    def __getitem__(self, index):
        # Load text data
        text_input = torch.tensor(self.text_data[index])
        
        # Load image data
        img = Image.open(self.image_data[index]).convert('RGB')
        if self.img_transform is not None:
            img = self.img_transform(img)
        img_input = img
        
        # Load label data
        label = torch.tensor(self.label_data[index])
        
        return text_input, img_input, label
    
    def __len__(self):
        return len(self.label_data)

# Example usage:
train_text_data = tokenized["train"]["Title"]
train_image_data = tokenized["train"]["Image"]
train_label_data = tokenized["train"]["label"]
img_transform = transforms.Compose([transforms.Resize((180, 320)),
                                    transforms.ToTensor()])

train_dataset = CustomDataset(train_text_data, train_image_data, train_label_data, img_transform=img_transform)
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)

validation_text_data = tokenized["validation"]["Title"]
validation_image_data = tokenized["validation"]["Image"]
validation_label_data = tokenized["validation"]["label"]
img_transform = transforms.Compose([transforms.Resize((180, 320)),
                                    transforms.ToTensor()])

validation_dataset = CustomDataset(validation_text_data, validation_image_data, validation_label_data, img_transform=img_transform)
validation_dataloader = torch.utils.data.DataLoader(validation_dataset, batch_size=32, shuffle=True)

test_text_data = tokenized["test"]["Title"]
test_image_data = tokenized["test"]["Image"]
test_label_data = tokenized["test"]["label"]
img_transform = transforms.Compose([transforms.Resize((180, 320)),
                                    transforms.ToTensor()])

test_dataset = CustomDataset(test_text_data, test_image_data, test_label_data, img_transform=img_transform)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=True)





KeyError: ignored

In [42]:
from transformers import AdamW, get_scheduler

optimizer = AdamW(model.parameters(), lr = 5e-5,eps = 1e-8)

num_epoch = 3
num_training_steps = num_epoch * len(train_dataloader)
lr_scheduler = get_scheduler(
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps = num_training_steps)

print(num_training_steps)



TypeError: ignored

In [None]:
from datasets import load_metric
metric = load_metric(["accuracy", "f1"])

In [None]:
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

print('device:', device)

In [None]:
from tqdm.auto import tqdm

progress_bar_train = tqdm(range(num_training_steps))
progress_bar_eval = tqdm(range(num_epochs*len(eval_dataloader)))

for epoch in range(num_epochs):
    model.train()
    for batch in train_dataloader:
        batch = {k:v.to(device) for k, v in batch_items()}
        outputs = model(**batch)
        loss = outputs.loss
        loss.backward()

        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()
        progress_bar_train.update(1)
    model.eval()
    for batch in eval_dataloader:
        batch = {k:v.to(device) for k, v in batch_items()}
        with torch.no.grad():
            outputs = model(**batch)
        logits = outputs.logits
        predictions = torch.argmax(logits, dim=-1)
        metric.add_batch(predictions = predictions, references=batch["label"])
        progress_bar_eval.update(1)
        
    print(metric.compute())

