In [1]:
import pandas as pd
import numpy as np
from tqdm import tqdm
from sklearn.model_selection import train_test_split
np.random.seed(42)



# Load

In [3]:
data = pd.read_csv("../datasets/student-por.csv")
data.head()

Unnamed: 0,school,sex,age,address,famsize,Pstatus,Medu,Fedu,Mjob,Fjob,...,famrel,freetime,goout,Dalc,Walc,health,absences,G1,G2,G3
0,GP,F,18,U,GT3,A,4,4,at_home,teacher,...,4,3,4,1,1,3,4,0,11,11
1,GP,F,17,U,GT3,T,1,1,at_home,other,...,5,3,3,1,1,3,2,9,11,11
2,GP,F,15,U,LE3,T,1,1,at_home,other,...,4,3,2,2,3,3,6,12,13,12
3,GP,F,15,U,GT3,T,4,2,health,services,...,3,2,2,1,1,5,0,14,14,14
4,GP,F,16,U,GT3,T,3,3,other,other,...,4,3,2,1,2,5,0,11,13,13


In [3]:
# Check for missing values
missing_values = data.isnull().sum()
missing_values[missing_values > 0]

Series([], dtype: int64)

In [4]:
# Split the data into features (X) and target (y)
X = data.drop("G3", axis=1)
y = data["G3"]

# Split the data into training and testing sets (80% training, 20% testing)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# One-hot encode the categorical variables for both training and testing sets
X_train_encoded = pd.get_dummies(X_train)
X_test_encoded = pd.get_dummies(X_test)

X_train_encoded.head()



Unnamed: 0,age,Medu,Fedu,traveltime,studytime,failures,famrel,freetime,goout,Dalc,...,activities_no,activities_yes,nursery_no,nursery_yes,higher_no,higher_yes,internet_no,internet_yes,romantic_no,romantic_yes
332,18,2,2,1,3,0,4,3,3,1,...,1,0,0,1,0,1,0,1,1,0
29,16,4,4,1,2,0,4,4,5,5,...,0,1,0,1,0,1,0,1,0,1
302,18,3,2,1,3,0,5,3,2,1,...,0,1,1,0,0,1,1,0,1,0
286,17,2,1,1,1,0,4,4,2,2,...,1,0,1,0,0,1,0,1,1,0
554,17,1,1,2,1,0,3,5,5,2,...,0,1,0,1,1,0,0,1,0,1


In [10]:
X_train_np = X_train_encoded.to_numpy()
y_train_np = y_train.to_numpy()

# Build

In [11]:
def bootstrap_sample(X, y):
    """generate a bootstrap sample of the dataset"""
    n_samples = X.shape[0]
    idxs = np.random.choice(n_samples, size=n_samples, replace=True)
    return X[idxs], y[idxs]

def gini_impurity(y):
    """compute the Gini impurity of a set of labels"""
    m = len(y)
    return 1.0 - sum([(np.sum(y == c) / m) ** 2 for c in np.unique(y)])

def weighted_gini_impurity(y_left, y_right):
    """compute the weighted Gini impurity of a split"""
    m_left, m_right = len(y_left), len(y_right)
    m_total = m_left + m_right
    gini_left, gini_right = gini_impurity(y_left), gini_impurity(y_right)
    return (m_left / m_total) * gini_left + (m_right / m_total) * gini_right

# Testing the functions
X_sample, y_sample = bootstrap_sample(X_train_np, y_train_np)
gini_original = gini_impurity(y_sample)
gini_split = weighted_gini_impurity(y_sample[:int(len(y_sample)/2)], y_sample[int(len(y_sample)/2):])

gini_original, gini_split
# (0.8987269872030472, 0.8977860029883151) Gini impurity of sample and its half

(0.8987269872030472, 0.8977860029883151)

In [12]:
class DecisionNode:
    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

