In [None]:
import arff
import math
import graphviz
import copy
import pprint
import numpy as np

from scipy.sparse import csr_matrix
from scipy.sparse.csgraph import minimum_spanning_tree

# Shared pretty-printer for the notebook; structures nested deeper than
# 6 levels are elided in the printed output.
pp = pprint.PrettyPrinter(depth = 6)

### Algorytm TAN - Tree Augmented Naive Bayes  
<img src="img/TAN-algo.png" style="height: 200px">
<img src="img/TAN-prob.png" style="height: 100px">

In [None]:
class TANAlgorithm(object):
    """Learn the structure of a Tree Augmented Naive Bayes (TAN) network.

    The first entry of ``attributes`` is treated as the class attribute; all
    remaining entries are ordinary attributes.  Each attribute is a dict with
    the keys ``'name'`` and ``'states'``.  ``test_data`` is a sequence of
    samples, each a dict mapping attribute name -> observed state.

    The learned network is a list of node dicts of the form
    ``{'name': ..., 'r': <states>, 'parents': [{'name': ..., 'q': <states>}]}``.
    """

    def __init__(self, attributes, test_data):
        self.test_data = test_data
        self.attributes = attributes

    def find_net(self):
        """Build the TAN network.

        Steps: start from the naive Bayes network (class -> every attribute),
        weight every attribute pair by its conditional mutual information
        given the class, take the *maximum* weighted spanning tree over the
        attributes, direct it away from the first attribute and add the
        resulting edges as extra parents.

        Returns:
            The list of node dicts describing the network.
        """
        computed_net = self.construct_basic_naive_net()
        weights, _connections = self.compute_all_weights()

        size = weights.shape[0]
        if size < 2:
            # Zero or one attribute: there is no attribute pair to connect.
            return computed_net

        # scipy only offers a *minimum* spanning tree and treats zeros as
        # missing edges.  Mapping w -> (max_w - w + 1) makes every edge cost
        # strictly positive and reverses the order, so the minimum spanning
        # tree of the costs is the maximum spanning tree of the weights
        # (every spanning tree has the same number of edges).
        upper = np.triu_indices(size, k=1)
        costs = np.zeros((size, size))
        costs[upper] = weights[upper].max() - weights[upper] + 1.0

        undirected_tree = minimum_spanning_tree(csr_matrix(costs)).toarray()
        directed_tree = self._direct_tree_from_root(undirected_tree)

        return self.update_computed_net_with_spanning_tree(directed_tree, computed_net)

    @staticmethod
    def _direct_tree_from_root(tree, root=0):
        """Turn an undirected tree into a parent -> child adjacency matrix.

        Edges are directed away from ``root`` so that every node gets at most
        one parent inside the tree.  Any node not reachable from ``root``
        starts a traversal of its own.
        """
        size = tree.shape[0]
        has_edge = tree != 0
        has_edge = has_edge | has_edge.T  # treat the input as undirected

        directed = np.zeros((size, size), dtype=int)
        visited = [False] * size
        for start in [root] + [k for k in range(size) if k != root]:
            if visited[start]:
                continue
            visited[start] = True
            pending = [start]
            while pending:
                parent = pending.pop()
                for child in np.flatnonzero(has_edge[parent]):
                    child = int(child)
                    if not visited[child]:
                        visited[child] = True
                        directed[parent, child] = 1
                        pending.append(child)
        return directed

    def update_computed_net_with_spanning_tree(self, spanning_tree, computed_net):
        """Add one parent link per non-zero cell of ``spanning_tree``.

        A non-zero cell ``[i][j]`` means "attribute ``i + 1`` is a parent of
        attribute ``j + 1``" (index 0 of ``self.attributes`` is the class, so
        matrix indices are shifted by one).
        """
        for i, j in np.argwhere(np.asarray(spanning_tree) != 0):
            start = self.attributes[i + 1]['name']
            end = self.attributes[j + 1]['name']
            computed_net = self.update_net_with_new_connection(computed_net, start, end)
        return computed_net

    def update_net_with_new_connection(self, net, new_parent_name, node_name):
        """Append ``new_parent_name`` to the parents of every node named ``node_name``.

        The network is modified in place and also returned.

        Raises:
            ValueError: if ``new_parent_name`` is not a known attribute.
        """
        for node in net:
            if node['name'] == node_name:
                node['parents'].append({'name': new_parent_name, 'q': self.find_node_states(new_parent_name)})
        return net

    def construct_basic_naive_net(self):
        """Return the naive Bayes network: the class is the only parent of every attribute."""
        computed_net = []
        # The algorithm assumes the first element of 'attributes' is the class.
        class_attribute = self.attributes[0]

        for i, attribute in enumerate(self.attributes):
            if i == 0:
                # The class node itself has no parents.
                parents = []
            else:
                parents = [{'name': class_attribute['name'], 'q': class_attribute['states']}]
            computed_net.append({'name': attribute['name'], 'r': attribute['states'], 'parents': parents})

        return computed_net

    def compute_all_weights(self):
        """Compute the pairwise weights between all non-class attributes.

        Returns:
            A tuple ``(weights, connections)``.  ``weights`` is a square
            upper-triangular numpy array indexed by attribute position minus
            one; ``connections`` lists the same values as dicts with the keys
            ``'start'``, ``'end'`` and ``'weight'``.  Every weight is the
            result of ``compute_weight`` multiplied by 100.
        """
        class_attribute = self.attributes[0]
        connections = []

        size = len(self.attributes) - 1
        weights = np.zeros((size, size))

        for attribute_index in range(1, len(self.attributes)):
            first_attribute = self.attributes[attribute_index]
            for second_attribute_index in range(attribute_index + 1, len(self.attributes)):
                second_attribute = self.attributes[second_attribute_index]

                weight = self.compute_weight(first_attribute, second_attribute, class_attribute) * 100
                weights[attribute_index - 1, second_attribute_index - 1] = weight

                connections.append({'start': first_attribute['name'], 'end': second_attribute['name'], 'weight': weight})

        return (weights, connections)

    def find_node_states(self, node_name):
        """Return the list of states of the attribute called ``node_name``.

        Raises:
            ValueError: if no attribute has that name.
        """
        for attribute in self.attributes:
            if attribute['name'] == node_name:
                return attribute['states']
        raise ValueError("No such node: " + node_name)

    def compute_weight(self, first_argument, second_argument, class_argument):
        """Return the conditional mutual information I(X1; X2 | C) in base 10.

        Computed as the sum over all state combinations of
        ``P(x1, x2, c) * log10(P(x1, x2 | c) / (P(x1 | c) * P(x2 | c)))``,
        with all probabilities estimated as relative frequencies in
        ``self.test_data``.  Combinations that never occur have
        ``P(x1, x2, c) == 0`` and contribute nothing to the sum.
        """
        result = 0

        class_name = class_argument['name']
        first_name = first_argument['name']
        second_name = second_argument['name']
        total = len(self.test_data)

        for first_state in first_argument['states']:
            for second_state in second_argument['states']:
                for class_state in class_argument['states']:
                    first = (first_name, first_state)
                    second = (second_name, second_state)
                    condition = (class_name, class_state)

                    joint_count = self.count_triple_number_of_occurences_in_test_data(first, second, condition)
                    if joint_count == 0:
                        # 0 * log(0) is taken as 0; this also covers empty data.
                        continue

                    # The joint probability weights the term; it must not be
                    # replaced by P(c) * P(x1|c) * P(x2|c), which would assume
                    # the very independence that is being measured.
                    p_x1_x2_c = joint_count / total
                    p_x1_given_c = self.compute_conditional_probability(first_name, first_state, class_name, class_state)
                    p_x2_given_c = self.compute_conditional_probability(second_name, second_state, class_name, class_state)
                    p_x1_x2_given_c = self.compute_double_conditional_probability(first, second, condition)

                    result += p_x1_x2_c * math.log10(float(p_x1_x2_given_c) / (p_x1_given_c * p_x2_given_c))

        return result

    def compute_probability(self, argument, argument_state):
        """Return the relative frequency of ``argument == argument_state``, e.g. P(play = yes)."""
        number_of_occurences = self.count_number_of_occurences_in_test_data(argument, argument_state)
        return number_of_occurences / len(self.test_data)

    def compute_conditional_probability(self, argument, argument_state, condition, condition_state):
        """Return P(argument = state | condition = state), e.g. P(outlook = sunny | play = yes).

        When either count is zero, ``1 / <number of class states>`` is
        returned instead of zero.
        NOTE(review): the fallback uses the class attribute's state count, not
        the argument's - confirm this is the intended smoothing.
        """
        number_of_join_occurences = self.count_double_number_of_occurences_in_test_data((argument, argument_state), (condition, condition_state))
        number_of_condition_occurences = self.count_number_of_occurences_in_test_data(condition, condition_state)

        if number_of_join_occurences == 0 or number_of_condition_occurences == 0:
            return 1 / len(self.attributes[0]['states'])

        return number_of_join_occurences / number_of_condition_occurences

    def compute_double_conditional_probability(self, first_argument, second_argument, condition):
        """Return P(first, second | condition), e.g. P(outlook = sunny, windy = TRUE | play = yes).

        Every argument is a ``(name, state)`` pair.  When either count is
        zero, ``1 / <number of class states>`` is returned instead of zero.
        """
        number_of_join_occurences = self.count_triple_number_of_occurences_in_test_data(first_argument, second_argument, condition)
        number_of_condition_occurences = self.count_number_of_occurences_in_test_data(condition[0], condition[1])

        if number_of_join_occurences == 0 or number_of_condition_occurences == 0:
            return 1 / len(self.attributes[0]['states'])

        return number_of_join_occurences / number_of_condition_occurences

    def count_number_of_occurences_in_test_data(self, name, state):
        """Count the samples in which attribute ``name`` equals ``state``."""
        return sum(1 for sample in self.test_data if sample[name] == state)

    def count_double_number_of_occurences_in_test_data(self, first, second):
        """Count the samples matching both ``(name, state)`` pairs."""
        return sum(
            1 for sample in self.test_data
            if sample[first[0]] == first[1] and sample[second[0]] == second[1]
        )

    def count_triple_number_of_occurences_in_test_data(self, first, second, third):
        """Count the samples matching all three ``(name, state)`` pairs."""
        return sum(
            1 for sample in self.test_data
            if sample[first[0]] == first[1]
            and sample[second[0]] == second[1]
            and sample[third[0]] == third[1]
        )

