In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torch.optim as optim
import os
import csv
import json

In [None]:
# Mount Google Drive into the Colab runtime at /content/drive; the dataset
# paths used later in this notebook all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Hyper-parameters shared by the cells below.
feature_size = 1024          # passed to DecoderRNN as both embed and hidden size
hidden_size = feature_size   # kept equal to the feature width
learning_rate = 3e-4         # learning rate handed to the Adam optimizer

In [None]:
class EncoderCNN(nn.Module):
    """Image encoder built on a pretrained DenseNet-121.

    The DenseNet classifier head is replaced by a 1024-unit linear layer whose
    output goes through PReLU and dropout before a final linear projection to
    ``embed_size``.

    Args:
        embed_size: Width of the embedding returned by ``forward``.
        train_CNN: When False (the default) the parameters of
            ``densenet.features`` are frozen (``requires_grad=False``); the
            replaced classifier and the projection layers always stay
            trainable. When True the whole backbone is trainable.
    """

    def __init__(self, embed_size=1024, train_CNN=False):
        super(EncoderCNN, self).__init__()
        # Honour the caller's choice; this flag used to be hard-coded to False.
        self.train_CNN = bool(train_CNN)
        # get the pretrained densenet model
        self.densenet = models.densenet121(pretrained=True)
        # replace the classifier with a fully connected embedding layer; read
        # the input width from the original head instead of hard-coding it
        backbone_width = self.densenet.classifier.in_features
        self.densenet.classifier = nn.Linear(in_features=backbone_width, out_features=1024)
        # add another fully connected layer
        self.embed = nn.Linear(in_features=1024, out_features=embed_size)
        # dropout layer
        self.dropout = nn.Dropout(p=0.5)
        # activation layers
        self.prelu = nn.PReLU()

        # Apply the train_CNN flag to the pretrained feature extractor.
        for param in self.densenet.features.parameters():
            param.requires_grad = self.train_CNN

    def forward(self, images):
        """Encode ``images`` into embeddings with last dimension ``embed_size``.

        ``images`` is fed straight to the DenseNet backbone — presumably a
        (batch, 3, H, W) float tensor; confirm against the caller.
        Dropout is only active while the module is in training mode, so call
        ``.eval()`` before extracting features for inference or caching.
        """
        # get the embeddings from the densenet
        densenet_outputs = self.dropout(self.prelu(self.densenet(images)))

        # pass through the fully connected
        embeddings = self.embed(densenet_outputs)
        return embeddings

In [None]:
class DecoderRNN(nn.Module):
    """LSTM-cell caption decoder trained with teacher forcing.

    Args:
        embed_size: Width of the word embeddings; the image feature vector fed
            at the first step must have the same width.
        hidden_size: Width of the LSTM hidden and cell states.
        vocab_size: Number of tokens in the vocabulary (size of the output
            logits and of the embedding table).
        num_layers: Kept for interface compatibility and stored on the module.
            ``nn.LSTMCell`` is a single-layer cell, so the value is not used to
            build the network.
    """

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super(DecoderRNN, self).__init__()
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.num_layers = num_layers
        # The third positional argument of nn.LSTMCell is `bias`, not a layer
        # count, so num_layers must not be forwarded to it.
        self.lstm_cell = nn.LSTMCell(embed_size, hidden_size)
        self.fc_out = nn.Linear(in_features=self.hidden_size, out_features=self.vocab_size)
        self.embed = nn.Embedding(num_embeddings=self.vocab_size, embedding_dim=self.embed_size)
        self.dropout = nn.Dropout(0.5)
        # activations (not applied in forward, which returns raw logits)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, features, captions):
        """Return per-step vocabulary logits of shape (batch, seq_len, vocab_size).

        Args:
            features: (batch, embed_size) image features, used as the LSTM
                input at step 0.
            captions: (batch, seq_len) integer token ids.

        Step ``t > 0`` receives the embedding of caption token ``t - 1``, so
        ``outputs[:, t]`` predicts token ``t`` without ever seeing it. Feeding
        token ``t`` itself would leak the target into the input. This also
        matches greedy decoding, where the previously predicted token is fed
        back in.
        """
        batch_size = features.size(0)
        num_steps = captions.size(1)

        # Nothing to decode: keep the (batch, 0, vocab) shape contract.
        if num_steps == 0:
            return features.new_empty((batch_size, 0, self.vocab_size))

        # init the hidden and cell states to zeros on the same device/dtype as
        # the incoming features so the module also works on GPU
        hidden_state = features.new_zeros((batch_size, self.hidden_size))
        cell_state = features.new_zeros((batch_size, self.hidden_size))

        # embed the captions
        captions_embed = self.dropout(self.embed(captions))

        step_logits = []
        for t in range(num_steps):
            # first step: image features; afterwards: previous ground-truth token
            step_input = features if t == 0 else captions_embed[:, t - 1, :]
            hidden_state, cell_state = self.lstm_cell(step_input, (hidden_state, cell_state))
            step_logits.append(self.fc_out(hidden_state))

        # (batch, seq_len, vocab_size)
        return torch.stack(step_logits, dim=1)



