In [None]:
!pip install -q gradio

In [None]:
import numpy as np
import pandas as pd
import gradio as gr
from typing import Union

# Classes for creating Discrete Distribution and Conditional Probability table

In [None]:
class DiscreteDistribution:
    """Marginal distribution of a single discrete random variable.

    Attributes:
        eventChoices: mapping from each possible outcome to its probability.
        name: label of the random variable.
    """

    def __init__(self,eventChoices,name):
        self.name=name
        self.eventChoices=eventChoices  # outcome -> marginal probability

    def getProbability(self,queryChoice):
        """Return the marginal probability stored for queryChoice.

        The lookup uses ``.get``, so an outcome that is not in eventChoices
        yields None instead of raising.
        """
        outcomes=self.eventChoices
        return outcomes.get(queryChoice)


class ConditionalProbabilityTable:
    """Conditional probability table P(name | parents) stored as a DataFrame.

    Args:
        conditions: rows of the table; each row lists one value per parent
            (in dependencyList order), then the value of this variable, then
            the conditional probability.
        dependencyList: parent nodes. Each must expose ``name`` and
            ``randomVariable.eventChoices`` (outcome -> prior probability).
        name: label of this variable (used as its column name).

    Attributes:
        conditions: the CPT as a DataFrame with columns
            [<parent names...>, name, "probability"].
        jointProbability: same layout; holds the raw CPT until joint() is
            called, and the joint distribution afterwards.
    """

    def __init__(self,conditions,dependencyList,name):
        self.dependencyList=dependencyList
        self.name=name
        columns=[parent.name for parent in self.dependencyList]+[self.name,"probability"]
        self.conditions=pd.DataFrame(conditions,columns=columns)
        self.jointProbability=self.conditions.copy()

    def joint(self):
        """Compute the joint distribution and store it in jointProbability.

        For example P(Monty,Prize,Guest)=P(Monty|Prize,Guest)*P(Prize)*P(Guest).
        The table is rebuilt from ``conditions`` on every call, so calling
        joint() repeatedly always yields the same result instead of applying
        the parent priors once more per call.
        """
        jointTable=self.conditions.copy()
        probability=np.array(jointTable['probability'])
        for node in self.dependencyList:
            priors=node.randomVariable.eventChoices
            # prior probability of the parent's value on each row
            parentProbability=np.array([priors.get(choice) for choice in jointTable[node.name]])
            probability=np.multiply(probability,parentProbability)
        jointTable['probability']=probability
        self.jointProbability=jointTable

# Class for node

In [None]:
class Node:
    """Vertex of the network that wraps one random variable.

    Attributes:
        randomVariable: the wrapped distribution (a DiscreteDistribution or a
            ConditionalProbabilityTable); only its ``name`` is read here.
        name: copied from randomVariable.name.
        parentNode: list of parent nodes, or None while no parent was added.
        childNodes: list of child nodes, or None while no child was added.
    """

    # The annotation uses forward references so this class can be defined
    # without the distribution classes already being in scope.
    def __init__(self,randomVariable:Union["DiscreteDistribution","ConditionalProbabilityTable"]):
        self.randomVariable = randomVariable
        self.name = self.randomVariable.name
        self.parentNode = None
        self.childNodes = None

    def addParent(self, node):
        """Register node as a parent of self and self as a child of node.

        Adding the same parent again is a no-op, so re-running the cell that
        wires the network does not duplicate edges.
        """
        if self.parentNode is None:
            self.parentNode=[]
        if not any(existing is node for existing in self.parentNode):
            self.parentNode.append(node)
        self._addChild(node)

    def _addChild(self,node):
        """Record self in node.childNodes (once)."""
        if node.childNodes is None:
            node.childNodes=[]
        if not any(existing is self for existing in node.childNodes):
            node.childNodes.append(self)

# Class for Bayesian Network

