In [None]:
# Mount Google Drive at /content/drive inside the Colab VM, then make the
# user's MyDrive folder the working directory for the rest of the notebook.
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/

In [None]:
# Install the two third-party packages the notebook imports below (no versions
# are pinned here), then call ultralytics.checks() on the resulting install.
%pip install ultralytics transformers
import ultralytics
ultralytics.checks()

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from transformers import BertTokenizer, BertModel, GPT2Tokenizer, GPT2LMHeadModel
from ultralytics import YOLO
from collections import defaultdict
from PIL import Image
from tqdm import tqdm
import json
import csv
import os
import numpy as np
import seaborn as sns

In [None]:
# Select the compute device: use the GPU when PyTorch reports CUDA support,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f'Using device: {device}')

In [None]:
# Load the trained YOLOv8 model from its best.pt weights on Drive and move it
# to the selected device. validate_model later passes it to
# extract_yolo_features to obtain per-image detection features.
yolo_model = YOLO('/content/drive/MyDrive/00_PFE/Object_Detection/Training_Results/Yolov8-V4/Results/runs/train/experiment/weights/best.pt').to(device)

In [None]:
# Answer vocabulary: a label's position in this list is its class index.
# The first five entries are the textual answers; they are followed by the
# counts "0" through "50" written as strings (indices 5..55).
label_mapping = ["flooded", "non flooded", "flooded,non flooded", "Yes", "No"]
label_mapping += [str(count) for count in range(51)]

In [None]:
# Question categories and the integer id assigned to each; the id is the
# category's position in the tuple below.
question_type_mapping = {
    type_name: type_id
    for type_id, type_name in enumerate(
        ("Condition_Recognition", "Yes_No", "Simple_Counting", "Complex_Counting")
    )
}

In [None]:
# Function to extract features from YOLOv8
def extract_yolo_features(image_path, model, device):
    """Run the detector on one image and return its detections as a matrix.

    Args:
        image_path: Image location, passed straight through to ``model``.
        model: Callable detector. Calling it must yield an iterable of results,
            each having a ``boxes`` attribute that is either ``None`` or an
            object exposing ``xyxy``, ``conf`` and ``cls`` tensors.
        device: Device the extracted tensors are moved to.

    Returns:
        A tensor with one row per detection and six columns: the four ``xyxy``
        box coordinates, the confidence score and the class value. The shape is
        ``(0, 6)`` when no result carried any boxes.
    """
    per_result_rows = []
    for detection in model(image_path):
        found = detection.boxes
        if found is None:
            continue
        coords = found.xyxy.to(device)                 # (n, 4) box corners
        scores = found.conf.to(device).unsqueeze(1)    # (n,) -> (n, 1)
        labels = found.cls.to(device).unsqueeze(1)     # (n,) -> (n, 1)
        per_result_rows.append(torch.cat([coords, scores, labels], dim=1))

    # Nothing detected (or no results at all): hand back an empty 6-column matrix.
    if not per_result_rows:
        return torch.empty((0, 6), device=device)

    return torch.cat(per_result_rows)

