## KD Tree

In [1280]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import mode

In [1281]:
# Load the dataset used by the KD-tree section from CSV.
# NOTE(review): this is an absolute, machine-specific path; the notebook cannot run on another machine without editing it.
data = pd.read_csv('/Users/hanifemamgholizadeh/Desktop/patter_recognition/data/tree_classification_dataset.csv')

In [1282]:
# Preview the first rows of the loaded frame (rendered as the cell output).
data.head()

Unnamed: 0,feature_1,feature_2,feature_3,feature_4,target
0,-3.402839,0.179845,1.432424,0.774566,1
1,-1.902995,-1.241501,1.956614,0.512447,1
2,-1.023094,1.270126,0.782203,-0.785725,0
3,-0.331077,-0.06959,-0.258279,-0.339054,0
4,-3.377452,0.816699,-3.009166,-1.553258,2


In [1283]:
# Separate the class label from the predictors; every column other than 'target' is kept as a feature.
y = data['target']
X = data.drop(columns=['target'])
# Number of distinct class labels (rendered as the cell output).
len(y.unique())

3

In [1284]:
class TreeNode:
    """A node of the KD tree.

    Internal nodes carry the splitting ``feature`` and threshold ``value``;
    leaves carry ``leaf_list``, the stored entries of that region. The two
    ``is_*_child`` flags start out False and are set by the tree builder.
    """

    def __init__(self, left_child=None, right_child=None, parent=None, value=None, feature=None, leaf_list=None):
        # Links to the surrounding tree.
        self.parent = parent
        self.left_child = left_child
        self.right_child = right_child

        # Split description (None on leaves).
        self.feature = feature
        self.value = value

        # A fresh list per node when none is supplied, so nodes never share storage.
        self.leaf_list = [] if leaf_list is None else leaf_list

        # Side of the parent this node hangs on.
        self.is_left_child = False
        self.is_right_child = False

    def is_leaf(self):
        """Return True when the node has neither a left nor a right child."""
        has_any_child = self.left_child is not None or self.right_child is not None
        return not has_any_child

In [1285]:
def calculate_distance(x1, x2):
    """Return the Euclidean (L2) distance between two array-like points."""
    first_point = np.array(x1)
    second_point = np.array(x2)
    difference = first_point - second_point
    return np.linalg.norm(difference)

In [1286]:
def calculate_distance_to_wall(x, node):
    """Return how far ``x`` lies from the splitting plane stored in ``node``.

    The plane is ``feature == value``, so the distance is the absolute
    difference along that single feature.
    """
    signed_offset = x[node.feature] - node.value
    return abs(signed_offset)

In [1287]:
def create_KD_tree(X, y, parent_node=None, direction=None):
    """Recursively build a KD tree over the rows of ``X`` labelled by ``y``.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature rows; ``y`` must be aligned with it row by row.
    y : pandas.Series
        Class label of each row.
    parent_node : TreeNode or None
        Node the new subtree hangs under (None for the root).
    direction : {'left', 'right', None}
        Side of ``parent_node`` this subtree is attached to.

    Returns
    -------
    TreeNode
        A leaf holding ``(point, label)`` pairs when at most 4 rows remain,
        all rows share one label, or no feature can separate the rows;
        otherwise an internal node splitting a randomly chosen feature at its
        median (rows ``<= median`` go left).
    """
    def _tagged(node):
        # Record the side on every node, leaves included (both stay False for the root).
        node.is_left_child = direction == 'left'
        node.is_right_child = direction == 'right'
        return node

    def _make_leaf():
        return _tagged(TreeNode(parent=parent_node, leaf_list=list(zip(X.values, y.values))))

    if len(X) <= 4 or len(np.unique(y)) == 1:
        return _make_leaf()

    # Try the features in random order and keep the first whose median really
    # separates the rows. A split that sends every row to one side would recurse
    # on identical data forever, so such features are skipped.
    for feature in np.random.choice(X.columns, size=len(X.columns), replace=False):
        median_value = X[feature].median()
        goes_left = (X[feature] <= median_value).to_numpy()
        if goes_left.any() and not goes_left.all():
            break
    else:
        # No feature can split these rows (e.g. duplicated points): store them together.
        return _make_leaf()

    node = _tagged(TreeNode(parent=parent_node, value=median_value, feature=feature))

    # Positional boolean masks keep X and y in step even if index labels repeat.
    node.left_child = create_KD_tree(X[goes_left], y[goes_left], parent_node=node, direction='left')
    node.right_child = create_KD_tree(X[~goes_left], y[~goes_left], parent_node=node, direction='right')

    return node


