In [64]:
import sys
'''
WRITE YOUR CODE BELOW.
'''
from numpy import zeros, float32, random
import numpy
#  pgmpy
import pgmpy
from pgmpy.models import BayesianModel
from pgmpy.factors.discrete import TabularCPD
from pgmpy.inference import VariableElimination
#You are not allowed to use following set of modules from 'pgmpy' Library.
#
# pgmpy.sampling.*
# pgmpy.factors.*
# pgmpy.estimators.*

def make_security_system_net():
    """Create a Bayes Net representation of the above security system problem. 
    Use the following as the name attribute: "H","C", "M","B", "Q", 'K",
    "D"'. (for the tests to work.)
    """
    BayesNet = BayesianModel()
    [BayesNet.add_node(i) for i in ["H","C", "M","B", "Q", "K", "D"]]
    BayesNet.add_edge("H","Q")
    BayesNet.add_edge("C","Q")
    BayesNet.add_edge("B","K")
    BayesNet.add_edge("M","K")
    BayesNet.add_edge("Q","D")
    BayesNet.add_edge("K","D")

    return BayesNet


def set_probability(bayes_net: BayesianModel):
    """Set probability distribution for each node in the security system.
    Use the following as the name attribute: "H","C", "M","B", "Q", 'K",
    "D"'. (for the tests to work.)
    """
    cpd_h = TabularCPD("H", 2, values= [[0.5], [0.5]])
    cpd_c = TabularCPD("C", 2, values= [[0.7], [0.3]])
    cpd_b = TabularCPD("B", 2, values= [[0.5], [0.5]])
    cpd_m = TabularCPD("M", 2, values= [[0.2], [0.8]])

    cpd_q = TabularCPD("Q", 2, values= [[0.95,	0.75,	0.45,	0.1], [0.05	,0.25	,0.55	,0.9]], evidence=["H","C"], evidence_card=[2, 2])
    cpd_k = TabularCPD("K", 2, values= [[0.25	,0.05	,0.99	,0.85], [0.75	,0.95	,0.01	,0.15]], evidence=["B","M"], evidence_card=[2, 2])
    
    cpd_d = TabularCPD("D", 2, values= [[0.98	,0.4	,0.65	,0.01], [0.02	,0.6	,0.35	,0.99]], evidence=["K","Q"], evidence_card=[2, 2])

    bayes_net.add_cpds(cpd_h, cpd_c ,cpd_b ,cpd_m ,cpd_q ,cpd_k, cpd_d)
    return bayes_net


def get_marginal_double0(bayes_net: BayesianModel):
    """Calculate the marginal probability that Double-0 gets compromised.
    """
    agent = VariableElimination(bayes_net)
    double0_prob = agent.query(variables= ['D'], joint=False)['D'].values[1]
    return double0_prob


def get_conditional_double0_given_no_contra(bayes_net):
    """Calculate the conditional probability that Double-0 gets compromised
    given Contra is shut down.
    """
    agent = VariableElimination(bayes_net)
    double0_prob = agent.query(variables= ['D'], evidence={"C":0},joint=False)['D'].values[1]
    return double0_prob


def get_conditional_double0_given_no_contra_and_bond_guarding(bayes_net):
    """Calculate the conditional probability that Double-0 gets compromised
    given Contra is shut down and Bond is reassigned to protect M.
    """
    agent = VariableElimination(bayes_net)
    double0_prob = agent.query(variables= ['D'], evidence={"C":0, 'B': 1},joint=False)['D'].values[1]
    return double0_prob


