## Experiments

In [None]:
## check balancing of FOLIO dataset
import json
def _read_json(path):
    """Return the parsed JSON document stored at ``path``."""
    with open(path, 'r') as handle:
        return json.load(handle)


# Train / dev splits of FOLIO and LogicNLI.
folio = _read_json('data/FOLIO/train.json')
f_dev = _read_json('data/FOLIO/dev.json')
lni = _read_json('data/LogicNLI/train.json')
l_dev = _read_json('data/LogicNLI/dev.json')

# Gold answer label of every sample, one list per split.
answers_folio = [entry['answer'] for entry in folio]
answers_folio_dev = [entry['answer'] for entry in f_dev]

answers_lni = [entry['answer'] for entry in lni]
answers_lni_dev = [entry['answer'] for entry in l_dev]

In [None]:
import pandas as pd

answers_folio = pd.Series(answers_folio)
answers_folio_dev = pd.Series(answers_folio_dev)

answers_lni = pd.Series(answers_lni)
answers_lni_dev = pd.Series(answers_lni_dev)

In [None]:
answers_folio_all = pd.concat([answers_folio, answers_folio_dev])

In [None]:
answers_folio_all.value_counts()/len(answers_folio_all)

In [None]:
answers_folio.value_counts()/len(answers_folio)


In [None]:
answers_folio_dev.value_counts()/len(answers_folio_dev)

In [None]:
answers_lni.value_counts()/len(answers_lni)

## Get batch response

In [None]:
from openai import OpenAI
client = OpenAI()

In [None]:
import json
import re
def parse_batch_preds(path, dev_path='data/LogicNLI/dev.json'):
    """Parse a batch output file containing predicate-extraction responses.

    Every non-blank line of ``path`` is decoded as a JSON record. The record's
    ``custom_id`` is converted to ``int`` and used to look up the matching
    sample in the dataset stored at ``dev_path``; the content of the first
    chat choice is cleaned of backticks, a ``json`` fence tag, triple quotes
    and backslash-escaped braces before being decoded as JSON.

    Args:
        path: Path to the JSONL batch output file.
        dev_path: Path to the dataset JSON file (a list of dicts with at least
            ``id``, ``context`` and ``question``). Defaults to the LogicNLI dev
            split, which is the path this function always used before.

    Returns:
        dict mapping each sample id (int) to a dict with the keys ``context``,
        ``question`` and ``logic_predicates`` (the
        ``'First-Order-Logic Predicates'`` string split on newlines).

    Raises:
        KeyError: if a ``custom_id`` is not in the dataset or the decoded
            response lacks ``'First-Order-Logic Predicates'``.
        json.JSONDecodeError: if a record or a cleaned response is not valid JSON.
    """
    with open(dev_path, 'r') as f:
        dev_data_dict = {d['id']: d for d in json.load(f)}

    parsed_results = {}

    with open(path, "r") as f:
        for line in f:
            # Blank lines (e.g. a trailing empty line) are not records.
            if not line.strip():
                continue

            result = json.loads(line)
            sample_id = int(result["custom_id"])

            response = result['response']['body']['choices'][0]['message']['content']
            # Strip markdown code fences / stray quoting the model may add.
            response = response.replace('`', '').replace('json\n', '').replace('"""', '')
            # Un-escape braces that were emitted as \{ and \}.
            response = response.replace('\\{', '{').replace('\\}', '}')

            response_json = json.loads(response)
            sample = dev_data_dict[sample_id]

            parsed_results[sample_id] = {
                "context": sample['context'],
                "question": sample['question'],
                "logic_predicates": response_json['First-Order-Logic Predicates'].split('\n')
            }

    return parsed_results

