#Installations



In [None]:
!pip install evaluate
!pip install -U sentence-transformers
!pip install aac-metrics

!aac-metrics-info
!aac-metrics-download

# Imports

In [2]:
import requests
import os
from google.colab import files
import io
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
import subprocess
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from PIL import Image
import evaluate
import aac_metrics
import pandas as pd
from transformers import BlipProcessor, BlipForConditionalGeneration
from transformers import pipeline
from sentence_transformers import SentenceTransformer, util
from datasets import load_dataset
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


# Classes

In [3]:
class ImageCaptioningDataset(Dataset):
    def __init__(self, dataset, processor):
        self.dataset = dataset
        self.processor = processor

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        item = self.dataset[idx]
        encoding = self.processor(images=item["image"], text=item["text"], padding="max_length", return_tensors="pt")
        # remove batch dimension
        encoding = {k:v.squeeze() for k,v in encoding.items()}
        return encoding

# Constants

In [4]:
images_path = '/content/drive/MyDrive/Data-Co-Lab/EmoatCap/images.zip'
metadata_path = '/content/drive/MyDrive/Data-Co-Lab/EmoatCap/metadata.csv'
amoatcap_path = '/content/drive/MyDrive/Data-Co-Lab/EmoatCap/emo_at_cap.csv'

amoatcap_copy_path = '/content/emo_at_cap.csv'
metadata_copy_path = '/content/metadata.csv'

folder_path = '/content/folder'
train_path = '/content/folder/train'
test_path = '/content/folder/test'

sent_train_path = '/content/sent_train.npy'
sent_test_path = '/content/sent_test.npy'

device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)

cuda


# Functions

In [5]:
def fetch_data():
    print("Fetching Data...")

    subprocess.run(['unzip', images_path, '-d', '/content'])
    subprocess.run(['cp', amoatcap_path, '/content'])
    subprocess.run(['cp', metadata_path, '/content'])

    amoatcap_df = pd.read_csv(amoatcap_copy_path)
    metadata_df = pd.read_csv(metadata_copy_path)

    # split data to train and test
    image_captions = metadata_df.to_numpy()[:, 0]
    sentiments = amoatcap_df['human_sentiment'].to_numpy()

    image_caption_train, image_caption_test, sent_train, sent_test = train_test_split(image_captions, sentiments, test_size=0.2, random_state=42)

    # save to "folder" for the Imagefolder function
    subprocess.run(['mkdir', folder_path])

    print("\nMaking Train Folder...")

    for file_name in tqdm(image_caption_train):
        subprocess.run(['mkdir', train_path])
        subprocess.run(['cp', f'/content/images/{file_name}', train_path])

    print("\nMaking Test Folder...")

    for file_name in tqdm(image_caption_test):
        subprocess.run(['mkdir', test_path])
        subprocess.run(['cp', f'/content/images/{file_name}', test_path])

    subprocess.run(['cp', metadata_copy_path, train_path])
    subprocess.run(['cp', metadata_copy_path, test_path])

    print("\nSaving Train and Test Sentiments...")

    np.save(sent_train_path, sent_train)
    np.save(sent_test_path, sent_test)


def get_blip_model(blip_mode):
    processor = BlipProcessor.from_pretrained(f"Salesforce/blip-image-captioning-{blip_mode}")
    model = BlipForConditionalGeneration.from_pretrained(f"Salesforce/blip-image-captioning-{blip_mode}")
    model.to(device)
    return model, processor


def fine_tune(model, processor, train_dataset, num_epoch, batch_size=2):
    print("\nFine-tunning The Model...")

    train_ds = ImageCaptioningDataset(train_dataset, processor)
    train_dataloader = DataLoader(train_ds, shuffle=True, batch_size=batch_size)

    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

    model.train()

    for epoch in range(num_epoch):
        print("Epoch:", epoch)
        for idx, batch in enumerate(tqdm(train_dataloader)):
            input_ids = batch.pop("input_ids").to(device)
            pixel_values = batch.pop("pixel_values").to(device)

            outputs = model(
                input_ids=input_ids,
                pixel_values=pixel_values,
                labels=input_ids
            )

            loss = outputs.loss
            loss.backward()

            optimizer.step()
            optimizer.zero_grad()

    return model, processor


