
<div style='font-family:"cambria";'>Implement approximate inference in a Bayesian network using the following sampling methods:<br>
a. Rejection sampling<br>
b. Likelihood sampling<br>
c. Gibbs sampling<br>

Refer to the attachment below for the Bayesian network.

Infer the following queries: <br>
1. P(High Income | Payment = payback)<br>
2. P(Housing = Tenant | low income & no security)<br>
3. P(Payment = default | Deposit = Large)<br>
</div>


<img src="https://onedrive.live.com/embed?resid=3E88EC0719160DD3%21353858&authkey=%21AJNjfNr6a4vuoe4&width=660" width="500" height="auto" />

In [None]:
from collections import * #type: ignore
import numpy as np

In [None]:
class Graph:
    """Directed acyclic graph of a Bayesian network together with its CPTs.

    Attributes:
        graph: adjacency list mapping each node to the list of its children.
        vars: set of every node that appears in an edge.
        indegree: number of incoming edges of each node (roots map to 0).
        parents: for each node, its parents in the order in which they were
            dequeued by the topological sort. The sampling code builds CPT
            lookup keys from this list, so CPT tuple keys must follow it.
        toporder: nodes in a topological order (parents before children).
        cpt: the conditional probability tables, stored as given.

    Raises:
        ValueError: if the edges contain a cycle, since no topological order
            (and therefore no valid Bayesian network) exists.
    """

    def __init__(self,edges,cpt) -> None: # for the above example [['A','B'],['A','C'],['B','C'],['C','D'],['E','D']]
        self.graph = defaultdict(list)
        self.vars = set()
        self.indegree = defaultdict(int)
        self.parents = defaultdict(list)
        self.toporder = list()
        self.cpt = cpt

        # Nodes in first-appearance order. Iterating the `vars` set instead
        # would make the root order (and thus `toporder`) depend on string
        # hash randomisation and change from run to run.
        seen = {}
        for edge in edges:
            parent, child = edge[0], edge[1]
            self.graph[parent].append(child)
            self.vars.add(parent)
            self.vars.add(child)
            self.indegree[parent] += 0  # make sure roots get an explicit 0 entry
            self.indegree[child] += 1
            seen[parent] = None
            seen[child] = None

        # Kahn's algorithm on a working copy so `indegree` keeps the real
        # in-degrees after construction.
        remaining = dict(self.indegree)
        q = deque(node for node in seen if remaining[node] == 0)
        while q:
            node = q.popleft()
            self.toporder.append(node)
            # Indexing (not .get) on purpose: it registers leaf nodes as keys
            # of `graph`, so `graph.keys()` covers every node.
            for child in self.graph[node]:
                remaining[child] -= 1
                self.parents[child].append(node)
                if remaining[child] == 0:
                    q.append(child)

        if len(self.toporder) != len(seen):
            raise ValueError("edges contain a cycle; a Bayesian network must be a DAG")


