In [1]:
from collections import namedtuple
Factor = namedtuple("Factor", ["vars", "values"])

def print_factor(phi, indent="\t"):
    line = " | ".join(phi.vars + ["ϕ(" + ",".join(phi.vars) + ")"])
    sep = "".join(["+" if c == "|" else "-" for c in list(line)])
    print(indent + sep)
    print(indent + line)
    print(indent +sep)
    for values, p in phi.values.items():
        print(indent + " | ".join([str(v) for v in values] + [str(p)]))
    print(indent + sep)

# Examples

phi_ABC = Factor(vars=["A", "B", "C"],
                 values={(0, 0, 0): .1, (0, 0, 1): .9, (0, 1, 0): .8, (0, 1, 1): .2,
                         (1, 0, 0): .7, (1, 0, 1): .4, (1, 1, 0): .5, (1, 1, 1): .5})
phi_AB = Factor(vars=["A", "B"], values={(0, 0): .1, (0, 1): .9, (1, 0): .8, (1, 1): .2})
phi_BC = Factor(vars=["B", "C"], values={(0, 0): .2, (0, 1): .8, (1, 0): .5, (1, 1): .5})
phi_A = Factor(vars=["A"], values={(0,): .4, (1,): .6})
phi_C = Factor(vars=["C"], values={(0,): .6, (1,): .8})

print_factor(phi_ABC)
print("ϕ(A=1, B=0, C=0) = " + str(phi_ABC.values[(1, 0, 0)]))

	--+---+---+---------
	A | B | C | ϕ(A,B,C)
	--+---+---+---------
	0 | 0 | 0 | 0.1
	0 | 0 | 1 | 0.9
	0 | 1 | 0 | 0.8
	0 | 1 | 1 | 0.2
	1 | 0 | 0 | 0.7
	1 | 0 | 1 | 0.4
	1 | 1 | 0 | 0.5
	1 | 1 | 1 | 0.5
	--+---+---+---------
ϕ(A=1, B=0, C=0) = 0.7


In [2]:
# Multiplicarea a doi factori:
def _rows_match(p1, p2, common_pos1, common_pos2):
    return [p1[i] for i in common_pos1] == [p2[j] for j in common_pos2]


def _get_merged_indices(vars1, p1, vars2, p2, unified):
    return tuple([p1[vars1.index(var)] if var in vars1 else p2[vars2.index(var)] for var in unified])


def multiply(phi1, phi2):
    assert isinstance(phi1, Factor) and isinstance(phi2, Factor)
    # Cerinta 1 :

    vars_set1 = set(phi1.vars)
    vars_set2 = set(phi2.vars)
    phi_vars = vars_set1 | vars_set2

    phi = Factor(vars=list(phi_vars), values={})
    common_vars = vars_set1 & vars_set2

    common_pos1 = [phi1.vars.index(var) for var in common_vars]
    common_pos2 = [phi2.vars.index(var) for var in common_vars]

    for p1, val1 in phi1.values.items():
        for p2, val2 in phi2.values.items():
            if _rows_match(p1, p2, common_pos1, common_pos2):
                key = _get_merged_indices(phi1.vars, p1, phi2.vars, p2, phi_vars)
                phi.values[key] = val1 * val2

    return phi

In [3]:
print_factor(phi_AB)
print("*")
print_factor(phi_BC)
print("=")
print_factor(multiply(phi_AB, phi_BC))

	--+---+-------
	A | B | ϕ(A,B)
	--+---+-------
	0 | 0 | 0.1
	0 | 1 | 0.9
	1 | 0 | 0.8
	1 | 1 | 0.2
	--+---+-------
*
	--+---+-------
	B | C | ϕ(B,C)
	--+---+-------
	0 | 0 | 0.2
	0 | 1 | 0.8
	1 | 0 | 0.5
	1 | 1 | 0.5
	--+---+-------
