In [None]:
import pandas as pd
import os

# Paths to the CSV files and image directories
csv_paths = {
    'Train': '/kaggle/input/disaster/train.csv',
    'Test': '/kaggle/input/disaster/test.csv',
    'Validation': '/kaggle/input/disaster/validation.csv'
}

image_dirs = {
    'Train': '/kaggle/input/disaster/train',
    'Test': '/kaggle/input/disaster/test',
    'Validation': '/kaggle/input/disaster/validation'
}

output_dir = '/kaggle/working/'  # Output directory to save the CSV files

# Function to check for matching Meme_ID and image files, and add image paths
def check_matches(csv_path, image_dir):
    df = pd.read_csv(csv_path)
    image_files = os.listdir(image_dir)
    image_names = {os.path.splitext(image_file)[0]: os.path.join(image_dir, image_file) for image_file in image_files}
    
    # Add Image_Path column to the dataframe
    df['Image_Path'] = df['image_id'].apply(lambda x: image_names.get(x, None))
    
    # Filter rows where Image_Path is not None (i.e., matched Meme_IDs)
    matched_df = df[df['Image_Path'].notna()]
    
    return matched_df

# Check matches for each set (Train, Test, Validation)
for key in csv_paths:
    matched_df = check_matches(csv_paths[key], image_dirs[key])
    
    matches_output_path = os.path.join(output_dir, f'{key}_matches.csv')
    
    matched_df.to_csv(matches_output_path, index=False)
    
    print(f"{key} set:")
    print(f"Matched Meme_IDs with image paths saved to {matches_output_path}\n")


In [None]:
train_df = pd.read_csv('/kaggle/working/Train_matches.csv')
train_df.head()

In [None]:
test_df = pd.read_csv('/kaggle/working/Test_matches.csv')
test_df.head()

In [None]:
validation_df = pd.read_csv('/kaggle/working/Validation_matches.csv')
validation_df.head()

In [None]:
# Display the sizes of each split
print(f"Train dataset size: {len(train_df)}")
print(f"Test dataset size: {len(test_df)}")
print(f"Validation dataset size: {len(validation_df)}")

In [None]:
import pandas as pd
import re
import string

# Function to remove punctuation (preserve Bangla characters)
def remove_punctuation(text):
    if pd.isna(text):
        return text
    return text.translate(str.maketrans('', '', string.punctuation))

# Function to remove extra whitespace
def remove_whitespace(text):
    if pd.isna(text):
        return text
    return " ".join(text.split())

# Function to remove emojis
def remove_emojis(text):
    if pd.isna(text):
        return text
    emoji_pattern = re.compile(
        "["
        u"\U0001F600-\U0001F64F"  # emoticons
        u"\U0001F300-\U0001F5FF"  # symbols & pictographs
        u"\U0001F680-\U0001F6FF"  # transport & map symbols
        u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
        u"\U00002702-\U000027B0"
        u"\U000024C2-\U0001F251"
        "]+", flags=re.UNICODE)
    return emoji_pattern.sub(r'', text)

# Function to remove URLs
def remove_urls(text):
    if pd.isna(text):
        return text
    url_pattern = re.compile(r'https?://\S+|www\.\S+')
    return url_pattern.sub(r'', text)

# Function to remove HTML tags
def remove_html(text):
    if pd.isna(text):
        return text
    html_pattern = re.compile(r'<.*?>')
    return html_pattern.sub(r'', text)

# Function to remove special characters (preserve Bangla characters)
def remove_special_characters(text):
    if pd.isna(text):
        return text
    return re.sub(r'[^A-Za-z0-9\s\u0980-\u09FF]', '', text)

# Combine all cleaning functions
def clean_text(text):
    text = remove_urls(text)
    text = remove_html(text)
    text = remove_emojis(text)
    text = remove_punctuation(text)
    text = remove_special_characters(text)
    text = remove_whitespace(text)
    return text

# Mapping categories to integers
category_mapping = {
    'Landslides': 0,
    'Wildfire': 1,
    'Drought': 2,
    'Flood': 3,
    'Earthquake': 4,
    'Tropical Storm': 5,
    'Non Disaster': 6,
    'Human Damage': 7,
}

# Load and clean the dataframes
csv_paths = {
    'Train': '/kaggle/working/Train_matches.csv',
    'Test': '/kaggle/working/Test_matches.csv',
    'Validation': '/kaggle/working/Validation_matches.csv'
}

cleaned_output_paths = {
    'Train': '/kaggle/working/Train_matches_cleaned.csv',
    'Test': '/kaggle/working/Test_matches_cleaned.csv',
    'Validation': '/kaggle/working/Validation_matches_cleaned.csv'
}

text_columns = ['context', 'category']

