In [None]:
import unicodedata
import re
from nltk.sem.logic import Expression
from nltk.inference import Prover9
import os
import logging
import pandas as pd
from sklearn.model_selection import train_test_split
import concurrent.futures

#-----------------------------------------------------------
# Preprocess and Evaluation Metrics
#-----------------------------------------------------------

# Converts FOL symbols to tokenizable representation for preprocessing.
def fol_preprocess(expression):
    """Rewrite FOL operator symbols as uppercase keyword tokens.

    Quantifiers are padded with a space on both sides, negation gets a
    trailing space, and the binary connectives are emitted without any
    padding. Every other character is passed through unchanged.

    Args:
        expression: FOL string that may contain ∀ ∃ ¬ ∧ ⊕ ∨ → ↔.

    Returns:
        The string with each operator symbol replaced by its keyword.
    """
    # Every key is a single character and no replacement contains a key,
    # so one translate() pass gives the same result as chained replaces.
    symbol_table = str.maketrans({
        '∀': ' FORALL ',
        '∃': ' EXISTS ',
        '¬': 'NOT ',
        '∧': 'AND',
        '⊕': 'XOR',
        '∨': 'OR',
        '→': 'THEN',
        '↔': 'IFF',
    })
    return expression.translate(symbol_table)

# Converts tokenizable representation back to FOL syntax for postprocessing.
def fol_postprocess(text):
    """Turn keyword tokens (FORALL, AND, ...) back into FOL operator symbols.

    The keywords are replaced wherever they occur as substrings; whitespace
    around them is left as it is.

    Args:
        text: String containing the uppercase operator keywords.

    Returns:
        The string with every keyword replaced by its FOL symbol.
    """
    # Replacement order is significant: 'FORALL' and 'XOR' both contain 'OR',
    # so they have to be rewritten before the plain 'OR' keyword.
    keyword_to_symbol = (
        ('FORALL', '∀'),
        ('EXISTS', '∃'),
        ('NOT', '¬'),
        ('AND', '∧'),
        ('XOR', '⊕'),
        ('OR', '∨'),
        ('THEN', '→'),
        ('IFF', '↔'),
    )
    restored = text
    for keyword, symbol in keyword_to_symbol:
        restored = restored.replace(keyword, symbol)
    return restored

# Converts tokenized representations back to standard FOL syntax excluding XOR, which is not supported by Prover9.
def fol_postprocessXOR(text):
    """Turn keyword tokens back into FOL symbols, writing XOR as '∨'.

    Identical to the regular post-processing except that the XOR keyword is
    emitted as a plain disjunction symbol instead of '⊕'.

    Args:
        text: String containing the uppercase operator keywords.

    Returns:
        The string with every keyword replaced by its symbol, with both
        'XOR' and 'OR' mapped to '∨'.
    """
    # Replacement order is significant: 'FORALL' and 'XOR' both contain 'OR',
    # so they have to be rewritten before the plain 'OR' keyword.
    keyword_to_symbol = (
        ('FORALL', '∀'),
        ('EXISTS', '∃'),
        ('NOT', '¬'),
        ('AND', '∧'),
        ('XOR', '∨'),
        ('OR', '∨'),
        ('THEN', '→'),
        ('IFF', '↔'),
    )
    restored = text
    for keyword, symbol in keyword_to_symbol:
        restored = restored.replace(keyword, symbol)
    return restored

# Normalization
def nl_preprocess(sentence):
    """Lowercase a sentence and then apply Unicode NFKC normalization.

    Args:
        sentence: Natural-language input string.

    Returns:
        The lowercased, NFKC-normalized string.
    """
    lowered = sentence.lower()
    return unicodedata.normalize('NFKC', lowered)

