In [1]:
import numpy as np
import pandas as pd
from collections import Counter

In [8]:
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
        self.feature = feature      # Which feature to split on
        self.threshold = threshold  # Value of split
        self.left = left          # Left child
        self.right = right        # Right child
        self.value = value        # For leaf nodes, store the predicted class

class DecisionTree:
    def __init__(self, max_depth=None, min_samples_split=2, min_samples_leaf=1, criterion='gini'):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.criterion = criterion  # 'gini' or 'entropy'
        self.root = None
    
    def _gini(self, y):
        """Calculate Gini impurity for a node
        
        Args:
            y: Array of class labels at the node
        Returns:
            Gini impurity value
        """
        # Get count of each class
        counts = np.bincount(y)
        # Convert to probabilities
        probabilities = counts / len(y)
        # Calculate Gini: 1 - Σ(pi²)
        return 1 - np.sum(probabilities ** 2)
    
    def _entropy(self, y):
        """Calculate entropy for a node
        
        Args:
            y: Array of class labels at the node
        Returns:
            Entropy value
        """
        counts = np.bincount(y)
        probabilities = counts / len(y)
        # Remove zero probabilities to avoid log(0)
        probabilities = probabilities[probabilities > 0]
        # Calculate entropy: -Σ(pi * log2(pi))
        return -np.sum(probabilities * np.log2(probabilities))
    
    def _criterion_function(self, y):
        """Wrapper to use either gini or entropy based on initialization"""
        if self.criterion == 'gini':
            return self._gini(y)
        else:
            return self._entropy(y)
    
    def _information_gain(self, parent, left_child, right_child):
        """Calculate information gain for a split
        
        Args:
            parent: Array of class labels before split
            left_child: Array of class labels in left node after split
            right_child: Array of class labels in right node after split
        Returns:
            Information gain value
        """
        # Calculate weights for weighted average
        w_left = len(left_child) / len(parent)
        w_right = len(right_child) / len(parent)
        
        # Calculate gain = parent impurity - weighted avg of children impurity
        gain = self._criterion_function(parent) - (
            w_left * self._criterion_function(left_child) +
            w_right * self._criterion_function(right_child)
        )
        
        return gain

    def _should_stop(self, depth, X, y):
        """Check if tree building should stop at this node
        
        Args:
            depth: Current depth of the tree
            X: Feature matrix for samples at this node
            y: Labels for samples at this node
            
        Returns:
            bool: True if we should stop, False otherwise
        """
        # 1. Max depth reached
        if self.max_depth is not None and depth >= self.max_depth:
            return True
        
        # 2. Not enough samples to split
        if len(y) < self.min_samples_split:
            return True
        
        # 3. Node is pure (all samples belong to same class)
        if len(np.unique(y)) == 1:
            return True
        
        # 4. No valid splits possible (all features have same value)
        if np.all(np.all(X == X[0, :], axis=0)):
            return True
        
        return False

    def _best_split(self, X, y):
        """Find the best split for a node.
        
        Args:
            X: Feature matrix for samples at this node
            y: Labels for samples at this node
        
        Returns:
            Dictionary containing:
            - 'feature': Best feature to split on
            - 'threshold': Best threshold value
            - 'gain': Information gain from this split
            - 'left_idxs': Indices of samples going to left child
            - 'right_idxs': Indices of samples going to right child
        """
        best_split = {
            'feature': None,
            'threshold': None,
            'gain': -float('inf'),
            'left_idxs': None,
            'right_idxs': None
        }
        
        # Need at least min_samples_split samples to consider splitting
        if len(y) < self.min_samples_split:
            return None
        
        # Try each feature
        n_features = X.shape[1]
        
        for feature in range(n_features):
            # Get unique values in this feature
            thresholds = np.unique(X[:, feature])
            
            # Try each threshold
            for threshold in thresholds:
                # Split the data
                left_idxs = X[:, feature] <= threshold
                right_idxs = ~left_idxs
                
                # Skip if split doesn't meet minimum samples requirement
                if (np.sum(left_idxs) < self.min_samples_leaf or 
                    np.sum(right_idxs) < self.min_samples_leaf):
                    continue
                
                # Calculate information gain
                gain = self._information_gain(
                    y,
                    y[left_idxs],
                    y[right_idxs]
                )
                
                # Update best split if this split is better
                if gain > best_split['gain']:
                    best_split['feature'] = feature
                    best_split['threshold'] = threshold
                    best_split['gain'] = gain
                    best_split['left_idxs'] = left_idxs
                    best_split['right_idxs'] = right_idxs
        
        return best_split if best_split['feature'] is not None else None

    def _build_tree(self, X, y, depth=0, debug=False):
        """Recursively build the decision tree
        
        Args:
            X: Feature matrix for samples at this node
            y: Labels for samples at this node
            depth: Current depth in tree (default=0 for root)
        
        Returns:
            Node: Root node of the tree/subtree
        """
        if debug:
            print(f"\nAt depth {depth}:")
            print(f"Sample count: {len(y)}")
            print(f"Class distribution: {np.bincount(y)}")

        # Check stopping conditions
        if self._should_stop(depth, X, y):
            # Create leaf node with majority class
            if debug:
                print("Stopping condition met! Creating leaf node.")
            majority_class = np.bincount(y).argmax()
            return Node(value=majority_class)
        
        # Find the best split
        best_split = self._best_split(X, y)
        
        # If no valid split found, return leaf node
        if best_split is None:
            if debug:
                print("No valid split found! Creating leaf node.")
            majority_class = np.bincount(y).argmax()
            return Node(value=majority_class)
        
        if debug:
            print(f"Best split found:")
            print(f"- Feature: {best_split['feature']}")
            print(f"- Threshold: {best_split['threshold']}")
            print(f"- Information gain: {best_split['gain']:.4f}")

        # Get indices for left and right children
        left_idxs = best_split['left_idxs']
        right_idxs = best_split['right_idxs']
        
        # Recursively build left and right subtrees
        left_subtree = self._build_tree(
            X[left_idxs],
            y[left_idxs],
            depth + 1,
            debug
        )
        
        right_subtree = self._build_tree(
            X[right_idxs],
            y[right_idxs],
            depth + 1,
            debug
        )
        
        # Create and return decision node
        return Node(
            feature=best_split['feature'],
            threshold=best_split['threshold'],
            left=left_subtree,
            right=right_subtree
        )

    def fit(self, X, y, debug=False):
        """Fit the decision tree to the training data
        
        Args:
            X: Training feature matrix
            y: Training labels
        """
        # Convert X to numpy array if it's a DataFrame
        if hasattr(X, 'values'):
            X = X.values
        # Convert y to numpy array if it's a pandas Series
        if hasattr(y, 'values'):
            y = y.values
        self.n_classes = len(np.unique(y))
        self.root = self._build_tree(X, y, depth=0, debug=debug)

    def _traverse_tree(self, x, node, depth=0, debug=False):
        """Traverse the tree for a single sample to find its leaf node prediction
        
        Args:
            x: Single sample features (array of length n_features)
            node: Current node in traversal (starts from root)
        
        Returns:
            Predicted class value
        """
        if debug:
            indent = "  " * depth  # Indent based on depth
            if node.value is not None:
                print(f"{indent}Leaf Node! Predicting class: {node.value}")
            else:
                feature_val = x[node.feature]
                print(f"{indent}At depth {depth}:")
                print(f"{indent}Checking feature {node.feature} (value={feature_val}) against threshold {node.threshold}")
                print(f"{indent}Going {'left' if feature_val <= node.threshold else 'right'}")
    
        # If we've reached a leaf node, return its value
        if node.value is not None:
            return node.value
        
        # Decide whether to go left or right based on the split
        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left, depth + 1, debug)
        else:
            return self._traverse_tree(x, node.right, depth + 1, debug)

    def predict(self, X, debug=False):
        """Predict class labels for multiple samples
        
        Args:
            X: Feature matrix of samples to predict
        
        Returns:
            Array of predicted class labels
        """
        if hasattr(X, 'values'):
            X = X.values
        # Make predictions for each sample
        predictions = []
        for i, x in enumerate(X):
            if debug:
                print(f"\nPredicting sample {i}:")
            pred = self._traverse_tree(x, self.root, debug=debug)
            predictions.append(pred)
        return np.array(predictions)

