<h2>ASSIGNMENT 4: bayesian network from scratch<h2>

<h4><b>Author</b>: Aliprandi Francesco<h4>


<h4><b>1.1 Class Node</b></h4>

This class implement the logic of a node inside a bayesian network.

<b>Constructur</b>

the constructur of the class take as input:
- the name of the node as a string
- the list of the values that the node could assume as list of string
- the list of parents of the node as a list of Node objects
- the conditional probability table of the node (cpt)
the constructure sort all the keys of the cpt locally to have a unique order for those keys as they are tuples

<b>Conditional probability table</b> 

the cpt is organized as follows:
- for nodes without parents it's a dictionary in which the key are the values that the node could assume and the values are the probability of the node to assume that value
- for a binary node with parents dictionary keys are tuples in which the entry are strings "name of the parent"+"_"+"value of the parent" and the values are the probability of the node to be true 
- for a multinomial node with parents dictionary keys are tuples constructed as before but within the tuple is added the value "current node name" + "_" + "value taking node" and the corresponding value is the probability of the node to assume the value given the parents assumes the values specified into the tuple

Last two cases (binary and multinomial node with parents) could both be represented in the same way by employing the notation used for the multinomial but the binary version could halve the dimension of the cpt.

<b> Class methods</b>

- <b> Sample value:</b> this method sample a value for the node. It takes as input a list of strings that represents parents and respective values assigned in the sampling so far. This function distinguish three cases of a node without parents, a binomial node with parents and a multinomial node with parents. It returns the sampled value for the current node.
- <b> Get sample:</b> is a support function that prepare the list of parent with respective value if needed by extracting the value on the sampling done so far

In [68]:
import numpy as np
import random

class Node:   
    def __init__(self, name, values, parents, table):
        self.name: str = name
        self.values: list = values
        self.parents: list = parents
        #to ensure parents sorted in CPTs
        if len(self.parents) != 0:
            self.table: dict = {tuple(sorted(k)): v for k, v in table.items()}
        else:
            self.table: dict = table
    
    def get_name(self):
        return self.name
    
    def get_values(self):
        return self.values
    
    def get_parents(self):
        return self.parents
    
    def get_table(self):
        return self.table
    
    def sample_value(self, parent_values):
        #if node has no parents
        if(len(parent_values) == 0):
            values = list(self.table.keys())
            prob = list(self.table.values())
            return np.random.choice(values, p=prob)
        #if node has parents and it's binomial
        elif len(self.values) == 2:
            values = ["True", "False"]
            key = tuple(sorted(parent_values))
            prob = self.table[key]
            prob = [prob, 1-prob]
            return np.random.choice(values, p=prob)
        #if node has parents and it's multinomial
        else:
            keys = []
            values = []
            for value in self.values:
                keys.append(tuple(sorted(parent_values + [self.name + "_" + value])))
            prob = []
            for key in keys:
                prob.append(self.table[key])
            return np.random.choice(self.values, p=prob)
                
    def get_sample(self, sampling):
        #if node has no parents
        if(len(self.parents) == 0):
            return self.sample_value([])
        #if node has parents
        parents_value = []
        for node in sampling.keys():
            for parent in self.parents:
                if node == parent.get_name():
                    parents_value.append(node + "_" + sampling[node])
        return self.sample_value(sorted(parents_value))
               
        

<h4><b>1.2 Class Network</b></h4>

This class implement the logic of the network. The network is implemented as a list of Nodes

<b>Constructur</b>

The constructur of the class take as input the list of nodes to be stored and reorders the nodes following a valid topological ordering 

<b> Class methods</b>

- <b> Topological sort:</b> performs topological sorting of nodes by executing a DFS visit of the graph
- <b> Ancestral sampling:</b> perform an ancestral sampling by scanning the nodes in order and sampling one value at a time. 

