<a href="https://colab.research.google.com/github/OrionXV/Co-Lab-Misogny-Identifier/blob/main/AnotherMAMI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Multimedia Automatic Misogyny Identification (MAMI)... <i> 3: The New Me </i>

A personal endeavour by Syed Arsalaan Nadim for the time being.
In order to avoid clutter I am making a personal notebook.



In [None]:
!pip install torch
!pip install pytorch_lightning 
!pip install fasttext



In [None]:
import numpy as np
import pandas as pd
import seaborn as sns  
sns.set()
from matplotlib import pyplot as plt
import warnings
warnings.filterwarnings('ignore')
%config InLineBackend.figure_format = 'retina'

%matplotlib inline

import csv
import json
import logging
from pathlib import Path
import random
import tarfile
import tempfile

from tqdm import tqdm

import torch                    
import torchvision
import fasttext

In [None]:
# Download the training archive (Google Drive file id 1lqpl_ofWT1aJAG-aHFbgFdKpWx01QDv7)
# to train.zip. The inner wget saves the session cookies and pipes the page
# through sed to extract the "confirm=" token; the outer wget reuses those
# cookies plus the token for the actual download, then the cookie file is removed.
!wget --load-cookies /tmp/cookies.txt "https://docs.google.com/uc?export=download&confirm=$(wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies --no-check-certificate 'https://docs.google.com/uc?export=download&id=1lqpl_ofWT1aJAG-aHFbgFdKpWx01QDv7' -O- | sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\1\n/p')&id=1lqpl_ofWT1aJAG-aHFbgFdKpWx01QDv7" -O train.zip && rm -rf /tmp/cookies.txt

In [None]:
# Download the trial archive (Google Drive file id 12KwkIoljStd8diaw5Aqz_RsKityaFudr)
# to trial.zip. No confirm-token step is used for this file.
!wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=12KwkIoljStd8diaw5Aqz_RsKityaFudr' -O trial.zip

In [None]:
# Quietly extract the training archive into the current directory.
!unzip -q train.zip

In [None]:
!unzip -q -P *MaMiSemEval2022! trial.zip

In [None]:
import os
import glob

path = './TRAINING'
extension = 'csv'

# Change into the training folder only when we are not already inside it.
# Without this guard, re-running the cell resolves './TRAINING' relative to the
# TRAINING folder itself and raises FileNotFoundError.
if os.path.basename(os.getcwd()) != os.path.basename(path):
    os.chdir(path)

# List the CSV files found in the (now current) training folder.
result = glob.glob('*.{}'.format(extension))
print(result)

In [None]:
# All dataset locations are resolved relative to the parent of the current
# working directory (an earlier cell changes into ./TRAINING, so this is
# presumably the directory the archives were extracted into - confirm).
data_path = Path.cwd().parent

# Training images and their annotation file.
training_path = data_path.joinpath("TRAINING")
train_csv_path = training_path.joinpath("training.csv")

# Trial images and their annotation file.
trial_path = data_path.joinpath(
    "Users", "fersiniel", "Desktop/MAMI - TO LABEL", "TRIAL DATASET"
)
trial_csv_path = trial_path.joinpath("trial.csv")

# Directory used as the image root for the training set.
data_dir = training_path

In [None]:
 # Read only the three columns inspected below from the tab-separated
 # training annotations.
 data = pd.read_csv(
     train_csv_path,
     sep='\t',
     usecols=['file_name', 'misogynous', 'Text Transcription'],
 )

In [None]:
# Quick overview: first rows, label counts, and summary statistics of the
# number of space-separated tokens per transcription.
label_counts = data['misogynous'].value_counts()
tokens_per_text = data['Text Transcription'].map(lambda text: len(text.split(" ")))
data.head(), label_counts, tokens_per_text.describe()

In [None]:
from PIL import Image


