In [1]:
import csv
import random

In [186]:
class DecisionNode:
    """A Decision Node asks a question. This holds a reference to the question, and to the child nodes."""
    def __init__(self, question, true_branch, false_branch):
        self.question = question
        self.true_branch = true_branch
        self.false_branch = false_branch

class Leaf:
    """A Leaf node classifies data. This holds a dictionary of class (e.g., "Apple") -> number of times it appears in the rows from the training data that reach this leaf."""
    def __init__(self, rows):
        self.predictions = self.class_counts(rows)

    def class_counts(self, rows):
        """Counts the number of each type of example in a dataset."""
        counts = {}  # a dictionary of label -> count.
        for row in rows:
            # in our dataset format, the label is always the last column
            label = row[-1]
            if label not in counts:
                counts[label] = 0
            counts[label] += 1
        return counts

class DecisionTreeClassifier:
    def __init__(self):
        self.tree = None

    def fit(self, X, y, criterion, max_depth=float('inf')):
        data = [X[i] + [y[i]] for i in range(len(X))]
        self.tree = self.build_tree(data, criterion, 0, max_depth)

    def predict(self, rows):
        predictions = [self.classify(row, self.tree) for row in rows]
        return predictions

    def accuracy(self, X, y):
        predictions = self.predict(X)
        correct = sum(1 for pred, actual in zip(predictions, y) if pred == actual)
        return correct / len(y)

    def build_tree(self, rows, criterion, current_depth, max_depth):
        if current_depth >= max_depth:
            return Leaf(rows)

        gain, question = self.find_best_split(rows, criterion)
        
        if gain == 0:
            return Leaf(rows)

        true_rows, false_rows = self.partition(rows, question)
        true_branch = self.build_tree(true_rows, criterion, current_depth + 1, max_depth)
        false_branch = self.build_tree(false_rows, criterion, current_depth + 1, max_depth)

        return DecisionNode(question, true_branch, false_branch)

    def find_best_split(self, rows, criterion):
        best_gain = 0
        best_question = None
        current_uncertainty = self.gini(rows) if criterion == 'gini' else self.entropy(rows)
        n_features = len(rows[0]) - 1  # number of columns

        for col in range(n_features):  # for each feature
            values = set([row[col] for row in rows])  # unique values in the column
            for val in values:  # for each value
                question = (col, val)
                true_rows, false_rows = self.partition(rows, question)

                if len(true_rows) == 0 or len(false_rows) == 0:
                    continue

                gain = self.info_gain(true_rows, false_rows, current_uncertainty, criterion)

                if gain >= best_gain:
                    best_gain, best_question = gain, question

        return best_gain, best_question

    def partition(self, rows, question):
        """Partitions a dataset."""
        true_rows, false_rows = [], []
        col, val = question
        for row in rows:
            if row[col] >= val:
                true_rows.append(row)
            else:
                false_rows.append(row)
        return true_rows, false_rows

    def gini(self, rows):
        """Calculate the Gini Impurity for a list of rows."""
        counts = Leaf(rows).predictions
        impurity = 1
        for lbl in counts:
            prob_of_lbl = counts[lbl] / float(len(rows))
            impurity -= prob_of_lbl**2
        return impurity

    def info_gain(self, left, right, current_uncertainty, criterion):
        """Information Gain."""
        if criterion == "gini":
            p = float(len(left)) / (len(left) + len(right))
            return current_uncertainty - p * self.gini(left) - (1 - p) * self.gini(right)
        else:
            p = float(len(left)) / (len(left) + len(right))
            return current_uncertainty - p * self.entropy(left) - (1 - p) * self.entropy(right)

    def classify(self, row, node):
        """Classify a new data point based on the tree."""
        if isinstance(node, Leaf):
            return max(node.predictions.keys(), key=lambda k: node.predictions[k])

        if row[node.question[0]] >= node.question[1]:
            return self.classify(row, node.true_branch)
        else:
            return self.classify(row, node.false_branch)

    def entropy(self, rows):
        """Calculate the entropy for a list of rows."""
        from math import log
        log2 = lambda x: log(x) / log(2)
        counts = self.class_counts(rows)
        impurity = 0
        for lbl in counts:
            prob_of_lbl = counts[lbl] / float(len(rows))
            impurity -= prob_of_lbl * log2(prob_of_lbl)
        return impurity

    def class_counts(self, rows):
        """Counts the number of each type of example in a dataset."""
        counts = {}  # a dictionary of label -> count.
        for row in rows:
            # in our dataset format, the label is always the last column
            label = row[-1]
            if label not in counts:
                counts[label] = 0
            counts[label] += 1
        return counts

    @staticmethod
    def load_dataset(filename):
        dataset = []
        with open(filename, 'r') as file:
            csv_reader = csv.reader(file)
            for row in csv_reader:
                dataset.append(row)
        dataset.pop(0)
        return dataset

    @staticmethod
    def split_dataset_train_test(dataset, split_ratio):
        random.shuffle(dataset)
        split_index = int(len(dataset) * split_ratio)
        train_data = dataset[:split_index]
        test_data = dataset[split_index:]
        return train_data, test_data
    
    @staticmethod
    def print_tree(node, spacing=""):
        if isinstance(node, Leaf):
            print(spacing + "Predict", node.predictions)
            return

        print(spacing + str(node.question))
        
        print(spacing + '--> True:')
        DecisionTreeClassifier.print_tree(node.true_branch, spacing + "  ")

        print(spacing + '--> False:')
        DecisionTreeClassifier.print_tree(node.false_branch, spacing + "  ")

    def accuracy_on_combined_data(self, rows):
        correct = 0
        for row in rows:
            if self.classify(row[:-1], self.tree) == row[-1]:
                correct += 1
        return correct / len(rows) if rows else 0
    
    def prune(self, node, validation_rows):
        if isinstance(node, Leaf):
            return node

        # Prune true and false branches first
        node.true_branch = self.prune(node.true_branch, validation_rows)
        node.false_branch = self.prune(node.false_branch, validation_rows)

        # If both branches are now leaves, consider pruning (merging) them
        if isinstance(node.true_branch, Leaf) and isinstance(node.false_branch, Leaf):
            # Evaluate accuracy with this node
            accuracy_before_pruning = self.accuracy_on_combined_data(validation_rows)

            # Create a leaf node that replaces this DecisionNode
            merged_leaf = Leaf(validation_rows)

            # Temporarily replace this DecisionNode with merged_leaf
            original_node = node
            self.tree = merged_leaf

            # Evaluate accuracy without this node (i.e., with it replaced by merged_leaf)
            accuracy_after_pruning = self.accuracy_on_combined_data(validation_rows)

            # Restore the original tree structure if pruning did not improve accuracy
            if accuracy_after_pruning < accuracy_before_pruning:
                self.tree = original_node

            return merged_leaf if accuracy_after_pruning >= accuracy_before_pruning else original_node

        return node