In [None]:
class BayesianNetwork:
    """Bayesian network with root nodes feeding one conditional node.

    Inference works on the first node whose random variable is a
    ConditionalProbabilityTable and treats every entry of its dependencyList
    as a root node with its own prior, i.e.
    P(child, parents) = P(child | parents) * prod P(parent).
    """

    def __init__(self):
        # Start empty so getStates() works before addStates() was called.
        self.nodes=[]

    def addStates(self,*args):
        """Replace the node list with the given nodes."""
        self.nodes=list(args)

    def addEdge(self,source,destination):
        """Add a directed edge source -> destination."""
        destination.addParent(source)

    def getStates(self):
        """Return the list of nodes registered with addStates()."""
        return self.nodes

    def _getConditionalNode(self):
        """Return the first node holding a ConditionalProbabilityTable, else None."""
        for node in self.nodes:
            if node.randomVariable.__class__.__name__=='ConditionalProbabilityTable':
                return node
        return None

    def _freshJoint(self,randomV):
        """Recompute randomV's joint table from its raw CPT and return it."""
        # Reset to the raw CPT first so the result is correct even if joint()
        # accumulates the parent priors into jointProbability in place (a
        # second call would otherwise apply the priors twice).
        randomV.jointProbability=randomV.conditions.copy()
        randomV.joint()
        return randomV.jointProbability

    def _matchingRows(self,table,assignment):
        """Return the rows of table whose columns equal every (column, value) in assignment."""
        mask=pd.Series(True,index=table.index)
        for column,value in assignment.items():
            mask&=(table[column]==value)
        return table.loc[mask]

    def getNumerator(self,randomV,predictionDictionary):
        """Return the 'probability' entries of randomV.jointProbability that match
        the full assignment in predictionDictionary (a pandas Series).

        randomV.jointProbability is read as-is; the caller is responsible for
        having computed the joint beforehand.
        """
        return self._matchingRows(randomV.jointProbability,predictionDictionary)['probability']

    def getDenominator(self,randomV,predictionDictionary,query):
        """Return the probability of the evidence, i.e. of every entry of
        predictionDictionary except ``query``.

        The value is the joint distribution summed over all rows that agree
        with the evidence (marginalising the queried variable). Unlike
        averaging CPT rows, this stays correct for non-uniform priors and for
        any number of parents. predictionDictionary is not modified.
        """
        evidence={name:value for name,value in predictionDictionary.items() if name!=query}
        jointTable=self._freshJoint(randomV)
        return float(self._matchingRows(jointTable,evidence)['probability'].sum())

    def predictProbability(self,predictionDictionary):
        """Return the posterior of the single unknown variable.

        Args:
            predictionDictionary: variable name -> observed value, with exactly
                one value set to None marking the variable to infer. The
                dictionary is left untouched.

        Returns:
            dict mapping each possible value of the unknown variable (in order
            of first appearance in the CPT) to its posterior probability as a
            float. If the evidence has zero probability every entry is NaN.

        Raises:
            ValueError: if the network has no conditional node, or if the
                number of None entries is not exactly one.
        """
        node=self._getConditionalNode()
        if node is None:
            raise ValueError("the network contains no ConditionalProbabilityTable node")
        unknown=[name for name,value in predictionDictionary.items() if value is None]
        if len(unknown)!=1:
            raise ValueError("exactly one entry must be None (the variable to predict), got %d"%len(unknown))
        query=unknown[0]
        randomV=node.randomVariable
        self._freshJoint(randomV)  # leaves the joint in randomV.jointProbability
        # The evidence probability does not depend on the candidate value, so compute it once.
        denominator=self.getDenominator(randomV,predictionDictionary,query)
        assignment=dict(predictionDictionary)  # work on a copy; never mutate the caller's dict
        choices=dict.fromkeys(randomV.conditions[query],0)
        for choice in choices:
            assignment[query]=choice
            # P(query=choice, evidence) taken from the joint table
            numerator=float(self.getNumerator(randomV,assignment).sum())
            # posterior is undefined for impossible evidence -> NaN
            choices[choice]=numerator/denominator if denominator else float('nan')
        return choices

# Gradio demo: Monty Hall inference

In [None]:
import json
def sentence_builder(Guest,Prize,Monty):
    """Gradio callback: infer the one unknown door of the Monty Hall problem.

    Args:
        Guest: door picked by the guest ('A', 'B' or 'C').
        Prize: door hiding the prize, or '?' to infer it.
        Monty: door opened by Monty, or '?' to infer it.

    Exactly one argument must be unknown ('?' or None); that variable is
    predicted from the other two.

    Returns:
        dict mapping each door to its posterior probability as a float.

    Raises:
        ValueError: if the number of unknown variables is not exactly one.
    """
    choice={'Guest':Guest,'Prize':Prize,'Monty':Monty}
    # Translate every '?' independently; an if/elif chain would leave the
    # second '?' in place as if it were a door label.
    for variable in ('Prize','Monty'):
        if choice[variable]=='?':
            choice[variable]=None
    unknown=[variable for variable,value in choice.items() if value is None]
    if len(unknown)!=1:
        raise ValueError(
            "Exactly one of Guest, Prize and Monty must be unknown ('?'); got %d unknown."%len(unknown))

    # A fresh network is built on every call so no state is shared between requests.
    model=BayesianNetwork()
    guest=Node(DiscreteDistribution({'A': 1./3, 'B': 1./3, 'C': 1./3},name="Guest"))
    prize=Node(DiscreteDistribution({'A': 1./3, 'B': 1./3, 'C': 1./3},name="Prize"))
    # Rows are [Guest, Prize, Monty, P(Monty | Guest, Prize)]: Monty never opens
    # the guest's door or the prize door and picks uniformly among the rest.
    monty=Node(ConditionalProbabilityTable(
        [['A', 'A', 'A', 0.0],
          ['A', 'A', 'B', 0.5],
          ['A', 'A', 'C', 0.5],
          ['A', 'B', 'A', 0.0],
          ['A', 'B', 'B', 0.0],
          ['A', 'B', 'C', 1.0],
          ['A', 'C', 'A', 0.0],
          ['A', 'C', 'B', 1.0],
          ['A', 'C', 'C', 0.0],
          ['B', 'A', 'A', 0.0],
          ['B', 'A', 'B', 0.0],
          ['B', 'A', 'C', 1.0],
          ['B', 'B', 'A', 0.5],
          ['B', 'B', 'B', 0.0],
          ['B', 'B', 'C', 0.5],
          ['B', 'C', 'A', 1.0],
          ['B', 'C', 'B', 0.0],
          ['B', 'C', 'C', 0.0],
          ['C', 'A', 'A', 0.0],
          ['C', 'A', 'B', 1.0],
          ['C', 'A', 'C', 0.0],
          ['C', 'B', 'A', 1.0],
          ['C', 'B', 'B', 0.0],
          ['C', 'B', 'C', 0.0],
          ['C', 'C', 'A', 0.5],
          ['C', 'C', 'B', 0.5],
          ['C', 'C', 'C', 0.0]], [guest,prize],name="Monty"))

    model.addStates(guest,prize,monty)
    model.addEdge(guest,monty)
    model.addEdge(prize,monty)
    output=model.predictProbability(choice)
    # plain Python floats so the result serialises cleanly for the label output
    return {door:float(probability) for door,probability in output.items()}