In [10]:
# Load the data
poker = pd.read_csv(
    'poker-hand-training-true.data', 
    header=None, 
    names=['S1', 'C1', 'S2', 'C2', 'S3', 'C3', 'S4', 'C4', 'S5', 'C5', 'CLASS']
)

poker_test = pd.read_csv(
    'poker-hand-testing.data', 
    header=None, 
    names=['S1', 'C1', 'S2', 'C2', 'S3', 'C3', 'S4', 'C4', 'S5', 'C5', 'CLASS']
)

In [11]:
def create_feats_with_card_counts(df):
    ''' Create features for the poker hand dataset and with highest and 2nd highest card counts '''
    df_copy = df.copy()
    # count cards of same rank
    for card in range (1, 14):
        df_copy[f'card {card}'] = df_copy[[f'C{i}' for i in range(1, 6)]].eq(card).sum(axis=1)
    # count cards of same suit
    for suit in range (1, 5):
        df_copy[f'suit {suit}'] = df_copy[[f'S{i}' for i in range(1, 6)]].eq(suit).sum(axis=1)
    # check sequential
    df_copy['sorted_rank'] = df_copy.apply(lambda row: sorted([row[f'C{i}'] for i in range(1, 6)]), axis=1)
    df_copy['is_sequental'] = df_copy['sorted_rank'].apply(lambda x: all(x[i+1] - x[i] == 1 for i in range(len(x)-1)))
    df_copy['is_sequental'] = df_copy.apply(
        lambda row: True if row['sorted_rank'] == [1, 10, 11, 12, 13] else row['is_sequental'], axis=1
    )
    # check flush
    df_copy['is_flush'] = df_copy.apply(lambda row: any(row[f'suit {i}'] == 5 for i in range(1, 5)), axis=1)
    # Identify highest card count
    df_copy['max_card_count'] = df_copy[[f'card {i}' for i in range(1, 14)]].max(axis=1)
    # Identify second highest card count
    df_copy['second_max_card_count'] = df_copy.apply(
        lambda x: sorted([x[f'card {i}'] for i in range(1, 14)])[-2], 
        axis=1
    )
    df_copy.drop(['S1', 'C1', 'S2', 'C2', 'S3', 'C3', 'S4', 'C4', 'S5', 'C5', 'sorted_rank'], axis=1, inplace=True)
    return df_copy

