In [None]:
import pandas as pd

# Read the three dataset splits (train / dev / test) from the Kaggle input folder
train_df, dev_df, test_df = [
    pd.read_csv(csv_path)
    for csv_path in (
        '/kaggle/input/mesogyny-train-dev-test/train/train.csv',
        '/kaggle/input/mesogyny-train-dev-test/dev/dev.csv',
        '/kaggle/input/mesogyny-train-dev-test/test/test.csv',
    )
]

# Preview the first rows of every split
for banner, frame in (
    ("TRAIN DATA\n\n", train_df),
    ("DEV DATA\n\n", dev_df),
    ("TEST DATA\n\n", test_df),
):
    print(banner, frame.head())

In [None]:
def add_image_paths(df, image_dir):
    """Add an 'image_path' column built from ``image_dir`` and the 'image_name' column.

    Args:
        df: DataFrame with an 'image_name' column; it is modified in place.
        image_dir: directory containing the image files (no trailing slash).

    Returns:
        The same DataFrame, for convenience.
    """
    # The '/' separator is required: without it the file name is glued onto the
    # directory name and the resulting path points at nothing.
    df['image_path'] = df['image_name'].apply(lambda name: f'{image_dir}/{name}')
    return df


# Train split: image paths shown together with the labels
add_image_paths(train_df, '/kaggle/input/mesogyny-train-dev-test/train/train images')
print("TRAIN DATA IMAGES\n", train_df[['image_name', 'image_path', 'labels']].head())


# Dev split: image paths shown together with the labels
add_image_paths(dev_df, '/kaggle/input/mesogyny-train-dev-test/dev/dev images')
print("DEV DATA IMAGES\n", dev_df[['image_name', 'image_path', 'labels']].head())


# Test split: only image names/paths are shown (no 'labels' column is displayed here)
add_image_paths(test_df, '/kaggle/input/mesogyny-train-dev-test/test/test images')
print("TEST DATA IMAGES\n", test_df[['image_name', 'image_path']].head())


In [None]:
!apt-get install tesseract-ocr-chi-sim
!pip install transformers torch torchvision efficientnet_pytorch


In [None]:
import re

# Function to clean text (remove URLs, special characters, etc.)
def clean_text(text):
    """Normalise a transcription for downstream text processing.

    URLs are dropped, every character that is not an ASCII letter, a CJK
    ideograph in the range U+4E00-U+9FA5 or whitespace is removed, runs of
    whitespace are collapsed to a single space and the result is trimmed.

    Args:
        text: the raw transcription. Non-string values (e.g. the NaN pandas
            uses for a missing cell, or None) are treated as empty text.

    Returns:
        The cleaned string; '' when there is nothing usable.
    """
    if not isinstance(text, str):
        # Missing transcriptions arrive as float NaN / None; re.sub would raise on them.
        return ''
    # Remove URLs ('http\S+' already covers 'https...' links)
    text = re.sub(r'http\S+|www\S+', '', text)
    # Remove special characters and numbers, keeping letters, Chinese characters and whitespace
    text = re.sub(r'[^a-zA-Z\u4e00-\u9fa5\s]', '', text)
    # Collapse the gaps left behind by the removals above
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

# Derive a 'cleaned_text' column from 'transcriptions' for the train and dev splits
for frame in (train_df, dev_df):
    frame['cleaned_text'] = frame['transcriptions'].apply(clean_text)


In [None]:
from transformers import AutoTokenizer

# Load the pretrained tokenizer that matches the Chinese BERT checkpoint
tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")


def tokenize_text(text):
    """Tokenize ``text`` into PyTorch tensors padded/truncated to 64 tokens."""
    encoding_options = dict(
        padding='max_length',
        truncation=True,
        max_length=64,
        return_tensors='pt',
    )
    return tokenizer(text, **encoding_options)


# Store the tokenizer output of every cleaned transcription (train and dev splits)
train_df['tokenized'] = train_df['cleaned_text'].apply(tokenize_text)
dev_df['tokenized'] = dev_df['cleaned_text'].apply(tokenize_text)


In [None]:
from torchvision import transforms
from PIL import Image

# Channel statistics of ImageNet; torchvision's pretrained VGG16 weights were
# trained on inputs normalised with these values.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Define image augmentation pipeline
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to 224x224 (standard input size for VGG)
    transforms.RandomHorizontalFlip(p=0.5),  # Randomly flip the image horizontally
    transforms.RandomRotation(10),  # Randomly rotate images within the range of [-10, 10] degrees
    transforms.ToTensor(),  # Convert image to a float tensor scaled to [0, 1]
    # Normalise per channel so the pretrained image branch sees the input
    # distribution it was trained on.
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

