In [1]:
import math 
from collections import Counter
from graphviz import Digraph
import pydotplus
import numpy as np
import pandas as pd
import random
import os
import json
from collections import Counter

In [2]:
def unique_values(data):
    result = {}
    for dictionary in data:
        for key, value in dictionary.items():
            if key not in result:
                result[key] = set()
            result[key].add(value)
    return result

In [3]:
from read_data import DataProcessor
file_name='./data/onlinefraud.csv'
data_class= DataProcessor(file_name)
data_class.process_data()

train=data_class.train
label=data_class.label_train
# Create test and train tuple
attributes = list(train[0].keys())
training_data = [(train[i], label[i]) for i in range(len(train))]
unique_values_data=unique_values(train)

test = data_class.test
test_label = data_class.label_test
test_data = [(test[i], test_label[i]) for i in range(len(test_label))]

unique_values_data=unique_values(train)

In [4]:
def unique_values(data):
    result = {}
    for dictionary in data:
        for key, value in dictionary.items():
            if key not in result:
                result[key] = set()
            result[key].add(value)
    return result

class Node:
    def __init__(self, attribute=None, value=None, children=None, label=None):
        self.attribute = attribute
        self.value = value
        self.children = children if children else {}
        self.label = label
        
        
class DecisionTree:
    def __init__(self):
        self.Node = Node
        
        
    def calculate_entropy(self, data):
        num_data = len(data)
        if num_data == 0:
            return 0

        num_positives = sum(label for _, label in data)
        num_negatives = num_data - num_positives

        if num_positives == 0 or num_negatives == 0:
            return 0

        prob_positive = num_positives / num_data
        prob_negative = num_negatives / num_data

        return -prob_positive * math.log2(prob_positive) - prob_negative * math.log2(prob_negative)

    def calculate_gini_index(self, data):
        num_data = len(data)
        if num_data == 0:
            return 0

        num_positives = sum(label for _, label in data)
        num_negatives = num_data - num_positives

        prob_positive = num_positives / num_data
        prob_negative = num_negatives / num_data

        return 1 - (prob_positive ** 2 + prob_negative ** 2)

    def calculate_impurity(self, sample_data, criterion):
        if criterion == 'gini':
            return self.calculate_gini_index(sample_data)
        elif criterion == 'entropy':
            return self.calculate_entropy(sample_data)
        else:
            raise ValueError("Invalid criterion for impurity")

    def calculate_information_gain(self, sample_data, attribute, criterion, unique_values_dict):
        total_samples = len(sample_data)
        weighted_impurity = 0.0

        attribute_values = unique_values_dict[attribute]
        for value in attribute_values:
            subset = [(features, label) for features, label in sample_data if features[attribute] == value]
            weight = len(subset) / total_samples
            weighted_impurity += weight * self.calculate_impurity(subset, criterion)

        overall_impurity = self.calculate_impurity(sample_data, criterion)
        information_gain = overall_impurity - weighted_impurity
        return information_gain

    def select_best_attribute(self, data, attributes, criterion, unique_values_data):
        best_info_gain = -1
        best_attribute = None

        for attribute in attributes:
            info_gain = self.calculate_information_gain(data, attribute, criterion, unique_values_data)
            if info_gain > best_info_gain:
                best_info_gain = info_gain
                best_attribute = attribute

        return best_attribute

    def most_common_element(self, labels):
        label_counts = Counter(labels)
        most_common_label = label_counts.most_common(1)
        return most_common_label[0][0]

    def Build_tree(self, training_data, attributes, criterion):
        labels = [label for _, label in training_data]

        if labels.count(labels[0]) == len(labels): 
            return self.Node(label=labels[0])
        if not attributes: 
            return self.Node(label=self.most_common_element(labels))
        
        # Find the best attribute to split on using the specified criterion 
        best_attr = self.select_best_attribute(training_data, attributes, criterion, unique_values_data)
        # Create a new node for the tree with the best attribute
        tree = self.Node(attribute=best_attr)
        # Remove the best attribute from the list for subsequent splits
        attributes.remove(best_attr)

        # Iterate over each unique value of the best attribute
        values = list(unique_values_data[best_attr])
        for value in values:
            # Filter the training data for this value
            subset = []
            for feature_row, label in training_data:
                if feature_row[best_attr] == value:
                    subset.append((feature_row, label))
            # If the subset is empty, create a leaf node with the most common label
            if not subset:
                tree.children[value] = self.Node(label=self.most_common_element(labels))
            else:
                tree.children[value] = self.Build_tree(subset, attributes.copy(), criterion)
                
        return tree

    def predict(self, node, x_data):

        # If the current node is a leaf with a label, return the label
        if node.label is not None:
            return node.label
    
        # If the current node has no attribute, return the most common label from the training data
        elif node.attribute is None:
            return self.most_common_element(data_class.label_train)
    
        # Otherwise, find the child node based on the value of the node's attribute for the instance
        else:
            attribute_value = x_data[node.attribute]
            child_node = node.children.get(attribute_value)
    
            # If a child node exists for the attribute value, recurse with the child node
            if child_node:
                return self.predict(child_node, x_data)
            # If no child node exists, return the most common label from the training data
            else:
                return self.most_common_element(data_class.label_train)

    def calculate_accuracy(self, tree, test_data):

        correct_predictions = 0
    
        # Iterate over each instance and its corresponding true label in the test data
        for instance, true_label in test_data:
            # Use the decision tree to predict the label for the instance
            predicted_label = self.predict(tree, instance)
    
            # If the predicted label matches the true label, increment the count of correct predictions
            if predicted_label == true_label:
                correct_predictions += 1
    
        # Calculate the accuracy as the ratio of correct predictions to the total number of instances
        accuracy = correct_predictions / len(test_data)
        return accuracy
