In [1]:
import json
import os
import csv
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns

from utils import *
from logic import *

In [2]:
# The trace file abbreviates the colour predicates as "b"/"w" (and "-b"/"-w"
# for the negated forms that appear in rule effects). Expand them to full
# words so that the printed states and rules are readable.
COLOR_NAMES = {"b": "black", "w": "white"}
# Only effects are checked for the negated spellings.
EFFECT_COLOR_NAMES = {**COLOR_NAMES, "-b": "-black", "-w": "-white"}


def _rename_literals(literals, renames):
    """Rename, in place, every element whose ``name`` is a key of ``renames``.

    Elements whose name is not in ``renames`` are left untouched.
    """
    for literal in literals:
        if literal.name in renames:
            literal.name = renames[literal.name]


# Use a context manager so the file handle is closed as soon as it is parsed.
with open("trace_motivation.json", "r") as trace_file:
    traces = [Trace(obj) for obj in json.load(trace_file)]

for t in traces:
    _rename_literals(t.state, COLOR_NAMES)
    for r in t.rules:
        _rename_literals(r.preconditions, COLOR_NAMES)
        _rename_literals(r.removed_preconditions, COLOR_NAMES)
        _rename_literals(r.effects, EFFECT_COLOR_NAMES)

## Danger and revision potential

The danger $d_k$ of a component $k$ is its contribution to **diminishing** $\mathcal{P}(F_{R,E})$:

$$d_k = \mathcal{P}(n_k)\cdot\Bigg[\mathcal{P}(\neg F_{R,E}|n_k)-\mathcal{P}(\neg F_{R,E}|\neg n_k)\Bigg]$$

The revision potential $r_k$ of a component $k$ is the probability of seeing it revised (removed in the case of a generalization, added back in the case of a specialization).

In the case of a generalization (if the rule is not prematched), it is only evaluated for preconditions and constants. It is the probability that a fulfilment is obtained and that $k$ is unverified under the fulfilling substitution:

$$r_k = \frac{1}{|\Sigma|}\cdot\sum_{\sigma\in\Sigma}{\mathcal{P}(F_{R,E,\sigma})\cdot \mathbb{1}[k\in\sigma_{unverif}]}$$

In the case of a specialization (if the rule is prematched), it is only evaluated for removed preconditions. It is the probability that a fulfilment is **not** obtained and that $k$ is verified under many almost-fulfilling substitutions:

$$r_k = $$

In [3]:
def tracing_tree(ni_probs, danger, cds, choice_index=0, truth=None, false=None, branch_power=1):
    """Probability that every disjunction in ``cds`` holds, with danger bookkeeping.

    Each component ``i`` is treated as an independent event that is true with
    probability ``ni_probs[i]``. ``cds`` is a list of disjunctions, each a
    collection of component indices; a disjunction holds when at least one of
    its components is true. The function explores a binary decision tree over
    the components (true branch / false branch) and sums the probability of
    the branches in which all disjunctions hold.

    Args:
        ni_probs: per-component probabilities.
        danger: list of the same length as ``ni_probs``; updated IN PLACE. For
            each satisfying branch of probability ``p``, ``p / ni_probs[i]`` is
            added for components decided true on that branch and
            ``p / (1 - ni_probs[i])`` is subtracted for components decided
            false. Components never decided on a branch contribute nothing.
        cds: list of disjunctions (sets of component indices).
        choice_index: index of the component decided at this level
            (recursion state; callers normally leave the default).
        truth: indices decided true so far (recursion state).
        false: indices decided false so far (recursion state).
        branch_power: probability of the current branch (recursion state).

    Returns:
        The summed probability of all satisfying branches below this node.
        Returns 0 if any disjunction is empty, since it can never hold.
    """
    # Fresh sets per call instead of shared mutable default arguments.
    if truth is None:
        truth = set()
    if false is None:
        false = set()

    # An empty disjunction has no component that could make it hold, so the
    # whole conjunction fails. The recursion never creates one (see
    # false_branch_zero below), so this only triggers for an empty disjunction
    # supplied by the caller, which was previously ignored.
    if any(len(disj) == 0 for disj in cds):
        return 0

    # Leaf: either every component has been considered or every disjunction
    # has already been satisfied. Credit this branch to the decided components.
    if choice_index == len(ni_probs) or len(cds) == 0:
        for ni in range(len(ni_probs)):
            if ni in truth:
                danger[ni] += branch_power / ni_probs[ni]
            elif ni in false:
                danger[ni] -= branch_power / (1 - ni_probs[ni])
        return branch_power

    choice_power = ni_probs[choice_index]

    cds_true = []   # disjunctions still open if the component is true
    cds_false = []  # disjunctions still open if the component is false
    # The false branch is dead if it has probability 0 ...
    false_branch_zero = choice_power == 1

    found_choice_in_disj = False
    for disj in cds:
        if choice_index in disj:
            found_choice_in_disj = True

            remaining_disj = disj.copy()
            remaining_disj.remove(choice_index)
            if len(remaining_disj) == 0:
                # ... or if this component was the last hope of a disjunction.
                false_branch_zero = True
            else:
                cds_false.append(remaining_disj)
        else:
            cds_true.append(disj)
            cds_false.append(disj)

    # The component appears in no open disjunction: its value is irrelevant,
    # so skip it without splitting the branch.
    if not found_choice_in_disj:
        return tracing_tree(ni_probs, danger, cds_true,
                            choice_index + 1, truth, false, branch_power)

    true_branch_value = 0
    if choice_power != 0:
        truth_true = truth.copy()
        truth_true.add(choice_index)
        true_branch_value = tracing_tree(ni_probs, danger, cds_true,
                                         choice_index + 1, truth_true, false,
                                         branch_power * choice_power)

    false_branch_value = 0
    if not false_branch_zero:
        false_false = false.copy()
        false_false.add(choice_index)
        false_branch_value = tracing_tree(ni_probs, danger, cds_false,
                                          choice_index + 1, truth, false_false,
                                          branch_power * (1 - choice_power))

    return true_branch_value + false_branch_value

