베이즈 네트워크 기본. 여기 제공하는 코드는 GitHub aima-python의 코드를 기반으로 일부 수정한 것임.

In [30]:
# 확률적 추론을 위한 기본 코드들

import numpy as np
from collections import defaultdict

class ProbDist:
    """이산 확률 분포를 정의하는 클래스.
    생성자에 확률변수(random variable)를 정의함.
    이후에 확률변수 각 값에 대한 확률값을 지정함(딕셔너리 이용)."""

    def __init__(self, var_name='?', freq=None):
        """var_name: 확률변수. freq가 None이 아니라면, (확률변수의 값, 빈도) 쌍으로 구성된 딕셔너리여야 함.
        이후, 빈도를 바탕으로 합이 1이 되는 확률이 되도록 정규화함."""
        self.prob = {} # 실제 확률값은 여기에 저장됨
        self.var_name = var_name
        self.values = []
        if freq:
            for (v, p) in freq.items():
                self[v] = p
            self.normalize()

    def __getitem__(self, val):
        """확률변수의 값(val)이 주어지면, 확률 P(val) 리턴."""
        try:
            return self.prob[val]
        except KeyError:
            return 0

    def __setitem__(self, val, p):
        """확률변수의 값 val에 대한 확률 P(val) = p로 설정"""
        if val not in self.values:
            self.values.append(val)
        self.prob[val] = p

    def normalize(self):
        """모든 확률값의 합이 1이 되도록 정규화하여 정규화된 분포를 리턴함.
        확률값 합이 0이면, ZeroDivisionError가 발생됨."""
        total = sum(self.prob.values())
        if not np.isclose(total, 1.0):
            for val in self.prob:
                if total == 0:
                    self.prob[val] = 1
                else:
                    self.prob[val] /= total
        return self

    def show_approx(self, numfmt='{:.3g}'):
        """확률변수의 값에 따라 정렬하고 확률값은 반올림하여 리턴함."""
        return ', '.join([('{}: ' + numfmt).format(v, p) for (v, p) in sorted(self.prob.items())])

    def __repr__(self):
        return "P({})".format(self.var_name)


class JointProbDist(ProbDist):
    """변수 집합에 대한 이산 확률 분포(결합 확률 분포) 클래스. ProbDist의 서브 클래스."""

    def __init__(self, variables):
        """variables: 대상 확률변수들."""
        self.prob = {}
        self.variables = variables
        self.vals = defaultdict(list) # k: [v1, v2, ...]

    def __getitem__(self, values):
        """주어진 변수 값들에 대한 결합 확률 리턴."""
        values = event_values(values, self.variables)
        return ProbDist.__getitem__(self, values)

    def __setitem__(self, values, p):
        """확률값 P(values) = p로 설정. values는 각 변수에 대해 하나의 값을 가진 튜플/딕셔너리.
        또한 각 변수에 대해 지금까지 관측된 각 변수의 값을 기록함."""
        values = event_values(values, self.variables)
        self.prob[values] = p
        for var, val in zip(self.variables, values):
            if val not in self.vals[var]:
                self.vals[var].append(val)

    def values(self, var):
        """변수에 대해 가능한 값의 집합을 리턴."""
        return self.vals[var]

    def __repr__(self):
        return "P({})".format(self.variables)


def event_values(event, variables):
    """이벤트(event)에 존재하는 변수들(variables)에 대한 값들의 튜플을 리턴함."""
    if isinstance(event, tuple) and len(event) == len(variables):
        return event
    else:
        return tuple([event[var] for var in variables])


def extend(s, var, val):
    """딕셔너리 s를 복사하고 var에 값 val을 세팅하여 확장하여 그 복사본을 리턴함."""
    return {**s, var: val}


In [3]:
# 확률적 추론에 대한 기본 코드를 probability.py에 저장해뒀음.
# from probability import *
from bayesnet import *
import random
from functools import reduce

## 베이즈 네트워크 정의