class Inference:
    """Approximate inference on a boolean Bayesian network by sampling.

    Estimates P(query | evidence) with rejection sampling, likelihood
    weighting or Gibbs sampling.

    Args:
        n: number of samples (or Gibbs update steps) to draw.
        evidence: observed variables mapped to their boolean values.
        query: variables mapped to the values whose joint probability, given
            the evidence, is estimated.

    The network object passed to the methods must expose ``toporder``,
    ``parents``, ``graph`` (node -> children) and ``cpt``. A root's CPT entry
    is P(node=True); a non-root's entry is a dict keyed by the tuple of its
    parents' values, in the order of ``parents[node]``, giving P(node=True).
    """

    def __init__(self,n,evidence:dict[str,bool],query:dict[str,bool]):
        self.n = n
        self.evidence = evidence
        self.query = query

    @staticmethod
    def _prob_true(bsnet, node, assignment) -> float:
        """Return P(node=True | parents) under the parent values in `assignment`."""
        parents = bsnet.parents[node]
        if not parents:
            return bsnet.cpt[node]
        return bsnet.cpt[node][tuple(assignment[parent] for parent in parents)]

    @staticmethod
    def _draw(p_true) -> bool:
        """Return True with probability `p_true`.

        `np.random.rand()` lies in [0, 1), so the strict `<` makes p=0 never
        fire and p=1 always fire.
        """
        return bool(np.random.rand() < p_true)

    # rejection sampling
    def priorSampling(self,bsnet:"Graph")-> defaultdict[str,bool]:
        """Draw one full sample from the prior, visiting nodes parents-first."""
        sample = defaultdict(bool)
        for node in bsnet.toporder:
            sample[node] = self._draw(self._prob_true(bsnet, node, sample))
        return sample

    def isConsistent(self,sample):
        """Return True when `sample` agrees with every evidence variable."""
        return all(sample[var] == value for var, value in self.evidence.items())

    def queryChecker(self,sample)->bool:
        """Return True when `sample` matches every query variable."""
        return all(sample[var] == value for var, value in self.query.items())

    def rejectionSampling(self,bsnet:"Graph"):
        """Estimate P(query | evidence) from prior samples that match the evidence.

        Raises:
            ZeroDivisionError: if none of the `n` samples is consistent with
                the evidence, so no estimate can be formed.
        """
        samplecnt = 0
        favsamplecnt = 0
        for _ in range(self.n):
            sample = self.priorSampling(bsnet)
            if self.isConsistent(sample):
                samplecnt += 1
                favsamplecnt += int(self.queryChecker(sample))
        if samplecnt == 0:
            raise ZeroDivisionError("no sample was consistent with the evidence")
        return float(favsamplecnt/samplecnt)

    # likelihood weighting
    def weightedSampling(self,bsnet:"Graph")->tuple[defaultdict[str,bool],float]:
        """Draw one sample with the evidence clamped, and its likelihood weight.

        Evidence nodes are fixed to their observed value and multiply the
        weight by the probability of that value given the sampled parents;
        every other node is sampled from its CPT.
        """
        sample = defaultdict(bool)
        w = 1.0
        for node in bsnet.toporder:
            p_true = self._prob_true(bsnet, node, sample)
            if node in self.evidence:
                value = self.evidence[node]
                w *= p_true if value else (1 - p_true)
                sample[node] = value
            else:
                sample[node] = self._draw(p_true)
        # Return only after every node has been assigned.
        return (sample,w)

    def likelihoodWeighting(self,bsnet:"Graph"):
        """Estimate P(query | evidence) as a ratio of accumulated weights.

        Raises:
            ZeroDivisionError: if the total weight is zero (no samples, or
                the evidence has probability zero under every sample).
        """
        favourableweight = 0
        totalweight = 0
        for _ in range(self.n):
            sample,weight = self.weightedSampling(bsnet)
            totalweight += weight
            if self.queryChecker(sample):
                favourableweight += weight
        if totalweight == 0:
            raise ZeroDivisionError("total sample weight is zero")
        return float(favourableweight/totalweight)

    # gibbs sampling
    def _blanket_prob_true(self, bsnet, node, state):
        """Return P(node=True | Markov blanket), or None if both values have weight 0.

        The unnormalised weight of a value is P(value | parents) times the
        probability of each child's current value given that child's parents.
        """
        weights = {}
        for value in (True, False):
            trial = dict(state)
            trial[node] = value
            p_true = self._prob_true(bsnet, node, trial)
            weight = p_true if value else (1 - p_true)
            for child in bsnet.graph.get(node, ()):
                child_true = self._prob_true(bsnet, child, trial)
                weight *= child_true if trial[child] else (1 - child_true)
            weights[value] = weight
        total = weights[True] + weights[False]
        if total == 0:
            return None
        return weights[True] / total

    def gibbsSampling(self,bsnet:"Graph"):
        """Estimate P(query | evidence) with a Gibbs sampler.

        Starts from a random assignment of the non-evidence variables, then
        performs `n` steps; each step resamples one randomly chosen
        non-evidence variable from its distribution given its Markov blanket
        and counts whether the resulting state matches the query. The initial
        state is counted as well, so the result is defined even for n == 0.
        """
        non_evidence = [node for node in bsnet.toporder if node not in self.evidence]

        state = defaultdict(bool)
        for evid in self.evidence:
            state[evid] = self.evidence[evid]
        for nevid in non_evidence:
            state[nevid] = self._draw(0.5)

        favourable_samples = int(self.queryChecker(state))
        total_samples = 1
        if not non_evidence:
            # Nothing can be resampled; the state is fully determined.
            return float(favourable_samples/total_samples)

        for _ in range(self.n):
            nevid = non_evidence[np.random.randint(len(non_evidence))]
            p_true = self._blanket_prob_true(bsnet, nevid, state)
            if p_true is not None:
                # Update the chain state itself so later steps build on it.
                state[nevid] = self._draw(p_true)
            favourable_samples += int(self.queryChecker(state))
            total_samples += 1
        return float(favourable_samples/total_samples)