# Converts a logical expression into NLTK-compatible format by replacing symbols with NLTK equivalents.
def to_nltk(expression):
    """Translate FOL symbols and operator keywords into NLTK logic syntax.

    Both the Unicode operator symbols and their uppercase keyword forms are
    accepted. Replacement is plain substring substitution, applied in the
    order listed below.

    Args:
        expression: FOL string using symbols and/or keywords.

    Returns:
        The expression rewritten with 'all', 'exists', '-', '&', '!=', '|',
        '->' and '<->'.
    """
    # 'FORALL' and 'XOR' must be handled before 'OR', which they contain.
    rewrites = (
        ('∀', ' all '),
        ('∃', ' exists '),
        ('¬', '-'),
        ('∧', '&'),
        ('⊕', '!='),
        ('∨', '|'),
        ('→', '->'),
        ('↔', '<->'),
        ('FORALL', ' all '),
        ('EXISTS', ' exists '),
        ('NOT', ' - '),
        ('AND', '&'),
        ('XOR', '!='),
        ('OR', '|'),
        ('THEN', '->'),
        ('IFF', '<->'),
    )
    converted = expression
    for source_token, nltk_token in rewrites:
        converted = converted.replace(source_token, nltk_token)
    return converted

# Checks if a logical expression is well-formed.
def check_grammar(expression):
    """Report whether an expression parses with NLTK and is a closed formula.

    Args:
        expression: FOL string in symbol or keyword form.

    Returns:
        True when the NLTK-converted expression parses and has no free
        variables; False when it has free variables or when parsing (or the
        free-variable query) raises any exception.
    """
    candidate = to_nltk(expression)
    try:
        free_variables = Expression.fromstring(candidate).free()
    except Exception:
        # Any parse failure is reported as "not well-formed".
        return False
    return len(free_variables) == 0

# Replaces predicate names in logical expressions with numeric placeholders.
def predicate_abstraction(expr1, expr2):
    """Replace predicate names in two expressions with numeric placeholders.

    Any identifier made of ASCII letters, digits or underscores that starts
    at a word boundary and is immediately followed by '(' is treated as a
    predicate name. Within each expression, names are numbered "1", "2", ...
    in order of first appearance. Names that occur in both expressions are
    then given a common id, starting just above the larger of the two local
    counts, so a shared name maps to the same placeholder in both outputs.

    Shared ids are handed out in order of first appearance in ``expr1``, so
    the result is reproducible across runs. The rewrite is done in a single
    pass, so a freshly inserted placeholder is never rewritten a second time
    (relevant when an input name is itself numeric).

    Args:
        expr1: First FOL expression.
        expr2: Second FOL expression.

    Returns:
        A tuple ``(abstracted_expr1, abstracted_expr2)``.
    """
    predicate_pattern = re.compile(r'\b([a-zA-Z0-9_]+)\(')

    def number_predicates(expr):
        # Local numbering: first distinct name -> "1", second -> "2", ...
        numbering = {}
        for match in predicate_pattern.finditer(expr):
            name = match.group(1)
            if name not in numbering:
                numbering[name] = str(len(numbering) + 1)
        return numbering

    pred_map1 = number_predicates(expr1)
    pred_map2 = number_predicates(expr2)

    # Shared ids start above every local id so they cannot collide with one.
    first_shared_id = max(len(pred_map1), len(pred_map2)) + 1
    # Dict iteration follows insertion order, i.e. first appearance in expr1;
    # iterating a set here would make the numbering depend on the hash seed.
    shared_names = [name for name in pred_map1 if name in pred_map2]
    aligned_map = {
        name: str(first_shared_id + offset)
        for offset, name in enumerate(shared_names)
    }

    def apply_abstraction(expr, local_map):
        combined = {
            name: aligned_map.get(name, local_id)
            for name, local_id in local_map.items()
        }
        # Every match of the pattern was recorded in local_map, so the lookup
        # below always succeeds.
        return predicate_pattern.sub(
            lambda match: combined[match.group(1)] + '(', expr
        )

    return apply_abstraction(expr1, pred_map1), apply_abstraction(expr2, pred_map2)

# Compares the abstracted structures of two logical expressions.
def compare_structure(expr1, expr2):
    """Check whether two expressions are identical once predicates are abstracted.

    Predicate names in both expressions are replaced by numeric placeholders,
    the results are converted to NLTK syntax and parsed, and the two parsed
    expressions are compared with ``==``.

    Args:
        expr1: First FOL expression.
        expr2: Second FOL expression.

    Returns:
        The result of comparing the parsed expressions, or False if parsing
        or the comparison raises any exception.
    """
    first, second = (to_nltk(abstracted) for abstracted in predicate_abstraction(expr1, expr2))

    try:
        return Expression.fromstring(first) == Expression.fromstring(second)
    except Exception:
        # Unparseable input counts as a structural mismatch.
        return False

