In [1]:
!pip install transformers



In [2]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: sylvain
"""

# Classe définissant le jeu de donnée VQA au format pytorch

import os.path
import json
import random

import numpy as np
from skimage import io

from torch.utils.data import Dataset
import torchvision.transforms as T

RANDOM_SEED = 42


class VQALoader(Dataset):
    def __init__(self, imgFolder, images_file, questions_file, answers_file, encoder_questions, encoder_answers, train=True, ratio_images_to_use = 1, transform=None, patch_size=512):
        self.transform = transform
        self.encoder_questions = encoder_questions
        self.encoder_answers = encoder_answers
        self.train = train
        
        
        vocab = self.encoder_questions.words
        self.relationalWords = [vocab['top'], vocab['bottom'], vocab['right'], vocab['left']]
        
        with open(questions_file) as json_data:
            self.questionsJSON = json.load(json_data)
            
        with open(answers_file) as json_data:
            self.answersJSON = json.load(json_data)
            
        with open(images_file) as json_data:
            self.imagesJSON = json.load(json_data)
        
        images = [img['id'] for img in self.imagesJSON['images'] if img['active']]
        images = images[:int(len(images)*ratio_images_to_use)]
        self.images = np.empty((len(images), patch_size, patch_size, 3))
        
        self.len = 0
        for image in images:
            self.len += len(self.imagesJSON['images'][image]['questions_ids'])
        self.images_questions_answers = [[None] * 4] * self.len
        
        index = 0
        for i, image in enumerate(images):
            img = io.imread(os.path.join(imgFolder, str(image)+'.tif'))
            self.images[i, :, :, :] = img
            for questionid in self.imagesJSON['images'][image]['questions_ids']:
                question = self.questionsJSON['questions'][questionid]
            
                question_str = question["question"]
                type_str = question["type"]
                answer_str = self.answersJSON['answers'][question["answers_ids"][0]]['answer']
            
                self.images_questions_answers[index] = [self.encoder_questions.encode(question_str), self.encoder_answers.encode(answer_str), i, type_str]
                index += 1
    def __len__(self):
        return self.len
    
    def __getitem__(self, idx):
        question = self.images_questions_answers[idx]
        img = self.images[question[2],:,:,:]
        if self.train and not self.relationalWords[0] in question[0] and not self.relationalWords[1] in question[0] and not self.relationalWords[2] in question[0] and not self.relationalWords[3] in question[0]:
            if random.random() < .5:
                img = np.flip(img, axis = 0)
            if random.random() < .5:
                img = np.flip(img, axis = 1)
            if random.random() < .5:
                img = np.rot90(img, k=1)
            if random.random() < .5:
                img = np.rot90(img, k=3)
        if self.transform:
            imgT = self.transform(img.copy())
        if self.train:
            return np.array(question[0], dtype='int16'), np.array(question[1], dtype='int16'), imgT, question[3]
        else:
            return np.array(question[0], dtype='int16'), np.array(question[1], dtype='int16'), imgT, question[3], T.ToTensor()(img / 255)   

In [3]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F

#from models.skipthoughts import skipthoughts
# From https://github.com/Cadene/vqa.pytorch


def process_lengths(input):
    max_length = input.size(1)
    lengths = list(max_length - input.data.eq(0).sum(1).squeeze())
    return lengths

def select_last(x, lengths):
    batch_size = x.size(0)
    seq_length = x.size(1)
    mask = x.data.new().resize_as_(x.data).fill_(0)
    for i in range(batch_size):
        mask[i][lengths[i]-1].fill_(1)
    mask = Variable(mask)
    x = x.mul(mask)
    x = x.sum(1).view(batch_size, x.size(2))
    return x

class LSTM(nn.Module):

    def __init__(self, vocab, emb_size, hidden_size, num_layers):
        super(LSTM, self).__init__()
        self.vocab = vocab
        self.emb_size = emb_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(num_embeddings=len(self.vocab)+1,
                                      embedding_dim=emb_size,
                                      padding_idx=0)
        self.rnn = nn.LSTM(input_size=emb_size, hidden_size=hidden_size, num_layers=num_layers)

    def forward(self, input):
        lengths = process_lengths(input)
        x = self.embedding(input) # seq2seq
        output, hn = self.rnn(x)
        output = select_last(output, lengths)
        return output


class TwoLSTM(nn.Module):

    def __init__(self, vocab, emb_size, hidden_size):
        super(TwoLSTM, self).__init__()
        self.vocab = vocab
        self.emb_size = emb_size
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(num_embeddings=len(self.vocab)+1,
                                      embedding_dim=emb_size,
                                      padding_idx=0)
        self.rnn_0 = nn.LSTM(input_size=emb_size, hidden_size=hidden_size, num_layers=1)
        self.rnn_1 = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size, num_layers=1)

    def forward(self, input):
        lengths = process_lengths(input)
        x = self.embedding(input) # seq2seq
        x = getattr(F, 'tanh')(x)
        x_0, hn = self.rnn_0(x)
        vec_0 = select_last(x_0, lengths)

        # x_1 = F.dropout(x_0, p=0.3, training=self.training)
        # print(x_1.size())
        x_1, hn = self.rnn_1(x_0)
        vec_1 = select_last(x_1, lengths)
        
        vec_0 = F.dropout(vec_0, p=0.3, training=self.training)
        vec_1 = F.dropout(vec_1, p=0.3, training=self.training)
        output = torch.cat((vec_0, vec_1), 1)
        return output
        

def factory(vocab_words, opt):
    if opt['arch'] == 'skipthoughts':
        st_class = getattr(skipthoughts, opt['type'])
        seq2vec = st_class(opt['dir_st'],
                           vocab_words,
                           dropout=opt['dropout'],
                           fixed_emb=opt['fixed_emb'])
    elif opt['arch'] == '2-lstm':
        seq2vec = TwoLSTM(vocab_words,
                          opt['emb_size'],
                          opt['hidden_size'])
    elif opt['arch'] == 'lstm':
        seq2vec = TwoLSTM(vocab_words,
                          opt['emb_size'],
                          opt['hidden_size'],
                          opt['num_layers'])
    elif opt['arch'] == 'bert':
        seq2vec = BertEncoder(opt['bert_model'], opt['hidden_size'], opt['dropout'])
        
    elif opt['arch'] == 'word2vec':  # New condition for Word2Vec
        # Load your pre-trained model (loaded in the previous code snippet)
        word2vec_model = word2vec_model  
        # Create a simple embedding lookup mechanism
        def seq2vec(sentences):
            embeddings = []
            for sentence in sentences:
                word_embeddings = [word2vec_model[word] for word in sentence if word in word2vec_model]
                
                # Handle sentence representation (you'll need a strategy here)
                if word_embeddings:
                    sentence_embedding = np.mean(word_embeddings, axis=0) 
                else:
                    sentence_embedding = np.zeros(300)  # Default for OOV cases

                embeddings.append(sentence_embedding)
            return embeddings

    else:
        raise NotImplementedError
    return seq2vec


if __name__ == '__main__':

    vocab = ['robots', 'are', 'very', 'cool', '<eos>', 'BiDiBu']
    lstm = TwoLSTM(vocab, 300, 1024).cuda()

    myinput = Variable(torch.LongTensor([
        [1,2,3,4,5,0,0],
        [6,1,2,3,3,4,5],
        [6,1,2,3,3,4,5]
    ])).cuda()
    output = lstm(myinput)
    print(output.shape)

torch.Size([3, 2048])


In [4]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: sylvain
"""

