In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import time
from torchsummary import summary

# Prefer the GPU when one is visible to PyTorch; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

Hyperparameters

In [None]:
# Overall dimensions of the data and of the two embedding spaces
Number_of_images = 1000
Image_size = (32, 32)  # (height, width)
Image_embedding_size = 50
Text_embedding_size = 40
Max_Number_of_Words = 350

# Joiner embedder: projects a text embedding onto the image-embedding width (40 -> 50)
Joiner_Input_size, Joiner_output_size = Text_embedding_size, Image_embedding_size

# LSTM (RNN) sizes, all derived from the joiner output width (50)
LSTM_Input_size = Joiner_output_size
LSTM_hidden_size = LSTM_Input_size
LSTM_num_layers = 1
LSTM_output_size = LSTM_hidden_size

# Reverse embedding: projects the LSTM output back to the text-embedding width (50 -> 40)
Reverse_Input_size, Reverse_output_size = LSTM_output_size, Text_embedding_size

# Dropout probability shared by the models below
drop_prob = 0.3

CNN - Encoder

In [None]:
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class EncoderCNN(nn.Module):
    """Convolutional encoder producing an ``Image_embedding_size``-wide vector per image.

    The network is four residual stages (conv -> conv -> batch-norm, added back
    to the first conv's activation) each followed by max-pooling and channel
    dropout, then a three-layer dense head.

    Shape comments assume a single-channel 32x32 input, which is what makes the
    flattened feature exactly 512 wide for the dense head -- TODO confirm
    against the tensors actually fed in.
    """

    def __init__(self) -> None:
        super().__init__()

        def conv3x3(in_channels, out_channels, padding=1):
            # Every convolution in this model is 3x3 with stride 1.
            return nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=padding)

        # Stage 1: 32x32 -> 30x30 (unpadded conv) -> 15x15 after pooling, 64 channels
        self.conv1 = conv3x3(1, 64, padding=0)
        self.conv2 = conv3x3(64, 64)
        self.BatchNorm1 = nn.BatchNorm2d(64, momentum=0.1)
        self.MaxPool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout1 = nn.Dropout2d(p=drop_prob)

        # Stage 2: 15x15 -> 7x7 after pooling, 128 channels
        self.conv3 = conv3x3(64, 128)
        self.conv4 = conv3x3(128, 128)
        self.BatchNorm2 = nn.BatchNorm2d(128, momentum=0.1)
        self.MaxPool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout2 = nn.Dropout2d(p=drop_prob)

        # Stage 3: 7x7 -> 3x3 after pooling, 256 channels
        self.conv5 = conv3x3(128, 256)
        self.conv6 = conv3x3(256, 256)
        self.BatchNorm3 = nn.BatchNorm2d(256, momentum=0.1)
        self.MaxPool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout3 = nn.Dropout2d(p=drop_prob)

        # Stage 4: 3x3 -> 1x1 after the 3x3 pooling, 512 channels
        self.conv7 = conv3x3(256, 512)
        self.conv8 = conv3x3(512, 512)
        self.BatchNorm4 = nn.BatchNorm2d(512, momentum=0.1)
        self.MaxPool4 = nn.MaxPool2d(kernel_size=3, stride=3)
        self.dropout4 = nn.Dropout2d(p=drop_prob)

        # Dense head: 512 -> 1000 -> 500 -> Image_embedding_size
        self.Dense = nn.Sequential(
            nn.Linear(512, 1000),
            nn.ReLU(),
            nn.Linear(1000, 500),
            nn.ReLU(),
            nn.Linear(500, Image_embedding_size),
        )

    @staticmethod
    def _residual_stage(x, conv_in, conv_res, norm, pool, drop):
        """Apply one residual stage followed by pooling and dropout."""
        trunk = F.relu(conv_in(x))
        # Skip connection: the first conv's activation is added to the
        # batch-normalised output of the second conv.
        merged = F.relu(torch.add(trunk, norm(conv_res(trunk))))
        return drop(pool(merged))

    def forward(self, x):
        """Encode a batch of images into embedding vectors.

        Args:
            x: image batch; assumed (batch, 1, 32, 32) -- TODO confirm.

        Returns:
            Tensor with one ``Image_embedding_size``-wide row per image.
        """
        x = self._residual_stage(x, self.conv1, self.conv2, self.BatchNorm1, self.MaxPool1, self.dropout1)  # 15x15x64
        x = self._residual_stage(x, self.conv3, self.conv4, self.BatchNorm2, self.MaxPool2, self.dropout2)  # 7x7x128
        x = self._residual_stage(x, self.conv5, self.conv6, self.BatchNorm3, self.MaxPool3, self.dropout3)  # 3x3x256
        x = self._residual_stage(x, self.conv7, self.conv8, self.BatchNorm4, self.MaxPool4, self.dropout4)  # 1x1x512

        # Flatten everything but the batch axis before the dense head.
        flat = x.view(x.size(0), -1)
        return self.Dense(flat)

    def configure_optimizers(self):
        """Return an Adam optimizer over this module's parameters (lr=0.01)."""
        return optim.Adam(self.parameters(), lr=0.01)