In [12]:
poker_after = create_feats_with_card_counts(poker)
poker_test_after = create_feats_with_card_counts(poker_test)

x_train = poker_after.drop('CLASS', axis=1)
y_train = poker_after['CLASS']
x_test = poker_test_after.drop('CLASS', axis=1)
y_test = poker_test_after['CLASS']

In [13]:
tree_gini = DecisionTree(max_depth=5)
tree_gini.fit(x_train, y_train, debug=True)


At depth 0:
Sample count: 25010
Class distribution: [12493 10599  1206   513    93    54    36     6     5     5]
Best split found:
- Feature: 19
- Threshold: 1
- Information gain: 0.4304

At depth 1:
Sample count: 12650
Class distribution: [12493     0     0     0    93    54     0     0     5     5]
Best split found:
- Feature: 17
- Threshold: False
- Information gain: 0.0146

At depth 2:
Sample count: 12547
Class distribution: [12493     0     0     0     0    54]
Best split found:
- Feature: 18
- Threshold: False
- Information gain: 0.0086

At depth 3:
Sample count: 12493
Class distribution: [12493]
Stopping condition met! Creating leaf node.

At depth 3:
Sample count: 54
Class distribution: [ 0  0  0  0  0 54]
Stopping condition met! Creating leaf node.

At depth 2:
Sample count: 103
Class distribution: [ 0  0  0  0 93  0  0  0  5  5]
Best split found:
- Feature: 18
- Threshold: False
- Information gain: 0.1315

At depth 3:
Sample count: 93
Class distribution: [ 0  0  0  0 93]
St