In [69]:
class Network:
    def __init__(self, nodes):
        self.nodes: list = nodes
        #random.shuffle(self.nodes)
        self.nodes = self.topological_sort()
            
    def get_nodes(self):    
        return self.nodes
    
    def topological_sort(self):
        visited = set()
        stack = []

        def dfs(node):
            visited.add(node)
            for parent in node.parents:
                if parent not in visited:
                    dfs(parent)
            stack.append(node)
    
        for node in self.nodes:
            if node not in visited:
                dfs(node)
        return stack
        
    
    def ancestral_sampling(self):
        sampling = {}
        for node in self.nodes:
            value = node.get_sample(sampling)
            sampling[node.get_name()] = value          
        return sampling

<h4><b>1.3 Test set up</b></h4>

The bayesian networks considered for the test is created following the scheme shown in the figure below. 

Two networks were created to perform the tests: 
- one that models the power-on of an old car 
- one that models the power-on of a new car. 

Both this networks have the same structure, but the probabilities that the components are working change.

Finally, to assess sampling in the multinomial node case with parents (that in the network is not present), a small 3x3 network was created that models only the car battery.

<img src="./image/bnet.png" alt="car ignition network" width=460 height=230>



<h4><b>1.3.1 Support functions and data structure for testing</b></h4>

In [70]:
#support function for testing a car network
def test_car(net,n):
    c = 0
    for i in range(n):
        sampling = net.ancestral_sampling()
        if(sampling["Engine"] == "True"):
            c+=1
    return " starts " + str(round(c/n*100,1)) + " % of the time"

In [71]:
#creating old car network
alternator = Node("Alternator", ["New", "Worn","broken"], [], {"New": 0.3, "Worn": 0.55, "Broken": 0.15})
fanbelt = Node("Fanbelt", ["True", "False"], [], {"True": 0.69, "False": 0.31})
starter = Node("Starter", ["True", "False"], [], {"True": 0.77, "False": 0.23})
sparks = Node("Sparks", ["True", "False"], [], {"True": 0.78, "False": 0.22})
fuel_pump = Node("Fuel_pump", ["True", "False"], [], {"True": 0.77, "False": 0.23})
fuel_level = Node("Fuel_level", ["Low", "Normal", "High"], [], {"Low": 0.1, "Normal": 0.8, "High": 0.1})

battery = Node("Battery", ["True", "False"], [alternator, fanbelt], {("Alternator_New","Fanbelt_True"): 0.85, ("Alternator_New","Fanbelt_False"): 0.27, ("Alternator_Worn","Fanbelt_True"): 0.73, ("Alternator_Worn","Fanbelt_False"): 0.24, ("Alternator_Broken","Fanbelt_True"): 0.52, ("Alternator_Broken","Fanbelt_False"): 0.11})
dashboard = Node("Dashboard", ["True","False"], [battery, starter], {("Battery_True","Starter_True"): 0.92, ("Battery_True","Starter_False"): 0.15, ("Battery_False","Starter_True"): 0.2, ("Battery_False","Starter_False"): 0.08})
fuel_subsystem = Node("Fuel_subsystem", ["True", "False"], [fuel_pump, fuel_level], {("Fuel_level_High", "Fuel_pump_True"): 0.88, ("Fuel_level_High", "Fuel_pump_False"): 0.12, ("Fuel_level_Normal", "Fuel_pump_True"): 0.9, ("Fuel_level_Normal", "Fuel_pump_False"): 0.18, ("Fuel_level_Low", "Fuel_pump_True"): 0.79, ("Fuel_level_Low", "Fuel_pump_False"): 0.11})
engine = Node("Engine", ["True", "False"], [dashboard, sparks, fuel_subsystem], {("Dashboard_True", "Fuel_subsystem_True", "Sparks_True"): 0.92, ("Dashboard_True", "Fuel_subsystem_True", "Sparks_False"): 0.04, ("Dashboard_True", "Fuel_subsystem_False", "Sparks_True"): 0.38, ("Dashboard_True", "Fuel_subsystem_False", "Sparks_False"): 0.02, ("Dashboard_False", "Fuel_subsystem_True", "Sparks_True"): 0.44, ("Dashboard_False", "Fuel_subsystem_True", "Sparks_False"): 0.03, ("Dashboard_False", "Fuel_subsystem_False", "Sparks_True"): 0.07, ("Dashboard_False", "Fuel_subsystem_False", "Sparks_False"): 0.01})