RNN - Decoder

In [None]:
class LSTM_Net(nn.Module):
    """Stateful decoder: a stack of twelve bidirectional LSTMs plus self-attention.

    The module keeps the recurrent state of every LSTM on ``self`` (attributes
    ``hidden1`` .. ``hidden12``) so that it can be driven one step at a time:
    call ``forward(image_embedding, New=True)`` to start a sequence, then
    ``forward(text_embedding, New=False)`` for each following step.

    The layer widths are fixed by ``_HIDDEN_SIZES``; the last LSTM emits
    ``2 * 25 = 50`` features, so ``Reverse_Input_size`` must be 50 for the
    attention and output layers to line up.
    """

    # Per-direction hidden size of lstm1 .. lstm12. Each layer is bidirectional,
    # so its output (and the next layer's input) is twice this width.
    _HIDDEN_SIZES = (50, 100, 150, 200, 250, 250, 250, 200, 150, 100, 50, 25)

    def __init__(self) -> None:
        super(LSTM_Net, self).__init__()

        # input embedding sizes (text-embedding width -> LSTM input width)
        self.einput_size = Joiner_Input_size
        self.eoutput_size = Joiner_output_size

        # LSTM parameters
        self.embed_size = LSTM_Input_size
        self.hidden_size = LSTM_hidden_size
        self.num_layers = LSTM_num_layers

        # reverse embedding sizes (LSTM output width -> text-embedding width)
        self.Rinput_size = Reverse_Input_size
        self.Routput_size = Reverse_output_size

        # projects a text embedding to the width the first LSTM expects
        self.embedding1 = nn.Linear(self.einput_size, self.eoutput_size, bias=False)

        # nn.LSTM applies dropout only *between* stacked layers, so with a single
        # layer a non-zero value has no effect and merely triggers a UserWarning.
        # Pass it through only when it can actually do something.
        inter_layer_dropout = drop_prob if self.num_layers > 1 else 0.0

        in_features = self.embed_size
        for idx, hidden in enumerate(self._HIDDEN_SIZES, start=1):
            lstm = nn.LSTM(
                input_size=in_features,
                hidden_size=hidden,
                num_layers=self.num_layers,
                bidirectional=True,
                batch_first=True,
                # only the odd-numbered LSTMs are configured with dropout
                dropout=inter_layer_dropout if idx % 2 == 1 else 0.0,
            )
            # Registered as lstm1 .. lstm12 so saved state_dict keys keep matching.
            setattr(self, f"lstm{idx}", lstm)
            in_features = 2 * hidden

        # attention projections applied to the output of the last LSTM
        self.attention_Q = nn.Linear(self.Rinput_size, self.Rinput_size)
        self.attention_K = nn.Linear(self.Rinput_size, self.Rinput_size)
        self.attention_V = nn.Linear(self.Rinput_size, self.Rinput_size)

        # output projection back to the text-embedding width
        self.Dense1 = nn.Linear(self.Rinput_size, self.Routput_size, bias=False)

        self.relu = nn.ReLU()

    def init_hidden(self, batch_size):
        """Reset the (h, c) state of every LSTM to zeros for ``batch_size`` sequences.

        The states are created on the same device and with the same dtype as
        the module's own weights, so the module works wherever it was moved to.
        """
        reference = self.embedding1.weight
        for idx, hidden in enumerate(self._HIDDEN_SIZES, start=1):
            # 2 * num_layers: one state per direction per stacked layer
            shape = (2 * self.num_layers, batch_size, hidden)
            state = (
                torch.zeros(shape, device=reference.device, dtype=reference.dtype),
                torch.zeros(shape, device=reference.device, dtype=reference.dtype),
            )
            setattr(self, f"hidden{idx}", state)

    def forward(self, input, New = False):
        """Advance the decoder and return its prediction.

        Args:
            input: batch-first tensor, ``(batch, steps, features)``. With
                ``New=True`` it is fed to the first LSTM as-is (so it must
                already have the LSTM input width); otherwise it is first
                projected by ``embedding1``.
            New: when True the recurrent state is reset to zeros before the
                step; when False the state left by the previous call is reused.

        Returns:
            Tensor of shape ``(batch, steps, Reverse_output_size)``.

        Raises:
            AttributeError: if called with ``New=False`` before any call with
                ``New=True`` (there is no recurrent state to continue from).
        """
        if New:
            # start of a sequence: the input is the image embedding
            self.init_hidden(input.shape[0])
        else:
            if not hasattr(self, "hidden1"):
                raise AttributeError(
                    "LSTM_Net has no recurrent state yet; call forward(..., New=True) "
                    "or init_hidden(batch_size) before stepping with New=False"
                )
            # continuation step: the input is a text embedding
            input = self.embedding1(input)

        output = input
        for idx in range(1, len(self._HIDDEN_SIZES) + 1):
            lstm = getattr(self, f"lstm{idx}")
            output, state = lstm(output, getattr(self, f"hidden{idx}"))
            # keep the new state so the next call continues from here
            setattr(self, f"hidden{idx}", state)
            if idx % 2 == 1:
                # a ReLU follows every odd-numbered LSTM
                output = self.relu(output)

        # (unscaled) dot-product self-attention over the time axis
        Q = self.attention_Q(output)
        K = self.attention_K(output)
        V = self.attention_V(output)
        attention = torch.bmm(Q, K.transpose(1, 2))
        attention = F.softmax(attention, dim=2)
        attention = torch.bmm(attention, V)

        # project back to the text-embedding width
        attention = self.Dense1(attention)

        return attention