In [None]:
# dec_model=DecoderRNN(embed_size,hidden_size,vocab_size)
# captions=caption.split()
# indexes=[i for i in range(len(captions))]
# vocab_dct={idx: word for idx,word in zip(indexes,captions)}
# my_tensor = torch.tensor(indexes)
# my_tensor = my_tensor.unsqueeze(0)
# # indexes.size
# my_tensor.shape

In [None]:
class TrainData(Dataset):
    """Dataset yielding (image feature, caption token ids) pairs.

    Args:
        img_path: Directory of images. Files are ordered by ``int(name[6:-4])``,
            i.e. names are presumably of the form ``train_<N>.jpg`` — confirm
            against the dataset.
        caption_path: CSV file whose first row is a header and whose third
            column (index 2) holds the caption text.
        vocab_path: Where the index -> word vocabulary is dumped as JSON.
            Defaults to the original Drive location; pass ``None`` to skip
            writing the file.

    Attributes set here and used by other cells: ``vocabulary`` (index -> word
    dict, with ``<START>`` and ``<EOS>`` appended last), ``processor`` (image
    transform) and ``encoder`` (an ``EncoderCNN`` in eval mode).
    """

    def __init__(self, img_path, caption_path,
                 vocab_path='/content/drive/MyDrive/Colab Notebooks/Data/IC_dataset/vocabulary.json'):
        # Order images numerically so that index i lines up with caption row i.
        file_list = sorted(os.listdir(img_path), key=lambda x: int(x[6:-4]))
        self.image_path = [os.path.join(img_path, name) for name in file_list]

        # newline='' is what the csv module expects for correct quoted-newline
        # handling; blank lines (empty rows) are skipped.
        with open(caption_path, mode='r', newline='') as file:
            rows = [row for row in csv.reader(file) if row]
        # Drop the header row and keep only the caption column.
        self.captions = [row[2] for row in rows[1:]]

        self.processor = transforms.Compose([
            transforms.Resize((224, 224)),  # Resize image to match model input size
            transforms.ToTensor(),           # Convert image to tensor
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize image
        ])

        # Sorted unique words, with the two special tokens appended at the end.
        unique_words = sorted({word for caption in self.captions for word in caption.split()})
        unique_words.append('<START>')
        unique_words.append('<EOS>')
        self.vocabulary = {i: word for i, word in enumerate(unique_words)}
        # Inverse lookup built once instead of on every __getitem__ call.
        self.word_to_idx = {word: i for i, word in self.vocabulary.items()}

        if vocab_path is not None:
            with open(vocab_path, 'w') as json_file:
                json.dump(self.vocabulary, json_file)

        self.encoder = EncoderCNN()
        # torch.no_grad() does not switch dropout off; eval mode does. Without
        # it every extracted feature vector would be randomly zeroed.
        self.encoder.eval()

    def __len__(self):
        return len(self.captions)

    def __getitem__(self, idx):
        """Return ``{'feature': (1, embed) tensor, 'caption': (1, T) long tensor}``.

        The caption is wrapped as ``<START> w1 ... wn <EOS>`` token ids.
        """
        # Context manager closes the file handle; convert() loads the pixel
        # data first and guarantees three channels for Normalize.
        with Image.open(self.image_path[idx]) as raw_image:
            input_image = raw_image.convert('RGB')
        input_batch = self.processor(input_image).unsqueeze(0)
        with torch.no_grad():
            output = self.encoder(input_batch)

        caption_idx = [self.word_to_idx['<START>']]
        caption_idx.extend(self.word_to_idx[word] for word in self.captions[idx].split())
        caption_idx.append(self.word_to_idx['<EOS>'])
        token = torch.tensor(caption_idx).unsqueeze(0)

        return {
            'feature': output,
            'caption': token
        }

