In [1]:
from dataclasses import dataclass
from typing import Literal
from typing import Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import sklearn
from graphviz import Digraph  # Add this import for Digraph

np.random.seed(42)

@dataclass
class DecisionTree:
    criterion: Literal["information_gain", "gini_index"]
    max_depth: int

    def _init_(self, criterion, max_depth=5):
        self.criterion = criterion
        self.max_depth = max_depth
        self.tree = None  # Add a property to store the constructed tree

    def find_best_split(self, X: pd.DataFrame, y: pd.Series) -> Tuple[float, int, float]:
        """Finds the best split for the current node.

        Returns:
            A tuple of (split_value, split_feature, split_gain).
        """
        best_split_value = None
        best_split_feature = None
        best_split_gain = float('-inf')  # or float('inf') depending on the criterion

        # Your implementation here, calculate split_value, split_feature, and split_gain

        # Example:
        # best_split_value = calculated_value
        # best_split_feature = calculated_feature
        # best_split_gain = calculated_gain

        return best_split_value, best_split_feature, best_split_gain

    # ... rest of your class methods ...



    def fit(self, X: pd.DataFrame, y: pd.Series) -> None:
        """
        Function to train and construct the decision tree
        """
        self.tree = self._fit(X, y, depth=0)

    def _fit(self, X: pd.DataFrame, y: pd.Series, depth: int) -> dict:
        """
        Recursive function to construct the decision tree
        """
        if depth == self.max_depth or len(set(y)) == 1:
            # Stop recursion if max depth reached or all labels are the same
            return {'value': y.mode().iloc[0]}

        # Find the best split
        split_value, split_feature, split_gain = self.find_best_split(X, y)

        if split_feature is None:
            # If no split is found, create a leaf node
            return {'value': y.mode().iloc[0]}

        # Split the dataset based on the best split
        left_mask = X[split_feature] <= split_value
        right_mask = ~left_mask

        # Recursively construct the left and right subtrees
        left_subtree = self._fit(X[left_mask], y[left_mask], depth + 1)
        right_subtree = self._fit(X[right_mask], y[right_mask], depth + 1)

        # Create a decision node
        return {'feature': split_feature,
                'value': split_value,
                'left': left_subtree,
                'right': right_subtree}

    def predict(self, X: pd.DataFrame) -> pd.Series:
        """
        Function to run the decision tree on test inputs
        """
        predictions = []

        for _, row in X.iterrows():
            predictions.append(self._predict(self.tree, row))

        return pd.Series(predictions)

    def _predict(self, node: dict, row: pd.Series) -> int:
        """
        Recursive function to make predictions using the decision tree
        """
        if 'value' in node:
            # Leaf node, return the predicted value
            return node['value']
        else:
            # Decision node, traverse to the appropriate subtree
            if row[node['feature']] <= node['value']:
                return self._predict(node['left'], row)
            else:
                return self._predict(node['right'], row)

    def plot(self, node=None, depth=0, parent_name=None, graph=None) -> None:
        """
        Function to plot the tree
        """
        if graph is None:
            graph = Digraph(format='png', node_attr={'style': 'filled', 'fillcolor': 'lightblue'})
            graph.attr(size='10,10')

        if node is None:
            node = self.tree

        if 'value' in node:
            graph.node(str(parent_name), label=str(node['value']), shape='ellipse', color='black', fillcolor='yellow')
            return

        feature_name = node['feature']
        value = node['value']

        graph.node(str(parent_name), label=f"{feature_name} <= {value}", shape='box', color='black', fillcolor='lightblue')

        left_name = f"{parent_name}_L"
        self.plot(node['left'], depth + 1, left_name, graph)
        graph.edge(str(parent_name), left_name, label='Yes')

        right_name = f"{parent_name}_R"
        self.plot(node['right'], depth + 1, right_name, graph)
        graph.edge(str(parent_name), right_name, label='No')

        return graph

In [2]:
import pandas as pd
import numpy as np

def check_ifreal(series: pd.Series) -> bool:
    """
    Function to check if the given series has real or discrete values
    """
    return pd.api.types.is_numeric_dtype(series)

def entropy(series: pd.Series) -> float:
    """
    Function to calculate the entropy of a given series
    """
    value_counts = series.value_counts()
    probabilities = value_counts / len(series)
    entropy_value = -np.sum(probabilities * np.log2(probabilities))
    return entropy_value

def gini_index(series: pd.Series) -> float:
    """
    Function to calculate the Gini index of a given series
    """
    value_counts = series.value_counts()
    probabilities = value_counts / len(series)
    gini_index_value = 1 - np.sum(np.square(probabilities))
    return gini_index_value