Autoencoder for the word embeddings: the encoder compresses them from size 175 to 40, and the decoder maps them back from 40 to 175.

In [None]:
class Encode(nn.Module):
    """Two-layer, bias-free encoder: input_shape -> 100 -> output_shape."""

    def __init__(self, input_shape: int, output_shape: int):
        super().__init__()
        self.encode1 = nn.Linear(input_shape, 100, bias=False)
        self.encode2 = nn.Linear(100, output_shape, bias=False)

    def forward(self, x: torch.Tensor):
        """Project ``x`` through both linear layers with a leaky ReLU (slope 0.2) in between."""
        hidden = F.leaky_relu(self.encode1(x), 0.2)
        return self.encode2(hidden)

class Decode(nn.Module):
    """Two-layer, bias-free decoder: input_shape -> 100 -> output_shape."""

    def __init__(self, input_shape: int, output_shape: int):
        super().__init__()
        self.decode1 = nn.Linear(input_shape, 100, bias=False)
        self.decode2 = nn.Linear(100, output_shape, bias=False)

    def forward(self, x: torch.Tensor):
        """Project ``x`` through both linear layers with a leaky ReLU (slope 0.2) in between."""
        hidden = F.leaky_relu(self.decode1(x), 0.2)
        return self.decode2(hidden)
    

Training - Process

In [None]:
# Running histories filled in during training; each name gets its own empty list.
saved_model_losses, Losses, Accuracies, saved_accuracies = [], [], [], []

In [None]:
# Build the trainable models and restore their latest checkpoints.
cnn = EncoderCNN().to(device)
network = LSTM_Net().to(device)

# map_location=device lets checkpoints that were saved on a GPU be restored on
# whatever `device` resolved to (including a CPU-only machine).
cnn.load_state_dict(torch.load('/home/ocr/teluguOCR/Saved_Models/CNN_latest.pth', map_location=device))
network.load_state_dict(torch.load('/home/ocr/teluguOCR/Saved_Models/Network_latest.pth', map_location=device))

# Pre-trained label autoencoder; used in inference mode only.
encoder = Encode(175, 40).to(device)
encoder.load_state_dict(torch.load('/home/ocr/teluguOCR/Models/Encode_no_act40.pth', map_location=device))
encoder.eval()

decoder = Decode(40, 175).to(device)
decoder.load_state_dict(torch.load('/home/ocr/teluguOCR/Models/Decode_no_act40.pth', map_location=device))
decoder.eval()

cnn.train()
network.train()