def get_game_network():
    """Create a Bayes Net representation of the game problem.
    Name the nodes as "A","B","C","AvB","BvC" and "CvA".  """
    BayesNet = BayesianModel()
    [BayesNet.add_node(i) for i in ["A","B","C","AvB","BvC","CvA"]]

    BayesNet.add_edge("A", "CvA")
    BayesNet.add_edge("A", "AvB")

    BayesNet.add_edge("B", "AvB")
    BayesNet.add_edge("B", "BvC")

    BayesNet.add_edge("C", "CvA")
    BayesNet.add_edge("C", "BvC")

    cpd_a = TabularCPD("A", 4, [[0.15] , [0.45], [0.30], [0.10]])
    cpd_b = TabularCPD("B", 4, [[0.15] , [0.45], [0.30], [0.10]])
    cpd_c = TabularCPD("C", 4, [[0.15] , [0.45], [0.30], [0.10]])

    

    a = build_tabels()
    values= [list(a.T[0])+ list(a.T[1])+ list(a.T[2])]
    cpd_ac = TabularCPD("CvA", 3, values , evidence=["A","C"], evidence_card=[4, 4])
    cpd_bc = TabularCPD("BvC", 3, values , evidence=["B","C"], evidence_card=[4, 4])
    cpd_ab = TabularCPD("AvB", 3, values , evidence=["A","B"], evidence_card=[4, 4])
    BayesNet.add_cpds(cpd_ab, cpd_ac, cpd_bc, cpd_a, cpd_b, cpd_c)


    return BayesNet

def build_tabels():
    skill_diff = {
    0:[0.10, 0.10, 0.80],
    1:[0.20, 0.60, 0.20],
    2:[0.15, 0.75, 0.10],
    3:[0.05, 0.90, 0.05],
    -1:[0.60, 0.20, 0.20],
    -2:[0.75, 0.15, 0.10],
    -3:[0.90, 0.05, 0.05]}
    a = zeros((16,3))
    b = zeros((4,4))
    col = 0
    for t_1 in [0, 1, 2, 3]: 
        for t_2 in [0, 1, 2, 3]:
            key = t_2 - t_1
            b[t_1][t_2] = key
            ab_0 = skill_diff[key][0]
            ab_1 = skill_diff[key][1]
            ab_2 = skill_diff[key][2]
            a[col] = [ab_0, ab_1, ab_2] 
            col += 1
    return a
 
def calculate_posterior(bayes_net):
    """Calculate the posterior distribution of the BvC match given that A won against B and tied C. 
    Return a list of probabilities corresponding to win, loss and tie likelihood."""
    agent = VariableElimination(bayes_net)
    posterior = agent.query(variables=["BvC"], joint= False, evidence= {'AvB': 0, 'CvA': 2})['BvC'].values
    return posterior 


def Gibbs_sampler(bayes_net: BayesianModel, initial_state):
    """Complete a single iteration of the Gibbs sampling algorithm 
    given a Bayesian network and an initial state value. 
    
    initial_state is a list of length 6 where: 
    index 0-2: represent skills of teams A,B,C (values lie in [0,3] inclusive)
    index 3-5: represent results of matches AvB, BvC, CvA (values lie in [0,2] inclusive)
    
    Returns the new state sampled from the probability distribution as a tuple of length 6.
    Return the sample as a tuple.    
    """
    
    Z = {"A": initial_state[0] , "B": initial_state[1], "C": initial_state[2], "BvC": initial_state[4]}
    possible_non_ev = ['A', 'B', 'C', 'BvC']
    targeted_var = random.choice(possible_non_ev)
    possible_non_ev.remove(targeted_var)

    result = 0
    # mb = bayes_net.get_markov_blanket(targeted_var)
    if targeted_var != 'BvC':
        result = random.choice(range(0,4))
        # result = bayes_net.get_cpds(targeted_var).values[result] 
        Z[targeted_var] = result
    
    B = Z['B']
    C = Z['C']
    #P(BvC|B=b, C=c)
    result = getBvC(bayes_net, B, C)

    result = result.index(max(result))
    Z['BvC'] = result
        
    initial_state = [Z['A'], Z['B'], Z['C'], initial_state[3],  Z['BvC'], initial_state[5]]
    sample = tuple(initial_state)
    return sample