def compute_danger_revision(trace, rule):
    """Compute danger and revision potential for every component of ``rule``.

    Components are indexed in the order: preconditions, removed preconditions,
    constants. For each substitution of the rule, the set of components that
    are unverified in ``trace.state`` is collected; these sets are handed to
    ``tracing_tree`` as disjunctions, and are also used to accumulate the
    revision potential.

    Returns a 7-tuple:
        (1 - value returned by ``tracing_tree``,
         danger of preconditions, danger of removed preconditions,
         danger of constants,
         potential of preconditions, potential of removed preconditions,
         potential of constants)
    where the six lists are ``Literal`` / ``Constant`` objects carrying the
    score in place of the necessity.
    """
    substitutions = rule.substitutions
    preconds = rule.preconditions
    removed = rule.removed_preconditions
    consts = rule.constants
    world = trace.state

    literals = preconds + removed
    const_offset = len(preconds) + len(removed)

    necessities = [component.nec for component in literals + consts]
    component_count = len(necessities)
    danger = [0] * component_count
    potential = [0] * component_count

    # One set of unverified component indices per substitution.
    unverified_per_sub = []
    for sub in substitutions:
        unverified = set()
        # Probability that this substitution fulfils the rule: every
        # unverified component must turn out not to be necessary.
        fulfil_prob = 1

        for index, literal in enumerate(literals):
            if sub.apply(literal) not in world:
                unverified.add(index)
                fulfil_prob *= 1 - literal.nec

        for index, const in enumerate(consts):
            # NOTE(review): the constant object itself is passed to get/getinv
            # here, whereas motivate_experiment passes ``t.name`` - confirm
            # which one the substitution API expects.
            image = sub.get(const)
            preimage = sub.getinv(const)
            if (image is not None and image != const.name) or \
                    (preimage is not None and preimage != const.name):
                unverified.add(index + const_offset)
                fulfil_prob *= 1 - const.nec

        share = fulfil_prob / len(substitutions)
        for index in unverified:
            potential[index] += share

        unverified_per_sub.append(unverified)

    revprob = tracing_tree(necessities, danger, unverified_per_sub)

    # tracing_tree accumulates the bracketed difference; weight it by the
    # component's own probability and clamp negative contributions to zero.
    danger = [max(0, raw * nec) for raw, nec in zip(danger, necessities)]

    n_precs = len(preconds)

    danger_precs = [Literal([lit.name, lit.params, score])
                    for lit, score in zip(preconds, danger[:n_precs])]
    danger_remprecs = [Literal([lit.name, lit.params, score])
                       for lit, score in zip(removed, danger[n_precs:const_offset])]
    danger_consts = [Constant([cst.name, score])
                     for cst, score in zip(consts, danger[const_offset:])]

    pot_precs = [Literal([lit.name, lit.params, score])
                 for lit, score in zip(preconds, potential[:n_precs])]
    pot_remprecs = [Literal([lit.name, lit.params, score])
                    for lit, score in zip(removed, potential[n_precs:const_offset])]
    pot_consts = [Constant([cst.name, score])
                  for cst, score in zip(consts, potential[const_offset:])]

    return (1 - revprob,
            danger_precs, danger_remprecs, danger_consts,
            pot_precs, pot_remprecs, pot_consts)