In [32]:
class BayesNet:
    """베이즈 네트워크 클래스. 불리언 변수로만 구성된 버전"""

    def __init__(self, node_specs=None):
        """node_specs: 노드(BayesNode) 리스트. 노드의 순서는 부모가 자식보다 앞쪽에 위치하도록 정렬되어야 함.
        이 리스트의 각 원소는 BayesNode 생성에 필요한 (X, parents, cpt) 튜플로 구성됨."""
        self.nodes = []
        self.variables = []
        node_specs = node_specs or []
        for node_spec in node_specs:
            self.add(node_spec)

    def add(self, node_spec):
        """네트워크에 노드 추가.
        추가하는 노드의 부모가 이미 네트워크에 존재해야 하며,
        추가하는 노드의 변수는 네트워크에 존재하지 않아야 함."""
        node = BayesNode(*node_spec)
        assert node.variable not in self.variables
        assert all((parent in self.variables) for parent in node.parents)
        self.nodes.append(node)
        self.variables.append(node.variable)
        for parent in node.parents:
            self.variable_node(parent).children.append(node)

    def variable_node(self, var):
        """var에 해당하는 변수의 노드를 리턴함."""
        for n in self.nodes:
            if n.variable == var:
                return n
        raise Exception("No such variable: {}".format(var))

    def variable_values(self, var):
        """var의 도메인을 리턴함."""
        return [True, False]

    def __repr__(self):
        return 'BayesNet({0!r})'.format(self.nodes)

In [33]:
class BayesNode:
    """베이즈 네트워크의 노드 클래스.
    불리언 변수에 대한 조건부 확률 분포 P(X | parents)."""

    def __init__(self, X, parents, cpt):
        """X: 변수명,
        parents: 변수명 시퀀스 또는 공백으로 구분된 문자열
        cpt: 조건부 확률표. 다음 중 하나의 형태:

        * 숫자. 무조건부 확률 P(X=true). 부모가 없는 변수인 경우.

        * {v: p, ...} 형식의 딕셔너리. 조건부 확률 분포 P(X=true | parent=v) = p.
          부모가 하나뿐인 경우.

        * {(v1, v2, ...): p, ...} 형식의 딕셔너리. 조건부 확률 분포 P(X=true | parent1=v1, parent2=v2, ...) = p.
          딕셔너리의 키는 부모 개수 만큼의 값을 가져야 함. 가장 일반적인 형태.
          내부적으로는 이런 형식으로 값이 저장됨.
          
        P(X=true)를 이용하여 P(X=false)는 계산할 수 있으므로, cpt에는 거짓일 확률은 명시하지 않음.
        """
        if isinstance(parents, str):
            parents = parents.split()

        # cpt는 항상 {(v1, v2, ...): p, ...} 형식의 딕셔너리로 저장
        if isinstance(cpt, (float, int)):  # 부모 없음. 0-튜플
            cpt = {(): cpt}
        elif isinstance(cpt, dict):
            # 부모 하나. 1-튜플
            if cpt and isinstance(list(cpt.keys())[0], bool):
                cpt = {(v,): p for v, p in cpt.items()}

        assert isinstance(cpt, dict)
        for vs, p in cpt.items():
            assert isinstance(vs, tuple) and len(vs) == len(parents)
            assert all(isinstance(v, bool) for v in vs)
            assert 0 <= p <= 1

        self.variable = X
        self.parents = parents
        self.cpt = cpt
        self.children = []

    def p(self, value, event):
        """조건부 확률 P(X=value | parents=parent_values) 리턴
        parent_values: event에 있는 부모의 값들. event는 각 부모에 하나의 값을 배정해야 함.
        >>> bn = BayesNode('X', 'Burglary', {T: 0.2, F: 0.625})
        >>> bn.p(False, {'Burglary': False, 'Earthquake': True})
        0.375"""
        assert isinstance(value, bool)
        ptrue = self.cpt[event_values(event, self.parents)]
        return ptrue if value else 1 - ptrue

    def sample(self, event):
        """event의 부모 변수 값이 주어졌을 때 노드 변수의 조건부 확률 분포로부터 샘플링.
        조건부 확률에 따라 True/False를 무작위로 리턴"""
        return probability(self.p(True, event))

    def __repr__(self):
        return repr((self.variable, ' '.join(self.parents)))


def probability(p):
    """확률 p에 따라 True를 리턴함"""
    return p > random.uniform(0.0, 1.0)

## 정확 추론(Exact Inference)

In [6]:
# 열거
def enumeration_ask(X, e, bn):
    """베이즈 네트워크 bn에 대해, 증거 e(딕셔너리)가 주어졌을 때 변수 X의 조건부 확률을 리턴함."""
    assert X not in e, "Query variable must be distinct from evidence"
    Q = ProbDist(X)
    for xi in bn.variable_values(X):
        Q[xi] = enumerate_all(bn.variables, extend(e, X, xi), bn)
    return Q.normalize()