def parse_batch_progs(progs_path, preds_path, dev_path='data/LogicNLI/dev.json'):
    """Parse a batch output file containing logic-program responses.

    Every non-blank line of ``progs_path`` is decoded as a JSON record whose
    ``custom_id`` (converted to ``int``) selects a sample from the dataset at
    ``dev_path`` and an entry of the predicates file at ``preds_path``.
    When the first choice did not finish with ``finish_reason == 'stop'`` the
    response content is not decoded and a placeholder program is stored
    instead.

    Args:
        progs_path: Path to the JSONL batch output file.
        preds_path: Path to a JSON object mapping sample ids (as strings) to
            dicts that contain ``logic_predicates``.
        dev_path: Path to the dataset JSON file (a list of dicts with ``id``,
            ``context``, ``question`` and ``answer``). Defaults to the LogicNLI
            dev split, which is the path this function always used before.

    Returns:
        list of dicts, in file order, with the keys ``id``, ``context``,
        ``question``, ``answer``, ``raw_logic_programs`` and ``predicates``.

    Raises:
        KeyError: if a ``custom_id`` is missing from the dataset or the
            predicates file.
        json.JSONDecodeError: if a record, or the content of a response that
            finished with ``'stop'``, is not valid JSON.
    """
    with open(dev_path, 'r') as f:
        dev_data_dict = {d['id']: d for d in json.load(f)}

    with open(preds_path, "r") as f:
        preds_dict = {int(k): v for k, v in json.load(f).items()}

    parsed_results = []

    with open(progs_path, "r") as f:
        for line in f:
            # Blank lines (e.g. a trailing empty line) are not records.
            if not line.strip():
                continue

            result = json.loads(line)
            sample_id = int(result["custom_id"])
            choice = result['response']['body']['choices'][0]

            if choice['finish_reason'] != 'stop':
                # Generation did not end normally: keep the sample but mark
                # its program as missing instead of decoding the content.
                raw_logic_programs = {
                    "First-Order-Logic Rules": "No rules found",
                    "First-Order-Logic Question": "No question found",
                }
            else:
                raw_logic_programs = json.loads(choice['message']['content'])

            sample = dev_data_dict[sample_id]
            parsed_results.append({
                "id": sample_id,
                "context": sample['context'],
                "question": sample['question'],
                "answer": sample['answer'],
                "raw_logic_programs": raw_logic_programs,
                "predicates": preds_dict[sample_id]['logic_predicates']
            })

    return parsed_results
            
        

In [None]:
# Look up the batch job by its hard-coded id.
batch = client.batches.retrieve("batch_xNhzhGB0yOgxUHzUUKNZEEv8")

# `out_file_id` and `content` are only assigned when the batch has completed.
# If it has not, the later cells that use `content` will fail (NameError on a
# fresh kernel) or silently reuse a value left over from an earlier run.
if batch.status == "completed":
    print("Batch is completed")
    out_file_id = batch.output_file_id
    content = client.files.content(out_file_id)

In [None]:
content.write_to_file(".tmp/LogicNLI_dev_gpt-4o_dynamic.jsonl")

In [None]:
results = parse_batch_progs('.tmp/LogicNLI_dev_gpt-4o_dynamic.jsonl', 'outputs_3/logic_predicates/LogicNLI_dev_gpt-4o.json')

In [None]:
with open('outputs_3/logic_programs/LogicNLI_dev_gpt-4o_dynamic.json', 'w') as f:
    json.dump(results, f, indent=2, ensure_ascii=False)

In [None]:
import os


refiner_path = 'here/is/a/path/to/a/file.txt'
refiner_path.split('/')[-1].split('.')[0]

In [None]:
import os
import json