In [1288]:
def depth_first_search(tree_node: "TreeNode", x, knn_distance, knn=None):
    """Collect ``(distance, label)`` candidates for ``x`` from a KD subtree.

    Earlier draft of the search; a later cell defines ``depth_first_search``
    again, and that later definition is the one in effect when the notebook
    runs top to bottom.

    Parameters
    ----------
    tree_node : TreeNode or None
        Root of the subtree to search.
    x : indexable
        Query point, indexable by the feature keys stored in the nodes.
    knn_distance : float
        Search radius: the far side of a split is visited only when the
        splitting plane is closer to ``x`` than this.
    knn : list, optional
        Accumulator to append to; a fresh list is created when omitted.

    Returns
    -------
    list
        The accumulator, holding one ``(distance, label)`` tuple per visited point.
    """
    if knn is None:
        # A fresh list per call; a mutable default would leak results between queries.
        knn = []

    if tree_node is None:
        return knn

    if tree_node.is_leaf():
        for point, label in tree_node.leaf_list:
            knn.append((calculate_distance(x, point), label))
        return knn

    # Descend into the side that contains x first.
    if x[tree_node.feature] <= tree_node.value:
        near_side, far_side = tree_node.left_child, tree_node.right_child
    else:
        near_side, far_side = tree_node.right_child, tree_node.left_child

    depth_first_search(near_side, x, knn_distance, knn)

    # The far side can only hold a closer point if the plane lies within the radius.
    if calculate_distance_to_wall(x, tree_node) < knn_distance:
        depth_first_search(far_side, x, knn_distance, knn)

    return knn

In [1289]:
def depth_first_search(node, x, knn_distance, knn_list):
    """Append ``(distance, label)`` for every reachable stored point to ``knn_list``.

    The subtree on the query's own side of each split is always searched; the
    opposite subtree is searched only when the splitting plane is closer to
    ``x`` than ``knn_distance``. Nothing is returned; ``knn_list`` is mutated.
    """
    if node is None:
        return

    if node.is_leaf():
        knn_list.extend(
            (calculate_distance(x, point), label) for point, label in node.leaf_list
        )
        return

    # Decide which child contains the query and which one lies across the wall.
    if x[node.feature] <= node.value:
        near_side, far_side = node.left_child, node.right_child
    else:
        near_side, far_side = node.right_child, node.left_child

    depth_first_search(near_side, x, knn_distance, knn_list)
    if calculate_distance_to_wall(x, node) < knn_distance:
        depth_first_search(far_side, x, knn_distance, knn_list)

# Query the KD tree

In [1290]:
def query_KD_tree(tree, x, k=1):
    """Return the ``k`` stored points closest to ``x`` as ``(distance, label)`` tuples.

    Parameters
    ----------
    tree : TreeNode or None
        Root of a tree built by ``create_KD_tree``.
    x : indexable
        Query point, indexable by the feature keys stored in the nodes.
    k : int
        Number of neighbours to return.

    Returns
    -------
    list of (distance, label)
        Sorted by ascending distance; shorter than ``k`` when the tree holds
        fewer points.
    """
    knn_list = []

    # Searching with an infinite radius visits every leaf. Tighten it first:
    # walk down to the leaf whose region contains x and, if that leaf already
    # holds k points, use the k-th smallest distance there as the radius. Any
    # subtree whose wall is at least that far away cannot improve the answer.
    search_radius = np.inf
    if k > 0:
        home = tree
        while home is not None and not home.is_leaf():
            home = home.left_child if x[home.feature] <= home.value else home.right_child
        if home is not None:
            home_distances = sorted(calculate_distance(x, point) for point, _ in home.leaf_list)
            if len(home_distances) >= k:
                search_radius = home_distances[k - 1]

    # Collect candidates, pruning far subtrees against the radius.
    depth_first_search(tree, x, knn_distance=search_radius, knn_list=knn_list)

    # Sort and return top-k neighbors
    knn_list.sort(key=lambda tup: tup[0])
    return knn_list[:k]

