In [None]:
import sys
from pathlib import Path

# --- Paths ---------------------------------------------------------------
# DATASET_PATH stays a plain string ending in "/" because it is later
# prepended to the image-path column by string concatenation.
DATASET_PATH = 'data/MEC/'
DATASET = Path(DATASET_PATH, "mec-dataset.csv")
MODEL_OUTPUT_DIR = "data/results"
DATASET_CACHE_FILE = "data/cache/dataset.array"
# Make sure the directory that will hold the dataset cache file exists.
Path(DATASET_CACHE_FILE).parent.mkdir(exist_ok=True, parents=True)

# --- Dataset columns -----------------------------------------------------
TEXT_COLUMN = "text"
IMAGE_PATH_COLUMN = "image_path"
TARGET_COLUMN = "formal_register"

# --- Run parameters ------------------------------------------------------
SEED = 42
N_EPOCHS = 30
BATCH_SIZE = 32

# Record the interpreter version in the notebook output.
print(sys.version)

In [2]:
from collections import defaultdict
import pandas as pd
import torch
import torch.nn as nn
from datasets import Dataset, DatasetDict, ClassLabel
from transformers import BertModel, ViTModel, BertTokenizer, ViTImageProcessor, Trainer, TrainingArguments, EvalPrediction
from PIL import Image, UnidentifiedImageError, ImageFile

from sklearn.metrics import classification_report, accuracy_score, f1_score, recall_score, precision_score
from sklearn.model_selection import StratifiedShuffleSplit
from imblearn.over_sampling import RandomOverSampler

# Pillow global switch: tolerate image files whose data is truncated when
# loading them, instead of raising an error.
ImageFile.LOAD_TRUNCATED_IMAGES = True


## Prepare Dataset

In [None]:
# Columns whose values are turned into 'Nível <value>' string labels.
level_columns = ['cohesion', 'thematic_coherence', 'formal_register', 'text_typology']

df = pd.read_csv(DATASET, index_col=0).convert_dtypes()
df[level_columns] = 'Nível ' + df[level_columns].astype(str)
# Image paths in the CSV are relative to the dataset folder; prefix them so
# they resolve from the notebook's working directory.
df[IMAGE_PATH_COLUMN] = DATASET_PATH + df[IMAGE_PATH_COLUMN]
df.info()
df.head(2)

In [None]:
# Distinct target values, sorted so the name -> class-id mapping does not
# depend on row order.
class_names = df[TARGET_COLUMN].unique().tolist()
class_names.sort()

class_label = ClassLabel(names=class_names, num_classes=len(class_names))
class_label

## Training Functions

In [None]:
# Sanity check: evaluates to True only when every image path listed in the
# dataframe exists on disk.
image_paths = df[IMAGE_PATH_COLUMN].apply(Path)
image_paths.apply(Path.exists).all()

In [6]:
def is_image_valid(image_path):
    """Return True when Pillow can open ``image_path`` and verify its contents.

    Args:
        image_path: Path (or file object) handed to ``PIL.Image.open``.

    Returns:
        ``True`` if the file opens and ``Image.verify()`` completes,
        ``False`` if opening or verifying raises one of the handled errors.
    """
    try:
        with Image.open(image_path) as img:
            img.verify()
    except (OSError, UnidentifiedImageError, SyntaxError):
        # OSError covers missing/unreadable files (IOError is an alias of it).
        # SyntaxError is included because some Pillow format plugins report a
        # corrupt file from verify() that way (e.g. a PNG checksum mismatch);
        # without it one broken image aborts the whole validity pass.
        return False
    return True