# Encodeur du vocabulaireß

import json

MAX_ANSWERS = 100
LEN_QUESTION = 20


class VocabEncoder():
    # Création du dictionnaire en parcourant l'ensemble du JSON (des questions ou des réponses)
    def __init__(self, JSONFile, string=None, questions=True, range_numbers=False):
        self.encoder_type = 'answer'
        if questions:
            self.encoder_type = 'question'
        self.questions = questions
        self.range_numbers = range_numbers
        
        words = {}  
        
        if JSONFile != None:
            with open(JSONFile) as json_data:
                self.data = json.load(json_data)[self.encoder_type + 's']
        else:
            if questions:
                self.data = [{'question':string}]
            else:
                self.data = [{'answer':string}]
            
        
        for i in range(len(self.data)):
            if self.data[i]["active"]:
                sentence = self.data[i][self.encoder_type]
                if sentence[-1] == "?" or sentence[-1] == ".":
                    sentence = sentence[:-1]
                
                tokens = sentence.split()
                for token in tokens:
                    token = token.lower()
                    if range_numbers and token.isdigit() and not questions:
                        num = int(token)
                        if num > 0 and num <= 10:
                            token = "between 0 and 10"
                        if num > 10 and num <= 100:
                            token = "between 10 and 100"
                        if num > 100 and num <= 1000:
                            token = "between 100 and 1000"
                        if num > 1000:
                            token = "more than 1000"

                    if token[-2:] == 'm2' and not questions:
                        num = int(token[:-2])
                        if num > 0 and num <= 10:
                            token = "between 0m2 and 10m2"
                        if num > 10 and num <= 100:
                            token = "between 10m2 and 100m2"
                        if num > 100 and num <= 1000:
                            token = "between 100m2 and 1000m2"
                        if num > 1000:
                            token = "more than 1000m2"
                    if token not in words:
                        words[token] = 1
                    else:
                        words[token] += 1
                
        sorted_words = sorted(words.items(), key=lambda kv: kv[1], reverse=True)
        self.words = {'<EOS>':0}
        self.list_words = ['<EOS>']
        for i, word in enumerate(sorted_words):
            if self.encoder_type == 'answer':
                if i >= MAX_ANSWERS:
                    break
            self.words[word[0]] = i + 1
            self.list_words.append(word[0])
    
    #Encodage d'une phrase (question ou réponse) à partir du dictionnaire crée plus tôt.        
    def encode(self, sentence):
        res = []
        if sentence[-1] == "?" or sentence[-1] == ".":
            sentence = sentence[:-1]
            
        tokens = sentence.split()
        for token in tokens:
            token = token.lower()
            if self.range_numbers and token.isdigit() and not self.questions:
                num = int(token)
                if num > 0 and num <= 10:
                    token = "between 0 and 10"
                if num > 10 and num <= 100:
                    token = "between 10 and 100"
                if num > 100 and num <= 1000:
                    token = "between 100 and 1000"
                if num > 1000:
                    token = "more than 1000"
                    
            if token[-2:] == 'm2' and not self.questions:
                num = int(token[:-2])
                if num > 0 and num <= 10:
                    token = "between 0m2 and 10m2"
                if num > 10 and num <= 100:
                    token = "between 10m2 and 100m2"
                if num > 100 and num <= 1000:
                    token = "between 100m2 and 1000m2"
                if num > 1000:
                    token = "more than 1000m2"
            res.append(self.words[token])
        
        if self.questions:
            res.append(self.words['<EOS>'])
        
        if self.questions:
            while len(res) < LEN_QUESTION:
                res.append(self.words['<EOS>'])
            res = res[:LEN_QUESTION]
        return res
    
    
    def getVocab(self):
        return self.list_words
    
    #Décodage d'une phrase (seulement utilisé pour l'affichage des résultats)
    def decode(self, sentence):
        res = ""
        for i in sentence:
            if i == 0:
                break
            res += self.list_words[i]
            res += " "
        res = res[:-1]
        if self.questions:
            res += "?"
        return res
        
            
            
            
        

