In [None]:
from ucimlrepo import fetch_ucirepo 
import pandas as pd
import numpy as np

In [None]:

from types import SimpleNamespace

ucirepo_ids = {
    "iris": 53,
    "heart_disease": 45,
    "molecular_biology": 69,
    "breast_cancer": 17,
    "adult": 2,
    "bank_marketing": 222,
    "student_performance": 320,
    "wine": 109,
    "air_quality": 360,
    "mushroom": 73
}

DATASET_NAME = "adult"  # Example dataset name
TEST_SIZE = 0.2

def custom_data():
    
    data = {
            'age': ['youth', 'youth', 'middle aged', 'senior', 'senior', 'senior', 'middle aged',
                    'youth', 'youth', 'senior', 'youth', 'middle aged', 'middle aged', 'senior'],
            'income': ['high', 'high', 'high', 'medium', 'low', 'low', 'low',
                    'medium', 'low', 'medium', 'medium', 'medium', 'high', 'medium'],
            'student': ['no', 'no', 'no', 'no', 'yes', 'yes', 'yes',
                        'no', 'yes', 'yes', 'yes', 'no', 'yes', 'no'],
            'credit_rating': ['fair', 'excellent', 'fair', 'fair', 'fair', 'excellent', 'excellent',
                            'fair', 'fair', 'fair', 'excellent', 'excellent', 'fair', 'excellent'],
            'buys_computer': ['no', 'no', 'yes', 'yes', 'yes', 'no', 'yes',
                            'no', 'yes', 'yes', 'yes', 'yes', 'yes', 'no']
        }
        
    df = pd.DataFrame(data)
    
    # Split into features and target
    features = df.drop(columns='buys_computer')
    targets = df['buys_computer']
    targets= pd.DataFrame(targets.values.reshape(-1, 1), columns=['buys_computer'])

    
    # Variable info
    variable_info = {
        col: {
            'type': 'categorical',
            'unique_values': df[col].unique().tolist()
        } for col in df.columns
    }

    # Metadata
    metadata = {
        'source': 'Simulated AllElectronics dataset',
        'description': 'Customer attributes and their decision to buy a computer',
        'num_samples': len(df),
        'num_features': features.shape[1],
        'target_column': 'buys_computer',
        'class_labels': sorted(df['buys_computer'].unique().tolist())
    }

    # Build nested structure
    return SimpleNamespace(
        data=SimpleNamespace(
            features=features,
            targets=targets,
            feature_names=features.columns.tolist(),
            target_names=sorted(targets.iloc[:,0].unique()),
            # frame=df
        ),
        metadata=metadata,
        variables=variable_info
    )
    
    

def fetch_dataframe(dataframe_name):
    
    if dataframe_name == "custom_data":
        df = custom_data()
        
        # metadata 
        print(df.metadata) 
        
        # variable information 
        print(df.variables) 
    
        return df
    
    if dataframe_name in ucirepo_ids:
        # fetch dataset 
        df = fetch_ucirepo(id=ucirepo_ids[dataframe_name],) 

        # # data (as pandas dataframes) 
        X = df.data.features 
        y = df.data.targets 
        
        # metadata 
        print(df.metadata) 
        
        # variable information 
        print(df.variables) 
        
        return df
    else:
        raise ValueError(f"Dataset '{dataframe_name}' not found in UCI repository.")
    