# Apply image preprocessing inside the dataset (later when loading)


In [None]:
# Map the string labels to numeric values (1 for Misogyny, 0 for Not-Misogyny)
label_map = {'Misogyny': 1, 'Not-Misogyny': 0}
for frame in (train_df, dev_df):
    # Only encode while the column still holds the raw strings. Mapping an
    # already-numeric column would turn every label into NaN, which is what
    # happened when this cell was executed a second time.
    if not pd.api.types.is_numeric_dtype(frame['labels']):
        frame['labels'] = frame['labels'].map(label_map)

# Check for any NaN values in the 'labels' column after mapping
# (a non-zero count means the CSV contains a label string missing from label_map)
print("NaN Labels in Train Data:", train_df['labels'].isna().sum())
print("NaN Labels in Dev Data:", dev_df['labels'].isna().sum())


In [None]:
# Show a few rows of each split to confirm the label encoding
preview_columns = ['image_name', 'cleaned_text', 'labels']
for heading, frame in (
    ("Train Data after Fixing Labels:", train_df),
    ("Dev Data after Fixing Labels:", dev_df),
):
    print(heading)
    print(frame[preview_columns].head())


In [None]:
import torch
from torch.utils.data import Dataset
from transformers import AutoTokenizer
from PIL import Image
import os

class MemeDataset(Dataset):
    """Dataset yielding one meme (image + tokenized transcription) per index.

    Args:
        dataframe: table with 'image_name' and 'transcriptions' columns, plus a
            numeric 'labels' column unless ``is_test`` is True.
        image_dir: directory that holds the files named in 'image_name'.
        tokenizer: callable invoked as ``tokenizer(text, truncation=True,
            padding='max_length', max_length=64, return_tensors="pt")``.
        transform: optional callable applied to the loaded RGB PIL image.
        device: device the token and label tensors are moved to ('cpu' or 'cuda').
        is_test: when True, samples are returned without a 'label' entry, so
            unlabeled data can be served.
    """

    def __init__(self, dataframe, image_dir, tokenizer, transform=None, device='cpu', is_test=False):
        # Positional access (iloc) is used below, so drop whatever index came in.
        self.data = dataframe.reset_index(drop=True)
        self.image_dir = image_dir
        self.tokenizer = tokenizer
        self.transform = transform
        self.device = device  # 'cpu' or 'cuda'
        self.is_test = is_test

    def __len__(self):
        """Return the number of rows (samples)."""
        return len(self.data)

    def _load_image(self, image_name):
        """Load ``image_name`` from ``image_dir`` as RGB and apply the transform, if any.

        Raises:
            FileNotFoundError: if the image file does not exist.
        """
        img_path = os.path.join(self.image_dir, image_name)
        if not os.path.exists(img_path):
            raise FileNotFoundError(f"Image {img_path} not found.")

        # The context manager closes the underlying file as soon as the pixels
        # have been converted, instead of leaving the handle open.
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image

    def _encode_text(self, text):
        """Tokenize one transcription; returns ``(input_ids, attention_mask)``."""
        # A missing transcription is NaN/None; str() would turn it into the
        # literal text "nan" and feed that to the tokenizer.
        transcription = '' if pd.isna(text) else str(text)
        text_inputs = self.tokenizer(
            transcription,
            truncation=True,
            padding='max_length',
            max_length=64,
            return_tensors="pt"
        )
        # The tokenizer returns a batch of one; squeeze(0) drops that leading axis.
        input_ids = text_inputs['input_ids'].squeeze(0).to(self.device)
        attention_mask = text_inputs['attention_mask'].squeeze(0).to(self.device)
        return input_ids, attention_mask

    def __getitem__(self, idx):
        """Return a dict with 'image', 'input_ids', 'attention_mask' and, unless
        ``is_test`` is set, 'label' (float tensor: 1 for Misogyny, 0 for Not-Misogyny)."""
        row = self.data.iloc[idx]

        sample = {'image': self._load_image(row['image_name'])}
        sample['input_ids'], sample['attention_mask'] = self._encode_text(row['transcriptions'])

        if not self.is_test:
            sample['label'] = torch.tensor(row['labels'], dtype=torch.float).to(self.device)
        return sample


In [None]:
import torch.nn as nn
from torchvision import models
from transformers import AutoModel
import torch