In [5]:
import torch
import torch.nn as nn
import torchvision.models as torchmodels
from transformers import SwinForImageClassification, BertTokenizer, BertModel

import torch
import torch.nn as nn
from transformers import SwinForImageClassification, BertTokenizer

class BertEncoder(nn.Module):
    def __init__(self, bert_model='bert-base-uncased', hidden_size=768, dropout=0.1):
        super(BertEncoder, self).__init__()
        self.bert = BertModel.from_pretrained(bert_model)
        self.dropout = nn.Dropout(dropout)
        self.hidden_size = hidden_size

    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids, attention_mask=attention_mask)
        pooled_output = outputs[1]
        pooled_output = self.dropout(pooled_output)
        return pooled_output

class BilinearAttention(nn.Module):
    def __init__(self, v_dim, q_dim, num_hid):
        super(BilinearAttention, self).__init__()
        self.v_proj = nn.Linear(v_dim, num_hid)
        self.q_proj = nn.Linear(q_dim, num_hid)
        self.dropout = nn.Dropout(0.5)
        self.weights = nn.Parameter(torch.rand(num_hid, num_hid))

    def forward(self, v, q):
        v_proj = self.dropout(self.v_proj(v))
        q_proj = self.dropout(self.q_proj(q))
        v_proj = v_proj.unsqueeze(2)
        q_proj = q_proj.unsqueeze(1)
        attn = torch.matmul(torch.matmul(q_proj, self.weights), v_proj)
        attn = attn.squeeze(1).squeeze(1)
        return attn

class VQAModel(nn.Module):
    def __init__(self, vocab_questions, vocab_answers, swin_model='microsoft/swin-tiny-patch4-window7-224', bert_model='bert-base-uncased', input_size=768):
        super(VQAModel, self).__init__()
        
        self.vocab_questions = vocab_questions
        self.vocab_answers = vocab_answers
        self.num_classes = len(self.vocab_answers)
        
        self.bert_tokenizer = BertTokenizer.from_pretrained(bert_model)
        self.bert_encoder = BertEncoder(bert_model)
        
        # Visual model (Swin Transformer)
        self.visual = SwinForImageClassification.from_pretrained(swin_model)
        
        # Linear layer for visual features
        self.linear_v = nn.Linear(self.visual.config.num_labels, input_size)
        
        # Bilinear Attention
        self.attention = BilinearAttention(v_dim=input_size, q_dim=self.bert_encoder.hidden_size, num_hid=512)
        
        # Classification layers
        self.linear_classif1 = nn.Linear(input_size, 512)
        self.linear_classif2 = nn.Linear(512, self.num_classes)
        
    def forward(self, input_v, input_q):
        # Visual feature extraction
        x_v = self.visual(input_v).logits
        x_v = x_v.view(x_v.size(0), -1)  # Flatten the visual features
        x_v = self.linear_v(x_v)

        # Question feature extraction
        input_ids = self.bert_tokenizer(input_q, return_tensors='pt', padding=True, truncation=True).to(input_v.device)
        x_q = self.bert_encoder(input_ids=input_ids['input_ids'], attention_mask=input_ids['attention_mask'])

        # Bilinear Attention
        attn_scores = self.attention(x_v, x_q)
        attended_v = x_v * attn_scores.unsqueeze(-1)

        # Classification layers
        x = self.linear_classif1(attended_v)
        x = nn.ReLU()(x)
        x = self.linear_classif2(x)

        return x

