In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from torchvision import models
from tqdm import tqdm
import numpy as np
import PIL
import random
from nltk.tokenize import word_tokenize
from PIL import Image
import pandas as pd


import math
from nltk import word_tokenize
from collections import Counter
from nltk.util import ngrams


class BLEU(object):
    """Sentence-level BLEU over pre-tokenised sequences.

    All methods are static; the class is only a namespace. Tokens are compared
    case-insensitively. This is a lenient variant: n-gram orders whose
    precision is zero are left out of the geometric mean instead of forcing the
    whole score to zero, so candidates shorter than the highest n-gram order
    can still receive a non-zero score.
    """

    @staticmethod
    def _ngrams(tokens, n):
        """Return the contiguous n-grams of ``tokens`` as a list of tuples."""
        tokens = list(tokens)
        return list(zip(*(tokens[i:] for i in range(n))))

    @staticmethod
    def compute(candidate, references, weights):
        """Score one candidate against one or more references.

        Args:
            candidate: sequence of string tokens produced by the model.
            references: sequence of reference token sequences.
            weights: one weight per n-gram order, starting at unigrams.

        Returns:
            Brevity penalty times the weighted geometric mean of the non-zero
            modified precisions. Returns 0.0 when no n-gram order matches at
            all (which includes the empty candidate).
        """
        candidate = [c.lower() for c in candidate]
        references = [[r.lower() for r in reference] for reference in references]
        weights = list(weights)

        precisions = [
            BLEU.modified_precision(candidate, references, n)
            for n in range(1, len(weights) + 1)
        ]

        # Without this guard every zero precision is skipped below, the log-sum
        # collapses to 0 and a candidate sharing nothing with the references
        # would be scored exp(0) == 1 (times the brevity penalty).
        if precisions and not any(precisions):
            return 0.0

        # log(0) is undefined, so zero precisions are left out of the sum.
        s = math.fsum(w * math.log(p_n) for w, p_n in zip(weights, precisions) if p_n)

        bp = BLEU.brevity_penalty(candidate, references)
        return bp * math.exp(s)

    @staticmethod
    def modified_precision(candidate, references, n):
        """Clipped n-gram precision of ``candidate`` for a single order ``n``.

        Each candidate n-gram count is clipped to the largest count of that
        n-gram in any single reference. Returns 0 when the candidate has no
        n-grams of this order.
        """
        counts = Counter(BLEU._ngrams(candidate, n))

        if not counts:
            return 0

        # Counter returns 0 for unseen n-grams, which also covers the case of
        # an empty reference list.
        max_counts = Counter()
        for reference in references:
            reference_counts = Counter(BLEU._ngrams(reference, n))
            for ngram in counts:
                max_counts[ngram] = max(max_counts[ngram], reference_counts[ngram])

        clipped_total = sum(min(count, max_counts[ngram]) for ngram, count in counts.items())

        return clipped_total / sum(counts.values())

    @staticmethod
    def brevity_penalty(candidate, references):
        """Penalty for candidates that are not longer than the shortest reference.

        Returns 1 when the candidate is longer than the shortest reference,
        ``exp(1 - r / c)`` otherwise, and 0.0 for an empty candidate (the
        formula would divide by zero there).
        """
        c = len(candidate)
        if c == 0:
            return 0.0

        r = min(len(r) for r in references)

        if c > r:
            return 1
        return math.exp(1 - r / c)

def give_score(grount_truths, predictions):
    """Print and return the macro-averaged BLEU-4 of predictions vs. references.

    Args:
        grount_truths: reference formulas, one whitespace-tokenised string per
            sample (the parameter name is kept as-is for existing callers).
        predictions: predicted formulas, aligned with ``grount_truths``.

    Returns:
        The mean of the per-sample BLEU scores (sum divided by
        ``len(predictions)``), or 0.0 when there are no predictions.
    """
    if len(predictions) == 0:
        # Nothing to average; avoid dividing by zero.
        print("Macro Bleu : ", 0.0)
        return 0.0

    overall = 0
    for gt, pred in zip(grount_truths, predictions):
        overall += BLEU.compute(pred.split(), [gt.split()], weights=[1/4, 1/4, 1/4, 1/4])

    macro_bleu = overall / len(predictions)
    print("Macro Bleu : ", macro_bleu)
    # Returned as well as printed so callers can log or compare the value.
    return macro_bleu