# Example usage: build the tree once from the full dataset.
kd_tree = create_KD_tree(X, y)

# Classify one stored point and compare against the label of that same row.
test_index = 10
test_point = X.iloc[test_index]
neighbors = query_KD_tree(kd_tree, test_point, k=3)
neighbor_labels = np.array([label for _, label in neighbors])
print("Nearest Neighbors:", neighbors)
# np.atleast_1d handles both SciPy behaviours: .mode as an array or as a scalar.
print("Test Point prediction:", np.atleast_1d(mode(neighbor_labels).mode)[0])
print("True Label:", y.iloc[test_index])

Nearest Neighbors: [(0.0, 1), (0.7395099856826924, 1), (0.7622639206298912, 1)]
Test Point prediction: 1
True Label: 1


  print("Test Point prediction:", mode(np.array([preds[1] for preds in neighbors])).mode[0])


In [1291]:
from sklearn.metrics import accuracy_score
def evaluate_model(tree, X, y, k=3):
    """Classify every row of ``X`` by k-nearest-neighbour vote and report accuracy.

    Parameters
    ----------
    tree : TreeNode
        KD tree to query.
    X : pandas.DataFrame
        Rows to classify.
    y : array-like
        True labels, in the same order as the rows of ``X``.
    k : int
        Number of neighbours that vote on each prediction.

    Returns
    -------
    float
        Accuracy as computed by ``accuracy_score``; it is also printed.
    """
    predictions = []
    for i in range(len(X)):
        neighbors = query_KD_tree(tree, X.iloc[i], k)
        neighbor_labels = np.array([label for _, label in neighbors])
        # Depending on the SciPy version, .mode is a length-1 array or a scalar;
        # np.atleast_1d makes the indexing valid in both cases.
        predicted_label = np.atleast_1d(mode(neighbor_labels).mode)[0]
        predictions.append(predicted_label)

    accuracy = accuracy_score(y, predictions)
    print("Accuracy:", accuracy)
    return accuracy

In [1292]:
# Score the classifier on X and y, the same rows kd_tree was built from,
# so each query point is itself stored in the tree.
evaluate_model(kd_tree, X, y, k=3)

  predicted_label = mode(np.array([preds[1] for preds in neighbors])).mode[0]


Accuracy: 0.895


0.895

## Decision Tree

In [1293]:
# Load the dataset used by the decision-tree section from CSV.
# NOTE(review): this is an absolute, machine-specific path; the notebook cannot run on another machine without editing it.
df_dtree = pd.read_csv('/Users/hanifemamgholizadeh/Desktop/patter_recognition/data/decision_tree_dataset.csv')

In [1294]:
# Preview the first rows of the decision-tree dataset (rendered as the cell output).
df_dtree.head()

Unnamed: 0,age,income,student,credit_rating,label
0,56,23392,1,1,0
1,46,45535,1,1,0
2,32,93603,0,1,0
3,25,67256,0,1,0
4,38,104135,1,0,1


In [1295]:
from sklearn.model_selection import train_test_split
# Separate predictors from the 'label' column, then hold out 20% of the rows for testing.
X_dtree = df_dtree.drop(columns=['label'])
y_dtree = df_dtree['label']
# A fixed random_state makes the split repeatable between runs.
X_train, X_test, y_train, y_test = train_test_split(X_dtree, y_dtree, test_size=0.2, random_state=42)

