In [1]:
# from google.colab import drive

# drive.mount('/content/drive/')
# %cd Multistep-reasoning

In [1]:
import os

In [2]:
# Build one shared Entailer model and wrap it in three task-specific agents:
# a proof generator, an entailment verifier and a hypothesis verifier.
from entailment_bank.utils.nlp_agent import MultiAngleModel, NlpAgent

ew_model = MultiAngleModel(
    model_path="allenai/entailer-11b",
    cuda_devices=[10, 11],
)
prover = NlpAgent(
    model=ew_model,
    default_outputs="proof",
)
entail_verifier = NlpAgent(
    model=ew_model,
    default_outputs=["implied"],
    default_options={"explicit_outputs": ['true', 'false']},
)
hyp_verifier = NlpAgent(
    model=ew_model,
    default_outputs=["valid"],
    default_options={"explicit_outputs": ['true', 'false']},
)

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565


In [5]:
# Ask the prover for a proof of a sample hypothesis, then split the proof
# string on the "[PREMISE]" marker and keep the non-empty, stripped pieces.
hyp = "The leaves of plant benefit from sun"
proof = prover({"hypothesis": hyp})
premises = list(filter(None, map(str.strip, proof.split("[PREMISE]"))))

In [6]:
# Display the hypothesis next to the premises extracted from the generated proof.
hyp, premises

('The leaves of plant benefit from sun',
 ['A plant requires sunlight for photosynthesis.',
  'If a plant requires something for photosynthesis then that something benefits that plant.'])

In [7]:

# Does the model think the reasoning is good? In the recorded run: yes.
print(entail_verifier({"hypothesis": hyp, "proof": proof}))
# recorded output: {'implied': 'true', 'output_prob': 0.9937287875578781}

# Does the model believe the original hypothesis? In the recorded run: yes.
print(hyp_verifier({"hypothesis": hyp}))
# recorded output: {'valid': 'true', 'output_prob': 0.9677879113324139}

# Does the model believe the premises in the proof? In the recorded run: yes.
print(hyp_verifier({"hypothesis": premises[0]}))
# recorded output: {'valid': 'true', 'output_prob': 0.998050973368786}

print(hyp_verifier({"hypothesis": premises[1]}))
# recorded output: {'valid': 'true', 'output_prob': 0.992581887513396}

{'implied': 'true', 'output_prob': 0.9937287875578781}
{'valid': 'true', 'output_prob': 0.9677879113324139}
{'valid': 'true', 'output_prob': 0.998050973368786}
{'valid': 'true', 'output_prob': 0.992581887513396}