# Create an instance of the DecisionTree class

In [5]:

dt_1 = DecisionTree()
dt_2 = DecisionTree()
#Build the tree using the training data and 'gini' as the criterion
EntropyTree = dt_1.Build_tree(training_data, attributes, criterion='entropy')
GiniTree = dt_2.Build_tree(training_data, attributes, criterion='gini')
# Create test data from the test set
test = data_class.test
test_label = data_class.label_test
test_data = [(test[i], test_label[i]) for i in range(len(test_label))]

# Calculate the accuracy using the test data and the tree
EntropyAccuracy = dt_1.calculate_accuracy(EntropyTree, test_data)
print("Entropy Accuracy:", EntropyAccuracy)

GiniAccuracy = dt_2.calculate_accuracy(GiniTree, test_data)
print("Gini Accuracy:", GiniAccuracy)


Entropy Accuracy: 0.9875472083290804
Gini Accuracy: 0.9885679289578443


In [6]:
from graphviz import Digraph

def visualize_tree(node, dot=None, parent_name=None, edge_label='', node_style=None, edge_style=None):
    if dot is None:
        dot = Digraph(comment='Decision Tree')

    # Define default styles if None provided
    if node_style is None:
        node_style = {'shape': 'box', 'style': 'filled', 'fillcolor': 'lightblue'}
    if edge_style is None:
        edge_style = {'fontsize': '11', 'fontcolor': 'red'}

    # Generate a unique name for each node
    node_name = f'node_{id(node)}'

    # If this is a leaf node with a label
    if hasattr(node, 'label') and node.label is not None:
        dot.node(node_name, str(node.label), **node_style)
        if parent_name:
            dot.edge(parent_name, node_name, label=edge_label, **edge_style)
    else:
        # If this is an internal node with an attribute
        internal_node_style = {'shape': 'ellipse', 'style': 'filled', 'fillcolor': 'lightyellow'}
        dot.node(node_name, str(node.attribute), **internal_node_style)
        if parent_name:
            dot.edge(parent_name, node_name, label=edge_label, **edge_style)

        # Add edges for children nodes
        for value, child_node in node.children.items():
            dot = visualize_tree(child_node, dot, node_name, str(value), node_style, edge_style)

    return dot