def _load_rgb(image_path):
    """Open ``image_path``, return an RGB copy, and close the underlying file."""
    with Image.open(image_path) as raw_image:
        # convert() returns a new image, so it stays usable after the file
        # handle is closed when the with-block exits.
        return raw_image.convert("RGB")


# Load up to the first 16 training images for a visual sanity check.
# head(16) also works when the frame holds fewer than 16 rows.
images = [
    _load_rgb(training_path / file_name)
    for file_name in data["file_name"].head(16)
]

# The printed sizes show whether the images need resizing before stacking.
for image in images:
    print(image.size)

In [None]:
# Preview preprocessing: resize every image to 200x200, then convert it to a
# tensor so the images can be stacked.
_preview_steps = [
    torchvision.transforms.Resize(size=(200, 200)),
    torchvision.transforms.ToTensor(),
]
image_transform = torchvision.transforms.Compose(_preview_steps)

In [None]:
# Transform each preview image and stack the results into one tensor.
preview_tensors = [image_transform(pil_image) for pil_image in images]
tensor_img = torch.stack(preview_tensors)

# Tile the stacked images into a single grid image.
grid = torchvision.utils.make_grid(tensor_img)

# Draw the grid without axes; the grid tensor is permuted with (1, 2, 0)
# before being handed to imshow.
plt.rcParams["figure.figsize"] = (20, 5)
plt.axis('off')
plt.imshow(grid.permute(1, 2, 0))

# Model Creation 

Data Set Creation 

