### Utils

In [6]:
import openai
import time
import tiktoken
encoding = tiktoken.get_encoding("cl100k_base")

openai.api_key = "" # Set your API key here

def get_GPT4_response(input, temp=1.0, max_tokens=256, logit_dict={}, model="gpt-4"):
    """Send one user prompt to a chat model and return the reply text.

    The request is retried without limit: any exception raised by the API
    call is printed, the function sleeps for 5 seconds, and then tries again.

    Args:
        input: Text sent as the user message.
        temp: Sampling temperature forwarded to the API.
        max_tokens: Completion length limit forwarded to the API.
        logit_dict: Forwarded as ``logit_bias`` (not modified in this function).
        model: Model name forwarded to the API.

    Returns:
        The ``content`` field of the first choice's message.
    """
    conversation = [
        {"role": "system", "content": "You are a helpful factual assistant."},
        {"role": "user", "content": input},
    ]
    retry_delay = 5
    while True:
        try:
            completion = openai.ChatCompletion.create(
                model=model,
                messages=conversation,
                max_tokens=max_tokens,
                temperature=temp,
                logit_bias=logit_dict,
            )
        except Exception as e:
            print(e, f"Sleep {retry_delay} seconds.")
            time.sleep(retry_delay)
        else:
            return completion.choices[0].message["content"]


def get_chat_response(inputs_list, temp=1.0, max_tokens=256, logit_dict={}):
    """Run a two-shot chat completion on "gpt-4" and return the reply text.

    ``inputs_list`` supplies, in order: extra system-prompt text, the first
    example's user turn and assistant turn, the second example's user turn and
    assistant turn, and the final user query. Only the first six items are
    used.

    API failures are retried without limit: the exception is printed and the
    function sleeps for 5 seconds before the next attempt.

    Args:
        inputs_list: Sequence of at least six strings as described above.
        temp: Sampling temperature forwarded to the API.
        max_tokens: Completion length limit forwarded to the API.
        logit_dict: Forwarded as ``logit_bias`` (not modified in this function).

    Returns:
        The ``content`` field of the first choice's message.

    Raises:
        ValueError: If ``inputs_list`` has fewer than six items.
    """
    # Validate and build the conversation *before* entering the retry loop.
    # Inside the loop every exception is treated as a transient API failure,
    # so a malformed ``inputs_list`` would otherwise be retried forever.
    if len(inputs_list) < 6:
        raise ValueError(
            f"inputs_list must contain at least 6 items, got {len(inputs_list)}"
        )
    system_extra, user_1, assistant_1, user_2, assistant_2, user_3 = inputs_list[:6]
    messages = [
        {
            "role": "system",
            "content": "You are a helpful factual assistant.\n" + system_extra,
        },
        {"role": "user", "content": user_1},
        {"role": "assistant", "content": assistant_1},
        {"role": "user", "content": user_2},
        {"role": "assistant", "content": assistant_2},
        {"role": "user", "content": user_3},
    ]

    sleep_time = 5
    while True:
        try:
            completion = openai.ChatCompletion.create(
                model="gpt-4",
                messages=messages,
                max_tokens=max_tokens,
                temperature=temp,
                logit_bias=logit_dict,
            )
            break
        except Exception as e:
            print(e, f"Sleep {sleep_time} seconds.")
            time.sleep(sleep_time)
    return completion.choices[0].message["content"]

In [5]:
def parse_sentence_to_conclusion_premise(each_rule):
    """Split a Prolog-style rule into its conclusion and its list of premises.

    The rule is expected to look like ``Head(...):- P1(...), P2(...);``. The
    body is split on ``"),"``; the closing parenthesis consumed by the split
    is restored on every premise but the last, and one trailing ``";"`` or
    ``"."`` is removed from the last premise.

    Args:
        each_rule: The rule text; surrounding whitespace is ignored.

    Returns:
        A ``(conclusion, premises_list)`` tuple of the stripped conclusion
        string and the list of premise strings.

    Raises:
        AssertionError: If the rule does not contain ``":- "``.
        ValueError: If ``":- "`` occurs more than once (unpacking fails).
    """
    each_rule = each_rule.strip()

    assert ":- " in each_rule
    conclusion, premises = each_rule.split(":- ")

    *leading, last = (piece.strip() for piece in premises.split("),"))
    # Every piece before the last lost its ")" to the split separator.
    premises_list = [piece + ")" for piece in leading]

    # A body ending in a dangling ")," leaves an empty final piece. Indexing
    # its last character used to raise IndexError; it carries no premise, so
    # it is dropped instead.
    if last:
        if last.endswith((";", ".")):
            last = last[:-1]
        premises_list.append(last)
    return conclusion.strip(), premises_list

# parsing a premise/conclusion into arguments
def argument_parsing(premise, output_rela=False):
    """Break a predicate such as ``Rel(Type X, Type Y)`` into its parts.

    Each comma-separated argument is classified as follows:

    * contains a space: the last word is the variable and everything before
      it (re-joined with single spaces) is the type;
    * no space and at most two characters: a bare variable (type ``None``);
    * no space and more than two characters: a bare type (variable ``None``).

    If the text does not contain exactly one ``"("``, both lists are returned
    empty (and the relation is ``None``).

    Args:
        premise: Predicate text; surrounding whitespace is ignored. The last
            character is assumed to be the closing parenthesis.
        output_rela: When true, the relation name is returned as well.

    Returns:
        ``(types, variables)``, or ``(relation, types, variables)`` when
        ``output_rela`` is true.
    """
    premise = premise.strip()
    relation = None
    arg_types = []
    arg_variables = []

    if premise.count("(") == 1:
        open_at = premise.index("(")
        relation = premise[:open_at]
        for raw_arg in premise[open_at + 1:-1].split(","):
            arg = raw_arg.strip()
            if " " in arg:
                *type_words, variable = arg.split()
                arg_types.append(" ".join(type_words))
                arg_variables.append(variable)
            elif len(arg) <= 2:
                arg_types.append(None)
                arg_variables.append(arg)
            else:
                arg_types.append(arg)
                arg_variables.append(None)

    if output_rela:
        return relation, arg_types, arg_variables
    return arg_types, arg_variables

In [5]:
# get the most similar premise

import numpy as np
def find_lcseque(s1, s2):
    """Return the length of a longest common subsequence of ``s1`` and ``s2``.

    A dynamic-programming table is filled and then traced back from the
    bottom-right corner to collect the matched elements of ``s1``. Matched
    elements whose own ``len`` is 0 are not counted.

    Args:
        s1: First sequence (the callers in this file pass strings).
        s2: Second sequence.

    Returns:
        The number of matched, non-empty elements.
    """
    n_rows, n_cols = len(s1) + 1, len(s2) + 1
    # lengths[i][j]: subsequence length for the prefixes s1[:i] and s2[:j].
    lengths = [[0] * n_cols for _ in range(n_rows)]
    # moves[i][j]: which neighbour lengths[i][j] was derived from.
    moves = [[None] * n_cols for _ in range(n_rows)]

    for i, left_item in enumerate(s1, start=1):
        for j, right_item in enumerate(s2, start=1):
            if left_item == right_item:
                # Match: extend the diagonal (upper-left) value by one.
                lengths[i][j] = lengths[i - 1][j - 1] + 1
                moves[i][j] = 'ok'
            elif lengths[i][j - 1] > lengths[i - 1][j]:
                # The left neighbour is strictly larger: inherit from it.
                lengths[i][j] = lengths[i][j - 1]
                moves[i][j] = 'left'
            else:
                # Otherwise (including ties) inherit from the upper neighbour.
                lengths[i][j] = lengths[i - 1][j]
                moves[i][j] = 'up'

    i, j = len(s1), len(s2)
    matched = []
    while lengths[i][j]:  # stop once the remaining common length is 0
        move = moves[i][j]
        if move == 'ok':
            # Record the matched element and step diagonally.
            matched.append(s1[i - 1])
            i -= 1
            j -= 1
        elif move == 'left':
            j -= 1
        elif move == 'up':
            i -= 1
    return sum(1 for item in matched if len(item) > 0)


def get_most_similar_premise(conc_rela, premise_rela_list):
    """Find the premise relation most similar to the conclusion relation.

    Similarity is the value returned by ``find_lcseque`` for each candidate
    against ``conc_rela``. On ties the earliest candidate wins.

    Args:
        conc_rela: Relation name of the conclusion.
        premise_rela_list: Candidate premise relation names.

    Returns:
        ``(index, similarity)`` of the best candidate, or ``(-1, -1)`` when
        ``premise_rela_list`` is empty.
    """
    best_index, best_score = -1, -1
    for index, premise_rela in enumerate(premise_rela_list):
        score = find_lcseque(premise_rela, conc_rela)
        if score > best_score:
            best_index, best_score = index, score
    return best_index, best_score

### Concepts for each domain

In [4]:
# "Person"
# without "Region" and "Time Period" in affordance concepts
# Concept taxonomy for the affordance domain: every top-level concept maps to
# the list of lower-case concept names grouped under it.
concepts_affordance = {
    "Animal": ["dog", "cat", "dinosaur"],
    "Plant": ["fruit", "vegatable", "tree"],
    "Food": ["snack", "barbecue", "ingredient", "beverage"],
    "Alcohol": ["wine", "beer", "spirits"],
    "Disease": ["allergy", "cancer", "rhinitis"],
    "Drug": ["antibiotics", "narcotics", "prescription drug"],
    "Natural Phenomenon": ["weather", "natural disaster", "energy"],
    "Condition": ["climate", "symptom", "environment"],
    "Material": ["fuel", "steel", "plastic", "wood", "stone"],
    "Substance": ["allergen", "gas", "water", "oxygen"],
    "Furniture": ["table", "chair", "bed"],
    "Publication": ["album", "song", "book", "discography", "magazine", "poem", "musical work", "written work"],
    "Organization": ["company", "club", "party", "union", "league", "community", "studio"],
    "Facility": [
        "healthcare facility", "hospital", "clinic", "nursing home", "pharmacy",
        "educational facility", "university", "school", "library", "institution", "lab",
        "recreation facility", "park", "amusement park", "stadium", "gym", "museum", "theater",
        "production facility", "factory", "farm", "assembly plant", "power plant", "brewery",
        "transport facility", "station", "airport", "railway", "harbour", "port", "publisher",
        "business facility", "mall", "restaurant", "bank", "market", "shop", "store",
        "administrative facility", "government", "agency", "authority", "department",
        "religious facility", "church", "mosque", "temple",
        "financial institution", "venue", "landmark", "gallery",
    ],
    "Natural Place": ["mountain", "river", "ocean", "desert", "island", "forest", "volcano", "habitat", "area", "mine"],
    "Event": ["conference", "workshop", "celebration", "race", "activity"],
    "Show": ["movie", "tv show", "drama", "concert", "broadcast", "opera", "cartoon", "comedy"],
    "Artwork": ["photograph", "painting", "sculpture", "architecture", "building"],
    "Job": ["doctor", "teacher", "engineer", "actor", "lawyer", "driver", "profession"],
    "Game": ["sport", "card game", "computer game"],
    "Vehicle": ["car", "aircraft", "ship", "bicycle", "rocket"],
    "Tool": ["container", "weapon", "musical instrument", "kitchen tool", "equipment"],
    "Technology": ["telecommunication", "Internet", "browser", "algorithm", "software"],
    "Electronic Device": ["computer", "phone", "refrigerator", "appliance", "device"],
    "Platform": ["operating system", "social media platform", "streaming media platform", "e-commerce platform"],
    "Financial Product": ["insurance", "stock", "bond"],
    "Skill": ["knowledge", "language", "recipe", "method", "capability", "experience", "technique", "course", "workexperience"],
    "Authorization": ["credential", "license", "prescription", "identification document", "ticket", "degree", "certification", "qualification", "medicaldegree"],
    "Legislation": ["policy", "rule", "regulation", "law"],
}
print(len(concepts_affordance))

29


In [3]:
# Concept taxonomy for the location domain: every top-level concept maps to
# the list of lower-case concept names grouped under it. Includes "Region".
concepts_location = {
    "Animal": ["dog", "cat", "dinosaur"],
    "Plant": ["fruit", "vegatable", "tree"],
    "Food": ["snack", "barbecue", "ingredient", "beverage"],
    "Alcohol": ["wine", "beer", "spirits"],
    "Disease": ["allergy", "cancer", "rhinitis"],
    "Drug": ["antibiotics", "narcotics", "prescription drug"],
    "Natural Phenomenon": ["weather", "natural disaster", "energy"],
    "Condition": ["climate", "symptom", "environment", "crime"],
    "Material": ["fuel", "steel", "plastic", "wood", "stone"],
    "Substance": ["allergen", "gas", "water", "oxygen"],
    "Furniture": ["table", "chair", "bed"],
    "Publication": ["album", "song", "book", "discography", "magazine", "poem", "musical work", "written work"],
    "Organization": ["company", "club", "party", "union", "league", "community", "studio"],
    "Facility": [
        "healthcare facility", "hospital", "clinic", "nursing home", "pharmacy",
        "educational facility", "university", "school", "library", "institution", "lab",
        "recreation facility", "park", "amusement park", "stadium", "gym", "museum", "theater",
        "production facility", "factory", "farm", "assembly plant", "power plant", "brewery",
        "transport facility", "station", "airport", "railway", "harbour", "port", "publisher",
        "business facility", "mall", "restaurant", "bank", "market", "shop", "store",
        "administrative facility", "government", "agency", "authority", "department",
        "religious facility", "church", "mosque", "temple",
        "financial institution", "venue", "landmark", "gallery",
    ],
    "Natural Place": ["mountain", "river", "ocean", "desert", "island", "forest", "volcano", "habitat", "area", "mine"],
    "Event": ["conference", "workshop", "celebration", "race", "activity"],
    "Show": ["movie", "tv show", "drama", "concert", "broadcast", "opera", "cartoon", "comedy"],
    "Artwork": ["photograph", "painting", "sculpture", "architecture", "building"],
    "Job": ["doctor", "teacher", "engineer", "actor", "lawyer", "driver", "profession"],
    "Game": ["sport", "card game", "computer game"],
    "Vehicle": ["car", "aircraft", "ship", "bicycle", "rocket"],
    "Tool": ["container", "weapon", "musical instrument", "kitchen tool", "equipment"],
    "Technology": ["telecommunication", "Internet", "browser", "algorithm", "software"],
    "Electronic Device": ["computer", "phone", "refrigerator", "appliance", "device"],
    "Platform": ["operating system", "social media platform", "streaming media platform", "e-commerce platform", "channel"],
    "Financial Product": ["insurance", "stock", "bond"],
    "Skill": ["knowledge", "language", "recipe", "method", "capability", "experience", "technique", "course", "workexperience"],
    "Authorization": ["credential", "license", "prescription", "identification document", "ticket", "degree", "certification", "qualification", "medicaldegree", "authority"],
    "Legislation": ["policy", "rule", "regulation", "law"],
    "Region": ["country", "city", "town", "location", "state", "province", "place"],
}

In [2]:
# "Person"
# Concept taxonomy for the accessibility domain: every top-level concept maps
# to the list of lower-case concept names grouped under it. Includes both
# "Time Period" and "Region".
concepts_accessibility = {
    "Animal": ["dog", "cat", "dinosaur"],
    "Plant": ["fruit", "vegatable", "tree"],
    "Food": ["snack", "barbecue", "ingredient", "beverage"],
    "Alcohol": ["wine", "beer", "spirits"],
    "Disease": ["allergy", "cancer", "rhinitis"],
    "Drug": ["antibiotics", "narcotics", "prescription drug"],
    "Natural Phenomenon": ["weather", "natural disaster", "energy"],
    "Condition": ["climate", "symptom", "environment"],
    "Material": ["fuel", "steel", "plastic", "wood", "stone"],
    "Substance": ["allergen", "gas", "water", "oxygen", "pollen"],
    "Furniture": ["table", "chair", "bed"],
    "Publication": ["album", "song", "book", "discography", "magazine", "poem", "musical work", "written work"],
    "Organization": ["company", "club", "party", "union", "league", "community", "studio"],
    "Facility": [
        "healthcare facility", "hospital", "clinic", "nursing home", "pharmacy",
        "educational facility", "university", "school", "library", "institution", "lab",
        "recreation facility", "park", "amusement park", "stadium", "gym", "museum", "theater",
        "production facility", "factory", "farm", "assembly plant", "power plant", "brewery",
        "transport facility", "station", "airport", "railway", "harbour", "port", "publisher",
        "business facility", "mall", "restaurant", "bank", "market", "shop", "store",
        "administrative facility", "government", "agency", "authority", "department",
        "religious facility", "church", "mosque", "temple",
        "financial institution", "venue", "landmark", "gallery",
    ],
    "Natural Place": ["mountain", "river", "ocean", "desert", "island", "forest", "volcano", "habitat", "area", "mine"],
    "Event": ["conference", "workshop", "celebration", "race", "activity"],
    "Show": ["movie", "tv show", "drama", "concert", "broadcast", "opera", "cartoon", "comedy"],
    "Artwork": ["photograph", "painting", "sculpture", "architecture", "building"],
    "Job": ["doctor", "teacher", "engineer", "actor", "lawyer", "driver", "profession"],
    "Game": ["sport", "card game", "computer game"],
    "Vehicle": ["car", "aircraft", "ship", "bicycle", "rocket"],
    "Tool": ["container", "weapon", "musical instrument", "kitchen tool", "equipment"],
    "Technology": ["telecommunication", "Internet", "browser", "algorithm", "software"],
    "Electronic Device": ["computer", "phone", "refrigerator", "appliance", "device"],
    "Platform": ["operating system", "social media platform", "streaming media platform", "e-commerce platform", "channel"],
    "Financial Product": ["insurance", "stock", "bond"],
    "Skill": ["knowledge", "language", "recipe", "method", "capability", "experience", "technique", "course", "workexperience"],
    "Authorization": ["credential", "license", "prescription", "identification document", "ticket", "degree", "certification", "qualification", "medicaldegree", "authority"],
    "Legislation": ["policy", "rule", "regulation", "law"],
    "Time Period": ["season", "times", "period", "era", "dynasty", "time"],
    "Region": ["country", "city", "town", "location", "state", "province", "place"],
}

29


In [1]:
def get_backward_rules_input_chat(conclusion):
    """Build the two-shot prompt that asks for premises of ``conclusion``.

    Args:
        conclusion: The target conclusion in Prolog form, e.g.
            ``"CanAccess(Person X, Show Y)"``.

    Returns:
        A six-item list: the system prompt, example 1 input and output,
        example 2 input and output, and the input for ``conclusion``.
    """
    system_prompt = (
        "According to commonsense knowledge in realistic scenarios, please generate 3 logical rules in both Prolog and natural language to describe the compositional premises of the given conclusion. \n"
        "The rules in Prolog should have the same meaning with the rules in natural language. \n"
        "Each rule should contain multiple premises and each premise should contain two variables in (X, Y, Z, Z1, Z2). \n"
    )

    example1_input = (
        "Conclusion: CanAccess(Person X, Show Y) \n"
        "Rules: \n"
    )
    # The verbalization of rule 1 must name Region Z (not Region Y) so that
    # it agrees with BroadcastIn(Show Y, Region Z) in the Prolog form.
    example1_output = (
        "1. CanAccess(Person X, Show Y):- LocatedIn(Person X, Region Z), BroadcastIn(Show Y, Region Z); \n"
        "If Person X is located in Region Z and Show Y is broadcasted in Region Z, then Person X can access Show Y. \n"
        "2. CanAccess(Person X, Show Y):- ProducedAt(Show Y, Time Period Z1), DiedAt(Person X, Time Period Z2), EarlierThan(Time Period Z1, Time Period Z2); \n"
        "If Show Y was produced at Time Period Z1 and Person X died at a later Time Period Z2, then Person X had the chance to access Show Y. \n"
        "3. CanAccess(Person X, Show Y):- NotAfraid(Person X, Animal Z), ActIn(Animal Z, Show Y); \n"
        "If Person X is not afraid of Animal Z that acts in Show Y, then Person X can access Show Y.\n"
    )

    example2_input = (
        "Conclusion: OriginatedFrom(Food X, Region Y) \n"
        "Rules: \n"
    )
    example2_output = (
        "1. OriginatedFrom(Food X, Region Y):- ProcessedIn(Food X, Facility Z), LocatedIn(Facility Z, Region Y); \n"
        "If Food X is processed in Facility Z and Facility Z is located in Region Y, then Food X is originated from Region Y. \n"
        "2. OriginatedFrom(Food X, Region Y):- GrowIn(Food X, Natural Place Z), LocatedIn(Natural Place Z, Region Y); \n"
        "If Food X grows in Natural Place Z and Natural Place Z is located in Region Y, then Food X originate from Region Y. \n"
        "3. OriginatedFrom(Food X, Region Y):- CultivatedBy(Food X, Organization Z), BelongTo(Organization Z, Region Y); \n"
        "If Food X is cultivated by Organization Z, and Organization Z belongs to Region Y, then Food X is originated from Region Y. \n"
    )

    example3_input = (
        f"Conclusion: {conclusion} \n"
        "Rules: \n"
    )

    return [system_prompt, example1_input, example1_output, example2_input, example2_output, example3_input]

In [10]:
# Logit bias of 5.0 for every token id produced by encoding the variable
# symbols (each symbol is written with a leading space).
all_variables_symbols = [" X", " Y", " Z", " Z1", " Z2"]
variables_token_ids = [
    token_id
    for symbol in all_variables_symbols
    for token_id in encoding.encode(symbol)
]
variables_logit_dict = dict.fromkeys(set(variables_token_ids), 5.0)

def get_affordance_logits(concept):
    """Build the logit-bias dictionary used for the affordance domain.

    Every token id obtained by encoding a property name (with a leading
    space), an object type (prefixed with "(" and with a space), or one of
    the location prepositions gets a bias of 2.0. The module-level
    ``variables_logit_dict`` is then merged on top, so its values win for
    any shared token id.

    Args:
        concept: Mapping whose keys are added to the object types.

    Returns:
        A dict mapping token id to bias value.
    """
    property_names = [
        "Age", "Price", "Money", "Height", "Length", "Weight", "Strength", "Size", "Density", "Volume",
        "Temperature", "Hardness", "Speed", "BoilingPoint", "MeltingPoint", "Frequency", "Decibel", "Space",
    ]
    object_names = ["Person", "Region"] + list(concept.keys())

    vocabulary = set()
    vocabulary.update(" " + name for name in property_names)
    vocabulary.update("(" + name for name in object_names)
    vocabulary.update(" " + name for name in object_names)
    vocabulary.update(["in", "from", "to", "at"])

    boosted_ids = set()
    for word in vocabulary:
        boosted_ids.update(encoding.encode(word))

    logit_dict = dict.fromkeys(boosted_ids, 2.0)
    logit_dict.update(variables_logit_dict)
    return logit_dict

def get_accessibility_logits(concept):
    """Build the logit-bias dictionary used for the accessibility domain.

    Every token id obtained by encoding a time word or an object type
    (prefixed with "(" and with a space) gets a bias of 2.0. The module-level
    ``variables_logit_dict`` is then merged on top, so its values win for
    any shared token id.

    Args:
        concept: Mapping whose keys are added to the object types.

    Returns:
        A dict mapping token id to bias value.
    """
    time_words = ["In", "During", "At", "From", "To", "EarlierThan", "LaterThan"]
    # Only the entries from index 5 onward ("EarlierThan", "LaterThan") also
    # get a variant with a leading space.
    spaced_time_words = [" " + word for word in time_words[5:]]
    object_names = ["Person", "Time Period", "Region"] + list(concept.keys())

    vocabulary = set(spaced_time_words)
    vocabulary.update(time_words)
    vocabulary.update("(" + name for name in object_names)
    vocabulary.update(" " + name for name in object_names)

    boosted_ids = set()
    for word in vocabulary:
        boosted_ids.update(encoding.encode(word))

    logit_dict = dict.fromkeys(boosted_ids, 2.0)
    logit_dict.update(variables_logit_dict)
    return logit_dict

In [15]:
# extend the rule by backward chaining
from tqdm import tqdm
def extend_premise(rule_file, output_rule_file, domain="affordance"):
    """Generate backward-chaining rules for every distinct premise in a file.

    Each rule in ``rule_file`` is split into premises. A premise is rewritten
    as ``Relation(Type X, Type Y)``, and the first time a rewritten premise is
    seen it is sent to the chat model as a conclusion; the raw response is
    appended to ``output_rule_file``. Premises whose text contains "than",
    "same" or "similar" (case-insensitive) are skipped.

    Args:
        rule_file: Path of the input file, one rule per line.
        output_rule_file: Path of the file the responses are written to
            (opened in write mode, so existing content is replaced).
        domain: "affordance" selects the affordance logit bias; any other
            value selects the accessibility logit bias.

    Raises:
        AssertionError: If a premise does not parse into exactly two
            arguments.
    """
    with open(rule_file, 'r') as r_f:
        rules = r_f.readlines()

    cur_logit_dict = (
        get_affordance_logits(concepts_affordance)
        if domain == "affordance"
        else get_accessibility_logits(concepts_accessibility)
    )

    skip_markers = ("than", "same", "similar")
    seen_premises = set()
    with open(output_rule_file, 'w') as w_f:
        for raw_rule in tqdm(rules):
            _, premises_list = parse_sentence_to_conclusion_premise(raw_rule.strip())

            for each_premise in premises_list:
                lowered = each_premise.lower()
                if any(marker in lowered for marker in skip_markers):
                    continue
                # Re-express the premise over the canonical variables X and Y.
                relation, arg_types, arg_variables = argument_parsing(each_premise, output_rela=True)
                assert len(arg_variables) == 2 and len(arg_types) == 2
                new_premise = f"{relation}({arg_types[0]} X, {arg_types[1]} Y)"
                if new_premise in seen_premises:
                    continue
                seen_premises.add(new_premise)
                inputs = get_backward_rules_input_chat(new_premise)
                response = get_chat_response(inputs, temp=0, max_tokens=400, logit_dict=cur_logit_dict)
                w_f.write(response+"\n")

In [None]:
# Three successive extension passes over the affordance rules; each pair is
# (input rule file, output file for the generated extensions).
for rule_file, output_file in [
    (
        "ScriptData/Primitive/RuleSet/rule_base/final_rules/interaction_symbolic_pos_rules_allpossibility_strict.txt",
        "ScriptData/Primitive/RuleSet/rule_base/complex_extension/interaction_extension.txt",
    ),
    (
        "ScriptData/Primitive/RuleSet/rule_base/final_rules/symbolic_interaction_extension_strict.txt",
        "ScriptData/Primitive/RuleSet/rule_base/complex_extension/interaction_extension_again.txt",
    ),
    (
        "ScriptData/Primitive/RuleSet/rule_base/final_rules/symbolic_interaction_extension_again_strict.txt",
        "ScriptData/Primitive/RuleSet/rule_base/complex_extension/interaction_extension_again_2.txt",
    ),
]:
    extend_premise(rule_file, output_file, domain="affordance")

### Rule Filter

In [16]:
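# Edit each generated rule: give untyped premise arguments the type already known
# for their variable, and upper-case the variable names.
# NOTE: premises identical to the conclusion are not actually removed by edit_rule.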
from tqdm import tqdm

def edit_rule(rule_file_name, edit_symbolic_rule_file_name, edit_verbalized_rule_file_name):
    """Normalize raw generated rules and write them to two parallel files.

    The input file is read as alternating lines: a numbered symbolic rule
    (even 0-based index) followed by its verbalization (odd index). Each
    symbolic rule has its premise variables upper-cased and missing argument
    types filled in from types seen earlier in the same rule.

    Args:
        rule_file_name: Path of the raw file with alternating lines.
        edit_symbolic_rule_file_name: Output path for the edited symbolic
            rules, one per line.
        edit_verbalized_rule_file_name: Output path for the matching
            verbalizations, one per line.

    Raises:
        AssertionError: If a line does not have the expected format, or if a
            conclusion argument has no type.
    """
    with open(rule_file_name, "r") as rule_r_f:
        rules = rule_r_f.readlines()
    
    symbolic_rules, verbalized_rules = [], []
    for i in range(len(rules)):
        if i % 2 == 0:
            # Symbolic line: must contain ". " (after the number) and ":- ".
            # The offending line is printed before the assertion fires.
            if ". " not in rules[i].strip():
                print("*"*10, rules[i])
            assert ". " in rules[i]
            if ":- " not in rules[i]:
                print(rules[i])
            assert ":- " in rules[i]
            # Keep only the text between the first and second ". ".
            symbolic_rules.append(rules[i].split(". ")[1])
        else:
            # Verbalized line: must not contain ". " or ":- " and must
            # contain "if" (case-insensitive).
            if ". " in rules[i].strip():
                print(rules[i])
            assert ". " not in rules[i].strip()
            if not (":- " not in rules[i] and "if" in rules[i].lower()):
                print(rules[i])
            assert ":- " not in rules[i] and "if" in rules[i].lower() 
            verbalized_rules.append(rules[i].strip())
    assert len(symbolic_rules) == len(verbalized_rules)
    print(len(symbolic_rules), len(verbalized_rules))
    
    edit_symbolic_rules = []
    edit_verbalized_rules = []
    # These two counters are never incremented; they are printed as 0 below.
    edit_num_1 = 0
    edit_num_2 = 0
    for n, each_rule in tqdm(enumerate(symbolic_rules)):
        # Separate a type glued to its variable, e.g. "PersonX" -> "Person X".
        each_rule = each_rule.replace("PersonX", "Person X").replace("PersonY", "Person Y").replace("Personz1", "Person Z1").replace("Personz2", "Person Z2")
        next_rule = False
        rule = each_rule.strip()

        assert ":- " in rule
        conclusion, premises = rule.split(":- ")
        _, conc_args_types, conc_args_variables = argument_parsing(conclusion, output_rela=True)
        if None in conc_args_types:
            print("hh", each_rule)
        assert None not in conc_args_types
        # Rules whose conclusion does not use both X and Y are reported and skipped.
        if not ("X" in conc_args_variables and "Y" in conc_args_variables):
            print("hh", each_rule)
            continue
        assert "X" in conc_args_variables and "Y" in conc_args_variables

        # Seed the variable -> type map from the conclusion's first two arguments.
        variable_type_dict = {}
        variable_type_dict[conc_args_variables[0]] = conc_args_types[0]
        variable_type_dict[conc_args_variables[1]] = conc_args_types[1]
        
        # Split the body on "),", restore the ")" removed by the split on all
        # but the last premise, and drop one trailing ";" or "." from the last.
        premises_list = premises.split("),")
        premises_list = [_.strip() for _ in premises_list]
        for i in range(len(premises_list)):
            if i < len(premises_list) - 1:
                premises_list[i] = premises_list[i] + ")"
            elif premises_list[i][-1] == ";" or premises_list[i][-1] == ".":
                premises_list[i] = premises_list[i][:-1]
        
        new_premise_list = [] 
        for each_premise in premises_list:
            cur_rela, cur_args_types, cur_args_variables = argument_parsing(each_premise, output_rela=True)
            # No arguments parsed from this premise: drop the whole rule.
            if len(cur_args_variables) == 0:
                next_rule = True
                break
            cur_args_variables = [each.upper() if each is not None else each for each in cur_args_variables]
            new_premise = cur_rela + "("
            for j in range(len(cur_args_types)):
                if cur_args_types[j] is None:
                    # Untyped argument: reuse the type recorded for its variable, if any.
                    if cur_args_variables[j] is not None:
                        cur_args_types[j] = variable_type_dict.get(cur_args_variables[j], None)
                else:
                    # Typed argument: record the type the first time the variable is seen.
                    if cur_args_variables[j] is not None and cur_args_variables[j] not in variable_type_dict:
                        variable_type_dict[cur_args_variables[j]] = cur_args_types[j]
                # A missing type or variable is rendered as the text "None";
                # every "None" substring is removed from the argument text.
                new_premise += (str(cur_args_types[j]) + " " + str(cur_args_variables[j])).replace("None", "").strip()
                if j < len(cur_args_types) - 1:
                    new_premise += ", "
            new_premise += ")"
            new_premise_list.append(new_premise)
        
        if not next_rule:
            # The conclusion text is kept as it was; only the premises are rebuilt.
            new_rule = conclusion + ":- " + ", ".join(new_premise_list) + ";"

            edit_symbolic_rules.append(new_rule)
            edit_verbalized_rules.append(verbalized_rules[n].strip())
        else:
            pass
        
    print(edit_num_1, edit_num_2)
    print(len(edit_symbolic_rules), len(edit_verbalized_rules))

    with open(edit_symbolic_rule_file_name, 'w') as w_f_1:
        for each in edit_symbolic_rules:
            w_f_1.write(each+"\n")
    with open(edit_verbalized_rule_file_name, 'w') as w_f_2:
        for each in edit_verbalized_rules:
            w_f_2.write(each+"\n")

In [17]:
# functions for filtering invalid rules
def rule_filtering_function(rule, is_single=False, is_compositional=False):
    """Decide whether a symbolic rule passes the structural validity checks.

    The checks run in this order, and the first failure returns ``False``
    (most failures also print a short reason):

    1. No conclusion argument may lack a type or a variable.
    2. Premise count: with ``is_single``, a rule with exactly one premise is
       accepted iff that premise's variables are exactly X and Y and its set
       of types equals the conclusion's; more than one premise is rejected.
       Otherwise 2..4 premises are required (2..8 if ``is_compositional``).
    3. Every premise must have exactly two arguments with different variables.
    4. No premise argument may lack a type or a variable.
    5. After upper-casing, X and Y must each occur exactly once across all
       premises, and every other variable exactly twice.

    Args:
        rule: Rule text containing exactly one ``":- "``.
        is_single: Validate as a single-premise rule.
        is_compositional: Allow up to 8 premises instead of 4.

    Returns:
        ``True`` if the rule is valid, ``False`` otherwise.
    """
    conclusion, premises = rule.split(":- ")
    conc_args_types, conc_args_variables = argument_parsing(conclusion)

    if None in conc_args_types or None in conc_args_variables:
        print("None type")
        return False

    # Split the body on "),", restore the ")" removed by the split on all but
    # the last premise, and drop one trailing ";" or "." from the last.
    fragments = [fragment.strip() for fragment in premises.split("),")]
    premises_list = [fragment + ")" for fragment in fragments[:-1]]
    final_premise = fragments[-1]
    if final_premise[-1] in (";", "."):
        final_premise = final_premise[:-1]
    premises_list.append(final_premise)

    premise_count = len(premises_list)
    if is_single:
        if premise_count > 1:
            return False
        if premise_count == 1:
            single_types, single_variables = argument_parsing(premises_list[0])
            return (
                set(single_variables) == {"X", "Y"}
                and set(single_types) == set(conc_args_types)
            )
    else:
        max_premises = 8 if is_compositional else 4
        if premise_count < 2 or premise_count > max_premises:
            print(f"Less than two or more than {max_premises} premises")
            return False

    body_types, body_variables = [], []
    for premise in premises_list:
        premise_types, premise_variables = argument_parsing(premise)
        arity = len(premise_variables)
        if arity == 0:
            print("Premise with no argument")
            return False
        if arity == 1:
            print("Premise with only one argument")
            return False
        if arity > 2:
            print("Premise with too many arguments >= 3")
            return False
        if premise_variables[0] == premise_variables[1]:
            print(rule, "Same two variables")
            return False
        body_types += premise_types
        body_variables += premise_variables

    if len(body_variables) < 2:
        print("No premise with two arguments")
        return False
    if None in body_variables:
        print("None type 1")
        return False
    if None in body_types:
        print("None type 2")
        return False

    body_variables = [variable.upper() for variable in body_variables]
    if "X" not in body_variables or "Y" not in body_variables:
        print("No X or No Y")
        return False

    # X and Y must occur exactly once, every other variable exactly twice.
    for variable in set(body_variables):
        expected_count = 1 if variable in ["X", "Y"] else 2
        if body_variables.count(variable) != expected_count:
            print(rule, "Not Connected graph from X to Y")
            return False
    return True


def filter_invalid_rule(file_name, filter_file_name, verbalized_file_name, filter_vb_file_name, is_single=False, is_compositional=False):
    """Keep only the rules accepted by ``rule_filtering_function``.

    The symbolic and verbalized files are read as parallel lists (line i of
    one corresponds to line i of the other). For every accepted rule, the
    stripped rule and its stripped verbalization (with a "." appended if it
    does not already end in one) are written to the two output files.

    Args:
        file_name: Path of the symbolic rules, one per line.
        filter_file_name: Output path for the accepted symbolic rules.
        verbalized_file_name: Path of the verbalized rules, one per line.
        filter_vb_file_name: Output path for the accepted verbalizations.
        is_single: Forwarded to ``rule_filtering_function``.
        is_compositional: Forwarded to ``rule_filtering_function``.
    """
    with open(file_name, "r") as r_f:
        rules = r_f.readlines()
    with open(verbalized_file_name, "r") as r_f_2:
        verbalized_rules = r_f_2.readlines()
    print(len(rules), len(verbalized_rules))

    valid_rules = []
    valid_verbalized_rules = []
    for i, each in tqdm(enumerate(rules)):
        rule = each.strip()
        if not rule_filtering_function(rule, is_single=is_single, is_compositional=is_compositional):
            continue
        valid_rules.append(rule)
        sentence = verbalized_rules[i].strip()
        if sentence[-1] != ".":
            sentence = sentence + "."
        valid_verbalized_rules.append(sentence)
    print(len(valid_rules), len(valid_verbalized_rules))

    with open(filter_file_name, 'w') as w_f:
        w_f.writelines(kept_rule + "\n" for kept_rule in valid_rules)
    with open(filter_vb_file_name, 'w') as w_f_2:
        w_f_2.writelines(kept_sentence + "\n" for kept_sentence in valid_verbalized_rules)

In [18]:
def primitive_filter(rule_file, verbalized_rule_file, write_file_name, write_vb_file_name, concepts):
    """Rewrite rules onto the known concept vocabulary and drop the rest.

    For every rule (paired by line index with its verbalization):
      * the rule is skipped when either conclusion argument type is one of
        the measurable-property keywords;
      * for the conclusion and each premise, an argument type outside the
        candidate vocabulary is mapped to the first key of ``concepts``
        whose collection contains the lower-cased type; the rule is skipped
        when no such key exists, when the key equals the statement's other
        argument type, or when the key already occurs among the conclusion
        types;
      * the collected (specific, general) substitutions are applied to the
        symbolic and verbalized text, and rules are de-duplicated on the
        rewritten symbolic text.

    Args:
        rule_file: Path of the symbolic rules, one per line.
        verbalized_rule_file: Path of the line-aligned verbalized rules.
        write_file_name: Output path for the rewritten symbolic rules.
        write_vb_file_name: Output path for the rewritten verbalized rules.
        concepts: Mapping from a general concept name to a collection of
            lower-case specific type names.
    """
    with open(rule_file, "r") as symbolic_handle:
        rules = symbolic_handle.readlines()
    with open(verbalized_rule_file, "r") as verbal_handle:
        verbalized_rules = verbal_handle.readlines()
    print(len(rules), len(verbalized_rules))

    def find_super_concepts(type_name):
        # First key of `concepts` whose collection holds the lower-cased name.
        return next(
            (parent for parent in concepts if type_name.lower() in concepts[parent]),
            None,
        )

    properties_list = ["Age", "Price", "Money", "Height", "Length", "Weight", "Strength", "Size", "Density", "Volume", 
        "Temperature", "Hardness", "Speed", "BoilingPoint", "MeltingPoint", "Frequency", "Decibel", "Space"]
    candidate_concepts = list(concepts.keys()) + ["Person", "Region", "Time Period"] + properties_list

    conc_filte_keywords = ["Age", "Price", "Money", "Height", "Length", "Weight", "Strength", "Size", "Density", "Volume", 
        "Temperature", "Hardness", "Speed", "Frequency", "Decibel", "Space"]

    def collect_replacements(statements, conclusion_types):
        # Returns the [specific, general] pairs for one rule, or None when the
        # rule has to be dropped.
        pairs = []
        for statement in statements:
            arg_types, _ = argument_parsing(statement, output_rela=False)
            if arg_types[0] not in candidate_concepts:
                target, sibling = 0, 1
            elif len(arg_types) > 1 and arg_types[1] not in candidate_concepts:
                target, sibling = 1, 0
            else:
                continue
            general = find_super_concepts(arg_types[target])
            if general is None or general == arg_types[sibling] or general in conclusion_types:
                return None
            pair = [arg_types[target], general]
            if pair not in pairs:
                pairs.append(pair)
        return pairs

    constraint_rules = []
    constraint_verbalized_rules = []
    for position, raw_rule in enumerate(rules):
        symbolic = raw_rule.strip()
        conclusion, premises_list = parse_sentence_to_conclusion_premise(symbolic)
        conc_args_type_list, _ = argument_parsing(conclusion, output_rela=False)
        if conc_args_type_list[0] in conc_filte_keywords or conc_args_type_list[1] in conc_filte_keywords:
            continue

        replace_types = collect_replacements([conclusion] + premises_list, conc_args_type_list)
        if replace_types is None:
            continue

        verbalized = verbalized_rules[position].strip()
        for specific, general in replace_types:
            symbolic = symbolic.replace(specific, general)
            # The verbalization may spell the type with a lower-case initial.
            verbalized = verbalized.replace(specific, general).replace(specific[0].lower() + specific[1:], general)

        if symbolic not in constraint_rules:
            constraint_rules.append(symbolic)
            constraint_verbalized_rules.append(verbalized)
    print(len(constraint_rules), len(constraint_verbalized_rules))

    with open(write_file_name, 'w') as symbolic_out:
        for line in constraint_rules:
            symbolic_out.write(line+"\n")
    with open(write_vb_file_name, 'w') as verbal_out:
        for line in constraint_verbalized_rules:
            verbal_out.write(line+"\n")

In [19]:
def get_affordance_verbalized_critic_input(each_rule):
    """Build the few-shot True/False critic prompt for an affordance rule.

    The prompt is a task instruction, four worked Input/Output examples and
    finally ``each_rule`` as the open "Input:" awaiting an "Output:".
    """
    segments = [
        "True or False? Please predict whether the input rule is accurate or not according to commonsense knowledge in realistic scenarios, and also explain why. \n\nExamples:\n",
        "Input: If Person X was born in Season Z and Plant Y blooms in the same Season Z, then Person X can access Plant Y. \n",
        "Output: False. Because the season of a person's birth and the blooming season of a plant has no logical connection. \n",
        "Input: If Person X has an Age Z1 and Vehicle Y requires an Age above Z2 for driving, with Age Z1 being greater than Age Z2, then Person X can drive Vehicle Y. \n",
        "Output: True. Because driving vehicle has a minimum age requirement. \n",
        "Input: If Person X has Capital Z1 and the minimum capital requirement for establishing Organization Y is Capital Z2, and Capital Z1 is bigger than Capital Z2, then Person X can establish Organization Y. \n",
        "Output: False. Because person can not have a capital and capital is not suitable for value comparison. \n",
        "Input: If Person X is allergic to Material Z1, and Clothing Y contains Material Z1, then Person X cannot wear Clothing Y. \n",
        "Output: True. Because person should avoid contact with allergenic substances. \n\n",
        "Input: ",
        each_rule,
        "\n",
        "Output:\n",
    ]
    return "".join(segments)

In [20]:
def get_accessibility_verbalized_critic_input(each_rule):
    """Build the few-shot True/False critic prompt for an accessibility rule.

    The prompt is a task instruction, two worked Input/Output examples and
    finally ``each_rule`` as the open "Input:" awaiting an "Output:".
    """
    segments = [
        "True or False? Please predict whether the input rule is accurate or not according to commonsense knowledge in realistic scenarios, and also explain why. \n\nExamples:\n",
        "Input: If Person X was born in Season Z and Plant Y blooms in the same Season Z, then Person X can access Plant Y.\n",
        "Output: False. Because person is alive in all seasons and can access plant no matter what season it blooms in.  \n",
        "Input: If Person X lives in Region Z and Animal Y inhabits the same Region Z, then Person X can access Animal Y.\n",
        "Output: True. Because person and animal exist in the same region. \n\n",
        "Input: ",
        each_rule,
        "\n",
        "Output:\n",
    ]
    return "".join(segments)

In [21]:
def get_location_verbalized_critic_input(each_rule):
    """Build the few-shot True/False critic prompt for a location rule.

    The prompt is a task instruction, two worked Input/Output examples and
    finally ``each_rule`` as the open "Input:" awaiting an "Output:".
    """
    segments = [
        "True or False? Please predict whether the input rule is accurate or not according to commonsense knowledge in realistic scenarios, and also explain why. \n\nExamples:\n",
        "Input: If Person X is born in City Z and City Z is located in Region Y, then Person X lives in Region Y.\n",
        "Output: False. Because the place of birth is not always indicative of the current place of residence. \n",
        "Input: If Person X attends School Z and School Z is located in Region Y, then Person X studies in Region Y.\n",
        "Output: True. Because if a person attends a school, then the region in which they study is the region where the school is located. \n\n",
        "Input: ",
        each_rule,
        "\n",
        "Output:\n",
    ]
    return "".join(segments)

In [22]:
from tqdm import tqdm
def rule_critic(rule_file, verbalized_rule_file, write_file_name, write_vb_file_name, rule_type="affordance"):
    """Keep the rules whose verbalization GPT-4 judges to be true.

    Each verbalized rule is wrapped in the critic prompt that matches
    ``rule_type`` and sent to ``get_GPT4_response``. A rule is kept when the
    response contains the substring "True"; kept rules are de-duplicated on
    their symbolic form. The two rule files are paired by line index.

    Args:
        rule_file: Path of the symbolic rules, one per line.
        verbalized_rule_file: Path of the line-aligned verbalized rules.
        write_file_name: Output path for the accepted symbolic rules.
        write_vb_file_name: Output path for the accepted verbalized rules.
        rule_type: One of "affordance", "accessibility" or "location".

    Raises:
        ValueError: If ``rule_type`` is not one of the supported values.
    """
    supported_rule_types = ("affordance", "accessibility", "location")
    # Fail before any file or API work; previously an unknown type only
    # surfaced as an unbound `v_critic_input` inside the loop.
    if rule_type not in supported_rule_types:
        raise ValueError(
            f"Unsupported rule_type {rule_type!r}; expected one of {supported_rule_types}"
        )

    # classify the verbalized rules via GPT-4
    with open(rule_file, "r") as r_f:
        rules = r_f.readlines()
    with open(verbalized_rule_file, "r") as v_r_f:
        verbalized_rules = v_r_f.readlines()
    print(len(rules), len(verbalized_rules))

    verbalized_label_list = []
    for each_rule in tqdm(verbalized_rules):
        statement = each_rule.strip()
        if rule_type == "affordance":
            v_critic_input = get_affordance_verbalized_critic_input(statement)
        elif rule_type == "accessibility":
            v_critic_input = get_accessibility_verbalized_critic_input(statement)
        else:  # "location" (validated above)
            v_critic_input = get_location_verbalized_critic_input(statement)
        v_response = get_GPT4_response(v_critic_input, max_tokens=40, temp=0)
        verbalized_label_list.append(v_response)

    print(len(verbalized_label_list))
    print(len(rules), len(verbalized_rules))

    critic_symbolic_rules = []
    critic_verbalized_rules = []
    seen_symbolic = set()  # O(1) duplicate check; the lists keep file order
    for i, label in enumerate(verbalized_label_list):
        # NOTE(review): a plain substring test also accepts an answer that
        # starts with "False" but mentions "True" later — confirm intended.
        if "True" not in label:
            continue
        symbolic = rules[i].strip()
        if symbolic in seen_symbolic:
            continue
        seen_symbolic.add(symbolic)
        critic_symbolic_rules.append(symbolic)
        critic_verbalized_rules.append(verbalized_rules[i].strip())
    print(len(critic_symbolic_rules), len(critic_verbalized_rules), len(verbalized_label_list)-len(critic_symbolic_rules))

    with open(write_file_name, 'w') as w_f:
        for each in critic_symbolic_rules:
            w_f.write(each+"\n")
    with open(write_vb_file_name, 'w') as w_f_2:
        for each in critic_verbalized_rules:
            w_f_2.write(each+"\n")

In [None]:
# Run the Step 3 post-processing pipeline on interaction_extension.txt:
# edit -> heuristic filter -> primitive-concept filter -> GPT-4 critic.
# Each stage reads the files written by the previous stage.
output_file = "ScriptData/Primitive/RuleSet/rule_base/complex_extension/interaction_extension.txt"
# Step 3:
# Step 3-1: heuristically filter
edit_symbolic_rule_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/symbolic_interaction_extension.txt'
edit_verbalized_rule_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/verbalized_interaction_extension.txt'
edit_rule(output_file, edit_symbolic_rule_file_name, edit_verbalized_rule_file_name)

filter_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/filter/symbolic_interaction_extension.txt'
filter_verbalized_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/filter/verbalized_interaction_extension.txt'
filter_invalid_rule(edit_symbolic_rule_file_name, filter_file_name, edit_verbalized_rule_file_name, filter_verbalized_file_name, is_single=False)

# Step 3-2: primitive concept filter
constraint_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/constraint/symbolic_interaction_extension.txt'
constraint_vb_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/constraint/verbalized_interaction_extension.txt'
primitive_filter(filter_file_name, filter_verbalized_file_name, constraint_file_name, constraint_vb_file_name, concepts_accessibility)

# Step 3-3: rule critic by GPT-4 (uses the "affordance" critic prompt)
critic_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/critic/symbolic_interaction_extension.txt'
critic_vb_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/critic/verbalized_interaction_extension.txt'
rule_critic(constraint_file_name, constraint_vb_file_name, critic_file_name, critic_vb_file_name, rule_type="affordance")

In [None]:
# Run the Step 3 post-processing pipeline on interaction_extension_again.txt:
# edit -> heuristic filter -> primitive-concept filter -> GPT-4 critic.
# Each stage reads the files written by the previous stage.
output_file = "ScriptData/Primitive/RuleSet/rule_base/complex_extension/interaction_extension_again.txt"
# Step 3:
# Step 3-1: heuristically filter
edit_symbolic_rule_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/symbolic_interaction_extension_again.txt'
edit_verbalized_rule_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/verbalized_interaction_extension_again.txt'
edit_rule(output_file, edit_symbolic_rule_file_name, edit_verbalized_rule_file_name)

filter_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/filter/symbolic_interaction_extension_again.txt'
filter_verbalized_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/filter/verbalized_interaction_extension_again.txt'
filter_invalid_rule(edit_symbolic_rule_file_name, filter_file_name, edit_verbalized_rule_file_name, filter_verbalized_file_name, is_single=False)

# Step 3-2: primitive concept filter
constraint_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/constraint/symbolic_interaction_extension_again.txt'
constraint_vb_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/constraint/verbalized_interaction_extension_again.txt'
primitive_filter(filter_file_name, filter_verbalized_file_name, constraint_file_name, constraint_vb_file_name, concepts_accessibility)

# Step 3-3: rule critic by GPT-4 (uses the "affordance" critic prompt)
critic_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/critic/symbolic_interaction_extension_again.txt'
critic_vb_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/critic/verbalized_interaction_extension_again.txt'
rule_critic(constraint_file_name, constraint_vb_file_name, critic_file_name, critic_vb_file_name, rule_type="affordance")

In [None]:
# Run the Step 3 post-processing pipeline on interaction_extension_again_2.txt:
# edit -> heuristic filter -> primitive-concept filter -> GPT-4 critic.
# Each stage reads the files written by the previous stage.
output_file = "ScriptData/Primitive/RuleSet/rule_base/complex_extension/interaction_extension_again_2.txt"
# Step 3:
# Step 3-1: heuristically filter
edit_symbolic_rule_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/symbolic_interaction_extension_again_2.txt'
edit_verbalized_rule_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/verbalized_interaction_extension_again_2.txt'
edit_rule(output_file, edit_symbolic_rule_file_name, edit_verbalized_rule_file_name)

filter_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/filter/symbolic_interaction_extension_again_2.txt'
filter_verbalized_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/filter/verbalized_interaction_extension_again_2.txt'
filter_invalid_rule(edit_symbolic_rule_file_name, filter_file_name, edit_verbalized_rule_file_name, filter_verbalized_file_name, is_single=False)

# Step 3-2: primitive concept filter
constraint_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/constraint/symbolic_interaction_extension_again_2.txt'
constraint_vb_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/constraint/verbalized_interaction_extension_again_2.txt'
primitive_filter(filter_file_name, filter_verbalized_file_name, constraint_file_name, constraint_vb_file_name, concepts_accessibility)

# Step 3-3: rule critic by GPT-4 (uses the "affordance" critic prompt)
critic_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/critic/symbolic_interaction_extension_again_2.txt'
critic_vb_file_name = 'ScriptData/Primitive/RuleSet/rule_base/complex_extension/critic/verbalized_interaction_extension_again_2.txt'
rule_critic(constraint_file_name, constraint_vb_file_name, critic_file_name, critic_vb_file_name, rule_type="affordance")

In [23]:
def substitute_extension(extension_file, use_self=None, use_self2=None):
    """Index extension rules by conclusion.

    Reads ``extension_file`` and, when given (truthy), the additional rule
    files ``use_self`` and ``use_self2``. Every line is parsed into a
    conclusion and a premise list; the result maps each conclusion to the
    list of distinct premise lists seen for it, in file order. The number of
    distinct conclusions is printed.

    Raises:
        AssertionError: If a rule has fewer than two premises.
    """
    with open(extension_file, "r") as primary:
        extension_rules = primary.readlines()
    for extra_path in (use_self, use_self2):
        if extra_path:
            with open(extra_path, "r") as extra:
                extension_rules.extend(extra.readlines())

    extension_dict = {}
    for rule_line in extension_rules:
        conclusion, premises_list = parse_sentence_to_conclusion_premise(rule_line)
        assert len(premises_list) > 1

        alternatives = extension_dict.setdefault(conclusion, [])
        if premises_list not in alternatives:
            alternatives.append(premises_list)
    print(len(extension_dict))
    return extension_dict

In [24]:
from tqdm import tqdm
def substitute_all_components(run_file_name, extension_file, write_file_name, variable_list=("A", "B"), use_self=None, use_self2=None):
    """Expand each rule by replacing one premise with an extension body.

    For every rule in ``run_file_name`` and every premise of that rule, the
    premise is normalised to the key ``Relation(Type1 X, Type2 Y)``. When
    that key is a conclusion in the extension dictionary, one new rule is
    produced per matching extension: the premise is replaced by the
    extension's premises, with X/Y renamed to the premise's own variables and
    any further variable renamed to successive entries of ``variable_list``.
    Distinct new rules are written to ``write_file_name``.

    Args:
        run_file_name: Path of the rules to expand, one per line.
        extension_file: Path of the extension rules.
        write_file_name: Output path for the expanded rules.
        variable_list: Fresh variable names for extension variables other
            than X and Y; indexed only, so any sequence works.
        use_self: Optional extra extension file.
        use_self2: Optional second extra extension file.

    Raises:
        IndexError: If an extension introduces more fresh variables than
            ``variable_list`` provides, or a premise of the input rule has
            fewer than two arguments.
    """
    extension_dict = substitute_extension(extension_file, use_self, use_self2)
    with open(run_file_name, "r") as r_f:
        rules = r_f.readlines()

    extend_rules = []
    seen_rules = set()  # O(1) duplicate check; the list keeps output order
    extend_rules_len_stas = []
    for each_rule in tqdm(rules):
        conclusion, premises_list = parse_sentence_to_conclusion_premise(each_rule)

        for premise_index, premise in enumerate(premises_list):
            each_rela, each_args_types, each_args_variables = argument_parsing(premise, output_rela=True)

            each_key = each_rela + "(" + each_args_types[0] + " X, " + each_args_types[1] + " Y)"
            each_key = each_key[0].upper() + each_key[1:]
            for each_prem_list in extension_dict.get(each_key, ()):
                map_variabels_dict = {"X": each_args_variables[0], "Y": each_args_variables[1]}

                rp_premise_list = []
                for each_sub_prem in each_prem_list:
                    cur_rela, args_type_list, args_vairable_list = argument_parsing(each_sub_prem, output_rela=True)
                    # The map is keyed by upper-case names, so look variables
                    # up in upper case too (raw-case lookup raised KeyError
                    # for lower-case variables).
                    variable_names = [name.upper() for name in args_vairable_list]
                    for name in variable_names:
                        if name not in map_variabels_dict:
                            map_variabels_dict[name] = variable_list[len(map_variabels_dict) - 2]
                    if len(variable_names) >= 2:
                        each_prem_edit_variable = f"{cur_rela}({args_type_list[0]} {map_variabels_dict[variable_names[0]]}, {args_type_list[1]} {map_variabels_dict[variable_names[1]]})"
                    else:
                        each_prem_edit_variable = f"{cur_rela}({args_type_list[0]} {map_variabels_dict[variable_names[0]]})"
                    rp_premise_list.append(each_prem_edit_variable)

                new_premises_list = premises_list[:premise_index] + rp_premise_list + premises_list[premise_index+1:]
                new_rule = conclusion + ":- " + ", ".join(new_premises_list) + ";"
                if new_rule not in seen_rules:
                    seen_rules.add(new_rule)
                    extend_rules.append(new_rule)
                    extend_rules_len_stas.append(len(new_premises_list))

    if extend_rules:
        print(len(extend_rules), sum(extend_rules_len_stas)/len(extend_rules_len_stas), max(extend_rules_len_stas), min(extend_rules_len_stas))
    else:
        # No rule was expanded: mean/max/min are undefined. Report the count
        # and still write the (empty) output file instead of crashing.
        print(0)

    with open(write_file_name, 'w') as w_f:
        for each in extend_rules:
            w_f.write(each+"\n")

In [26]:
from tqdm import tqdm
def substitute_two_premise(run_file_name, extension_file, write_file_name, variable_list=("A", "B", "C", "D", "E", "F", "G", "H"), use_self=False):
    """Expand rules in which at least two premises have an extension.

    Every premise of a rule is normalised to ``Relation(Type1 X, Type2 Y)``
    and looked up in the extension dictionary. A premise that matches is
    replaced by the premises of (at most the first three of) its extensions,
    which multiplies the partial premise lists built so far; a premise that
    does not match is appended unchanged. Only rules with more than one
    replaced premise are emitted. Distinct new rules are written to
    ``write_file_name``.

    Args:
        run_file_name: Path of the rules to expand, one per line.
        extension_file: Path of the extension rules.
        write_file_name: Output path for the expanded rules.
        variable_list: Fresh variable names for extension variables other
            than X and Y; indexed only, so any sequence works.
        use_self: Optional extra extension file (falsy to disable).

    Raises:
        IndexError: If more fresh variables are needed than ``variable_list``
            provides, or a premise has fewer than two arguments.
    """
    extension_dict = substitute_extension(extension_file, use_self)
    with open(run_file_name, "r") as r_f:
        rules = r_f.readlines()

    extend_rules = []
    seen_rules = set()  # O(1) duplicate check; the list keeps output order
    extend_rules_len_stas = []
    for each_rule in tqdm(rules):
        conclusion, premises_list = parse_sentence_to_conclusion_premise(each_rule)

        replace_num = 0
        variable_start = 0
        new_premises_list = [[]]
        premise_start = 0
        for premise_index, premise in enumerate(premises_list):
            each_rela, each_args_types, each_args_variables = argument_parsing(premise, output_rela=True)

            each_key = each_rela + "(" + each_args_types[0] + " X, " + each_args_types[1] + " Y)"
            each_key = each_key[0].upper() + each_key[1:]
            if each_key in extension_dict:
                replace_num += 1
                temp_premise_lists = []
                for each_prem_list in extension_dict[each_key][:3]:
                    map_variabels_dict = {"X": each_args_variables[0], "Y": each_args_variables[1]}
                    rp_premise_list = []
                    for each_sub_prem in each_prem_list:
                        cur_rela, args_type_list, args_vairable_list = argument_parsing(each_sub_prem, output_rela=True)
                        # The map is keyed by upper-case names, so look
                        # variables up in upper case too (raw-case lookup
                        # raised KeyError for lower-case variables).
                        variable_names = [name.upper() for name in args_vairable_list]
                        for name in variable_names:
                            if name not in map_variabels_dict:
                                map_variabels_dict[name] = variable_list[len(map_variabels_dict) - 2 + variable_start]
                        each_prem_edit_variable = f"{cur_rela}({args_type_list[0]} {map_variabels_dict[variable_names[0]]}, {args_type_list[1]} {map_variabels_dict[variable_names[1]]})"
                        rp_premise_list.append(each_prem_edit_variable)

                    # NOTE(review): the non-matching premises in
                    # premises_list[premise_start:premise_index] were already
                    # appended by the else-branch below, so they appear twice
                    # in the result. Behaviour kept as-is; confirm intent.
                    untouched = premises_list[premise_start:premise_index]
                    for partial in new_premises_list:
                        temp_premise_lists.append(partial + untouched + rp_premise_list)
                    # Shift the fresh-variable window past the names just used.
                    if len(map_variabels_dict) > 2:
                        variable_start = variable_start + len(map_variabels_dict) - 2
                premise_start = premise_index + 1
                new_premises_list = temp_premise_lists
            else:
                new_premises_list = [partial + [premise] for partial in new_premises_list]

        if replace_num > 1:
            for each_new_premises_list in new_premises_list:
                new_rule = conclusion + ":- " + ", ".join(each_new_premises_list) + ";"
                if new_rule not in seen_rules:
                    seen_rules.add(new_rule)
                    extend_rules.append(new_rule)
                    extend_rules_len_stas.append(len(each_new_premises_list))

    if extend_rules:
        print(len(extend_rules), sum(extend_rules_len_stas)/len(extend_rules_len_stas), max(extend_rules_len_stas), min(extend_rules_len_stas))
    else:
        # No rule was expanded: mean/max/min are undefined. Report the count
        # and still write the (empty) output file instead of crashing.
        print(0)

    with open(write_file_name, 'w') as w_f:
        for each in extend_rules:
            w_f.write(each+"\n")

In [None]:
# One-step expansion: substitute premises of the base rules with the first
# extension set; fresh variables use the default names.
rule_file = "ScriptData/Primitive/RuleSet/rule_base/final_rules/interaction_symbolic_pos_rules_allpossibility_strict.txt"
extension_file = "ScriptData/Primitive/RuleSet/rule_base/final_rules/symbolic_interaction_extension_strict.txt"
one_step_bw_rule_file = "ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/interaction_symbolic_rules_one_bw_strict.txt"
substitute_all_components(rule_file, extension_file, one_step_bw_rule_file, use_self=False)

# Two-step expansion: the input lives under bw/repetitive_filter/, which is not
# written by the call above (presumably produced by a separate filtering step —
# verify). Fresh variables are C/D; the first extension set is added via use_self.
rule_file = "ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/repetitive_filter/interaction_symbolic_rules_one_bw_strict.txt"
extension_file = "ScriptData/Primitive/RuleSet/rule_base/final_rules/symbolic_interaction_extension_again_strict.txt"
two_step_bw_rule_file = "ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/interaction_symbolic_rules_two_bw_2_strict_useself.txt"
use_self = "ScriptData/Primitive/RuleSet/rule_base/final_rules/symbolic_interaction_extension_strict.txt"
substitute_all_components(rule_file, extension_file, two_step_bw_rule_file, variable_list=["C", 'D'], use_self=use_self)

# Three-step expansion: again reads from bw/repetitive_filter/. Fresh variables
# are E/F; both earlier extension sets are added via use_self and use_self2.
rule_file = "ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/repetitive_filter/interaction_symbolic_rules_two_bw_2_strict_useself.txt"
extension_file = "ScriptData/Primitive/RuleSet/rule_base/final_rules/symbolic_interaction_extension_again_2_strict.txt"
three_step_bw_rule_file = "ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/interaction_symbolic_rules_three_bw_3_strict_useself.txt"
use_self = "ScriptData/Primitive/RuleSet/rule_base/final_rules/symbolic_interaction_extension_strict.txt"
use_self2 = "ScriptData/Primitive/RuleSet/rule_base/final_rules/symbolic_interaction_extension_again_strict.txt"
substitute_all_components(rule_file, extension_file, three_step_bw_rule_file, variable_list=["E", 'F'], use_self=use_self, use_self2=use_self2)

#### Filter repetitive extended rules

In [27]:
def filter_repetitive_rules(extend_rule_file, filter_extend_rule_file, thres=3):
    """Drop extended rules whose premises repeat or echo the conclusion.

    A rule is discarded when its premise list contains a duplicate, or when
    the similarity score returned by ``get_most_similar_premise`` for the
    lower-cased conclusion relation against the lower-cased premise relations
    is greater than ``thres``. Surviving rules are written, one per line, to
    ``filter_extend_rule_file``.

    Raises:
        AssertionError: If a premise does not parse into exactly two argument
            types and two variables.
    """
    with open(extend_rule_file, "r") as source:
        diversified_rules = source.readlines()
    print(len(diversified_rules))

    filtered_rules = []
    for raw_line in tqdm(diversified_rules):
        rule_text = raw_line.strip()
        conclusion, premises_list = parse_sentence_to_conclusion_premise(rule_text)
        conc_rela, _, _ = argument_parsing(conclusion, output_rela=True)

        # Identical premises inside one rule: nothing worth keeping.
        if len(set(premises_list)) < len(premises_list):
            continue

        premise_rela_list = []
        for premise in premises_list:
            relation, arg_types, arg_variables = argument_parsing(premise, output_rela=True)
            assert len(arg_variables) == 2 and len(arg_types) == 2
            premise_rela_list.append(relation.lower())

        _, max_similarity = get_most_similar_premise(conc_rela.lower(), premise_rela_list)
        if not max_similarity > thres:
            filtered_rules.append(rule_text)
    print(len(filtered_rules))

    with open(filter_extend_rule_file, "w") as sink:
        for kept in tqdm(filtered_rules):
            sink.write(kept+"\n")

def remove_character(text):
    for character in ["X", "Y", "A", "B", "C", "D", "E", "F"]:
        text = text.replace(" " + character, "")
    return text


def filter_same_rules(extend_rule_file, filter_extend_rule_file, thres=4):
    """Drop extended rules that are duplicates, trivial, or echo the conclusion.

    Rules are first de-duplicated on their text with variable letters removed
    (``remove_character``). A remaining rule is discarded when its premise
    list contains a duplicate, when any premise relation is one of the
    identity-style relations ("same", "sameas", "similar", "similarto",
    "equivalent"), or when the similarity score returned by
    ``get_most_similar_premise`` is greater than ``thres``. Surviving rules
    are written, one per line, to ``filter_extend_rule_file``.
    """
    with open(extend_rule_file, "r") as r_f:
        diversified_rules = r_f.readlines()
    print(len(diversified_rules))

    identity_relations = ("same", "sameas", "similar", "similarto", "equivalent")
    # A set gives O(1) membership; the previous list made this loop quadratic.
    unique_rules = set()
    filtered_rules = []
    for each_rule in tqdm(diversified_rules):
        each_rule = each_rule.strip()

        cur_unique_rule = remove_character(each_rule)
        if cur_unique_rule in unique_rules:
            continue
        unique_rules.add(cur_unique_rule)

        conclusion, premises_list = parse_sentence_to_conclusion_premise(each_rule)
        conc_rela, _, _ = argument_parsing(conclusion, output_rela=True)
        if len(set(premises_list)) < len(premises_list):
            continue

        premise_rela_list = []
        for premise in premises_list:
            each_rela, _, _ = argument_parsing(premise, output_rela=True)
            premise_rela_list.append(each_rela.lower())
        if any(relation in premise_rela_list for relation in identity_relations):
            continue

        _, max_similarity = get_most_similar_premise(conc_rela.lower(), premise_rela_list)
        # No extra "already kept" check is needed: an identical rule text has
        # an identical variable-free key and was skipped above.
        if max_similarity <= thres:
            filtered_rules.append(each_rule)
    print(len(filtered_rules))

    with open(filter_extend_rule_file, "w") as w_f:
        for each in tqdm(filtered_rules):
            w_f.write(each+"\n")

In [28]:
from tqdm import tqdm
def get_whole_verbalize_input(each_premise):
    """Build the few-shot prompt that verbalizes a whole rule as 'if-then'.

    The prompt is a task instruction, two worked Rule/Statement examples and
    finally ``each_premise`` as the open "Rule:" awaiting a "Statement:".
    """
    segments = [
        "Please verbalize each input rule into a natural langauge statement in 'if-then' format. \n\nExamples:\n",
        "Rule:\n",
        "CanRequest(Person X, Authorization Y):- Have(Person X, Age Z1), RequireMinimumAge(Authorization Y, Age Z2), BiggerThan(Age Z1, Age Z2);\n",
        "Statement:\n",
        "If Person X has Age Z1 and the minimum age requirement for requesting Authorization Y is Age Z2, Age Z1 is bigger than Age Z2, then Person X can request Authorization Y. \n",
        "Rule:\n",
        "CanRepair(Person X, Electronic Device Y):- Master(Person X, Skill Z2), RequiredForRepairing(Skill Z2, Electronic Device Y);\n",
        "Statement:\n",
        "If Person X has mastered Skill Z2 and Skill Z2 is required for repairing Electronic Device Y, then Person X can repair Electronic Device Y.\n\n",
        "Rule:\n",
        each_premise,
        "\n",
        "Statement:\n",
    ]
    return "".join(segments)

def verbalized_extended_rules(file_name, write_verbalized_rule_file):
    """Verbalize every rule in ``file_name`` with GPT-4, one output line per rule.

    Each stripped rule is wrapped with ``get_whole_verbalize_input`` and sent
    to ``get_GPT4_response`` (max_tokens=100, temp=0). Responses are written
    to ``write_verbalized_rule_file`` in input order.
    """
    with open(file_name, "r") as r_f:
        rules = r_f.readlines()

    with open(write_verbalized_rule_file, "w") as w_f:
        for each_rule in tqdm(rules):
            prompt = get_whole_verbalize_input(each_rule.strip())
            response = get_GPT4_response(prompt, max_tokens=100, temp=0)
            # Other steps pair rules and verbalizations by line index, so a
            # response containing line breaks is folded onto a single line.
            one_line = " ".join(piece for piece in response.splitlines() if piece.strip())
            w_f.write(one_line+"\n")

def get_prem_verbalize_input(each_premise):
    """Build the few-shot prompt that verbalizes a list of premise facts.

    The prompt is a task instruction, two worked Facts/Statement examples and
    finally ``each_premise`` as the open "Facts:" awaiting a "Statement:".
    """
    segments = [
        "Please verbalize input facts into a natural langauge statement. \n\nExamples:\n",
        "Facts:\n",
        "Have(Person X, Age Z1), RequireMinimumAge(Authorization Y, Age Z2), BiggerThan(Age Z1, Age Z2);\n",
        "Statement:\n",
        "Person X has Age Z1 and the minimum age requirement for requesting Authorization Y is Age Z2, Age Z1 is bigger than Age Z2. \n",
        "Facts:\n",
        "Master(Person X, Skill Z2), RequiredForRepairing(Skill Z2, Electronic Device Y);\n",
        "Statement:\n",
        "Person X has mastered Skill Z2, and Skill Z2 is required for repairing Electronic Device Y.\n\n",
        "Facts:\n",
        each_premise,
        "\n",
        "Statement:\n",
    ]
    return "".join(segments)

def get_conc_verbalize_input(each_premise):
    """Build the few-shot prompt that verbalizes a single fact.

    The prompt is a task instruction, three worked Fact/Statement examples
    and finally ``each_premise`` as the open "Fact:" awaiting a "Statement:".
    """
    segments = [
        "Please verbalize each input fact into a natural langauge statement. \n\nExamples:\n",
        "Fact: IsProficientIn(Person X, Skill Y)\n",
        "Statement:\n",
        "Person X has a level of proficiency or expertise in Skill Y. \n",
        "Fact: ReleasedIn(Item X, Year A)\n",
        "Statement:\n",
        "Item X was released in the year A.\n",
        "Fact: CanUse(Person B, Item Y)\n",
        "Statement:\n",
        "Person B can use Item Y. \n",
        "Fact: ",
        each_premise,
        "\n",
        "Statement:\n",
    ]
    return "".join(segments)

def separate_verbalized_extended_rules(file_name, write_verbalized_rule_file):
    """Verbalize each rule with separately generated premise and conclusion text.

    For every rule, the whole rule is verbalized once and only the part
    before its first ", then " is kept as the premise text; the conclusion
    (the part of the rule before ":- ") is verbalized with its own prompt.
    The two parts are joined as "<premise>, then <conclusion>" and written on
    one line per rule to ``write_verbalized_rule_file``.

    Raises:
        ValueError: If a rule line does not contain ":- ".
        AssertionError: If the whole-rule verbalization does not start with
            "If " or lacks ", then ".
    """
    with open(file_name, "r") as r_f:
        rules = r_f.readlines()

    with open(write_verbalized_rule_file, "w") as w_f:
        for each_rule in tqdm(rules):
            # Split on the first separator only, so a later ":- " in the line
            # cannot break the two-way unpacking.
            conclusion, _ = each_rule.split(":- ", 1)
            conclusion = conclusion.strip()
            all_text = get_GPT4_response(get_whole_verbalize_input(each_rule.strip()), max_tokens=100, temp=0.1)
            assert all_text[:3] == "If " and ", then " in all_text
            premise_text = all_text.split(", then ")[0]
            conc_text = get_GPT4_response(get_conc_verbalize_input(conclusion), max_tokens=100, temp=0.1)
            rule_text = premise_text + ", then " + conc_text
            # Other steps pair rules and verbalizations by line index, so any
            # line break inside the model output is folded onto a single line.
            rule_text = " ".join(piece for piece in rule_text.splitlines() if piece.strip())
            w_f.write(rule_text+"\n")

In [29]:
def remove_concept(text, concepts):
    """Strip concept names and the X/Y variables from ``text``.

    Every key of ``concepts``, plus "Person" and "Region", is removed when it
    is followed by a space (the space goes with it). Afterwards "X " / "X"
    and "Y " / "Y" are removed, in that order.
    """
    for label in [*concepts.keys(), "Person", "Region"]:
        # str.replace is a no-op when the label is absent.
        text = text.replace(label + " ", "")
    for token in ("X ", "X", "Y ", "Y"):
        text = text.replace(token, "")
    return text

def remove_character(text):
    for character in ["X", "Y", "A", "B", "C", "D", "E", "F"]:
        text = text.replace(" " + character, "")
    return text

def filter_diversified_verblized_rules(rule_file, vb_rule_file, filter_file_name, filter_vb_file_name, concepts, similarity_threshold=15):
    """Drop rules whose verbalized premise and conclusion are too alike.

    Symbolic and verbalized rules are paired by line index. Rules are
    de-duplicated on the symbolic text with variable letters removed
    (``remove_character``). For each remaining rule, the verbalization is
    split at its first ", then ", concept names and X/Y are removed from both
    halves (``remove_concept``), and the rule is kept when
    ``find_lcseque(premise, conclusion)`` is at most ``similarity_threshold``.

    Args:
        rule_file: Path of the symbolic rules, one per line.
        vb_rule_file: Path of the line-aligned verbalized rules.
        filter_file_name: Output path for the kept symbolic rules.
        filter_vb_file_name: Output path for the kept verbalized rules.
        concepts: Mapping whose keys are concept names to strip.
        similarity_threshold: Largest accepted similarity score. The original
            inline note suggests 15 for multi-premise and 12 for single rules.

    Raises:
        AssertionError: If a verbalized rule does not start with "If " or
            lacks ", then ".
    """
    with open(rule_file, "r") as r_f:
        rules = r_f.readlines()
    with open(vb_rule_file, "r") as r_f:
        vb_rules = r_f.readlines()
    print(len(rules), len(vb_rules))

    # A set gives O(1) membership; the previous list made this loop quadratic.
    unique_rules = set()
    filtered_rules = []
    filtered_vb_rules = []
    for n, each_rule in tqdm(enumerate(vb_rules), total=len(vb_rules)):
        each_rule = each_rule.strip()
        assert each_rule[:3] == "If " and ", then " in each_rule
        # Split once: a second ", then " inside the conclusion used to make
        # the two-way unpacking raise ValueError.
        premise, conclusion = each_rule[3:].split(", then ", 1)
        premise = remove_concept(premise, concepts).lower()
        conclusion = remove_concept(conclusion, concepts).lower()

        # Strip first so a missing final newline cannot hide a duplicate.
        cur_unique_rule = remove_character(rules[n].strip())
        if cur_unique_rule in unique_rules:
            continue
        unique_rules.add(cur_unique_rule)

        similarity = find_lcseque(premise, conclusion)
        if similarity <= similarity_threshold:
            filtered_rules.append(rules[n].strip())
            filtered_vb_rules.append(each_rule)
    print(len(filtered_rules), len(filtered_vb_rules))

    with open(filter_file_name, "w") as w_f:
        for each in tqdm(filtered_rules):
            w_f.write(each+"\n")
    with open(filter_vb_file_name, "w") as w_f:
        for each in tqdm(filtered_vb_rules):
            w_f.write(each+"\n")

In [35]:
def prepara_annotation_data(verbalized_rule_file, annotation_file):
    """Turn a verbalized rule file into a CSV for annotation.

    Every line of ``verbalized_rule_file`` must look like
    "If <premise>, then <conclusion>".  For each line a CSV row is written
    with the zero-based line index, the premise with a trailing "."
    appended, and the conclusion as found.  The header row is
    ``entryid, premise, conclusion``.  The number of lines read and the
    number of rows prepared are printed.

    Args:
        verbalized_rule_file: Path of the text file to read.
        annotation_file: Path of the CSV file to create or overwrite.

    Raises:
        AssertionError: If a line is not of the expected form.
        ValueError: If a line contains ", then " more than once.
    """
    import csv

    with open(verbalized_rule_file, "r") as v_r_f:
        verbalized_rules = v_r_f.readlines()
        print(len(verbalized_rules))

    data_list = []
    for i, line in enumerate(verbalized_rules):
        each_rule = line.strip()
        assert each_rule[:3] == "If " and ", then " in each_rule
        premise, conclusion = each_rule[3:].split(", then ")
        data_list.append({
            'id': i,
            'premise': premise + ".",
            'conclusion': conclusion,
        })
    print(len(data_list))

    # newline="" lets the csv module control line endings itself; without
    # it, platforms that translate "\n" would emit "\r\r\n" after each row.
    with open(annotation_file, "w", newline="") as csv_f:
        writer = csv.writer(csv_f)
        writer.writerow(['entryid', 'premise', 'conclusion'])
        writer.writerows(
            [each['id'], each['premise'], each['conclusion']] for each in data_list
        )

In [2]:
import csv
from tqdm import tqdm
def process_annotation_data(rule_file, verbalized_rule_file, result_file, write_file_name, write_verbalized_file_name, strict=False, post_filter=False):
    """Write out the rules that the annotation results accept.

    ``result_file`` is a CSV with a header row (presumably a crowdsourcing
    batch export -- the header must contain a "WorkerId" column).  The
    code reads fixed column positions: 15 for the worker id, 27 for the
    entry id, and 32/33/34 for the three answers (rule correctness,
    premise, conclusion).  Rows from worker 'A3MTHVR1EJ8LMM' are ignored.

    An entry is accepted when the answer "2" was given at least twice for
    the premise column, at least twice for the conclusion column, and at
    least twice (three times when ``strict`` is true) for the correctness
    column.  The entry id is used as a zero-based line index into
    ``rule_file`` and ``verbalized_rule_file``; accepted lines are written
    to the two output files in ascending numeric id order.

    Several counts, the position of the "WorkerId" header, and the ids
    that received fewer than three annotations are printed.

    Args:
        rule_file: Path of the symbolic rule file.
        verbalized_rule_file: Path of the verbalized rule file, with lines
            aligned to ``rule_file``.
        result_file: Path of the annotation result CSV.
        write_file_name: Output path for the accepted symbolic rules.
        write_verbalized_file_name: Output path for the accepted
            verbalized rules.
        strict: Require three "2" answers for correctness, not two.
        post_filter: Additionally drop an accepted rule when its
            verbalized conclusion contains " can " but the conclusion
            returned by ``parse_sentence_to_conclusion_premise`` does not
            contain "Can".

    Raises:
        ValueError: If the header has no "WorkerId" column or an accepted
            entry id is not an integer.
        IndexError: If the CSV is empty, a row is too short, or an entry
            id points past the end of a rule file.
    """
    # Fixed layout of the result CSV, kept exactly as the original code
    # had it.  NOTE(review): the "WorkerId" position is printed below but
    # not used for the lookup -- confirm it is always 15.
    worker_col, entry_col = 15, 27
    correct_col, premise_col, conc_col = 32, 33, 34
    excluded_worker = 'A3MTHVR1EJ8LMM'

    with open(rule_file, "r") as v_r_f:
        rules = v_r_f.readlines()
        print(len(rules))
    with open(verbalized_rule_file, "r") as v_r_f:
        verbalized_rules = v_r_f.readlines()
        print(len(verbalized_rules))

    # newline="" is what the csv module asks for, so that quoted fields
    # containing line breaks are parsed correctly.
    with open(result_file, newline="") as csvfile:
        all_data = list(csv.reader(csvfile))
    print(len(all_data)-1)
    print("worker_id", all_data[0].index("WorkerId"))

    annotation_dict_correct = {}
    annotation_dict_common_premise = {}
    annotation_dict_common_conc = {}
    for row in all_data[1:]:
        if row[worker_col] == excluded_worker:
            continue
        entry_id = row[entry_col]
        annotation_dict_correct.setdefault(entry_id, []).append(row[correct_col])
        annotation_dict_common_premise.setdefault(entry_id, []).append(row[premise_col])
        annotation_dict_common_conc.setdefault(entry_id, []).append(row[conc_col])
    print(len(annotation_dict_correct), len(annotation_dict_common_premise), len(annotation_dict_common_conc))

    min_entailment = 3 if strict else 2
    all_annotate_ids = []  # entries with fewer than three annotations
    accepted_ids = []
    for entry_id, correct_votes in annotation_dict_correct.items():
        if len(correct_votes) < 3:
            all_annotate_ids.append(entry_id)

        entailment_count = correct_votes.count("2")
        premise_count = annotation_dict_common_premise[entry_id].count("2")
        conc_count = annotation_dict_common_conc[entry_id].count("2")
        if premise_count >= 2 and conc_count >= 2 and entailment_count >= min_entailment:
            accepted_ids.append(entry_id)

    # Entry ids are strings read from the CSV; sort them numerically so
    # the output follows the source line order ("2" before "10").  A plain
    # string sort would put "10" first.
    correct_index = sorted(accepted_ids, key=int)

    if post_filter:
        new_correct_index = []
        for each_id in correct_index:
            line_no = int(each_id)
            conclusion, _ = parse_sentence_to_conclusion_premise(rules[line_no].strip())
            _, vb_conclusion = verbalized_rules[line_no].strip()[3:].split(", then ")
            # Keep the rule unless the verbalization introduced " can "
            # while the parsed conclusion has no "Can".
            if "Can" in conclusion or " can " not in vb_conclusion:
                new_correct_index.append(each_id)
        correct_index = new_correct_index

    print(len(correct_index))
    print("all_annotate_ids", all_annotate_ids)

    with open(write_file_name, 'w') as w_f:
        for entry_id in correct_index:
            w_f.write(rules[int(entry_id)].strip()+"\n")
    with open(write_verbalized_file_name, 'w') as w_f:
        for entry_id in correct_index:
            w_f.write(verbalized_rules[int(entry_id)].strip()+"\n")

In [None]:
# One Step
# Post-processing run for the one-step "bw" interaction rules.  Each stage
# reads the file(s) written by the previous stage, so the statements below
# must run in this order.
one_step_bw_rule_file = "ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/interaction_symbolic_rules_one_bw_strict.txt"
# Step 4-4: filter repetitive extended rules
# (reads the raw symbolic rules, writes the result under bw/filter/; thres=4 for this run)
filter_symbolic_extend_rule_file = 'ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/filter/interaction_symbolic_rules_one_bw_strict.txt'
filter_same_rules(one_step_bw_rule_file, filter_symbolic_extend_rule_file, thres=4)

# Step 4-5: verbalize extended rules
# (the verbalized file is written next to the filtered symbolic file)
filter_verbalized_extend_rule_file = 'ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/filter/interaction_verbalized_rules_one_bw_strict.txt'
verbalized_extended_rules(filter_symbolic_extend_rule_file, filter_verbalized_extend_rule_file)

# Step 4-6: filter repetitive verbalized extended rules
# (symbolic and verbalized files are filtered together; results go under bw/repetitive_filter/)
re_filter_symbolic_extend_rule_file = 'ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/repetitive_filter/interaction_symbolic_rules_one_bw_strict.txt'
re_filter_verbalized_extend_rule_file = 'ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/repetitive_filter/interaction_verbalized_rules_one_bw_strict.txt'
filter_diversified_verblized_rules(filter_symbolic_extend_rule_file, filter_verbalized_extend_rule_file, re_filter_symbolic_extend_rule_file, re_filter_verbalized_extend_rule_file, concepts_accessibility)

# Final pass: each path is passed twice, so presumably filter_invalid_rule
# reads and then overwrites the repetitive_filter files in place -- confirm
# against its definition.
filter_invalid_rule(re_filter_symbolic_extend_rule_file, re_filter_symbolic_extend_rule_file, re_filter_verbalized_extend_rule_file, re_filter_verbalized_extend_rule_file, is_single=False, is_compositional=True)


# Two Step
# Post-processing run for the two-step "bw" interaction rules.  Each stage
# reads the file(s) written by the previous stage, so the statements below
# must run in this order.  The path variables are reassigned here and no
# longer point at the files of any earlier run.
two_step_bw_rule_file = "ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/interaction_symbolic_rules_two_bw_2_strict_useself.txt"
# Step 4-4: filter repetitive extended rules
# (reads the raw symbolic rules, writes the result under bw/filter/; thres=10 for this run)
filter_symbolic_extend_rule_file = 'ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/filter/interaction_symbolic_rules_two_bw_2_strict_useself.txt'
filter_same_rules(two_step_bw_rule_file, filter_symbolic_extend_rule_file, thres=10)

# Step 4-5: verbalize extended rules
# (the verbalized file is written next to the filtered symbolic file)
filter_verbalized_extend_rule_file = 'ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/filter/interaction_verbalized_rules_two_bw_2_strict_useself.txt'
verbalized_extended_rules(filter_symbolic_extend_rule_file, filter_verbalized_extend_rule_file)

# Step 4-6: filter repetitive verbalized extended rules
# (symbolic and verbalized files are filtered together; results go under bw/repetitive_filter/)
re_filter_symbolic_extend_rule_file = 'ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/repetitive_filter/interaction_symbolic_rules_two_bw_2_strict_useself.txt'
re_filter_verbalized_extend_rule_file = 'ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/repetitive_filter/interaction_verbalized_rules_two_bw_2_strict_useself.txt'
filter_diversified_verblized_rules(filter_symbolic_extend_rule_file, filter_verbalized_extend_rule_file, re_filter_symbolic_extend_rule_file, re_filter_verbalized_extend_rule_file, concepts_accessibility)

# Final pass: each path is passed twice, so presumably filter_invalid_rule
# reads and then overwrites the repetitive_filter files in place -- confirm
# against its definition.
filter_invalid_rule(re_filter_symbolic_extend_rule_file, re_filter_symbolic_extend_rule_file, re_filter_verbalized_extend_rule_file, re_filter_verbalized_extend_rule_file, is_single=False, is_compositional=True)


# Three Step v3
# Post-processing run for the three-step "bw" interaction rules.  Each stage
# reads the file(s) written by the previous stage, so the statements below
# must run in this order.  The path variables are reassigned here and no
# longer point at the files of any earlier run.
three_step_bw_rule_file = "ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/interaction_symbolic_rules_three_bw_3_strict_useself.txt"
# Step 4-4: filter repetitive extended rules
# (reads the raw symbolic rules, writes the result under bw/filter/; thres=10 for this run)
filter_symbolic_extend_rule_file = 'ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/filter/interaction_symbolic_rules_three_bw_3_strict_useself.txt'
filter_same_rules(three_step_bw_rule_file, filter_symbolic_extend_rule_file, thres=10)

# Step 4-5: verbalize extended rules
# (the verbalized file is written next to the filtered symbolic file)
filter_verbalized_extend_rule_file = 'ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/filter/interaction_verbalized_rules_three_bw_3_strict_useself.txt'
verbalized_extended_rules(filter_symbolic_extend_rule_file, filter_verbalized_extend_rule_file)

# Step 4-6: filter repetitive verbalized extended rules
# (symbolic and verbalized files are filtered together; results go under bw/repetitive_filter/)
# NOTE(review): this run passes concepts_location, while the one-step and
# two-step runs in this cell pass concepts_accessibility -- confirm that
# the difference is intended and not a copy-paste slip.
re_filter_symbolic_extend_rule_file = 'ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/repetitive_filter/interaction_symbolic_rules_three_bw_3_strict_useself.txt'
re_filter_verbalized_extend_rule_file = 'ScriptData/Primitive/RuleSet/rule_base/complex_extended_rules/bw/repetitive_filter/interaction_verbalized_rules_three_bw_3_strict_useself.txt'
filter_diversified_verblized_rules(filter_symbolic_extend_rule_file, filter_verbalized_extend_rule_file, re_filter_symbolic_extend_rule_file, re_filter_verbalized_extend_rule_file, concepts_location)

# Final pass: each path is passed twice, so presumably filter_invalid_rule
# reads and then overwrites the repetitive_filter files in place -- confirm
# against its definition.
filter_invalid_rule(re_filter_symbolic_extend_rule_file, re_filter_symbolic_extend_rule_file, re_filter_verbalized_extend_rule_file, re_filter_verbalized_extend_rule_file, is_single=False, is_compositional=True)