In [235]:
# Use the code on data
dataset = DecisionTreeClassifier.load_dataset('Dry_Bean_Dataset.csv')
train_data, test_data = DecisionTreeClassifier.split_dataset_train_test(dataset, 0.2)

X_train = [row[:-1] for row in train_data]
y_train = [row[-1] for row in train_data]

classifier = DecisionTreeClassifier()

## Fit without pre-pruning
# classifier.fit(X_train, y_train, criterion="gini")

## Fit with pre-pruning -> To integrate pre-pruning, comment the above code and run the following line
classifier.fit(X_train, y_train, criterion="entropy", max_depth=3)

In [227]:
# Run this cell to do post-pruning on the tree
classifier.prune(classifier.tree, test_data)

<__main__.DecisionNode at 0x1e78fbe23d0>

In [236]:
X_test = [row[:-1] for row in test_data]
y_test = [row[-1] for row in test_data]

predictions = classifier.predict(X_test)
accuracy = classifier.accuracy(X_test, y_test)

print("Predictions:", predictions)
print("Actual:     ", y_test)
print("Accuracy:", accuracy)

print("\n\nFinal Tree:")
DecisionTreeClassifier.print_tree(classifier.tree)

Predictions: ['BOMBAY', 'CALI', 'SIRA', 'CALI', 'CALI', 'CALI', 'SIRA', 'SIRA', 'SEKER', 'SIRA', 'DERMASON', 'CALI', 'SIRA', 'DERMASON', 'BOMBAY', 'CALI', 'SEKER', 'SIRA', 'DERMASON', 'SIRA', 'DERMASON', 'DERMASON', 'CALI', 'CALI', 'SIRA', 'DERMASON', 'SIRA', 'SEKER', 'DERMASON', 'SEKER', 'CALI', 'DERMASON', 'SEKER', 'DERMASON', 'CALI', 'DERMASON', 'CALI', 'SEKER', 'SIRA', 'SEKER', 'CALI', 'BOMBAY', 'SIRA', 'SIRA', 'SIRA', 'SIRA', 'CALI', 'CALI', 'SIRA', 'SIRA', 'DERMASON', 'SEKER', 'SIRA', 'HOROZ', 'BOMBAY', 'DERMASON', 'DERMASON', 'BOMBAY', 'SIRA', 'SEKER', 'SIRA', 'HOROZ', 'DERMASON', 'HOROZ', 'DERMASON', 'CALI', 'CALI', 'SIRA', 'HOROZ', 'SIRA', 'HOROZ', 'HOROZ', 'HOROZ', 'HOROZ', 'HOROZ', 'CALI', 'DERMASON', 'SIRA', 'DERMASON', 'DERMASON', 'HOROZ', 'SIRA', 'DERMASON', 'SIRA', 'CALI', 'SIRA', 'CALI', 'SIRA', 'DERMASON', 'DERMASON', 'SEKER', 'HOROZ', 'SIRA', 'DERMASON', 'SEKER', 'SEKER', 'SIRA', 'SIRA', 'CALI', 'SIRA', 'SIRA', 'BOMBAY', 'CALI', 'SIRA', 'HOROZ', 'SEKER', 'DERMASON', '

In [237]:
# Calculate metrics
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_curve, auc
import matplotlib.pyplot as plt
import numpy as np

def calculate_metrics(y_true, y_pred):
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='weighted')
    recall = recall_score(y_true, y_pred, average='weighted')
    f1 = f1_score(y_true, y_pred, average='weighted')
    print(f"Accuracy: {accuracy}\nPrecision: {precision}\nRecall: {recall}\nF1 Score: {f1}")