In [None]:
# Network structure: every pair is a directed edge [parent, child].
edges = [
    ['A', 'B'],
    ['A', 'C'],
    ['B', 'C'],
    ['C', 'D'],
    ['E', 'D'],
]

# Conditional probability tables, each value being P(node = True).
# A node without parents maps straight to a probability; a node with parents
# maps to a dict keyed by the tuple of its parents' truth values, taken in
# the order of `bsnet.parents[node]`.
cpt = {
    "A": 0.3,
    "B": {
        (True,): 0.1,
        (False,): 0.6,
    },
    "C": {
        (True, True): 0.05,
        (True, False): 0.5,
        (False, True): 0.45,
        (False, False): 0.6,
    },
    "E": 0.35,
    "D": {
        (True, True): 0.01,
        (True, False): 0.75,
        (False, True): 0.5,
        (False, False): 0.31,
    },
}

bsnet = Graph(edges, cpt)




In [None]:
# Q1: estimate P(A=True | C=False) with each of the three samplers.
infer = Inference(10000, {'C': False}, {"A": True})

rejection_estimate = infer.rejectionSampling(bsnet)
print(f"using rejection sampling : {rejection_estimate}")

weighting_estimate = infer.likelihoodWeighting(bsnet)
print(f"likelihood weighting: {weighting_estimate}")

gibbs_estimate = infer.gibbsSampling(bsnet)
print(f"gibbs sampling: {gibbs_estimate}")

using rejection sampling : 0.3287589498806683
likelihood weighting: 0.0
gibbs sampling: 0.0756924307569243


In [None]:
# Q2: estimate P(E=False | A=False, D=False) with each of the three samplers.
infer = Inference(10000, {"A": False, "D": False}, {"E": False})

rejection_estimate = infer.rejectionSampling(bsnet)
print(f"using rejection sampling : {rejection_estimate}")

weighting_estimate = infer.likelihoodWeighting(bsnet)
print(f"likelihood weighting: {weighting_estimate}")

gibbs_estimate = infer.gibbsSampling(bsnet)
print(f"gibbs sampling: {gibbs_estimate}")

using rejection sampling : 0.6445006902899217
likelihood weighting: 0.3422
gibbs sampling: 0.8824117588241176


In [None]:
# Q3: estimate P(C=True | B=True) with each of the three samplers.
infer = Inference(10000, {"B": True}, {"C": True})

rejection_estimate = infer.rejectionSampling(bsnet)
print(f"using rejection sampling : {rejection_estimate}")

weighting_estimate = infer.likelihoodWeighting(bsnet)
print(f"likelihood weighting: {weighting_estimate}")

gibbs_estimate = infer.gibbsSampling(bsnet)
print(f"gibbs sampling: {gibbs_estimate}")

using rejection sampling : 0.4301388266905508
likelihood weighting: 0.0
gibbs sampling: 0.8574142585741426