In [6]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: sylvain
"""

# Script principal d'apprentissage

import matplotlib
matplotlib.use('Agg')

#from models import model
#import VQALoader
#import VocabEncoder
import torchvision.transforms as T
import torch
from torch.autograd import Variable
from torch.cuda.amp import autocast, GradScaler
import numpy as np
import matplotlib.pyplot as plt

import pickle
import os
import datetime
from shutil import copyfile


def train(model, train_dataset, validate_dataset, batch_size, num_epochs, learning_rate, modeltype, Dataset,pre_trained_model, accumulation_steps=4):
    if pre_trained_model is not None:
        model.load_state_dict(torch.load(pre_trained_model))
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size // accumulation_steps, shuffle=True, num_workers=2)
    validate_loader = torch.utils.data.DataLoader(validate_dataset, batch_size=batch_size // accumulation_steps, shuffle=False, num_workers=2)
    
    optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=learning_rate)
    criterion = torch.nn.CrossEntropyLoss()
    scaler = GradScaler()
    trainLoss = []
    valLoss = []
    accPerQuestionType = {'area': [], 'presence': [], 'count': [], 'comp': []} if Dataset == 'HR' else {'rural_urban': [], 'presence': [], 'count': [], 'comp': []}
    OA = []
    AA = []
    
    for epoch in range(num_epochs):
        model.train()
        runningLoss = 0
        optimizer.zero_grad()
        
        for i, data in enumerate(train_loader, 0):
            if i % 1000 == 999:
                print(i/len(train_loader))
            question, answer, image, _ = data
            question_str = [encoder_questions.decode(q.cpu().numpy()) for q in question]
            answer = Variable(answer.long()).resize_(len(question_str))
            image = Variable(image.float())
            
            with autocast():
                model.cuda()
                pred = model(image.cuda(), question_str)
                loss = criterion(pred, answer.cuda())
                loss = loss / accumulation_steps
            
            scaler.scale(loss).backward()
            
            if (i + 1) % accumulation_steps == 0:
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()
            
            runningLoss += loss.cpu().item() * len(question_str)
            
            model.cpu()
            del image, answer, question, question_str, pred, loss
            torch.cuda.empty_cache()
        
        trainLoss.append(runningLoss / len(train_dataset))
        print('epoch %d train_loss:--------------------- %.3f' % (epoch, trainLoss[epoch]))
        
        with torch.no_grad():
            model.eval()
            runningLoss = 0
            countQuestionType = {'presence': 0, 'count': 0, 'comp': 0, 'area': 0} if Dataset == 'HR' else {'presence': 0, 'count': 0, 'comp': 0, 'rural_urban': 0}
            rightAnswerByQuestionType = {'presence': 0, 'count': 0, 'comp': 0, 'area': 0} if Dataset == 'HR' else {'presence': 0, 'count': 0, 'comp': 0, 'rural_urban': 0}
            count_q = 0
            
            for i, data in enumerate(validate_loader, 0):
                if i % 1000 == 999:
                    print(i/len(validate_loader))
                question, answer, image, type_str, image_original = data
                question_str = [encoder_questions.decode(q.cpu().numpy()) for q in question]
                answer = Variable(answer.long()).resize_(len(question_str))
                image = Variable(image.float())
                
                with autocast():
                    model.cuda()
                    pred = model(image.cuda(), question_str)
                    loss = criterion(pred, answer.cuda())
                
                runningLoss += loss.cpu().item() * len(question_str)
                
                answer = answer.cpu().numpy()
                pred = np.argmax(pred.cpu().detach().numpy(), axis=1)
                for j in range(len(question_str)):
                    countQuestionType[type_str[j]] += 1
                    if answer[j] == pred[j]:
                        rightAnswerByQuestionType[type_str[j]] += 1
                
                if i % 50 == 2 and i < 999:
                    fig1, f1_axes = plt.subplots(ncols=1, nrows=2)
                    viz_img = T.ToPILImage()(image_original[0].float().data.cpu())
                    viz_question = question_str[0]
                    viz_answer = encoder_answers.decode([answer[0]])
                    viz_pred = encoder_answers.decode([pred[0]])
                    
                    f1_axes[0].imshow(viz_img)
                    f1_axes[0].axis('off')
                    f1_axes[0].set_title(viz_question)
                    f1_axes[1].axis('off')
                    f1_axes[1].set_title(viz_answer)
                    text = f1_axes[1].text(0.5, -0.1, viz_pred, size=12, horizontalalignment='center',
                                              verticalalignment='center', transform=f1_axes[1].transAxes)
                    plt.savefig('/tmp/VQA.png')
                    plt.close(fig1)
                
                model.cpu()
                del image, answer, question, question_str, pred, loss, image_original, type_str
                torch.cuda.empty_cache()
        
            valLoss.append(runningLoss / len(validate_dataset))
            print('epoch %d val loss------------------ %.3f' % (epoch, valLoss[epoch]))
        
            numQuestions = 0
            numRightQuestions = 0
            currentAA = 0
            for type_str in countQuestionType.keys():
                if countQuestionType[type_str] > 0:
                    accPerQuestionType[type_str].append(rightAnswerByQuestionType[type_str] * 1.0 / countQuestionType[type_str])
                numQuestions += countQuestionType[type_str]
                numRightQuestions += rightAnswerByQuestionType[type_str]
                currentAA += accPerQuestionType[type_str][epoch]
                
            OA.append(numRightQuestions * 1.0 / numQuestions)
            AA.append(currentAA * 1.0 / 4)
        
        torch.save(model.state_dict(), "BertAttention.pth")

    return model

if __name__ == '__main__':
    disable_log = True
    batch_size = 70
    num_epochs = 6
    learning_rate = 0.00001
    ratio_images_to_use = 1
    modeltype = 'Simple'
    Dataset = 'LR'
    pre_trained_model_path = "/kaggle/input/swimmodel/BertAttentionSwim.pth"

    if Dataset == 'LR':
        data_path = '/kaggle/input/vqars1/6344334/'#'/raid/home/sylvain/RSVQA_USGS_data/'#'../AutomaticDB/'
        allquestionsJSON = os.path.join(data_path, 'all_questions.json')
        allanswersJSON = os.path.join(data_path, 'all_answers.json')
        questionsJSON = os.path.join(data_path, 'LR_split_train_questions.json')
        answersJSON = os.path.join(data_path, 'LR_split_train_answers.json')
        imagesJSON = os.path.join(data_path, 'LR_split_train_images.json')
        questionsvalJSON = os.path.join(data_path, 'LR_split_val_questions.json')
        answersvalJSON = os.path.join(data_path, 'LR_split_val_answers.json')
        imagesvalJSON = os.path.join(data_path, 'LR_split_val_images.json')
        images_path = os.path.join(data_path, 'Images_LR/')
    else:
        data_path = '/raid/home/sylvain/RSVQA_USGS_data/'
        images_path = os.path.join(data_path, 'dataUSGS/')
    encoder_questions = VocabEncoder(allquestionsJSON, questions=True)
    if Dataset == "LR":
        encoder_answers = VocabEncoder(allanswersJSON, questions=False, range_numbers = True)
    else:
        encoder_answers = VocabEncoder(allanswersJSON, questions=False, range_numbers = False)

    IMAGENET_MEAN = [0.485, 0.456, 0.406]
    IMAGENET_STD = [0.229, 0.224, 0.225]
    transform = T.Compose([
        T.ToTensor(),            
        T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
      ])
    
    if Dataset == 'LR':
        patch_size = 256
    else:
        patch_size = 512   
    train_dataset = VQALoader(images_path, imagesJSON, questionsJSON, answersJSON, encoder_questions, encoder_answers, train=True, ratio_images_to_use=ratio_images_to_use, transform=transform, patch_size = patch_size)
    validate_dataset = VQALoader(images_path, imagesvalJSON, questionsvalJSON, answersvalJSON, encoder_questions, encoder_answers, train=False, ratio_images_to_use=ratio_images_to_use, transform=transform, patch_size = patch_size)
    
    
    RSVQA = VQAModel(encoder_questions.getVocab(), encoder_answers.getVocab(), swin_model='microsoft/swin-tiny-patch4-window7-224', input_size=768)
    if os.path.exists(pre_trained_model_path):
        RSVQA.load_state_dict(torch.load(pre_trained_model_path))
    RSVQA = train(RSVQA, train_dataset, validate_dataset, batch_size, num_epochs, learning_rate, modeltype, Dataset,pre_trained_model_path,4)
    
    

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/71.8k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/113M [00:00<?, ?B/s]

0.2967032967032967
0.5937035937035937
0.8907038907038907
epoch 0 train_loss:--------------------- 0.156
epoch 0 val loss------------------ 0.632
0.2967032967032967
0.5937035937035937
0.8907038907038907
epoch 1 train_loss:--------------------- 0.151
epoch 1 val loss------------------ 0.586
0.2967032967032967
0.5937035937035937
0.8907038907038907
epoch 2 train_loss:--------------------- 0.148
epoch 2 val loss------------------ 0.573
0.2967032967032967
0.5937035937035937
0.8907038907038907
epoch 3 train_loss:--------------------- 0.145
epoch 3 val loss------------------ 0.565
0.2967032967032967
0.5937035937035937
0.8907038907038907
epoch 4 train_loss:--------------------- 0.140
epoch 4 val loss------------------ 0.557
0.2967032967032967
0.5937035937035937
0.8907038907038907
epoch 5 train_loss:--------------------- 0.137
epoch 5 val loss------------------ 0.544


In [7]:
'''#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: sylvain
"""

