In [1]:
import torch
import torch.nn as nn
import json

# read json for dictionary mapping
# open from json file
with open('char2idx.json', 'r', encoding = 'UTF-8') as json_file:
    char2idx = json.load(json_file)

# open from json file
with open('img2idx.json', 'r', encoding = 'UTF-8') as json_file:
    img2idx = json.load(json_file)

idx2char = {value:key for key, value in char2idx.items()}
idx2img = {value:key for key, value in img2idx.items()}


class MemeGeneratorCNN(nn.Module):
    def __init__(self):
        super(MemeGeneratorCNN, self).__init__()
        self.embedding_dim = 16
        self.img_embedding = 8
        self.num_classes = len(char2idx)
        
        # Embedding Layer for Images
        self.embedding_img = nn.Embedding(len(img2idx), self.img_embedding)
        # Embedding Layer for character embeddings
        self.embedding_layer = nn.Embedding(len(char2idx), self.embedding_dim, padding_idx = char2idx['<pad>'])
        
        # project to embedding dim
        self.project_down = nn.Linear(self.img_embedding + self.embedding_dim, self.embedding_dim)

        # convolution block
        self.conv1 = nn.Conv1d(in_channels = 16, out_channels = 1024, kernel_size = 5, padding=2)
        self.relu1 = nn.ReLU()
        self.batchnorm1 = nn.BatchNorm1d(1024)
        self.maxpool1 = nn.MaxPool1d(kernel_size=2)
        self.dropout1 = nn.Dropout(p=0.25)

        # convolution block
        self.conv2 = nn.Conv1d(in_channels = 1024, out_channels= 1024, kernel_size=5, padding=2)
        self.relu2 = nn.ReLU()
        self.batchnorm2 = nn.BatchNorm1d(1024)
        self.maxpool2 = nn.MaxPool1d(kernel_size=2)  # default value of stride = kernel_size
        self.dropout2 = nn.Dropout(p=0.25)

        # convolution block
        self.conv3 = nn.Conv1d(in_channels = 1024, out_channels = 1024, kernel_size = 5, padding=2)
        self.relu3 = nn.ReLU()
        self.batchnorm3 = nn.BatchNorm1d(1024)
        self.maxpool3 = nn.MaxPool1d(kernel_size=2)
        self.dropout3 = nn.Dropout(p=0.25)
        
        # convolution block
        self.conv4 = nn.Conv1d(in_channels = 1024, out_channels = 1024, kernel_size = 5, padding=2)
        self.relu4 = nn.ReLU()
        self.batchnorm4 = nn.BatchNorm1d(1024)
        self.maxpool4 = nn.MaxPool1d(kernel_size=2)
        self.dropout4 = nn.Dropout(p=0.25)

        # convolution block
        self.conv5 = nn.Conv1d(in_channels = 1024, out_channels = 1024, kernel_size = 5, padding=2)
        self.relu5 = nn.ReLU()
        self.batchnorm5 = nn.BatchNorm1d(1024)

        # final layers
        self.dropout5 = nn.Dropout(p=0.25)
        self.Linear1 = nn.Linear(1024, 1024)
        self.LinearRelu1 = nn.ReLU()
        self.batchnorm6 = nn.BatchNorm1d(1024)
        self.dropout6 = nn.Dropout(p=0.25)
        self.fc = nn.Linear(1024, self.num_classes)


    def forward(self, input_img, x):
        # input_img (batch_size)
        # x is the decoder input (batch_size, 128) where 128 is seqlen

        # input_img (batch_size, 1)
        input_img = torch.unsqueeze(input_img, dim=1)

        # need to repeat for concat (batch_size, 128) 
        input_img = input_img.repeat(1,128)
        # image embeddings (batch_size, 128, img_embedding_dim)
        img_out = self.embedding_img(input_img)

        # embedding shape (batch_size, 128, embedding_dim)
        text_out = self.embedding_layer(x)
        
        # concatenate between image and caption embeddings
        # (batch_size, 128, text_emb + img_emb)
        cat = torch.cat((img_out, text_out), dim=2)

        # project down to (batch_size, 128, 16)
        embedding_out = self.project_down(cat)

        # need to permute in order to match convnets
        # embedding shape (batch_size, embedding_dim, 128) -> (batch_size, 16, 128)
        embedding_out = embedding_out.permute(0, 2, 1)

        # apply convolution (batch_size, out_channels, 128) -> (batch_size, 1024, 128)
        conv1_out = self.conv1(embedding_out)
        conv1_out = self.relu1(conv1_out)
        # apply batchnorm -> (batch_size, 1024, 128)
        batchnorm1_out = self.batchnorm1(conv1_out)
        # apply maxpooling1 -> (batch_size, 1024, 64) kernel is 2 here
        maxpool1_out = self.maxpool1(batchnorm1_out)
        # apply dropout 1 -> (batch_size, 1024, 64)
        dropout1_out = self.dropout1(maxpool1_out)

        # apply convolution (batch_size, 1024, out_channels) -> (batch_size, 1024, 64)
        conv2_out = self.conv2(dropout1_out)
        conv2_out = self.relu2(conv2_out)
        # apply batchnorm -> (batch_size, 1024, 64)
        batchnorm2_out = self.batchnorm2(conv2_out)
        # apply maxpooling2 -> (batch_size, 1024, 32) kernel is 2 here
        maxpool2_out = self.maxpool2(batchnorm2_out)
        # apply dropout 1 -> (batch_size, 1024, 32)
        dropout2_out = self.dropout2(maxpool2_out)

        # apply convolution (batch_size, 1024, out_channels) -> (batch_size, 1024, 32)
        conv3_out = self.conv3(dropout2_out)
        conv3_out = self.relu3(conv3_out)
        # apply batchnorm -> (batch_size, 1024, 32)
        batchnorm3_out = self.batchnorm3(conv3_out)
        # apply maxpooling3 -> (batch_size, 1024, 16) kernel is 2 here
        maxpool3_out = self.maxpool3(batchnorm3_out)
        # apply dropout 1 -> (batch_size, 1024, 16)
        dropout3_out = self.dropout3(maxpool3_out)

        # apply convolution (batch_size, 1024, out_channels) -> (batch_size, 1024, 32)
        conv4_out = self.conv4(dropout3_out)
        conv4_out = self.relu4(conv4_out)
        # apply batchnorm -> (batch_size, 1024, 16)
        batchnorm4_out = self.batchnorm4(conv4_out)
        # apply maxpooling4 -> (batch_size, 1024, 8) kernel is 2 here
        maxpool4_out = self.maxpool4(batchnorm4_out)
        # apply dropout 1 -> (batch_size, 1024, 8)
        dropout4_out = self.dropout4(maxpool4_out)

        # apply convolution (batch_size, 1024, out_channels) -> (batch_size, 1024, 8)
        conv5_out = self.conv5(dropout4_out)
        conv5_out = self.relu5(conv5_out)
        # apply batchnorm -> (batch_size, 1024, 8)
        batchnorm5_out = self.batchnorm5(conv5_out)

        # Global MaxPooling1d shape (batch_size, 1024)
        # this takes maximum among all channels
        gmaxpool_out = torch.max(batchnorm5_out, dim=2)
        gmaxpool_out = gmaxpool_out.values
        gmaxpool_out = self.dropout5(gmaxpool_out)

        # apply dense layer (batch_size, 1024)
        linear_out = self.Linear1(gmaxpool_out)
        linear_out = self.LinearRelu1(linear_out)
        batchnorm6_out = self.batchnorm6(linear_out)
        dropout6_out = self.dropout6(batchnorm6_out)

        final = self.fc(dropout6_out)

        return final