# Newer Gradio releases expose components at the top level (gr.Radio, gr.Label)
# and no longer ship the gr.inputs / gr.outputs namespaces; older releases only
# have the namespaced classes. Resolve whichever is available so the cell runs
# with the unpinned `pip install gradio` above.
_Radio = getattr(gr, "Radio", None) or gr.inputs.Radio
_Label = getattr(gr, "Label", None) or gr.outputs.Label

iface = gr.Interface(
    sentence_builder,
    [
        _Radio(["A","B","C"]),      # Guest: always observed
        _Radio(["A","B","C","?"]),  # Prize: '?' marks it as the variable to infer
        _Radio(["A","B","C","?"]),  # Monty: '?' marks it as the variable to infer
    ],
    _Label(num_top_classes=3),
)
iface.launch()

Colab notebook detected. To show errors in colab notebook, set `debug=True` in `launch()`
Running on public URL: https://40654.gradio.app

This share link expires in 72 hours. For free permanent hosting, check out Spaces (https://huggingface.co/spaces)


(<fastapi.applications.FastAPI at 0x7fa446014710>,
 'http://127.0.0.1:7861/',
 'https://40654.gradio.app')

In [None]:
# Stand-alone check of the network without the Gradio UI:
# guest picked door A, Monty opened door B, where is the prize?
guest=Node(DiscreteDistribution({'A': 1./3, 'B': 1./3, 'C': 1./3},name="Guest"))
prize=Node(DiscreteDistribution({'A': 1./3, 'B': 1./3, 'C': 1./3},name="Prize"))

# Each row is [Guest, Prize, Monty, P(Monty | Guest, Prize)].
monty=Node(ConditionalProbabilityTable(
    [
        ['A', 'A', 'A', 0.0], ['A', 'A', 'B', 0.5], ['A', 'A', 'C', 0.5],
        ['A', 'B', 'A', 0.0], ['A', 'B', 'B', 0.0], ['A', 'B', 'C', 1.0],
        ['A', 'C', 'A', 0.0], ['A', 'C', 'B', 1.0], ['A', 'C', 'C', 0.0],
        ['B', 'A', 'A', 0.0], ['B', 'A', 'B', 0.0], ['B', 'A', 'C', 1.0],
        ['B', 'B', 'A', 0.5], ['B', 'B', 'B', 0.0], ['B', 'B', 'C', 0.5],
        ['B', 'C', 'A', 1.0], ['B', 'C', 'B', 0.0], ['B', 'C', 'C', 0.0],
        ['C', 'A', 'A', 0.0], ['C', 'A', 'B', 1.0], ['C', 'A', 'C', 0.0],
        ['C', 'B', 'A', 1.0], ['C', 'B', 'B', 0.0], ['C', 'B', 'C', 0.0],
        ['C', 'C', 'A', 0.5], ['C', 'C', 'B', 0.5], ['C', 'C', 'C', 0.0],
    ],
    [guest,prize],
    name="Monty",
))

model=BayesianNetwork()
model.addStates(guest,prize,monty)
for parent in (guest,prize):
    model.addEdge(parent,monty)

# None marks the variable whose posterior is requested.
choice={'Guest':'A','Prize':None,'Monty':'B'}
print(model.predictProbability(choice))

{'A': 0.3333333333333333, 'B': 0.0, 'C': 0.6666666666666666}
