# Trees and Boosting

## Imports

In [1]:
import numpy as np
from collections import Counter
import matplotlib.pyplot as plt

## Decision Tree Class (Base Model)
- This class implements Decision tree with a fit and predict method.
- It accepts following input parameters when intializing the object: criterion, max_depth, min_samples_split, min_samples_leaf.

In [2]:
class DecisionTree:
    def __init__(self, criterion='gini', max_depth=None, min_samples_split=2, min_samples_leaf=1):
        self.criterion = criterion  # 'gini', 'entropy', or 'misclassification'
        self.max_depth = max_depth if max_depth is not None else 100  # Infinite depth if not specified
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.tree = None  # Root of the tree

    def fit(self, X, y, sample_weight=None):
        # """ Train the decision tree """
        # self.tree = self._build_tree(X, y, depth=0)

        """ Train the decision tree with optional sample weighting """
        if sample_weight is not None:
            # Normalize sample weights so they sum to 1
            sample_weight = sample_weight / np.sum(sample_weight)

            # Resample data based on sample weights
            indices = np.random.choice(len(y), size=len(y), replace=True, p=sample_weight)
            X = X[indices]
            y = y[indices]

        self.tree = self._build_tree(X, y, depth=0)

    def predict(self, X):
        """ Predict class labels for input samples """
        return np.array([self._traverse_tree(sample, self.tree) for sample in X])
    
    def _build_tree(self, X, y, depth):
        """ Recursively build the decision tree """
        num_samples, num_features = X.shape
        
        # Stopping conditions
        if depth >= self.max_depth or num_samples < self.min_samples_split or len(set(y)) == 1:
            return self._create_leaf(y)
        
        # Find the best split
        best_feature, best_threshold = self._find_best_split(X, y)
        if best_feature is None:
            return self._create_leaf(y)
        
        # Split the dataset
        left_mask = X[:, best_feature] <= best_threshold
        right_mask = ~left_mask
        
        left_subtree = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        right_subtree = self._build_tree(X[right_mask], y[right_mask], depth + 1)
        
        return {'feature': best_feature, 'threshold': best_threshold, 'left': left_subtree, 'right': right_subtree}
    
    def _find_best_split(self, X, y):
        """ Find the best feature and threshold to split on """
        best_gain = -1
        best_feature = None
        best_threshold = None
        
        for feature in range(X.shape[1]):
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                left_mask = X[:, feature] <= threshold
                right_mask = ~left_mask

                # Skip if either side is empty
                if np.sum(left_mask) == 0 or np.sum(right_mask) == 0:
                    continue

                gain = self._calculate_information_gain(X[:, feature], y, threshold)
                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_threshold = threshold

        return best_feature, best_threshold
    
    def _calculate_information_gain(self, feature_column, y, threshold):
        """ Compute the information gain for a given feature split """
        left_mask = feature_column <= threshold
        right_mask = ~left_mask
        
        if sum(left_mask) < self.min_samples_leaf or sum(right_mask) < self.min_samples_leaf:
            return 0
        
        parent_impurity = self._calculate_impurity(y)
        left_impurity = self._calculate_impurity(y[left_mask])
        right_impurity = self._calculate_impurity(y[right_mask])
        
        weight_left = sum(left_mask) / len(y)
        weight_right = sum(right_mask) / len(y)
        
        return parent_impurity - (weight_left * left_impurity + weight_right * right_impurity)
    
    def _calculate_impurity(self, y):
        """ Compute impurity based on selected criterion """
        classes, counts = np.unique(y, return_counts=True)
        probs = counts / counts.sum()
        
        if self.criterion == 'gini':
            return 1 - np.sum(probs ** 2)
        elif self.criterion == 'entropy':
            return -np.sum(probs * np.log2(probs))
        elif self.criterion == 'misclassification':
            return 1 - np.max(probs)
        else:
            raise ValueError("Invalid criterion: Choose 'gini', 'entropy', or 'misclassification'")
    
    def _create_leaf(self, y):
        """ Create a leaf node with majority class label """
        if len(y) == 0:
            return {'label': 1}  # Default to class '1' to prevent errors
    
        values, counts = np.unique(y, return_counts=True)
        return {'label': values[np.argmax(counts)]}
    
    def _traverse_tree(self, sample, node):
        """ Recursively traverse the tree to predict class """
        if 'label' in node:
            return node['label']
        if sample[node['feature']] <= node['threshold']:
            return self._traverse_tree(sample, node['left'])
        else:
            return self._traverse_tree(sample, node['right'])

