# Inference logic

The codebase is composed by three classes:
1. Facts
2. Rules
3. Inference Engine

Each fact_name(a, b) has two properties:
- name (i.e. predicate): fact_name
- attributes: a, b

Facts contains only constants. A constant must start with a lowercase.

Each rule contains two facts:
- An antecedent
- A consequent: true if the antecedent is true

Facts in a rule can contains variables. A variable must start with an uppercase.

In [1]:
class Fact: # TODO: class diagrams
    def __init__(self, name, attributes):
        self.name = name
        self.attributes = attributes
        
    def __eq__(self, other):
        if isinstance(other, Fact):
            return self.name == other.name and self.attributes == other.attributes

In [2]:
class Rule:
    def __init__(self, antecedent, consequent):
        self.antecedent = antecedent
        self.consequent = consequent

## How this inference work

In this case, the inference model will try to search with a for-loop if there exists at least one rule in which the name is the same of the one on the query, and if there exists a combination of variables where the rule is satisfied

In [3]:
class InferenceEngine:
    def __init__(self):
        self.facts = []
        self.rules = []

    def add_fact(self, fact):
        self.facts.append(fact)

    def add_rule(self, antecedent, consequent):
        self.rules.append(Rule(antecedent, consequent))
        
    def is_variable(self, term):
        return term[0].isupper()
    
    def unify(self, fact1_attrs, fact2_attrs):
        substitution = {}
        for attr1, attr2 in zip(fact1_attrs, fact2_attrs):
            if self.is_variable(attr1):
                substitution[attr1] = attr2
            elif self.is_variable(attr2):
                substitution[attr2] = attr1
            elif attr1 != attr2:
                return None
        return substitution
    
    def inference(self, query_facts):
        for rule in self.rules:
            for fact in self.facts:
                if rule.antecedent.name == fact.name:
                    substitution = self.unify(rule.antecedent.attributes, fact.attributes)
                    if substitution is not None:
                        infered_fact = Fact(rule.consequent.name, [substitution.get(attr, attr) for attr in rule.consequent.attributes])
                        if infered_fact in query_facts:
                            return True
        return False

## Example

In [4]:
engine = InferenceEngine()

engine.add_fact(Fact("dad", ["dom", "ale"]))

engine.add_rule(Fact("dad", ["X", "Y"]), Fact("son", ["Y", "X"]))

result = engine.inference([Fact("son", ["ale", "dom"])])

if result:
    print("Rule verified")
else:
    print("Rule not verified")

Rule verified


# Multiple antecedents

In [5]:
from itertools import product
import time

In [6]:
class Rule:
    def __init__(self, antecedents, consequent):
        self.antecedents = antecedents  # now this is a list
        self.consequent = consequent

In [7]:
def add_rule_v2(self, antecedents, consequent):
    self.rules.append(Rule(antecedents, consequent))

def inference_v2(self, query_facts):
    for query_fact in query_facts:
        for rule in self.rules:
            matched_facts = []
            for antecedent in rule.antecedents:  # iteration on multiple antecedents
                for fact in self.facts:
                    if antecedent.name == fact.name:
                        substitution = self.unify(antecedent.attributes, fact.attributes)
                        if substitution is not None:
                            matched_facts.append(substitution)

            # Verifica che tutti gli antecedenti abbiano fatti corrispondenti
            if len(matched_facts) == len(rule.antecedents):
                # Unificazione delle sostituzioni
                unified_substitution = {}
                for substitution in matched_facts:
                    unified_substitution.update(substitution)

                inferred_fact = Fact(rule.consequent.name, [unified_substitution.get(attr, attr) for attr in rule.consequent.attributes])
                if inferred_fact == query_fact:
                    return True
    return False

InferenceEngine.add_rule = add_rule_v2
InferenceEngine.inference = inference_v2

In [8]:
engine = InferenceEngine()
engine.add_fact(Fact("dad", ["dom", "ale"]))
engine.add_fact(Fact("mom", ["ria", "ale"]))

engine.add_rule([Fact("dad", ["X", "Y"]), Fact("mom", ["Z", "Y"])], Fact("child", ["Y", "X", "Z"]))

result = engine.inference([Fact("child", ["ale", "dom", "ria"])])

if result:
    print("Rule verified")
else:
    print("Rule not verified")

Rule verified


# Multiple consequentes

In [9]:
class Rule:
    def __init__(self, antecedents, consequents):
        self.antecedents = antecedents
        self.consequents = consequents # now this is a list

In [10]:
def add_rule_v3(self, antecedents, consequents):
    self.rules.append(Rule(antecedents, consequents))
    
def inference_v3(self, query_facts):
    results = [False] * len(query_facts)

    for rule in self.rules:
        all_matched_facts = []

        for antecedent in rule.antecedents:
            matched_facts_for_antecedent = []

            for fact in self.facts:
                if antecedent.name == fact.name:
                    substitution = self.unify(antecedent.attributes, fact.attributes)
                    if substitution is not None:
                        matched_facts_for_antecedent.append(substitution)

            all_matched_facts.append(matched_facts_for_antecedent)

        for combined_matched_facts in product(*all_matched_facts):
            unified_substitution = {}

            for substitution in combined_matched_facts:
                unified_substitution.update(substitution)

            for i, query_fact in enumerate(query_facts):
                for consequent in rule.consequents:
                    inferred_fact = Fact(consequent.name, [unified_substitution.get(attr, attr) for attr in consequent.attributes])
                    if inferred_fact == query_fact:
                        results[i] = True

    return results


InferenceEngine.add_rule = add_rule_v3
InferenceEngine.inference = inference_v3