In [None]:
# VQADataset class
class VQADataset(Dataset):
    """VQA dataset grouped by image: one item is one image plus all its questions.

    The annotations file is a JSON object whose values are records with the
    keys ``Image_ID``, ``Question``, ``Ground_Truth`` and ``Question_Type``.
    Records sharing an ``Image_ID`` are served together as a single item.

    The class relies on the module-level names ``device``, ``label_mapping``
    and ``question_type_mapping``; every tensor it returns is created on
    ``device``.

    Args:
        annotations_file: Path to the JSON annotations file.
        img_dir: Directory that ``Image_ID`` values are joined onto.
        tokenizer: Object with an ``encode_plus`` method returning a mapping
            with ``input_ids`` and ``attention_mask`` tensors.
        transform: Kept for interface compatibility and stored on the
            instance. Items only carry the image *path*, so no image is
            decoded or transformed here.
    """

    def __init__(self, annotations_file, img_dir, tokenizer, transform=None):
        with open(annotations_file, 'r', encoding='utf-8') as f:
            self.annotations = json.load(f)
        self.img_dir = img_dir
        self.transform = transform
        self.tokenizer = tokenizer
        self.img_to_annotations = self._group_by_image()
        # Freeze the image order once so __getitem__ is an O(1) list lookup
        # instead of rebuilding list(dict.keys()) on every access.
        self.image_ids = list(self.img_to_annotations)

    def _group_by_image(self):
        """Return a mapping from ``Image_ID`` to its annotation records, in file order."""
        img_to_annotations = defaultdict(list)
        for annotation in self.annotations.values():
            img_to_annotations[annotation['Image_ID']].append(annotation)
        return img_to_annotations

    def __len__(self):
        """Number of distinct images (not the number of questions)."""
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Return the image path and the encoded questions/answers for image ``idx``.

        Returns:
            dict with keys
            ``image_path`` (str),
            ``questions`` (list of ``(input_ids, attention_mask)`` tensor pairs),
            ``attention_masks`` (list of the attention-mask tensors),
            ``answers`` (1-D tensor of indices into ``label_mapping``) and
            ``question_types`` (1-D tensor of ``question_type_mapping`` ids).

        Raises:
            ValueError: If a ground-truth answer is not present in ``label_mapping``.
            KeyError: If a question type is not present in ``question_type_mapping``.
        """
        image_id = self.image_ids[idx]
        annotations = self.img_to_annotations[image_id]
        # Only the path is returned: the image itself is read downstream by the
        # detector, so decoding it here would be discarded work.
        img_path = os.path.join(self.img_dir, image_id)

        questions = []
        answers = []
        question_types = []
        for annotation in annotations:
            inputs = self.tokenizer.encode_plus(
                annotation['Question'],
                add_special_tokens=True,
                return_tensors='pt',
                padding='max_length',
                truncation=True,
                max_length=64
            )
            # encode_plus returns a leading batch dimension of 1; drop it.
            question = inputs['input_ids'].squeeze(0).to(device)
            attention_mask = inputs['attention_mask'].squeeze(0).to(device)

            # Ground truth may be stored as a number (counts) or a string.
            answer_text = str(annotation['Ground_Truth'])
            try:
                answer_idx = label_mapping.index(answer_text)
            except ValueError:
                raise ValueError(
                    f"Ground truth {answer_text!r} for image {image_id!r} "
                    f"is not in label_mapping"
                ) from None
            question_type_idx = question_type_mapping[annotation['Question_Type']]

            questions.append((question, attention_mask))
            answers.append(torch.tensor(answer_idx, device=device))
            question_types.append(torch.tensor(question_type_idx, device=device))

        return {
            'image_path': img_path,
            'questions': questions,
            'attention_masks': [am for _, am in questions],
            'answers': torch.stack(answers),
            'question_types': torch.stack(question_types)
        }

In [None]:
def custom_collate_fn(batch):
    """Flatten per-image dataset items into a single question-level batch.

    Args:
        batch: List of items, each a dict with ``image_path``, ``questions``
            (list of ``(input_ids, attention_mask)`` pairs), ``answers`` and
            ``question_types`` (1-D tensors, one entry per question).

    Returns:
        dict with the image paths (one per item), the questions and attention
        masks flattened across all items, the concatenated answers and
        question types, and ``num_questions_per_image`` recording how many
        questions each item contributed.
    """
    image_paths = []
    flat_questions = []
    flat_masks = []
    answer_chunks = []
    type_chunks = []
    questions_per_image = []

    for sample in batch:
        pairs = sample['questions']
        image_paths.append(sample['image_path'])
        questions_per_image.append(len(pairs))
        for token_ids, mask in pairs:
            flat_questions.append(token_ids)
            flat_masks.append(mask)
        answer_chunks.append(sample['answers'])
        type_chunks.append(sample['question_types'])

    collated = {
        'image_paths': image_paths,
        'questions': flat_questions,
        'attention_masks': flat_masks,
        'answers': torch.cat(answer_chunks),
        'question_types': torch.cat(type_chunks),
        'num_questions_per_image': questions_per_image
    }
    return collated

In [None]:
class VQAModel(nn.Module):
    """Fuses pooled detector features with BERT question features and feeds GPT-2.

    Pipeline of ``forward``: image features go through ``fc_yolo``; each
    question is encoded by ``bert_model`` (its ``pooler_output`` is used); the
    image feature is repeated once per question of that image, concatenated
    with the text feature, projected by ``fc_proj`` to the GPT-2 embedding
    width and passed to ``gpt2_model`` as a length-1 ``inputs_embeds``
    sequence. The logits of that single position are returned.

    Args:
        bert_model: Text encoder. Its ``config.hidden_size`` sets the text
            feature width (768 is assumed when the attribute is absent).
        gpt2_model: Language model exposing ``config.n_embd`` and accepting
            ``inputs_embeds``; its output must have a ``logits`` attribute.
        yolo_input_dim: Width of one pooled image feature vector.
        hidden_dim: Width of the projected image feature.
        vocab_size: Stored on the instance; ``forward`` does not use it, so
            the width of the returned logits is whatever ``gpt2_model`` emits.
    """

    def __init__(self, bert_model, gpt2_model, yolo_input_dim, hidden_dim, vocab_size):
        super().__init__()
        self.bert_model = bert_model
        self.gpt2_model = gpt2_model
        # Read the encoder width from its config instead of hard-coding 768,
        # falling back to 768 for encoders that expose no config.
        text_dim = getattr(getattr(bert_model, 'config', None), 'hidden_size', 768)
        self.fc_yolo = nn.Linear(yolo_input_dim, hidden_dim)
        self.fc_proj = nn.Linear(hidden_dim + text_dim, gpt2_model.config.n_embd)  # Project to GPT-2 input dimension
        self.hidden_dim = hidden_dim
        self.vocab_size = vocab_size

    def forward(self, image_features, questions, attention_masks, num_questions_per_image):
        """Return one row of logits per question.

        Args:
            image_features: Tensor (or sequence of tensors) with one pooled
                feature vector per image.
            questions: Sequence of 1-D token-id tensors, flattened across images.
            attention_masks: Sequence of 1-D attention masks aligned with ``questions``.
            num_questions_per_image: Number of questions belonging to each
                image, in image order.

        Returns:
            Tensor of shape ``(num_questions, gpt2_logit_width)``.
        """
        if not torch.is_tensor(image_features):
            image_features = torch.stack(list(image_features))
        # nn.Linear acts on the last dimension, so one call covers every image.
        image_features = self.fc_yolo(image_features)
        target_device = image_features.device

        # Encode all questions in a single BERT call rather than one call per
        # question. pad_sequence right-pads with 0 in both ids and mask, so
        # inputs of unequal length are handled and padding stays masked out.
        pairs = list(zip(questions, attention_masks))
        input_ids = nn.utils.rnn.pad_sequence(
            [question.to(target_device) for question, _ in pairs], batch_first=True
        )
        masks = nn.utils.rnn.pad_sequence(
            [attention_mask.to(target_device) for _, attention_mask in pairs], batch_first=True
        )
        text_features = self.bert_model(input_ids, attention_mask=masks).pooler_output

        # Repeat each image's feature once per question asked about that image
        # so rows line up with text_features.
        expanded_image_features = torch.cat(
            [
                image_feature.repeat(num_questions, 1)
                for image_feature, num_questions in zip(image_features, num_questions_per_image)
            ],
            dim=0,
        )

        combined_features = torch.cat((expanded_image_features, text_features), dim=1)
        projected_features = self.fc_proj(combined_features)

        # Each fused vector is fed to GPT-2 as a sequence of length 1.
        gpt2_output = self.gpt2_model(inputs_embeds=projected_features.unsqueeze(1), return_dict=True).logits

        logits = gpt2_output[:, -1, :]
        return logits

In [None]:
# Initialize tokenizer, BERT model, GPT-2 model, and VQA model
# The pretrained 'bert-base-uncased' and 'gpt2' weights are fetched by name and
# moved to the selected device before being wrapped by VQAModel.
bert_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased').to(device)
gpt2_tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
gpt2_model = GPT2LMHeadModel.from_pretrained('gpt2').to(device)
# One class per entry of label_mapping.
num_classes = len(label_mapping)
hidden_dim = 256
# yolo_input_dim=6 matches the six columns produced by extract_yolo_features
# (four box coordinates, confidence, class).
vqa_model = VQAModel(bert_model=bert_model, gpt2_model=gpt2_model, yolo_input_dim=6, hidden_dim=hidden_dim, vocab_size=num_classes).to(device)

In [None]:
# Load the saved model state. map_location remaps the stored tensors onto the
# device selected earlier, so a checkpoint written on a GPU still loads when
# this notebook has fallen back to the CPU.
vqa_model.load_state_dict(torch.load('/content/drive/MyDrive/00_PFE/VQA/Code-V3/VQAModel_Best.pth', map_location=device))

In [None]:
# Initialize dataset and dataloader
# NOTE(review): the variables are named test_*, but both paths point at the
# "Training Question.json" file and the "Train_Image" folder — confirm this is
# the intended evaluation split.
test_annotations_file = '/content/drive/MyDrive/00_PFE/DataSet/Visual_Question_Answering /FloodNet Challenge @ EARTHVISION 2021 - Track 2/Questions/Training Question.json'
test_img_dir = '/content/drive/MyDrive/00_PFE/DataSet/Visual_Question_Answering /FloodNet Challenge @ EARTHVISION 2021 - Track 2/Images/Train_Image'
# Image preprocessing handed to the dataset: resize to 224x224, convert to tensor.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])
test_dataset = VQADataset(test_annotations_file, test_img_dir, bert_tokenizer, transform)
# One image per batch, in file order; custom_collate_fn flattens that image's
# questions into a question-level batch.
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=custom_collate_fn)

# Loss passed to validate_model, which applies it one sample at a time.
criterion = torch.nn.CrossEntropyLoss()

In [None]:
# Validation function
def validate_model(model, dataloader, criterion):
    """Evaluate ``model`` and print overall and per-question-type accuracy/loss.

    For every batch, each image is run through the module-level ``yolo_model``
    via ``extract_yolo_features``; the detections are averaged into a single
    6-value vector per image (all zeros when nothing was detected). The
    prediction for a question is the arg-max over the model's output row, and
    it is compared with the ground-truth index in ``batch['answers']``.

    Relies on the module-level names ``yolo_model``, ``device``,
    ``extract_yolo_features`` and ``question_type_mapping``.

    Args:
        model: Model called as ``model(image_features, questions,
            attention_masks, num_questions_per_image)``; switched to eval mode.
        dataloader: Iterable of batches shaped like the output of
            ``custom_collate_fn``.
        criterion: Loss callable taking ``(logits, targets)``; it is applied
            to one sample at a time so losses can be attributed per question type.

    Returns:
        None. Results are printed. If the dataloader yields no samples a
        notice is printed instead of metrics.
    """
    model.eval()

    # Reverse lookup id -> name, built once; the first name wins if two names
    # share an id.
    type_names = {}
    for type_name, type_id in question_type_mapping.items():
        type_names.setdefault(type_id, type_name)

    question_type_correct = defaultdict(int)
    question_type_total = defaultdict(int)
    question_type_loss = defaultdict(float)
    total_correct = 0
    total_loss = 0.0
    total_samples = 0

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Validation"):
            image_paths = batch['image_paths']
            questions = batch['questions']
            attention_masks = batch['attention_masks']
            answers = batch['answers']
            question_types = batch['question_types']
            num_questions_per_image = batch['num_questions_per_image']

            # Extract features using YOLOv8
            image_features_list = []
            for image_path in image_paths:
                features = extract_yolo_features(image_path, yolo_model, device)
                if features.nelement() == 0:
                    features = torch.zeros((1, 6), device=device)  # Initialize with zeros if no features found
                # Average over detections -> one fixed-size vector per image.
                image_features_list.append(features.mean(dim=0))
            image_features = torch.stack(image_features_list)

            outputs = model(image_features, questions, attention_masks, num_questions_per_image)
            _, predicted = torch.max(outputs, 1)

            # Pull the per-sample results to Python in one transfer each
            # instead of calling .item() repeatedly inside the loop.
            hits = (predicted == answers).tolist()
            type_ids = question_types.tolist()

            for i, (question_type, hit) in enumerate(zip(type_ids, hits)):
                loss = criterion(outputs[i].unsqueeze(0), answers[i].unsqueeze(0)).item()

                question_type_correct[question_type] += hit
                question_type_total[question_type] += 1
                question_type_loss[question_type] += loss

                total_correct += hit
                total_loss += loss
                total_samples += 1

    # Nothing to average over: avoid dividing by zero.
    if total_samples == 0:
        print("No samples were evaluated; skipping metric computation.")
        return

    # Calculate overall accuracy and loss
    overall_accuracy = total_correct / total_samples
    overall_loss = total_loss / total_samples
    print(f"Overall Accuracy: {overall_accuracy * 100:.2f}%")
    print(f"Overall Loss: {overall_loss:.4f}")

    # Calculate accuracy and loss for each question type
    for question_type, correct in question_type_correct.items():
        total = question_type_total[question_type]
        accuracy = correct / total
        avg_loss = question_type_loss[question_type] / total
        # Ids missing from question_type_mapping are still reported rather
        # than aborting the summary.
        question_type_name = type_names.get(question_type, f"Unknown({question_type})")
        print(f"Accuracy for {question_type_name}: {accuracy * 100:.2f}%")
        print(f"Average loss for {question_type_name}: {avg_loss:.4f}")

In [None]:
# Run validation: evaluates vqa_model on test_dataloader with the cross-entropy
# criterion and prints overall and per-question-type accuracy and loss.
validate_model(vqa_model, test_dataloader, criterion)