------

### Ładowanie pliku ARFF

In [None]:
# Read the ARFF file and reshape it into the structures used below:
#   attributes  - list of {'name': ..., 'states': ...}, in file order
#   sample_data - list of {attribute name: value}, one dict per data row
with open('data/weather.arff') as fh:
    data = arff.load(fh)

    attributes = [{'name': spec[0], 'states': spec[1]} for spec in data['attributes']]

    sample_data = [
        {attributes[column]['name']: value for column, value in enumerate(row)}
        for row in data['data']
    ]

------

### Wybór argumentu - klasy

In [None]:
    # Position of the class attribute within the loaded attribute list.
    index_of_class_attribute = 4
    print("Class argument: ")
    pp.pprint(attributes[index_of_class_attribute])

    # Put the class attribute first and keep the remaining attributes in
    # their original order.
    final_attributes = [attributes[index_of_class_attribute]] + [
        attribute
        for i, attribute in enumerate(attributes)
        if i != index_of_class_attribute
    ]

-----
### Główne wywołanie algorytmu i rysowanie grafu

In [None]:
# Learn the network structure from the loaded samples.
bayesian_network = TANAlgorithm(final_attributes, sample_data).find_net()

# Draw the network: one edge per parent link; a node without parents is
# added explicitly so that it still appears in the graph.
graph = graphviz.Digraph('generated graph')
for node in bayesian_network:
    node_parents = node['parents']
    if node_parents:
        for parent in node_parents:
            graph.edge(parent['name'], node['name'])
        continue
    graph.node(node['name'], label=node['name'])