# Calcul des statistiques sur un jeu de test

#import VocabEncoder
#import VQALoader
#from models import model
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

import torch
import torchvision.transforms as T
from torch.autograd import Variable
from skimage import io
import numpy as np
import pickle
import os

def do_confusion_matrix(all_mat, old_vocab, new_vocab, dataset):
    print(new_vocab)
    new_mat = np.zeros((len(new_vocab), len(new_vocab)))
    for i in range(1,all_mat.shape[0]):
        answer = old_vocab[i]
        new_i = new_vocab.index(answer)
        for j in range(1,all_mat.shape[1]):
            answer = old_vocab[j]
            new_j = new_vocab.index(answer)
            new_mat[new_i, new_j] = all_mat[i, j]

    if len(old_vocab) > 20:#HR
        new_mat = new_mat[0:18,0:18]
        new_vocab = new_vocab[0:18]
    fig = plt.figure(figsize=(10,5))
    ax = fig.add_subplot(111)
    cax = ax.matshow(np.log(new_mat+1), cmap="YlGn")
    #plt.title('Confusion matrix of the classifier')
    fig.colorbar(cax)
    ax.set_xticklabels([''] + new_vocab)
    ax.set_yticklabels([''] + new_vocab)
    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()
    fig.savefig('confusion_matrix_' + dataset + '.svg')
    #plt.close()

        

def get_vocab(dataset):
    data_path = '/kaggle/input/vqars1/6344334/'
    if dataset == "LR":
        allanswersJSON = os.path.join(data_path, 'all_answers.json')
        encoder_answers = VocabEncoder(allanswersJSON, questions=False, range_numbers = True)
    else:
        allanswersJSON = os.path.join(data_path, 'USGSanswers.json')
        encoder_answers = VocabEncoder(allanswersJSON, questions=False, range_numbers = False)
        
    return encoder_answers.getVocab()

