# Project Description

This notebook attempts to achieve a high accuracy of ReCOGS via instructions on how to chunk a sentence.

In [None]:
from collections import defaultdict
from itertools import product
import re

def variable_change(phi, sourcevar, targetvar, flag="000000"):
    replace_re = re.compile(rf"\b{sourcevar}\b")
    return replace_re.sub(f"{flag}{targetvar}", phi)


def recogs_exact_match(gold, pred, flag="000000"):

    flag = "000000"
    if not isinstance(gold, str):
        gold = normalize_formula(gold.logical_form)
        pred = normalize_formula(pred.logical_form)
    

    gold_conj_set = get_conj_set(gold)
    # Loop over all viable mappings from pred_vars to gold_vars:
    for this_map in _candidate_variable_maps(gold, pred):
        phi = pred
        # For each mapping, we need to replace the variables in        
        for sourcevar, targetvar in this_map.items():
            # The flag makes sure we don't accidentally do a chain
            # of replacements via successive changes in situations
            # where the domain and range of `this_map` share vars.
            phi = variable_change(phi, sourcevar, targetvar, flag=flag)
        phi = phi.replace(flag, "")
        phi_conj_set = get_conj_set(phi)
        # This step assumes that we have no conjuncts that are
        # tautologies, contradictions, or equality predications. If
        # such are introduced, they need to be identified ahead of
        # time and treated separately -- tautologies would be removed,
        # contradictions would reduce to comparisons of only those
        # conjuncts, and equality statements would call for special
        # handling related to variables mapping.
        if phi_conj_set == gold_conj_set:
            return True
    return False


def normalize_formula(phi):
    return phi.replace(" ", "").replace("AND" , " AND ")


binary_pred_re = re.compile(r"""
    (\w+)
    \s*
    \(
    \s*
    (\d+)
    \s*
    ,
    \s*
    (\d+)
    \s*
    \)""", re.VERBOSE)


unary_pred_re = re.compile(r"""
    (\w+)
    \s*
    \(
    \s*
    (\d+)
    \s*
    \)""", re.VERBOSE)


def _candidate_variable_maps(gold, pred):
    # This creates a mapping from tuples of predicates into their
    # associated variables. These serve as equivalence classes over
    # variables that could possibly be translations of each other.
    gold_map = _map_get_preds_to_vars(gold)
    pred_map = _map_get_preds_to_vars(pred)

    # For each prediction variable, get the set of potential
    # translations for it:
    pred2gold = defaultdict(list)
    for preds, pvars in pred_map.items():
        gvars = gold_map[preds]
        for pvar in pvars:
            pred2gold[pvar] = gold_map[preds]

    # Variable sets:
    gold_vars = set(get_variables(gold))
    pred_vars = set(get_variables(pred))

    # Now generate potentially viable mappings:
    for vals in list(product(*list(pred2gold.values()))):
        d = dict(zip(pred2gold.keys(), vals))
        if set(d.keys()) == pred_vars and set(d.values()) == gold_vars:
            yield d


def _map_get_preds_to_vars(phi):
    var2pred = defaultdict(list)
    for pred, var in unary_pred_re.findall(phi):
        var2pred[var].append(pred)
    # We could do somewhat less search by specializing to first and
    # second position for these predicates, but I think it's fine
    # as-is.
    for pred, var1, var2 in binary_pred_re.findall(phi):
        var2pred[var1].append(pred)
        var2pred[var2].append(pred)
    pred2var = defaultdict(list)
    for var, preds in var2pred.items():
        pred2var[tuple(sorted(preds))].append(var)
    return pred2var


def get_variables(phi):
    variable_re = re.compile(r"(\d+)")
    return variable_re.findall(phi)


def get_conj_set(phi):
    conj_splitter_re  = re.compile(r"\s*(?:AND|;)\s*")
    return set(conj_splitter_re.split(phi))




In [115]:
import pandas as pd
SRC_DIRNAME = "data"