def getA(bayes_net, evidence):
    """
    ***DO NOT POST OR SHARE THESE FUNCTIONS WITH ANYONE***
    Returns a distribution of probability of skill levels for team "A" given an evidence vector.
    Parameter: 
    : bayes net: Baysian Model Object
    : evidence: array of length 6 containing the skill levels for teams A, B, C in indices 0, 1, 2
    : and match outcome for AvB, BvC and CvA in indices 3, 4 and 5
    """
    A_cpd = bayes_net.get_cpds("A")      
    AvB_cpd = bayes_net.get_cpds("AvB")
    match_table = AvB_cpd.values
    team_table = A_cpd.values
    a = []
    normalizer = 0
    for i in range(4):
        normalizer += (team_table[i] * match_table[evidence[3]][i][evidence[1]] * 
                       match_table[evidence[5]][i][evidence[2]])
    for i in range(4):
        unnorm_prob = (team_table[i] * match_table[evidence[3]][i][evidence[1]] * 
                       match_table[evidence[5]][i][evidence[2]])
        a.append(unnorm_prob)
    return numpy.array(a)/normalizer

def getBvC(bayes_net, B, C):
    """
    ***DO NOT POST OR SHARE THESE FUNCTIONS WITH ANYONE***
    Returns a distribution of probability of match outcomes for "BvC" given the skill levels of B and C as evidence
    Parameter: 
    : bayes net: Baysian Model Object
    : B: int representing team B's skill level
    : C: int representing team C's skill level
    """
    BvC_cpd = bayes_net.get_cpds('BvC')
    match_table = BvC_cpd.values
    bvc = []
    for i in range(0, 3):
        bvc.append(match_table[i][B][C])
    return bvc   

def getB(bayes_net, evidence):
    """
    ***DO NOT POST OR SHARE THESE FUNCTIONS WITH ANYONE***
    Returns a distribution of probability of skill levels for team "B" given an evidence vector.
    Parameter: 
    : bayes net: Baysian Model Object
    : evidence: array of length 6 containing the skill levels for teams A, B, C in indices 0, 1, 2
    : and match outcome for AvB, BvC and CvA in indices 3, 4 and 5
    """
    B_cpd = bayes_net.get_cpds("B")      
    AvB_cpd = bayes_net.get_cpds("AvB")
    match_table = AvB_cpd.values
    team_table = B_cpd.values
    b = []
    normalizer = 0
    for i in range(4):
        normalizer += (team_table[i] * match_table[evidence[3]][evidence[0]][i] * 
                       match_table[evidence[4]][i][evidence[2]])
    for i in range(4):
        unnorm_prob = (team_table[i] * match_table[evidence[3]][evidence[0]][i] * 
                       match_table[evidence[4]][i][evidence[2]])
        b.append(unnorm_prob)
    return numpy.array(b)/normalizer

def getC(bayes_net, evidence):
    """
    ***DO NOT POST OR SHARE THESE FUNCTIONS WITH ANYONE***
    Returns a distribution of probability of skill levels for team "C" given an evidence vector.
    Parameter: 
    : bayes net: Baysian Model Object
    : evidence: array of length 6 containing the skill levels for teams A, B, C in indices 0, 1, 2
    : and match outcome for AvB, BvC and CvA in indices 3, 4 and 5
    """
    C_cpd = bayes_net.get_cpds("C")      
    AvB_cpd = bayes_net.get_cpds("AvB")
    match_table = AvB_cpd.values
    team_table = C_cpd.values
    c = []
    normalizer = 0
    for i in range(4):
        normalizer += (team_table[i] * match_table[evidence[5]][i][evidence[0]] * 
                       match_table[evidence[4]][evidence[1]][i])
    for i in range(4):
        unnorm_prob = (team_table[i] * match_table[evidence[5]][i][evidence[0]] * 
                       match_table[evidence[4]][evidence[1]][i])
        c.append(unnorm_prob)
    return numpy.array(c)/normalizer