In [2]:
import torch
import torch.nn as nn
import json
import numpy as np

# read json for dictionary mapping
# open from json file
with open('char2idx_m2.json', 'r', encoding = 'UTF-8') as json_file:
    char2idx = json.load(json_file)

# open from json file
with open('img2idx.json', 'r', encoding = 'UTF-8') as json_file:
    img2idx = json.load(json_file)

idx2char = {value:key for key, value in char2idx.items()}
idx2img = {value:key for key, value in img2idx.items()}



class MemeGeneratorLSTM(nn.Module):
    def __init__(self):
        super(MemeGeneratorLSTM, self).__init__()
        self.embedding_dim = 128
        self.img_embedding = 32
        self.seqlen = 1
        self.num_classes = len(char2idx)

        self.lstm_hidden_size = 1024
        self.lstm_layer_size = 1
        self.lstm_num_directions = 1
        
        # Embedding Layer for Images
        self.embedding_img = nn.Embedding(len(img2idx), self.img_embedding)
        # Embedding Layer for character embeddings
        self.embedding_layer = nn.Embedding(len(char2idx), self.embedding_dim, padding_idx = char2idx['<pad>'])
        
        # project to embedding dim
        self.project_down = nn.Linear(self.img_embedding + self.embedding_dim, self.embedding_dim)

        # LSTM layer
        self.lstm_layer = nn.LSTM(input_size = self.embedding_dim, hidden_size = self.lstm_hidden_size, num_layers=self.lstm_layer_size, bidirectional=False, batch_first = True)
        # fc layer
        self.fc = nn.Linear(self.lstm_hidden_size, self.num_classes) 


    def forward(self, input_img, x, prev_state_h, prev_state_c):
        # input_img (batch_size)
        # x is the decoder input (batch_size, 1) where 1 is seqlen
        # prev_state_h (num_layers_dec * num_directions_dec, batch_size, hidden_size_dec)
        # prev_state_c (num_layers_dec * num_directions_dec, batch_size, hidden_size_dec)
        batch_size = x.size()[0]

        # input_img (batch_size, 1)
        input_img = torch.unsqueeze(input_img, dim=1)
        
        # repeat for replication (batch_size, 1)
        input_img = input_img.repeat(1,1)

        # image embeddings (batch_size, 1, img_embedding_dim)
        img_out = self.embedding_img(input_img)

        # embedding shape (batch_size, 1, embedding_dim) where 1 is seqlen
        text_out = self.embedding_layer(x)

        # concatenate between image and caption embeddings
        # (batch_size, 1, text_emb + img_emb) where 1 is seqlen
        cat = torch.cat((img_out, text_out), dim=2)

        # project down to (batch_size, 1, 128) where 1 is seqlen
        embedding_out = self.project_down(cat)

        # apply LSTM layer
        # HERE IN THE DECODER WE PASS IN SEQ_LEN = 1 to force feed decoder
        # input = batchsize x seq_len x input_size -> Here input_size = 128
        # lstm_out = (batch, seq_len, num_directions * hidden_size)
        # hn = hidden at t=seq_len  (numdirection x num_layers, batchsize, hidden_size)
        # cn = cell at t=seq_len (numdirection x num_layers, batchsize, hidden_size)
        lstm_out, (hn, cn) = self.lstm_layer(embedding_out, (prev_state_h, prev_state_c))

        # output shape before squeeze == (batch_size, 1, hidden_size)
        # output shape after squeeze == (batch_size, hidden_size)
        output = torch.squeeze(lstm_out, dim = 1)
        
        # output shape == (batch_size, vocab)
        out = self.fc(output)
        return out, hn, cn #, attention_weights


    def init_state(self, batch_size):
        # first one is layer size * num_directions
        return (torch.zeros(1, batch_size, self.lstm_hidden_size),
                torch.zeros(1, batch_size, self.lstm_hidden_size))


