In [1]:
import zipfile

zip_path = "/content/drive/MyDrive/Anand & Shashank/Anxiety_Data.zip"
extract_to = "/content/Anxiety"

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_to)


In [2]:
pip install  pytesseract


Collecting pytesseract
  Downloading pytesseract-0.3.13-py3-none-any.whl.metadata (11 kB)
Downloading pytesseract-0.3.13-py3-none-any.whl (14 kB)
Installing collected packages: pytesseract
Successfully installed pytesseract-0.3.13


In [3]:
pip install git+https://github.com/openai/CLIP.git


Collecting git+https://github.com/openai/CLIP.git
  Cloning https://github.com/openai/CLIP.git to /tmp/pip-req-build-m2pp5ata
  Running command git clone --filter=blob:none --quiet https://github.com/openai/CLIP.git /tmp/pip-req-build-m2pp5ata
  Resolved https://github.com/openai/CLIP.git to commit dcba3cb2e2827b402d2701e7e1c7d9fed8a20ef1
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting ftfy (from clip==1.0)
  Downloading ftfy-6.3.1-py3-none-any.whl.metadata (7.3 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch->clip==1.0)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch->clip==1.0)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch->clip==1.0)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting 

# Clip + RoBerta

In [None]:
import os
import pandas as pd
from PIL import Image
import pytesseract
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from transformers import AutoTokenizer, AutoModel
from sklearn.metrics import accuracy_score, f1_score
import clip  

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

clip_model, preprocess = clip.load("ViT-B/32", device=device)
clip_model.eval()  


def custom_collate(batch):
    ocr_texts, images, labels = zip(*batch)
    labels = torch.tensor(labels, dtype=torch.long)
    return list(ocr_texts), list(images), labels

class AnxietyDataset(Dataset):
    def __init__(self, csv_file, image_folder):
        self.image_folder = image_folder

        data = pd.read_csv(csv_file)
        print("CSV columns:", data.columns)
        print("First few sample_ids from CSV:")
        print(data['sample_id'].head())

        data['filename'] = data['sample_id'].apply(lambda x: str(x) + ".jpg" if not str(x).endswith(".jpg") else x)
        
        data['label'] = pd.factorize(data['meme_anxiety_categories'])[0]
        print("Unique label mapping:", dict(enumerate(pd.factorize(data['meme_anxiety_categories'])[1])))
        
        data['exists'] = data['filename'].apply(lambda x: os.path.exists(os.path.join(self.image_folder, x)))
        print("Number of existing files found:", data['exists'].sum())

        self.data = data[data['exists']].reset_index(drop=True)
        print(f"Filtered dataset length: {len(self.data)} (only rows with existing images)")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        image_name = row['filename']
        label = row['label']
        image_path = os.path.join(self.image_folder, image_name)

        image_pil = Image.open(image_path).convert("RGB")

        ocr_text = pytesseract.image_to_string(image_pil)
        return ocr_text, image_pil, label

class FigurativeReasoningModule(nn.Module):
    def __init__(self):
        super(FigurativeReasoningModule, self).__init__()

    def forward(self, ocr_text):
        reasoning = "dummy figurative reasoning for: " + ocr_text
        return reasoning

class VisualFusionModule(nn.Module):
    def __init__(self, text_model_name="roberta-base"):
        super(VisualFusionModule, self).__init__()
        self.tokenizer = AutoTokenizer.from_pretrained(text_model_name)
        self.model = AutoModel.from_pretrained(text_model_name).to(device)
        self.model.eval() 

    def encode_text(self, text):
        inputs = self.tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = self.model(**inputs)
        embedding = outputs.last_hidden_state[:, 0, :] 
        return embedding.squeeze() 

    def fuse_embeddings(self, ocr_text, image):
        text_embedding = self.encode_text(ocr_text)  
        image_preprocessed = preprocess(image).unsqueeze(0).to(device)
        with torch.no_grad():
            image_embedding = clip_model.encode_image(image_preprocessed)
        image_embedding = image_embedding.squeeze()  
        fused = torch.cat([text_embedding, image_embedding], dim=-1)  
        return fused.to(device)

    def forward(self, ocr_text, image):
        return self.fuse_embeddings(ocr_text, image)

class M3HClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(M3HClassifier, self).__init__()
        self.fc = nn.Linear(input_dim, num_classes)

    def forward(self, fused_embedding):
        logits = self.fc(fused_embedding)
        return logits

class M3HVisualModel(nn.Module):
    def __init__(self, fusion_module, classifier):
        super(M3HVisualModel, self).__init__()
        self.fusion_module = fusion_module
        self.classifier = classifier

    def forward(self, ocr_text, image):
        fused_embedding = self.fusion_module(ocr_text, image)
        if fused_embedding.dim() == 1:
            fused_embedding = fused_embedding.unsqueeze(0)
        logits = self.classifier(fused_embedding)
        return logits