def image_to_caption(model, processor, image):
    # test the model on an image
    inputs = processor(image, return_tensors="pt").to(device)
    pixel_values = inputs.pixel_values

    out = model.generate(pixel_values=pixel_values, max_length=50)
    caption = processor.decode(out[0], skip_special_tokens=True)

    return caption


def get_sentiment_analysis_classifier(model_name='cardiffnlp/twitter-roberta-base-sentiment-latest'):
    classifier = pipeline(model=model_name)
    return classifier


def compute_average_similarity(predicted_captions, reference_captions, model_name='distilbert-base-nli-mean-tokens'):
    # using a transformer model to assess the similarity between sentences
    assert len(predicted_captions) == len(reference_captions)

    sentiment_model = SentenceTransformer(model_name)

    sum = 0
    for p, r in zip(tqdm(predicted_captions), reference_captions):
        sentence_embeddings = sentiment_model.encode([p, r])
        sum += util.pytorch_cos_sim(sentence_embeddings[0], sentence_embeddings[1])[0][0]
    average_similarity = sum/len(predicted_captions)

    return average_similarity


def convert_sentiment_to_number(sentiments):
    # sentiments is a list
    converter = {'NEGATIVE': 0, 'NEUTRAL': 1, 'POSITIVE': 2}
    nums = [converter[s.upper()] for s in sentiments]
    return nums


def evaluate_model(model, processor, test_dataset, reference_sentiments, similarity_evaluation, verbose=0):
    print("\nRunning Model Evaluation...")
    reference_captions = []
    predicted_captions = []
    predicted_sentiments = []

    sentiment_analysis_classifier = get_sentiment_analysis_classifier()
    for idx, row in enumerate(tqdm(test_dataset)):
        generated_caption = image_to_caption(model, processor, row['image'])

        predicted_captions.append(generated_caption)
        reference_captions.append(row['text'])
        predicted_sentiment = sentiment_analysis_classifier(generated_caption)[0]['label']
        predicted_sentiments.append(predicted_sentiment)

        if verbose:
            print("*"*50)
            print(generated_caption)
            print(row['text'])
            print(predicted_sentiment)
            print(reference_sentiments[idx])
            print("*"*50)

    print('/nComputing evaluation_metrics for image captioning...')
    reference_captions_in_list = [[c] for c in reference_captions]
    metrics_scores, _ = aac_metrics.evaluate(predicted_captions, reference_captions_in_list)

    average_similarity = None
    if similarity_evaluation == True:
        print('/nComputing similarities using a transformer model...')
        average_similarity = compute_average_similarity(predicted_captions, reference_captions)

    sentiments_results = None
    print('/nComputing accuray for sentiment analysis...')
    accuracy_metric = evaluate.load("accuracy")
    sentiments_results = accuracy_metric.compute(
        predictions=convert_sentiment_to_number(predicted_sentiments),
        references=convert_sentiment_to_number(reference_sentiments)
    )['accuracy'] * 100

    return {
        'average_similarity': average_similarity,
        'metric_scores': metrics_scores,
        'sentiment_accuracy': sentiments_results,
    }

# Main

## Get and Split Emo-at-Cap Dataset

In [6]:
fetch_data()

train_dataset = load_dataset("imagefolder", data_dir="folder", split="train")
test_dataset = load_dataset("imagefolder", data_dir="folder", split="test")

train_reference_sentiments = np.load('/content/sent_train.npy', allow_pickle=True)
test_reference_sentiments = np.load('/content/sent_test.npy', allow_pickle=True)

Fetching Data...

Making Train Folder...


100%|██████████| 3068/3068 [00:23<00:00, 130.04it/s]



Making Test Folder...


100%|██████████| 767/767 [00:04<00:00, 155.14it/s]



Saving Train and Test Sentiments...


Resolving data files:   0%|          | 0/3069 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/768 [00:00<?, ?it/s]

Generating train split: 0 examples [00:00, ? examples/s]

Generating test split: 0 examples [00:00, ? examples/s]

Resolving data files:   0%|          | 0/3069 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/768 [00:00<?, ?it/s]

## Get Blip Model

In [7]:
model, processor = get_blip_model('large')

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/445 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/527 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/711k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/4.60k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.88G [00:00<?, ?B/s]

## Test Raw Model

In [8]:
evaluate_model(model, processor, test_dataset, list(test_reference_sentiments), similarity_evaluation=False, verbose=0)


Running Model Evaluation...


config.json:   0%|          | 0.00/929 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/501M [00:00<?, ?B/s]

Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