for key in csv_paths:
    # Load the dataframe
    df = pd.read_csv(csv_paths[key])
    
    # Apply cleaning to all relevant text columns
    for column in text_columns:
        df[column] = df[column].astype(str).apply(clean_text)
    
    # Map 'category' column to integers
    df['category'] = df['category'].map(category_mapping)
    
    # Add 'label' column (same as 'category' for now)
    df['label'] = df['category']
    
    # Display the cleaned dataframe
    print(f"Cleaned {key} dataframe:")
    print(df.head())
    
    # Save the cleaned dataframe to a new CSV file
    df.to_csv(cleaned_output_paths[key], index=False)
    print(f"Cleaned dataframe saved to {cleaned_output_paths[key]}\n")


In [None]:
train_df = pd.read_csv('/kaggle/working/Train_matches_cleaned.csv')
train_df.head()

In [None]:
test_df = pd.read_csv('/kaggle/working/Test_matches_cleaned.csv')
test_df.head()

In [None]:
validation_df = pd.read_csv('/kaggle/working/Validation_matches_cleaned.csv')
validation_df.head()

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from transformers import BertTokenizer, BertModel, AdamW
import torchvision.models as models
from tqdm import tqdm

In [None]:
!pip install --upgrade transformers

In [None]:
from transformers import AutoImageProcessor, SwiftFormerModel
import torch
image_processor = AutoImageProcessor.from_pretrained("MBZUAI/swiftformer-xs")

model = SwiftFormerModel.from_pretrained("MBZUAI/swiftformer-xs")

In [None]:
from transformers import AutoTokenizer, XLMRobertaModel, AdamW
# Initialize BERT tokenizer and model
bert_tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-base")
bert_model = XLMRobertaModel.from_pretrained("xlm-roberta-base")

In [None]:
# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

In [None]:
import os

# Enable device-side assertions
os.environ['TORCH_USE_CUDA_DSA'] = '1'

In [None]:
model.to(device)

In [None]:
bert_model.to(device)

In [None]:
from torchvision import transforms
from PIL import Image
from torch.utils.data import Dataset


max_seq_length = 512  # Set your desired maximum sequence length for BERT

# Define the pre-processing transformations for images
transform = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

class MyMultimodalDataset(Dataset):
    def __init__(self, data, transform=None, tokenizer=None, max_seq_length=512):
        self.data = data
        self.transform = transform
        self.tokenizer = tokenizer
        self.max_seq_length = max_seq_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image_path = self.data.iloc[idx]['Image_Path']
        try:
            image = Image.open(image_path).convert('RGB')
            if self.transform is not None:
                image = self.transform(image)
        except Exception as e:
            print(f"Error loading image at index {idx}: {e}")
            return None, None, None, None

        if image is None:
            return None, None, None, None

        context = self.data.iloc[idx]['context']

        inputs = self.tokenizer(context, padding='max_length', truncation=True, max_length=self.max_seq_length, return_tensors='pt')
        input_ids = inputs['input_ids']
        attention_mask = inputs['attention_mask']

        label = self.data.iloc[idx]['label']

        return image, input_ids, attention_mask, label



In [None]:
# Create custom datasets with MyMultimodalDataset
train_dataset = MyMultimodalDataset(train_df, transform=transform, tokenizer=bert_tokenizer, max_seq_length=max_seq_length)
test_dataset = MyMultimodalDataset(test_df, transform=transform, tokenizer=bert_tokenizer, max_seq_length=max_seq_length)
val_dataset = MyMultimodalDataset(validation_df, transform=transform, tokenizer=bert_tokenizer, max_seq_length=max_seq_length)

# Define data loaders
batch_size = 1  # Set your desired batch size
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [None]:
import torch
import torchvision.models as models
from transformers import AutoImageProcessor, ViTModel, AutoTokenizer, DistilBertModel, AdamW
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from tqdm import tqdm
import time
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report

# Assuming you have defined your train_loader, val_loader, optimizer, criterion, model, bert_model, etc.

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the regressor model and optimizer
regressor = torch.nn.Sequential(
    torch.nn.Linear(11548, 512),  # Adjusted input dimension
    torch.nn.ReLU(),
    torch.nn.Dropout(0.5),
    torch.nn.Linear(512, 8)
).to(device)

optimizer = torch.optim.AdamW(regressor.parameters(), lr=0.001)
criterion = torch.nn.CrossEntropyLoss()

num_epochs = 40
train_losses = []
val_losses = []

# Training time
start_time = time.time()