def compare_evaluate(result_path, dataset_name, split, model_name, prompt_mode, backup, self_refine_round, output_path):
    """Save the samples that became executable after a self-refinement round.

    Reads two result files from ``result_path``: the original run
    (``{dataset}_{split}_{model}_{prompt_mode}_backup-{backup}.json``) and the
    refined run (the same name prefixed with
    ``self-refine-{self_refine_round}_``). A refined sample is kept when its
    ``flag`` is ``'success'`` while the original sample with the same ``id``
    has any other ``flag``. The kept samples are written, under the refined
    file name, to ``compared_evaluation/{output_path}/`` (relative to the
    current working directory).

    Args:
        result_path: Directory containing both result files.
        dataset_name, split, model_name, prompt_mode, backup: Components of
            the result file names.
        self_refine_round: Refinement round used in the refined file name.
        output_path: Sub-directory of ``compared_evaluation`` to write into.

    Returns:
        None. The result is written to disk.

    Raises:
        FileNotFoundError: if either result file is missing.
        KeyError: if a successful refined sample has no original counterpart.
    """
    # Both inputs and the output share the same base file name.
    base_name = f'{dataset_name}_{split}_{model_name}_{prompt_mode}_backup-{backup}.json'
    refined_name = f'self-refine-{self_refine_round}_{base_name}'

    with open(os.path.join(result_path, base_name), 'r') as f:
        original_results = json.load(f)

    with open(os.path.join(result_path, refined_name), 'r') as f:
        results = json.load(f)

    original_dict = {sample['id']: sample for sample in original_results}
    results_dict = {sample['id']: sample for sample in results}

    new_executable = [
        sample
        for sample_id, sample in results_dict.items()
        if sample['flag'] == 'success' and original_dict[sample_id]['flag'] != 'success'
    ]

    output_dir = f'compared_evaluation/{output_path}'
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(output_dir, exist_ok=True)

    with open(f'{output_dir}/{refined_name}', 'w') as f:
        json.dump(new_executable, f, indent=4)

In [None]:
# Settings shared by every comparison run below.
result_path = 'outputs_llama3_70B/logic_inference'
dataset_name = 'FOLIO'
split = 'dev'
model_names = ['gpt-3.5-turbo', 'gpt-4-turbo', 'gpt-4o']
prompt_mode = 'dynamic'
backup = 'random'
output_paths = ['llama3_70B', 'mixtral_8x7B', 'mistral_7B']

# Run the comparison for every output folder, model and refinement round 1-3.
# NOTE(review): `result_path` is fixed to the llama3_70B folder while
# `output_path` iterates over three names, so every `output_path` iteration
# reads the same input files and only the destination folder differs.
# Confirm this is intended.
for output_path in output_paths:
    for model_name in model_names:
        for self_refine_round in range(1, 4):
            compare_evaluate(result_path, dataset_name, split, model_name, prompt_mode, backup, self_refine_round, output_path)


In [None]:
os.path.split(result_path)

## Count tokens

In [None]:
import tiktoken

enc = tiktoken.get_encoding('cl100k_base')

In [None]:
def num_tokens_from_string(string: str) -> int:
    """Count the tokens the ``cl100k_base`` tiktoken encoding yields for ``string``."""
    token_ids = tiktoken.get_encoding('cl100k_base').encode(string)
    return len(token_ids)

In [None]:
import json

# Accumulate the token count of the natural-language and FOL fields of every
# sample, first over the train split and then over the dev split.
total = 0

for split_file in ('data/LogicNLI/train.json', 'data/LogicNLI/dev.json'):
    with open(split_file, 'r') as f:
        data = json.load(f)

    for d in data:
        texts = (
            ' '.join(d['context']),
            ' '.join(d['context_fol']),
            d['question'],
            d['question_fol'],
        )
        total += sum(num_tokens_from_string(text) for text in texts)

print(total)

In [None]:
(total/1000000)*0.13

## Convert LogicNLI 

In [None]:
import json

with open('data/LogicNLI_original/dev_language.json', 'r') as f:
    sample_language = json.load(f)

with open('data/LogicNLI_original/dev_logic.json', 'r') as f:
    sample_logic = json.load(f)


In [None]:
label_mapping = {
    'entailment': 'A',
    'contradiction': 'B',
    'self_contradiction': 'B',
    'neutral': 'C'
}

In [None]:

# FOL prefix for the two quantifier keywords that can stand in a fact's
# subject position.
_QUANTIFIER_PREFIX = {'exist': '∃x ', 'all': '∀x '}