if __name__ == "__main__":
    num_epochs = 10
    fused_dim = 768 + 512 
    num_classes = 7  

    fusion_module = VisualFusionModule(text_model_name="roberta-base")
    classifier = M3HClassifier(input_dim=fused_dim, num_classes=num_classes)
    model = M3HVisualModel(fusion_module, classifier)
    model.to(device)

    optimizer = optim.Adam(model.parameters(), lr=5e-5)
    criterion = nn.CrossEntropyLoss()

    csv_train = '/content/Anxiety/anxiety_train.csv' 
    image_folder = '/content/Anxiety/anxiety_train_image' 
    dataset = AnxietyDataset(csv_file=csv_train, image_folder=image_folder)
    dataloader = DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=custom_collate)

    if len(dataset) == 0:
        print("No samples found in the dataset. Please verify the image folder path and file names.")
        exit(1)

    figurative_module = FigurativeReasoningModule()

    model.train()
    for epoch in range(num_epochs):
        total_loss = 0.0
        for batch in dataloader:
            ocr_texts, images, labels = batch
            batch_logits = []
            for ocr_text, image in zip(ocr_texts, images):
                reasoning = figurative_module(ocr_text)
                logits = model(ocr_text, image)
                batch_logits.append(logits)
            batch_logits = torch.cat(batch_logits, dim=0)
            labels = labels.to(device)
            loss = criterion(batch_logits, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            print(f"Epoch {epoch+1} Batch loss: {loss.item()}")
        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1} Average loss: {avg_loss}")

    model_save_path = "m3h_anxiety_visual_model_roberta.pth"
    torch.save(model.state_dict(), model_save_path)
    print("Model saved to", model_save_path)



Using device: cuda


100%|████████████████████████████████████████| 338M/338M [00:03<00:00, 107MiB/s]
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/481 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


CSV columns: Index(['sample_id', 'meme_anxiety_categories'], dtype='object')
First few sample_ids from CSV:
0     TR-318
1    TR-1732
2    TR-1417
3    TR-1582
4     TR-649
Name: sample_id, dtype: object
Unique label mapping: {0: 'Restlessness', 1: 'Lack of Worry Control', 2: 'Nervousness', 3: 'Impending Doom', 4: 'Excessive Worry', 5: 'Difficulty Relaxing', 6: 'Irritatbily'}
Number of existing files found: 2605
Filtered dataset length: 2605 (only rows with existing images)
Epoch 1 Batch loss: 2.0492827892303467
Epoch 1 Batch loss: 2.0214030742645264
Epoch 1 Batch loss: 1.877152919769287
Epoch 1 Batch loss: 1.9905226230621338
Epoch 1 Batch loss: 1.9528435468673706
Epoch 1 Batch loss: 1.9377799034118652
Epoch 1 Batch loss: 1.8949875831604004
Epoch 1 Batch loss: 1.7496228218078613
Epoch 1 Batch loss: 1.917248010635376
Epoch 1 Batch loss: 1.8718476295471191
Epoch 1 Batch loss: 1.986634612083435
Epoch 1 Batch loss: 1.9949584007263184
Epoch 1 Batch loss: 1.8754172325134277
Epoch 1 Batch los

# Evaluation on the Training Data