In [None]:
class MemeDataset(torch.utils.data.Dataset):
    """Dataset of memes read from a tab-separated annotation file.

    Each item is a dict with the keys:
      * ``"file_name"``: the image file name (``str``) identifying the sample,
      * ``"image"``: the output of ``image_transform`` applied to the RGB image,
      * ``"Text Transcription"``: the sentence vector of the transcription as a
        float tensor,
      * ``"misogynous"``: the label as a 0-dim long tensor (only present when
        the annotation file has a ``misogynous`` column).

    Args:
        data_path: path of the tab-separated file; it must provide the
            ``file_name`` and ``Text Transcription`` columns.
        img_dir: directory that ``file_name`` entries are relative to.
        image_transform: callable applied to the RGB ``PIL`` image.
        text_transform: object exposing ``get_sentence_vector(str)``
            (a fastText model in this notebook).
    """

    def __init__(
        self,
        data_path,
        img_dir,
        image_transform,
        text_transform

    ):
        self.samples_frame = pd.read_csv(data_path, sep='\t')

        # str(img_dir) keeps construction working for any img_dir value,
        # exactly like the previous string concatenation did.
        image_root = Path(str(img_dir))
        self.samples_frame['file_path'] = self.samples_frame['file_name'].apply(
            lambda name: image_root / str(name)
        )

        self.img_dir = img_dir
        self.image_transform = image_transform
        self.text_transform = text_transform

    def __len__(self):
        """Return the number of rows in the annotation file."""
        return len(self.samples_frame)

    def _embed_text(self, text):
        """Return the sentence vector of ``text`` as a float32 tensor."""
        # Missing transcriptions are embedded as the empty string instead of
        # crashing on a float NaN. Whitespace (including newlines, which the
        # original code stripped before embedding) is collapsed to single
        # spaces so words on adjacent lines are not glued together.
        cleaned = "" if pd.isna(text) else " ".join(str(text).split())
        vector = self.text_transform.get_sentence_vector(cleaned)
        return torch.as_tensor(vector, dtype=torch.float32).squeeze()

    def __getitem__(self, idx):
        """Build the sample dict for row ``idx``.

        This method is called when you do instance[key]
        for an instance of this class.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        row = self.samples_frame.loc[idx]

        # Keep the identifier as a plain string. It used to be replaced by a
        # sentence embedding of the file name, which discards the identifier
        # that make_submission_frame uses as its index.
        img_id = str(row["file_name"])

        # Close the file handle as soon as the RGB copy has been transformed.
        with Image.open(row["file_path"]) as raw_image:
            image = self.image_transform(raw_image.convert("RGB"))

        text = self._embed_text(row["Text Transcription"])

        sample = {
            "file_name": img_id,
            "image": image,
            "Text Transcription": text
        }

        # Unlabelled files simply have no "misogynous" entry.
        if "misogynous" in self.samples_frame.columns:
            sample["misogynous"] = torch.tensor(
                int(row["misogynous"]), dtype=torch.long
            )

        return sample

In [None]:
class LangAndVisionConcat(torch.nn.Module):
    """Late-fusion classifier over concatenated language and vision features.

    The text and image inputs are passed through their own modules, the
    ReLU-activated features are concatenated along dim 1, fused by a linear
    layer with dropout, and classified by a final linear layer.

    Args:
        num_classes: number of output classes.
        loss_fn: callable ``loss_fn(logits, label)``; it receives the raw
            (pre-softmax) class scores, which is what
            ``torch.nn.CrossEntropyLoss`` expects.
        language_module: module mapping the text input to
            ``language_feature_dim`` features.
        vision_module: module mapping the image input to
            ``vision_feature_dim`` features.
        language_feature_dim: output size of ``language_module``.
        vision_feature_dim: output size of ``vision_module``.
        fusion_output_size: output size of the fusion layer.
        dropout_p: dropout probability applied after the fusion layer.
    """

    def __init__(
        self,
        num_classes,
        loss_fn,
        language_module,
        vision_module,
        language_feature_dim,
        vision_feature_dim,
        fusion_output_size,
        dropout_p,
    ):
        super().__init__()

        self.language_module = language_module
        self.vision_module = vision_module

        self.fusion = torch.nn.Linear(
            in_features=(language_feature_dim + vision_feature_dim),
            out_features=fusion_output_size
        )

        self.fc = torch.nn.Linear(
            in_features=fusion_output_size,
            out_features=num_classes
        )

        self.loss_fn = loss_fn
        self.dropout = torch.nn.Dropout(dropout_p)

    def forward(self, text, image, label = None):
        """Return ``(pred, loss)`` for a batch.

        ``pred`` holds the class probabilities (softmax over dim 1).
        ``loss`` is ``loss_fn(logits, label)`` when ``label`` is given and
        ``None`` otherwise.
        """
        text_features = torch.nn.functional.relu(self.language_module(text))
        image_features = torch.nn.functional.relu(self.vision_module(image))
        combined = torch.cat([text_features, image_features], dim=1)
        fused = self.dropout(torch.nn.functional.relu(self.fusion(combined)))
        logits = self.fc(fused)

        # Explicit dim: the implicit-dim form of softmax is deprecated.
        pred = torch.nn.functional.softmax(logits, dim=1)

        # The loss is computed on the logits, not on the probabilities.
        # CrossEntropyLoss applies log-softmax itself, so feeding it softmax
        # output normalises twice and flattens the gradients.
        loss = self.loss_fn(logits, label) if label is not None else None
        return (pred, loss)

In [None]:
!pip install torchmetrics

In [None]:
import pytorch_lightning as pl
from torchmetrics import functional as FM 

# Suppress Python warnings and raise the root logger threshold to WARNING so
# lower-severity log records are not emitted.
warnings.filterwarnings("ignore")
logging.getLogger().setLevel(logging.WARNING)


class MemesModel(pl.LightningModule):
    """LightningModule that builds, trains and applies the meme classifier.

    ``hparams`` is a plain dict. The keys ``train_path``, ``trial_path``,
    ``train_img_dir`` and ``trial_img_dir`` are required; every other key is
    optional and falls back to the default given at its point of use.
    """

    def __init__(self, hparams):
        for data_key in [
                         "train_path",
                         "trial_path",
                         "train_img_dir",
                         "trial_img_dir"
                    ]:
            if data_key not in hparams.keys():
                raise KeyError(
                    f"{data_key} is a required hparam in this model"
                )

        super().__init__()
        self._hparams = hparams

        # assign some hparams that get used in multiple places
        self.embedding_dim = self.hparams.get("embedding_dim", 300)
        self.language_feature_dim = self.hparams.get("language_feature_dim", 300)

        # balance language and vision features by default
        self.vision_feature_dim = self.hparams.get("vision_feature_dim", self.language_feature_dim)
        self.output_path = Path(self.hparams.get("output_path", "model-outputs"))
        self.output_path.mkdir(parents=True, exist_ok=True)

        # instantiate transforms, datasets
        self.text_transform = self._build_text_transform()
        self.image_transform = self._build_image_transform()
        self.train_dataset = self._build_dataset(data_key="train_path", img_dir="train_img_dir")
        # The image directory key must be the validated "trial_img_dir"; the
        # previous "test_img_dir" key is never set, so img_dir resolved to None.
        self.trial_dataset = self._build_dataset(data_key="trial_path", img_dir="trial_img_dir")

        # set up model and training
        self.model = self._build_model()
        self.trainer_params = self._get_trainer_params()

    @property
    def hparams(self):
        """Return the hyper-parameter dict passed to the constructor."""
        return self._hparams

    # Required LightningModule Methods (when validating)

    def forward(self, text, image, label=None):
        """Delegate to the wrapped model; returns ``(pred, loss)``."""
        return self.model(text, image, label)

    def training_step(self, batch, batch_nb):
        """Compute the training loss for one batch."""
        preds, loss = self.forward(
            text=batch["Text Transcription"],
            image=batch["image"],
            label=batch["misogynous"]
        )

        return {"loss": loss}

    def validation_step(self, batch, batch_nb):
        """Compute loss and accuracy for one validation batch and log the loss."""
        # No explicit self.eval() here: Lightning already runs validation in
        # eval mode (see the Lightning validation-loop documentation).
        preds, loss = self.forward(
            text=batch["Text Transcription"],
            image=batch["image"],
            label=batch["misogynous"]
        )

        acc = FM.accuracy(preds, batch["misogynous"], num_classes=2)
        # "val_loss" is the metric monitored by the scheduler and callbacks.
        self.log("val_loss", loss)

        return {"batch_val_loss": loss, "batch_val_acc": acc}

    def validation_epoch_end(self, outputs):
        """Average the per-batch validation loss and accuracy."""
        avg_loss = torch.stack(
            [output["batch_val_loss"] for output in outputs]
        ).mean()

        avg_acc = torch.stack(
            [output["batch_val_acc"] for output in outputs]
        ).mean()

        return {
            "val_loss": avg_loss,
            "progress_bar":{
                "avg_val_loss": avg_loss,
                "avg_val_acc": avg_acc
                }
        }

    def configure_optimizers(self):
        """Return AdamW paired with a ReduceLROnPlateau scheduler on ``val_loss``."""
        optimizer = torch.optim.AdamW(
            self.model.parameters(),
            lr=self.hparams.get("lr", 0.001)
        )
        scheduler = {
            'scheduler': torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer),
            'monitor': 'val_loss'
        }
        return ({'optimizer': optimizer, 'lr_scheduler': scheduler},)

    def train_dataloader(self):
        """Return the shuffled loader over the training dataset."""
        return torch.utils.data.DataLoader(
            self.train_dataset,
            shuffle=True,
            batch_size=self.hparams.get("batch_size", 4),
            num_workers=self.hparams.get("num_workers", 16)
        )

    def val_dataloader(self):
        """Return the validation loader.

        NOTE(review): this iterates the *training* dataset (the trial dataset
        is built but not used here), so validation metrics are measured on
        training data - confirm this is intended.
        """
        return torch.utils.data.DataLoader(
            self.train_dataset,
            shuffle=False,
            batch_size=self.hparams.get("batch_size", 4),
            num_workers=self.hparams.get("num_workers", 16),
        )

    def fit(self):
        """Create a ``pl.Trainer`` from ``self.trainer_params`` and train."""
        self.trainer = pl.Trainer(**self.trainer_params)
        self.trainer.fit(self)

    # will improve further on this by making a few more methods... or well specifying them

    def _build_text_transform(self):
        """Train an unsupervised fastText model on the training transcriptions."""
        transcriptions = pd.read_csv(
            self.hparams.get('train_path'),
            usecols=["Text Transcription"],
            sep='\t'
        )['Text Transcription'].dropna()

        with tempfile.TemporaryDirectory() as tmp_dir:
            ft_path = Path(tmp_dir) / "fasttext_corpus.txt"
            with ft_path.open("w", encoding="utf-8") as ft:
                for line in transcriptions:
                    # One transcription per line. Internal whitespace is
                    # collapsed; the previous code appended a literal "/n"
                    # (a typo for a newline) to every line, which polluted
                    # the corpus with a bogus token.
                    ft.write(" ".join(str(line).split()) + "\n")

            # Train only after the corpus file is closed, so fastText reads
            # the fully flushed file.
            language_transform = fasttext.train_unsupervised(
                str(ft_path),
                model=self.hparams.get("fasttext_model", "cbow"),
                dim=self.embedding_dim
            )
        return language_transform

    def _build_image_transform(self):
        """Return the resize / to-tensor / normalize pipeline for images."""
        image_dim = self.hparams.get("image_dim", 200)
        image_transform = torchvision.transforms.Compose(
            [
                torchvision.transforms.Resize(
                    size=(image_dim, image_dim)
                ),
                torchvision.transforms.ToTensor(),
                # all torchvision models expect the same
                # normalization mean and std
                # https://pytorch.org/docs/stable/torchvision/models.html
                torchvision.transforms.Normalize(
                    mean=(0.485, 0.456, 0.406),
                    std=(0.229, 0.224, 0.225)
                ),
            ]
        )
        return image_transform

    def _build_dataset(self, data_key, img_dir):
        """Build a ``MemeDataset`` from two hparam *keys*.

        ``data_key`` names the hparam holding the annotation file path and
        ``img_dir`` names the hparam holding the image directory.
        """
        return MemeDataset(
            data_path = self.hparams.get(data_key),
            img_dir = self.hparams.get(img_dir),
            image_transform = self.image_transform,
            text_transform = self.text_transform,
        )

    def _build_model(self):
        """Assemble the language module, vision module and fusion classifier."""
        # we're going to pass the outputs of our text
        # transform through an additional trainable layer
        # rather than fine-tuning the transform
        language_module = torch.nn.Linear(
                in_features=self.embedding_dim,
                out_features=self.language_feature_dim
        )

        # replace the classification head of the pretrained
        # resnet (2048 features in) with a Linear layer that
        # reduces to the requested vision feature size
        vision_module = torchvision.models.resnet152(
            pretrained=True
        )
        vision_module.fc = torch.nn.Linear(
                in_features=2048,
                out_features=self.vision_feature_dim
        )

        return LangAndVisionConcat(
            num_classes = self.hparams.get("num_classes", 2),
            loss_fn = torch.nn.CrossEntropyLoss(),
            language_module = language_module,
            vision_module = vision_module,
            language_feature_dim = self.language_feature_dim,
            vision_feature_dim = self.vision_feature_dim,
            fusion_output_size = self.hparams.get(
                "fusion_output_size", 512
            ),
            dropout_p = self.hparams.get("dropout_p", 0.1),
        )

    def _get_trainer_params(self):
        """Return the keyword arguments used to construct ``pl.Trainer``."""
        checkpoint_callback = pl.callbacks.ModelCheckpoint(
            dirpath = self.output_path,
            monitor = self.hparams.get(
                "checkpoint_monitor", "val_loss"
            ),
            # The default monitored metric is a loss, so the best checkpoint
            # is the one with the *lowest* value.
            mode = self.hparams.get(
                "checkpoint_monitor_mode", "min"
            ),
            verbose = self.hparams.get("verbose", True)
        )

        early_stop_callback = pl.callbacks.EarlyStopping(
            monitor = self.hparams.get(
                "early_stop_monitor", "val_loss"
            ),
            min_delta = self.hparams.get(
                "early_stop_min_delta", 0.001
            ),
            patience = self.hparams.get(
                "early_stop_patience", 5
            ),
            verbose = self.hparams.get("verbose", True),
        )

        trainer_params = {
            # Both callbacks go through `callbacks`; per the Lightning docs
            # the `checkpoint_callback` Trainer argument no longer accepts a
            # ModelCheckpoint instance.
            "callbacks" : [checkpoint_callback, early_stop_callback],
            "accumulate_grad_batches": self.hparams.get(
                "accumulate_grad_batches", 1
            ),
            "gpus": self.hparams.get("n_gpu", 1),
            "max_epochs": self.hparams.get("max_epochs", 100),
            "gradient_clip_val": self.hparams.get(
                "gradient_clip_value", 1
            ),
        }
        return trainer_params

    @torch.no_grad()
    def make_submission_frame(self, train_path, img_dir=None):
        """Predict every sample of an annotation file.

        Args:
            train_path: path of the tab-separated annotation file to score.
            img_dir: directory holding the images; defaults to the
                ``train_img_dir`` hparam.

        Returns:
            DataFrame indexed by ``file_name`` with a float ``proba`` column
            (probability of class 1) and an int ``misogynous`` column
            (arg-max class).
        """
        if img_dir is None:
            img_dir = self.hparams.get("train_img_dir")

        # Build the dataset directly from the given path: _build_dataset
        # expects hparam *keys*, not paths.
        dataset = MemeDataset(
            data_path = train_path,
            img_dir = img_dir,
            image_transform = self.image_transform,
            text_transform = self.text_transform,
        )
        dataloader = torch.utils.data.DataLoader(
            dataset,
            shuffle = False,
            batch_size = self.hparams.get("batch_size", 4),
            num_workers = self.hparams.get("num_workers", 16))

        # Move the model once instead of on every batch.
        model = self.model.eval().to("cpu")

        batch_preds = []
        for batch in tqdm(dataloader, total = len(dataloader)):
            preds, _ = model(batch["Text Transcription"], batch["image"])
            batch_preds.append(preds)

        submission_frame = pd.DataFrame(
            index = dataset.samples_frame.file_name,
            columns=["proba", "misogynous"]
        )
        if batch_preds:
            # The loader is not shuffled, so concatenated predictions line up
            # positionally with the rows of the annotation file.
            all_preds = torch.cat(batch_preds)
            submission_frame["proba"] = all_preds[:, 1].numpy()
            submission_frame["misogynous"] = all_preds.argmax(dim=1).numpy()

        submission_frame.proba = submission_frame.proba.astype(float)
        # Cast the label column itself; assigning to ".label" only set an
        # attribute on the frame and left the column untouched.
        submission_frame.misogynous = submission_frame.misogynous.astype(int)
        return submission_frame
    

In [None]:
# Hyper-parameters for this run; keys not listed here fall back to the
# defaults inside MemesModel.
hparams = dict(
    # required data locations
    train_path=train_csv_path,
    trial_path=trial_csv_path,
    train_img_dir=data_dir,
    trial_img_dir=trial_path,

    # model sizes
    embedding_dim=150,
    language_feature_dim=300,
    vision_feature_dim=300,
    fusion_output_size=256,

    # training setup
    output_path="model-outputs",
    lr=0.005,
    max_epochs=10,
    n_gpu=1,
    batch_size=8,
    # allows us to "simulate" having larger batches
    accumulate_grad_batches=16,
    early_stop_patience=3,
)

# Build the model (this also trains the text transform) and start training.
miso_memes_model = MemesModel(hparams=hparams)
miso_memes_model.fit()