## Random Forest Class (Ensemble Model 1)

In [3]:
class RandomForest:
    def __init__(self, base_classifier, num_trees=10, min_features=2):
        """
        Initialize the Random Forest Classifier.
        :param base_classifier: The base classifier (DecisionTree) used in each tree.
        :param num_trees: Number of decision trees in the forest.
        :param min_features: Minimum number of features to consider for splitting.
        """
        self.num_trees = num_trees
        self.min_features = min_features
        self.trees = []
        self.feature_indices = []
        self.base_classifier = base_classifier

    def fit(self, X, y):
        """
        Train the Random Forest on dataset X and labels y.
        :param X: Training data (n_samples, n_features)
        :param y: Labels (n_samples,)
        """
        n_samples, n_features = X.shape
        self.trees = []
        self.feature_indices = []
        
        for _ in range(self.num_trees):
            # Bootstrap Sampling: Randomly sample n instances with replacement
            sample_indices = np.random.choice(n_samples, n_samples, replace=True)
            X_sample, y_sample = X[sample_indices], y[sample_indices]
            
            # Feature Selection: Choose random subset of features
            num_selected_features = np.random.randint(self.min_features, n_features + 1)
            feature_idx = np.random.choice(n_features, num_selected_features, replace=False)
            self.feature_indices.append(feature_idx)
            
            # Train decision tree on selected subset of data and features
            tree = self.base_classifier()
            tree.fit(X_sample[:, feature_idx], y_sample)
            self.trees.append(tree)

    def predict(self, X):
        """
        Predict the class labels for given samples.
        :param X: Test data (n_samples, n_features)
        :return: Predicted labels (n_samples,)
        """
        predictions = []
        
        for tree, feature_idx in zip(self.trees, self.feature_indices):
            pred = tree.predict(X[:, feature_idx])
            predictions.append(pred)
        
        # Majority voting
        final_predictions = np.array([Counter(col).most_common(1)[0][0] for col in np.array(predictions).T])
        return final_predictions

## Boosting using AdaBoost Class (Ensemble Model 2)

In [4]:
class AdaBoost:
    def __init__(self, weak_learner, num_learners=50, learning_rate=1.0):
        """
        Initialize AdaBoost classifier.

        :param weak_learner: Weak learner model (e.g., DecisionTree)
        :param num_learners: Number of weak learners to train
        :param learning_rate: Weight applied to weak learners
        """
        self.weak_learner = weak_learner
        self.num_learners = num_learners
        self.learning_rate = learning_rate
        self.learners = [] # List of trained weak learners
        self.alphas = [] # List of learner weights

    def fit(self, X, y):
        """
        Train AdaBoost classifier.

        :param X: Training data (n_samples, n_features)
        :param y: Training labels (-1 or +1) (n_samples,)
        """
        n_samples, n_features = X.shape
        # Step 1: Initialize weights equally
        weights = np.full(n_samples, (1 / n_samples))

        for m in range(self.num_learners):
            # Step 2: Train a weak learner on weighted data
            learner = self.weak_learner()  # Create a new weak learner
            learner.fit(X, y, sample_weight=weights)  # Fit with sample weights

            # Step 3: Get predictions
            predictions = learner.predict(X)
            incorrect = predictions != y  # Boolean array for misclassifications

            # Step 4: Compute weighted error
            err_m = np.dot(weights, incorrect) / np.sum(weights)

            # Step 5: Compute alpha (learner's contribution)
            alpha_m = self.learning_rate * np.log((1 - err_m) / (err_m + 1e-10)) / 2

            # Step 6: Update sample weights
            weights *= np.exp(alpha_m * incorrect * 2)  # Increase weights for misclassified samples
            weights /= np.sum(weights)  # Normalize

            # Store learner and its alpha
            self.learners.append(learner)
            self.alphas.append(alpha_m)

            # Early stopping if perfect classification is achieved
            if err_m == 0:
                break

    def predict(self, X):
        """
        Predict class labels for input data.

        :param X: Test data (n_samples, n_features)
        :return: Predicted class labels (n_samples,)
        """
        # Aggregate predictions from weak learners
        final_prediction = np.zeros(X.shape[0])

        for alpha, learner in zip(self.alphas, self.learners):
            final_prediction += alpha * learner.predict(X)

        return np.sign(final_prediction)  # Convert to -1 or +1

## Testing with the Titanic Dataset

### Data Preprocessing
- The Titanic dataset is processed in the same way from the reference of the notebook on Kaggle "Introduction to Decision Trees (Titanic dataset)