def enumerate_all(variables, e, bn):
    """증거 e와 일관된 P(variables | e{others})의 합을 리턴함.
    P: 베이즈 네트워크 bn으로 표현되는 결합 확률 분포,
    e{others}: bn에서 variables를 제외한 다른 변수들로 제한된 증거.
    variables에서 부모는 자식들보다 앞에 위치해야 함."""
    if not variables:
        return 1.0
    Y, rest = variables[0], variables[1:]
    Ynode = bn.variable_node(Y)
    if Y in e:
        return Ynode.p(e[Y], e) * enumerate_all(rest, e, bn)
    else:
        return sum(Ynode.p(y, e) * enumerate_all(rest, extend(e, Y, y), bn)
                   for y in bn.variable_values(Y))

In [7]:
# 변수 소거 알고리즘
def elimination_ask(X, e, bn):
    """변수 소거를 통해 베이즈 네트워크 bn의 P(X|e)를 계산함. e: 딕셔너리"""
    assert X not in e, "Query variable must be distinct from evidence"
    factors = []
    for var in reversed(bn.variables):
        factors.append(make_factor(var, e, bn))
        if is_hidden(var, X, e):
            factors = sum_out(var, factors, bn)
    return pointwise_product(factors, bn).normalize()


def is_hidden(var, X, e):
    """P(X|e)를 질의할 때 var이 미관측 변수인가?"""
    return var != X and var not in e


def make_factor(var, e, bn):
    """e가 주어졌을 때 bn의 결합 확률 분포에서 var에 대한 인수를 리턴함.
    bn의 완전 결합 확률 분포는 bn 변수들에 대한 이 인수들의 성분별 곱(pointwise product)임."""
    node = bn.variable_node(var)
    variables = [X for X in [var] + node.parents if X not in e]
    cpt = {event_values(e1, variables): node.p(e1[var], e1)
           for e1 in all_events(variables, bn, e)}
    return Factor(variables, cpt)


def pointwise_product(factors, bn):
    return reduce(lambda f, g: f.pointwise_product(g, bn), factors)


def sum_out(var, factors, bn):
    """값들에 대해 합산함으로써 모든 인수들에서 var를 삭제함."""
    result, var_factors = [], []
    for f in factors:
        (var_factors if var in f.variables else result).append(f)
    result.append(pointwise_product(var_factors, bn).sum_out(var, bn))
    return result


class Factor:
    """결합 확률 분포의 한 인수"""

    def __init__(self, variables, cpt):
        self.variables = variables
        self.cpt = cpt

    def pointwise_product(self, other, bn):
        """두 인수의 성분별 곱 계산. 변수들은 합집합이 됨."""
        variables = list(set(self.variables) | set(other.variables))
        cpt = {event_values(e, variables): self.p(e) * other.p(e) for e in all_events(variables, bn, {})}
        return Factor(variables, cpt)

    def sum_out(self, var, bn):
        """값들에 대해 합산함으로써 var를 삭제하여 인수를 생성함."""
        variables = [X for X in self.variables if X != var]
        cpt = {event_values(e, variables): sum(self.p(extend(e, var, val)) for val in bn.variable_values(var))
               for e in all_events(variables, bn, {})}
        return Factor(variables, cpt)

    def normalize(self):
        """확률값 리턴. 하나의 변수여야 함."""
        assert len(self.variables) == 1
        return ProbDist(self.variables[0], {k: v for ((k,), v) in self.cpt.items()})

    def p(self, e):
        """e에 대한 조건부 확률표 조회"""
        return self.cpt[event_values(e, self.variables)]


def all_events(variables, bn, e):
    """모든 변수들에 대한 값으로 e를 확장하여 yield."""
    if not variables:
        yield e
    else:
        X, rest = variables[0], variables[1:]
        for e1 in all_events(rest, bn, e):
            for x in bn.variable_values(X):
                yield extend(e1, X, x)


def extend(s, var, val):
    """딕셔너리 s를 복사하고 var에 값 val을 세팅하여 확장하여 그 복사본을 리턴함."""
    return {**s, var: val}

## 이용 예: 침입 경보 베이즈 네트워크