graph

-----
### Klasyfikacja przy dowolnych podanych warunkach
<img src="img/classify.png">

In [None]:
class From(object):
    """Fluent query object that classifies a sample with a Bayesian network.

    ``bayesian_network`` is a list of node dicts with the keys ``'name'``,
    ``'r'`` (the node's states) and ``'parents'`` (a list of dicts with a
    ``'name'`` key).  ``test_data`` is a sequence of samples, each a dict
    mapping attribute name -> observed state; all probabilities are estimated
    as relative frequencies in it.

    Usage: ``From(net, data).given('outlook', 'sunny').classify('play')``.
    """

    def __init__(self, bayesian_network, test_data):
        self.bayesian_network = bayesian_network
        self.conditionals = []
        self.test_data = test_data

    def given(self, name, value):
        """Record the evidence ``name == value`` and return ``self`` for chaining."""
        self.conditionals.append({"name": name, "value": value})
        return self

    def classify(self, class_attribute_name):
        """Score every state of the class attribute under the recorded evidence.

        For each class state the product of P(node | parents) over all network
        nodes is computed, then the products are normalised to percentages.

        Returns:
            A list of ``{"state": ..., "value": <percentage>}`` dicts, one per
            class state, in the order of the class node's states.

        Raises:
            ValueError: if the class attribute is not a node of the network,
                or if some node of the network has no recorded evidence.
        """
        final_result = []
        for class_state in self.find_class_attribute_states(class_attribute_name):
            # Evidence for this hypothesis: everything given plus the class
            # state under test.  A fresh list keeps self.conditionals intact.
            evidence = self.conditionals + [{"name": class_attribute_name, "value": class_state}]

            result = 1
            for node in self.bayesian_network:
                # The flag marks the node itself (True) versus its parents (False).
                tuples = [(node["name"], self.get_conditional_value(node['name'], evidence), True)]
                tuples.extend(
                    (parent['name'], self.get_conditional_value(parent['name'], evidence), False)
                    for parent in node['parents']
                )
                result *= self.compute_probability(tuples)

            final_result.append({"state": class_state, "value": result})

        return self.normalize_probabilities_sum(final_result)

    def normalize_probabilities_sum(self, result_table):
        """Rescale the ``'value'`` entries in place so that they sum to 100.

        If all values are zero there is nothing to rescale and the table is
        returned unchanged.
        """
        probabilities_sum = self.compute_probabilities_sum(result_table)
        if probabilities_sum == 0:
            return result_table
        for result in result_table:
            result['value'] = (result['value'] / probabilities_sum) * 100.0
        return result_table

    def compute_probabilities_sum(self, result_table):
        """Return the sum of the ``'value'`` entries of ``result_table``."""
        return sum(result['value'] for result in result_table)

    def get_conditional_value(self, name, conditionals):
        """Return the value of the first conditional called ``name``.

        Raises:
            ValueError: if no such conditional exists.
        """
        for conditional in conditionals:
            if conditional["name"] == name:
                return conditional["value"]
        raise ValueError("There's no " + name + " conditional in conditionals")

    def find_class_attribute_states(self, class_attribute_name):
        """Return the states (``'r'``) of the network node called ``class_attribute_name``.

        Raises:
            ValueError: if the network has no such node.
        """
        for node in self.bayesian_network:
            if node['name'] == class_attribute_name:
                return node['r']

        raise ValueError(class_attribute_name + " -> No class attribute state error")

    def compute_probability(self, tuples):
        """Estimate P(node | parents) from the data.

        Args:
            tuples: ``(name, value, is_node)`` triples; the entry flagged
                ``True`` is the node itself, the others are its parents.
                The list is not modified.

        Returns:
            For a node without parents, the relative frequency of its value
            (0.0 when there is no data).  Otherwise the count of samples
            matching node and parents divided by the count matching the
            parents; when either count is zero, ``1 / <total number of states
            of the parents>`` is returned instead.
            NOTE(review): the fallback divides by the parents' state counts,
            not the node's - confirm this is the intended smoothing.
        """
        all_conditions_counter = self.count_occurences_of_fullfilled_conditions(tuples)
        parent_tuples = [single_tuple for single_tuple in tuples if not single_tuple[2]]

        if not parent_tuples:
            # Root node: plain relative frequency.  Handling it here also
            # keeps the zero-count fallback below from dividing by zero.
            if not self.test_data:
                return 0.0
            return all_conditions_counter / len(self.test_data)

        parent_conditions_counter = self.count_occurences_of_fullfilled_conditions(parent_tuples)

        if all_conditions_counter == 0 or parent_conditions_counter == 0:
            return 1 / self.count_number_of_states(parent_tuples)

        return all_conditions_counter / parent_conditions_counter

    def count_occurences_of_fullfilled_conditions(self, tuples):
        """Count the samples that satisfy every condition in ``tuples``."""
        return sum(
            1 for test_data_line in self.test_data
            if self.does_test_line_fullfill_conditions(tuples, test_data_line)
        )

    def does_test_line_fullfill_conditions(self, tuples, test_data_line):
        """Return True when ``test_data_line`` matches every ``(name, value, ...)`` condition."""
        return all(
            test_data_line[condition_tuple[0]] == condition_tuple[1]
            for condition_tuple in tuples
        )

    def count_number_of_states(self, tuples):
        """Return the total number of states of all network nodes named in ``tuples``."""
        return sum(
            len(node['r'])
            for single_tuple in tuples
            for node in self.bayesian_network
            if node['name'] == single_tuple[0]
        )

    def delete_parent_from_tuples(self, tuples):
        """Remove every entry flagged ``True`` (the node itself) from ``tuples``.

        Despite the name, what remains are the parent conditions.  The list is
        updated in place and also returned.
        """
        # Slice assignment instead of deleting while iterating, which would
        # skip the element following each deleted one.
        tuples[:] = [single_tuple for single_tuple in tuples if not single_tuple[2]]
        return tuples