=
	--+---+---+---------
	A | B | C | ϕ(A,B,C)
	--+---+---+---------
	0 | 0 | 0 | 0.020000000000000004
	0 | 0 | 1 | 0.08000000000000002
	0 | 1 | 0 | 0.45
	0 | 1 | 1 | 0.45
	1 | 0 | 0 | 0.16000000000000003
	1 | 0 | 1 | 0.6400000000000001
	1 | 1 | 0 | 0.1
	1 | 1 | 1 | 0.1
	--+---+---+---------


In [4]:
## Tests for multiply

from itertools import permutations
from operator import mul
from functools import reduce
import sys
from copy import deepcopy

def _check_factor(_phi, all_vars, control):
    assert sorted(_phi.vars) == sorted(all_vars), \
        "Wrong variables: " + ','.join(_phi.vars) + " instead of " + ','.join(all_vars)
    assert len(_phi.values) == 2 ** len(all_vars), \
        "Wrong number of entries in phi.values: " + str(len(_phi.values))
    n = len(all_vars)
    if n > 0:
        for j in range(n + 1):
            vals = [0] * (n - j) + [1] * j
            keys = set([p for p in permutations(vals)])
            p = reduce(mul, [_phi.values[k] for k in keys])
            assert abs(p - control[j]) < 1e-9, \
                "Values for " + str(keys) + " are wrong!"
    else:
        assert abs(_phi.values[()] - control[0]) < 1e-9


def _test_multiply(name1, name2, all_vars, control, verbose=False):
    _phi = eval("multiply(deepcopy(phi_"+name1+"), deepcopy(phi_"+name2+"))")
    if verbose:
        print("Result of ϕ_"+name+" * ϕ_"+name2+":")
        print_factor(_phi)
    sys.stdout.write("Testing  ϕ_"+name1+" * ϕ_"+name2+" ... ")
    _check_factor(_phi, all_vars, control)
    print("OK!!")

_test_multiply("AB", "BC", ["A", "B", "C"], [.02, .00576, .0288, .1], verbose=False)
_test_multiply("A", "BC", ["A", "B", "C"], [.08, .00768, .0288, .3])
_test_multiply("A", "AB", ["A", "B"], [.04, .1728, .12])
_test_multiply("BC", "A", ["C", "A", "B"], [.08, .00768, .0288, .3])
_test_multiply("ABC", "BC", ["C", "A", "B"], [.02, .04032, .008, .25])
_test_multiply("C", "A", ["C", "A"], [.24, .1152, .48])
_test_multiply("A", "C", ["C", "A"], [.24, .1152, .48])
_test_multiply("C", "C", ["C"], [.36, .64])

print("\nMultiply seems ok!\n")

Testing  ϕ_AB * ϕ_BC ... OK!!
Testing  ϕ_A * ϕ_BC ... OK!!
Testing  ϕ_A * ϕ_AB ... OK!!
Testing  ϕ_BC * ϕ_A ... OK!!
Testing  ϕ_ABC * ϕ_BC ... OK!!
Testing  ϕ_C * ϕ_A ... OK!!
Testing  ϕ_A * ϕ_C ... OK!!
Testing  ϕ_C * ϕ_C ... OK!!

Multiply seems ok!



In [5]:
def _get_reduced_indices(pos, idx1, idx2):
    reduced_idx = idx1[:pos] + idx1[(pos + 1):]

    if reduced_idx == idx2[:pos] + idx2[(pos + 1):]:
        return reduced_idx


def sum_out(var, phi):
    assert isinstance(phi, Factor) and var in phi.vars
    # Cerinta 2:

    pos = phi.vars.index(var)
    new_phi = Factor(vars=deepcopy(phi.vars), values={})
    new_phi.vars.remove(var)

    for p1, val1 in phi.values.items():
        for p2, val2 in phi.values.items():
            if p1 != p2:
                idx = _get_reduced_indices(pos, p1, p2)
                if idx is not None:
                    new_phi.values[idx] = val1 + val2

    return new_phi

In [6]:
# Un exemplu

print("Însumând B afară din")
print_factor(phi_ABC)
print("=")
print_factor(sum_out("B", phi_ABC))

