In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import time
from skimage import io, transform
import pickle
import matplotlib.pyplot as plt # for plotting
import numpy as np
import torchvision.models as models
from torch.nn.utils.rnn import pad_sequence

In [None]:
# Colab-only setup: attach Google Drive under /content/drive so the dataset
# and checkpoint paths used further down resolve.
from google.colab import drive

drive.mount(
    '/content/drive',
    force_remount=True,
)

# One-off archive extraction, kept for reference (left disabled):
# !unzip "/content/drive/MyDrive/COL774/test_data.zip" -d "/content/drive/MyDrive/COL774"
# !unzip "/content/drive/MyDrive/COL774/train_data.zip" -d "/content/drive/MyDrive/COL774"

Mounted at /content/drive


In [None]:
class Rescale(object):
    """Resize an image array to a requested size.

    Args:
        output_size (tuple or int): Target size. A tuple is used directly as
            the ``(rows, cols)`` of the result. An int is assigned to the
            shorter image edge and the longer edge is scaled by the same
            factor, so the aspect ratio is kept.
    """

    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        self.output_size = output_size

    def __call__(self, image):
        """Return ``image`` resized according to ``self.output_size``."""
        rows, cols = image.shape[:2]
        size = self.output_size

        if not isinstance(size, int):
            # Explicit (rows, cols) pair: use it as given.
            target_rows, target_cols = size
        elif rows > cols:
            # Portrait: the width is the shorter edge and receives `size`.
            target_rows, target_cols = size * rows / cols, size
        else:
            # Landscape or square: the height receives `size`.
            target_rows, target_cols = size, size * cols / rows

        # The scaled edge may be fractional; truncate before resizing.
        return transform.resize(image, (int(target_rows), int(target_cols)))


class ToTensor(object):
    """Move the third axis of an image array to the front.

    For an ``H x W x C`` array the result is ``C x H x W``. Despite the class
    name no tensor object is created: the value returned is whatever the
    input's own ``transpose`` produces.
    """

    def __call__(self, image):
        channel_first_axes = (2, 0, 1)
        return image.transpose(channel_first_axes)


# Size every image is resized to before it is converted to a tensor.
IMAGE_RESIZE = (224, 224)

# Image preprocessing pipeline: convert to a tensor, then normalise each of
# the three channels with the given per-channel mean and standard deviation.
img_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(
        (0.485, 0.456, 0.406),
        (0.229, 0.224, 0.225),
    ),
])

In [None]:
class CaptionsPreprocessing:
    """Load captions from a TSV file, build a vocabulary and tokenise captions.

    Every non-blank line of the file must contain an image path and a caption
    separated by a tab.

    Args:
        captions_file_path (string): captions tsv file path

    Attributes:
        raw_captions_dict: caption text as read from the file, keyed by the
            prefixed image path.
        captions_dict: the same captions wrapped in start/end markers.
        vocab: sorted list of the distinct whitespace-separated tokens found
            in ``captions_dict``.
        word_to_ix: token -> index map; indices start at 1.
        ix_to_word: index -> token map; additionally maps ``PAD_INDEX`` to
            ``PAD_TOKEN``.
    """

    # Prepended to the image path found in the first TSV column.
    IMAGE_PATH_PREFIX = 'drive/MyDrive/COL774/'
    # Markers wrapped around every caption.
    START_TOKEN = '<start>'
    END_TOKEN = '<end>'
    # Index 0 is kept free for padding (``pad_sequence`` pads with 0 by
    # default) so that padded positions never alias a real word.
    PAD_TOKEN = '<pad>'
    PAD_INDEX = 0

    def __init__(self, captions_file_path):
        self.captions_file_path = captions_file_path

        self.raw_captions_dict = self.read_raw_captions()

        self.captions_dict = self.process_captions()

        self.vocab = self.generate_vocabulary()
        # Real tokens occupy 1..len(vocab); 0 stays reserved for padding.
        self.word_to_ix = {
            word: i for i, word in enumerate(self.vocab, start=self.PAD_INDEX + 1)
        }
        self.ix_to_word = {i: word for word, i in self.word_to_ix.items()}
        self.ix_to_word[self.PAD_INDEX] = self.PAD_TOKEN

    def read_raw_captions(self):
        """Read the captions file.

        Blank lines are skipped. When the same image path appears on several
        lines, the caption from the last such line is kept.

        Returns:
            Dictionary of raw caption strings keyed by prefixed image path
            (strings).

        Raises:
            ValueError: a non-blank line does not contain a tab-separated
                caption.
        """
        captions_dict = {}
        with open(self.captions_file_path, 'r', encoding='utf-8') as f:
            # Iterate lazily instead of loading the whole file with readlines().
            for line_no, img_caption_line in enumerate(f, start=1):
                fields = img_caption_line.strip().split('\t')
                if fields == ['']:
                    continue
                if len(fields) < 2:
                    raise ValueError(
                        '{}:{}: expected "<image path>\\t<caption>", got {!r}'.format(
                            self.captions_file_path, line_no, img_caption_line
                        )
                    )
                captions_dict[self.IMAGE_PATH_PREFIX + fields[0]] = fields[1]

        return captions_dict

    def process_captions(self):
        """Wrap every raw caption in the start and end markers.

        A new dictionary is built so that ``raw_captions_dict`` keeps the
        unmodified text and calling this method again does not wrap twice.

        Returns:
            Dictionary of processed caption strings keyed like
            ``raw_captions_dict``.
        """
        return {
            img_path: self.START_TOKEN + ' ' + caption + ' ' + self.END_TOKEN
            for img_path, caption in self.raw_captions_dict.items()
        }

    def generate_vocabulary(self):
        """Collect the distinct tokens of all processed captions.

        Returns:
            Sorted list of tokens. Sorting makes the token -> index mapping
            identical across interpreter sessions; iterating a plain ``set``
            of strings does not guarantee that, which would break the match
            between a saved checkpoint and the vocabulary.
        """
        tokens = set()
        for caption in self.captions_dict.values():
            tokens.update(caption.split())

        return sorted(tokens)

    def captions_transform(self, img_caption_list):
        """Convert one caption string into a 1-D int64 tensor of token indices.

        Args:
            img_caption_list: despite the name, a single caption string; it is
                split on whitespace.

        Raises:
            KeyError: the caption contains a token that is not in the
                vocabulary.
        """
        indices = [self.word_to_ix[w] for w in img_caption_list.split()]
        return torch.tensor(indices, dtype=torch.int64)
      