def prepare_dataset(df: pd.DataFrame, train_indexes: list[int], test_indexes: list[int], processor: ViTImageProcessor, tokenizer: BertTokenizer) -> DatasetDict:
    """Build the processed train/test ``DatasetDict`` used for training.

    Rows whose image fails ``is_image_valid`` are dropped, the remaining images
    are run through ``processor`` and the texts through ``tokenizer``, and the
    training split is balanced by random oversampling.

    Args:
        df: Source dataframe containing ``TEXT_COLUMN``, ``IMAGE_PATH_COLUMN``
            and ``TARGET_COLUMN``.
        train_indexes: Positional (0-based) row numbers of ``df`` for training.
        test_indexes: Positional (0-based) row numbers of ``df`` for testing.
        processor: Image processor applied to the loaded images.
        tokenizer: Tokenizer applied to the text column.

    Returns:
        A ``DatasetDict`` with ``train`` (oversampled) and ``test`` splits whose
        ``input_ids``, ``attention_mask``, ``pixel_values`` and ``labels``
        columns are formatted as PyTorch tensors.
    """
    import numpy as np  # local import: numpy is not imported at the top of this notebook

    train_positions = np.asarray(train_indexes, dtype=np.int64)
    test_positions = np.asarray(test_indexes, dtype=np.int64)

    # Validate the images against the dataframe rows *before* building the
    # dataset. Dropping rows inside the dataset pipeline would shift every
    # later row, so positions computed on `df` would silently select the
    # wrong examples (or fall out of range).
    is_valid = np.fromiter(
        (is_image_valid(path) for path in df[IMAGE_PATH_COLUMN]),
        dtype=bool,
        count=len(df),
    )
    # filtered_position[i] is the row that df position i occupies once the
    # invalid rows are removed (only meaningful where is_valid[i] is True).
    filtered_position = np.cumsum(is_valid) - 1

    # Process the dataset and store the processed images in the file cache.
    # NOTE(review): cache_file_name is a fixed path, so an existing cache file
    # may be reused even after `df` or `processor` change - confirm, and delete
    # the cache when the inputs change.
    original_dataset = (Dataset
        .from_pandas(df.loc[is_valid])
        .select_columns([TEXT_COLUMN, IMAGE_PATH_COLUMN, TARGET_COLUMN])
        .map(lambda path: {'pixel_values': Image.open(path)}, input_columns=[IMAGE_PATH_COLUMN])
        .map(
            lambda pixel_values: processor(pixel_values, return_tensors='pt'),
            input_columns=['pixel_values'],
            batched=True,
            batch_size=8,
            num_proc=4,
            cache_file_name=DATASET_CACHE_FILE,
        )
        .map(
            lambda text: tokenizer(
                text,
                padding='max_length',
                truncation=True,
                max_length=512,
            ),
            batched=True,
            input_columns=[TEXT_COLUMN]
        )
        .remove_columns([IMAGE_PATH_COLUMN])
        .rename_column(TARGET_COLUMN, 'labels')
        .cast_column('labels', class_label)
    )

    # Keep only the split members whose image survived validation.
    train_positions = train_positions[is_valid[train_positions]]
    test_positions = test_positions[is_valid[test_positions]]

    # Balance the training split by oversampling row positions. The positions
    # are positional, so the labels must be looked up with .iloc; .loc would
    # treat them as index labels of `df`.
    balanced_train_positions = RandomOverSampler(random_state=SEED).fit_resample(
        train_positions.reshape(-1, 1),
        df[TARGET_COLUMN].iloc[train_positions],
    )[0].flatten()

    # Create the dataset split, translating df positions into dataset rows.
    dataset = DatasetDict(
        train=original_dataset.select(filtered_position[balanced_train_positions]),
        test=original_dataset.select(filtered_position[test_positions]),
    )
    dataset.set_format('pt', ['input_ids', 'attention_mask', 'pixel_values', 'labels'], output_all_columns=True)
    return dataset


def compute_metrics(eval_preds: EvalPrediction, compute_result=False, *, PREDS: list = [], LABELS: list = []):
    """Accumulate per-batch predictions and report metrics on the final batch.

    Intended for ``batch_eval_metrics=True``: the function is called once per
    evaluation batch and only computes metrics when ``compute_result`` is true.

    Args:
        eval_preds: Batch predictions; ``predictions`` holds the logits and
            ``label_ids`` the true labels (both used as torch tensors here).
        compute_result: When true, compute metrics over everything accumulated
            so far and reset the accumulators.
        PREDS: Accumulator of predicted class ids. The mutable default is
            deliberate: it is the state shared between successive calls.
        LABELS: Accumulator of true labels, shared the same way as ``PREDS``.

    Returns:
        ``{"accuracy": ..., "f1": ...}`` (weighted F1) when ``compute_result``
        is true, otherwise ``None``.
    """
    PREDS.append(eval_preds.predictions.argmax(-1))
    LABELS.append(eval_preds.label_ids)

    if not compute_result:
        return None

    try:
        preds = torch.concat(PREDS).numpy(force=True)
        labels = torch.concat(LABELS).numpy(force=True)
    finally:
        # Empty the shared lists in place. Rebinding the names
        # (`PREDS, LABELS = [], []`) would only change the locals and leave the
        # default-argument lists full, so every later evaluation would also
        # score the predictions of all previous evaluations.
        PREDS.clear()
        LABELS.clear()

    return {
        "accuracy": accuracy_score(labels, preds),
        "f1": f1_score(labels, preds, average="weighted"),
    }