class MultimodalClassifier(nn.Module):
    """Binary classifier that fuses a VGG16 image branch with a Chinese BERT text branch.

    Image and text features are concatenated and passed through a small
    fully connected head that emits a single raw logit per sample.

    Args:
        dropout: dropout probability used inside the fusion head.
    """

    def __init__(self, dropout=0.3):
        super().__init__()

        # Image branch: ImageNet-pretrained VGG16 whose classifier is cut down
        # to its first three modules, i.e. the final classification layers are removed.
        vgg = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)
        vgg.classifier = vgg.classifier[:3]
        self.vgg_model = vgg

        # Text branch: pretrained Chinese BERT encoder
        self.text_model = AutoModel.from_pretrained("bert-base-chinese")

        # Fusion head: (4096 image + 768 text) -> 512 -> 128 -> 1
        widths = [4096 + 768, 512, 128]
        head = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            head.extend([
                nn.Linear(fan_in, fan_out),
                nn.BatchNorm1d(fan_out),
                nn.ReLU(),
                nn.Dropout(dropout),
            ])
        head.append(nn.Linear(128, 1))  # single logit for binary classification
        self.fc = nn.Sequential(*head)

    def forward(self, image, input_ids, attention_mask):
        """Return the raw logit for each sample (no sigmoid applied)."""
        visual_feat = self.vgg_model(image)

        # pooler_output is BERT's pooled sentence representation
        encoded = self.text_model(input_ids=input_ids, attention_mask=attention_mask)
        textual_feat = encoded.pooler_output

        # Join both modalities along the feature axis and classify.
        # The logits are left raw because BCEWithLogitsLoss expects them that way.
        fused = torch.cat((visual_feat, textual_feat), dim=1)
        return self.fc(fused)


In [None]:
from torch.utils.data import DataLoader

# Prepare the train dataset and DataLoader
train_dataset = MemeDataset(
    dataframe=train_df,
    image_dir="/kaggle/input/mesogyny-train-dev-test/train/train images",  # Use the correct path to the train images
    tokenizer=tokenizer,
    transform=image_transform
)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Evaluation data must be preprocessed deterministically: random flips and
# rotations would make the validation metrics change from one pass to the next.
# Reuse the training pipeline with its random augmentation steps removed so
# that every other step stays identical to what the model is trained on.
eval_transform = transforms.Compose([
    step for step in image_transform.transforms
    if not isinstance(step, (transforms.RandomHorizontalFlip, transforms.RandomRotation))
])

# Prepare the dev dataset and DataLoader
dev_dataset = MemeDataset(
    dataframe=dev_df,
    image_dir="/kaggle/input/mesogyny-train-dev-test/dev/dev images",  # Use the correct path to the dev images
    tokenizer=tokenizer,
    transform=eval_transform
)

dev_loader = DataLoader(dev_dataset, batch_size=16, shuffle=False)


In [None]:
device = 'cpu'  # or "cuda" if available
model = MultimodalClassifier().to(device)

# Just a quick forward pass to verify everything is working.
# Only the output shape is inspected, so autograd is switched off: no graph is
# built and no activations are kept for a backward pass that never happens.
with torch.no_grad():
    for batch in train_loader:
        images = batch['image'].to(device)
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)

        output = model(images, input_ids, attention_mask)
        print("Output shape:", output.shape)  # Should be [batch_size, 1]
        break  # Only test the first batch


In [None]:
import torch.nn as nn

# Define the loss function (BCEWithLogitsLoss for binary classification).
# It takes raw logits and applies the sigmoid itself, which matches the
# single un-activated output the model returns.
criterion = nn.BCEWithLogitsLoss()  

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from transformers import get_cosine_schedule_with_warmup

# Seed PyTorch's global RNG before anything random happens in this cell, so the
# randomly initialised fusion head and the shuffled batch order are repeatable
# from run to run.
SEED = 42
torch.manual_seed(SEED)

# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model
model = MultimodalClassifier().to(device)

# Use BCEWithLogitsLoss (the model outputs raw logits)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.AdamW(model.parameters(), lr=1e-5)  # Smaller LR

# Scheduler: cosine decay with a warm-up over the first 10% of all optimisation steps
epochs = 8
total_steps = len(train_loader) * epochs  # one scheduler step per training batch
scheduler = get_cosine_schedule_with_warmup(
    optimizer,
    num_warmup_steps=int(0.1 * total_steps),
    num_training_steps=total_steps
)