def load_split(filename):
    return pd.read_csv(
        filename,
        delimiter="\t",
        names=['input', 'output', 'category'])

dataset = {}

for splitname in ("RECOGStrain", "RECOGSdev", "RECOGSgen"):
    dataset[splitname] = load_split(f"{SRC_DIRNAME}/{splitname}.tsv")

In [116]:
test_sentence = dataset["RECOGSgen"]["input"].tolist()[1]
print (test_sentence)

gold_Label = dataset["RECOGSgen"]["output"].tolist()[1]
print (gold_Label)

Zoe thought that a hippo cleaned .
Zoe ( 10 ) ; hippo ( 36 ) ; think ( 3 ) AND agent ( 3 , 10 ) AND ccomp ( 3 , 44 ) AND clean ( 44 ) AND agent ( 44 , 36 )


In [117]:
import dspy

dspy_recogs_train = [
    dspy.Example(
        sentence=row['input'], logical_form=row['output']
    ).with_inputs("sentence")
    for _, row in dataset['RECOGStrain'].iterrows()]

dspy_recogs_dev = [
    dspy.Example(
        sentence=row['input'], logical_form=row['output']
    ).with_inputs("sentence")
    for _, row in dataset['RECOGSdev'].iterrows()]
    

Average Metric: 0 / 446  (0.0):   2%|▏         | 446/24155 [01:16<1:07:27,  5.86it/s]


In [118]:
from datasets import load_dataset
import openai
import os
import dspy
from dotenv import load_dotenv
import spacy

root_path = '.'
os.environ["DSP_NOTEBOOK_CACHEDIR"] = os.path.join(root_path, 'cache')
# keep the API keys in a `.env` file in the local root directory
load_dotenv()
openai_key = os.getenv('OPENAI_API_KEY')  # use the .env file as it is a good practice to keep keys outside of one's code

In [119]:
lm = dspy.OpenAI(model='gpt-3.5-turbo', api_key=openai_key)
dspy.settings.configure(lm=lm)

In [120]:
class HierarchyParse(dspy.Signature):
    __doc__ = """Parse a sentence into a hierarchy of phrases."""
    sentence = dspy.InputField(desc="a sentence to parse")
    hierarchy = dspy.OutputField(desc="propose a hierarchy of phrases in the sentence; the hierarchy is marked by opening and closing brackets for each chunk and can be nested; the output should be a string")

class ReCOGS(dspy.Signature):
    __doc__ = """Given a sentence hierarchy, convert it to a ReCOGS logical form"""
    hierarchy = dspy.InputField(desc="a hierarchy of phrases in the sentence")
    logical_form = dspy.OutputField(desc="the ReCOGS logical form of the sentence hierarchy. the output should be a string")

class BasicParse(dspy.Module):
    def __init__(self):
        super().__init__()
        self.parse = dspy.Predict(HierarchyParse)
        self.generaterecogs = dspy.Predict(ReCOGS)

    def forward(self, sentence):
        hierarchy = self.parse(sentence=sentence).hierarchy
        return self.generaterecogs(hierarchy=hierarchy)

In [None]:
from collections import defaultdict
from itertools import product
import re

def variable_change(phi, sourcevar, targetvar, flag="000000"):
    replace_re = re.compile(rf"\b{sourcevar}\b")
    return replace_re.sub(f"{flag}{targetvar}", phi)


