In [None]:
import os
import re
import warnings

import torch
import torch.nn as nn
import pandas as pd
from tqdm import tqdm
import dill
from collections import Counter, OrderedDict
from torchtext.vocab import vocab

from utils import Model, generate_eqn_mask
from tokenizer import Tokenizer

warnings.filterwarnings("ignore")

In [4]:
# Special token indices
BOS_IDX, PAD_IDX, EOS_IDX, UNK_IDX, SEP_IDX = 0, 1, 2, 3, 4

# Special symbols list, ordered to line up with the index constants above
special_symbols = ['<s>', '<pad>', '</s>', '<unk>', '<sep>']

# Device: use the GPU when one is available, otherwise fall back to the CPU so
# the notebook still runs end-to-end on machines without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Seed (used as random_state when sampling evaluation rows)
seed = 42

# Checkpoint path for pretrained model
ckp_path = "Final_transformers.pth"

In [None]:
# Loading features data: keep the first N lines of every file in the data directory.
data_directory = 'Data/Feynman_with_units'
N = 60 # number of feature rows per equation
data = []

for filename in os.listdir(data_directory):
    file_path = os.path.join(data_directory, filename)
    if not os.path.isfile(file_path):
        continue
    with open(file_path, 'r', encoding='utf-8') as f:
        # Stream the file rather than f.read(): only the first N lines are
        # needed, so the rest of the file is never read.
        # range(N) comes first in zip so no extra line is consumed from f.
        for _, line in zip(range(N), f):
            line = line.rstrip('\n')
            # Skip blank lines (e.g. a trailing newline in a short file) so
            # that no empty feature row ends up in the dataset.
            if line.strip():
                data.append((filename, line))

df = pd.DataFrame(data, columns=['Filename', 'features'])
del data


# Loading target/equation data
eq_df = pd.read_csv("Data/FeynmanEquations.csv")[['Filename','Formula']]

# Merging features & target dataframes on the file name; the key column is
# dropped afterwards, leaving the 'Formula' and 'features' columns.
df = pd.merge(eq_df,df,on="Filename",how='inner').drop(columns=['Filename'])
del eq_df

In [7]:
# Get formulas from DataFrame
fyn = df.Formula.tolist()

# Initialize tokenizer
tokenizer = Tokenizer(fyn, special_symbols)

# Build target vocabulary
v = tokenizer.build_tgt_vocab()

# Create dictionary mapping indices to tokens
itos = {value: key for key, value in v.get_stoi().items()}

# Calculate source and target vocabulary sizes
src_voc_size = len(tokenizer.build_src_vocab())
tgt_voc_size = len(v)

# NOTE: no `del eq_df` here. eq_df is already deleted right after the merge in
# the data-loading cell, so deleting it again raised NameError on a fresh
# Restart & Run All.

In [None]:
# Architecture hyper-parameters of the pretrained model; the checkpoint is
# later loaded into a model built from exactly these values.
model_config = dict(
    emb_size=512,
    dim_feedforward=3072,
    nhead=8,
    num_encoder_layers=4,
    num_decoder_layers=4,
    tgt_vocab_size=tgt_voc_size,
    src_vocab_size=src_voc_size,
)