def _fol_literal(attribute, argument, polarity):
    """Render one atom, negated unless ``polarity`` is ``'+'`` (e.g. ``¬Red(x)``)."""
    atom = f'{attribute.capitalize()}({argument})'
    return atom if polarity == '+' else f'¬{atom}'


def _fol_rule_side(side):
    """Render a rule premise or conclusion (``side['fact']`` joined by ``side['conj']``)."""
    facts = side['fact']
    heads = [fact[0] for fact in facts]

    # One quantifier is hoisted in front of the parenthesis only when every
    # fact uses the same quantifier keyword. Requiring at least one fact
    # prevents a quantifier from a previous rule leaking into an empty side.
    shared_quantifier = bool(facts) and all(
        head == heads[0] and head in _QUANTIFIER_PREFIX for head in heads
    )

    parts = []
    for fact in facts:
        head = fact[0]
        if shared_quantifier:
            parts.append(_fol_literal(fact[1], 'x', fact[2]))
        elif head in _QUANTIFIER_PREFIX:
            # Mixed subjects: each quantified fact carries its own quantifier.
            parts.append(_QUANTIFIER_PREFIX[head] + _fol_literal(fact[1], 'x', fact[2]))
        else:
            parts.append(_fol_literal(fact[1], head.lower(), fact[2]))

    prefix = _QUANTIFIER_PREFIX[heads[0]] if shared_quantifier else ''
    connective = ' ∨ ' if side['conj'] == 'or' else ' ∧ '
    return f'{prefix}({connective.join(parts)})'


def convert_logicnli(sample_logic, sample_language, label_mapping):
    """Convert paired LogicNLI logic / language stories into flat question samples.

    The two mappings are iterated in parallel; the position of a story in that
    iteration becomes its ``story_id``. For each story the facts and rules are
    rendered as FOL strings (``context_fol``) next to their natural-language
    counterparts (``context``). Every statement whose label is not
    ``'self_contradiction'`` yields one sample; sample ids are consecutive
    integers starting at 0. All samples of a story share the same ``context``
    and ``context_fol`` list objects.

    Args:
        sample_logic: Mapping of stories with ``facts``, ``rules`` and
            ``statements`` sub-mappings. Each fact / statement is indexed as
            ``[subject, attribute, polarity]``; each rule has ``p``, ``q`` and
            ``type``.
        sample_language: Mapping of stories with ``facts``, ``rules``,
            ``statements`` and ``labels`` iterables aligned with the logic side.
            NOTE(review): assumed to be lists of strings; confirm against the
            original data files.
        label_mapping: Mapping from label name to answer letter.

    Returns:
        list of dicts with ``id``, ``story_id``, ``context``, ``context_fol``,
        ``question``, ``question_fol`` and ``answer``.
    """
    output = []

    story_pairs = zip(sample_logic.values(), sample_language.values())
    for story_id, (data, statement) in enumerate(story_pairs):
        context_fol = []
        context = []

        for fact, fact_nl in zip(data['facts'].values(), statement['facts']):
            context_fol.append(_fol_literal(fact[1], fact[0].lower(), fact[2]))
            context.append(fact_nl)

        for rule_data, rule_nl in zip(data['rules'].values(), statement['rules']):
            premise_str = _fol_rule_side(rule_data['p'])
            conclusion_str = _fol_rule_side(rule_data['q'])

            # 'imp' is an implication; every other rule type is rendered as
            # an equivalence.
            arrow = '→' if rule_data['type'] == 'imp' else '↔'
            context_fol.append(f"({premise_str}) {arrow} ({conclusion_str})")
            context.append(rule_nl)

        questions = zip(data['statements'].values(), statement['statements'], statement['labels'])
        for question, question_nl, label in questions:
            if label == 'self_contradiction':
                continue

            output.append({
                'id': len(output),
                'story_id': story_id,
                'context': context,
                'context_fol': context_fol,
                'question': question_nl,
                'question_fol': _fol_literal(question[1], question[0].lower(), question[2]),
                'answer': label_mapping[label]
            })

    return output