In [36]:
alarm_node = BayesNode('Alarm', ['Burglary', 'Earthquake'], 
                       {(True, True): 0.95,(True, False): 0.94, (False, True): 0.29, (False, False): 0.001})

In [37]:
# 부모와 조건부 확률표를 설정하는 방법을 다양하게 허용함
john_node = BayesNode('JohnCalls', ['Alarm'], {True: 0.90, False: 0.05})
mary_node = BayesNode('MaryCalls', 'Alarm', {(True, ): 0.70, (False, ): 0.01})

In [38]:
# 부모가 없는 노드
burglary_node = BayesNode('Burglary', '', 0.001)
earthquake_node = BayesNode('Earthquake', '', 0.002)

In [39]:
# 노드에 연관된 조건부 확률값 조회
john_node.p(False, {'Alarm': True}) # P(JohnCalls=False | Alarm=True)

0.09999999999999998

In [4]:
Aplus = BayesNet([
    ("교수님의수제자", '', 0.05),
    ('선배와친분', "교수님의수제자", {T: 0.8, F: 0.3}),
    ('족보보유', '선배와친분', {T: 0.7, F: 0.1}),
    ('외부대외활동', '교수님의수제자', {T: 0.85, F: 0.1}),
    ('높은아이큐', '', 0.2),
    ('사전지식보유', '외부대외활동 높은아이큐', {(T, T): 0.9, (T, F): 0.75, (F, T): 0.35, (F, F): 0.05}),
    ('체력좋음', '', 0.25),
    ('매주예복습', '체력좋음', {T: 0.6, F: 0.2}),
    ('자료3회독', '매주예복습', {T: 0.9, F: 0.15}),
    ('정시기상', '체력좋음', {T: 0.99, F: 0.90}),
    ('정시출석','정시기상',{T:1, F:0}),
    ('에이플', '족보보유 사전지식보유 자료3회독 정시출석', {(T, T, T, T): 0.97, (T, T, F, T): 0.7, (T, F, T, T): 0.85, (F, T, T, T): 0.7, (T, F, F, T): 0.5, (F, T, F, T): 0.3, (F, F, T, T): 0.4, (F, F, F, T): 0.05, (T, T, T, F): 0.5, (T, T, F, F): 0.4, (T, F, T, F): 0.35, (F, T, T, F): 0.35, (T, F, F, F): 0.2, (F, T, F, F): 0.15, (F, F, T, F): 0.1, (F, F, F, F): 0.03})
    #('에이플', '족보보유 사전지식보유 자료3회독', {(T, T, T): 0.97, (T, T, F): 0.7, (T, F, T): 0.85, (F, T, T): 0.7, (T, F, F): 0.5, (F, T, F): 0.3, (F, F, T): 0.4, (F, F, F): 0.05})
])

In [41]:
# variable_node를 통해 노드에 접근할 수 있음
alarm_node = Aplus.variable_node('에이플')
alarm_node

NameError: name 'Aplus' is not defined

In [55]:
print(alarm_node.variable, alarm_node.parents, alarm_node.cpt, alarm_node.children, sep='\n')

에이플
['족보보유', '사전지식보유', '자료3회독', '출결미달없음']
{(True, True, True, True): 0.97, (True, True, False, True): 0.7, (True, False, True, True): 0.85, (False, True, True, True): 0.7, (True, False, False, True): 0.5, (False, True, False, True): 0.3, (False, False, True, True): 0.4, (False, False, False, True): 0.05, (True, True, True, False): 0.0, (True, True, False, False): 0.0, (True, False, True, False): 0.0, (False, True, True, False): 0.0, (True, False, False, False): 0.0, (False, True, False, False): 0.0, (False, False, True, False): 0.0, (False, False, False, False): 0.0}
[]


In [56]:
# 열거를 통한 추론
# P(Burglary=True | JohnCalls=True, MaryCalls=True)
ans_dist = enumeration_ask('족보보유', {"에이플":True, "높은아이큐":True}, Aplus)
ans_dist.show_approx()

'False: 0.499, True: 0.501'

In [50]:
ans_dist[T]

0.5009501090183831

In [11]:
# 변수 소거를 통한 추론
ans_dist = elimination_ask('에이플', dict(), Aplus)
ans_dist.show_approx()

'False: 0.653, True: 0.347'

In [52]:
ans_dist[T]

0.33547115981250003

In [None]:
# 결정론 : .335, 비결정론 : .362