old_car = Network([alternator, fanbelt, starter, sparks, fuel_pump, fuel_level, battery, dashboard, fuel_subsystem, engine])

old_car_result = test_car(old_car, 10000)

In [72]:
#creating new car network
alternator = Node("Alternator", ["New", "Worn","broken"], [], {"New": 0.9, "Worn": 0.08, "Broken": 0.02})
fanbelt = Node("Fanbelt", ["True", "False"], [], {"True": 0.96, "False": 0.04})
starter = Node("Starter", ["True", "False"], [], {"True": 0.95, "False": 0.05})
sparks = Node("Sparks", ["True", "False"], [], {"True": 0.93, "False": 0.07})
fuel_pump = Node("Fuel_pump", ["True", "False"], [], {"True": 0.93, "False": 0.07})
fuel_level = Node("Fuel_level", ["Low", "Normal", "High"], [], {"Low": 0.15, "Normal": 0.8, "High": 0.05})

battery = Node("Battery", ["True", "False"], [alternator, fanbelt], {("Alternator_New","Fanbelt_True"): 0.95, ("Alternator_New","Fanbelt_False"): 0.44, ("Alternator_Worn","Fanbelt_True"): 0.88, ("Alternator_Worn","Fanbelt_False"): 0.37, ("Alternator_Broken","Fanbelt_True"): 0.55, ("Alternator_Broken","Fanbelt_False"): 0.18})
dashboard = Node("Dashboard", ["True","False"], [battery, starter], {("Battery_True","Starter_True"): 0.98, ("Battery_True","Starter_False"): 0.33, ("Battery_False","Starter_True"): 0.22, ("Battery_False","Starter_False"): 0.09})
fuel_subsystem = Node("Fuel_subsystem", ["True", "False"], [fuel_pump, fuel_level], {("Fuel_level_High", "Fuel_pump_True"): 0.96, ("Fuel_level_High", "Fuel_pump_False"): 0.12, ("Fuel_level_Normal", "Fuel_pump_True"): 0.95, ("Fuel_level_Normal", "Fuel_pump_False"): 0.09, ("Fuel_level_Low", "Fuel_pump_True"): 0.88, ("Fuel_level_Low", "Fuel_pump_False"): 0.11})
engine = Node("Engine", ["True", "False"], [dashboard, sparks, fuel_subsystem], {("Dashboard_True", "Fuel_subsystem_True", "Sparks_True"): 0.99, ("Dashboard_True", "Fuel_subsystem_True", "Sparks_False"): 0.04, ("Dashboard_True", "Fuel_subsystem_False", "Sparks_True"): 0.37, ("Dashboard_True", "Fuel_subsystem_False", "Sparks_False"): 0.02, ("Dashboard_False", "Fuel_subsystem_True", "Sparks_True"): 0.67, ("Dashboard_False", "Fuel_subsystem_True", "Sparks_False"): 0.03, ("Dashboard_False", "Fuel_subsystem_False", "Sparks_True"): 0.07, ("Dashboard_False", "Fuel_subsystem_False", "Sparks_False"): 0.01})

new_car = Network([alternator, fanbelt, starter, sparks, fuel_pump, fuel_level, battery, dashboard, fuel_subsystem, engine])
new_car_result = test_car(new_car, 10000)

<h4><b>1.4 Test results</b></h4>


Example of ancestor sampling applied to the network of the old car.
As is it possible to observe <b>Alternator</b> is warn and Fanbelt is not working so is more likely that <b>Battery</b> is not charged (and so it is). Given that the <b>Starter</b> is working and the <b>Battery</b> is not working it's very unlikely that the <b>Dashboard</b> will turn on, and in fact the value sampled for <b>Dashboard</b> is false. 
<b>Fuel subsystem</b> works as the <b>Fuel pump</b> works and <b>Fuel level</b> is normal. 

Since the <b>Spark</b> plugs works, the <b>Fuel subsystem</b> works and the <b>Dashboard</b> is turned off the probability of the car to start is 44% and in this case the car's <b>Engine</b> turns on