def MH_sampler(bayes_net, initial_state):
    """Complete a single iteration of the MH sampling algorithm given a Bayesian network and an initial state value. 
    initial_state is a list of length 6 where: 
    index 0-2: represent skills of teams A,B,C (values lie in [0,3] inclusive)
    index 3-5: represent results of matches AvB, BvC, CvA (values lie in [0,2] inclusive)    
    Returns the new state sampled from the probability distribution as a tuple of length 6. 
    """
    A_cpd = bayes_net.get_cpds("A")      
    AvB_cpd = bayes_net.get_cpds("AvB")
    match_table = AvB_cpd.values
    team_table = A_cpd.values


    sample = tuple(initial_state)
    # TODO: finish this function
    raise NotImplementedError    
    return sample


def compare_sampling(bayes_net, initial_state):
    """Compare Gibbs and Metropolis-Hastings sampling by calculating how long it takes for each method to converge."""    

    C = zeros(3)
    C[0] += 1

    if initial_state == None:
        a_val = random.choice(range(0,4))
        b_val = random.choice(range(0,4))
        c_val = random.choice(range(0,4))
        bc_val = random.choice(range(0,3))
        initial_state = [a_val, b_val, c_val, bc_val]


    Gibbs_count = 0
    MH_count = 0
    MH_rejection_count = 0
    Gibbs_convergence = [0,0,0] # posterior distribution of the BvC match as produced by Gibbs 
    MH_convergence = [0,0,0] # posterior distribution of the BvC match as produced by MH
    # TODO: finish this function
    raise NotImplementedError        
    return Gibbs_convergence, MH_convergence, Gibbs_count, MH_count, MH_rejection_count


def sampling_question():
    """Question about sampling performance."""
    # TODO: assign value to choice and factor
    raise NotImplementedError
    choice = 2
    options = ['Gibbs','Metropolis-Hastings']
    factor = 0
    return options[choice], factor


def return_your_name():
    """Return your name from this function"""
    return 'Kiavosh Peybarad'


In [159]:
def Gibbs_sampler(bayes_net: BayesianModel, initial_state):
    """Complete a single iteration of the Gibbs sampling algorithm 
    given a Bayesian network and an initial state value. 
    
    initial_state is a list of length 6 where: 
    index 0-2: represent skills of teams A,B,C (values lie in [0,3] inclusive)
    index 3-5: represent results of matches AvB, BvC, CvA (values lie in [0,2] inclusive)
    
    Returns the new state sampled from the probability distribution as a tuple of length 6.
    Return the sample as a tuple.    
    """
    
    possible_non_ev = ['A', 'B', 'C', 'BvC']
    targeted_var = random.choice(possible_non_ev)
    initial_state = list(initial_state)
    result = 0
    
    if  targeted_var == 'C':
        C = list(getC(bayes_net, initial_state))
        result = C.index(max(C))
        initial_state[2] = result

    elif targeted_var == 'A':
        A = list(getA(bayes_net, initial_state))
        result = A.index(max(A))
        initial_state[0] = result

    elif targeted_var == 'B':
        B = list(getB(bayes_net, initial_state))
        result = B.index(max(B))
        initial_state[1] = result

    elif targeted_var == 'BvC':
        BvC = getBvC(bayes_net, initial_state[1], initial_state[2])
        result = BvC.index(max(BvC))
        initial_state[4] = result 
   
    # if targeted_var != 'BvC':
    #     BvC = getBvC(bayes_net, initial_state[1], initial_state[2])
    #     result = BvC.index(max(BvC))
    #     initial_state[4] = result 
    

    
    # print(str(result) +'-----' +  targeted_var)
    # print(targeted_var+ "________" +str(initial_state))
    sample = tuple(initial_state)
    return sample