class MemeGeneratorM2(nn.Module):
    def __init__(self):
        super(MemeGeneratorM2, self).__init__()
        self.memegeneratorlstm = MemeGeneratorLSTM()

    def forward(self, input_img, x, label, device, prediction_mode = False):
        # input_img (batch_size)
        # x is the lstm input (batch_size, 199) where 199 is seqlen
        # label is the label (batch_size, 199) where 199 is seqlen
        # device = "cpu"/"cuda:0"
        # prediction_mode = True/False

        batch_size = input_img.size()[0]

        # LSTM hidden state initialization
        prev_state_h, prev_state_c = self.memegeneratorlstm.init_state(batch_size)
        prev_state_h = prev_state_h.to(device)
        prev_state_c = prev_state_c.to(device)

        # store predictions and outputs
        predictions_arr = []
        output_tensor = torch.zeros((batch_size, 1, len(char2idx)))
        output_tensor = output_tensor.to(device)

        lstm_input = torch.unsqueeze(x[:,0], dim = 1)

        # for prediction only
        first_nonzero = (x == 0).nonzero(as_tuple=False)[0][1].item()

        # Teacher forcing - feeding the target as the next input
        for t in range(0, x.size()[1]): # iterate until len of sequence
             # prediction size (batchsize, num_vocab)
             predictions, prev_state_h, prev_state_c = self.memegeneratorlstm(input_img, lstm_input, prev_state_h, prev_state_c)

             # store lstm_output_tensor
             output_tensor = torch.cat([output_tensor, torch.unsqueeze(predictions, dim=1)], dim = 1)            
             # get one prediction
             one_prediction = torch.max(predictions, dim = 1).indices
             # save the prediction in list
             predictions_arr.append(one_prediction.detach().cpu().numpy())

             if prediction_mode == False:
                 # using teacher forcing
                 lstm_input = torch.unsqueeze(label[:, t], dim = 1)
             else:
                 # only for batchsize = 1
                 # use teacher forcing for initial starter string
                 if t < first_nonzero - 1:
                    lstm_input = torch.zeros((1, 1)).long()
                    lstm_input[0] = x[0][t+1]
                 else:
                    # use prediction as previous output previous input
                    one_prediction = torch.unsqueeze(one_prediction, dim = 1)
                    lstm_input = one_prediction
        
        # remove the original zeros for output_tensor
        # output_tensor shape (batch_size, seqlen, numclasses)
        # predictions_arr shape (seqlen, batch_size)
        output_tensor = output_tensor[:,1:,:]
        predictions_arr = np.array(predictions_arr)
        predictions_arr = np.transpose(predictions_arr)

        return output_tensor, predictions_arr