In [7]:
def motivate_experiment(trace_index, trace: Trace):
    """Print a textual explanation of why the experiment of ``trace`` is worth running.

    The rule of ``trace`` with the highest revision probability is selected
    (``1 - fulfilment`` for a prematching rule, ``fulfilment`` otherwise; the
    first rule wins ties). Its danger and revision-potential scores are then
    obtained from ``compute_danger_revision`` and printed.

    Args:
        trace_index: index of the trace, only used in the printed header.
        trace: the trace to explain.

    Returns:
        None. Nothing is printed when the trace has no rule.
    """
    rule = max(
        trace.rules,
        key=lambda r: (1 - r.fulfilment) if r.prematching else r.fulfilment,
        default=None,
    )
    if rule is None:
        return

    # The first returned value and the potential of removed preconditions are
    # not used in the report.
    _, danger_prec, danger_remprecs, danger_consts, \
        pot_precs, _, pot_consts = compute_danger_revision(trace, rule)

    separator = "--------------------------------------------------------------------------------------------"

    print(colortxt(separator, GREY))
    print("Trace n°{}, in state:".format(trace_index))
    print(align_lits(trace.state))
    print("Experiment", trace.action, "is conducted. It has", qualif_probability(trace.rev_prob), "of triggering a " + colortxt("SPECIALIZATION" if rule.prematching else "GENERALIZATION", RED, lighten(RED, 4)) + " revision.")
    print()
    print("Rule R has", qualif_probability(rule.fulfilment), "of being fulfilled, because the danger on its preconditions is:")
    print(align_lits(danger_prec))
    if len(rule.removed_preconditions) > 0:
        print("The danger of its removed preconditions is:")
        print(align_lits(danger_remprecs))
    if len(rule.constants) > 0:
        print("The danger of its constants necessities is:")
        print(align_lits(danger_consts))
    print()
    if not rule.prematching:
        print("The revision potential of its preconditions is:")
        print(align_lits(pot_precs))
        if len(rule.constants) > 0:
            print("And the revision potential of its constants is:")
            print(align_lits(pot_consts))
    else:
        print("But rule prematches so if it doesn't fulfill, a specialization will occur.")
    print()
    print(colortxt("(a precondition is dangerous when it is highly necessary and often unverified)", GREY))
    print(colortxt("(a precondition has great revision potential when it is not necessary and often unverified)", GREY))
    print(colortxt(separator, GREY))
    print()

    # Intuition behind the two scores:
    # danger = high necessity * low verification = nec * (1 - verif)
    # target = low necessity * low verification = (1 - nec) * (1 - verif)
        

In [8]:
# Sanity check: show the first loaded trace (its state, action and rules).
print(traces[0])

Trace:
State:
  | black(a)           | black(d)           | black(f)           | black(g)           | clear(d)           | 
  | clear(g)           | on(a, e)           | on(b, floor)       | on(c, floor)       | on(d, c)           | 
  | on(e, f)           | on(f, b)           | on(g, a)           | white(b)           | white(c)           | 
  | white(e)           | 
Action: move(c, g)

  Rule:
  Action: move(g, a)
  Preconds:
    | [48;5;156m[38;5;16mblack(a)      18%[38;5;0m [48;5;231m | [48;5;156m[38;5;16mblack(d)      18%[38;5;0m [48;5;231m | [48;5;156m[38;5;16mblack(f)      18%[38;5;0m [48;5;231m | [48;5;192m[38;5;16mblack(g)      27%[38;5;0m [48;5;231m | [48;5;210m[38;5;16mclear(a)     100%[38;5;0m [48;5;231m | 
    | [48;5;156m[38;5;16mclear(d)      18%[38;5;0m [48;5;231m | [48;5;192m[38;5;16mclear(g)      27%[38;5;0m [48;5;231m | [48;5;192m[38;5;16mon(a, e)      27%[38;5;0m [48;5;231m | [48;5;156m[38;5;16mon(b, floor)  18%[38;5;0m [48;5;231m 

In [9]:
# Report only the experiments that are more likely than not to trigger a revision.
for ti, t in enumerate(traces):
    if not t.rev_prob > 0.5:
        continue
    motivate_experiment(ti, t)

[38;5;102m--------------------------------------------------------------------------------------------[38;5;0m
Trace n°3, in state:
| black(a)           | black(c)           | black(d)           | black(f)           | black(g)           | 
| clear(a)           | clear(c)           | clear(d)           | clear(e)           | clear(g)           | 
| on(a, floor)       | on(b, floor)       | on(c, floor)       | on(d, floor)       | on(e, f)           | 
| on(f, b)           | on(g, floor)       | white(b)           | white(e)           | 
Experiment move(g, d) is conducted. It has an average chance (52%) of triggering a [48;5;224m[38;5;196mGENERALIZATION[38;5;0m[48;5;231m revision.

Rule R has an average chance (48%) of being fulfilled, because the danger on its preconditions is:
| [48;5;120m[38;5;16mblack([38;5;19mZ[38;5;0m)       0%[38;5;0m [48;5;231m | [48;5;120m[38;5;16mblack([38;5;19mS[38;5;0m)       0%[38;5;0m [48;5;231m | [48;5;120m[38;5;16mblack(g)       0%[3