In [None]:
# Caption CSV and image folder on the mounted Drive.
caption_path = "/content/drive/MyDrive/Colab Notebooks/Data/IC_dataset/train.csv"
img_path = r"/content/drive/MyDrive/Colab Notebooks/Data/IC_dataset/train"

# Building the dataset also builds the vocabulary and loads the encoder.
train_data = TrainData(img_path, caption_path)
voc = train_data.vocabulary

# Smoke test: pull a single sample through the full pipeline.
# (The bulk feature export lives in the next cell.)
data = train_data[87]

Downloading: "https://download.pytorch.org/models/densenet121-a639ec97.pth" to /root/.cache/torch/hub/checkpoints/densenet121-a639ec97.pth
100%|██████████| 30.8M/30.8M [00:00<00:00, 138MB/s]


In [None]:
train_dataloader = DataLoader(train_data, batch_size = 1, shuffle = False)

# Cache every sample's encoder feature and caption token ids on Drive as
# numbered JSON chunks: train_data_1.json, train_data_2.json, ...
# Chunks are numbered with the integer counter `j`. The previous `i/1000`
# produced names such as train_data_1.0.json, which the merge cell (and the
# final chunk written below) do not use.
export_dir = '/content/drive/MyDrive/Colab Notebooks/Data/IC_dataset'
data_dict = {}
j = 0  # number of chunk files written so far
for i, batch in enumerate(train_dataloader):
    # batch_size is 1, so [0] strips the DataLoader batch dimension.
    data_dict[i] = {
        'feature': batch['feature'][0].tolist(),
        'caption': batch['caption'][0].tolist(),
    }
    # Flush a chunk each time the sample index reaches a multiple of 1000.
    if i != 0 and i % 1000 == 0:
        j = j + 1
        with open(f'{export_dir}/train_data_{j}.json', 'w') as json_file:
            json.dump(data_dict, json_file)
        print(f"wrote chunk {j} (samples up to index {i})")
        data_dict = {}
# Whatever is left after the last full chunk goes into the final file.
with open(f'{export_dir}/train_data_{j+1}.json', 'w') as json_file:
    json.dump(data_dict, json_file)
# data=train_data.__getitem__(0)
# print(train_data.__getitem__(0)['feature'].shape[1])



In [None]:
import json

# Merge the per-chunk feature dumps into a single train_data.json.
# Chunk files are discovered on disk instead of assuming exactly six of them.
# Both integer ("train_data_3.json") and float-style ("train_data_3.0.json")
# suffixes are accepted. The merged output "train_data.json" has no
# underscore suffix and is therefore never picked up as an input.
chunk_dir = '/content/drive/MyDrive/Colab Notebooks/Data/IC_dataset'
chunk_prefix, chunk_suffix = 'train_data_', '.json'

chunk_files = []
for name in os.listdir(chunk_dir):
    if not (name.startswith(chunk_prefix) and name.endswith(chunk_suffix)):
        continue
    try:
        order = float(name[len(chunk_prefix):-len(chunk_suffix)])
    except ValueError:
        continue  # not a numbered chunk file
    chunk_files.append((order, os.path.join(chunk_dir, name)))
chunk_files.sort()

if not chunk_files:
    raise FileNotFoundError(f"no {chunk_prefix}<N>{chunk_suffix} chunk files found in {chunk_dir}")

merged_data = {}
for _, chunk_path in chunk_files:
    with open(chunk_path, 'r') as file:
        data = json.load(file)  # Load JSON content into a dictionary
    merged_data.update(data)  # Sample indices are the keys, so chunks never collide
print(len(merged_data.keys()))

# Write the merged data into a new JSON file
with open(os.path.join(chunk_dir, 'train_data.json'), 'w') as outfile:
    json.dump(merged_data, outfile, indent=4)  # Write merged data to the file with indentation

In [None]:
######################  TRAIN LOOP  #########################
vocab_size = len(train_data.vocabulary)
model = DecoderRNN(feature_size, feature_size, vocab_size)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()
losses = list()  # one Python float per optimisation step