def recogs_exact_match(gold, pred, flag="000000"):

    flag = "000000"
    if not isinstance(gold, str):
        gold = normalize_formula(gold.logical_form)
        pred = normalize_formula(pred.logical_form)
    

    gold_conj_set = get_conj_set(gold)
    # Loop over all viable mappings from pred_vars to gold_vars:
    for this_map in _candidate_variable_maps(gold, pred):
        phi = pred
        # For each mapping, we need to replace the variables in        
        for sourcevar, targetvar in this_map.items():
            # The flag makes sure we don't accidentally do a chain
            # of replacements via successive changes in situations
            # where the domain and range of `this_map` share vars.
            phi = variable_change(phi, sourcevar, targetvar, flag=flag)
        phi = phi.replace(flag, "")
        phi_conj_set = get_conj_set(phi)
        # This step assumes that we have no conjuncts that are
        # tautologies, contradictions, or equality predications. If
        # such are introduced, they need to be identified ahead of
        # time and treated separately -- tautologies would be removed,
        # contradictions would reduce to comparisons of only those
        # conjuncts, and equality statements would call for special
        # handling related to variables mapping.
        if phi_conj_set == gold_conj_set:
            return True
    return False


def normalize_formula(phi):
    return phi.replace(" ", "").replace("AND" , " AND ")


binary_pred_re = re.compile(r"""
    (\w+)
    \s*
    \(
    \s*
    (\d+)
    \s*
    ,
    \s*
    (\d+)
    \s*
    \)""", re.VERBOSE)


unary_pred_re = re.compile(r"""
    (\w+)
    \s*
    \(
    \s*
    (\d+)
    \s*
    \)""", re.VERBOSE)


def _candidate_variable_maps(gold, pred):
    # This creates a mapping from tuples of predicates into their
    # associated variables. These serve as equivalence classes over
    # variables that could possibly be translations of each other.
    gold_map = _map_get_preds_to_vars(gold)
    pred_map = _map_get_preds_to_vars(pred)

    # For each prediction variable, get the set of potential
    # translations for it:
    pred2gold = defaultdict(list)
    for preds, pvars in pred_map.items():
        gvars = gold_map[preds]
        for pvar in pvars:
            pred2gold[pvar] = gold_map[preds]

    # Variable sets:
    gold_vars = set(get_variables(gold))
    pred_vars = set(get_variables(pred))

    # Now generate potentially viable mappings:
    for vals in list(product(*list(pred2gold.values()))):
        d = dict(zip(pred2gold.keys(), vals))
        if set(d.keys()) == pred_vars and set(d.values()) == gold_vars:
            yield d


def _map_get_preds_to_vars(phi):
    var2pred = defaultdict(list)
    for pred, var in unary_pred_re.findall(phi):
        var2pred[var].append(pred)
    # We could do somewhat less search by specializing to first and
    # second position for these predicates, but I think it's fine
    # as-is.
    for pred, var1, var2 in binary_pred_re.findall(phi):
        var2pred[var1].append(pred)
        var2pred[var2].append(pred)
    pred2var = defaultdict(list)
    for var, preds in var2pred.items():
        pred2var[tuple(sorted(preds))].append(var)
    return pred2var


def get_variables(phi):
    variable_re = re.compile(r"(\d+)")
    return variable_re.findall(phi)


def get_conj_set(phi):
    conj_splitter_re  = re.compile(r"\s*(?:AND|;)\s*")
    return set(conj_splitter_re.split(phi))




In [121]:
from dspy.teleprompt import BootstrapFewShot, LabeledFewShot, BootstrapFewShotWithRandomSearch

fewshot_optimizer = BootstrapFewShot(metric=recogs_exact_match)
compiled = fewshot_optimizer.compile(student = BasicParse(), trainset=dspy_recogs_train)

  0%|          | 13/24155 [00:00<00:15, 1562.66it/s]

Bootstrapped 4 full traces after 14 examples in round 0.





In [124]:
ssamp = dataset['RECOGSgen'].sample(25)
ssamp['prediction'] = ssamp.input.apply(
    lambda x: compiled(sentence=x).logical_form)

In [125]:

ssamp['correct'] = ssamp.apply(
    lambda row: recogs_exact_match(row['output'], row['prediction']), axis=1)  
ssamp['correct'].sum() / ssamp.shape[0]      

np.float64(0.32)

In [130]:
import dspy