In [8]:
def generate_entailment_tree(hyp, model, prover, entail_verifier, hyp_verifier, visited, max_depth=3, k=5):
    """
    Entailer-style backchaining search for the best proof tree and score of a hypothesis.

    The hypothesis is first scored directly by ``hyp_verifier``. If depth remains and
    ``one_step`` finds a proof, the proof's entailment score is multiplied by the scores
    of its premises (each obtained recursively). The reasoned score replaces the direct
    score only when it exceeds the direct confidence ``max(s, 1 - s)``.

    Args:
        hyp: Hypothesis string to score.
        model: Not used by this function itself; forwarded unchanged to ``one_step``.
        prover: Callable forwarded to ``one_step`` and to recursive calls.
        entail_verifier: Callable taking ``{"hypothesis", "proof"}`` and returning a dict
            with keys ``'implied'`` (``'true'``/other) and ``'output_prob'``.
        hyp_verifier: Callable taking ``{"hypothesis"}`` and returning a dict with keys
            ``'valid'`` (``'true'``/other) and ``'output_prob'``.
        visited: Dict mutated in place; every hypothesis handled by this search is added
            as a key so that it is never expanded twice (this also breaks cycles).
        max_depth: Remaining expansion depth; at 0 only the direct score is returned.
        k: Number of proofs ``one_step`` samples per hypothesis.

    Returns:
        Tuple ``(tree_score, tree, prem_scores, entail_score)``:
        ``tree_score`` is the reasoned score if the proof was accepted, else the direct
        score; ``tree`` maps each premise of an accepted proof to its own sub-tree
        (``{}`` for leaves, and ``{}`` overall when no proof was accepted);
        ``prem_scores`` maps each premise of the selected proof to its score;
        ``entail_score`` is the proof's entailment score (0 when no proof was examined).
    """

    def _belief(statement):
        # Probability that `statement` is true according to the hypothesis verifier.
        res = hyp_verifier({"hypothesis": statement})
        return res['output_prob'] if res['valid'] == 'true' else 1 - res['output_prob']

    visited[hyp] = True

    sd_H = _belief(hyp)
    cd_H = max(sd_H, 1 - sd_H)

    if max_depth == 0:
        return sd_H, {}, {}, 0

    P = one_step(hyp, model, prover, entail_verifier, hyp_verifier, k=k)
    if P is None:
        return sd_H, {}, {}, 0

    premises = [x.strip() for x in P.split("[PREMISE]") if x.strip()]

    ent_res = entail_verifier({"hypothesis": hyp, "proof": P})
    entail_score = ent_res['output_prob'] if ent_res['implied'] == 'true' else 1 - ent_res['output_prob']

    prem_scores = {}
    sub_trees = {}

    if entail_score > cd_H:
        sr_H = entail_score
        for premise in premises:
            if premise in prem_scores:
                # Same premise repeated inside one proof: count it only once.
                continue
            if premise in visited:
                # Already expanded elsewhere (or it is an ancestor, i.e. a circular
                # proof). Do not expand again, but do NOT treat it as certain either:
                # fall back to the verifier's direct belief and record it as a leaf.
                p_score, p_tree = _belief(premise), {}
            else:
                p_score, p_tree, _, _ = generate_entailment_tree(
                    premise, model, prover, entail_verifier, hyp_verifier, visited, max_depth - 1, k
                )
            sr_H *= p_score
            prem_scores[premise] = p_score
            sub_trees[premise] = p_tree
    else:
        # The proof is not convincing enough to expand; still report direct premise
        # scores so callers can inspect them.
        for premise in premises:
            prem_scores[premise] = _belief(premise)
        sr_H = 0

    cr_H = sr_H

    # Keep the proof only if reasoning is more confident than the direct answer.
    if cr_H > cd_H:
        return sr_H, sub_trees, prem_scores, entail_score
    return sd_H, {}, prem_scores, entail_score

def one_step(hyp, model, prover, entail_verfier, hyp_verifier, k=5):
    """
    Sample up to ``k`` proofs for ``hyp`` and return the best-scoring acceptable one.

    A proof is acceptable when it contains at least one premise, every premise is
    believed with probability >= 0.5 and the entailment is believed with probability
    >= 0.5. Its score is the entailment probability times all premise probabilities.

    Args:
        hyp: Hypothesis string to prove.
        model: Unused; kept so existing positional callers keep working.
        prover: Callable taking ``{"hypothesis": hyp}`` and returning a proof string
            whose premises are separated by ``[PREMISE]``.
        entail_verfier: Callable taking ``{"hypothesis", "proof"}`` and returning a dict
            with keys ``'implied'`` and ``'output_prob'``.
        hyp_verifier: Callable taking ``{"hypothesis"}`` and returning a dict with keys
            ``'valid'`` and ``'output_prob'``.
        k: Number of proofs to request from the prover.

    Returns:
        The proof string with the highest score, or ``None`` if no sampled proof was
        acceptable.
    """
    scored_proofs = {}
    seen = set()

    for _ in range(k):
        proof = prover({"hypothesis": hyp})

        # An identical proof would get the identical score; skip the verifier calls.
        if proof in seen:
            continue
        seen.add(proof)

        premises = [x.strip() for x in proof.split("[PREMISE]") if x.strip()]
        if not premises:
            # A proof without premises cannot be scored (min() of nothing would raise).
            continue

        premise_scores = []
        for premise in premises:
            ver_res = hyp_verifier({"hypothesis": premise})
            premise_scores.append(
                ver_res['output_prob'] if ver_res['valid'] == 'true' else 1 - ver_res['output_prob']
            )

        entail_res = entail_verfier({"hypothesis": hyp, "proof": proof})
        entail_score = entail_res['output_prob'] if entail_res['implied'] == 'true' else 1 - entail_res['output_prob']

        if min(premise_scores) < 0.5 or entail_score < 0.5:
            continue

        score = entail_score
        for premise_score in premise_scores:
            score *= premise_score

        scored_proofs[proof] = score

    if not scored_proofs:
        return None
    return max(scored_proofs, key=scored_proofs.get)