def run(experiment, dataset, shuffle=False, num_batches=-1, save_output=False):
    print ('---' + experiment + '---')
    batch_size = 100
    accumulation_steps=4
    data_path = '/kaggle/input/vqars1/6344334/'
    if dataset == "LR":
        allquestionsJSON = os.path.join(data_path, 'all_questions.json')
        allanswersJSON = os.path.join(data_path, 'all_answers.json')
        questionsJSON = os.path.join(data_path, 'LR_split_test_questions.json')
        answersJSON = os.path.join(data_path, 'LR_split_test_answers.json')
        imagesJSON = os.path.join(data_path, 'LR_split_test_images.json')
        images_path = os.path.join(data_path, 'Images_LR/')
        encoder_questions = VocabEncoder(allquestionsJSON, questions=True)
        encoder_answers = VocabEncoder(allanswersJSON, questions=False, range_numbers = True)
        patch_size = 256
    else:
        allquestionsJSON = os.path.join(data_path, 'USGSquestions.json')
        allanswersJSON = os.path.join(data_path, 'USGSanswers.json')
        if dataset == "HR":
            questionsJSON = os.path.join(data_path, 'USGS_split_test_questions.json')
            answersJSON = os.path.join(data_path, 'USGS_split_test_answers.json')
            imagesJSON = os.path.join(data_path, 'USGS_split_test_images.json')
        else:
            questionsJSON = os.path.join(data_path, 'USGS_split_test_phili_questions.json')
            answersJSON = os.path.join(data_path, 'USGS_split_test_phili_answers.json')
            imagesJSON = os.path.join(data_path, 'USGS_split_test_phili_images.json')
        images_path = os.path.join(data_path, 'dataUSGS/')
        encoder_questions = VocabEncoder(allquestionsJSON, questions=True)
        encoder_answers = VocabEncoder(allanswersJSON, questions=False, range_numbers = False)
        patch_size = 512

    weight_file =  experiment + '.pth'
    swin_model = 'microsoft/swin-tiny-patch4-window7-224'
    network = VQAModel(encoder_questions.getVocab(), encoder_answers.getVocab(), swin_model=swin_model, bert_model='bert-base-uncased', input_size=768).cuda()
    network.eval()

    scaler = torch.cuda.amp.GradScaler()
    network.cpu()  # Move the model to CPU initially

    IMAGENET_MEAN = [0.485, 0.456, 0.406]
    IMAGENET_STD = [0.229, 0.224, 0.225]
    transform = T.Compose([
        T.ToTensor(),            
        T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ])

    if dataset == 'LR':
        patch_size = 256
    else:
        patch_size = 512

    test_dataset = VQALoader(images_path, imagesJSON, questionsJSON, answersJSON, encoder_questions, encoder_answers, train=False, ratio_images_to_use=1, transform=transform, patch_size=patch_size)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=shuffle, num_workers=2)

    scaler = torch.cuda.amp.GradScaler()
    network.cpu()  # Move the model to CPU initially

    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=shuffle, num_workers=2)

    if dataset == 'LR':
        countQuestionType = {'rural_urban': 0, 'presence': 0, 'count': 0, 'comp': 0}
        rightAnswerByQuestionType = {'rural_urban': 0, 'presence': 0, 'count': 0, 'comp': 0}
    else:
        countQuestionType = {'area': 0, 'presence': 0, 'count': 0, 'comp': 0}
        rightAnswerByQuestionType = {'area': 0, 'presence': 0, 'count': 0, 'comp': 0}
    confusionMatrix = np.zeros((len(encoder_answers.getVocab()), len(encoder_answers.getVocab())))

    for i, data in enumerate(test_loader, 0):
        if num_batches == 0:
            break
        num_batches -= 1
        if i % 100 == 99:
            print(float(i) / len(test_loader))

        question, answer, image, type_str, image_original = data
        question_str = [encoder_questions.decode(q.cpu().numpy()) for q in question]
        answer = Variable(answer.long())
        image = Variable(image.float())

        if shuffle:
            order = np.array(range(image.shape[0]))
            np.random.shuffle(order)
            image[np.array(range(image.shape[0]))] = image[order]

        sub_batch_size = batch_size // accumulation_steps
        pred_acc = []
        answer_acc = []
        type_str_acc = []

        for sub_batch in range(accumulation_steps):
            start_idx = sub_batch * sub_batch_size
            end_idx = (sub_batch + 1) * sub_batch_size

            if start_idx >= len(image):
                break

            sub_batch_image = image[start_idx:end_idx]
            sub_batch_question_str = question_str[start_idx:end_idx]
            sub_batch_answer = answer[start_idx:end_idx]
            sub_batch_type_str = type_str[start_idx:end_idx]

            if len(sub_batch_image) == 0:
                continue  # Skip the sub-batch if it is empty

            with torch.cuda.amp.autocast():
                network.cuda()  # Move the model to GPU for forward pass
                sub_batch_pred = network(sub_batch_image.cuda(), sub_batch_question_str)
                pred_acc.append(sub_batch_pred.cpu())
                answer_acc.append(sub_batch_answer)
                type_str_acc.extend(sub_batch_type_str)
                network.cpu()  # Move the model back to CPU

            del sub_batch_image, sub_batch_question_str, sub_batch_answer, sub_batch_type_str, sub_batch_pred
            torch.cuda.empty_cache()

        pred = torch.cat(pred_acc, dim=0)
        answer = torch.cat(answer_acc, dim=0)

        answer = answer.cpu().numpy()
        pred = np.argmax(pred.cpu().detach().numpy(), axis=1)

        for j in range(len(question_str)):
            countQuestionType[type_str_acc[j]] += 1
            if answer[j] == pred[j]:
                rightAnswerByQuestionType[type_str_acc[j]] += 1
            confusionMatrix[answer[j], pred[j]] += 1

        if save_output:
            out_path = '/kaggle/working/' + '_' + 'output' + dataset
            if not os.path.exists(out_path):
                os.mkdir(out_path)
            for j in range(len(question_str)):
                viz_img = T.ToPILImage()(image_original[j].float().data.cpu())
                viz_question = question_str[j]
                viz_answer = encoder_answers.decode([int(answer[j])])
                viz_pred = encoder_answers.decode([int(pred[j])])

                imname = str(i * len(question_str) + j) + '_q_' + viz_question + '_gt_' + viz_answer + '_pred_' + viz_pred + '.png'
                plt.imsave(os.path.join(out_path, imname), viz_img)

        del image, answer, question, question_str, pred, type_str, image_original, type_str_acc, answer_acc, pred_acc
        torch.cuda.empty_cache()

    Accuracies = {'AA': 0}
    for type_str in countQuestionType.keys():
        if countQuestionType[type_str] != 0:
            Accuracies[type_str] = rightAnswerByQuestionType[type_str] * 1.0 / countQuestionType[type_str]
        else:
            Accuracies[type_str] = 0.0

    if len(countQuestionType) > 0:
        Accuracies['AA'] = sum(Accuracies[type_str] for type_str in countQuestionType.keys()) / len(countQuestionType)
    else:
        Accuracies['AA'] = 0.0

    Accuracies['OA'] = np.trace(confusionMatrix)/np.sum(confusionMatrix)

    print('- Accuracies')
    for type_str in countQuestionType.keys():
        print(' - ' + type_str + ': ' + str(Accuracies[type_str]))
    print('- AA: ' + str(Accuracies['AA']))
    print('- OA: ' + str(Accuracies['OA']))

    return Accuracies, confusionMatrix