In [None]:
def __train_test_split(X, y, test_size = 0.2, shuffle_and_stratify = True):
    
    if test_size < 0 or test_size > 1:
        raise ValueError("test_size must be between 0 and 1")
   
    if len(X) != len(y):
        raise ValueError("Features and targets must have the same length.")

    
    if shuffle_and_stratify == False:
    
        train_size = 1 - test_size
        train_index = int(len(X) * train_size)
        
        X_train = X[0: train_index]
        X_test = X[train_index:]
        
        y_train = y[0: train_index]
        y_test = y[train_index:]
        
        return X_train, X_test, y_train, y_test
    else:
        labels = y.iloc[:,0].unique()
        X_train = pd.DataFrame(columns=X.columns)
        y_train = pd.DataFrame(columns=y.columns)
        X_test = pd.DataFrame(columns=X.columns)
        y_test = pd.DataFrame(columns=y.columns)
        
        train_size = 1 - test_size
        

        for label in labels :
            y_rows = y[y.iloc[:,0] == label]            
            X_rows = X.loc[y_rows.index]
            
            train_index = int(len(X_rows) * train_size)
            
            X_train = pd.concat([X_train, X_rows.iloc[:train_index]], ignore_index=False)
            y_train = pd.concat([y_train, y_rows.iloc[:train_index]] , ignore_index=False)
            
            X_test = pd.concat([X_test, X_rows[train_index:]], ignore_index=False)
            y_test = pd.concat([y_test, y_rows[train_index:]], ignore_index=False)

        return X_train, X_test, y_train, y_test
    

In [380]:
df = fetch_dataframe(DATASET_NAME)

# print(df)