def build_tree(X, y, depth=1, max_depth=5):
    n_samples, n_features = X.shape
    n_labels = len(np.unique(y))
    
    # Stopping criteria
    if n_labels == 1 or depth == max_depth:
        return DecisionNode(value=np.bincount(y).argmax())
    
    # Randomly select features
    sub_features = np.random.choice(n_features, size=int(np.sqrt(n_features)), replace=False)
    
    # Track the best split
    best_gini = float('inf')
    best_split = {}
    
    # Find the best split
    for feature in sub_features:
        thresholds = np.unique(X[:, feature])
        for threshold in thresholds:
            y_left = y[X[:, feature] <= threshold]
            y_right = y[X[:, feature] > threshold]
            
            # Avoid splits where one side is empty
            if len(y_left) == 0 or len(y_right) == 0:
                continue
                
            gini = weighted_gini_impurity(y_left, y_right)
            if gini < best_gini:
                best_gini = gini
                best_split = {
                    "feature": feature,
                    "threshold": threshold,
                    "left_indices": X[:, feature] <= threshold,
                    "right_indices": X[:, feature] > threshold
                }
    
    # If no effective split is found, return a leaf node
    if "feature" not in best_split:
        return DecisionNode(value=np.bincount(y).argmax())
    
    # Build the left and right subtrees
    left_subtree = build_tree(X[best_split["left_indices"]], y[best_split["left_indices"]], depth + 1, max_depth)
    right_subtree = build_tree(X[best_split["right_indices"]], y[best_split["right_indices"]], depth + 1, max_depth)
    
    return DecisionNode(feature=best_split["feature"], threshold=best_split["threshold"], left=left_subtree, right=right_subtree)

# Testing the tree building
sample_tree = build_tree(X_sample, y_sample, max_depth=3)

In [13]:
def predict_sample(node, x):
    """predict a single sample using the decision tree"""
    if node.value is not None:
        return node.value
    if x[node.feature] <= node.threshold:
        return predict_sample(node.left, x)
    return predict_sample(node.right, x)

def decision_tree_predict(tree, X):
    """predict a dataset using the decision tree"""
    return np.array([predict_sample(tree, x) for x in X])

# Test the prediction function
predictions = decision_tree_predict(sample_tree, X_train_np[:10])
predictions

array([11, 11, 11, 11, 10, 10,  9,  9,  9, 11])

In [24]:
class SimplifiedRandomForest:
    def __init__(self, n_estimators=100, max_depth=5):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.trees = []
        self.feature_importances_ = np.zeros(X_train_np.shape[1])

    def fit(self, X, y):
        self.trees = []
        for _ in range(self.n_estimators):
            X_sample, y_sample = bootstrap_sample(X, y)
            tree = build_tree(X_sample, y_sample, max_depth=self.max_depth)
            self.trees.append(tree)

            # Calculating feature importance (simplified as total reduction in Gini impurity)
            self._calculate_feature_importance(tree)

        self.feature_importances_ /= self.n_estimators

    def _calculate_feature_importance(self, node):
        """recursively calculate feature importance"""
        if node.value is None:
            feature = node.feature
            self.feature_importances_[feature] += 1  # Increase the count for this feature
            self._calculate_feature_importance(node.left)
            self._calculate_feature_importance(node.right)

    def predict(self, X):
        predictions = np.zeros((self.n_estimators, X.shape[0]))
        for i, tree in enumerate(self.trees):
            predictions[i] = decision_tree_predict(tree, X)
        return np.apply_along_axis(lambda x: np.bincount(x.astype(int)).argmax(), axis=0, arr=predictions)

# Train & test

In [42]:
# Refit the simplified random forest with the seed set
srf_seed = SimplifiedRandomForest(n_estimators=10, max_depth=5)
srf_seed.fit(X_train_np, y_train_np)

In [43]:
# Splitting the data into training and testing subsets
X_train_split, X_test_split, y_train_split, y_test_split = train_test_split(X_train_np, y_train_np, test_size=0.2, random_state=42)