#expes = {'LRs': ['427f37d306ef4d03bb1406d5cd20336f', 'bd1387960b624257b9a50924d8134be6', '899e11235c624ec9bbb66e26da52d6fc'],
#         'LR': ['427f37d306ef4d03bb1406d5cd20336f', 'bd1387960b624257b9a50924d8134be6', '899e11235c624ec9bbb66e26da52d6fc'],
#         'HR': ['65f94a4f7ccd491da362f73e46795d26', '988853ae5d5e441695f98ee506021bdf', '3bfd251cafb74d379d02bf59d383381a'],
#         'HRPhili': ['65f94a4f7ccd491da362f73e46795d26', '988853ae5d5e441695f98ee506021bdf', '3bfd251cafb74d379d02bf59d383381a'],
#         'HRs': ['65f94a4f7ccd491da362f73e46795d26', '988853ae5d5e441695f98ee506021bdf', '3bfd251cafb74d379d02bf59d383381a'],
#         'HRPhilis': ['65f94a4f7ccd491da362f73e46795d26', '988853ae5d5e441695f98ee506021bdf', '3bfd251cafb74d379d02bf59d383381a']}
expes = {'LR': ['/kaggle/input/swimmodel/BertAttentionSwim']}
#run('65f94a4f7ccd491da362f73e46795d26', 'HR', num_batches=5, save_output=True)
#run('65f94a4f7ccd491da362f73e46795d26', 'HRPhili', num_batches=5, save_output=True)
run('/kaggle/input/swimmodel/BertAttentionSwim', 'LR', num_batches=5, save_output=True)
for dataset in expes.keys():
    acc = []
    mat = []
    for experiment_name in expes[dataset]:
        if not os.path.isfile(experiment_name + 'accuracies_' + '.npy'):
            if dataset[-1] == 's':
                print("run", dataset[:-1])
                tmp_acc, tmp_mat = run(experiment_name, dataset[:-1], shuffle=True)
            else:
                dataset = 'LR'
                tmp_acc, tmp_mat = run(experiment_name, dataset)
            np.save('/kaggle/working/' + 'accuracies_', tmp_acc)
            np.save('/kaggle/working/' + 'confusion_matrix_', tmp_mat)
        else:
            tmp_acc = np.load('/kaggle/working/' + 'accuracies_' + '.npy', allow_pickle=True)[()]
            tmp_mat = np.load('/kaggle/working/' + 'confusion_matrix_'+ '.npy', allow_pickle=True)[()]

        acc.append(tmp_acc)
        mat.append(tmp_mat)
        
    print('--- Total (' + dataset + ') ---')
    print('- Accuracies')
    for type_str in tmp_acc.keys():
        all_acc = []
        for tmp_acc in acc:
            all_acc.append(tmp_acc[type_str])
        print(' - ' + type_str + ': ' + str(np.mean(all_acc)) + ' ( stddev = ' + str(np.std(all_acc)) + ')')
    
    if dataset[-1] == 's':
        vocab = get_vocab(dataset[:-1])
    else:
        vocab = get_vocab(dataset)

    all_mat = np.zeros(tmp_mat.shape)    
    for tmp_mat in mat:
        all_mat += tmp_mat
    
    if dataset[0] == 'H':
        new_vocab = ['yes', 'no', '0m2', 'between 0m2 and 10m2', 'between 10m2 and 100m2', 'between 100m2 and 1000m2', 'more than 1000m2'] + [str(i) for i in range(90)]
    else:
        new_vocab = ['yes', 'no', 'rural', 'urban', '0', 'between 0 and 10', 'between 10 and 100', 'between 100 and 1000', 'more than 1000']
        
    do_confusion_matrix(all_mat, vocab, new_vocab, dataset)