def generate_confusion_matrix(y_true, y_pred):
    cm = confusion_matrix(y_true, y_pred)
    print("Confusion Matrix:\n", cm)

def plot_multiclass_roc(y_true, y_scores, n_classes):
    # Binarize the output
    y_true = label_binarize(y_true, classes=[*range(n_classes)])
    if y_scores.shape[1] == 2:  # Binary case
        y_scores = np.hstack((1 - y_scores, y_scores))
    
    # Compute ROC curve and ROC area for each class
    fpr = dict()
    tpr = dict()
    roc_auc = dict()
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_true[:, i], y_scores[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])
    
    # Plot all ROC curves
    plt.figure()
    colors = iter(plt.cm.rainbow(np.linspace(0, 1, n_classes)))
    for i in range(n_classes):
        plt.plot(fpr[i], tpr[i], color=next(colors), lw=2,
                 label='Class {0} (area = {1:0.2f})'.format(i, roc_auc[i]))
    
    plt.plot([0, 1], [0, 1], 'k--', lw=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Multi-class ROC')
    plt.legend(loc="lower right")
    plt.show()

calculate_metrics(y_test, predictions)
generate_confusion_matrix(y_test, predictions)

y = [row[-1] for row in dataset]
n_classes = len(set(y))

Accuracy: 0.7923592616401873
Precision: 0.7607235299334032
Recall: 0.7923592616401873
F1 Score: 0.7610815686950496
Confusion Matrix:
 [[   0    3  889    0    2   35  119]
 [   0  413    0    0    0    0    0]
 [   0    1 1264    0   13    2   38]
 [   0    0    0 2159    0   44  636]
 [   0    0   29    2 1290    0  220]
 [   0    0    2   33    0 1511  101]
 [   0    0    4   59    8   21 1991]]


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [108]:
# Implement DT using exteranal libraries to measure and compare model accuracy
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score
from sklearn.datasets import load_iris

# Load the Iris dataset
iris = load_iris()
X = iris.data
y = iris.target

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize the Decision Tree Classifier
clf = DecisionTreeClassifier()

# Fit the classifier on the training data
clf.fit(X_train, y_train)

# Predict the labels for the test set
y_pred = clf.predict(X_test)

# Calculate the accuracy of the model
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy of the Decision Tree Classifier on the Iris dataset: {:.2f}".format(accuracy))


Accuracy of the Decision Tree Classifier on the Iris dataset: 1.00