# Example usage assuming you have an EntropyTree and GiniTree
node_style = {'shape': 'box', 'style': 'filled', 'fillcolor': 'lightblue'}
edge_style = {'fontsize': '11', 'fontcolor': 'red'}

# Assuming your EntropyTree is built and stored in EntropyTree
dot = visualize_tree(EntropyTree, node_style=node_style, edge_style=edge_style)
dot.render('entropy_tree', format='pdf', view=True)  # This will render and open the tree in a .pdf viewer

# For GiniTree
dot = visualize_tree(GiniTree, node_style=node_style, edge_style=edge_style)
dot.render('gini_tree', format='pdf', view=True)  # This will render and open the tree in a .pdf viewer

'gini_tree.pdf'

In [7]:

def tree_to_json(node):
    if node.label is not None:
        return {'label': node.label}
    else:
        children = {str(key): tree_to_json(sub_node) for key, sub_node in node.children.items()}
        return {
            'attribute': node.attribute,
            'children': children
        }

json_tree = tree_to_json(EntropyTree)
# Convert Python dictionary to JSON string
json_tree_str = json.dumps(json_tree, indent=4)
# Output the JSON string
print(json_tree_str)
# To save the JSON to a file
with open('entropy_tree.json', 'w') as json_file:
    json.dump(json_tree, json_file, indent=4)


{
    "attribute": "step",
    "children": {
        "1": {
            "attribute": "oldbalanceDest",
            "children": {
                "0.0": {
                    "attribute": "type",
                    "children": {
                        "0": {
                            "attribute": "oldbalanceOrg",
                            "children": {
                                "0.0": {
                                    "attribute": "amount",
                                    "children": {
                                        "0.0": {
                                            "attribute": "newbalanceOrig",
                                            "children": {
                                                "0.0": {
                                                    "attribute": "newbalanceDest",
                                                    "children": {
                                                        "0.0": {
                                     

In [8]:
# Similarly, for the GiniTree
json_tree_gini = tree_to_json(GiniTree)
print(json_tree_gini)
# Convert Python dictionary to JSON string and save to a file
with open('gini_tree.json', 'w') as json_file:
    json.dump(json_tree_gini, json_file, indent=4)

{'attribute': 'amount', 'children': {'0.0': {'attribute': 'type', 'children': {'0': {'attribute': 'oldbalanceOrg', 'children': {'0.0': {'attribute': 'newbalanceOrig', 'children': {'0.0': {'attribute': 'oldbalanceDest', 'children': {'0.0': {'attribute': 'newbalanceDest', 'children': {'0.0': {'label': 0}, '1.0': {'label': 0}, '2.0': {'label': 0}, '3.0': {'label': 0}, '4.0': {'label': 0}, '5.0': {'label': 0}, '6.0': {'label': 0}, '8.0': {'label': 0}}}, '1.0': {'label': 0}, '2.0': {'label': 0}, '3.0': {'label': 0}, '4.0': {'label': 0}, '5.0': {'label': 0}, '6.0': {'label': 0}, '7.0': {'label': 0}}}, '1.0': {'label': 0}, '2.0': {'label': 0}, '3.0': {'label': 0}, '4.0': {'label': 0}, '5.0': {'label': 0}, '6.0': {'label': 0}, '7.0': {'label': 0}}}, '1.0': {'attribute': 'newbalanceOrig', 'children': {'0.0': {'label': 0}, '1.0': {'attribute': 'oldbalanceDest', 'children': {'0.0': {'attribute': 'newbalanceDest', 'children': {'0.0': {'label': 0}, '1.0': {'label': 0}, '2.0': {'label': 0}, '3.0': {