In [5]:
# Evaluation on the Training Data
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for batch in dataloader:
        ocr_texts, images, labels = batch
        batch_logits = []
        for ocr_text, image in zip(ocr_texts, images):
            reasoning = figurative_module(ocr_text)
            logits = model(ocr_text, image)
            batch_logits.append(logits)
        batch_logits = torch.cat(batch_logits, dim=0)
        preds = torch.argmax(batch_logits, dim=-1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

accuracy = accuracy_score(all_labels, all_preds)
macro_f1 = f1_score(all_labels, all_preds, average='macro')
weighted_f1 = f1_score(all_labels, all_preds, average='weighted')

print("Evaluation Metrics:")
print("Accuracy:", accuracy)
print("Macro-F1:", macro_f1)
print("Weighted-F1:", weighted_f1)


Evaluation Metrics:
Accuracy: 0.38809980806142036
Macro-F1: 0.31873387330968583
Weighted-F1: 0.35296311057153834


# Evaluation on the Test Data

In [12]:
import os
import pandas as pd
from PIL import Image
import pytesseract
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from transformers import AutoTokenizer, AutoModel
from sklearn.metrics import accuracy_score, f1_score
import clip

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

clip_model, preprocess = clip.load("ViT-B/32", device=device)
clip_model.eval()

def custom_collate(batch):
    ocr_texts, images, labels = zip(*batch)
    labels = torch.tensor(labels, dtype=torch.long)
    return list(ocr_texts), list(images), labels

class AnxietyDataset(Dataset):
    def __init__(self, csv_file, image_folder):
        self.image_folder = image_folder
        data = pd.read_csv(csv_file)
        print("CSV columns:", data.columns)
        print("First few sample_ids from CSV:")
        print(data['sample_id'].head())

        data['filename'] = data['sample_id'].apply(
            lambda x: str(x) + ".jpg" if not str(x).endswith(".jpg") else x
        )
        data['label'] = pd.factorize(data['meme_anxiety_categories'])[0]
        print("Unique label mapping:", dict(enumerate(pd.factorize(data['meme_anxiety_categories'])[1])))

        data['exists'] = data['filename'].apply(lambda x: os.path.exists(os.path.join(self.image_folder, x)))
        print("Number of existing files found:", data['exists'].sum())
        self.data = data[data['exists']].reset_index(drop=True)
        print(f"Filtered dataset length: {len(self.data)} (only rows with existing images)")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        image_name = row['filename']
        label = row['label']
        image_path = os.path.join(self.image_folder, image_name)
        image_pil = Image.open(image_path).convert("RGB")
        ocr_text = pytesseract.image_to_string(image_pil)
        return ocr_text, image_pil, label

class FigurativeReasoningModule(nn.Module):
    def __init__(self):
        super(FigurativeReasoningModule, self).__init__()
    def forward(self, ocr_text):
        reasoning = "dummy figurative reasoning for: " + ocr_text
        return reasoning

class VisualFusionModule(nn.Module):
    def __init__(self, text_model_name="roberta-base"):
        super(VisualFusionModule, self).__init__()
        self.text_encoder = SentenceTransformer(text_model_name)
    def fuse_embeddings(self, ocr_text, image):
        text_embedding = self.text_encoder.encode(ocr_text, convert_to_tensor=True)
        image_preprocessed = preprocess(image).unsqueeze(0).to(device)
        with torch.no_grad():
            image_embedding = clip_model.encode_image(image_preprocessed)
        image_embedding = image_embedding.squeeze()
        fused = torch.cat([text_embedding, image_embedding], dim=-1)
        return fused.to(device)
    def forward(self, ocr_text, image):
        return self.fuse_embeddings(ocr_text, image)


class M3HClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(M3HClassifier, self).__init__()
        self.fc = nn.Linear(input_dim, num_classes)
    def forward(self, fused_embedding):
        logits = self.fc(fused_embedding)
        return logits

class M3HVisualModel(nn.Module):
    def __init__(self, fusion_module, classifier):
        super(M3HVisualModel, self).__init__()
        self.fusion_module = fusion_module
        self.classifier = classifier
    def forward(self, ocr_text, image):
        fused_embedding = self.fusion_module(ocr_text, image)
        if fused_embedding.dim() == 1:
            fused_embedding = fused_embedding.unsqueeze(0)
        logits = self.classifier(fused_embedding)
        return logits


if __name__ == "__main__":

    csv_test = '/content/Anxiety/anxiety_test.csv'
    image_folder_test = '/content/Anxiety/anxiety_test_image'

    test_dataset = AnxietyDataset(csv_file=csv_test, image_folder=image_folder_test)
    test_dataloader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=custom_collate)

    figurative_module = FigurativeReasoningModule()

    fusion_module = VisualFusionModule(text_model_name="roberta-base")
    num_classes = 7
    classifier = M3HClassifier(input_dim=1280, num_classes=num_classes)
    model = M3HVisualModel(fusion_module, classifier)
    model.to(device)

    saved_model_path = "/content/m3h_anxiety_visual_model_roberta.pth"
    model.load_state_dict(torch.load(saved_model_path, map_location=device))
    model.eval()

    all_preds = []
    all_labels = []
    with torch.no_grad():
        for batch in test_dataloader:
            ocr_texts, images, labels = batch
            batch_logits = []
            for ocr_text, image in zip(ocr_texts, images):
                reasoning = figurative_module(ocr_text)
                logits = model(ocr_text, image)
                batch_logits.append(logits)
            batch_logits = torch.cat(batch_logits, dim=0)
            preds = torch.argmax(batch_logits, dim=-1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    macro_f1 = f1_score(all_labels, all_preds, average='macro')
    weighted_f1 = f1_score(all_labels, all_preds, average='weighted')

    print("Test Evaluation Metrics:")
    print("Accuracy:", accuracy)
    print("Macro-F1:", macro_f1)
    print("Weighted-F1:", weighted_f1)


Test Evaluation Metrics:
Accuracy: 0.20583717357910905
Macro-F1:0.1783717357910905
Weighted-F1:0.18583717357910905