In [11]:
# Esempio di utilizzo
engine = InferenceEngine()
engine.add_fact(Fact("dad", ["dom", "ale"]))
engine.add_fact(Fact("mom", ["ria", "ale"]))

engine.add_rule(
    [Fact("dad", ["X", "Y"]), Fact("mom", ["Z", "Y"])], 
    [Fact("child", ["Y", "X", "Z"]), Fact("son", ["Y", "X"]), Fact("son", ["Y", "X"])]
)

# Query
facts_to_inference = []
facts_to_inference.append(Fact("child", ["ale", "dom", "ria"]))
facts_to_inference.append(Fact("son", ["ale", "dom"]))

query = engine.inference(facts_to_inference)

print(query)

[True, True]


# Adding variables in query

In [19]:
def inference_v4(self, query_facts):
    results = [[] for _ in range(len(query_facts))]

    for rule in self.rules:
        all_matched_facts = []

        for antecedent in rule.antecedents:
            matched_facts_for_antecedent = []

            for fact in self.facts:
                if antecedent.name == fact.name:
                    substitution = self.unify(antecedent.attributes, fact.attributes)
                    if substitution is not None:
                        matched_facts_for_antecedent.append(substitution)

            all_matched_facts.append(matched_facts_for_antecedent)

        for combined_matched_facts in product(*all_matched_facts):
            unified_substitution = {}

            for substitution in combined_matched_facts:
                unified_substitution.update(substitution)

            for i, query_fact in enumerate(query_facts):
                for consequent in rule.consequents:
                    inferred_fact = Fact(consequent.name, [unified_substitution.get(attr, attr) for attr in consequent.attributes])

                    query_substitution = self.unify(inferred_fact.attributes, query_fact.attributes)
                    if query_substitution is not None:
                        results[i].append(query_substitution)

    return results

def inference_exists(self, query_facts): # renaming this
    results = [False] * len(query_facts)

    for rule in self.rules:
        all_matched_facts = []

        for antecedent in rule.antecedents:
            matched_facts_for_antecedent = []

            for fact in self.facts:
                if antecedent.name == fact.name:
                    substitution = self.unify(antecedent.attributes, fact.attributes)
                    if substitution is not None:
                        matched_facts_for_antecedent.append(substitution)

            all_matched_facts.append(matched_facts_for_antecedent)

        for combined_matched_facts in product(*all_matched_facts):
            unified_substitution = {}

            for substitution in combined_matched_facts:
                unified_substitution.update(substitution)

            for i, query_fact in enumerate(query_facts):
                for consequent in rule.consequents:
                    inferred_fact = Fact(consequent.name, [unified_substitution.get(attr, attr) for attr in consequent.attributes])
                    if inferred_fact == query_fact:
                        results[i] = True

    return results

InferenceEngine.inference = inference_v4
InferenceEngine.inference_exists = inference_exists

In [20]:
engine = InferenceEngine()
engine.add_fact(Fact("dad", ["dom", "ale"]))
engine.add_fact(Fact("mom", ["ria", "ale"]))

engine.add_rule(
    [Fact("dad", ["X", "Y"]), Fact("mom", ["Z", "Y"])], 
    [Fact("child", ["Y", "X", "Z"]), Fact("son", ["Y", "X"]), Fact("son", ["Y", "X"])]
)

start_time = time.time()

facts_to_inference = []
facts_to_inference.append(Fact("child", ["ale", "dom", "ria"]))
facts_to_inference.append(Fact("son", ["ale", "X"]))

query = engine.inference(facts_to_inference)
end_time = time.time()
elapsed_time = end_time - start_time

print(query)
    
print(f"Time taken for inference: {elapsed_time:.6f} seconds")


[[{}, {}, {}], [{'X': 'dom'}, {'X': 'dom'}, {'X': 'dom'}]]
Time taken for inference: 0.000277 seconds


## Load facts and rules from file

```
fact_name(a,b).
rule_name(A,B) -> derived_fact(B,A).
```

In [14]:
import re

def load_from_file(engine, filename):
    with open(filename, 'r') as f:
        lines = f.readlines()

    for line in lines:
        line = line.strip().strip('.')  # Rimuove spazi bianchi, newline e il punto finale

        # Se la linea è una regola
        if '->' in line:
            antecedent_str, consequent_str = line.split('->')
            antecedents = [parse_fact(fact.strip()) for fact in antecedent_str.split('&')]
            consequents = [parse_fact(fact.strip()) for fact in consequent_str.split('&')]
            
            # Aggiunge una sola regola con più antecedenti e conseguenti
            engine.add_rule(antecedents, consequents)

        # Se la linea è un fatto
        else:
            fact = parse_fact(line)
            engine.add_fact(fact)

def parse_fact(fact_str):
    # Utilizzo di espressioni regolari per estrarre il nome e gli attributi del fatto
    m = re.match(r"(\w+)\(([^)]+)\)", fact_str)
    if m:
        name, attrs = m.groups()
        attrs = attrs.split(',')
        attrs = [attr.strip() for attr in attrs]
        return Fact(name, attrs)



In [15]:
engine = InferenceEngine()
load_from_file(engine, 'rules_and_facts.txt')

In [25]:
facts_to_inference = [
    Fact("son", ["ale", "X"])
]

engine.inference_exists(facts_to_inference)

[False]

In [21]:
facts_to_inference = [
    Fact("son", ["ale", "X"])
]

engine.inference(facts_to_inference)

[[{'X': 'dom'}, {'X': 'dom'}, {'X': 'dom'}]]

# Performance analysis

WIP