100%|██████████| 767/767 [05:56<00:00,  2.15it/s]


/nComputing evaluation_metrics for image captioning...
/nComputing accuray for sentiment analysis...


Downloading builder script:   0%|          | 0.00/4.20k [00:00<?, ?B/s]

{'average_similarity': None,
 'metric_scores': {'bleu_1': tensor(0.1565, dtype=torch.float64),
  'bleu_2': tensor(0.0560, dtype=torch.float64),
  'bleu_3': tensor(0.0215, dtype=torch.float64),
  'bleu_4': tensor(0.0073, dtype=torch.float64),
  'meteor': tensor(0.0910, dtype=torch.float64),
  'rouge_l': tensor(0.1617, dtype=torch.float64),
  'cider_d': tensor(0.1673, dtype=torch.float64),
  'spice': tensor(0.1471, dtype=torch.float64),
  'spider': tensor(0.1572, dtype=torch.float64)},
 'sentiment_accuracy': 8.34419817470665}

## Fine-Tune Model (skip if you want to test the raw model)

In [9]:
num_epoch = 1
fine_tune(model, processor, train_dataset, num_epoch)


Fine-tunning The Model...
Epoch: 0


  0%|          | 0/1534 [00:00<?, ?it/s]We strongly recommend passing in an `attention_mask` since your input_ids may be padded. See https://huggingface.co/docs/transformers/troubleshooting#incorrect-output-when-padding-tokens-arent-masked.
100%|██████████| 1534/1534 [33:00<00:00,  1.29s/it]


(BlipForConditionalGeneration(
   (vision_model): BlipVisionModel(
     (embeddings): BlipVisionEmbeddings(
       (patch_embedding): Conv2d(3, 1024, kernel_size=(16, 16), stride=(16, 16))
     )
     (encoder): BlipEncoder(
       (layers): ModuleList(
         (0-23): 24 x BlipEncoderLayer(
           (self_attn): BlipAttention(
             (dropout): Dropout(p=0.0, inplace=False)
             (qkv): Linear(in_features=1024, out_features=3072, bias=True)
             (projection): Linear(in_features=1024, out_features=1024, bias=True)
           )
           (layer_norm1): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
           (mlp): BlipMLP(
             (activation_fn): GELUActivation()
             (fc1): Linear(in_features=1024, out_features=4096, bias=True)
             (fc2): Linear(in_features=4096, out_features=1024, bias=True)
           )
           (layer_norm2): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
         )
       )
     )
     (post_layern

## Test Fine-Tunned Model

In [10]:
evaluate_model(model, processor, test_dataset, list(test_reference_sentiments), similarity_evaluation=True, verbose=0)


Running Model Evaluation...


Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
100%|██████████| 767/767 [04:34<00:00,  2.80it/s]


/nComputing evaluation_metrics for image captioning...
/nComputing similarities using a transformer model...


.gitattributes:   0%|          | 0.00/690 [00:00<?, ?B/s]

1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/3.99k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/550 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/122 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/265M [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/450 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

modules.json:   0%|          | 0.00/229 [00:00<?, ?B/s]

100%|██████████| 767/767 [00:06<00:00, 118.60it/s]


/nComputing accuray for sentiment analysis...


{'average_similarity': tensor(0.6020),
 'metric_scores': {'bleu_1': tensor(0.2907, dtype=torch.float64),
  'bleu_2': tensor(0.2007, dtype=torch.float64),
  'bleu_3': tensor(0.1460, dtype=torch.float64),
  'bleu_4': tensor(0.1093, dtype=torch.float64),
  'meteor': tensor(0.1454, dtype=torch.float64),
  'rouge_l': tensor(0.3232, dtype=torch.float64),
  'cider_d': tensor(0.9076, dtype=torch.float64),
  'spice': tensor(0.3148, dtype=torch.float64),
  'spider': tensor(0.6112, dtype=torch.float64)},
 'sentiment_accuracy': 32.59452411994785}

## Test Your Own Image Here

In [None]:
uploaded = files.upload()

for fn in uploaded.keys():
    image = Image.open(io.BytesIO(uploaded[fn]))

    caption = image_to_caption(model, processor, image)

    classifier = get_sentiment_analysis_classifier()
    sentiment = classifier(caption)

    display(image)
    print('\ngenerated caption:', caption)
    print('\nsentiment of the generated caption:', sentiment)