# Training captions TSV on the mounted Drive.
CAPTIONS_FILE_PATH = '/content/drive/MyDrive/COL774/Train_text.tsv'

# Reading the file and building the vocabulary happen inside the constructor.
captions_preprocessing_obj = CaptionsPreprocessing(
    CAPTIONS_FILE_PATH,
)

In [None]:
class ImageCaptionsDataset(Dataset):
    """Dataset of (image, caption) pairs, one pair per key of ``captions_dict``."""

    def __init__(self, img_dir, captions_dict, img_transform=None, captions_transform=None):
        """
        Args:
            img_dir (string): Directory with all the images. Stored only; the
                images are opened through the keys of ``captions_dict``.
            captions_dict: Dictionary with one caption keyed by image path (strings)
            img_transform (callable, optional): Optional transform to be applied
                on the image sample, after it has been resized to IMAGE_RESIZE.
            captions_transform: (callable, optional): Optional transform to be applied
                on the caption sample.
        """
        self.img_dir = img_dir
        self.captions_dict = captions_dict
        self.img_transform = img_transform
        self.captions_transform = captions_transform
        self.image_ids = list(captions_dict.keys())

    def __len__(self):
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Return ``(image, caption)`` for the ``idx``-th image path."""
        img_name = self.image_ids[idx]
        captions = self.captions_dict[img_name]

        # Force three channels. Greyscale or RGBA files would otherwise give
        # 1- or 4-channel tensors, which neither the 3-channel normalisation
        # nor the encoder's first convolution (3 input channels) accepts.
        image = Image.fromarray(np.uint8(io.imread(img_name))).convert('RGB')

        if self.img_transform:
            image = transforms.Resize(IMAGE_RESIZE)(image)
            image = self.img_transform(image)

        if self.captions_transform:
            captions = self.captions_transform(captions)

        return image, captions

In [None]:
class EncoderCNN(nn.Module):
    """VGG16-style convolutional encoder that maps an image batch to feature vectors.

    The network is built from scratch here; no pretrained weights are loaded.

    Args:
        embed_size (int): length of the feature vector produced per image.
        train_CNN (bool): stored on the instance; nothing in this class reads it.
    """

    def __init__(self, embed_size, train_CNN=False):
        super(EncoderCNN, self).__init__()
        self.embed_size = embed_size
        self.train_CNN = train_CNN

        # Integers are output channel counts of 3x3 convolutions (each followed
        # by a ReLU); "M" marks a 2x2 max-pool that halves height and width.
        layout = [
            64, 64, "M",
            128, 128, "M",
            256, 256, 256, "M",
            512, 512, 512, "M",
            512, 512, 512, "M",
        ]

        conv_stack = []
        channels = 3
        for spec in layout:
            if spec == "M":
                conv_stack.append(nn.MaxPool2d(kernel_size=2, stride=2))
                continue
            conv_stack.append(
                nn.Conv2d(channels, spec, kernel_size=(3, 3), padding=(1, 1))
            )
            conv_stack.append(nn.ReLU(inplace=True))
            channels = spec

        self.model = nn.Sequential(*conv_stack)

        # The first linear layer expects 512 * 7 * 7 inputs, i.e. 7x7 maps out
        # of the five poolings above, which is what 224x224 inputs produce.
        head = [
            nn.Linear(in_features=512 * 7 * 7, out_features=4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(in_features=4096, out_features=4096),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=4096, out_features=self.embed_size),
            nn.ReLU(inplace=True),
        ]
        self.full_connected = nn.Sequential(*head)

    def forward(self, images):
        """Run the conv stack, flatten per sample, then apply the dense head."""
        conv_out = self.model(images)
        flat = conv_out.view(conv_out.shape[0], -1)
        return self.full_connected(flat)

class DecoderRNN(nn.Module):
    """LSTM decoder that scores vocabulary tokens for every time step.

    Args:
        embed_size (int): size of the token embeddings and of the image feature.
        hidden_size (int): LSTM hidden state size.
        vocab_size (int): number of embedding rows and of output scores.
        num_layers (int): number of stacked LSTM layers.
    """

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        super(DecoderRNN, self).__init__()
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.lstm = nn.LSTM(input_size=embed_size, hidden_size=hidden_size, num_layers=num_layers)
        self.linear = nn.Linear(in_features=hidden_size, out_features=vocab_size)
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, features, captions):
        """Score tokens for the image step followed by every caption step.

        ``features`` gains a leading axis and is concatenated in front of the
        embedded captions along dim 0, so the output has one more step along
        dim 0 than ``captions``.
        """
        word_vectors = self.embed(captions)
        word_vectors = self.dropout(word_vectors)

        image_step = features.unsqueeze(0)
        lstm_input = torch.cat((image_step, word_vectors), dim=0)

        lstm_out, _ = self.lstm(lstm_input)
        return self.linear(lstm_out)

class ImageCaptionsNet(nn.Module):
    """Captioning model: a CNN image encoder feeding an LSTM decoder.

    Args:
        embed_size (int): size of the image feature and of token embeddings.
        hidden_size (int): LSTM hidden state size.
        vocab_size (int): number of token indices the decoder can emit.
        num_layers (int): number of stacked LSTM layers.
    """

    # Token whose prediction terminates a generated caption.
    END_TOKEN = '<end>'

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        super(ImageCaptionsNet, self).__init__()
        self.encoderCNN = EncoderCNN(embed_size)
        self.decoderRNN = DecoderRNN(embed_size, hidden_size, vocab_size, num_layers)

    def forward(self, images, captions):
        """Encode ``images`` and return the decoder's token scores for ``captions``."""
        features = self.encoderCNN(images)
        outputs = self.decoderRNN(features, captions)
        return outputs

    def caption_image(self, image, vocabulary, max_length=50):
        """Greedy decoding: pick the highest-scoring token at every step.

        Args:
            image: batch holding exactly one image (``.item()`` is called on
                the prediction).
            vocabulary: ``(token_to_index, index_to_token)`` pair of dicts.
            max_length (int): maximum number of tokens to generate.

        Returns:
            List of token strings. Generation stops after ``END_TOKEN`` has
            been produced (the end token is included) or after ``max_length``
            tokens.
        """
        tok_ind, ind_tok = vocabulary
        end_idx = tok_ind.get(self.END_TOKEN)
        result_caption = []

        # Decode in eval mode so dropout does not randomise the result;
        # the previous mode is restored afterwards.
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                x = self.encoderCNN(image).unsqueeze(0)
                states = None
                for _ in range(max_length):
                    hiddens, states = self.decoderRNN.lstm(x, states)
                    output = self.decoderRNN.linear(hiddens.squeeze(0))
                    predicted = output.argmax(1)
                    token = predicted.item()
                    result_caption.append(token)
                    if token == end_idx:
                        break
                    x = self.decoderRNN.embed(predicted).unsqueeze(0)
        finally:
            self.train(was_training)

        return [ind_tok[idx] for idx in result_caption]

    def caption_image_bm(self, image, vocabulary, max_length=50, beam_width = 3):
        """Beam-search decoding.

        Keeps the ``beam_width`` partial captions with the highest summed
        log-probability. A hypothesis that has produced ``END_TOKEN`` is
        frozen and competes with its final score.

        Args:
            image: batch holding exactly one image.
            vocabulary: ``(token_to_index, index_to_token)`` pair of dicts.
            max_length (int): maximum number of tokens per caption.
            beam_width (int): number of hypotheses kept per step; must be >= 1.

        Returns:
            Token strings of the best-scoring hypothesis (the end token is
            included when it was produced); at most ``max_length`` tokens.

        Raises:
            ValueError: ``beam_width`` is smaller than 1.
        """
        if beam_width < 1:
            raise ValueError('beam_width must be at least 1, got {}'.format(beam_width))

        tok_ind, ind_tok = vocabulary
        end_idx = tok_ind.get(self.END_TOKEN)

        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                x = self.encoderCNN(image).unsqueeze(0)
                # Hypothesis layout:
                # (tokens, summed log-prob, LSTM state, next LSTM input, finished)
                beams = [([], 0.0, None, x, False)]

                for _ in range(max_length):
                    candidates = []
                    for tokens, score, states, step_input, finished in beams:
                        if finished:
                            candidates.append((tokens, score, states, step_input, True))
                            continue

                        hiddens, new_states = self.decoderRNN.lstm(step_input, states)
                        logits = self.decoderRNN.linear(hiddens.squeeze(0))
                        # Log-probabilities add along a sequence; raw
                        # probabilities do not.
                        log_probs = torch.log_softmax(logits, dim=1)
                        k = min(beam_width, log_probs.shape[1])
                        top_scores, top_ids = log_probs.topk(k, dim=1)

                        for token_score, token_id in zip(top_scores[0], top_ids[0]):
                            token = token_id.item()
                            next_input = self.decoderRNN.embed(token_id.view(1)).unsqueeze(0)
                            candidates.append((
                                tokens + [token],  # new list: siblings must not share storage
                                score + token_score.item(),
                                new_states,
                                next_input,
                                token == end_idx,
                            ))

                    beams = sorted(candidates, key=lambda beam: beam[1], reverse=True)[:beam_width]
                    if all(beam[4] for beam in beams):
                        break
        finally:
            self.train(was_training)

        return [ind_tok[idx] for idx in beams[0][0]]