Însumând B afară din
	--+---+---+---------
	A | B | C | ϕ(A,B,C)
	--+---+---+---------
	0 | 0 | 0 | 0.1
	0 | 0 | 1 | 0.9
	0 | 1 | 0 | 0.8
	0 | 1 | 1 | 0.2
	1 | 0 | 0 | 0.7
	1 | 0 | 1 | 0.4
	1 | 1 | 0 | 0.5
	1 | 1 | 1 | 0.5
	--+---+---+---------
=
	--+---+-------
	A | C | ϕ(A,C)
	--+---+-------
	0 | 0 | 0.9
	0 | 1 | 1.1
	1 | 0 | 1.2
	1 | 1 | 0.9
	--+---+-------


In [7]:
## Tests for sum_out

def _test_sum_out(var, name, left_vars, control, verbose=False):
    import sys
    from itertools import permutations
    from operator import mul
    from functools import reduce
    _phi = eval("sum_out('"+var+"', phi_"+name+")")
    if verbose:
        print_factor(_phi)
    sys.stdout.write("Testing  sum_"+var+" ϕ_"+name+" ... ")
    _check_factor(_phi, left_vars, control)
    print("OK!!")

_test_sum_out("A", "ABC", ["C", "B"], [.8, 1.69, .7], verbose=False)
_test_sum_out("B", "ABC", ["A", "C"], [.9, 1.32, .9], verbose=False)
_test_sum_out("C", "C", [], [1.4], verbose=False)
_test_sum_out("A", "A", [], [1.], verbose=False)
_test_sum_out("B", "BC", ["C"], [.7, 1.3], verbose=False)

print("\nSummations seem ok!\n")

Testing  sum_A ϕ_ABC ... OK!!
Testing  sum_B ϕ_ABC ... OK!!
Testing  sum_C ϕ_C ... OK!!
Testing  sum_A ϕ_A ... OK!!
Testing  sum_B ϕ_BC ... OK!!

Summations seem ok!



In [8]:
def prod_sum(var, Phi, verbose=False):
    assert isinstance(var, str) and all([isinstance(phi, Factor) for phi in Phi])
    # Cerinta 3:
    new_Phi = list(filter(lambda phi: var in phi.vars, Phi))
    omega = reduce(multiply, new_Phi)
    tau = sum_out(var, omega)
    ret_Phi = list(filter(lambda phi: phi not in new_Phi, Phi)) + [tau]

    if verbose:
        map(print_factor, ret_Phi)

    return ret_Phi

In [9]:
# Un exemplu
print("Elininând B din :")
print_factor(phi_AB)
print("și")
print_factor(phi_BC)
print("=>")
print_factor(prod_sum("B", [phi_AB, phi_BC])[0])

Elininând B din :
	--+---+-------
	A | B | ϕ(A,B)
	--+---+-------
	0 | 0 | 0.1
	0 | 1 | 0.9
	1 | 0 | 0.8
	1 | 1 | 0.2
	--+---+-------
și
	--+---+-------
	B | C | ϕ(B,C)
	--+---+-------
	0 | 0 | 0.2
	0 | 1 | 0.8
	1 | 0 | 0.5
	1 | 1 | 0.5
	--+---+-------
=>
	--+---+-------
	A | C | ϕ(A,C)
	--+---+-------
	0 | 0 | 0.47000000000000003
	0 | 1 | 0.53
	1 | 0 | 0.26
	1 | 1 | 0.7400000000000001
	--+---+-------


In [10]:
## Test prod_sum

sys.stdout.write("Testing prod-sum (I) ... ")
result = prod_sum("B", [deepcopy(_phi) for _phi in [phi_A, phi_C, phi_ABC, phi_BC]])
assert len(result) == 3
for _phi in result:
    if sorted(_phi.vars) == ["A", "C"]:
        assert abs(_phi.values[(0, 0)] - 0.42) < 1e-9
        assert abs(_phi.values[(0, 1)] * _phi.values[(1, 0)] - 0.3198) < 1e-9
        assert abs(_phi.values[(1, 1)] - 0.57) < 1e-9
    elif sorted(_phi.vars) == ["A"]:
        assert abs(_phi.values[(0,)] - 0.4) < 1e-9
        assert abs(_phi.values[(1,)] - 0.6) < 1e-9
    elif sorted(_phi.vars) == ["C"]:
        assert abs(_phi.values[(0,)] - 0.6) < 1e-9
        assert abs(_phi.values[(1,)] - 0.8) < 1e-9