# Counts the number of unique predicates in a logical expression.
def count_predicates(expression):
    """Count the distinct predicate names that appear in an expression.

    A predicate occurrence is a run of ASCII letters, optionally followed by
    digits, starting at a word boundary and followed by a parenthesised
    argument list that contains no nested parentheses.

    Args:
        expression: FOL string to scan.

    Returns:
        Number of distinct predicate names found.
    """
    # The capture group yields just the name, so the argument list does not
    # have to be stripped off afterwards.
    names = re.findall(r'\b([A-Za-z]+\d*)\([^()]*\)', expression)
    return len(set(names))

# Checks if two logical expressions are equivalent using Prover9. Prover9 must be installed: https://www.cs.unm.edu/~mccune/prover9/
def _equivalence_logic(expr1, expr2):
    """Ask Prover9 whether two FOL expressions are logically equivalent.

    Predicate names are first abstracted to shared numeric placeholders, both
    expressions are converted to NLTK syntax and parsed, and the pair is then
    handed to ``Expression.equiv``.

    Args:
        expr1: First FOL expression (symbol or keyword form).
        expr2: Second FOL expression (symbol or keyword form).

    Returns:
        Whatever ``Expression.equiv`` returns for the two parsed expressions.

    Raises:
        Any exception raised while parsing either expression or while
        running the prover is propagated to the caller.
    """
    # Relative path, so it resolves against the current working directory.
    # NOTE(review): this overwrites any PROVER9 value already set in the
    # environment on every call - confirm that is intended.
    os.environ['PROVER9'] = 'Prover9-Mace4/bin'
    prover = Prover9()
    abstracted_expr1, abstracted_expr2 = predicate_abstraction(expr1, expr2)
    formula1 = Expression.fromstring(to_nltk(abstracted_expr1))
    formula2 = Expression.fromstring(to_nltk(abstracted_expr2))
    # Pass the prover explicitly instead of building one that is never used.
    return formula1.equiv(formula2, prover=prover)