In [None]:
import os
from PIL import Image
from torch.nn.utils.rnn import pack_padded_sequence

def collate_fn(batch):
    """Assemble a list of (image, caption) samples into batch tensors.

    Images are joined along a new leading batch axis. Captions are padded to
    the longest caption in the batch and laid out with the sequence axis
    first (``batch_first=False``).

    Returns:
        ``(images, captions)`` tuple of tensors.
    """
    images = [sample[0] for sample in batch]
    token_seqs = [sample[1] for sample in batch]

    image_batch = torch.stack(images, dim=0)
    padded_captions = pad_sequence(token_seqs, batch_first=False)
    return image_batch, padded_captions

IMAGE_DIR = '/content/drive/MyDrive/COL774/train_data'

train_dataset = ImageCaptionsDataset(
    IMAGE_DIR, captions_preprocessing_obj.captions_dict, img_transform=img_transform,
    captions_transform=captions_preprocessing_obj.captions_transform
)

torch.backends.cudnn.benchmark = True
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Not passed to the model below; kept because it is a module-level name.
train_CNN = False

# Hyperparameters (the inference cell reuses the first four to rebuild the model).
embed_size = 256
hidden_size = 256
vocab_dict = captions_preprocessing_obj.vocab
vocab_size = len(vocab_dict)+ 1  # one index more than there are vocabulary tokens
num_layers = 1
# NOTE(review): 0.1 is two orders of magnitude above Adam's default of 1e-3;
# confirm this value is intended.
learning_rate = 0.1
num_epochs = 15
step = 0
model = ImageCaptionsNet(embed_size, hidden_size, vocab_size, num_layers).to(device)
# NOTE(review): no ignore_index is set, so the zero padding added by
# collate_fn contributes to the loss like any other target.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