In [5]:
# Step 1: Imports needed for the script
import numpy as np
import pandas as pd
import re
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import accuracy_score

# Step 2: Load the dataset
train = pd.read_csv('./data/train.csv')
test = pd.read_csv('./data/test.csv')

# Store our test passenger IDs for easy access
PassengerId = test['PassengerId']

# Showing overview of the train dataset
train.head(3)

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S


### Preprocessing

In [6]:
# Step 3: Feature Engineering

# Create a backup copy
original_train = train.copy()

# Combining train and test dataset
full_data = [train, test]

# Feature that tells whether a passenger had a cabin on the Titanic
# train['Has_Cabin'] = train["Cabin"].apply(lambda x: 0 if type(x) == float else 1)
# test['Has_Cabin'] = test["Cabin"].apply(lambda x: 0 if type(x) == float else 1)

train['Has_Cabin'] = train["Cabin"].apply(lambda x: 1 if pd.notna(x) else 0)
test['Has_Cabin'] = test["Cabin"].apply(lambda x: 1 if pd.notna(x) else 0)

# Feature engineering steps taken from Sina and Anisotropic
# Create new feature FamilySize as a combination of SibSp and Parch
for dataset in full_data:
    dataset['FamilySize'] = dataset['SibSp'] + dataset['Parch'] + 1

# Create new feature IsAlone from FamilySize
for dataset in full_data:
    dataset['IsAlone'] = 0
    dataset.loc[dataset['FamilySize'] == 1, 'IsAlone'] = 1

# Remove all NULLS in the Embarked column
for dataset in full_data:
    dataset['Embarked'] = dataset['Embarked'].fillna('S')

# Remove all NULLS in the Fare column
for dataset in full_data:
    dataset['Fare'] = dataset['Fare'].fillna(train['Fare'].median())

# Remove all NULLS in the Age column
for dataset in full_data:
    age_avg = dataset['Age'].mean()
    age_std = dataset['Age'].std()
    age_null_count = dataset['Age'].isnull().sum()
    age_null_random_list = np.random.randint(age_avg - age_std, age_avg + age_std, size=age_null_count)
    # Next line has been improved to avoid warning
    dataset.loc[np.isnan(dataset['Age']), 'Age'] = age_null_random_list
    dataset['Age'] = dataset['Age'].astype(int)

# Step 4: Extract Titles from Name

# Define function to extract titles from passenger names
def get_title(name):
    #title_search = re.search(' ([A-Za-z]+)\.', name)
    title_search = re.search(r' ([A-Za-z]+)\.', name)
    # If the title exists, extract and return it.
    if title_search:
        return title_search.group(1)
    return ""

for dataset in full_data:
    dataset['Title'] = dataset['Name'].apply(get_title)

# Group all non-common titles into one single grouping "Rare"
for dataset in full_data:
    dataset['Title'] = dataset['Title'].replace(['Lady', 'Countess','Capt', 'Col','Don', 'Dr', 'Major', 'Rev', 'Sir', 'Jonkheer', 'Dona'], 'Rare')

    dataset['Title'] = dataset['Title'].replace('Mlle', 'Miss')
    dataset['Title'] = dataset['Title'].replace('Ms', 'Miss')
    dataset['Title'] = dataset['Title'].replace('Mme', 'Mrs')

# Step 5: Encode Categorical Variables
for dataset in full_data:
    # Mapping Sex
    dataset['Sex'] = dataset['Sex'].map( {'female': 0, 'male': 1} ).astype(int)
    
    # Mapping titles
    title_mapping = {"Mr": 1, "Master": 2, "Mrs": 3, "Miss": 4, "Rare": 5}
    dataset['Title'] = dataset['Title'].map(title_mapping)
    dataset['Title'] = dataset['Title'].fillna(0)

    # Mapping Embarked
    dataset['Embarked'] = dataset['Embarked'].map( {'S': 0, 'C': 1, 'Q': 2} ).astype(int)
    
    # Mapping Fare
    dataset.loc[ dataset['Fare'] <= 7.91, 'Fare'] = 0
    dataset.loc[(dataset['Fare'] > 7.91) & (dataset['Fare'] <= 14.454), 'Fare'] = 1
    dataset.loc[(dataset['Fare'] > 14.454) & (dataset['Fare'] <= 31), 'Fare'] = 2
    dataset.loc[ dataset['Fare'] > 31, 'Fare'] = 3
    dataset['Fare'] = dataset['Fare'].astype(int)
    
    # Mapping Age
    dataset.loc[ dataset['Age'] <= 16, 'Age'] = 0
    dataset.loc[(dataset['Age'] > 16) & (dataset['Age'] <= 32), 'Age'] = 1
    dataset.loc[(dataset['Age'] > 32) & (dataset['Age'] <= 48), 'Age'] = 2
    dataset.loc[(dataset['Age'] > 48) & (dataset['Age'] <= 64), 'Age'] = 3
    dataset.loc[ dataset['Age'] > 64, 'Age'] = 4