def evaluate_model(model, dataset: DatasetDict, device):
    """Run ``model`` over ``dataset['test']`` and compute classification metrics.

    Args:
        model: Module to evaluate. Its ``forward`` is called with whichever of
            ``pixel_values``, ``input_ids`` and ``attention_mask`` it declares,
            and must return either a mapping with a ``'logits'`` entry or an
            object with a ``.logits`` attribute.
        dataset: ``DatasetDict`` whose ``test`` split yields tensor batches
            containing the model inputs and ``labels``.
        device: Device on which the model and the inputs are placed.

    Returns:
        A dict with the text ``report``, ``accuracy`` and the weighted and
        macro precision, recall and F1 scores.
    """
    import inspect

    # Get model predictions and ground truth
    model.eval()
    model.to(device)
    torch.cuda.empty_cache()

    # Pass every input that forward() declares. Feeding only pixel_values (as
    # a single positional argument) cannot work for a multimodal forward that
    # also requires input_ids and attention_mask.
    accepted = inspect.signature(model.forward).parameters
    input_names = [name for name in ('pixel_values', 'input_ids', 'attention_mask') if name in accepted]

    batch_logits, batch_labels = [], []
    with torch.no_grad():
        for batch in dataset['test'].iter(BATCH_SIZE):
            if input_names:
                outputs = model(**{name: batch[name].to(device) for name in input_names})
            else:
                # forward() uses other parameter names: keep the positional
                # image-only call.
                outputs = model(batch['pixel_values'].to(device))
            # Accept both plain-dict outputs and attribute-style outputs.
            logits = outputs['logits'] if isinstance(outputs, dict) else outputs.logits
            batch_logits.append(logits)
            # Take the labels from the same batches so they stay aligned with
            # the predictions.
            batch_labels.append(batch['labels'])

    y_pred = torch.argmax(torch.concat(batch_logits, 0), dim=-1).cpu().numpy()
    y_true = torch.concat(batch_labels, 0).cpu().numpy()

    torch.cuda.empty_cache()

    # Evaluate model
    return dict(
        report = classification_report(y_true, y_pred),
        accuracy = accuracy_score(y_true, y_pred),
        weighted_precision = precision_score(y_true, y_pred, average="weighted"),
        weighted_recall = recall_score(y_true, y_pred, average="weighted"),
        weighted_f1 = f1_score(y_true, y_pred, average="weighted"),
        macro_precision = precision_score(y_true, y_pred, average="macro"),
        macro_recall = recall_score(y_true, y_pred, average="macro"),
        macro_f1 = f1_score(y_true, y_pred, average="macro"),
    )