def information_gain(y: pd.Series, attr: pd.Series, criterion) -> float:
    """
    Function to calculate the information gain
    """
    if criterion == 'information_gain':
        entropy_before = entropy(y)
        entropy_after = y.groupby(attr).apply(lambda group: len(group) / len(y) * entropy(group)).sum()
        return entropy_before - entropy_after
    elif criterion == 'gini':
        return y.groupby(attr).apply(lambda group: len(group) / len(y) * gini_index(group)).sum()
    elif criterion == 'mse':
        mse_before = y.var()
        mse_after = y.groupby(attr).apply(lambda group: len(group) / len(y) * group.var()).sum()
        return mse_before - mse_after
    else:
        raise ValueError("Invalid criterion")

def opt_split_attribute(X: pd.DataFrame, y: pd.Series, criterion, features: pd.Series):
    """
    Function to find the optimal attribute to split upon.
    """
    scores = {}

    for feature in features:
        scores[feature] = information_gain(y, X[feature], criterion)
    
    if criterion in ['information_gain', 'mse']:
        key = max(scores, key=scores.get)
        return key, scores[key]
    elif criterion == 'gini':
        key = min(scores, key=scores.get)
        return key, scores[key]

def find_optimal_split_value(X: pd.DataFrame, y: pd.Series, attribute):
    """
    Function to find the optimal split value for a given attribute.
    """
    X_sorted = X.sort_values(by=attribute)
    unique_values = (X_sorted[attribute] + X_sorted[attribute].shift()) / 2
    unique_values = unique_values.iloc[1:].reset_index(drop=True)

    y = y if check_ifreal(y) else y.cat.codes
    best_mse = float('inf')
    optimal_value = None

    for value in unique_values:
        mse = np.sum((y[X[attribute] <= value] - y[X[attribute] <= value].mean())**2) + \
              np.sum((y[X[attribute] > value] - y[X[attribute] > value].mean())**2)
            
        if mse < best_mse:
            best_mse = mse
            optimal_value = value

    return optimal_value

def split_data(X: pd.DataFrame, y: pd.Series, attribute, value=None):
    """
    Function to split the data according to an attribute.
    """
    if not check_ifreal(X[attribute]):
        unique_values = X[attribute].unique()
        return [(X[X[attribute] == val], y[X[attribute] == val]) for val in unique_values], unique_values
    else:
        mask = (X[attribute] <= value)
        return [(X[mask], y[mask]), (X[~mask], y[~mask])], value

In [4]:
from typing import Union
import pandas as pd
import numpy as np

def accuracy(y_hat: pd.Series, y: pd.Series) -> float:
    """
    Function to calculate the accuracy
    """
    assert y_hat.size == y.size, "Input sizes mismatch"
    correct_predictions = (y_hat == y).sum()
    total_predictions = y.size
    return correct_predictions / total_predictions

def precision(y_hat: pd.Series, y: pd.Series, cls: Union[int, str]) -> float:
    """
    Function to calculate the precision
    """
    assert y_hat.size == y.size, "Input sizes mismatch"
    
    true_positive = ((y_hat == cls) & (y == cls)).sum()
    false_positive = ((y_hat == cls) & (y != cls)).sum()
    
    if true_positive + false_positive == 0:
        return 0.0
    else:
        return true_positive / (true_positive + false_positive)

def recall(y_hat: pd.Series, y: pd.Series, cls: Union[int, str]) -> float:
    """
    Function to calculate the recall
    """
    assert y_hat.size == y.size, "Input sizes mismatch"
    
    true_positive = ((y_hat == cls) & (y == cls)).sum()
    false_negative = ((y_hat != cls) & (y == cls)).sum()
    
    if true_positive + false_negative == 0:
        return 0.0
    else:
        return true_positive / (true_positive + false_negative)

def rmse(y_hat: pd.Series, y: pd.Series) -> float:
    """
    Function to calculate the root-mean-squared-error (rmse)
    """
    assert y_hat.size == y.size, "Input sizes mismatch"
    return np.sqrt(((y_hat - y) ** 2).mean())

def mae(y_hat: pd.Series, y: pd.Series) -> float:
    """
    Function to calculate the mean-absolute-error (mae)
    """
    assert y_hat.size == y.size, "Input sizes mismatch"
    return np.abs(y_hat - y).mean()


In [5]:
from typing import Union
import pandas as pd
import numpy as np