In [1296]:
class DTreeNode:
    """A node of the decision tree.

    Internal nodes store the split (``feature``, ``value``) and two children;
    nodes without children act as leaves and carry ``predicted_class``.
    """

    def __init__(self, left_child=None, right_child=None, value=None, feature=None,
                 leaf_list=False, included_sample_size=0, impurity=None, sample_list=None, predicted_class=None):
        # Children; both None on a leaf.
        self.left_child = left_child
        self.right_child = right_child
        # Split description: threshold or category value, and the column it applies to.
        self.value = value
        self.feature = feature
        self.leaf_list = leaf_list
        # Number of training samples that reached this node and their impurity.
        self.included_sample_size = included_sample_size
        self.impurity = impurity
        # Samples that reached this node. A fresh list per node: a shared mutable
        # default would make every node created without `sample_list` alias one list.
        self.samples_list = sample_list if sample_list is not None else []
        # Class returned when prediction stops at this node.
        self.predicted_class = predicted_class


In [1297]:
def columns_type(X=None, min_unique_numeric=10):
    """Classify every column of ``X`` as ``'numerical'`` or ``'categorical'``.

    Parameters
    ----------
    X : pandas.DataFrame, optional
        Frame to inspect. When omitted, the module-level ``X_train`` is used;
        it is looked up at call time, so the current training frame is seen
        rather than whatever existed when this function was defined.
    min_unique_numeric : int
        A numeric column counts as numerical only when it has at least this
        many distinct values; numeric columns with fewer are treated as codes.

    Returns
    -------
    dict
        Maps each column name to ``'numerical'`` or ``'categorical'``.
    """
    if X is None:
        X = X_train

    columns_types = dict()
    for column in X.columns:
        series = X[column]
        # Anything that is not a numeric dtype (object, string, category, ...) is
        # categorical regardless of how many distinct values it has.
        is_numerical = (
            pd.api.types.is_numeric_dtype(series)
            and len(series.unique()) >= min_unique_numeric
        )
        columns_types[column] = 'numerical' if is_numerical else 'categorical'
    return columns_types

In [1298]:
# Module-level column-kind lookup, built from columns_type's default frame (X_train);
# predict_one and visualize_tree read this global.
columns_types = columns_type()

In [1299]:
def entropy(labels):
    """Return the Shannon entropy, in bits, of the label distribution in ``labels``.

    ``labels`` is a pandas Series; class frequencies come from ``value_counts``.
    """
    class_frequencies = labels.value_counts(normalize=True)
    terms = []
    for frequency in class_frequencies:
        # Guard against log2(0); zero-probability classes contribute nothing.
        if frequency > 0:
            terms.append(frequency * np.log2(frequency))
    return -np.sum(terms)


In [None]:
def entropy(labels):
    """Return the Shannon entropy, in bits, of the label distribution in ``labels``.

    ``labels`` is a pandas Series; class frequencies come from ``value_counts``.
    """
    class_frequencies = labels.value_counts(normalize=True)
    terms = []
    for frequency in class_frequencies:
        # Guard against log2(0); zero-probability classes contribute nothing.
        if frequency > 0:
            terms.append(frequency * np.log2(frequency))
    return -np.sum(terms)

def _candidate_splits(data, positions, is_numerical):
    """Yield ``(split_value, left_positions, right_positions)`` for one column.

    ``data`` holds the column values of the samples at ``positions`` (same order).
    """
    if is_numerical:
        values = data.to_numpy()
        order = np.argsort(values, kind='stable')
        sorted_values = values[order]
        sorted_positions = positions[order]
        for i in range(1, len(sorted_values)):
            # Only cut between two different values: a threshold cannot separate
            # equal values (this also skips comparisons involving NaN).
            if not sorted_values[i - 1] < sorted_values[i]:
                continue
            # Midpoint of the two neighbouring *sorted* values, so that
            # `value <= threshold` reproduces exactly this partition.
            midpoint = (sorted_values[i - 1] + sorted_values[i]) / 2
            yield midpoint, sorted_positions[:i], sorted_positions[i:]
    else:
        for val in data.unique():
            is_match = (data == val).to_numpy()
            if is_match.all() or not is_match.any():
                continue
            yield val, positions[is_match], positions[~is_match]