# Device configuration: use the GPU when one is visible and fall back to the
# CPU otherwise, so the notebook still runs on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device



In [None]:
def preprocess_image(image_path):
    """Load an image file and turn it into a normalised 3-channel tensor.

    The image is converted to single-channel greyscale, resized to 224x224,
    replicated into three identical channels and normalised with the
    mean/std constants below.

    Args:
        image_path: path of the image file to read.

    Returns:
        Float tensor of shape (3, 224, 224).
    """
    # The context manager closes the file handle once the pixels are copied.
    with Image.open(image_path) as raw:
        # convert("L") guarantees a 2-D pixel array; stacking an RGB/RGBA
        # array would produce a 4-D array that ToTensor cannot handle.
        grey = raw.convert("L").resize((224, 224))
        img_array = np.array(grey)

    # Replicate the single channel three times -> (224, 224, 3).
    new_im = np.stack([img_array, img_array, img_array], axis=2)

    # Convert the image to a PyTorch tensor and normalize
    transform = transforms.Compose([
        transforms.ToTensor(),  # HxWxC uint8 array -> CxHxW float tensor in [0, 1]
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    return transform(new_im)

In [None]:
class Mydata(Dataset):
    """Dataset of (formula token ids, image tensor) pairs read from a CSV.

    The CSV must have an ``image`` column (file names) and a ``formula`` column
    (whitespace-separated tokens).

    Args:
        csv_path: path of the CSV file.
        path: dataset root directory; must end with a path separator because
            it is concatenated with ``image_dir`` and the file name.
        image_dir: sub-directory of ``path`` that holds the image files.
    """

    def __init__(self, csv_path, path, image_dir="images/train/"):
        self.path = path
        self.csv_path = csv_path
        self.image_dir = image_dir
        self.vocab = []       # replaced by the token -> index dict in preprocess()
        self.form = []        # padded token-id lists, one per CSV row
        self.img = []
        self.dataframe = None
        self.preprocess()

    def preprocess(self):
        """Read the CSV, build the vocabulary and the padded id sequences."""
        df = pd.read_csv(self.csv_path)
        self.dataframe = df
        token_lists = [formula.split() for formula in df['formula']]

        # Sorting makes the token -> index mapping reproducible. Iterating a
        # set of strings directly can give a different order in every Python
        # process, which silently invalidates a checkpoint trained with a
        # vocabulary built in an earlier session.
        vocab = ["<pad>", "<sos>", "<eos>"]
        vocab.extend(sorted({token for tokens in token_lists for token in tokens}))
        token_to_index = {token: idx for idx, token in enumerate(vocab)}
        self.vocab = token_to_index

        sos = token_to_index["<sos>"]
        eos = token_to_index["<eos>"]
        pad = token_to_index["<pad>"]

        indexed_dataset = [
            [sos] + [token_to_index[token] for token in tokens] + [eos]
            for tokens in token_lists
        ]

        # default=0 keeps an empty CSV from raising in max().
        max_sequence_length = max((len(seq) for seq in indexed_dataset), default=0)
        # Right-pad every sequence to the longest one so batches are rectangular.
        self.form = [seq + [pad] * (max_sequence_length - len(seq)) for seq in indexed_dataset]

    def __getitem__(self, index):
        """Return ``{"formula": list[int], "image": tensor}`` for row ``index``."""
        # Positional lookup: independent of the frame's index labels and
        # raises IndexError when ``index`` is out of range.
        file_name = self.dataframe['image'].iloc[index]
        res_im = preprocess_image(self.path + self.image_dir + file_name)
        return {"formula": self.form[index], "image": res_im}

    def __len__(self):
        """Number of rows in the CSV."""
        return len(self.dataframe)

def collate_fn(batch):
    """Merge a list of dataset samples into one batch dictionary.

    Args:
        batch: list of dicts, each with a "formula" entry (equal-length list
            of token ids) and an "image" entry (tensor).

    Returns:
        Dict with "formula" as a long tensor of stacked id rows and "image" as
        the image tensors stacked along a new leading dimension.
    """
    token_rows, image_tensors = [], []
    for sample in batch:
        token_rows.append(sample["formula"])
        image_tensors.append(sample["image"])

    return {
        "formula": torch.tensor(token_rows, dtype=torch.long),
        # The images are already tensors, so they only need stacking.
        "image": torch.stack(image_tensors),
    }


In [None]:
# Define the Encoder (CNN) architecture
class Encoder(nn.Module):
    """Convolutional image encoder.

    Five stages of 5x5 convolution -> ReLU -> 2x2 max-pooling widen the
    channels 3 -> 32 -> 64 -> 128 -> 256 -> 512, followed by a 3x3 average
    pool with stride 1. For a 224x224 input the spatial size shrinks to 1x1,
    leaving 512 features per image.
    """

    def __init__(self):
        super().__init__()
        channel_plan = (3, 32, 64, 128, 256, 512)

        stages = []
        for c_in, c_out in zip(channel_plan[:-1], channel_plan[1:]):
            stages.extend((
                nn.Conv2d(c_in, c_out, kernel_size=5),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2),
            ))
        stages.append(nn.AvgPool2d(kernel_size=3, stride=1))

        # Same module order as a hand-written Sequential, so the parameter
        # names in the state dict (cnn.0, cnn.3, ...) are unchanged.
        self.cnn = nn.Sequential(*stages)

    def forward(self, x):
        """Encode a batch of images.

        Every size-1 dimension of the CNN output is squeezed away. That
        includes the batch dimension when the batch holds a single image, so
        a lone image yields a 1-D feature vector.
        """
        return self.cnn(x).squeeze()


In [None]:
# Define the Decoder (LSTM) architecture
class Decoder(nn.Module):
    """Single-step LSTM decoder conditioned on an image context vector.

    Args:
        vocab_size: number of tokens in the vocabulary (embedding rows and
            output logits).
        embedding_dim: size of each token embedding.
        hidden_dim: hidden size of each LSTM direction.
        input_size: size of the context vector concatenated to the embedding.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, input_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # Bidirectional, so the LSTM output carries 2 * hidden_dim features
        # and its state tensors have a leading dimension of 2.
        self.lstm = nn.LSTM(
            input_size=input_size + embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True,
            bidirectional=True,
        )
        self.output_layer = nn.Linear(in_features=2 * hidden_dim, out_features=vocab_size)
        # Kept as an attribute; forward() returns raw logits and does not apply it.
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, context, prev_hidden, prev_cell, prev_word):
        """Run one decoding step.

        Args:
            context: image features, (batch, 1, input_size).
            prev_hidden: previous LSTM hidden state, (2, batch, hidden_dim).
            prev_cell: previous LSTM cell state, (2, batch, hidden_dim).
            prev_word: previous token ids, (batch, 1).

        Returns:
            ``((hidden, cell), logits)`` where ``logits`` has shape
            (batch, 1, vocab_size) and the state tensors keep their shape.
        """
        token_vectors = self.embedding(prev_word)                  # (batch, 1, embedding_dim)
        step_input = torch.cat((context, token_vectors), dim=-1)   # (batch, 1, input_size + embedding_dim)
        lstm_out, (new_hidden, new_cell) = self.lstm(step_input, (prev_hidden, prev_cell))
        logits = self.output_layer(lstm_out)                       # unnormalised scores
        return (new_hidden, new_cell), logits

In [None]:

class MyModel(nn.Module):
    """Encoder-decoder wrapper that decodes a formula token by token.

    Args:
        encoder: module mapping an image batch to one feature vector per image.
        decoder: module called as ``decoder(context, hidden, cell, prev_word)``
            and returning ``((hidden, cell), logits)``.
        vocab_size: number of output classes produced by the decoder.
    """

    def __init__(self, encoder, decoder, vocab_size):
        super(MyModel, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.vocab_size = vocab_size

    def _initial_state(self, batch_size, device):
        """Zero hidden/cell tensors sized to match the decoder's LSTM."""
        lstm = getattr(self.decoder, "lstm", None)
        if lstm is None:
            # No LSTM to inspect: keep the historical bidirectional/512 layout.
            state_rows, hidden_size = 2, 512
        else:
            state_rows = lstm.num_layers * (2 if lstm.bidirectional else 1)
            hidden_size = lstm.hidden_size
        shape = (state_rows, batch_size, hidden_size)
        return torch.zeros(shape, device=device), torch.zeros(shape, device=device)

    def forward(self, source, target, teacher_forcing= 0.5):
        """Decode ``target.shape[1] - 1`` steps and return the logits.

        Args:
            source: image batch, (batch, channels, height, width).
            target: ground-truth token ids, (batch, target_len); column 0 is
                used as the first decoder input.
            teacher_forcing: probability, per step, of feeding the
                ground-truth token instead of the model's own prediction.

        Returns:
            Tensor of shape (target_len, batch, vocab_size). Row 0 is never
            written and stays all zeros.
        """
        batch_size = source.shape[0]
        target_len = target.shape[1]
        # Follow the input's device instead of relying on a notebook global.
        run_device = source.device

        # reshape() restores the batch axis even when the encoder squeezed it
        # away for a batch of one image.
        context = self.encoder(source).reshape(batch_size, 1, -1)

        hidden, cell = self._initial_state(batch_size, run_device)
        outputs = torch.zeros(target_len, batch_size, self.vocab_size, device=run_device)

        step_input = target[:, 0]
        for t in range(1, target_len):
            # The decoder expects token ids of shape (batch, 1).
            (hidden, cell), output = self.decoder(context, hidden, cell, step_input.unsqueeze(1))
            output = output.squeeze(dim=1)
            outputs[t] = output
            use_ground_truth = random.random() < teacher_forcing
            top1 = output.argmax(dim=1)
            step_input = target[:, t] if use_ground_truth else top1
        return outputs

In [None]:
def train(model, dataload, optimizer, criteria):
    """Run one training epoch and return the mean batch loss.

    Args:
        model: called as ``model(images, tokens)``; expected to return logits
            shaped (target_len, batch, vocab_size).
        dataload: iterable of batches with "image" and "formula" tensors.
        optimizer: optimiser stepping the model's parameters.
        criteria: loss taking (N, vocab_size) logits and (N,) target ids.

    Returns:
        Sum of the batch losses divided by the number of batches.
    """
    model.train()
    running_loss = 0
    for batch in tqdm(dataload):
        images = batch["image"].to(device)
        tokens = batch["formula"].to(device)

        optimizer.zero_grad()
        logits = model(images, tokens)

        # Time step 0 holds no prediction, so it is dropped from both the
        # logits and the (time-major) targets before flattening.
        flat_logits = logits[1:].view(-1, logits.shape[-1])
        shifted_targets = tokens.T[1:]
        flat_targets = shifted_targets.reshape((shifted_targets.shape[0] * shifted_targets.shape[1], ))

        batch_loss = criteria(flat_logits, flat_targets)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()

    mean_loss = running_loss / len(dataload)
    print("loss: ", mean_loss)
    return mean_loss


In [None]:
# Root folder of the dataset; the CSV files and the image folders live below it.
path = "/kaggle/input/handwritten-eq-to-latex-conv-dataset/HandwrittenData/"

train_data = Mydata(path + "train_hw.csv", path)
# Shuffle so that every epoch visits the samples in a different order instead
# of always replaying the CSV order.
train_dataloader = DataLoader(train_data, batch_size=64, shuffle=True, collate_fn = collate_fn)


In [None]:
# Build the encoder/decoder pair sized to the training vocabulary.
encoder= Encoder()
decoder = Decoder(vocab_size=len(train_data.vocab), embedding_dim=512, hidden_dim=512, input_size = 512)
Model = MyModel(encoder=encoder, decoder=decoder, vocab_size=len(train_data.vocab)).to(device)
# Formulas are right-padded with <pad>; ignoring that index keeps the padding
# positions from contributing to the loss and its gradient.
loss_fn = torch.nn.CrossEntropyLoss(ignore_index=train_data.vocab["<pad>"])
optimizer = torch.optim.Adam(params=Model.parameters(), lr=0.001)



In [None]:
# Train for a fixed number of epochs. `loss` collects the mean batch loss that
# train() returns for each epoch, in order.
epochs = 11
loss = []
for i in range(epochs):
    curr = train(Model, train_dataloader, optimizer, loss_fn)
    loss.append(curr)

In [None]:
# Bundle the trained weights with the token -> index mapping they were trained
# with, so the two can be restored together.
checkpoint = dict(
    model_state_dict=Model.state_dict(),
    vocab=train_data.vocab,
)

# Written to the current working directory.
torch.save(checkpoint, 'Model1.pth')

In [None]:
def predict(Model, vocab, image_path, max_len=120):
    """Greedily decode the formula for a single image.

    Args:
        Model: model exposing ``encoder`` and ``decoder`` sub-modules.
        vocab: token -> index dictionary used during training.
        image_path: path of the image to transcribe.
        max_len: upper bound on the number of emitted tokens, counting the
            leading ``<sos>``.

    Returns:
        The decoded tokens, each followed by a single space. The string starts
        with ``<sos>`` and ends with ``<eos>`` unless ``max_len`` was reached
        first.

    Raises:
        ValueError: if the model emits an index that is not in ``vocab``.
    """
    run_device = next(Model.parameters()).device
    # Fall back to the conventional ids when the special tokens are not listed.
    sos_id = vocab.get("<sos>", 1)
    eos_id = vocab.get("<eos>", 2)
    # Built once so each emitted token is a constant-time lookup.
    index_to_token = {index: token for token, index in vocab.items()}

    # Size the initial state from the decoder's LSTM when it is available.
    lstm = getattr(Model.decoder, "lstm", None)
    if lstm is None:
        state_rows, hidden_size = 2, 512
    else:
        state_rows = lstm.num_layers * (2 if lstm.bidirectional else 1)
        hidden_size = lstm.hidden_size

    token_ids = [sos_id]
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        source = preprocess_image(image_path).unsqueeze(dim=0).to(run_device)
        # (1, 1, features) regardless of whether the encoder squeezed the
        # batch axis away.
        context = Model.encoder(source).reshape(1, 1, -1)
        hidden = torch.zeros(state_rows, 1, hidden_size, device=run_device)
        cell = torch.zeros(state_rows, 1, hidden_size, device=run_device)

        current = sos_id
        while current != eos_id and len(token_ids) < max_len:
            step_input = torch.tensor([[current]], device=run_device)
            (hidden, cell), output = Model.decoder(context, hidden, cell, step_input)
            current = int(output.squeeze(dim=1).argmax(dim=1).item())
            token_ids.append(current)

    pieces = []
    for index in token_ids:
        if index not in index_to_token:
            raise ValueError(f"Value '{index}' not found in the dictionary")
        pieces.append(index_to_token[index] + " ")
    return "".join(pieces)



**Loading the saved checkpoint**

In [None]:
# map_location lets a checkpoint that was saved from GPU tensors load on
# whatever device this session uses.
checkpoint = torch.load('/kaggle/input/mtp-model-1/Model1.pth', map_location=device)
# Size the model from the vocabulary stored with the weights, so the embedding
# and output layers match the saved tensors.
# NOTE(review): the same checkpoint['vocab'] should be used when decoding with this model.
decoder = Decoder(vocab_size=len(checkpoint['vocab']), embedding_dim=512, hidden_dim=512, input_size = 512)
Model = MyModel(encoder=Encoder(), decoder=decoder, vocab_size=len(checkpoint['vocab'])).to(device)
Model.load_state_dict(checkpoint['model_state_dict'])

In [None]:
def get_token(dictionary, search_value):
    """Reverse lookup: return the first key whose value equals ``search_value``.

    Keys are examined in the dictionary's iteration order.

    Raises:
        ValueError: if no value in ``dictionary`` equals ``search_value``.
    """
    not_found = object()  # sentinel, so a legitimate key of None is still returned
    matching_keys = (key for key, value in dictionary.items() if value == search_value)
    first_key = next(matching_keys, not_found)
    if first_key is not_found:
        raise ValueError(f"Value '{search_value}' not found in the dictionary")
    return first_key

In [None]:
# Validation split wrapped in the same Dataset/DataLoader types as training.
# NOTE(review): Mydata derives its vocabulary from the CSV it is given, so the
# token ids here may not match train_data.vocab; its __getitem__ also reads
# image files from "images/train/" below `path` - confirm both are intended.
validation_data = Mydata(
    csv_path="/kaggle/input/handwritten-eq-to-latex-conv-dataset/HandwrittenData/val_hw.csv",
    path="/kaggle/input/handwritten-eq-to-latex-conv-dataset/HandwrittenData/",
)
validation_dataloader = DataLoader(
    dataset=validation_data,
    batch_size=64,
    shuffle=False,
    collate_fn=collate_fn,
)

In [None]:
images_path = "/kaggle/input/handwritten-eq-to-latex-conv-dataset/HandwrittenData/images/train/"
# Filled by validation(): reference formulas and the matching predictions.
target = []
req = []
def validation(csv_path, max_samples=11, skip_indices=(114,)):
    """Predict formulas for rows of a CSV and report the macro BLEU.

    The module-level ``target`` and ``req`` lists are emptied and refilled
    with the reference formulas and predictions, so they can be inspected
    afterwards and repeated calls do not accumulate results.

    Args:
        csv_path: CSV with ``image`` and ``formula`` columns.
        max_samples: stop once the row index reaches this value; ``None``
            evaluates every row.
        skip_indices: row indices to leave out of the evaluation.

    Returns:
        Whatever ``give_score`` returns for the collected pairs.
    """
    target.clear()
    req.clear()

    df = pd.read_csv(csv_path)
    print("total len ", len(df))
    for i in range(len(df)):
        if max_samples is not None and i >= max_samples:
            break
        if i in skip_indices:
            continue
        image = df["image"][i]
        target.append(str(df["formula"][i]))
        req.append(predict(Model, train_data.vocab, images_path + image))

    # give_score prints the score itself; printing its return value again
    # only added a stray "None" line.
    return give_score(target, req)
        

In [None]:
# Evaluate on the validation CSV; the macro BLEU is printed by give_score.
validation("/kaggle/input/handwritten-eq-to-latex-conv-dataset/HandwrittenData/val_hw.csv")

In [None]:
# Spot-check: the first reference formula next to the first two predictions
# collected by validation().
print(target[0])
print(req[0])
print(req[1])

In [None]:
# Decode one image from the training image folder and print the predicted
# token string.
image_path1="/kaggle/input/handwritten-eq-to-latex-conv-dataset/HandwrittenData/images/train/MfrDB3500.png"
result1=predict(Model,train_data.vocab,image_path1)
print(result1)