In [160]:
C = {}
init_state = [random.choice(range(0,4)),random.choice(range(0,4)),random.choice(range(0,4)),0,0,2]
for i in range(0, 1000000): 
    init_state = Gibbs_sampler(net, init_state)
    if init_state in C.keys():
        C[init_state] += 1
    else:
        C[init_state] = 1

In [None]:
C = {}
init_state = [random.choice(range(0,4)),random.choice(range(0,4)),random.choice(range(0,4)),0,0,2]
for i in range(0, 1000000): 
    init_state = Gibbs_sampler(net, init_state)
    if init_state in C.keys(): 
        C[init_state] += 1
    else: 
        C[init_state] = 1


In [161]:
C

{(2, 3, 3, 0, 2, 2): 5, (2, 3, 2, 0, 0, 2): 3, (2, 1, 2, 0, 1, 2): 999992}

In [154]:
init_state = [random.choice(range(0,4)),random.choice(range(0,4)),random.choice(range(0,4)),0,0,2]

init_state = Gibbs_sampler(net, init_state)
print(init_state)
init_state = Gibbs_sampler(net, init_state)

init_state

0-----B
(1, 0, 2, 0, 0, 2)
2-----A


(2, 0, 2, 0, 0, 2)

In [151]:
net = get_game_network()
agent = VariableElimination(net)
agent.query(variables=["BvC"], joint= False, evidence= {'AvB': 0, 'CvA': 2})['BvC'].values

Finding Elimination Order: : 100%|██████████| 3/3 [00:00<00:00, 2993.08it/s]
Eliminating: C: 100%|██████████| 3/3 [00:00<00:00, 750.10it/s]


array([0.25890074, 0.42796763, 0.31313163])

In [None]:
net.get_cpds("AvB").values

array([[[0.1 , 0.2 , 0.15, 0.05],
        [0.6 , 0.1 , 0.2 , 0.15],
        [0.75, 0.6 , 0.1 , 0.2 ],
        [0.9 , 0.75, 0.6 , 0.1 ]],

       [[0.1 , 0.6 , 0.75, 0.9 ],
        [0.2 , 0.1 , 0.6 , 0.75],
        [0.15, 0.2 , 0.1 , 0.6 ],
        [0.05, 0.15, 0.2 , 0.1 ]],

       [[0.8 , 0.2 , 0.1 , 0.05],
        [0.2 , 0.8 , 0.2 , 0.1 ],
        [0.1 , 0.2 , 0.8 , 0.2 ],
        [0.05, 0.1 , 0.2 , 0.8 ]]])

In [None]:
build_tabels().T

array([[0.1 , 0.2 , 0.15, 0.05, 0.6 , 0.1 , 0.2 , 0.15, 0.75, 0.6 , 0.1 ,
        0.2 , 0.9 , 0.75, 0.6 , 0.1 ],
       [0.1 , 0.6 , 0.75, 0.9 , 0.2 , 0.1 , 0.6 , 0.75, 0.15, 0.2 , 0.1 ,
        0.6 , 0.05, 0.15, 0.2 , 0.1 ],
       [0.8 , 0.2 , 0.1 , 0.05, 0.2 , 0.8 , 0.2 , 0.1 , 0.1 , 0.2 , 0.8 ,
        0.2 , 0.05, 0.1 , 0.2 , 0.8 ]])

In [None]:
net = make_security_system_net()
net = set_probability(net)
agent = VariableElimination(net)
agent.query(variables= ['D'], joint=False)['D'].values
agent.query(variables= ['D'], evidence={"C":0, 'B': 1},joint=False)['D'].values[1]

Finding Elimination Order: : 100%|██████████| 6/6 [00:00<00:00, 2601.93it/s]
Eliminating: Q: 100%|██████████| 6/6 [00:00<00:00, 750.03it/s]
Finding Elimination Order: : 100%|██████████| 4/4 [00:00<00:00, 662.66it/s]
Eliminating: Q: 100%|██████████| 4/4 [00:00<?, ?it/s]


0.23645600000000003