# Step 1: Define a signature
class SemanticParsingSignature(dspy.Signature):
    """Map a sentence to a structured logical form."""
    sentence = dspy.InputField()
    logical_form = dspy.OutputField()

# Step 2: Create few-shot examples from the study examples
fewshot_examples = [
    dspy.Example(sentence="A worm held Luna .",
                 logical_form="TOWEL ( X _ 1 ) AND HOLD &. AGENT ( X _ 2 , X _ 1 ) AND HOLD &. THEME ( X _ 2 , OLIVER )"),

    dspy.Example(sentence="Hannah burned the valve beside the trunk .",
                 logical_form="* FARMER ( X _ 3 ) ; * BEE ( X _ 6 ) ; BURN &. AGENT ( X _ 1 , ARIA ) AND BURN &. THEME ( X _ 1 , X _ 3 ) AND FARMER &. NMOD &. BESIDE ( X _ 3 , X _ 6 )"),

    dspy.Example(sentence="A china was eaten by a shoebox .",
                 logical_form="JEEP ( X _ 1 ) AND PAINT &. THEME ( X _ 3 , X _ 1 ) AND PAINT &. AGENT ( X _ 3 , X _ 6 ) AND SPEAKER ( X _ 6 )"),

    dspy.Example(sentence="Mila liked the newspaper beside the hero .",
                 logical_form="* STAND ( X _ 3 ) ; * MOUND ( X _ 6 ) ; LIKE &. AGENT ( X _ 1 , GRAYSON ) AND LIKE &. THEME ( X _ 1 , X _ 3 ) AND STAND &. NMOD &. BESIDE ( X _ 3 , X _ 6 )"),

    dspy.Example(sentence="The closet laughed .",
                 logical_form="* CRACKER ( X _ 1 ) ; LAUGH &. AGENT ( X _ 2 , X _ 1 )"),

    dspy.Example(sentence="Christopher posted the driver on a glacier to William .",
                 logical_form="* PILLOW ( X _ 3 ) ; PASS &. AGENT ( X _ 1 , LAYLA ) AND PASS &. THEME ( X _ 1 , X _ 3 ) AND PASS &. RECIPIENT ( X _ 1 , ASHER ) AND PILLOW &. NMOD &. ON ( X _ 3 , X _ 6 ) AND FROG ( X _ 6 )"),

    dspy.Example(sentence="Charlie found a futon in a bun .",
                 logical_form="FIND &. AGENT ( X _ 1 , GRACE ) AND FIND &. THEME ( X _ 1 , X _ 3 ) AND ROSE ( X _ 3 ) AND ROSE &. NMOD &. IN ( X _ 3 , X _ 6 ) AND BARON ( X _ 6 )"),

    dspy.Example(sentence="The knife was collapsed by a trap .",
                 logical_form="* FLOWER ( X _ 1 ) ; COLLAPSE &. THEME ( X _ 3 , X _ 1 ) AND COLLAPSE &. AGENT ( X _ 3 , X _ 6 ) AND POOL ( X _ 6 )")
]

# Step 3: Create a DSPy module
class MetaLearner(dspy.Module):
    def __init__(self):
        super().__init__()
        self.predictor = dspy.Predict(SemanticParsingSignature)

    def forward(self, sentence):
        return self.predictor(sentence=sentence)

# Step 4: Instantiate the model and compile with few-shot examples
meta_parser = MetaLearner()
fewshot_optimizer = LabeledFewShot(k=8)
compiled = fewshot_optimizer.compile(student = BasicParse(), trainset=fewshot_examples)


In [132]:
query = "A driver was helped by a shoebox ."
prediction = compiled(sentence=query)
print("Predicted Logical Form:\n", prediction.logical_form)

Predicted Logical Form:
 DRIVER ( X _ 1 ) AND HELP &. AGENT ( X _ 2 , X _ 1 ) AND HELP &. THEME ( X _ 2 , X _ 3 ) AND SHOEBOX ( X _ 3 )