In [9]:
# Run the backchaining search on a sample hypothesis, starting from an empty
# visited map, with a depth limit of 2 and 2 sampled proofs per node.
hyp = "Predators eat prey"
score, tree, prem_scores, entail_score = generate_entailment_tree(
    hyp,
    ew_model,
    prover,
    entail_verifier,
    hyp_verifier,
    visited={},
    max_depth=2,
    k=2,
)

In [10]:
# Show the final score and proof tree; generate_entailment_tree returns an empty
# tree whenever the reasoned score did not beat the direct verifier confidence.
print(score, tree)

0.9995912980789871 {}


In [11]:
# Per-premise scores of the selected proof, followed by that proof's entailment score.
print(prem_scores)
print(entail_score)

{'Predators eat other animals.': 0.9997791724095421, 'Prey is a kind of animal.': 0.999582878700273}
0.9998809122387073


In [1]:
from datasets import load_dataset
from tqdm import tqdm

# Load OpenBookQA; the "test" split of this object is used by the evaluation cells below.
dataset = load_dataset("allenai/openbookqa")

In [2]:
# Inspect the test split (its feature names and row count are shown in the output).
dataset["test"]

Dataset({
    features: ['id', 'question_stem', 'choices', 'answerKey'],
    num_rows: 500
})

In [14]:
import json
import os

RESULTS_PATH = "obqa-result.jsonl"

# The file is opened in append mode, so count the records that are already there and
# skip that many questions. Re-running the cell then resumes an interrupted run
# instead of appending duplicate records (which would misalign line i with question i).
already_done = 0
if os.path.exists(RESULTS_PATH):
    with open(RESULTS_PATH, 'r') as previous:
        already_done = sum(1 for row in previous if row.strip())

with open(RESULTS_PATH, 'a') as out:
    # Wrap the dataset (not the enumerate object) so tqdm knows the total.
    for q_idx, data in enumerate(tqdm(dataset['test'])):
        if q_idx < already_done:
            continue

        best_score, best_tree = 0, {}
        best_label, best_choice = None, None
        choices = {}
        for label, text in zip(data["choices"]["label"], data["choices"]["text"]):
            # TODO Generate the hypothesis in a better way
            hyp = data["question_stem"]+" "+text
            score, tree, prem_scores, entail_score = generate_entailment_tree(hyp, ew_model, prover, entail_verifier, hyp_verifier, {}, 2, 2)

            choices[label] = {"premises": prem_scores, "entail_score": entail_score}

            # The first option always becomes the initial best, so best_choice is
            # always one of the per-option dicts rather than a placeholder string.
            if best_label is None or score > best_score:
                best_score = score
                best_tree = tree
                best_label = label
                best_choice = choices[label]

        json.dump({"id": data["id"], "HYP": data["question_stem"], "best_label": best_label, "best_choice": best_choice, "tree": best_tree, "score": best_score, "choices": choices}, out)
        out.write('\n')
        # Each question is expensive; push the record to disk right away.
        out.flush()

500it [4:27:52, 32.15s/it]


In [16]:
# Measuring accuracy
import json

crct = 0
comparisions = []

# Each line is a JSON document written with json.dump, so parse it with json.loads.
# eval() on file contents would execute arbitrary code and needed a textual
# 'null' -> '-1' rewrite that also corrupts any string containing "null".
with open("obqa-result.jsonl", 'r') as f:
    records = [json.loads(row) for row in f if row.strip()]

for i, record in enumerate(records):
    # Record i is assumed to correspond to test question i (the order of writing).
    label = dataset["test"][i]["answerKey"]
    # NOTE: this compares the stored per-option dicts, so a different option with
    # identical premise scores and entailment score would also count as a match.
    if record["choices"][label] == record["best_choice"]:
        crct += 1
    else:
        comparisions.append({"Pred": record["best_choice"], "Actual": record["choices"][label]})

accuracy = crct / len(records) if records else 0.0
print(f"Accuracy: {crct}/{len(records)} = {accuracy:.4f}")

In [21]:
# Append one line per misclassified question (the str() form of each record).
with open("comparisions.txt", 'a') as f:
    f.writelines(str(entry)+'\n' for entry in comparisions)