# Training function
def train_epoch(model, train_loader, optimizer, scheduler, criterion, device):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: module called as ``model(images, input_ids, attention_mask)``
            and returning one logit per sample.
        train_loader: iterable of batches with the keys 'image', 'input_ids',
            'attention_mask' and 'label'.
        optimizer: optimizer updating the model parameters.
        scheduler: learning-rate scheduler, stepped once per batch.
        criterion: loss taking ``(logits, labels)``.
        device: device the batch tensors are moved to.

    Returns:
        Tuple ``(mean loss per batch, accuracy, macro F1, precision, recall)``
        computed over every sample seen in the epoch, with predictions taken
        at a 0.5 probability threshold.
    """
    model.train()
    total_loss = 0
    all_preds = []
    all_labels = []

    for batch in train_loader:
        images = batch['image'].to(device)
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        # unsqueeze adds the trailing dimension so the labels line up with the
        # model's one-column logits
        labels = batch['label'].to(device).unsqueeze(1)

        optimizer.zero_grad()
        logits = model(images, input_ids, attention_mask)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()

        total_loss += loss.item()
        preds = torch.sigmoid(logits).detach().cpu().numpy() > 0.5
        all_preds.extend(preds.astype(int).flatten())
        all_labels.extend(labels.cpu().numpy().flatten())

    # zero_division=0 keeps the value scikit-learn already returns for an
    # undefined metric (0.0) but without emitting a warning each time a class
    # is never predicted, which is common during the warm-up epochs.
    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)
    prec = precision_score(all_labels, all_preds, zero_division=0)
    rec = recall_score(all_labels, all_preds, zero_division=0)

    return total_loss / len(train_loader), acc, f1, prec, rec

# Evaluation function
def evaluate(model, dev_loader, criterion, device):
    """Evaluate ``model`` on ``dev_loader`` without updating any weights.

    Args:
        model: module called as ``model(images, input_ids, attention_mask)``
            and returning one logit per sample.
        dev_loader: iterable of batches with the keys 'image', 'input_ids',
            'attention_mask' and 'label'.
        criterion: loss taking ``(logits, labels)``.
        device: device the batch tensors are moved to.

    Returns:
        Tuple ``(mean loss per batch, accuracy, macro F1, precision, recall)``
        with predictions taken at a 0.5 probability threshold.
    """
    model.eval()
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in dev_loader:
            images = batch['image'].to(device)
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            # unsqueeze adds the trailing dimension so the labels line up with
            # the model's one-column logits
            labels = batch['label'].to(device).unsqueeze(1)

            logits = model(images, input_ids, attention_mask)
            loss = criterion(logits, labels)
            total_loss += loss.item()

            preds = torch.sigmoid(logits).detach().cpu().numpy() > 0.5
            all_preds.extend(preds.astype(int).flatten())
            all_labels.extend(labels.cpu().numpy().flatten())

    # zero_division=0 keeps the value scikit-learn already returns for an
    # undefined metric (0.0) but without emitting a warning each time a class
    # is never predicted.
    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)
    prec = precision_score(all_labels, all_preds, zero_division=0)
    rec = recall_score(all_labels, all_preds, zero_division=0)

    return total_loss / len(dev_loader), acc, f1, prec, rec



# Training loop: one optimisation pass followed by one validation pass per epoch
for epoch in range(epochs):
    train_metrics = train_epoch(model, train_loader, optimizer, scheduler, criterion, device)
    train_loss, train_acc, train_f1, train_prec, train_rec = train_metrics
    print(f"\nEpoch {epoch+1}/{epochs}")
    print(f"Train → Loss: {train_loss:.4f} | Acc: {train_acc:.4f} | Macro F1: {train_f1:.4f} | Prec: {train_prec:.4f} | Rec: {train_rec:.4f}")

    val_metrics = evaluate(model, dev_loader, criterion, device)
    val_loss, val_acc, val_f1, val_prec, val_rec = val_metrics
    print(f"Val   → Loss: {val_loss:.4f} | Acc: {val_acc:.4f} | Macro F1: {val_f1:.4f} | Prec: {val_prec:.4f} | Rec: {val_rec:.4f}")


In [None]:
# NOTE: this redefines MemeDataset from an earlier cell; the definition below
# is the one in effect for every cell that runs after it.
class MemeDataset(Dataset):
    """Dataset yielding one meme (image + tokenized transcription) per index.

    Args:
        dataframe: table with 'image_name' and 'transcriptions' columns, plus a
            numeric 'labels' column unless ``is_test`` is True.
        image_dir: directory that holds the files named in 'image_name'.
        tokenizer: callable invoked as ``tokenizer(text, truncation=True,
            padding='max_length', max_length=64, return_tensors="pt")``.
        transform: optional callable applied to the loaded RGB PIL image.
        device: device the token and label tensors are moved to ('cpu' or 'cuda').
        is_test: when True, samples are returned without a 'label' entry, so
            unlabeled data can be served.
    """

    def __init__(self, dataframe, image_dir, tokenizer, transform=None, device='cpu', is_test=False):
        # Positional access (iloc) is used below, so drop whatever index came in.
        self.data = dataframe.reset_index(drop=True)
        self.image_dir = image_dir
        self.tokenizer = tokenizer
        self.transform = transform
        self.device = device  # 'cpu' or 'cuda'
        self.is_test = is_test  # Flag to identify test dataset

    def __len__(self):
        """Return the number of rows (samples)."""
        return len(self.data)

    def _load_image(self, image_name):
        """Load ``image_name`` from ``image_dir`` as RGB and apply the transform, if any.

        Raises:
            FileNotFoundError: if the image file does not exist.
        """
        img_path = os.path.join(self.image_dir, image_name)
        if not os.path.exists(img_path):
            raise FileNotFoundError(f"Image {img_path} not found.")

        # The context manager closes the underlying file as soon as the pixels
        # have been converted, instead of leaving the handle open.
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image

    def _encode_text(self, text):
        """Tokenize one transcription; returns ``(input_ids, attention_mask)``."""
        # A missing transcription is NaN/None; str() would turn it into the
        # literal text "nan" and feed that to the tokenizer.
        transcription = '' if pd.isna(text) else str(text)
        text_inputs = self.tokenizer(
            transcription,
            truncation=True,
            padding='max_length',
            max_length=64,
            return_tensors="pt"
        )
        # The tokenizer returns a batch of one; squeeze(0) drops that leading axis.
        input_ids = text_inputs['input_ids'].squeeze(0).to(self.device)
        attention_mask = text_inputs['attention_mask'].squeeze(0).to(self.device)
        return input_ids, attention_mask

    def __getitem__(self, idx):
        """Return a dict with 'image', 'input_ids', 'attention_mask' and, unless
        ``is_test`` is set, 'label' (float tensor: 1 for Misogyny, 0 for Not-Misogyny)."""
        row = self.data.iloc[idx]

        sample = {'image': self._load_image(row['image_name'])}
        sample['input_ids'], sample['attention_mask'] = self._encode_text(row['transcriptions'])

        # If it's not a test dataset, add the label; else return only text and image
        if not self.is_test:
            sample['label'] = torch.tensor(row['labels'], dtype=torch.float).to(self.device)
        return sample
            


In [None]:
import pandas as pd
import torch
from torch.utils.data import DataLoader

# Test images must be preprocessed deterministically: random flips and rotations
# would make the submitted predictions change from run to run. Reuse the
# training pipeline with its random augmentation steps removed so that every
# other step stays identical to what the model was trained on.
eval_transform = transforms.Compose([
    step for step in image_transform.transforms
    if not isinstance(step, (transforms.RandomHorizontalFlip, transforms.RandomRotation))
])

# Prepare the test dataset and DataLoader (with is_test=True)
test_dataset = MemeDataset(
    dataframe=test_df,  # 'test_df' contains the test data
    image_dir="/kaggle/input/mesogyny-train-dev-test/test/test images",  # Correct path to test images
    tokenizer=tokenizer,
    transform=eval_transform,
    is_test=True  # Flag to handle test dataset without labels
)

# shuffle=False keeps predictions in the row order of test_df
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Prepare the submission list of [id, prediction] rows
submission = []

# Set the model to evaluation mode
model.eval()

# Inference on test data
with torch.no_grad():  # Disable gradient computation for inference
    for batch in test_loader:
        images = batch['image'].to(device)
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)

        # Get model predictions
        logits = model(images, input_ids, attention_mask)

        # Apply sigmoid to get probabilities, then threshold at 0.5 for binary
        # classification. Flatten to a plain list of 0/1 ints so that no
        # 1-element array has to be converted with int().
        preds = (torch.sigmoid(logits) > 0.5).long().view(-1).cpu().tolist()

        # Store predictions with a running numerical ID (starting from 1)
        for pred in preds:
            submission.append([len(submission) + 1, pred])  # Storing as (id, prediction)

# Convert the submission list to a DataFrame
submission_df = pd.DataFrame(submission, columns=["id", "predictions"])

# Save the submission DataFrame to CSV (without header and index)
submission_df.to_csv('submissions_misogyny1.csv', index=False, header=False)

# Print the first few rows of the submission to verify
print(submission_df.head())


In [None]:
# Show the first 50 submission rows for a visual check
print(submission_df.iloc[:50])