#labels = ['Yes', 'No', '<=10', '0', '<=100', '<=1000', '>1000', 'Rural', 'Urban']
#fig = plt.figure()
#ax = fig.add_subplot(111)
#cax = ax.matshow(np.log(confusionMatrix[1:,1:] + 1), cmap="YlGn")
##plt.title('Confusion matrix of the classifier')
#fig.colorbar(cax)
#ax.set_xticklabels([''] + labels)
#ax.set_yticklabels([''] + labels)
#plt.xlabel('Predicted')
#plt.ylabel('True')
#plt.show()
#fig.savefig(os.path.join(baseFolder, 'AccMatrix.pdf'))
#print(Accuracies)'''

'#!/usr/bin/env python3\n# -*- coding: utf-8 -*-\n"""\n@author: sylvain\n"""\n\n# Calcul des statistiques sur un jeu de test\n\n#import VocabEncoder\n#import VQALoader\n#from models import model\nimport matplotlib.pyplot as plt\nimport matplotlib.ticker as ticker\n\nimport torch\nimport torchvision.transforms as T\nfrom torch.autograd import Variable\nfrom skimage import io\nimport numpy as np\nimport pickle\nimport os\n\ndef do_confusion_matrix(all_mat, old_vocab, new_vocab, dataset):\n    print(new_vocab)\n    new_mat = np.zeros((len(new_vocab), len(new_vocab)))\n    for i in range(1,all_mat.shape[0]):\n        answer = old_vocab[i]\n        new_i = new_vocab.index(answer)\n        for j in range(1,all_mat.shape[1]):\n            answer = old_vocab[j]\n            new_j = new_vocab.index(answer)\n            new_mat[new_i, new_j] = all_mat[i, j]\n\n    if len(old_vocab) > 20:#HR\n        new_mat = new_mat[0:18,0:18]\n        new_vocab = new_vocab[0:18]\n    fig = plt.figure(figsize=(

In [8]:
# import numpy as np

# # Load the confusion matrix from .npy file
# confusion_matrix = np.load('/kaggle/working/ModelRSVQAconfusion_matrix_.npy')


In [9]:
# import matplotlib.pyplot as plt

# # Display the confusion matrix
# plt.imshow(confusion_matrix, interpolation='nearest', cmap=plt.cm.Blues)
# plt.title('Confusion Matrix')
# plt.colorbar()

# # Add labels
# plt.xlabel('Predicted labels')
# plt.ylabel('True labels')

# # Show plot
# plt.show()


In [10]:
# print(confusion_matrix)

In [11]:
# # Get non-zero indices
# nonzero_indices = np.nonzero(confusion_matrix)

# # Get non-zero values
# nonzero_values = confusion_matrix[nonzero_indices]

# # Plot non-zero values
# plt.figure(figsize=(20, 20))
# plt.scatter(nonzero_indices[1], nonzero_indices[0], s=nonzero_values*10, c=nonzero_values, cmap='Blues', edgecolor='black')
# plt.colorbar(label='Count')
# plt.xlabel('Predicted labels')
# plt.ylabel('True labels')
# plt.title('Confusion Matrix')
# plt.grid(True)
# plt.show()

In [12]:
# import numpy as np

# # Load the confusion matrix from .npy file
# confusion_matrix = np.load('/kaggle/working/ModelRSVQAconfusion_matrix_.npy')

# # Get non-zero indices and values
# nonzero_indices = np.nonzero(confusion_matrix)
# nonzero_values = confusion_matrix[nonzero_indices]

# # Print non-zero values and their indices
# for i in range(len(nonzero_values)):
#     idx = (nonzero_indices[0][i], nonzero_indices[1][i])
#     val = nonzero_values[i]
#     print(f"Non-zero value: {val} at index {idx}")


In [13]:
# import seaborn as sns

# plt.figure(figsize=(8, 6))
# sns.heatmap(confusion_matrix, annot=True, fmt='.2f', cmap='Blues') 

# plt.xlabel('Predicted Label')
# plt.ylabel('True Label')
# plt.show()


In [14]:
# import numpy as np
# import seaborn as sns
# import matplotlib.pyplot as plt

# # Load the confusion matrix from .npy file
# confusion_matrix = np.load('/kaggle/working/ModelRSVQAconfusion_matrix_.npy')

# # Plot the confusion matrix using Seaborn
# plt.figure(figsize=(8, 6))
# sns.heatmap(confusion_matrix, annot=True, cmap='Blues', fmt='g')
# plt.title('Confusion Matrix')
# plt.xlabel('Predicted labels')
# plt.ylabel('True labels')
# plt.show()