# Step 6: Feature Selection: remove variables no longer containing relevant information
drop_elements = ['PassengerId', 'Name', 'Ticket', 'Cabin', 'SibSp']
train = train.drop(drop_elements, axis = 1)
test  = test.drop(drop_elements, axis = 1)

# Step 7: Train-Test Split for Validation
X = train.drop("Survived", axis=1)
y = train["Survived"]
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# Showing processed train data
print("Final Processed Training Data:")
print(train.head(3))

print("Training Data Shape:", X_train.shape)
print("Validation Data Shape:", X_val.shape)
print("Test Data Shape:", test.shape)

Final Processed Training Data:
   Survived  Pclass  Sex  Age  Parch  Fare  Embarked  Has_Cabin  FamilySize  \
0         0       3    1    1      0     0         0          0           2   
1         1       1    0    2      0     3         1          1           2   
2         1       3    0    1      0     1         0          0           1   

   IsAlone  Title  
0        0      1  
1        0      3  
2        1      4  
Training Data Shape: (712, 10)
Validation Data Shape: (179, 10)
Test Data Shape: (418, 10)


## Evaluation
- Evaluating the performance of Decision Tree model, Random forest model, and AdaBoost model with the Titanic dataset.

In [7]:
# Initialize models
dt_model = DecisionTree(criterion="gini", max_depth=5, min_samples_split=2, min_samples_leaf=1)
rf_model = RandomForest(base_classifier=DecisionTree, num_trees=100, min_features=2)
ab_model = AdaBoost(weak_learner=DecisionTree, num_learners=80, learning_rate=0.002)

# Train the models
dt_model.fit(X_train.values, y_train.values)  # Convert to NumPy for consistency
rf_model.fit(X_train.values, y_train.values)
ab_model.fit(X_train.values, y_train.values)

# Predict on validation set
dt_preds = dt_model.predict(X_val.values)
rf_preds = rf_model.predict(X_val.values)
ab_preds = ab_model.predict(X_val.values)

# Compute accuracy
dt_acc = accuracy_score(y_val.values, dt_preds)
rf_acc = accuracy_score(y_val.values, rf_preds)
ab_acc = accuracy_score(y_val.values, ab_preds)

# Print results
print(f"Decision Tree Accuracy: {dt_acc:.4f}")
print(f"Random Forest Accuracy: {rf_acc:.4f}")
print(f"AdaBoost Accuracy: {ab_acc:.4f}")

Decision Tree Accuracy: 0.8324
Random Forest Accuracy: 0.8212
AdaBoost Accuracy: 0.6927


### Display Results in Table

In [8]:
results_df = pd.DataFrame({
    "Model": ["Decision Tree", "Random Forest", "AdaBoost"],
    "Accuracy": [dt_acc, rf_acc, ab_acc]
})

print("\nModel Performance on Titanic Dataset:\n\n")
print(results_df)


Model Performance on Titanic Dataset:


           Model  Accuracy
0  Decision Tree  0.832402
1  Random Forest  0.821229
2       AdaBoost  0.692737


### Make predictions on Test set

In [9]:
# Make predictions on the test set
dt_test_preds = dt_model.predict(test.values)
rf_test_preds = rf_model.predict(test.values)
ab_test_preds = ab_model.predict(test.values)

# Convert to binary output (since the models might return -1, 1). 0 is for not survived, 1 is for survived.
dt_test_preds = [1 if pred > 0 else 0 for pred in dt_test_preds]
rf_test_preds = [1 if pred > 0 else 0 for pred in rf_test_preds]
ab_test_preds = [1 if pred > 0 else 0 for pred in ab_test_preds]

# Create a submission DataFrame
submission_df = pd.DataFrame({
    "PassengerId": PassengerId,  # Ensure the ID column is retained
    "DecisionTree": dt_test_preds,
    "RandomForest": rf_test_preds,
    "AdaBoost": ab_test_preds
})

# Save results as CSV file
submission_df.to_csv("titanic_predictions.csv", index=False)
print("Submission file saved: titanic_submission.csv")

Submission file saved: titanic_submission.csv