In [None]:
# Manual sanity checks: every row holds the expected-answer label followed by
# the outlook, temperature, humidity and windy values to classify.
classification_checks = [
    ("Should be no", 'sunny', 'hot', 'high', 'FALSE'),
    ("Should be no", 'sunny', 'hot', 'high', 'TRUE'),
    ("Should be yes", 'overcast', 'hot', 'high', 'FALSE'),
    ("Should be yes", 'rainy', 'mild', 'high', 'FALSE'),
    ("Should be yes", 'rainy', 'cool', 'normal', 'FALSE'),
    ("Should be no", 'rainy', 'cool', 'normal', 'TRUE'),
    ("Should be yes", 'overcast', 'cool', 'normal', 'TRUE'),
    ("Should be no", 'sunny', 'mild', 'high', 'FALSE'),
    ("Should be yes", 'sunny', 'cool', 'normal', 'FALSE'),
    ("Should be yes", 'rainy', 'mild', 'normal', 'FALSE'),
    ("Should be yes", 'sunny', 'mild', 'normal', 'TRUE'),
    ("Should be yes", 'overcast', 'mild', 'high', 'TRUE'),
    ("Should be yes", 'overcast', 'hot', 'normal', 'FALSE'),
    ("Should be no", 'rainy', 'mild', 'high', 'TRUE'),
]

for label, outlook, temperature, humidity, windy in classification_checks:
    query = From(bayesian_network, sample_data)
    query = query.given('outlook', outlook).given('temperature', temperature)
    query = query.given('humidity', humidity).given('windy', windy)
    print(label, query.classify('play'))