model.train()

directory = '/content/drive/My Drive/COL774/'
path = "model_nonComp_new.pth"
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=8, collate_fn=collate_fn)
for epoch in range(num_epochs):
    for batch_idx, (imgs, captions) in enumerate(train_loader):
        imgs = imgs.to(device)
        captions = captions.to(device)

        # The decoder prepends the image feature as the first time step, so
        # feeding captions[:-1] yields as many output steps as `captions` has.
        outputs = model(imgs, captions[:-1])
        loss = criterion(
            outputs.reshape(-1, outputs.shape[2]), captions.reshape(-1)
        )

        step += 1

        optimizer.zero_grad()
        # Plain backward(): passing the loss itself as the `gradient` argument
        # would multiply every gradient by the current loss value.
        loss.backward()
        optimizer.step()
        print(batch_idx, loss.item())

    # Checkpoint after each completed epoch, so the file always holds trained
    # weights and the last write is the final model.
    torch.save(model.state_dict(), directory + path)


In [None]:
# Ids of the test images: 1 through 5000 inclusive.
img_ids = list(range(1, 5001))

# Checkpoint location used by the inference cell.
# NOTE(review): this file name differs from the one the training cell writes
# ("model_nonComp_new.pth") - confirm which checkpoint is meant to be loaded.
directory = '/content/drive/My Drive/COL774/'
path = "model_nonComp.pth"
class ImageTestDataset(Dataset):
    """Dataset of test images named ``test<id>.jpg`` inside ``img_dir``.

    Args:
        img_dir (string): directory that contains the test images.
        img_ids: sequence of image ids; item ``idx`` of the dataset is the
            image whose id is ``img_ids[idx]``.
        img_transform (callable, optional): transform applied to the image
            after it has been resized to IMAGE_RESIZE.
    """

    def __init__(self, img_dir, img_ids, img_transform=None):
        self.img_dir = img_dir
        self.img_transform = img_transform
        self.image_ids = img_ids

    def __len__(self):
        return len(self.image_ids)

    def _image_path(self, idx):
        """Return the file path of the ``idx``-th image."""
        # Use the ids and directory handed to the constructor instead of
        # assuming ids are idx + 1 and a path relative to the working directory.
        return os.path.join(self.img_dir, 'test{}.jpg'.format(self.image_ids[idx]))

    def __getitem__(self, idx):
        """Load, convert to RGB, resize and transform the ``idx``-th image."""
        image = io.imread(self._image_path(idx))
        # Force three channels so greyscale / RGBA files match the 3-channel
        # normalisation and the encoder's 3-channel first convolution.
        image = Image.fromarray(np.uint8(image)).convert('RGB')
        if self.img_transform:
            image = transforms.Resize(IMAGE_RESIZE)(image)
            image = self.img_transform(image)
        return image