model.train()
for batch in train_dataloader:
    # The DataLoader adds a leading batch dimension of 1 on top of the
    # (1, ...) tensors returned by the dataset; [0] removes it again.
    batch_features = batch['feature'][0]
    batch_captions = batch['caption'][0]

    # Clear gradients from the previous step; without this they accumulate
    # and every update is applied with the sum of all past gradients.
    optimizer.zero_grad()

    output = model(batch_features, batch_captions)
    loss = criterion(output.reshape(-1, vocab_size), batch_captions.reshape(-1))
    loss.backward()
    optimizer.step()

    # Store a plain float so the autograd graph of each step can be freed.
    losses.append(loss.item())

In [None]:
# Display the per-step losses collected by the training loop.
losses

In [None]:
##################################### TEST LOOP  #########################
vocabulary=train_data.vocabulary
processor = transforms.Compose([
          transforms.Resize((224, 224)),  # Resize image to match model input size
          transforms.ToTensor(),           # Convert image to tensor
          transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize image
        ])
test_img_path=r"/content/drive/MyDrive/Colab Notebooks/Data/images/train_1.jpg"
input_image = Image.open(test_img_path)
encoder=EncoderCNN()
input_tensor = processor(input_image)
input_batch = input_tensor.unsqueeze(0)
with torch.no_grad():
    features = encoder.forward(input_batch)
features.shape

In [None]:
# Greedy decoding: feed the image feature first, then feed back the embedding
# of the most likely token at each step.
# `vocabulary` is already an index -> word dict; re-enumerating it would
# replace every word with its own index, so it is used as-is.
max_words = 10
states = None
hiddens = None
embed_hidst = features.clone()
predicted_ids = []  # kept separate from `output`, which holds the step logits

model.eval()
with torch.no_grad():
    for _ in range(max_words):
        if hiddens is None:
            # first step: let the cell start from its default zero state
            hiddens, states = model.lstm_cell(embed_hidst, states)
        else:
            hiddens, states = model.lstm_cell(embed_hidst, (hiddens, states))
        output = model.fc_out(hiddens.unsqueeze(0))
        # batch size is 1, so the flat argmax is the vocabulary index
        max_index = torch.argmax(output)
        predicted_ids.append(max_index.item())
        max_index = max_index.unsqueeze(0)
        embed_hidst = model.embed(max_index)
        # stop as soon as the end-of-sentence token is produced
        if vocabulary.get(predicted_ids[-1]) == '<EOS>':
            break

caption_words = [vocabulary[idx] for idx in predicted_ids]
caption_words

In [None]:
# Run one extra greedy-decoding step by hand, continuing from the state left
# behind by the decoding loop, and show the predicted token index.
hiddens, states = model.lstm_cell(embed_hidst, (hiddens, states))
output = model.fc_out(hiddens.unsqueeze(0))
max_index = output.argmax()
max_index

In [None]:
def caption_image(self, image, vocabulary, max_length=50):
    """Greedily decode a caption for a single image.

    Args:
        self: Object exposing ``encoderCNN`` (image -> embedding) and
            ``decoderRNN`` with ``lstm``, ``linear`` and ``embed`` members.
        image: Input passed straight to ``self.encoderCNN`` — presumably a
            batch containing exactly one image, since ``.item()`` is called on
            each prediction; confirm against the caller.
        vocabulary: Either an object with an ``itos`` index -> word mapping or
            such a mapping itself (e.g. a plain dict).
        max_length: Maximum number of tokens to generate.

    Returns:
        The generated words as a list of strings; generation stops right after
        ``"<EOS>"`` is produced (the token itself is included).
    """
    index_to_word = getattr(vocabulary, "itos", vocabulary)
    result_caption = []

    with torch.no_grad():
        # add a leading sequence dimension of length 1 for the LSTM
        x = self.encoderCNN(image).unsqueeze(0)
        states = None

        for _ in range(max_length):
            hiddens, states = self.decoderRNN.lstm(x, states)
            # drop the sequence dimension again so argmax(1) runs over the
            # vocabulary axis and yields a single-element tensor
            output = self.decoderRNN.linear(hiddens.squeeze(0))
            predicted = output.argmax(1)
            result_caption.append(predicted.item())
            # the predicted token becomes the next input
            x = self.decoderRNN.embed(predicted).unsqueeze(0)

            if index_to_word[predicted.item()] == "<EOS>":
                break

    return [index_to_word[idx] for idx in result_caption]