print("OK!")

sys.stdout.write("Testing prod-sum (II) ... ")
result = prod_sum("A", [deepcopy(_phi) for _phi in [phi_A, phi_C, phi_ABC, phi_BC]])
assert len(result) == 3
for _phi in result:
    if sorted(_phi.vars) == ["B", "C"]:
        assert abs(_phi.values[(0, 0)] - 0.2) < 1e-9 or abs(_phi.values[(0, 0)] - 0.46) < 1e-9
        assert abs(_phi.values[(0, 1)] * _phi.values[(1, 0)] - 0.4) < 1e-9 or \
               abs(_phi.values[(0, 1)] * _phi.values[(1, 0)] - 0.372) < 1e-9
        assert abs(_phi.values[(1, 1)] - 0.5) < 1e-9 or abs(_phi.values[(1, 1)] - 0.38) < 1e-9
    elif sorted(_phi.vars) == ["C"]:
        assert abs(_phi.values[(0,)] - 0.6) < 1e-9
        assert abs(_phi.values[(1,)] - 0.8) < 1e-9
print("OK!")
print("Prod-Sum seems ok!")

Testing prod-sum (I) ... OK!
Testing prod-sum (II) ... OK!
Prod-Sum seems ok!


In [14]:
def variable_elimination(Phi, Z, verbose=False):
    # Cerinta 4:
    for var in Z:
        Phi = prod_sum(var, Phi)
    
    ret_Phi = reduce(multiply, Phi)

    if verbose:
        map(print_factor, ret_Phi)

    return ret_Phi

In [15]:
## Testing Variable elimination

def _test_variable_elimination(Phi, Vars, left_vars, control, verbose=False):

    
    var_list = '["' + '", "'.join(Vars) + '"]'
    factor_list = '[' + ','.join([("deepcopy(phi_"+name + ")") for name in Phi]) +']'
    name_list = '[' + ','.join([("ϕ_"+name) for name in Phi]) +']'
    _phi = eval("variable_elimination("+factor_list+", "+var_list+")")
    if verbose:
        print_factor(_phi)
    sys.stdout.write("Testing  eliminate_var "+var_list+" from "+name_list+" ... ")
    _check_factor(_phi, left_vars, control)
    print("OK!!")

_test_variable_elimination(["A", "C"], ["C"], ["A"], [0.56, 0.84])
_test_variable_elimination(["ABC", "BC", "AB", "A"], ["C", "B"], ["A"], [0.2096, 0.2808])
_test_variable_elimination(["ABC", "BC", "AB", "A"], ["C", "B", "A"], [], [0.4904])
_test_variable_elimination(["ABC", "AB", "BC", "A"], ["A", "B", "C"], [], [0.4904])
_test_variable_elimination(["ABC"], ["A", "B", "C"], [], [4.1])

Testing  eliminate_var ["C"] from [ϕ_A,ϕ_C] ... OK!!
Testing  eliminate_var ["C", "B"] from [ϕ_ABC,ϕ_BC,ϕ_AB,ϕ_A] ... OK!!
Testing  eliminate_var ["C", "B", "A"] from [ϕ_ABC,ϕ_BC,ϕ_AB,ϕ_A] ... OK!!
Testing  eliminate_var ["A", "B", "C"] from [ϕ_ABC,ϕ_AB,ϕ_BC,ϕ_A] ... OK!!
Testing  eliminate_var ["A", "B", "C"] from [ϕ_ABC] ... OK!!


In [16]:
def _check_obs(var, val, vars, vals):
    if var in vars:
        return vals[vars.index(var)] == val
    else:
        return True


def _check_conds(vars, vals, Z):
    return all(map(lambda obs: _check_obs(obs[0], obs[1], vars, vals), Z.items()))