# Only the CNN and the LSTM network are optimised; the autoencoder is not.
params = list(network.parameters()) + list(cnn.parameters())
optimizer = optim.Adam(params, lr=5e-6)

# Maximum gradient L2 norm for clipping.
# NOTE: torch.nn.utils.clip_grad_norm_ rescales the gradients that exist at the
# moment it is called, so it only has an effect between loss.backward() and
# optimizer.step(). Calling it here, before any backward pass, did nothing, so
# that call has been removed; apply it with `params` and `clip` in the training step.
clip = 10.0

In [None]:
# Mean-squared-error loss, moved to the GPU when one is available.
critereon = nn.MSELoss()
if torch.cuda.is_available():
    critereon = critereon.cuda()

num_of_epochs = 4500

# Path prefixes of the pre-batched tensors; the file index and '.pt' are appended when loading.
Images_path = "/home/ocr/teluguOCR/Dataset/Batch_Image_Tensors/Image"
Labels_path = '/home/ocr/teluguOCR/Dataset/Batch_Label_Tensors/Label'

def get_data_loader(i):
    """Load the ``i``-th pre-batched image and label tensors from disk.

    Args:
        i: index appended to ``Images_path`` / ``Labels_path`` (followed by '.pt').

    Returns:
        ``(images, labels)`` where ``labels`` has been converted to float.
    """
    image_file = f"{Images_path}{i!s}.pt"
    label_file = f"{Labels_path}{i!s}.pt"
    return torch.load(image_file), torch.load(label_file).float()

num = 1
Num_of_files = 50
batchSize = 1000  # upper bound on the number of samples per optimisation step


def _train_step(images_, labels_):
    """Run one teacher-forced optimisation step and return its scalar loss.

    Args:
        images_: mini-batch of images accepted by ``cnn``.
        labels_: matching raw label sequences; assumed (batch, steps, 175) so
            that ``encoder`` maps them to (batch, steps, 40) -- TODO confirm.

    Returns:
        The MSE loss of this mini-batch as a Python float.
    """
    optimizer.zero_grad()

    # The label encoder is not part of the optimiser, so there is no reason to
    # record an autograd graph through it (or accumulate gradients on its weights).
    with torch.no_grad():
        targets = encoder(labels_)

    # Add a length-1 sequence axis so the image embedding can be fed to the LSTM.
    features = cnn(images_).unsqueeze(1)
    outputs = torch.zeros_like(targets)

    # `network` returns (batch, 1, embedding) for a single step. Indexing it with
    # `[0][0]` would select only sample 0 and broadcast that one vector to every
    # row of `outputs`; `[:, 0, :]` keeps one prediction per sample.
    outputs[:, 0, :] = network(features, New = True)[:, 0, :]
    for t in range(targets.shape[1] - 1):
        # Teacher forcing: step t+1 is predicted from the ground-truth step t.
        outputs[:, t+1, :] = network(targets[:, t, :].unsqueeze(1), New = False)[:, 0, :]

    # outputs = torch.sigmoid(outputs)
    loss = critereon(outputs, targets)
    loss.backward()
    # Clipping must happen after backward() and before step() to have any effect.
    torch.nn.utils.clip_grad_norm_(params, clip, norm_type=2, error_if_nonfinite=False)
    optimizer.step()
    return loss.item()


for i in range(1, num_of_epochs + 1):
    start = time.time()
    l_min = 1e18
    l_max = 0
    l = 0

    # if i%100 == 0:
    #     torch.save(network.state_dict(), '/home/ocr/teluguOCR/Saved_Models/Network_latest.pth')
    #     torch.save(cnn.state_dict(), '/home/ocr/teluguOCR/Saved_Models/CNN_latest.pth')

    num_of_points = 0
    for j in range(1, Num_of_files + 1):
        images, labels = get_data_loader(j)
        images = images.to(device)
        labels = labels.to(device)
        size = images.shape[0]
        num_of_points += size

        # Walk the file in chunks of at most batchSize samples; a file that
        # already fits yields exactly one chunk covering all of it.
        fl = 0
        for k in range(0, size, batchSize):
            fl += _train_step(images[k:k + batchSize], labels[k:k + batchSize])

        # Release the file's tensors before the next one is loaded.
        del images
        del labels

        l_min = min(l_min, fl)
        l_max = max(l_max, fl)
        l += fl
    print(f"Epoch {i} completed in {format(time.time() - start, '.0f')} seconds with loss ({l_min}, {l_max}), {l}")
    Losses.append(l)