output = convert_logicnli(sample_logic, sample_language, label_mapping)


In [None]:
with open('data/LogicNLI/dev.json', 'w') as f:
    json.dump(output, f, indent=2, ensure_ascii=False)

In [None]:
with open('data/LogicNLI/train.json', 'r') as f:
    a = json.load(f)

## Predicates

In [None]:
import re
import json

# Matches predicate applications such as `Name(arg1, arg2)`: a word, optional
# whitespace, then a parenthesised group that contains no ')'.
# The leading negative lookahead rejects matches whose name starts with
# x, y, z or w, presumably so a quantified variable followed by '(' (as in
# "∀x (...") is not taken for a predicate name.
reg = re.compile(r'(?!x|y|z|w)\b\w+\s*\([^\)]+\)')

# NOTE(review): the variable is named `train`, but the file read here is the
# dev split.
with open('data/LogicNLI/dev.json', 'r') as f:
    train = json.load(f)

In [None]:
args_map = {
    1: 'x',
    2: 'x, y',
    3: 'x, y, z',
    4: 'x, y, z, w'
}

In [None]:
def clean_predicate(predicate):
    """Reduce a predicate application to its signature.

    The predicate name is kept and each argument is replaced by a
    placeholder variable, e.g. ``'Love(miroslav, music)'`` becomes
    ``'Love(x, y)'``.

    Args:
        predicate: String of the form ``Name(arg1, ..., argN)`` that ends
            with the closing parenthesis.

    Returns:
        ``'Name(x)'``, ``'Name(x, y)'``, ``'Name(x, y, z)'`` or
        ``'Name(x, y, z, w)'`` depending on the number of comma-separated
        arguments.

    Raises:
        KeyError: if the predicate has more than four arguments.
    """
    placeholders = ('x', 'y', 'z', 'w')

    # Split at the FIRST '(' only, so an argument that itself contains '('
    # does not truncate the argument list.
    name, _, rest = predicate.partition('(')
    n_arguments = len(rest[:-1].split(','))  # rest[:-1] drops the closing ')'

    if n_arguments > len(placeholders):
        raise KeyError(
            f'{n_arguments} arguments in {predicate!r}; at most {len(placeholders)} are supported'
        )

    return name.strip() + '(' + ', '.join(placeholders[:n_arguments]) + ')'

In [None]:
# Attach to every sample the predicate signatures found in its FOL context.
for point in train:
    formulas = ' '.join(point['context_fol'])
    preds = reg.findall(formulas)

    # sorted(set(...)) removes duplicates AND gives a stable order; a bare
    # list(set(...)) can change order between kernel sessions, which made the
    # written JSON differ from run to run.
    point['logic_predicates'] = sorted({clean_predicate(pred) for pred in preds})

In [None]:
with open('data/LogicNLI/dev.json', 'w') as f:
    json.dump(train, f, indent=2, ensure_ascii=False)

## The rest

In [None]:
import json

In [None]:
with open('outputs/logic_predicates/FOLIO_dev_gpt-3.5-turbo.json', 'r') as f:
    preds = json.load(f)

In [None]:
with open('outputs/logic_programs/FOLIO_dev_gpt-3.5-turbo_static.json', 'r') as f:
    progs = json.load(f)

In [None]:
preds.keys()

In [None]:
# Copy the extracted predicates onto each logic program; programs without an
# entry in `preds` get an empty list.
for prog in progs:
    key = str(prog['id'])
    prog['predicates'] = preds[key]['logic_predicates'] if key in preds else []

In [None]:
progs

## GCD (grammar-constrained decoding)