def _apply_conds(Z, phi):
    new_vals = {val: pr for (val, pr) in phi.values.items() if _check_conds(phi.vars, val, Z)}
    return Factor(vars = phi.vars, values = new_vals)


def condition_factors(Phi, Z, verbose=False):
    # Cerinta 5
    return [_apply_conds(Z, phi) for phi in Phi]

In [17]:
# Un exemplu
print("Aplicand B=0 in factorul")
print_factor(phi_ABC)
print("=>")
print_factor(condition_factors([phi_ABC], {"B": 0})[0])

Aplicand B=0 in factorul
	--+---+---+---------
	A | B | C | ϕ(A,B,C)
	--+---+---+---------
	0 | 0 | 0 | 0.1
	0 | 0 | 1 | 0.9
	0 | 1 | 0 | 0.8
	0 | 1 | 1 | 0.2
	1 | 0 | 0 | 0.7
	1 | 0 | 1 | 0.4
	1 | 1 | 0 | 0.5
	1 | 1 | 1 | 0.5
	--+---+---+---------
=>
	--+---+---+---------
	A | B | C | ϕ(A,B,C)
	--+---+---+---------
	0 | 0 | 0 | 0.1
	0 | 0 | 1 | 0.9
	1 | 0 | 0 | 0.7
	1 | 0 | 1 | 0.4
	--+---+---+---------


In [18]:
# Teste pentru condition_factors

phi_ABC = Factor(vars=["A", "B", "C"],
                 values={(0, 0, 0): .1, (0, 0, 1): .9, (0, 1, 0): .8, (0, 1, 1): .2,
                         (1, 0, 0): .7, (1, 0, 1): .4, (1, 1, 0): .5, (1, 1, 1): .5})

_phi = condition_factors([phi_ABC], {"B": 0})[0]
assert sorted(_phi.vars) == ["A", "B", "C"]
assert len(_phi.values) == 4 and abs(_phi.values[(0, 0, 0)] - .1) < 1e-7
_phi = condition_factors([phi_ABC], {"B": 0, "A": 1})[0]
assert sorted(_phi.vars) == ["A", "B", "C"] and len(_phi.values) == 2
print("Condition factors seems ok!")

Condition factors seems ok!


In [19]:
from random import shuffle

def query(X, Y, Z, Phi, Other=None, verbose=False):
    """
    X - full list of variables
    Y - query variables
    Z - dictionary with observations
    Phi - the list with all factor
    Ohter - an order over variables in X \ (Y U Z); None to pick a random one
    verbose - display factors as they are created
    """

    if verbose:
        print("\n-------------\nInitial factors:")
        for phi in Phi:
            print_factor(phi)

    Phi = condition_factors(Phi, Z, verbose=verbose)  # Condition factors on Z=z

    if Other is None:
        Other = [x for x in X if (x not in Y and x not in Z)]  # Variables that need to be eliminated
        shuffle(Other)
    else:
        assert sorted(Other) == sorted([x for x in X if (x not in Y and x not in Z)])
    if verbose:
        print("\n-------------\nEliminating variables in the following order: " + ",".join(Other))

    phi = variable_elimination(Phi, Other, verbose=verbose)  # Eliminate other variables then Y and Z
    
    # Normalize factor to represent the conditional probability p(Y|Z=z)
    s = sum(phi.values.values())
    prob = Factor(vars=phi.vars, values={k: v / s for (k, v) in phi.values.items()})
    print("\n-----------------\nProbabilitatea ceruta:")
    print_factor(prob)

In [20]:
phi_a = Factor(vars=["A"], values={(0,): .7, (1,): .3})
phi_b = Factor(vars=["B"], values={(0,): .5, (1,): .5})
phi_c = Factor(vars=["C"], values={(0,): .4, (1,): .6})

phi_d = Factor(vars=["A", "B", "D"],
               values={(0, 0, 0): .75, (0, 0, 1): .25, (0, 1, 0): .7, (0, 1, 1): .3,
                       (1, 0, 0): .6, (1, 0, 1): .4, (1, 1, 0): .2, (1, 1, 1): .8
                      })