# Training the RandomForest on the training split
srf_split = SimplifiedRandomForest(n_estimators=10, max_depth=5)
srf_split.fit(X_train_split, y_train_split)

# Making predictions on the test split
y_pred = srf_split.predict(X_test_split)

# Calculating accuracy
accuracy = np.mean(y_pred == y_test_split)

accuracy, y_pred[:10]  # Displaying the accuracy and the first 10 predictions

(0.33653846153846156, array([11, 11, 11, 13, 13, 12, 11, 11, 11, 13]))

# Get important features

In [44]:
# Extract feature importances again
srf_seed_features_df = pd.DataFrame({
    'Feature': X_train_encoded.columns,
    'Importance': srf_seed.feature_importances_
})

sorted_srf_seed_features = srf_seed_features_df.sort_values(by='Importance', ascending=False)

sorted_srf_seed_features.head(10)

Unnamed: 0,Feature,Importance
13,G1,1.6
5,failures,1.1
14,G2,0.7
12,absences,0.7
16,school_MS,0.5
52,higher_no,0.5
27,Mjob_other,0.5
2,Fedu,0.5
7,freetime,0.4
4,studytime,0.4


# Tunning `n_estimators` & `max_depth`

In [47]:
# Define the grid of hyperparameters to search
param_grid = {
    'n_estimators': list(range(2, 31, 2)),
    'max_depth': list(range(3, 16, 2))
}

# Split the training data further into training and validation subsets
X_train_sub, X_val_sub, y_train_sub, y_val_sub = train_test_split(X_train_split, y_train_split, test_size=0.2, random_state=42)

best_params = None
best_accuracy = 0

# Grid search
for n_estimators in tqdm(param_grid['n_estimators']):
    for max_depth in param_grid['max_depth']:
        
        # Train the model
        srf_tune = SimplifiedRandomForest(n_estimators=n_estimators, max_depth=max_depth)
        srf_tune.fit(X_train_sub, y_train_sub)
        
        # Predict on the validation set
        y_val_pred = srf_tune.predict(X_val_sub)
        
        # Calculate accuracy
        accuracy = np.mean(y_val_pred == y_val_sub)
        
        # Update the best parameters if this model is better
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_params = {'n_estimators': n_estimators, 'max_depth': max_depth}

best_params, best_accuracy
# ({'n_estimators': 2, 'max_depth': 9}, 0.39759036144578314)

100%|███████████████████████████████████████████| 15/15 [01:20<00:00,  5.34s/it]


({'n_estimators': 2, 'max_depth': 9}, 0.39759036144578314)

# Compare to sklearn version

In [78]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV

# Define the grid of hyperparameters to search
param_grid_sklearn = {
    'n_estimators': list(range(2, 31, 2)),
    'max_depth': list(range(3, 16, 2)),
    'random_state': [42]  # Setting a fixed random state for reproducibility
}

# Initialize the RandomForestClassifier
rfc_sklearn = RandomForestClassifier()

# Use GridSearchCV for tuning
grid_search = GridSearchCV(rfc_sklearn, param_grid_sklearn, cv=5, scoring="accuracy")
grid_search.fit(X_train_split, y_train_split)

# Train the model with the best hyperparameters on the entire training split
best_rfc_sklearn = grid_search.best_estimator_

# Predict on the test split
y_test_pred_sklearn = best_rfc_sklearn.predict(X_test_split)

# Calculate accuracy on the test split
test_accuracy_sklearn = accuracy_score(y_test_split, y_test_pred_sklearn)

grid_search.best_params_, test_accuracy_sklearn
# ({'max_depth': 5, 'n_estimators': 26, 'random_state': 42}, 0.3076923076923077) 



({'max_depth': 5, 'n_estimators': 26, 'random_state': 42}, 0.3076923076923077)