In [None]:
# Few-shot prompt: (string, grammar) example pairs followed by the string whose
# grammar the model must write.
# This has to be a RAW string: the example grammars contain the two-character
# escapes \n and \t, which must reach the model literally. In a normal string
# Python turns them into real newline / tab characters and breaks the grammar
# rules across lines.
user = r"""
String:
'''
1. e4 e5
2. Nf3 Nc6
3. Bb5 a6
4. Ba4 Nf6
5. O-O Be7
6. Re1 b5
7. Bb3 d6
8. c3 O-O
9. h3 Na5
10. Bc2 c5
11. d4 Qc7
12. Nbd2 Nc6
13. Nf1 Re8
14. Ng3 Bf8
15. d5 Nb8
16. Nh4 g6
17. Qf3 Bg7
18. Bg5 Nbd7
19. Rad1 h6
20. Bc1 Nf8
'''
Grammar:
'''
root    ::= "1. " move " " move "\n" ([1-9] [0-9]? ". " move " " move "\n")+
move    ::= (pawn | nonpawn | castle) [+#]?

# piece type, optional file/rank, optional capture, dest file & rank
nonpawn ::= [NBKQR] [a-h]? [1-8]? "x"? [a-h] [1-8]

# optional file & capture, dest file & rank, optional promotion
pawn    ::= ([a-h] "x")? [a-h] [1-8] ("=" [NBKQR])?

castle  ::= "O-O" "-O"?
'''
------
String:
'''
x = 42
y = x + 3
z = (y - 2) * 5
result = z / (x + y)
a = 100 - 50
b = (a + result) * 2
c = b / 4
d = c - a + x
e = (d * 2) + (x - 3) / 7
'''
Grammar:
'''
root  ::= (expr "=" ws term "\n")+
expr  ::= term ([-+*/] term)*
term  ::= ident | num | "(" ws expr ")" ws
ident ::= [a-z] [a-z0-9_]* ws
num   ::= [0-9]+ ws
ws    ::= [ \t\n]*
'''
------
String:
'''
"Czech(miroslav) ∧ ChoralConductor(miroslav) ∧ Specialize(miroslav, renaissance) ∧ Specialize(miroslav, baroque)",
"∀x (ChoralConductor(x) → Musician(x))",
"∃x (Musician(x) → Love(x, music))",
"Book(methodOfStudyingGregorianChant) ∧ Author(miroslav, methodOfStudyingGregorianChant) ∧ Publish(methodOfStudyingGregorianChant, year1946)"
'''
Grammar:
'''
"""

In [None]:
task_description = """
Given a string, the task is to write the grammar that can generate that string, in EBNF format. To do so, follow the provided examples
"""

In [None]:
raw_grammar = """
root ::= ProductionList 

ProductionList ::= Production ProductionList | Production 

Production ::= NonTerminal "::=" RHS 

RHS ::= Symbol RHS | Symbol 

Symbol ::= Terminal | NonTerminal | "|" 

NonTerminal ::= [A-Z]

Terminal ::= [a-z]
"""

In [None]:
from llama_cpp.llama import LlamaGrammar
from llama_cpp import Llama

model_path = "GCD/llms/mistral-7b-instruct-v0.2.Q6_K.gguf"
n_ctx = 2048
n_gpu_layers = -1
n_batch = 512

llm = Llama(
    model_path=model_path,
    n_gpu_layers=n_gpu_layers,
    n_batch=n_batch,
    n_ctx = n_ctx,
    f16_kv=True,  # MUST set to True, otherwise you will run into problem after a couple of calls
    verbose = False
)



In [None]:

# max_tokens=50 
# temperature=0.01
# frequency_penalty=0.0
# repeat_penalty=1.1
# # presence_penalty=0.0
# top_p=0.9
# top_k=20
stop=['------']

grammar = LlamaGrammar.from_string(raw_grammar, verbose=False)


In [None]:

result = llm.create_chat_completion(
messages=[
    {"role": "system", "content": task_description},
    {"role": "user", "content": user}
    ],
max_tokens = 100,

# frequency_penalty = frequency_penalty,
# repeat_penalty = repeat_penalty,
# # presence_penalty = presence_penalty,

# temperature=temperature,

# top_p=top_p,
# top_k=top_k,

# stop = stop,
grammar = grammar,
)