phi_e = Factor(vars=["C", "E"],
               values={(0, 0): .25, (0, 1): .75, (1, 0): .75, (1, 1): .25})

phi_f = Factor(vars=["A", "D", "F"],
               values={(0, 0, 0): .6, (0, 0, 1): .4, (0, 1, 0): .4, (0, 1, 1): .6,
                       (1, 0, 0): .7, (1, 0, 1): .3, (1, 1, 0): .8, (1, 1, 1): .2
                      })
phi_g = Factor(vars=["D", "E", "G"],
               values={(0, 0, 0): .1, (0, 0, 1): .9, (0, 1, 0): .2, (0, 1, 1): .8,
                       (1, 0, 0): .5, (1, 0, 1): .5, (1, 1, 0): .4, (1, 1, 1): .6
                      })

all_vars = ["A", "B", "C", "D", "E", "F", "G"]
Phi = [phi_a, phi_b, phi_c, phi_d, phi_e, phi_f, phi_g]

In [21]:
# Algoritmul ar trebui să ajungă la probabilitățile din tabele

# Verificati ca algoritmul "ajunge" corect la valorile din tabele
query(all_vars, ["F"], {"A": 0, "D": 1}, Phi)
query(all_vars, ["G"], {"D": 0, "E": 1}, Phi)


-----------------
Probabilitatea ceruta:
	--+---+---+---------
	D | A | F | ϕ(D,A,F)
	--+---+---+---------
	1 | 0 | 0 | 0.39999999999999997
	1 | 0 | 1 | 0.6
	--+---+---+---------

-----------------
Probabilitatea ceruta:
	--+---+---+---------
	D | E | G | ϕ(D,E,G)
	--+---+---+---------
	0 | 1 | 0 | 0.2
	0 | 1 | 1 | 0.8
	--+---+---+---------


In [22]:
# Exemplul din PDF-ul atașat

query(all_vars, ["C", "F"], {"G": 0}, Phi, Other=["E", "B", "A", "D"], verbose=True)


-------------
Initial factors:
	--+-----
	A | ϕ(A)
	--+-----
	0 | 0.7
	1 | 0.3
	--+-----
	--+-----
	B | ϕ(B)
	--+-----
	0 | 0.5
	1 | 0.5
	--+-----
	--+-----
	C | ϕ(C)
	--+-----
	0 | 0.4
	1 | 0.6
	--+-----
	--+---+---+---------
	A | B | D | ϕ(A,B,D)
	--+---+---+---------
	0 | 0 | 0 | 0.75
	0 | 0 | 1 | 0.25
	0 | 1 | 0 | 0.7
	0 | 1 | 1 | 0.3
	1 | 0 | 0 | 0.6
	1 | 0 | 1 | 0.4
	1 | 1 | 0 | 0.2
	1 | 1 | 1 | 0.8
	--+---+---+---------
	--+---+-------
	C | E | ϕ(C,E)
	--+---+-------
	0 | 0 | 0.25
	0 | 1 | 0.75
	1 | 0 | 0.75
	1 | 1 | 0.25
	--+---+-------
	--+---+---+---------
	A | D | F | ϕ(A,D,F)
	--+---+---+---------
	0 | 0 | 0 | 0.6
	0 | 0 | 1 | 0.4
	0 | 1 | 0 | 0.4
	0 | 1 | 1 | 0.6
	1 | 0 | 0 | 0.7
	1 | 0 | 1 | 0.3
	1 | 1 | 0 | 0.8
	1 | 1 | 1 | 0.2
	--+---+---+---------
	--+---+---+---------
	D | E | G | ϕ(D,E,G)
	--+---+---+---------
	0 | 0 | 0 | 0.1
	0 | 0 | 1 | 0.9
	0 | 1 | 0 | 0.2
	0 | 1 | 1 | 0.8
	1 | 0 | 0 | 0.5
	1 | 0 | 1 | 0.5
	1 | 1 | 0 | 0.4
	1 | 1 | 1 | 0.6
	--+---+---+---------