def decision_tree(X, y, parent_node=None, list_of_indices=None, threshold=0.01):
    """Recursively grow a binary classification tree that minimises weighted entropy.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature columns; column kinds are taken from ``columns_type(X)``.
    y : pandas.Series
        Class labels, aligned with ``X`` row by row.
    parent_node : DTreeNode or None
        Accepted for the recursive calls; not otherwise used.
    list_of_indices : list of int, optional
        Row *positions* (as for ``iloc``) of the samples handled by this node.
        Defaults to all rows.
    threshold : float
        A node whose entropy is at or below this value becomes a leaf.

    Returns
    -------
    DTreeNode
        Root of the (sub)tree. Numerical splits send ``value <= threshold``
        left; categorical splits send ``value == category`` left.
    """
    columns_types = columns_type(X)

    # Use consistent positional indices
    if list_of_indices is None:
        list_of_indices = list(range(len(X)))

    n_samples = len(list_of_indices)
    labels = y.iloc[list_of_indices]
    current_impurity = entropy(labels)
    # An empty node has no majority class; avoid indexing an empty mode().
    majority_class = labels.mode()[0] if n_samples else None

    # Stopping conditions: pure or low impurity
    if current_impurity <= threshold or len(set(labels)) == 1 or n_samples <= 1:
        return DTreeNode(
            included_sample_size=n_samples,
            sample_list=list_of_indices,
            impurity=current_impurity,
            predicted_class=majority_class
        )

    current_node = DTreeNode(
        included_sample_size=n_samples,
        sample_list=list_of_indices,
        impurity=current_impurity
    )

    # Row positions are carried through the whole search, so the result does not
    # depend on what the index labels of X and y happen to be.
    positions = np.asarray(list_of_indices)
    min_impurity = float('inf')
    best_split = None  # (feature, value, left_positions, right_positions)

    for column in X.columns:
        data = X[column].iloc[list_of_indices]
        is_numerical = columns_types[column] == 'numerical'

        for split_value, left, right in _candidate_splits(data, positions, is_numerical):
            weighted_entropy = (
                len(left) / n_samples * entropy(y.iloc[left]) +
                len(right) / n_samples * entropy(y.iloc[right])
            )
            if weighted_entropy < min_impurity:
                min_impurity = weighted_entropy
                best_split = (column, split_value, left.tolist(), right.tolist())

    # No column can separate these samples: finish as a leaf.
    if best_split is None:
        current_node.predicted_class = majority_class
        return current_node

    # Store best split
    feature_to_split, value_to_split, best_left_indices, best_right_indices = best_split
    current_node.feature = feature_to_split
    current_node.value = value_to_split

    # Recursively split
    current_node.left_child = decision_tree(X, y, current_node, best_left_indices, threshold)
    current_node.right_child = decision_tree(X, y, current_node, best_right_indices, threshold)

    return current_node


In [1301]:
# train_test_split keeps the original row labels; reset both to a fresh 0..n-1 index
# so that index labels coincide with row positions.
X_train = X_train.reset_index(drop=True)
y_train = y_train.reset_index(drop=True)

# Grow the decision tree on the training split.
tree = decision_tree(X_train, y_train)

Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']


In [1302]:
def predict_one(sample, node):
    """Route ``sample`` down the tree from ``node`` and return the predicted class.

    At each node that has both children, the column kind in the module-level
    ``columns_types`` decides the test: ``<=`` against the stored value for
    numerical columns, equality for categorical ones. A passing test goes left.
    """
    current = node
    while True:
        # A node missing either child ends the walk.
        if not (current.left_child and current.right_child):
            return current.predicted_class

        observed = sample[current.feature]
        if columns_types[current.feature] == 'numerical':
            goes_left = observed <= current.value
        else:  # categorical
            goes_left = observed == current.value

        current = current.left_child if goes_left else current.right_child


In [1303]:
def predict_all(X_test, root_node):
    """Return a Series with the prediction of ``predict_one`` for every row of ``X_test``."""
    def _predict_row(row):
        return predict_one(row, root_node)

    return X_test.apply(_predict_row, axis=1)

In [1304]:
# Grow a tree on the training split.
root = decision_tree(X_train, y_train)

# Predict a label for every row of the held-out split.
y_pred = predict_all(X_test, root)