# Rebuild the network with the training hyperparameters and restore its weights.
cap_model = ImageCaptionsNet(embed_size, hidden_size, vocab_size, num_layers)
# map_location lets a checkpoint saved from a GPU session load on a CPU-only runtime.
cap_model.load_state_dict(torch.load(directory + path, map_location=device))
cap_model.to(device)
# Inference mode: the encoder contains a Dropout layer that must be disabled.
cap_model.eval()

Test_dir = '/content/drive/MyDrive/COL774/test_data'

# Same preprocessing as used for training.
img_transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.485, 0.456, 0.406), 
                             (0.229, 0.224, 0.225)) ])

private_test_dataset = ImageTestDataset(
    Test_dir, img_ids, img_transform=img_transform,
    )

# One image per batch, in id order; the caption methods expect a single image.
private_test_loader = DataLoader(private_test_dataset, batch_size=1, shuffle= False, num_workers=0)

# Number of test images processed so far by the inference loop.
count = 0

def write_file(batchid,prediction,output_file):
    """Write one ``<image id>\\t<caption>`` line for a predicted caption.

    Args:
        batchid (int): zero-based position of the image; the id written is
            ``test_data/test<batchid + 1>.jpg``.
        prediction: list of token strings; ``<start>`` and ``<end>`` are
            dropped before the remaining tokens are joined with spaces.
        output_file: object with a ``write`` method that receives the line.
    """
    markers = ("<start>", "<end>")
    words = [token for token in prediction if token not in markers]

    img_id = "test_data/test{0}.jpg".format(batchid+1)
    caption = ' '.join(words)
    output_file.write(img_id + '\t' + caption + '\n')

##### open file to write #####
outfile = "/content/drive/MyDrive/COL774/my_file_nonComp.csv"

vocabulary = (captions_preprocessing_obj.word_to_ix, captions_preprocessing_obj.ix_to_word)

# Opening in 'w' mode creates (or truncates) the file, so no separate `touch`
# is needed; the with-block closes it even if captioning raises.
with open(outfile, 'w', encoding='utf-8') as output_file:
    for batchid, sample in enumerate(private_test_loader):
        sample = sample.to(device)
        # Beam search, captions limited to 8 tokens.
        prediction = cap_model.caption_image_bm(sample, vocabulary, 8)
        # Writing to the file is currently disabled; predictions are only printed.
        # write_file(batchid,prediction,output_file)
        print(prediction)
        count += 1
        # Preview only: stop once 10 images have been processed.
        if count >= 10:
            break