In [None]:
result['choices'][0]['message']['content']

## Logic

In [None]:
import re
import json
import os
from copy import deepcopy
from nltk.inference.prover9 import Prover9Command
from nltk.sem.logic import *

from models.symbolic_solvers.fol_solver.Formula_util import FOL_Formula
from models.symbolic_solvers.fol_solver.fol_prover9_parser import Prover9_FOL_Formula

from sentence_transformers import SentenceTransformer, util
import numpy as np
from tqdm.autonotebook import tqdm

os.environ['PROVER9'] = './models/symbolic_solvers/Prover9/bin'

Samples removed from dev:

- 66
- 67
- 68
- 138
- 139
- 171

Samples removed from train:

- 691

In [None]:
original = Expression.fromstring('all x.(Roundel(x) -> -Higher(x) -> Lower(x))')
new = Expression.fromstring('all x y.exists z(Know(x, z) & Know(y, z) & UniversalLanguage(z)) -> Communicate(x, y)')

original.equiv(new)

In [None]:
original

(all x.(GSCjasdhjvfd))

∀x (WantlongVacation(x) → (Love(x, summer) ∧ ¬Love(x, spring) ∧ ¬Love(x, fall) ∧ ¬Love(x, winter)))

In [None]:
context_fol= ['∀x (nail(x) → metal(x))',
 '∀x (metal(x) → conductive(x))']

question_fol = '∀x (nail(x) → conductive(x))'

In [None]:
# Translate every context formula into an NLTK expression via the Prover9
# syntax; formulas that fail validation are reported and left out.
ass = []

for s in context_fol:
    fol_formula = FOL_Formula(s)

    if not fol_formula.is_valid:
        print('error: ', s)
        continue

    prover9_formula = Prover9_FOL_Formula(fol_formula).formula
    ass.append(Expression.fromstring(prover9_formula))

In [None]:
goal = Expression.fromstring(Prover9_FOL_Formula(FOL_Formula(question_fol)).formula)

In [None]:
prover = Prover9Command(goal, ass)

In [None]:
goal = NegatedExpression(goal)

In [None]:
prover = Prover9Command(goal, ass)

In [None]:
prover.prove()

In [None]:
prover.proof(simplify=False)

In [None]:
ass

In [None]:
with open('old/outputs/logic_inference/FOLIO_dev_gpt-3.5-turbo_static_text_backup-LLM.json', 'r') as f:
    old_inference = json.load(f)

In [None]:
with open('outputs/logic_inference/FOLIO_dev_gpt-3.5-turbo_static_text_backup-LLM.json', 'r') as f:
    new_inference = json.load(f)

## Conversion Datasets

In [None]:
with open('parsing/new_formulas_v0/FOLIO_dev_gpt-3.5-turbo_static_text.json', 'r') as j:
    folio_v0_train_parsed = json.load(j)

In [None]:
with open('data/FOLIO/dev.json', 'r') as j:
    folio_v0_train = json.load(j)

In [None]:
folio_v0_train_dict = {val['id']: val for val in folio_v0_train}

In [None]:
folio_v0_train_parsed_dict = {val['id']: val for val in folio_v0_train_parsed}

In [None]:
# Overwrite each sample's FOL fields with the parsed assumptions / goal that
# share its id. The loop variable is deliberately not called `id`: that would
# shadow the builtin for every later cell in the kernel.
for sample_id, sample in folio_v0_train_dict.items():
    parsed = folio_v0_train_parsed_dict[sample_id]
    sample['context_fol'] = parsed['assumptions']
    sample['question_fol'] = parsed['goal']

In [None]:
new_folio_v0_train = [val for key, val in folio_v0_train_dict.items()]

In [None]:
with open('parsed_data/FOLIO/dev.json', 'w') as j:
    json.dump(new_folio_v0_train, j)