{'uci_id': 2, 'name': 'Adult', 'repository_url': 'https://archive.ics.uci.edu/dataset/2/adult', 'data_url': 'https://archive.ics.uci.edu/static/public/2/data.csv', 'abstract': 'Predict whether annual income of an individual exceeds $50K/yr based on census data. Also known as "Census Income" dataset. ', 'area': 'Social Science', 'tasks': ['Classification'], 'characteristics': ['Multivariate'], 'num_instances': 48842, 'num_features': 14, 'feature_types': ['Categorical', 'Integer'], 'demographics': ['Age', 'Income', 'Education Level', 'Other', 'Race', 'Sex'], 'target_col': ['income'], 'index_col': None, 'has_missing_values': 'yes', 'missing_values_symbol': 'NaN', 'year_of_dataset_creation': 1996, 'last_updated': 'Tue Sep 24 2024', 'dataset_doi': '10.24432/C5XW20', 'creators': ['Barry Becker', 'Ronny Kohavi'], 'intro_paper': None, 'additional_info': {'summary': "Extraction was done by Barry Becker from the 1994 Census database.  A set of reasonably clean records was extracted using the fol

In [381]:
from sklearn.model_selection import train_test_split

X = df.data.features
y = df.data.targets

X_train, X_test, y_train, y_test = __train_test_split(X, y , test_size=TEST_SIZE, shuffle_and_stratify=True)

# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_SIZE)

print("X_train shape:", X_train.shape)
print("X_test shape:", X_test.shape)
print("y_train shape:", y_train.shape)
print("y_test shape:", y_test.shape)

X_train shape: (39072, 14)
X_test shape: (9770, 14)
y_train shape: (39072, 1)
y_test shape: (9770, 1)


In [382]:
def __classification_report(y_true, y_pred):
    
    if isinstance(y_true, pd.DataFrame):
        y_true = y_true.iloc[:, 0]

    # Ensure y_true is a Series
    if isinstance(y_pred, pd.Series):
        y_pred = y_pred.reset_index(drop=True)
    elif isinstance(y_pred, list):
        y_pred = pd.Series(y_pred)


    if len(y_true) != len(y_pred):
        raise ValueError("prediction does not have same number of tuples as the true value set")
    
    labels = pd.Series(y_true).unique()
    
    for label in labels:
        P = N = TP = FP = TN = FN = 0
        
        for i in range(len(y_pred)):
            true_label = y_true.iloc[i] if hasattr(y_true, 'iloc') else y_true[i]
            pred_label = y_pred.iloc[i] if hasattr(y_pred, 'iloc') else y_pred[i]
            
            if true_label == label:
                P += 1
            else:
                N += 1
                
            if true_label == label and pred_label == label:
                TP += 1
            elif true_label == label and pred_label != label:
                FN += 1
            elif true_label != label and pred_label == label:
                FP += 1
            elif true_label != label and pred_label != label:
                TN += 1
        
        accuracy = (TP + TN) / (P + N) if (P + N) > 0 else 0.0
        precision = TP / (TP + FP) if (TP + FP) > 0 else 0.0
        recall = TP / (TP + FN) if (TP + FN) > 0 else 0.0
        f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
        sensitivity = TP / P if P > 0 else 0.0
        specificity = TN / N if N > 0 else 0.0
        support = P

        print(f"Label: {label}")
        print(f"  Accuracy   : {accuracy:.2f}")
        print(f"  Precision  : {precision:.2f}")
        print(f"  Recall     : {recall:.2f}")
        print(f"  F1 Score   : {f1_score:.2f}")
        print(f"  Sensitivity: {sensitivity:.2f}")
        print(f"  Specificity: {specificity:.2f}")
        print(f"  Support    : {support}")
        print("-" * 30)
        
    #overall accuracy
    
    mathced = 0
    for i in range(len(y_pred)):
        true_label = y_true.iloc[i] if hasattr(y_true, 'iloc') else y_true[i]
        pred_label = y_pred.iloc[i] if hasattr(y_pred, 'iloc') else y_pred[i]
        
        if true_label == pred_label:
            mathced += 1
    
    overall_accuracy = mathced / len(y_pred) if len(y_pred) > 0 else 0.0
    print(f"Overall Accuracy: {overall_accuracy:.2f}")


In [383]:

def _is_continous(X: pd.Series) -> bool:
    return np.issubdtype(X.dtype, np.number) and (len(X.unique()) / len(X) > 0.001)

        

class DiscreteAttributeSelectionCriteria:
    
    def __init__(self, value):
        self.value = value 
        
    def condition_satisfied(self, val):
        if self.value == None:
            raise ValueError(f"value not set")
        
        if self.value == val:
            return True
        else:
            return False
        
        
        
class ContinuousAtrributeSelectionCriteria:
    
    def __init__(self, start_point, end_point):
        self.start_point = start_point
        self.end_point = end_point
    
    def condition_satisfied(self, val):
        if self.start_point == None or self.end_point==None:
            raise ValueError(f"value not set")
        
        if self.start_point <= val <= self.end_point:
            return True
        else:
            return False



class DiscreteAndBinaryAtrributeSelectionCriteria:
    
    def __init__(self, set_of_values):
        self.value_set = set_of_values

    def condition_satisfied(self, val):
        if len(self.value_set) == 0:
            raise ValueError(f"value not set")
        
        if val in self.value_set:
            return True
        else:
            return False
        


def get_criteria(D,
                best_splitting_criterion,
                splitting_attr,
                multiple_splits_allowed:bool=True
                ):
    
    
    if not _is_continous(D[splitting_attr]):
        
        if multiple_splits_allowed:
            return DiscreteAttributeSelectionCriteria(value=best_splitting_criterion.value)
        else:
            return DiscreteAndBinaryAtrributeSelectionCriteria(set_of_values=best_splitting_criterion.set)
    else:
        return ContinuousAtrributeSelectionCriteria(start_point=best_splitting_criterion.start_point, 
                                                    end_point=best_splitting_criterion.end_point)
        

In [384]:
from math import log2


def info(D):
    label_counts = D.iloc[:, -1].value_counts().to_dict()
    info_val = 0
    for label in label_counts:
        pi = label_counts[label] / len(D)
        info_val += - pi * log2(pi)
    return info_val

def info_A(D, attr):
    info_A = 0
    attr_values = D[attr].unique()
    for attr_val in attr_values:
        Dj = D[D[attr] == attr_val]
        info_A += (len(Dj) / len(D)) * info(Dj)
    return info_A

def Gain(D, A):
    return info(D) - info_A(D, A)





In [385]:

from platform import node


class Node:
    def __init__(self):
        self.children = {}
        self.isLeaf = False
        self.split_attribute = None  # <-- add this
        self.returning_class = None
        self.attribute_selection_criteria = None


class DecisionTreeClassifier:
    def __init__(self, multiple_splits_allowed=True):
        self.root = None
        self.multiple_splits_allowed = multiple_splits_allowed

    def attribute_selection(self, D, attribute_list):
        best_gain = -1
        best_attr = None
        best_criterion = None

        for attr in attribute_list:
            
            if len(D[attr].unique()) <= 1:
                continue

            if _is_continous(D[attr]):
                sorted_vals = np.sort(D[attr].dropna().unique())
                split_points = [(sorted_vals[i] + sorted_vals[i+1]) / 2 for i in range(len(sorted_vals)-1)]

                for split in split_points:
                    D_left = D[D[attr] <= split]
                    D_right = D[D[attr] > split]
                    if len(D_left) == 0 or len(D_right) == 0:
                        continue
                    weighted_info = (len(D_left)/len(D)) * info(D_left) + (len(D_right)/len(D)) * info(D_right)
                    gain = info(D) - weighted_info
                    if gain > best_gain:
                        best_gain = gain
                        best_attr = attr
                        best_criterion = [split]  # Store best split point
            else:
                gain = Gain(D, attr)
                if gain > best_gain:
                    best_gain = gain
                    best_attr = attr
                    best_criterion = D[attr].unique()

        return best_criterion, best_attr

    def build_tree(self, X_train, y_train):
        # Combine features and labels into one DataFrame
        D = pd.concat([X_train, y_train], axis=1)
        attribute_list = set(X_train.columns)

        def generate_decision_tree(D, attribute_list):
            node = Node()

            # Stopping condition 1: All samples have the same label
            if len(D.iloc[:, -1].unique()) == 1:
                node.isLeaf = True
                node.returning_class = D.iloc[:, -1].iloc[0]
                return node

            # Stopping condition 2: No attributes left to split
            if len(attribute_list) == 0:
                node.isLeaf = True
                majority_class = D.iloc[:, -1].value_counts().idxmax()
                node.returning_class = majority_class
                return node

            best_criterion, best_attr = self.attribute_selection(D, attribute_list)
            
            node.split_attribute = best_attr  # Store the attribute name
            
            if not self.multiple_splits_allowed:
                attribute_list = attribute_list - {best_attr}

            # If no attribute gives positive gain, make leaf node with majority class
            if best_attr is None or len(best_criterion) == 0:
                node.isLeaf = True
                node.returning_class = D.iloc[:, -1].value_counts().idxmax()
                return node

            
            if _is_continous(D[best_attr]):
                split = best_criterion[0]
                
                node.attribute_selection_criteria = ContinuousAtrributeSelectionCriteria(start_point=-float('inf'), end_point=split)
                
                D_left = D[D[best_attr] <= split]
                D_right = D[D[best_attr] > split]
                
                if len(D_left) == 0 or len(D_right) == 0:
                    node.isLeaf = True
                    node.returning_class = D.iloc[:, -1].value_counts().idxmax()
                    return node


                node.children['left'] = generate_decision_tree(D_left, attribute_list.copy())
                node.children['right'] = generate_decision_tree(D_right, attribute_list.copy())
            else:
                node.attribute_selection_criteria = DiscreteAttributeSelectionCriteria(value=best_criterion[0])
            # If multiple splits are NOT allowed, remove the chosen attribute
                

                # Split dataset by each attribute value and recurse
                for attr_val in best_criterion:
                    D_j = D[D[best_attr] == attr_val]

                    # If no samples in this subset, create leaf with majority class of parent
                    if len(D_j) == 0:
                        leaf_node = Node()
                        leaf_node.isLeaf = True
                        leaf_node.returning_class = D.iloc[:, -1].value_counts().idxmax()
                        node.children[attr_val] = leaf_node
                    else:
                        node.children[attr_val] = generate_decision_tree(D_j, attribute_list.copy())

            return node

        self.root = generate_decision_tree(D, attribute_list)

    def _majority_class(self, node):
        from collections import Counter

        def collect_leaf_classes(n):
            if n.isLeaf:
                return [n.returning_class]
            labels = []
            for child in n.children.values():
                labels.extend(collect_leaf_classes(child))
            return labels

        leaf_classes = collect_leaf_classes(node)
        if not leaf_classes:
            return None
        return Counter(leaf_classes).most_common(1)[0][0]

    def predict_single(self, x):
        node = self.root
        while not node.isLeaf:
            attr = node.split_attribute  # <--- Get column name
            val = x[attr]                # <--- Safe: x[attr] is now x["age"] or similar

            if isinstance(node.attribute_selection_criteria, ContinuousAtrributeSelectionCriteria):
                if node.attribute_selection_criteria.condition_satisfied(val):
                    node = node.children['left']
                else:
                    node = node.children['right']
            elif isinstance(node.attribute_selection_criteria, DiscreteAttributeSelectionCriteria):
                if node.attribute_selection_criteria.condition_satisfied(val):
                    node = node.children[val]
                else:
                    return self._majority_class(node)

        return node.returning_class


    def predict(self, X_test):
        predictions = [self.predict_single(row) for _, row in X_test.iterrows()]
        return pd.Series(predictions, index=X_test.index)

In [386]:
model = DecisionTreeClassifier()

model.build_tree(X_train, y_train)

In [387]:


# Example prediction
example = X_test.iloc[0]
print(f"Example for prediction: {example.to_dict()}")
predicted_class = model.predict_single(example)
print(f"Predicted class: {predicted_class} -- expected: {y_test.iloc[0, 0]})")

Example for prediction: {'age': 28, 'workclass': 'Private', 'fnlwgt': 198258, 'education': 'Assoc-voc', 'education-num': 11, 'marital-status': 'Married-civ-spouse', 'occupation': 'Sales', 'relationship': 'Husband', 'race': 'White', 'sex': 'Male', 'capital-gain': 0, 'capital-loss': 0, 'hours-per-week': 40, 'native-country': 'United-States'}
Predicted class: <=50K -- expected: <=50K)


In [388]:
y_preds = model.predict(X_test)

print("y_test type:", type(y_test))
print("y_test shape:", y_test.shape)
print("y_preds type:", type(y_preds))
print("y_preds shape:", y_preds.shape)

if len(y_test) != len(y_preds):
    raise ValueError("y_test and y_preds must have the same length.")
# Evaluate the model
# from sklearn.metrics import accuracy_score, classification_report

# print(f"Accuracy: {accuracy_score(y_test, y_preds)}")
# print("Classification Report:")
# print(classification_report(y_test, y_preds))


__classification_report(y_test, y_preds)


y_test type: <class 'pandas.core.frame.DataFrame'>
y_test shape: (9770, 1)
y_preds type: <class 'pandas.core.series.Series'>
y_preds shape: (9770,)
Label: <=50K
  Accuracy   : 0.51
  Precision  : 0.51
  Recall     : 1.00
  F1 Score   : 0.67
  Sensitivity: 1.00
  Specificity: 0.00
  Support    : 4944
------------------------------
Label: >50K
  Accuracy   : 0.84
  Precision  : 0.00
  Recall     : 0.00
  F1 Score   : 0.00
  Sensitivity: 0.00
  Specificity: 1.00
  Support    : 1569
------------------------------
Label: <=50K.
  Accuracy   : 0.75
  Precision  : 0.00
  Recall     : 0.00
  F1 Score   : 0.00
  Sensitivity: 0.00
  Specificity: 1.00
  Support    : 2487
------------------------------
Label: >50K.
  Accuracy   : 0.92
  Precision  : 0.00
  Recall     : 0.00
  F1 Score   : 0.00
  Sensitivity: 0.00
  Specificity: 1.00
  Support    : 770
------------------------------
Overall Accuracy: 0.51