for epoch in range(num_epochs):
    running_train_loss = 0.0
    
    regressor.train()

    for images, texts, attention_masks, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)

        input_ids = texts.squeeze(1).to(device)
        attention_mask = attention_masks.squeeze(1).to(device)

        optimizer.zero_grad()

        # Get image features
        with torch.no_grad():
            outputs_image = model(pixel_values=images)
        img_hidden_states = outputs_image.last_hidden_state
        # Flatten the image features
        img_feats = img_hidden_states.view(img_hidden_states.size(0), -1)
        print(f'Image features shape: {img_feats.shape}')  # Debug print

        # Get text features
        outputs_text = bert_model(input_ids=input_ids, attention_mask=attention_mask)
        text_hidden_states = outputs_text.last_hidden_state
        text_feats = text_hidden_states[:, 0, :]
        print(f'Text features shape: {text_feats.shape}')  # Debug print

        # Concatenate image and text features
        combined_feats = torch.cat((img_feats, text_feats), dim=1)
        print(f'Combined features shape: {combined_feats.shape}')  # Debug print

        # Forward pass
        predictions = regressor(combined_feats)
        loss = criterion(predictions, labels)

        # Backward pass
        loss.backward()
        optimizer.step()

        running_train_loss += loss.item()

    epoch_train_loss = running_train_loss / len(train_loader)
    train_losses.append(epoch_train_loss)

    print(f"Epoch [{epoch + 1}/{num_epochs}] - Train Loss: {epoch_train_loss:.4f}")

end_time = time.time()
execution_time = end_time - start_time
print(f"Total execution time: {execution_time:.2f} seconds")


In [None]:
import torch
from tqdm import tqdm
import time

# Set models to evaluation mode
model.eval()
bert_model.eval()

# Prepare lists to store predicted and true labels
predicted_labels = []
true_labels = []

# Set your device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Test loop
start_time = time.time()

with torch.no_grad():
    for test_images, test_texts, test_attention_masks, test_labels in tqdm(test_loader, desc='Testing', leave=False):
        test_images = test_images.to(device)
        test_labels = (test_labels).to(device)  # Convert labels from 1-5 to 0-4
        test_texts = test_texts.to(device)
        test_attention_masks = test_attention_masks.to(device)

        optimizer.zero_grad()

        # Extract features from image-based model
        test_img_feats = model(test_images)

        # Extract features from text-based model (BERT)
        test_texts = test_texts.squeeze(1)
        test_attention_masks = test_attention_masks.squeeze(1)
        test_outputs = bert_model(input_ids=test_texts, attention_mask=test_attention_masks)
        
        # Extract relevant features for concatenation
        # Handle tensor manipulation for compatibility
        # Reshape test_img_feats if compatible
        test_img_feats = test_img_feats[0]  # Access a specific part of the BaseModelOutputWithNoAttention
        test_img_feats = test_img_feats.reshape(test_img_feats.shape[0], -1)  # Reshape if compatible

        # Extract representations from text-based model
        test_text_feats = test_outputs.last_hidden_state[:, 0, :]  # Select appropriate representations

        # Combine features early
        combined_feats = torch.cat((test_img_feats, test_text_feats), dim=1)

        # Classify combined features
        combined_classifier = torch.nn.Sequential(
            torch.nn.Linear(combined_feats.shape[1], 512).to(device),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.5),
            torch.nn.Linear(512, 8).to(device),  # Change the output size to 5 for 5 labels
        )

        combined_logits = combined_classifier(combined_feats)
        test_predictions = torch.nn.functional.softmax(combined_logits, dim=1)
        predicted_classes = torch.argmax(test_predictions, dim=1)  # Revert back to labels from 1-5, Add 1 here 

        predicted_labels.extend(predicted_classes.cpu().numpy())
        true_labels.extend(test_labels.cpu().numpy().tolist())

end_time = time.time()
execution_time = end_time - start_time

# Print or use the predicted labels and true labels as needed
print("Predicted Labels:", predicted_labels)
print("True Labels:", true_labels)
print(f"Total execution time for testing: {execution_time:.2f} seconds")

In [None]:
predicted_labels 

In [None]:
true_labels 

In [None]:
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, mean_squared_error, classification_report


# Calculate accuracy
accuracy = accuracy_score(true_labels, predicted_labels)

# Calculate precision, recall, F1-score overall (macro average)
precision, recall, f1_score, _ = precision_recall_fscore_support(true_labels, predicted_labels, average='macro')

# Calculate confusion matrix
conf_matrix = confusion_matrix(true_labels, predicted_labels)

# Calculate Mean Squared Error
mse = mean_squared_error(true_labels, predicted_labels)

# Calculate Sensitivity (Recall) for each class
sensitivity_per_class = recall

# Calculate Specificity for each class
specificity_per_class = []
for i in range(len(conf_matrix)):
    tn = np.sum(conf_matrix) - (np.sum(conf_matrix[i, :]) + np.sum(conf_matrix[:, i]) - conf_matrix[i, i])
    fp = np.sum(conf_matrix[:, i]) - conf_matrix[i, i]
    specificity_per_class.append(tn / (tn + fp))

# Print overall calculated metrics
print(f"Accuracy: {accuracy}")
print(f"Precision (macro): {precision}")
print(f"Recall (macro): {recall}")
print(f"F1-Score (macro): {f1_score}")
print(f"Mean Squared Error: {mse}")

# Print Sensitivity and Specificity for each class
print(f"Sensitivity (Recall) for each class: {sensitivity_per_class}")
print(f"Specificity for each class: {specificity_per_class}")