In [3]:
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image

# Custom Dataset class
class MemeDataset(Dataset):
    def __init__(self, csv_file):
        self.df = pd.read_csv(csv_file)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        meme_text = self.df.iloc[idx, 0]
        meme_link = self.df.iloc[idx, 1]

        # Load and preprocess image
        image = Image.open(meme_link)
        image = your_image_transforms(image)

        return image, meme_text

# Define your image transforms (resize, normalize, etc.)
your_image_transforms = transforms.Compose([transforms.Resize((224, 224)),
                                            transforms.ToTensor(),
                                            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

# Load your dataset
dataset = MemeDataset('sample.csv')

# Create DataLoader
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
# Check if GPU is available, otherwise use CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Move models to the appropriate device
model_cnn = MemeGeneratorCNN().to(device)
model_lstm = MemeGeneratorLSTM().to(device)

# Define your loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer_cnn = torch.optim.Adam(model_cnn.parameters(), lr=0.001)
optimizer_lstm = torch.optim.Adam(model_lstm.parameters(), lr=0.001)

# Training loop
for epoch in range(num_epochs):
    for batch in dataloader:
        # Extract data from the batch
        images, captions = batch

        # Move data to the appropriate device
        images = images.to(device)
        captions = captions.to(device)

        # Forward pass
        cnn_output = model_cnn(images, captions)
        lstm_output, predictions_arr = model_lstm(images, captions, label, device)

        # Compute loss
        loss_cnn = criterion(cnn_output, your_target)  # Define your target based on the CNN output
        loss_lstm = criterion(lstm_output, your_target)  # Define your target based on the LSTM output
        total_loss = loss_cnn + loss_lstm

        # Backward pass and optimization
        optimizer_cnn.zero_grad()
        optimizer_lstm.zero_grad()
        total_loss.backward()
        optimizer_cnn.step()
        optimizer_lstm.step()


NameError: name 'num_epochs' is not defined