In [None]:
class Predictor():
    """
    Greedy-decoding inference wrapper around a trained model.

    Args:
        model (Model): Trained model exposing ``encode``, ``decode`` and
            ``generator``.

    Attributes:
        model (Model): The wrapped model, moved to ``device``.
        device (str): Device used for inference (the notebook-level ``device``).
        vocab (dict): Maps each attribute name in ``attrs`` to its vocabulary.
        attrs (list): Names of the source and target fields, in that order.
    """

    def __init__(self,model):
        self.model = model
        self.device = device
        self.model.to(self.device)
        self.vocab = {}
        self.attrs = ['features', 'Formula']

        # Source vocabulary for the features, target vocabulary for the formula.
        self.vocab[self.attrs[0]] = tokenizer.build_src_vocab()
        self.vocab[self.attrs[1]] = tokenizer.build_tgt_vocab()

    def tok_to_id(self, tokenize, vocab, val):
        """
        Convert a string to token IDs using the provided tokenizer and vocabulary.

        Args:
            tokenize (callable): Splits ``val`` into a list of tokens.
            vocab (callable): Maps a list of tokens to a list of integer IDs.
            val (str): Input string.

        Returns:
            Tensor: 1-D tensor of token IDs with dtype ``torch.int``.
        """
        val = tokenize(val)
        token_ids = vocab(val)
        return torch.tensor(token_ids, dtype=torch.int)

    def greedy_decode(self, src, src_mask, max_len, start_symbol):
        """
        Generate a sequence using greedy decoding.

        At every step the token with the highest generator score is appended.
        Decoding stops once ``EOS_IDX`` is produced or the sequence holds
        ``max_len`` tokens (including the start symbol).

        Args:
            src (Tensor): Source input.
            src_mask (Tensor): Mask for source input.
            max_len (int): Maximum length of the generated sequence.
            start_symbol (int): Start symbol for decoding.

        Returns:
            Tensor: Generated token IDs, dtype ``torch.long``, stacked along dim 0
            with one column (shape ``(length, 1)``).
        """
        src = src.to(self.device)
        src_mask = src_mask.to(self.device)

        # Inference only: with autograd disabled no graph is built (and kept
        # alive) for each decoding step.
        with torch.no_grad():
            memory = self.model.encode(src, src_mask)
            memory = memory.to(self.device)

            ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(self.device)
            for _ in range(max_len - 1):
                tgt_mask = (generate_eqn_mask(ys.size(0), self.device).type(torch.bool)).to(self.device)
                out = self.model.decode(ys, memory, tgt_mask)
                out = out.transpose(0, 1)
                # Score only the most recent position.
                prob = self.model.generator(out[:, -1])

                _, next_word = torch.max(prob, dim=1)
                next_word = next_word.item()

                # Build the new token with the dtype/device of `ys` (long) so the
                # concatenation does not rely on mixed-dtype promotion.
                next_token = torch.ones(1, 1).type_as(ys).fill_(next_word)
                ys = torch.cat([ys, next_token], dim=0)
                if next_word == EOS_IDX:
                    break
        return ys

    def predict(self, test_example, raw_tokens=False):
        """
        Generate prediction for a test example.

        Args:
            test_example (dict): Mapping-like example holding the input under
                ``'features'`` (and the reference under ``'Formula'`` when
                ``raw_tokens`` is True).
            raw_tokens (bool, optional): Whether to return raw tokens. Defaults to False.

        Returns:
            str or tuple: The decoded equation (concatenation of every generated
            token, as produced by ``greedy_decode``), or the tuple
            ``(original_tokens, predicted_tokens)`` when ``raw_tokens`` is True.
        """
        self.model.eval()
        src_sentence = test_example[self.attrs[0]]

        # Column vector of source token IDs: one row per token.
        src = self.tok_to_id(tokenizer.src_tokenize, self.vocab[self.attrs[0]], src_sentence).view(-1, 1)
        num_tokens = src.shape[0]

        # All-False mask, i.e. no source position is masked out.
        src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
        tgt_tokens = self.greedy_decode(src, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX).flatten()

        if raw_tokens:
            original_sentence = test_example[self.attrs[1]]
            original_tokens = self.tok_to_id(tokenizer.tgt_tokenize, self.vocab[self.attrs[1]], original_sentence)
            return original_tokens, tgt_tokens

        return ''.join(itos[int(t)] for t in tgt_tokens)

In [None]:
def sequence_accuracy(model, frac=0.1):
    """
    Compute the exact-match sequence accuracy of the model.

    A prediction counts as correct only when the predicted token-ID list is
    identical to the token-ID list of the reference formula.

    Args:
        model: The trained model to evaluate.
        frac (float): Fraction of ``df`` to sample for evaluation (sampled with
            ``random_state=seed``).

    Returns:
        float: Fraction of sampled rows predicted exactly; ``0.0`` when the
        sample is empty.
    """
    # Initialize Predictor with the model
    predictor = Predictor(model)

    random_df = df.sample(frac=frac, random_state=seed)
    length = len(random_df)

    # A tiny `frac` (or an empty frame) yields no rows; return early instead of
    # dividing by zero at the end.
    if length == 0:
        return 0.0

    # Number of exactly matching predictions so far
    count = 0

    pbar = tqdm(range(length))
    pbar.set_description("Seq_Acc_Cal")

    for i in pbar:
        original_tokens, predicted_tokens = predictor.predict(random_df.iloc[i], raw_tokens=True)

        if original_tokens.tolist() == predicted_tokens.tolist():
            count += 1

        # Running accuracy over the rows processed so far
        pbar.set_postfix(seq_accuracy=count / (i + 1))

    return count / length


In [None]:
def get_model(config):
    """
    Build a Model from a configuration mapping and re-initialise its weights.

    Args:
        config (dict): Keyword arguments forwarded to ``Model``.

    Returns:
        Model: The new model; every parameter with more than one dimension has
        been re-initialised in place with Xavier-uniform values.
    """
    net = Model(**config)

    # Only tensors with more than one dimension are re-initialised; 1-D
    # parameters keep their default values.
    multi_dim_params = (weight for weight in net.parameters() if weight.dim() > 1)
    for weight in multi_dim_params:
        nn.init.xavier_uniform_(weight)

    return net

In [None]:
# Build the network from model_config. get_model Xavier-initialises every
# parameter with more than one dimension; the checkpoint weights are loaded
# into this model in the next cell.
model = get_model(model_config)

In [None]:
# NOTE: torch.load unpickles the file, so only load checkpoints from a trusted source.
# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# on a GPU can still be loaded when running on a different device.
state = torch.load(ckp_path, map_location=device)
model.load_state_dict(state['state_dict'])

In [None]:
# Exact-match sequence accuracy on the default sample (frac=0.1 of df, drawn
# with random_state=seed); the returned value is displayed as the cell output.
sequence_accuracy(model)