In [None]:
class MultimodalModel(nn.Module):
    """Classifier that fuses frozen BERT (text) and ViT (image) features.

    The first-token hidden state of each encoder is concatenated, projected by
    a fully-connected layer with ReLU, and mapped to class logits. Only the two
    linear layers are trainable.
    """

    def __init__(self, bert_model: BertModel, vit_model: ViTModel, num_classes = 5, hidden_size = 256):
        """Wrap the two encoders and create the trainable head.

        Args:
            bert_model: Text encoder; its parameters are frozen in place.
            vit_model: Image encoder; its parameters are frozen in place.
            num_classes: Number of output classes (size of the logits).
            hidden_size: Width of the fusion layer between the concatenated
                features and the classifier.
        """
        super(MultimodalModel, self).__init__()
        # Freeze the BERT and ViT parameters and put both encoders in
        # inference mode: they are used as fixed feature extractors.
        bert_model.requires_grad_(False)
        vit_model.requires_grad_(False)
        self.bert = bert_model.eval()
        self.vit = vit_model.eval()
        self.fc = nn.Linear(self.bert.config.hidden_size + self.vit.config.hidden_size, hidden_size)
        self.classifier = nn.Linear(hidden_size, num_classes)

    def train(self, mode: bool = True):
        """Set the training mode of the head while keeping the encoders in eval mode.

        ``nn.Module.train`` switches every submodule, which would re-enable
        dropout inside the frozen encoders and make their features noisy
        during training. The encoders are therefore forced back to eval mode.

        Returns:
            ``self``, as ``nn.Module.train`` does.
        """
        super().train(mode)
        self.bert.eval()
        self.vit.eval()
        return self

    def forward(self, pixel_values, input_ids, attention_mask, labels=None):
        """Compute class logits (and the loss when ``labels`` is given).

        Args:
            pixel_values: Image batch passed to the ViT encoder.
            input_ids: Token ids passed to the BERT encoder.
            attention_mask: Attention mask passed to the BERT encoder.
            labels: Optional class ids; when provided the cross-entropy loss
                is returned as well.

        Returns:
            ``{'logits': ...}``, or ``{'loss': ..., 'logits': ...}`` when
            ``labels`` is not ``None``.
        """
        # Text branch: hidden state of the first token of each sequence.
        text_outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        text_features = text_outputs.last_hidden_state[:, 0, :]

        # Image branch: hidden state of the first token of each image.
        image_outputs = self.vit(pixel_values=pixel_values)
        image_features = image_outputs.last_hidden_state[:, 0, :]

        # Concatenate both feature vectors along the feature dimension.
        combined = torch.cat((text_features, image_features), dim=1)

        # Fully-connected fusion layer followed by ReLU.
        combined = torch.relu(self.fc(combined))

        # Final classification.
        logits = self.classifier(combined)

        if labels is not None:
            loss = nn.functional.cross_entropy(logits, labels)
            return {"loss": loss, "logits": logits}

        return {'logits': logits}


BERT_MODEL_NAME = 'neuralmind/bert-base-portuguese-cased'
VIT_MODEL_NAME = 'google/vit-base-patch16-224'

# Pretrained encoders, loaded with their parameters frozen.
bert_model: BertModel = BertModel.from_pretrained(BERT_MODEL_NAME).requires_grad_(False)
vit_model: ViTModel = ViTModel.from_pretrained(VIT_MODEL_NAME).requires_grad_(False)

# Matching text tokenizer and image processor for the two encoders.
tokenizer: BertTokenizer = BertTokenizer.from_pretrained(BERT_MODEL_NAME)
processor: ViTImageProcessor = ViTImageProcessor.from_pretrained(VIT_MODEL_NAME)

# Instantiate the multimodal model. The output layer is sized from the classes
# actually present in the target column rather than the constructor default,
# so the logits always match the label ids produced by `class_label`.
model = MultimodalModel(bert_model, vit_model, num_classes=len(class_names))

In [None]:
# One stratified shuffle split holding out 10% of the rows for testing; only
# the first split produced by the splitter is used.
splitter = StratifiedShuffleSplit(test_size=0.1, random_state=SEED)
train_indexes, test_indexes = next(splitter.split(df, df[TARGET_COLUMN]))

dataset = prepare_dataset(df, train_indexes, test_indexes, processor, tokenizer)

In [None]:
# NOTE(review): output_dir, num_train_epochs and per_device_train_batch_size
# are hard-coded here, while MODEL_OUTPUT_DIR, N_EPOCHS and BATCH_SIZE from the
# configuration cell hold different values - confirm which ones are intended.
training_args = TrainingArguments(
    # Output and logging locations
    output_dir='results',
    logging_dir="logging",
    # Optimisation
    num_train_epochs=10,
    per_device_train_batch_size=16,
    learning_rate=2e-4,
    fp16=True,
    use_cpu=False,
    # Data loading
    dataloader_num_workers=8,
    data_seed=SEED,
    # Evaluation: metrics are accumulated batch by batch by compute_metrics
    eval_strategy="steps",
    eval_steps=50,
    batch_eval_metrics=True,
    # Logging and checkpointing cadence
    logging_steps=25,
    save_steps=200,
    save_total_limit=2,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

trainer.train()