def accuracy(y_hat: pd.Series, y: pd.Series) -> float:
    """
    Function to calculate the accuracy
    """
    assert y_hat.size == y.size, "Input sizes mismatch"
    correct_predictions = (y_hat == y).sum()
    total_predictions = y.size
    return correct_predictions / total_predictions

def precision(y_hat: pd.Series, y: pd.Series, cls: Union[int, str]) -> float:
    """
    Function to calculate the precision
    """
    assert y_hat.size == y.size, "Input sizes mismatch"
    
    true_positive = ((y_hat == cls) & (y == cls)).sum()
    false_positive = ((y_hat == cls) & (y != cls)).sum()
    
    if true_positive + false_positive == 0:
        return 0.0
    else:
        return true_positive / (true_positive + false_positive)

def recall(y_hat: pd.Series, y: pd.Series, cls: Union[int, str]) -> float:
    """
    Function to calculate the recall
    """
    assert y_hat.size == y.size, "Input sizes mismatch"
    
    true_positive = ((y_hat == cls) & (y == cls)).sum()
    false_negative = ((y_hat != cls) & (y == cls)).sum()
    
    if true_positive + false_negative == 0:
        return 0.0
    else:
        return true_positive / (true_positive + false_negative)

def rmse(y_hat: pd.Series, y: pd.Series) -> float:
    """
    Function to calculate the root-mean-squared-error (rmse)
    """
    assert y_hat.size == y.size, "Input sizes mismatch"
    return np.sqrt(((y_hat - y) ** 2).mean())

def mae(y_hat: pd.Series, y: pd.Series) -> float:
    """
    Function to calculate the mean-absolute-error (mae)
    """
    assert y_hat.size == y.size, "Input sizes mismatch"
    return np.abs(y_hat - y).mean()

In [6]:
"""
The current code given is for the Assignment 1.
You will be expected to use this to make trees for:
> discrete input, discrete output
> real input, real output
> real input, discrete output
> discrete input, real output
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import sklearn

np.random.seed(42)
# Test case 1
# Real Input and Real Output

N = 30
P = 5
X = pd.DataFrame(np.random.randn(N, P))
y = pd.Series(np.random.randn(N))


for criteria in ["information_gain", "gini_index"]:
    tree = DecisionTree(criterion=criteria,max_depth=9)  # Split based on Inf. Gain
    tree.fit(X, y)
    y_hat = tree.predict(X)
    print(find_optimal_split_value(X, y, tree.find_best_split(X, y)))
    tree.plot()
    print("Criteria :", criteria)
    print("RMSE: ", rmse(y_hat, y))
    print("MAE: ", mae(y_hat, y))

# Test case 2
# Real Input and Discrete Output

N = 30
P = 5
X = pd.DataFrame(np.random.randn(N, P))
y = pd.Series(np.random.randint(P, size=N), dtype="category")

for criteria in ["information_gain", "gini_index"]:
    tree = DecisionTree(criterion=criteria,max_depth=9)  # Split based on Inf. Gain
    tree.fit(X, y)
    y_hat = tree.predict(X)
    tree.plot()
    print("Criteria :", criteria)
    print("Accuracy: ", accuracy(y_hat, y))
    for cls in y.unique():
        print("Precision: ", precision(y_hat, y, cls))
        print("Recall: ", recall(y_hat, y, cls))


# Test case 3
# Discrete Input and Discrete Output

N = 30
P = 5
X = pd.DataFrame({i: pd.Series(np.random.randint(P, size=N), dtype="category") for i in range(5)})
y = pd.Series(np.random.randint(P, size=N), dtype="category")

for criteria in ["information_gain", "gini_index"]:
    tree = DecisionTree(criterion=criteria,max_depth=9)  # Split based on Inf. Gain
    tree.fit(X, y)
    y_hat = tree.predict(X)
    tree.plot()
    print("Criteria :", criteria)
    print("Accuracy: ", accuracy(y_hat, y))
    for cls in y.unique():
        print("Precision: ", precision(y_hat, y, cls))
        print("Recall: ", recall(y_hat, y, cls))

# Test case 4
# Discrete Input and Real Output

N = 30
P = 5
X = pd.DataFrame({i: pd.Series(np.random.randint(P, size=N), dtype="category") for i in range(5)})
y = pd.Series(np.random.randn(N))

for criteria in ["information_gain", "gini_index"]:
    tree = DecisionTree(criterion=criteria,max_depth=9)  # Split based on Inf. Gain
    tree.fit(X, y)
    y_hat = tree.predict(X)
    tree.plot()
    
    print("Criteria :", criteria)
    print("RMSE: ", rmse(y_hat, y))
    print("MAE: ", mae(y_hat, y))

KeyError: (None, None, -inf)