# In this snippet we will learn about the Remote Sensing (RS) image captioning technique using an encoder-decoder based method

<div style="text-align: justify">
In image captioning, a machine automatically generates descriptive text to accompany an image. This process typically utilizes a combination of computer vision and natural language processing techniques. One commonly used model for this task is the encoder-decoder model. Initially, the image undergoes analysis using convolutional neural networks (CNNs) to extract relevant features and comprehend its content. These features are then inputted into recurrent neural networks (RNNs) or transformer-based models to produce the caption. RNNs are favored for their effectiveness in handling sequential data. During the training phase, the model learns to associate visual features with corresponding words or phrases by leveraging large datasets containing images paired with human-generated captions. Image captioning finds applications across various domains, including assistive technology, content accessibility, and enhancing social media content. One typical example is depicted below:
</div>

![Caption](images/Captions.png)

<div style="text-align: justify">
There are two kinds of captioning techniques:<br>
<b>Natural Image Captioning</b><br>
<b>Remote Sensing Image Captioning</b>
</div>

## Encoder-Decoder model

<div style="text-align: justify">
In remote sensing image captioning, an encoder-decoder model is utilized to generate textual descriptions for satellite or aerial images automatically. The encoder-decoder architecture comprises two main components: the encoder, responsible for understanding the visual content of the image, and the decoder, which generates the corresponding textual description.<br>
The encoder, often based on convolutional neural networks (CNNs), processes the input image to extract high-level features. In the context of remote sensing, these features may represent various aspects such as land cover types, terrain features, or man-made structures. The CNN layers capture spatial information hierarchically, transforming the raw pixel values into a compact and semantically rich representation. Transfer learning is commonly employed here: a CNN pre-trained on a large-scale dataset such as ImageNet supplies learned feature representations, which are then adapted to the remote sensing domain using relatively small annotated datasets.<br>
The decoder, typically implemented as a recurrent neural network (RNN) or transformer architecture, takes the encoded features from the CNN and generates the textual description. RNN-based decoders, such as Long Short-Term Memory (LSTM) networks, produce captions sequentially, generating one word at a time while incorporating context from previously generated words. Transformers, on the other hand, process the encoded features in parallel, attending to different parts of the image features to generate the caption.<br>
During training, the model learns to associate visual features with textual descriptions by minimizing a loss function that measures the dissimilarity between the generated caption and ground truth annotations. Additionally, attention mechanisms are often incorporated to allow the model to focus on relevant regions of the image while generating each word in the caption. Evaluation metrics like BLEU (Bilingual Evaluation Understudy) and METEOR (Metric for Evaluation of Translation with Explicit Ordering) are commonly used to assess the quality of generated captions. Overall, the encoder-decoder model in remote sensing image captioning facilitates automated analysis and interpretation of large-scale satellite or aerial imagery, enabling applications in environmental monitoring, urban planning, and disaster management.<br>
The architecture of the encoder-decoder model is depicted below:
</div>

![Architecture](images/Architecture.png)
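
To make the word-by-word generation described above concrete, here is a minimal sketch of greedy decoding in plain Python. The score table `NEXT_WORD_SCORES` and the function `greedy_decode` are hypothetical and exist only for this illustration. The toy scores depend on the previous word alone, whereas a trained decoder would compute them from the image features and all words generated so far.

```python
# Toy next-word scores: NEXT_WORD_SCORES[previous_word] -> {candidate: score}.
# The numbers are made up for illustration; they do not come from a trained model.
NEXT_WORD_SCORES = {
    "<sseq>": {"a": 0.7, "many": 0.3},
    "a": {"river": 0.6, "bridge": 0.4},
    "river": {"<eseq>": 0.9, "a": 0.1},
    "many": {"buildings": 1.0},
    "buildings": {"<eseq>": 1.0},
}


def greedy_decode(scores, start="<sseq>", end="<eseq>", max_len=10):
    """Append the highest-scoring next word until `end` or `max_len` is reached."""
    words = [start]
    for _ in range(max_len):
        candidates = scores[words[-1]]
        next_word = max(candidates, key=candidates.get)
        if next_word == end:
            break
        words.append(next_word)
    # Drop the start token before returning the caption
    return " ".join(words[1:])


print(greedy_decode(NEXT_WORD_SCORES))
```

The same loop structure appears later in the prediction code, where the scores come from the trained captioning model instead of a lookup table.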

## Let's start coding

### Import libraries

In [2]:
import torch
import torch.nn as nn
from torchvision.models import resnet152, ResNet152_Weights
from torchvision.transforms import Compose, Resize, ToTensor, Normalize
import torch.optim as optim
from tqdm import tqdm
import os
import shutil
import pickle as pkl
import json
import numpy as np
from PIL import Image
import random
import math
import time
import torch.nn.functional as F
from contextlib import redirect_stdout
import evaluate
from scipy.stats import gmean
import warnings

device = 'cuda' if torch.cuda.is_available() else 'cpu'
convert = Compose([ToTensor()])

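Note: the `evaluate` package is not installed together with PyTorch. If the import above fails with `ModuleNotFoundError: No module named 'evaluate'`, install it first (for example with `pip install evaluate`) and re-run the cell.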

### Set Paths

In [None]:
dataset = 'SYDNEY'
image_path = f'/tf/DRDO/datasets/{dataset}/images'
sent_path = f'/tf/DRDO/datasets/{dataset}/sentences/Dataset_Modified.json'
file_path = f'/tf/DRDO/content/{dataset}/files'
model_path = f'/tf/DRDO/content/{dataset}/models'
pred_path = f'/tf/DRDO/content/{dataset}/preds'

In [None]:
# %%script false --no-raise-error

def create_dir(folder_name):
    if os.path.exists(folder_name):
        shutil.rmtree(folder_name)
    os.makedirs(folder_name)

create_dir(file_path)
create_dir(model_path)

### Define Image Encoder

In [None]:
class encoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = resnet152(weights=ResNet152_Weights.DEFAULT)

        # Freeze all layers
        for param in self.model.parameters():
            param.requires_grad = False

        # Replace the final fully connected (classification) layer with an identity
        # so the network returns the 2048-dimensional pooled features
        self.model.fc = nn.Identity()

    def forward(self, x):
        return self.model(x)

### Define Captioning Model

In [None]:
class LSTMModel(nn.Module):
    def __init__(self,img_feat, vocab_size, word_feat_dim, embedding_vector=None, trainable=False):
        super().__init__()
        self.random_seed()

        #From Image
        self.img = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(img_feat,256),
            nn.GELU()
        )

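        #From Sequence
        # vocab_size is the largest token id (see DataLoader.max_length) and id 0 is
        # used for padding, so ids span 0..vocab_size: both the embedding table and
        # the output layer need vocab_size + 1 entries.
        self.seq = nn.Sequential(
            nn.Embedding(vocab_size + 1,word_feat_dim),
            nn.Dropout(0.5),
            nn.LSTM(word_feat_dim,256,batch_first=True)
        )

        # `is not None` avoids the ambiguous truth value of an array argument
        if embedding_vector is not None:
            if isinstance(embedding_vector,str):
                with open(embedding_vector,'rb') as f:
                    embedding_vector = pkl.load(f)
            embedding_vector = torch.tensor(embedding_vector,dtype=torch.float32)
            self.seq[0].weight = nn.Parameter(embedding_vector, requires_grad=trainable)
        # Attention Layer
        # self.attention = HardAttention()

        #Rest Part

        self.rest = nn.Sequential(
            nn.Linear(256, 256),
            nn.GELU(),
            nn.Linear(256, vocab_size + 1),
            # nn.Softmax(dim=1)  # not needed: nn.CrossEntropyLoss expects raw logits
        )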

    def forward(self, image_features, sequence_input):
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        image_features = image_features.to(device)
        sequence_input = sequence_input.to(device)
        im_output = self.img(image_features)
        seq_output, _ = self.seq(sequence_input.to(torch.long))
        seq_output = seq_output[:, -1, :]
        added_output = im_output + seq_output
        final_output = self.rest(added_output)
        return final_output

    def random_seed(self):
        seed = 7777
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed(seed)


class GRUModel(nn.Module):
    def __init__(self,img_feat, vocab_size, word_feat_dim, embedding_vector=None, trainable=False):
        super().__init__()
        self.random_seed()

        #From Image
        self.img = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(img_feat,256),
            nn.GELU()
        )

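        #From Sequence
        # vocab_size is the largest token id (see DataLoader.max_length) and id 0 is
        # used for padding, so ids span 0..vocab_size: both the embedding table and
        # the output layer need vocab_size + 1 entries.
        self.seq = nn.Sequential(
            nn.Embedding(vocab_size + 1,word_feat_dim),
            nn.Dropout(0.5),
            nn.GRU(word_feat_dim,256,batch_first=True)
        )

        # `is not None` avoids the ambiguous truth value of an array argument
        if embedding_vector is not None:
            if isinstance(embedding_vector,str):
                with open(embedding_vector,'rb') as f:
                    embedding_vector = pkl.load(f)
            embedding_vector = torch.tensor(embedding_vector,dtype=torch.float32)
            self.seq[0].weight = nn.Parameter(embedding_vector, requires_grad=trainable)
        # Attention Layer
        # self.attention = HardAttention()

        #Rest Part

        self.rest = nn.Sequential(
            nn.Linear(256, 256),
            nn.GELU(),
            nn.Linear(256, vocab_size + 1),
            # nn.Softmax(dim=1)  # not needed: nn.CrossEntropyLoss expects raw logits
        )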

    def forward(self, image_features, sequence_input):
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        image_features = image_features.to(device)
        sequence_input = sequence_input.to(device)
        im_output = self.img(image_features)
        seq_output, _ = self.seq(sequence_input.to(torch.long))
        seq_output = seq_output[:, -1, :]
        added_output = im_output + seq_output
        final_output = self.rest(added_output)
        return final_output

    def random_seed(self):
        seed = 7777
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed(seed)

### Define Training Loop

In [None]:
class TrainModel(nn.Module):
    def __init__(self, model):
        super().__init__()
        self.model = model
    def __call__(self, dataloader,batch_size=64, epochs=64, patience=None, savepath=None):
        self.random_seed()
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        val_acc = []
        # device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # self.model.to(device)
        for epoch in range(epochs):
            train_loss = 0
            correct = 0
            total = 0
            for (image,seq), labels in tqdm(dataloader(mode='train',batch_size=batch_size), desc=f'Epoch {epoch + 1}/{epochs}: Training'):

                self.model.train()
                optimizer.zero_grad()
                outputs = self.model(image.to(device),seq.to(device))
                loss = criterion(outputs.to(device), labels.to(device))
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
                _, predicted = outputs.max(1)
#                 print(predicted.shape)
                total += labels.size(0)
                correct += predicted.eq(labels.to(device)).sum().item()
            train_accuracy = 100 * correct / total
            train_loss = train_loss / (total / batch_size)
            self.model.eval()
            val_loss = 0
            correct = 0
            total = 0
            with torch.no_grad():
                for (image,seq), labels in tqdm(dataloader(mode='val',batch_size=batch_size), desc=f'Epoch {epoch + 1}/{epochs}: Validation'):
                    outputs = self.model(image.to(device),seq.to(device))
                    loss = criterion(outputs.to(device), labels.to(device))
                    val_loss += loss.item()
                    _, predicted = outputs.max(1)
                    total += labels.size(0)
                    correct += predicted.eq(labels.to(device)).sum().item()
                val_accuracy = 100 * correct / total
                val_loss = val_loss / (total / batch_size)
            print(f'Epoch {epoch + 1}/{epochs}:')
            print(f'Train Loss: {train_loss:.4f} | Train Accuracy: {train_accuracy:.4f}%')
            print(f'Validation Loss: {val_loss:.4f} | Validation Accuracy: {val_accuracy:.4f}%')

            if val_accuracy>(max(val_acc) if len(val_acc) != 0 else 0) and savepath:
                # Use only filesystem-safe separators (no '$' or ':') in the checkpoint name
                filename = f'model-epoch_{epoch+1}-train_loss_{train_loss:.4f}-train_acc_{train_accuracy:.4f}-val_loss_{val_loss:.4f}-val_acc_{val_accuracy:.4f}.pth'
                TrainModel.clear_directory(savepath)
                torch.save(self.model.state_dict(), os.path.join(savepath,filename))
                print(f'Validation accuracy improved from {max(val_acc) if len(val_acc) != 0 else 0:.4f} to {val_accuracy:.4f}....')
            val_acc.append(val_accuracy)

            if TrainModel.patience_exceed(val_acc,patience):
                print(f'Early stopping after {patience} epochs with no improvement in validation accuracy. Maximum validation accuracy is {np.round(np.max(val_acc),4)} on epoch {np.argmax(val_acc)+1}')
                break

    def random_seed(self):
        seed = 7777
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed(seed)

    @staticmethod
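    def patience_exceed(data,indexes):
        # No early stopping when patience is unset or the history is too short
        if indexes is None:
            return False
        if data[:-indexes]==[] or data[-indexes:]==[]:
            return False
        # Stop when the best of the last `indexes` epochs does not beat the earlier best
        lower = max(data[:-indexes])
        upper = max(data[-indexes:])
        return upper<=lower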

    @staticmethod
    def clear_directory(path):
        if os.path.exists(path):
            for datas in os.listdir(path):
                # `files` already includes `path`, so it must not be joined with `path` again
                files = os.path.join(path,datas)
                # Check links first: os.path.isfile/isdir follow symbolic links
                if os.path.islink(files) or os.path.isfile(files):
                    os.remove(files)
                elif os.path.isdir(files):
                    shutil.rmtree(files)
        else:
            os.makedirs(path)
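
The early-stopping rule used by `TrainModel.patience_exceed` can be tried in isolation. The sketch below is a standalone re-statement of that rule under the hypothetical name `patience_exceeded`, with made-up validation accuracies. Training stops once the best accuracy of the last `patience` epochs fails to beat the best accuracy seen before them.

```python
def patience_exceeded(history, patience):
    """Return True when the last `patience` entries did not beat the earlier best."""
    # Nothing to compare when patience is unset or the history is too short
    if not patience or len(history) <= patience:
        return False
    best_before = max(history[:-patience])
    best_recent = max(history[-patience:])
    return best_recent <= best_before


# Made-up validation accuracies (percent), one value per epoch
val_acc = [61.0, 64.5, 66.2, 66.0, 65.8, 66.1]

print(patience_exceeded(val_acc, patience=3))  # last 3 epochs compared with the first 3
print(patience_exceeded(val_acc, patience=5))  # last 5 epochs compared with the first one
```

With `patience=3` the last three epochs never exceed the earlier best of 66.2, so the check fires. With `patience=5` the window still contains the best epoch, so training would continue.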

### Image Encoding

In [None]:
features = {}
model = encoder().to(device)
model.eval()
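with torch.no_grad():  # inference only, so autograd bookkeeping is not needed
    for count, images in enumerate(os.listdir(image_path), start=1):
        # Force 3 channels: ResNet expects RGB input
        img = Image.open(os.path.join(image_path,images)).convert('RGB')
        x = convert(img).to(device)
        x = x.reshape(1,*x.shape)  # add the batch dimension
        feat = model(x)
        # feat = x
        features[images] = feat.tolist()
        if count%100==0:
            print(f'{count} images are completed....')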
with open(os.path.join(file_path,'features.pkl'),'wb') as f:
    pkl.dump(features,f)

### Dataset Preprocessing

In [None]:
def add_tokens(dct,s):
    count = max(dct.values())
    for i in s.split():
        if i not in dct:
            count += 1
            dct[i] = count
    return dct

def tokenizer(sentence,tokens):
    return [tokens[i] for i in sentence.split()]
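
As a small worked example of the vocabulary step, the sketch below builds a word-to-id mapping from two made-up sentences and encodes one of them. The names `build_vocab` and `encode` are hypothetical stand-ins written for this illustration. They follow the same idea as `add_tokens` and `tokenizer`: ids start after the two special tokens and grow by one for each unseen word.

```python
def build_vocab(sentences):
    """Assign consecutive integer ids to words, after the two special tokens."""
    vocab = {"<sseq>": 1, "<eseq>": 2}
    for sentence in sentences:
        for word in sentence.split():
            if word not in vocab:
                vocab[word] = max(vocab.values()) + 1
    return vocab


def encode(sentence, vocab):
    """Wrap a sentence in start/end tokens and map every word to its id."""
    return [vocab[word] for word in f"<sseq> {sentence} <eseq>".split()]


# Made-up captions, for illustration only
sentences = ["a river runs through the city", "the city has a river"]
vocab = build_vocab(sentences)

print(vocab)
print(encode(sentences[1], vocab))
```

Id 0 is deliberately left unused here, which keeps it free to serve as the padding value when sequences are brought to a common length.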

In [None]:
with open(sent_path) as f:
    datasets = json.load(f)
tokens = {'<sseq>':1,'<eseq>':2}
captions = {}
tokenized_captions = {}
train_test_val = {}
for i in range(len(datasets['images'])):
    filename = datasets['images'][i]['filename']
    split = datasets['images'][i]['split']
    train_test_val[split] = train_test_val.get(split,[])+[filename]
    caption = []
    tokenized_caption = []
    for j in range(len(datasets['images'][i]['sentences'])):
        sentence = datasets['images'][i]['sentences'][j]
        tokens = add_tokens(tokens,sentence)
        tokenized_sentence = f'<sseq> {sentence} <eseq>'
        caption.append(tokenized_sentence)
        tokenized_caption.append(tokenizer(tokenized_sentence,tokens))
    captions[filename] = caption
    tokenized_captions[filename] = tokenized_caption
with open(os.path.join(file_path,'train_test_val.pkl'),'wb') as f:
    pkl.dump(train_test_val,f)
with open(os.path.join(file_path,'captions.pkl'),'wb') as f:
    pkl.dump(captions,f)
with open(os.path.join(file_path,'tokenized_captions.pkl'),'wb') as f:
    pkl.dump(tokenized_captions,f)
with open(os.path.join(file_path,'tokens.pkl'),'wb') as f:
    pkl.dump(tokens,f)

### Batch Processing

In [None]:
class batch_preparation:
    def __init__(self,tokens):
        self.tokens = tokens

    def __call__(self,max_len,vocab_size):
        input_token = self.pad_list(max_len)
        # output_token = self.categorical(vocab_size)
        output_token = self.only_position()
        return input_token,output_token

    def pad_list(self,desired_len):
        # Build one left-padded input per prefix: tokens[:1], tokens[:2], ...
        padded = []
        for idx in range(1,len(self.tokens)):
            lst = self.tokens[:idx]
            padding_length = desired_len - len(lst)
            if padding_length > 0:
                padded.append([0] * padding_length + lst)
            else:
                # Prefix already fills the window: keep its last desired_len tokens
                padded.append(lst[-desired_len:])
        return padded

    def categorical(self,vocab_size):
        cat = []
        for idx in range(1,len(self.tokens)):
            pos = self.tokens[idx]
            if pos < 0 or pos > vocab_size:
                raise ValueError(f"Invalid position value., pos = {pos}, vocab_size = {vocab_size}")
            else:
                cat.append([1 if i == pos else 0 for i in range(vocab_size+1)])
        return cat

    def only_position(self):
        cat = []
        for idx in range(1,len(self.tokens)):
            pos = self.tokens[idx]
            cat.append(pos)
        return cat
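
To see what `batch_preparation` produces, the following sketch expands one tokenized caption into (input prefix, next token) training pairs. The function name `make_training_pairs` and the example ids are assumptions made for this illustration. Each prefix is left-padded with 0 to a fixed length, and the target is the token that follows the prefix.

```python
def make_training_pairs(token_ids, max_len, pad_id=0):
    """Turn one caption into (left-padded prefix, next token) pairs."""
    pairs = []
    for idx in range(1, len(token_ids)):
        # Keep at most max_len tokens of the prefix, then pad on the left
        prefix = token_ids[:idx][-max_len:]
        padded = [pad_id] * (max_len - len(prefix)) + prefix
        pairs.append((padded, token_ids[idx]))
    return pairs


# Example ids: 1 = <sseq>, 2 = <eseq>, 7 and 8 are ordinary words
caption = [1, 7, 8, 2]
for inputs, target in make_training_pairs(caption, max_len=5):
    print(inputs, "->", target)
```

A caption with n tokens therefore yields n - 1 training samples, all of which share the same image feature vector.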

In [None]:
class DataLoader:
    def __init__(self,impath,cappath,splitpath):
        with open(impath,'rb') as f:
            self.images = pkl.load(f)
        with open(cappath,'rb') as f:
            self.captions = pkl.load(f)
        with open(splitpath,'rb') as f:
            self.split = pkl.load(f)

    def __call__(self,mode='train',batch_size=64,seed=7777,should_balance=True):
        u = self.batch_divide(mode)
        if seed!='no':
            if isinstance(seed,int):
                random.seed(seed)
            random.shuffle(u)
        if should_balance and len(u)%batch_size:
            updated_length = batch_size*math.ceil(len(u)/batch_size)
            diff = updated_length-len(u)
            u += random.sample(u,diff)
        for i in range(0,len(u),batch_size):
            temp = u[i:i+batch_size]
            # Each sample is ((image_feature, input_sequence), target_token)
            x1 = torch.stack([sample[0][0] for sample in temp])
            x2 = torch.stack([sample[0][1] for sample in temp])
            y = torch.stack([sample[1] for sample in temp])
            yield (x1,x2),y

    def batch_divide(self,mode='train'):
        mode = mode.lower()
        if mode not in ['train','val']:
            raise Exception('Only train and val mode are allowed....')
        select = self.split[mode]
        # Padded length and vocabulary size are the same for every image, so compute them once
        max_len,vocab_size = self.max_length()
        x1 = []
        x2 = []
        y = []
        for imgs in select:
            imfeat = self.images[imgs]
            captions = self.captions[imgs]
            inputs = []
            outputs = []
            for caption in captions:
                batch = batch_preparation(caption)
                input_seq,output_vector = batch(max_len,vocab_size)
                inputs += input_seq
                outputs += output_vector
            x1 += (imfeat*len(inputs))
            x2 += inputs
            y += outputs
        return [((torch.tensor(x1[i],dtype=torch.float),torch.tensor(x2[i],dtype=torch.float)),torch.tensor(y[i],dtype=torch.long)) for i in range(len(y))]

    def max_length(self):
        captions = [j for i in self.captions.values() for j in i]
        max_len = 0
        vocab_size = 0
        for i in captions:
            if len(i)>max_len:
                max_len = len(i)
            if max(i)>vocab_size:
                vocab_size = max(i)
        return int(1.25*max_len),vocab_size

In [None]:
dataloader = DataLoader(os.path.join(file_path,'features.pkl'),os.path.join(file_path,'tokenized_captions.pkl'),os.path.join(file_path,'train_test_val.pkl'))

### Model Training

Image feature dimension (`img_feat`) for each backbone:<br>
ResNet = 2048 <br>
VGGNet = 4096

In [None]:
model = LSTMModel(img_feat=2048,vocab_size=dataloader.max_length()[1],word_feat_dim=256).to(device)
trainer = TrainModel(model)
trainer(dataloader,savepath=model_path,patience=5)

### Prediction

In [None]:
class show_datetime:
    def __init__(self,time):
        self.time = round(time)
    def __call__(self,string=''):
        minutes,seconds = int(self.time//60),self.time%60
        if minutes==0:
            if seconds>0:
                return f'{string}{show_datetime.formatting("second",seconds)}\033[0m'
            else:
                return f'{string}0 second\033[0m'
        else:
            hours,minutes = int(minutes//60),int(minutes%60)
            if hours==0:
                return f'{string}{show_datetime.formatting("minute",minutes)}{show_datetime.formatting("second",seconds)}\033[0m'
            else:
                days,hours = int(hours//24),int(hours%24)
                if days==0:
                    return f'{string}{show_datetime.formatting("hour",hours)}{show_datetime.formatting("minute",minutes)}{show_datetime.formatting("second",seconds)}\033[0m'
                else:
                    weeks,days = int(days//7),int(days%7)
                    if weeks==0:
                        return f'{string}{show_datetime.formatting("day",days)}{show_datetime.formatting("hour",hours)}{show_datetime.formatting("minute",minutes)}{show_datetime.formatting("second",seconds)}\033[0m'
                    else:
                        raise Exception('Too big time, exceeding days....')
    @staticmethod
    def formatting(string,number):
        if number==0:
            return ''
        elif number==1:
            return f'{number} {string} '
        else:
            return f'{number} {string}s '

class Prediction_of_Image:

    def __init__(self, model, index, maxlen, img_feat, vocab_size, word_feat_dim,embedding_vector=None, trainable=False):
        self.model = LSTMModel(img_feat, vocab_size, word_feat_dim, embedding_vector=embedding_vector, trainable=trainable)
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = self.model.to(device)
        # map_location lets a checkpoint saved on GPU load on a CPU-only machine
        self.model.load_state_dict(torch.load(model, map_location=device))
        self.model.eval()
#         trainable_params = 0
#         non_trainable_params = 0
#         for param in self.model.parameters():
#             if param.requires_grad:
#                 trainable_params += param.numel()
#             else:
#                 non_trainable_params += param.numel()
#         print(f"Trainable parameters: {trainable_params}")
#         print(f"Non-trainable parameters: {non_trainable_params}")
        self.index = index
        self.rev_index = {j:i for i,j in index.items()}
        self.maxlen = maxlen

    def __call__(self, photos):
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        if not isinstance(photos,torch.Tensor):
            photos = torch.tensor(photos,dtype=torch.float32).to(device)
        if len(photos.shape) != 2:
            photos = photos.reshape((1, -1))
        ind = self.index['<sseq>']
        seq =torch.tensor(ind,dtype=torch.long).reshape([1, -1])
        seq = seq.to(device)
        sentence = '<sseq>'
        with torch.no_grad():
            for i in range(self.maxlen-1):
                new_seq = F.pad(seq,(self.maxlen-seq.shape[1],0)).reshape(1,-1).to(device)
                pred = F.softmax(self.model(photos, new_seq).reshape(-1),dim=0)
                next_seq = pred.max(0).indices
                next_word = self.rev_index[next_seq.item()]
                sentence += f' {next_word}'
                seq = torch.cat((seq, next_seq.reshape(1, 1).to(device)), axis=1)
                if next_word == '<eseq>':
                    break
        sentence = ' '.join([i for i in sentence.split() if i not in ['<sseq>','<eseq>']])
        return sentence

class Prediction_of_Image_Beam_Search:

    def __init__(self, model, index, maxlen, img_feat, vocab_size, word_feat_dim,embedding_vector=None, trainable=False, beam_size=5, no_of_captions=None):
        self.model = LSTMModel(img_feat, vocab_size, word_feat_dim, embedding_vector=embedding_vector, trainable=trainable)
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = self.model.to(device)
        # map_location lets a checkpoint saved on GPU load on a CPU-only machine
        self.model.load_state_dict(torch.load(model, map_location=device))
        self.model.eval()
#         trainable_params = 0
#         non_trainable_params = 0
#         for param in self.model.parameters():
#             if param.requires_grad:
#                 trainable_params += param.numel()
#             else:
#                 non_trainable_params += param.numel()
#         print(f"Trainable parameters: {trainable_params}")
#         print(f"Non-trainable parameters: {non_trainable_params}")
        self.index = index
        self.rev_index = {j:i for i,j in index.items()}
        self.maxlen = maxlen
        self.beam_size = beam_size
        self.no_of_captions = no_of_captions if no_of_captions else beam_size
    def __call__(self, photos):
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        if not isinstance(photos,torch.Tensor):
            photos = torch.tensor(photos,dtype=torch.float32).to(device)
        if len(photos.shape) != 2:
            photos = photos.reshape((1, -1))
        srtind = self.index['<sseq>']
        endind = self.index['<eseq>']
        srtindt = torch.tensor(srtind,dtype=torch.long).reshape(1, 1)
        beams = [[srtindt, 0.0]]
        with torch.no_grad():
            for i in range(self.maxlen-1):
                temp = []
                for seq, prob in beams:
                    new_seq = F.pad(seq, (self.maxlen - seq.shape[1],0)).reshape(1,-1).to(device)
                    preds = F.softmax(self.model(photos, new_seq).reshape(-1),dim=0)
                    words = (preds.argsort()[-self.beam_size:]).reshape(-1)
                    for word in words:
                        seq = seq.to(device)
                        word = word.reshape(1,1).to(device)
                        fseq = torch.cat([seq, word.to(torch.long)], axis=1)
                        idx = word.item()
                        fprob = prob + torch.log2(preds[idx]).item()
                        temp.append([fseq, fprob])
                beams = sorted(temp, reverse=True, key=lambda l: l[1])[0:self.no_of_captions]
        sentence = max(beams,key = lambda x:x[1])[0].detach().tolist()[0]
        tokensd = sentence
        if endind in sentence:
            sentence = sentence[1:sentence.index(endind)]
        else:
            sentence = sentence[1:]
        sentence = ' '.join([self.rev_index[idx] for idx in sentence if self.rev_index[idx] not in ('<sseq>','<eseq>')])
        return sentence

class Prediction_of_Image_Dataset:

    def __init__(self, model_path, index, maxlen, image_feature_dim, vocab_size, word_feature_dim, embedding_vector=None,mode='g', arch_path=None, cap_path=None, beam_size=5, no_of_captions=None,no_of_similarities=4):
        if mode.lower() in ['g','greedy']:
            self.pred = Prediction_of_Image(model_path, index=index, maxlen=maxlen, img_feat=image_feature_dim, vocab_size=vocab_size, word_feat_dim=word_feature_dim, embedding_vector=embedding_vector, )
        elif mode.lower() in ['b','beam']:
            self.pred = Prediction_of_Image_Beam_Search(model_path, index=index, maxlen=maxlen, img_feat=image_feature_dim, vocab_size=vocab_size, word_feat_dim=word_feature_dim, embedding_vector=embedding_vector, beam_size=beam_size, no_of_captions=no_of_captions)
        else:
            raise ValueError('Only greedy (g), and beam (b) as mode are allowed.')

    def __call__(self, images, save_path='./results/results.pkl', keep_prev=True, display_interval=100):
        import os
        import pickle as pkl
        import json
        count=0
        directory,filename = os.path.split(save_path)
        _,ext = os.path.splitext(filename)
        val_path = os.path.join(directory,'Values.pkl')
        if ext not in ['.pkl','.json']:
            raise TypeError('Only .pkl and .json as file extension is allowed to save captions.')
        if not(os.path.exists(directory)):
            os.makedirs(directory)
        if keep_prev and os.path.exists(save_path):
            if ext=='.pkl':
                with open(save_path,'rb') as f:
                    sentences = pkl.load(f)
            elif ext=='.json':
                with open(save_path) as f:
                    sentences = json.load(f)
        else:
            sentences = {}
        t1 = time.time()
        for image,feature in images.items():
            if image not in sentences:
                sentence = self.pred(feature)
                sentences[image] = sentence
            count+=1
            if count%display_interval==0 or count==len(images):
                print(f'\033[33m{count}th image completed....\033[0m')
            if count%50==0 or count==len(images):
                if ext=='.pkl':
                    with open(save_path,'wb') as f:
                        pkl.dump(sentences,f)
                elif ext=='.json':
                    with open(save_path,'w') as f:
                        json.dump(sentences,f)
        t2 = time.time()
        handle = show_datetime(t2-t1)
        string = '\033[34mProcess completed in: \033[1m'
        result = handle(string)
        print(result)
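
The difference between greedy and beam search is easier to see on a toy problem. The sketch below is a simplified, standalone beam search over a hypothetical probability table `PROBS`; it is not the notebook's implementation. Here the probabilities depend only on the previous word, and finished hypotheses are carried over unchanged. The class above instead keeps extending every beam and cuts the caption at the first `<eseq>` afterwards.

```python
import math

# Hypothetical next-word probabilities keyed by the previous word (illustration only)
PROBS = {
    "<sseq>": {"a": 0.6, "the": 0.4},
    "a": {"river": 0.55, "road": 0.45},
    "the": {"harbour": 0.9, "road": 0.1},
    "river": {"<eseq>": 1.0},
    "road": {"<eseq>": 1.0},
    "harbour": {"<eseq>": 1.0},
}


def beam_search(probs, beam_size=2, max_len=5, start="<sseq>", end="<eseq>"):
    """Keep the `beam_size` best partial captions, scored by summed log-probability."""
    beams = [([start], 0.0)]
    for _ in range(max_len):
        candidates = []
        for words, score in beams:
            if words[-1] == end:
                # Finished hypotheses are carried over unchanged
                candidates.append((words, score))
                continue
            for word, p in probs[words[-1]].items():
                candidates.append((words + [word], score + math.log(p)))
        beams = sorted(candidates, key=lambda b: b[1], reverse=True)[:beam_size]
    best_words, _ = beams[0]
    return " ".join(w for w in best_words if w not in (start, end))


print("beam_size=1:", beam_search(PROBS, beam_size=1))
print("beam_size=2:", beam_search(PROBS, beam_size=2))
```

With `beam_size=1` the search is greedy and commits to the locally best first word. With `beam_size=2` it keeps the second-best first word alive long enough to find a caption with a higher overall probability.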

In [None]:
model_path = '/tf/DRDO/content/SYDNEY/models'
model = os.path.join(model_path,os.listdir(model_path)[0])
pred = Prediction_of_Image_Dataset(model_path=model, index=tokens, maxlen=dataloader.max_length()[0], image_feature_dim=2048, vocab_size=dataloader.max_length()[1], word_feature_dim=256,mode='g')
with open(os.path.join(file_path,'features.pkl'),'rb') as f:
    features = pkl.load(f)
with open(os.path.join(file_path,'train_test_val.pkl'),'rb') as f:
    ttvl = pkl.load(f)
ttvl_test = ttvl['test']
test_images = {img:feat for img,feat in features.items() if img in ttvl_test}
pred(test_images,os.path.join(pred_path,'predictions_greedy.pkl'),keep_prev=False)

In [None]:
model_path = '/tf/DRDO/content/SYDNEY/models'
model = os.path.join(model_path,os.listdir(model_path)[0])
pred = Prediction_of_Image_Dataset(model_path=model, index=tokens, maxlen=dataloader.max_length()[0], image_feature_dim=2048, vocab_size=dataloader.max_length()[1], word_feature_dim=256,mode='b')
with open(os.path.join(file_path,'features.pkl'),'rb') as f:
    features = pkl.load(f)
with open(os.path.join(file_path,'train_test_val.pkl'),'rb') as f:
    ttvl = pkl.load(f)
ttvl_test = ttvl['test']
test_images = {img:feat for img,feat in features.items() if img in ttvl_test}
pred(test_images,os.path.join(pred_path,'predictions_beam.pkl'),keep_prev=False)

### Model Evaluation

In [None]:
def compute_metrics(predictions,references):
    # Keys are only known when the inputs are dicts
    pred_keys = refs_keys = None
    if isinstance(predictions,dict):
        pred_keys = list(predictions)
        predictions = list(predictions.values())
    if isinstance(references,dict):
        refs_keys = list(references)
        references = list(references.values())
    # Warn only when both inputs were dicts and their keys disagree
    if pred_keys is not None and refs_keys is not None and pred_keys!=refs_keys:
        message = "Keys of predictions and references are either not the same or not in the same order. Unexpected results may be seen."
        warnings.warn(message)
    rouge = evaluate.load("rouge")
    bleu = evaluate.load("bleu")
    meteor = evaluate.load("meteor")
    rouge_score = round(rouge.compute(predictions=predictions, references=references, rouge_types=["rougeL"])["rougeL"],6)
    meteor_score = round(meteor.compute(predictions=predictions, references=references)["meteor"],6)
    bleu_result = bleu.compute(predictions=predictions,references=references)
    brevity_penalty = bleu_result['brevity_penalty']
    bleu1_score = bleu_result['precisions'][0]*brevity_penalty
    bleu2_score = bleu_result['precisions'][1]*brevity_penalty
    bleu3_score = bleu_result['precisions'][2]*brevity_penalty
    bleu4_score = bleu_result['precisions'][3]*brevity_penalty
    bleu1_avg_score = round(gmean([bleu1_score]),6)
    bleu2_avg_score = round(gmean([bleu1_score,bleu2_score]),6)
    bleu3_avg_score = round(gmean([bleu1_score,bleu2_score,bleu3_score]),6)
    bleu4_avg_score = round(gmean([bleu1_score,bleu2_score,bleu3_score,bleu4_score]),6)
    return {'BLEU-1':bleu1_avg_score,'BLEU-2':bleu2_avg_score,'BLEU-3':bleu3_avg_score,'BLEU-4':bleu4_avg_score,'METEOR':meteor_score,'ROUGE-L':rouge_score}

def filter_sentence(sentence,filter_by=['<sseq>','<eseq>']):
    splits = [i for i in sentence.split() if i not in filter_by]
    filtered_sentence = ' '.join(splits)
    return filtered_sentence

In [None]:
with open(os.path.join(file_path,'captions.pkl'),'rb') as f:
    captions = pkl.load(f)
ttvl_test = ttvl['test']
test_caps = {img:caps for img,caps in captions.items() if img in ttvl_test}
with open(os.path.join(pred_path,'predictions_beam.pkl'),'rb') as f:
    pred_caps = pkl.load(f)
keys = test_caps.keys()
references = [[filter_sentence(j) for j in test_caps[i]] for i in keys]
predictions = [filter_sentence(pred_caps[i]) for  i in keys]
result = compute_metrics(predictions,references)
print(result)
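
The BLEU-1 to BLEU-4 values returned by `compute_metrics` are geometric means of the brevity-penalised n-gram precisions. Because the brevity penalty is the same factor in every term, this equals the penalty multiplied by the geometric mean of the raw precisions. The sketch below shows that arithmetic on made-up numbers; `cumulative_bleu` is a hypothetical helper written for this illustration and is not part of the `evaluate` library.

```python
import math


def cumulative_bleu(precisions, brevity_penalty):
    """Return BLEU-1..BLEU-n as brevity_penalty * geometric mean of p_1..p_k."""
    scores = []
    for k in range(1, len(precisions) + 1):
        head = precisions[:k]
        if min(head) == 0:
            # A zero precision makes the geometric mean zero
            scores.append(0.0)
            continue
        log_mean = sum(math.log(p) for p in head) / k
        scores.append(brevity_penalty * math.exp(log_mean))
    return scores


# Made-up 1-gram to 4-gram precisions and brevity penalty, for illustration only
precisions = [0.8, 0.6, 0.5, 0.4]
scores = cumulative_bleu(precisions, brevity_penalty=0.9)

for n, score in enumerate(scores, start=1):
    print(f"BLEU-{n}: {score:.4f}")
```

Since higher-order precisions are usually lower than lower-order ones, the cumulative scores typically fall from BLEU-1 to BLEU-4, as they do in this toy example.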

### Examples

In [None]:
immodel = encoder().to(device)
immodel.eval();

In [None]:
img = random.choice(ttvl_test)
impath = os.path.join(image_path,img)
image = Image.open(impath)
image.show()
tensored_image = convert(image).to(device)
tensored_image = tensored_image.reshape(1,*tensored_image.shape)
feat = immodel(tensored_image)
model_path = '/tf/DRDO/content/SYDNEY/models'
model = os.path.join(model_path,os.listdir(model_path)[0])

pred = Prediction_of_Image(model=model, index=tokens, maxlen=dataloader.max_length()[0], img_feat=2048, vocab_size=dataloader.max_length()[1], word_feat_dim=256)
caps = pred(feat)
print('\033[1mGreedy Search: \033[0m',caps)

pred = Prediction_of_Image_Beam_Search(model=model, index=tokens, maxlen=dataloader.max_length()[0], img_feat=2048, vocab_size=dataloader.max_length()[1], word_feat_dim=256)
caps = pred(feat)
print('\033[1mBeam Search: \033[0m',caps)