In [73]:
#example of sampling 
old_car.ancestral_sampling()

"""
This sampling value is saved as if I accidentally run the cell above, I dont't have to comment on it again
{'Alternator': 'Worn',
 'Fanbelt': 'False',
 'Starter': 'True',
 'Sparks': 'True',
 'Fuel_pump': 'True',
 'Fuel_level': 'Normal',
 'Battery': 'False',
 'Dashboard': 'False',
 'Fuel_subsystem': 'True',
 'Engine': 'True'}
 """

{'Alternator': 'Worn',
 'Fanbelt': 'False',
 'Starter': 'True',
 'Sparks': 'True',
 'Fuel_pump': 'True',
 'Fuel_level': 'Normal',
 'Battery': 'False',
 'Dashboard': 'False',
 'Fuel_subsystem': 'True',
 'Engine': 'True'}

These two tests asses the probability of the machine's engine turning on, in the case of both an old and a new machine, over 10'000 attempts to turn it on.
The two machines have the same network topology, what changes are the various probabilities of individual components working.
In the case of the old machine, the engine turns on in 44.3% of cases, while in the case of the new machine it turns on in 82.7% of cases. 
This significant difference in the ignition rate between the old and the new machine suggests that there are substantial differences in the performance of the engines of the two machines. This is surely due to wear and tear and the state of maintenance of the components of the older vehicle. These circumstances are modeled through the probability of functioning of the various components. As suggested by the results the more defective the parts the greater the gap in the ignition probability of the two vehicles. 

In [74]:
#esults of sampling for old and new car networks 
print("Old car" + old_car_result)
print("New car" + new_car_result)

Old car starts 44.3 % of the time
New car starts 82.7 % of the time


In [75]:
#Testing multinomial node with parents:
alternator = Node("Alternator", ["True", "False"], [], {"True": 0.93, "False": 0.07})
fanbelt = Node("Fanbelt", ["True", "False"], [], {"True": 0.96, "False": 0.04})

battery = Node("Battery", ["Full", "Empty", "Half"], [alternator, fanbelt], {("Alternator_True","Fanbelt_True","Battery_Full"): 0.35, ("Alternator_True","Fanbelt_True","Battery_Half"): 0.6, ("Alternator_True","Fanbelt_True","Battery_Empty"): 0.05,
                                                                              ("Alternator_True","Fanbelt_False","Battery_Full"): 0.2, ("Alternator_True","Fanbelt_False","Battery_Half"): 0.70, ("Alternator_True","Fanbelt_False","Battery_Empty"): 0.1,
                                                                              ("Alternator_False","Fanbelt_True","Battery_Full"): 0.15, ("Alternator_False","Fanbelt_True","Battery_Half"): 0.62, ("Alternator_False","Fanbelt_True","Battery_Empty"): 0.23,
                                                                              ("Alternator_False","Fanbelt_False","Battery_Full"): 0.05, ("Alternator_False","Fanbelt_False","Battery_Half"): 0.32, ("Alternator_False","Fanbelt_False","Battery_Empty"): 0.63})

battery_net = Network([alternator, fanbelt, battery])

Since in the Bayesian network I have created the case of nodes with multinomial distribution with parents does not occur, I have given here a simple example of a network consisting of three nodes representing car battery charge but with multinomial distribution.
In this case the probability that the alternator and fanbelt are both working is very high and as reported by running on 1000 random samples the battery is 93.4% of the cases charged and only 6.6% of the cases discharged


In [81]:
f = 0
e = 0
h = 0
for i in range(1000):
    sampling = battery_net.ancestral_sampling()
    if(sampling["Battery"] == "Full"):
        f+=1
    if(sampling["Battery"] == "Empty"):
        e+=1
    if(sampling["Battery"] == "Half"):
        h+=1
print("battery is full", round(f/1000*100,1), "% of the time")
print("battery is half full",round(h/1000*100,1), "% of the time")
print("battery is empty", round(e/1000*100,1), "% of the time")



battery is full 32.6 % of the time
battery is half full 60.8 % of the time
battery is empty 6.6 % of the time