# Fraction of held-out rows whose predicted label matches the true one.
from sklearn.metrics import accuracy_score
print("Accuracy:", accuracy_score(y_test, y_pred))

Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Columns: ['age', 'income', 'student', 'credit_rating']
Accuracy: 0.95


In [1305]:
from graphviz import Digraph

def visualize_tree(node, dot=None, parent_name=None, edge_label=None, node_id=None):
    """Render the decision tree rooted at ``node`` into a graphviz ``Digraph``.

    Parameters
    ----------
    node : DTreeNode
        Root of the (sub)tree to draw.
    dot : Digraph, optional
        Graph to add to; a new one with box-shaped nodes is created when omitted.
    parent_name : str, optional
        Graph id of the parent node; used by the recursive calls to draw the edge.
    edge_label : str, optional
        Label of the edge from the parent ('True' for left, 'False' for right).
    node_id : list of int, optional
        One-element counter shared by the recursion to hand out graph ids.
        A fresh counter starting at 0 is created for every top-level call.

    Returns
    -------
    Digraph
        The graph containing this subtree.
    """
    if dot is None:
        dot = Digraph()
        dot.attr('node', shape='box')

    if node_id is None:
        # A mutable default would keep counting across separate calls, so ids
        # would depend on how many trees were drawn before.
        node_id = [0]

    current_id = str(node_id[0])
    node_id[0] += 1

    if node.left_child is None and node.right_child is None:
        # Leaf node
        label = f'Leaf\nSamples: {node.included_sample_size}\nClass: {getattr(node, "predicted_class", "?")}'
    else:
        label = f'{node.feature} ≤ {node.value}' if columns_types[node.feature] == 'numerical' else f'{node.feature} = {node.value}'
        label += f'\nSamples: {node.included_sample_size}\nImpurity: {round(node.impurity, 3)}'

    dot.node(current_id, label)

    if parent_name is not None:
        dot.edge(parent_name, current_id, label=edge_label)

    # Recurse for children; the left branch is the one taken when the test holds.
    if node.left_child is not None:
        visualize_tree(node.left_child, dot, current_id, 'True', node_id)
    if node.right_child is not None:
        visualize_tree(node.right_child, dot, current_id, 'False', node_id)

    return dot


In [1306]:
# Build the decision tree (assuming you've already done this)
# root = decision_tree(X_train, y_train)

# # Visualize it
# dot = visualize_tree(root)
# dot.render("my_tree", format="png", cleanup=False)  # Saves as my_tree.png
# dot.view()  # Opens the image in default viewer


## Random Forest

### Bagging

In [1307]:
# Load the dataset used by the random-forest section from CSV.
# NOTE(review): this is an absolute, machine-specific path; the notebook cannot run on another machine without editing it.
rf_data = pd.read_csv('/Users/hanifemamgholizadeh/Desktop/patter_recognition/data/random_forest_synthetic_dataset.csv')

In [1308]:
# Preview the first rows of the random-forest dataset (rendered as the cell output).
rf_data.head()

Unnamed: 0,age,income,score,owns_house,has_loan,is_married,category_1,category_2,category_3,target
0,56,51905,0.936648,1,1,0,B,X,Low,1
1,69,31258,0.039186,0,0,0,A,X,Low,1
2,46,79176,0.417946,1,1,1,C,Y,High,1
3,32,47699,0.967581,0,0,0,A,X,Low,0
4,60,36395,0.547972,0,1,1,C,Y,Low,1


In [1309]:
# Number of rows in the dataset.
len(rf_data)

200

In [1310]:
# Number of columns in the dataset, the 'target' column included.
len(rf_data.columns)

10

In [1311]:
from sklearn.model_selection import train_test_split
# Separate predictors from the 'target' column, then hold out 20% of the rows for testing.
X_rf = rf_data.drop(columns=['target'])
y_rf = rf_data['target']
# A fixed random_state makes the split repeatable between runs.
X_train_rf, X_test_rf, y_train_rf, y_test_rf = train_test_split(X_rf, y_rf, test_size=0.2, random_state=42)