def equivalence(expr1_str, expr2_str, timeout=30):
    """Run the equivalence check in a worker thread with a wall-clock limit.

    Args:
        expr1_str: First FOL expression.
        expr2_str: Second FOL expression.
        timeout: Maximum number of seconds to wait for the result.

    Returns:
        The value produced by the equivalence check when it finishes in
        time; the string "equivalence could not halt" when the wait times
        out; None when the check raises any other exception. The timeout
        string is truthy, so callers should test ``result is True`` rather
        than plain truthiness.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(_equivalence_logic, expr1_str, expr2_str)
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        logging.warning("equivalence() timed out.")
        return "equivalence could not halt"
    except Exception:
        # Unparseable expressions are expected here; keep the traceback
        # available at debug level instead of dropping it silently.
        logging.debug("equivalence() failed", exc_info=True)
        return None
    finally:
        # Do not wait for the worker: leaving a `with` block would call
        # shutdown(wait=True) and block until the check finishes, which
        # defeats the timeout. The worker thread itself cannot be killed and
        # keeps running in the background until it completes.
        executor.shutdown(wait=False)

In [None]:
from datasets import Dataset
from sklearn.model_selection import train_test_split
import pandas as pd

#-----------------------------------------------------------
# Tokenization and loading the dataset with preprocessing
#-----------------------------------------------------------

# Path of the Excel file that load_train_val_test_data() reads with
# pd.read_excel. Left empty here; set it before loading the data.
dataset_path = ""

def tokenize_data(dataset, tokenizer):
    """Tokenize the 'NL' and 'FOL' columns of a dataset in batches.

    The 'NL' column is encoded as the model input. The 'FOL' column is
    encoded inside ``tokenizer.as_target_tokenizer()`` and its ``input_ids``
    are stored under 'labels'. Both sides are truncated and padded to 256
    tokens.

    Args:
        dataset: Object exposing ``map(fn, batched=True)`` whose batches
            provide 'NL' and 'FOL' entries.
        tokenizer: Callable tokenizer that also provides
            ``as_target_tokenizer()``.

    Returns:
        The result of ``dataset.map`` with the tokenized fields added.
    """
    encode_options = {
        'max_length': 256,
        'truncation': True,
        'padding': 'max_length',
    }

    def tokenize_function(examples):
        encoded = tokenizer(examples['NL'], **encode_options)
        with tokenizer.as_target_tokenizer():
            target_encoding = tokenizer(examples['FOL'], **encode_options)
        encoded['labels'] = target_encoding['input_ids']
        return encoded

    return dataset.map(tokenize_function, batched=True)

def load_train_val_test_data(tokenizer):
    """Load the Excel dataset, split it, preprocess it and tokenize it.

    The file at ``dataset_path`` is split into test (3%) and the remainder,
    and the remainder is split again into validation (10%) and train, both
    with ``random_state=42``. Each split gets an 'NL' column (task prefix +
    normalized sentence) and a 'FOL' column (keyword-encoded expression) and
    is then tokenized.

    Args:
        tokenizer: Tokenizer forwarded to ``tokenize_data``.

    Returns:
        A tuple ``(train, validation, test)`` of tokenized datasets.
    """
    frame = pd.read_excel(dataset_path)
    remainder, test_frame = train_test_split(frame, test_size=0.03, random_state=42)
    train_frame, validation_frame = train_test_split(remainder, test_size=0.1, random_state=42)

    print(f"Train Dataset size: {len(train_frame)}")
    print(f"Validation Dataset size: {len(validation_frame)}")
    print(f"Test Dataset size: {len(test_frame)}")

    def add_model_fields(row):
        # Build the prompt and the keyword-encoded target for one row.
        return {
            'NL': 'translate English to First-order Logic: ' + nl_preprocess(row['NL_sentence']),
            'FOL': fol_preprocess(row['FOL_expression']),
        }

    def prepare(split_frame):
        split = Dataset.from_pandas(split_frame).map(add_model_fields)
        return tokenize_data(split, tokenizer)

    return prepare(train_frame), prepare(validation_frame), prepare(test_frame)


In [None]:
import torch
from transformers import T5ForConditionalGeneration, AutoTokenizer

#-----------------------------------------------------------
# Model Preparation and Train, Val, Test Data
#-----------------------------------------------------------

# Location handed to T5ForConditionalGeneration.from_pretrained below.
model_path = ''
# Name handed to AutoTokenizer.from_pretrained below; evaluate_model() also
# derives the results file name from it.
pretrained_model = ''
# Generation length limit read by the evaluation functions (passed to
# model.generate as max_length).
max_length = 256
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load the model onto the selected device and switch it to eval mode.
model = T5ForConditionalGeneration.from_pretrained(model_path).to(device)
model.eval()
tokenizer = AutoTokenizer.from_pretrained(pretrained_model)

# Reads, splits and tokenizes the dataset; runs as soon as this cell executes.
train_dataset, val_dataset, test_dataset = load_train_val_test_data(tokenizer)


In [None]:
#-----------------------------------------------------------
# Evaluation - Deterministic Inference and Stochastic Reranking
#-----------------------------------------------------------

def deterministic_eval(dataset, model, tokenizer, device):
    """Evaluate default (non-sampling) generation against the references.

    For every example the model generates one expression, which is decoded
    and compared with the decoded label. When both are well-formed, three
    checks run: exact string match, structural match after predicate
    abstraction, and logical equivalence (on the variants where XOR is
    written as '∨'). Examples that are ill-formed, or neither structurally
    matching nor proven equivalent, are collected as false cases.

    Args:
        dataset: Sized iterable of examples with 'input_ids' and 'labels'.
        model: Model exposing ``generate``.
        tokenizer: Tokenizer exposing ``decode``.
        device: Torch device the input tensors are moved to.

    Returns:
        A dict with:
        'accuracy_rate' (exact matches / total),
        'formal_rate' (structural matches / total),
        'equivalent_rate' ((structural matches + proven equivalent) / total),
        'well_formed_rate' (pairs where both sides are well-formed / total),
        'false_cases' (list of dicts with 'input_ids', 'reference',
        'nl_sentence', 'prediction'),
        'det_correct' (structural matches + proven equivalent) and
        'det_total' (number of examples). All rates are 0.0 for an empty
        dataset.
    """
    metrics = {
        'accurate': 0,
        'formally_accurate': 0,
        'equivalent': 0,
        'well_formed': 0,
        'ill_formed': 0,
        'false_cases': []
    }

    for example in dataset:
        input_ids = torch.tensor(example['input_ids']).unsqueeze(0).to(device)
        label_ids = torch.tensor(example['labels']).unsqueeze(0).to(device)

        with torch.no_grad():
            outputs = model.generate(input_ids, max_length=max_length)

        nl_input = tokenizer.decode(input_ids[0], skip_special_tokens=True)
        pred_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
        label_text = tokenizer.decode(label_ids[0], skip_special_tokens=True)

        pred = fol_postprocess(pred_text)
        ref = fol_postprocess(label_text)
        # Variants with XOR written as '∨', used only for the prover call.
        predXOR = fol_postprocessXOR(pred_text)
        refXOR = fol_postprocessXOR(label_text)
        abstracted_pred, abstracted_ref = predicate_abstraction(pred, ref)

        print("\n--- Evaluation Instance ---")
        print(f"Natural Language Input:  {nl_input}")
        print(f"Generated Expression:    {pred}")
        print(f"Reference Expression:    {ref}")
        print(f"Abstracted Prediction:   {abstracted_pred}")
        print(f"Abstracted Reference:    {abstracted_ref}")

        is_well_formed_pred = check_grammar(pred)
        is_well_formed_ref = check_grammar(ref)
        print(f"Prediction Well-Formed:  {is_well_formed_pred}")
        print(f"Reference Well-Formed:   {is_well_formed_ref}")

        # Record kept for the reranking stage if this example is not solved.
        false_case = {
            'input_ids': input_ids.squeeze(0).cpu(),
            'reference': ref,
            'nl_sentence': nl_input,
            'prediction': pred
        }

        if not (is_well_formed_pred and is_well_formed_ref):
            print("One or both expressions ill-formed.")
            metrics['ill_formed'] += 1
            metrics['false_cases'].append(false_case)
            continue

        metrics['well_formed'] += 1

        is_exact = (pred == ref)
        is_formal = compare_structure(pred, ref)
        is_equiv = equivalence(predXOR, refXOR)

        print(f"Exact Match:             {is_exact}")
        print(f"Formally Accurate:       {is_formal}")
        print(f"Logically Equivalent:    {is_equiv}")

        if is_exact:
            metrics['accurate'] += 1
        if is_formal:
            metrics['formally_accurate'] += 1
        elif is_equiv is True:
            # `is True` on purpose: equivalence() returns a truthy string on
            # timeout, which must not be counted as a proven equivalence.
            metrics['equivalent'] += 1
        else:
            metrics['false_cases'].append(false_case)

    total = len(dataset)
    # NOTE: to omit ill-formed expressions from the rates, use
    # max(total - metrics['ill_formed'], 1) as the denominator instead.
    denom = total

    def rate(count, divisor):
        # An empty dataset yields 0.0 instead of raising ZeroDivisionError.
        return count / divisor if divisor else 0.0

    det_correct = metrics['formally_accurate'] + metrics['equivalent']

    return {
        'accuracy_rate': rate(metrics['accurate'], denom),
        'formal_rate': rate(metrics['formally_accurate'], denom),
        'equivalent_rate': rate(det_correct, denom),
        'well_formed_rate': rate(metrics['well_formed'], total),
        'false_cases': metrics['false_cases'],
        'det_correct': det_correct,
        'det_total': denom
    }

def stochastic_reranking(false_cases, model, tokenizer, device, num_samples=10):
    """Resample failed cases and look for a candidate equivalent to the reference.

    For every false case, ``num_samples`` candidates are drawn with
    top-k / top-p sampling. A case counts as corrected when at least one
    candidate is well-formed and is proven equivalent to the reference; the
    first such candidate is reported. As in the deterministic evaluation,
    the prover is given the variants where XOR is written as '∨'.

    Args:
        false_cases: List of dicts with 'input_ids' (1-D tensor),
            'nl_sentence', 'reference' and 'prediction'.
        model: Model exposing ``generate``.
        tokenizer: Tokenizer exposing ``decode``.
        device: Torch device the input tensors are moved to.
        num_samples: Number of sampled candidates per case.

    Returns:
        A tuple ``(improvement_rate, results, corrected_cases,
        improved_count)`` where ``improvement_rate`` is the share of false
        cases that were corrected (0 when there are none), ``results`` holds
        every sampled candidate per case, and ``corrected_cases`` lists the
        corrected ones.
    """
    improved_count = 0
    results = []
    corrected_cases = []

    for case in false_cases:
        input_ids = case['input_ids'].unsqueeze(0).to(device)
        nl_sentence = case['nl_sentence']
        reference = case['reference']
        # The stored reference uses '⊕' for XOR; the prover call works on the
        # '∨' variant, matching what deterministic_eval passes to it.
        reference_for_prover = reference.replace('⊕', '∨')

        with torch.no_grad():
            outputs = model.generate(
                input_ids,
                max_length=max_length,
                do_sample=True,
                top_k=50,
                top_p=0.95,
                temperature=0.8,
                num_return_sequences=num_samples
            )

        best_candidate = None
        samples = []

        for output in outputs:
            decoded = tokenizer.decode(output, skip_special_tokens=True)
            pred = fol_postprocess(decoded)
            well_formed = check_grammar(pred)
            equivalent = equivalence(fol_postprocessXOR(decoded), reference_for_prover)

            samples.append({
                'prediction': pred,
                'well_formed': well_formed,
                'equivalent': equivalent
            })

            # `is True` on purpose: equivalence() returns a truthy string on
            # timeout. Ill-formed candidates never count as corrections.
            if best_candidate is None and well_formed and equivalent is True:
                best_candidate = pred

        if best_candidate is not None:
            improved_count += 1
            print("\n--- CORRECTION FOUND ---")
            print(f"Natural Language Sentence: {nl_sentence}")
            print(f"Reference FOL:             {reference}")
            print(f"Incorrect Prediction:      {case['prediction']}")
            print(f"Corrected Prediction:      {best_candidate}")

            corrected_cases.append({
                'nl_sentence': nl_sentence,
                'previous_prediction': case['prediction'],
                'reference': reference,
                'corrected_prediction': best_candidate
            })

        results.append({
            'nl_sentence': nl_sentence,
            'reference': reference,
            'samples': samples
        })

    improvement_rate = improved_count / len(false_cases) if false_cases else 0
    return improvement_rate, results, corrected_cases, improved_count



In [None]:
def evaluate_model():
    """Run both evaluation stages on the test set and write a text report.

    The deterministic evaluation runs first; its false cases are then passed
    to stochastic reranking. A summary is written to
    ``<model_name>_evaluation_results.txt`` in the current directory, where
    ``model_name`` is the last path component of ``pretrained_model`` with
    '-' replaced by '_'.

    Returns:
        A tuple ``(base_results, improvement_rate, samples,
        corrected_cases)``.
    """
    # Must be computed before the report lines are built: it is assigned in
    # this function, so reading it earlier raises UnboundLocalError.
    model_name = pretrained_model.split("/")[-1].replace("-", "_")

    base_results = deterministic_eval(test_dataset, model, tokenizer, device)

    output_lines = [
        f"Model Path: {model_path}",
        f"Tokenizer: {model_name}",
        "\n--- Deterministic Evaluation ---",
        f"Well-formed:       {base_results['well_formed_rate']:.4f}",
        f"Exact match:       {base_results['accuracy_rate']:.4f}",
        f"Formal match:      {base_results['formal_rate']:.4f}",
        f"Equivalence:       {base_results['equivalent_rate']:.4f}",
    ]

    improvement_rate, samples, corrected_cases, corrected_count = stochastic_reranking(
        base_results['false_cases'], model, tokenizer, device
    )

    corrected_total = base_results['det_correct'] + corrected_count
    total_evaluated = base_results['det_total']
    # Guard against an empty evaluation set.
    corrected_equiv_rate = corrected_total / total_evaluated if total_evaluated else 0.0

    output_lines.extend([
        "\n--- Stochastic Reranking ---",
        f"Deterministic Score:     {base_results['equivalent_rate']:.4f}",
        f"Corrected Predictions:   {corrected_count}",
        f"Final Score:             {corrected_equiv_rate:.4f}",
        f"Overall Total Evaluated: {total_evaluated}",
    ])

    output_path = f"{model_name}_evaluation_results.txt"

    # Explicit encoding so the report does not depend on the platform default.
    with open(output_path, "w", encoding="utf-8") as f:
        f.writelines(line + "\n" for line in output_lines)

    return base_results, improvement_rate, samples, corrected_cases

# Run the deterministic evaluation followed by stochastic reranking as soon
# as this cell executes.
evaluate_model()