In [1312]:
def bagging(X, sample_size=None, random_state=None):
    """Draw a bootstrap sample (rows sampled with replacement) from ``X``.

    Parameters
    ----------
    X : pandas.DataFrame
        Frame to resample.
    sample_size : int, optional
        Number of rows to draw. Defaults to ``len(X)`` of the frame passed in;
        a default written as ``len(X)`` in the signature would be evaluated once
        at definition time against whatever global ``X`` existed then.
    random_state : int or numpy random generator, optional
        Forwarded to ``DataFrame.sample`` to make the draw repeatable.

    Returns
    -------
    pandas.DataFrame
        The sampled rows, keeping their original index labels.
    """
    if sample_size is None:
        sample_size = len(X)
    return X.sample(n=sample_size, replace=True, random_state=random_state)

In [1313]:
import random
def train_rf(X, n_trees=10, target='target'):
    """Train a random forest: one decision tree per bootstrap sample and feature subset.

    Parameters
    ----------
    X : pandas.DataFrame
        Training frame holding the feature columns *and* the label column.
    n_trees : int
        Number of trees to grow.
    target : str
        Name of the label column inside ``X``.

    Returns
    -------
    list
        The trained trees, as returned by ``decision_tree``.

    Notes
    -----
    Feature subsets are drawn with ``random.sample`` and no seed is set here,
    so seed ``random`` before calling if repeatable forests are needed.
    """
    # The candidate features and the subset size do not change between trees.
    candidate_features = list(X.columns.difference([target]))
    n_selected = int(np.sqrt(len(candidate_features)))

    trees = []
    for _ in range(n_trees):
        # Ask for a bootstrap of exactly len(X) rows rather than relying on
        # bagging's default size.
        sample = bagging(X, sample_size=len(X)).reset_index(drop=True)

        # Each tree sees a random subset of about sqrt(#features) columns.
        feature_columns = random.sample(candidate_features, k=n_selected)

        tree = decision_tree(sample[feature_columns], sample[target])
        trees.append(tree)

    return trees

In [1314]:
def predict_one(sample, node, columns_types=None):
    """Route ``sample`` down the tree from ``node`` and return the predicted class.

    Parameters
    ----------
    sample : mapping or pandas.Series
        One observation, indexable by feature name.
    node : DTreeNode
        Root of the tree to walk.
    columns_types : dict, optional
        Precomputed ``{column: 'numerical' | 'categorical'}`` mapping. Passing it
        avoids rescanning a whole frame on every prediction and lets the caller
        use the column kinds the tree was trained with. When omitted, the
        mapping is computed from the module-level ``X_test_rf``.

    Returns
    -------
    The ``predicted_class`` of the node where the walk ends.
    """
    if columns_types is None:
        columns_types = columns_type(X_test_rf)

    while node.left_child and node.right_child:
        val = sample[node.feature]

        if columns_types[node.feature] == 'numerical':
            # Numerical split: at or below the threshold goes left.
            goes_left = val <= node.value
        else:
            # Categorical split: the stored category goes left.
            goes_left = val == node.value

        node = node.left_child if goes_left else node.right_child

    return node.predicted_class


In [1315]:
def test_rf(trees, X_test):
    predictions = []
    for row in X_test.iterrows():
        
        votes = []
        for tree in trees:
            pred = predict_one(row[1], tree)
            votes.append(pred)
        final_prediction = mode(votes).mode[0]
        predictions.append(final_prediction)
    return predictions

In [1316]:
# train_rf reads the 'target' column from the frame it is given, so the labels are
# joined back onto the training features column-wise before training 10 trees.
trees = train_rf(pd.concat([X_train_rf, y_train_rf], axis=1), n_trees=10)

Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']
Columns: ['score', 'category_3', 'age']


In [1317]:
# Majority-vote prediction of the forest for every row of the held-out split.
preds = test_rf(trees, X_test_rf)

  final_prediction = mode(votes).mode[0]


In [1318]:
from sklearn.metrics import accuracy_score
# Fraction of held-out rows whose forest prediction matches the true label.
print("Accuracy:", accuracy_score(y_test_rf, preds))

Accuracy: 0.55
