In [None]:
import pandas as pd
import numpy as np

# Load the training and validation datasets
df_train = pd.read_csv('/content/MNIST_train.csv')
df_val = pd.read_csv('/content/MNIST_validation.csv')

print("Training data loaded. Shape:", df_train.shape)
print("Validation data loaded. Shape:", df_val.shape)

# Display the first few rows of the training dataframe
print("\nFirst 5 rows of training data:")
print(df_train.head())

Training data loaded. Shape: (10002, 786)
Validation data loaded. Shape: (2499, 786)

First 5 rows of training data:
   label  1x1  1x2  1x3  1x4  1x5  1x6  1x7  1x8  1x9  ...  28x20  28x21  \
0      0    0    0    0    0    0    0    0    0    0  ...      0      0   
1      3    0    0    0    0    0    0    0    0    0  ...      0      0   
2      4    0    0    0    0    0    0    0    0    0  ...      0      0   
3      3    0    0    0    0    0    0    0    0    0  ...      0      0   
4      4    0    0    0    0    0    0    0    0    0  ...      0      0   

   28x22  28x23  28x24  28x25  28x26  28x27  28x28  even  
0      0      0      0      0      0      0      0     1  
1      0      0      0      0      0      0      0     0  
2      0      0      0      0      0      0      0     1  
3      0      0      0      0      0      0      0     0  
4      0      0      0      0      0      0      0     1  

[5 rows x 786 columns]


**Reasoning**:
Now that the dataframes are loaded, the next step is to separate features from labels, normalize the pixel values, and ensure the correct data types as per the subtask instructions.



In [None]:
X_train = df_train.drop(columns=['label', 'even']).values
y_train = df_train['label'].values

X_val = df_val.drop(columns=['label', 'even']).values
y_val = df_val['label'].values

# Normalize pixel values to [0, 1]
X_train = X_train / 255.0
X_val = X_val / 255.0

# Convert feature arrays to float and label arrays to int
X_train = X_train.astype('float32')
X_val = X_val.astype('float32')
y_train = y_train.astype('int32')
y_val = y_val.astype('int32')

print("Features and labels separated and normalized.")
print("X_train shape:", X_train.shape, "Dtype:", X_train.dtype)
print("y_train shape:", y_train.shape, "Dtype:", y_train.dtype)
print("X_val shape:", X_val.shape, "Dtype:", X_val.dtype)
print("y_val shape:", y_val.shape, "Dtype:", y_val.dtype)

Features and labels separated and normalized.
X_train shape: (10002, 784) Dtype: float32
y_train shape: (10002,) Dtype: int32
X_val shape: (2499, 784) Dtype: float32
y_val shape: (2499,) Dtype: int32


## Softmax Regression (Multiclass Logistic Regression)

Implement Softmax Regression for multiclass classification, including softmax activation, multiclass cross-entropy loss, gradient descent optimization, prediction, accuracy evaluation, and training time recording.


**Reasoning**:
I will start by defining the `SoftmaxRegression` class and implement its constructor to initialize weights and biases, which are essential for the model. Then I will define the softmax activation function.



In [None]:
class SoftmaxRegression:
    def __init__(self, num_features, num_classes):
        self.num_features = num_features
        self.num_classes = num_classes
        # Initialize weights with small random values
        self.weights = np.random.randn(num_features, num_classes) * 0.01
        # Initialize biases to zeros
        self.biases = np.zeros((1, num_classes))

    def softmax(self, z):
        # Subtract the max for numerical stability
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    def cross_entropy_loss(self, y_pred_prob, y_true_one_hot):
        # Avoid log(0) by clipping probabilities
        epsilon = 1e-9
        y_pred_prob = np.clip(y_pred_prob, epsilon, 1. - epsilon)
        loss = -np.sum(y_true_one_hot * np.log(y_pred_prob)) / y_pred_prob.shape[0]
        return loss

print("SoftmaxRegression class structure initialized with softmax and cross-entropy loss functions.")

SoftmaxRegression class structure initialized with softmax and cross-entropy loss functions.


In [None]:
import time

class SoftmaxRegression:
    def __init__(self, num_features, num_classes):
        self.num_features = num_features
        self.num_classes = num_classes
        self.weights = np.random.randn(num_features, num_classes) * 0.01
        self.biases = np.zeros((1, num_classes))

    def softmax(self, z):
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    def cross_entropy_loss(self, y_pred_prob, y_true_one_hot):
        epsilon = 1e-9
        y_pred_prob = np.clip(y_pred_prob, epsilon, 1. - epsilon)
        loss = -np.sum(y_true_one_hot * np.log(y_pred_prob)) / y_pred_prob.shape[0]
        return loss

    def compute_gradients(self, X, y_true_one_hot, y_pred_prob):
        # Gradient of loss with respect to scores (logits)
        grad_logits = y_pred_prob - y_true_one_hot
        # Gradient of loss with respect to weights
        grad_weights = np.dot(X.T, grad_logits) / X.shape[0]
        # Gradient of loss with respect to biases
        grad_biases = np.sum(grad_logits, axis=0, keepdims=True) / X.shape[0]
        return grad_weights, grad_biases

    def fit(self, X, y, learning_rate=0.1, epochs=100):
        start_time = time.time()

        # Convert y to one-hot encoding
        y_one_hot = np.zeros((y.size, self.num_classes))
        y_one_hot[np.arange(y.size), y] = 1

        for epoch in range(epochs):
            # Compute scores (logits)
            z = np.dot(X, self.weights) + self.biases
            # Apply softmax to get probabilities
            y_pred_prob = self.softmax(z)
            # Compute loss
            loss = self.cross_entropy_loss(y_pred_prob, y_one_hot)

            # Compute gradients
            grad_weights, grad_biases = self.compute_gradients(X, y_one_hot, y_pred_prob)

            # Update weights and biases
            self.weights -= learning_rate * grad_weights
            self.biases -= learning_rate * grad_biases

            # Optional: print loss every few epochs
            # if (epoch + 1) % 10 == 0:
            #     print(f"Epoch {epoch + 1}/{epochs}, Loss: {loss:.4f}")

        self.training_time = time.time() - start_time
        print(f"Training finished in {self.training_time:.2f} seconds.")

    def predict(self, X):
        # Compute scores (logits)
        z = np.dot(X, self.weights) + self.biases
        # Apply softmax to get probabilities
        y_pred_prob = self.softmax(z)
        # Return the class with the highest probability
        return np.argmax(y_pred_prob, axis=1)

def evaluate(y_true, y_pred):
    accuracy = np.mean(y_true == y_pred)
    return accuracy

# Create an instance of the SoftmaxRegression model
num_features = X_train.shape[1]
num_classes = len(np.unique(y_train))
softmax_model = SoftmaxRegression(num_features, num_classes)

# Train the model
print("Starting Softmax Regression training...")
softmax_model.fit(X_train, y_train, learning_rate=0.5, epochs=200)

# Make predictions on the validation set
y_pred_val = softmax_model.predict(X_val)

# Evaluate the model
softmax_accuracy = evaluate(y_val, y_pred_val)
print(f"Softmax Regression Validation Accuracy: {softmax_accuracy:.4f}")
print(f"Softmax Regression Training Time: {softmax_model.training_time:.2f} seconds.")


Starting Softmax Regression training...
Training finished in 20.73 seconds.
Softmax Regression Validation Accuracy: 0.9012
Softmax Regression Training Time: 20.73 seconds.


## Perceptron (Multiclass OVR)

Implement the Perceptron algorithm using the Multiclass One-vs-Rest strategy, train 10 separate perceptron models, implement the perceptron update rule without activation, predict by choosing the class with the highest output, and evaluate accuracy and record training time.


In [None]:
import time

class Perceptron:
    def __init__(self, num_features):
        # Initialize weights to zeros (or small random values)
        self.weights = np.zeros(num_features)
        # Initialize bias to zero
        self.bias = 0

    def fit(self, X, y, learning_rate=0.01, epochs=100):
        # Perceptron update rule
        for _ in range(epochs):
            for i in range(X.shape[0]):
                # Calculate the raw score (without activation function)
                linear_output = np.dot(X[i], self.weights) + self.bias
                # If misclassified, update weights and bias
                # The target labels for OVR will be +1 or -1
                if y[i] * linear_output <= 0: # Misclassification condition (y_true * y_pred <= 0)
                    self.weights += learning_rate * y[i] * X[i]
                    self.bias += learning_rate * y[i]

    def predict(self, X):
        # Return the raw scores
        return np.dot(X, self.weights) + self.bias

print("Perceptron class defined with __init__, fit, and predict methods.")

Perceptron class defined with __init__, fit, and predict methods.


**Reasoning**:
Now that the `Perceptron` class is defined, I will implement the Multiclass One-vs-Rest strategy by training 10 separate perceptron models for each digit (0-9) and storing them. I will also measure the training time and then proceed with prediction and evaluation.



In [None]:
num_features = X_train.shape[1]
num_classes = len(np.unique(y_train))

# 2. Implement the Multiclass One-vs-Rest (OVR) strategy
perceptron_models = []

print("Starting Perceptron OVR training...")
start_time = time.time()

for c in range(num_classes):
    print(f"Training Perceptron for digit {c}...")
    # Create binary training labels: +1 for current class, -1 for others
    y_train_binary = np.where(y_train == c, 1, -1)

    # Create an instance of the Perceptron class
    perceptron_model = Perceptron(num_features)

    # Train this perceptron model
    perceptron_model.fit(X_train, y_train_binary, learning_rate=0.1, epochs=20)

    # Store the trained perceptron model
    perceptron_models.append(perceptron_model)

perceptron_training_time = time.time() - start_time
print(f"Perceptron OVR training finished in {perceptron_training_time:.2f} seconds.")

# 3. Make predictions on the validation set (X_val)
y_pred_val_perceptron = np.zeros(X_val.shape[0])

print("Making predictions on the validation set...")
for i in range(X_val.shape[0]):
    scores = []
    for model in perceptron_models:
        scores.append(model.predict(X_val[i].reshape(1, -1))[0]) # Get raw score for current sample

    # The predicted class is the digit corresponding to the model with the highest score
    y_pred_val_perceptron[i] = np.argmax(scores)

# 4. Evaluate the model's performance
perceptron_accuracy = np.mean(y_val == y_pred_val_perceptron)

print(f"\nMulticlass Perceptron OVR Validation Accuracy: {perceptron_accuracy:.4f}")
print(f"Multiclass Perceptron OVR Training Time: {perceptron_training_time:.2f} seconds.")


Starting Perceptron OVR training...
Training Perceptron for digit 0...
Training Perceptron for digit 1...
Training Perceptron for digit 2...
Training Perceptron for digit 3...
Training Perceptron for digit 4...
Training Perceptron for digit 5...
Training Perceptron for digit 6...
Training Perceptron for digit 7...
Training Perceptron for digit 8...
Training Perceptron for digit 9...
Perceptron OVR training finished in 10.49 seconds.
Making predictions on the validation set...

Multiclass Perceptron OVR Validation Accuracy: 0.8531
Multiclass Perceptron OVR Training Time: 10.49 seconds.


## Gaussian Naive Bayes

Implement Gaussian Naive Bayes classifier. For each class (digit), calculate the mean and variance for each pixel from the training data. Use these statistics to compute the Gaussian probability density function for each pixel. Predict the class that maximizes the posterior probability (using log-probabilities to avoid underflow). Evaluate accuracy and record training time.


In [None]:
import time

# Initialize lists to store parameters for each class
class_means = []
class_variances = []
class_priors = []

num_classes = len(np.unique(y_train))
num_features = X_train.shape[1]

# Small value to avoid division by zero and log(0)
epsilon = 1e-6

print("Starting Gaussian Naive Bayes training...")
start_time = time.time()

for c in range(num_classes):
    # Filter data for the current class
    X_c = X_train[y_train == c]

    # Calculate mean and variance for each feature for the current class
    mean_c = np.mean(X_c, axis=0)
    variance_c = np.var(X_c, axis=0) + epsilon # Add epsilon for numerical stability

    class_means.append(mean_c)
    class_variances.append(variance_c)

    # Calculate prior probability for the current class
    prior_c = len(X_c) / len(X_train)
    class_priors.append(prior_c)

# Convert to numpy arrays for easier manipulation
class_means = np.array(class_means)
class_variances = np.array(class_variances)
class_priors = np.array(class_priors)

gnb_training_time = time.time() - start_time
print(f"Gaussian Naive Bayes training finished in {gnb_training_time:.2f} seconds.")

print("Calculated means, variances, and priors for each class.")
print("Shape of class_means:", class_means.shape)
print("Shape of class_variances:", class_variances.shape)
print("Shape of class_priors:", class_priors.shape)

Starting Gaussian Naive Bayes training...
Gaussian Naive Bayes training finished in 0.10 seconds.
Calculated means, variances, and priors for each class.
Shape of class_means: (10, 784)
Shape of class_variances: (10, 784)
Shape of class_priors: (10,)


**Reasoning**:
Now that the means, variances, and priors have been calculated for each class, I will define a helper function to compute the Gaussian log-Probability Density Function (log-PDF). This function will be crucial for calculating the log-likelihoods in the prediction step, ensuring numerical stability by working with log-probabilities.



In [None]:
def gaussian_log_pdf(X, mean, variance):
    # Using log-probabilities for numerical stability
    # log(1 / sqrt(2 * pi * variance)) - (x - mean)^2 / (2 * variance)
    log_denominator = 0.5 * np.log(2 * np.pi * variance)
    numerator = (X - mean)**2
    log_likelihood = -0.5 * (numerator / variance) - log_denominator
    return log_likelihood

print("Gaussian log-PDF function defined.")

Gaussian log-PDF function defined.


In [None]:
def predict_gnb(X):
    predictions = np.zeros(X.shape[0])
    for i, sample in enumerate(X):
        log_posteriors = []
        for c in range(num_classes):
            # Calculate log-prior
            log_prior = np.log(class_priors[c])

            # Calculate log-likelihood for the sample given the class
            # Sum of log-PDFs for each feature
            log_likelihood_per_feature = gaussian_log_pdf(sample, class_means[c], class_variances[c])
            log_likelihood = np.sum(log_likelihood_per_feature)

            # Calculate log-posterior
            log_posterior = log_prior + log_likelihood
            log_posteriors.append(log_posterior)

        # Predict the class with the maximum log-posterior probability
        predictions[i] = np.argmax(log_posteriors)
    return predictions

print("Making predictions on the validation set using Gaussian Naive Bayes...")
prediction_start_time = time.time()
y_pred_val_gnb = predict_gnb(X_val)
gnb_prediction_time = time.time() - prediction_start_time

# Evaluate the model
gnb_accuracy = np.mean(y_val == y_pred_val_gnb)

print(f"Gaussian Naive Bayes Validation Accuracy: {gnb_accuracy:.4f}")
print(f"Gaussian Naive Bayes Training Time: {gnb_training_time:.2f} seconds.")
print(f"Gaussian Naive Bayes Prediction Time: {gnb_prediction_time:.2f} seconds.")


Making predictions on the validation set using Gaussian Naive Bayes...
Gaussian Naive Bayes Validation Accuracy: 0.6355
Gaussian Naive Bayes Training Time: 0.10 seconds.
Gaussian Naive Bayes Prediction Time: 0.52 seconds.


## k-Nearest Neighbors (k-NN)

Implement the k-Nearest Neighbors algorithm from scratch, including distance calculation, finding nearest neighbors, and majority voting for prediction.


In [None]:
import numpy as np
import time

class KNearestNeighbors:
    def __init__(self, n_neighbors=5):
        self.n_neighbors = n_neighbors

    def fit(self, X, y):
        self.X_train = X
        self.y_train = y
        print("KNN model fitted with training data.")

print("KNearestNeighbors class defined with __init__ and fit methods.")

KNearestNeighbors class defined with __init__ and fit methods.


**Reasoning**:
Now that the `KNearestNeighbors` class has its `__init__` and `fit` methods, I will implement the `euclidean_distance`, `find_nearest_neighbors`, and `predict` methods within the class to handle distance calculation, neighbor identification, and majority voting for predictions, completing the implementation of the KNN algorithm as per the subtask instructions.



In [None]:
class KNearestNeighbors:
    def __init__(self, n_neighbors=5):
        self.n_neighbors = n_neighbors

    def fit(self, X, y):
        self.X_train = X
        self.y_train = y
        print("KNN model fitted with training data.")

    def euclidean_distance(self, x1, x2):
        # Calculate Euclidean distance between two points
        return np.sqrt(np.sum((x1 - x2)**2))

    def find_nearest_neighbors(self, X_test_sample):
        distances = [self.euclidean_distance(X_test_sample, x_train) for x_train in self.X_train]
        # Get the indices of the k smallest distances
        k_nearest_indices = np.argsort(distances)[:self.n_neighbors]
        # Return the labels of the k nearest neighbors
        return [self.y_train[i] for i in k_nearest_indices]

    def predict_single_sample(self, X_test_sample):
        neighbors_labels = self.find_nearest_neighbors(X_test_sample)
        # Perform majority voting
        unique_labels, counts = np.unique(neighbors_labels, return_counts=True)
        return unique_labels[np.argmax(counts)]

    def predict(self, X_test):
        predictions = [self.predict_single_sample(x) for x in X_test]
        return np.array(predictions)


# Initialize and fit the model
knn_model = KNearestNeighbors(n_neighbors=5) # You can change k here
print("Starting k-NN training (fitting data)...")
knn_training_start_time = time.time()
knn_model.fit(X_train, y_train)
knn_training_time = time.time() - knn_training_start_time
print(f"k-NN training (fitting) finished in {knn_training_time:.2f} seconds.")

# Make predictions on the validation set
print("Making predictions on the validation set using k-NN...")
knn_prediction_start_time = time.time()
y_pred_val_knn = knn_model.predict(X_val)
knn_prediction_time = time.time() - knn_prediction_start_time

# Evaluate the model
knn_accuracy = np.mean(y_val == y_pred_val_knn)

print(f"\nk-Nearest Neighbors Validation Accuracy: {knn_accuracy:.4f}")
print(f"k-Nearest Neighbors Training Time: {knn_training_time:.2f} seconds.")
print(f"k-Nearest Neighbors Prediction Time: {knn_prediction_time:.2f} seconds.")

Starting k-NN training (fitting data)...
KNN model fitted with training data.
k-NN training (fitting) finished in 0.00 seconds.
Making predictions on the validation set using k-NN...

k-Nearest Neighbors Validation Accuracy: 0.9504
k-Nearest Neighbors Training Time: 0.00 seconds.
k-Nearest Neighbors Prediction Time: 273.04 seconds.


## Linear SVM (Gradient Descent)

Implement the Linear SVM model, including its initialization and the hinge loss function.


In [None]:
class LinearSVM:
    def __init__(self, num_features):
        # Initialize weights with small random values to break symmetry
        self.weights = np.random.randn(num_features) * 0.01
        # Initialize bias to zero
        self.bias = 0

    def hinge_loss(self, y_true, y_pred_score):
        # y_true should be +1 or -1
        # y_pred_score is the raw output from the SVM (w.x + b)
        return np.maximum(0, 1 - y_true * y_pred_score)

print("LinearSVM class initialized with __init__ and hinge_loss methods.")

LinearSVM class initialized with __init__ and hinge_loss methods.


## Linear SVM (Gradient Descent)

Implement the `fit` method to optimize the Linear SVM using gradient descent and the `predict` method for classification. Evaluate the model's accuracy on the validation set and record its training time.


In [None]:
import time

class LinearSVM:
    def __init__(self, num_features):
        # Initialize weights with small random values
        self.weights = np.random.randn(num_features) * 0.01
        # Initialize bias to zero
        self.bias = 0
        self.training_time = 0

    def hinge_loss(self, y_true, y_pred_score):
        # y_true should be +1 or -1
        # y_pred_score is the raw output from the SVM (w.x + b)
        return np.maximum(0, 1 - y_true * y_pred_score)

    def fit(self, X, y_binary, learning_rate=0.001, epochs=100, C=0.01):
        # y_binary labels should be +1 or -1
        start_time = time.time()

        for epoch in range(epochs):
            # Iterate through each training sample
            for i in range(X.shape[0]):
                x_i = X[i]
                y_i = y_binary[i]

                # Calculate the score (decision function output)
                score = np.dot(x_i, self.weights) + self.bias

                # Update weights and bias based on the hinge loss subgradient
                if y_i * score < 1:  # Misclassified or within the margin
                    self.weights = self.weights + learning_rate * (y_i * x_i - 2 * C * self.weights)
                    self.bias = self.bias + learning_rate * y_i
                else:  # Correctly classified and outside the margin
                    self.weights = self.weights - learning_rate * 2 * C * self.weights
                    # Bias remains unchanged in this case

        self.training_time = time.time() - start_time

    def predict(self, X):
        # Compute raw scores (decision function output)
        scores = np.dot(X, self.weights) + self.bias
        # Return class predictions: +1 if score > 0, -1 otherwise
        return np.where(scores > 0, 1, -1)

# Helper function to evaluate accuracy (re-using the one defined in softmax section)
# def evaluate(y_true, y_pred):
#     accuracy = np.mean(y_true == y_pred)
#     return accuracy

# Multiclass One-vs-Rest (OVR) strategy for Linear SVM
num_features = X_train.shape[1]
num_classes = len(np.unique(y_train))

svm_models_ovr = []
overall_svm_training_start_time = time.time()

print("Starting Linear SVM OVR training...")
for c in range(num_classes):
    print(f"Training Linear SVM for digit {c}...")
    # Convert original labels to binary: +1 for current class, -1 for all others
    y_train_binary_ovr = np.where(y_train == c, 1, -1)

    # Create an instance of LinearSVM for the current class
    svm_model = LinearSVM(num_features)

    # Train the SVM model with specified hyperparameters
    # Using a small C for L2 regularization and a small learning rate for stability
    svm_model.fit(X_train, y_train_binary_ovr, learning_rate=0.0001, epochs=50, C=0.001)
    svm_models_ovr.append(svm_model)

linear_svm_ovr_training_time = time.time() - overall_svm_training_start_time
print(f"Linear SVM OVR training finished in {linear_svm_ovr_training_time:.2f} seconds.")

# Make predictions on the validation set using the OVR strategy
print("Making predictions on the validation set using Linear SVM OVR...")

# Store raw scores from each binary SVM for each validation sample
prediction_scores = np.zeros((X_val.shape[0], num_classes))

for c, model in enumerate(svm_models_ovr):
    # The raw decision function output (w.x + b) is used for OVR prediction
    scores_for_class_c = np.dot(X_val, model.weights) + model.bias
    prediction_scores[:, c] = scores_for_class_c

# The predicted class for each sample is the one whose corresponding SVM yielded the highest score
y_pred_val_svm_ovr = np.argmax(prediction_scores, axis=1)

# Evaluate the model's accuracy on the validation set
linear_svm_ovr_accuracy = np.mean(y_val == y_pred_val_svm_ovr) # Using numpy mean for accuracy

print(f"\nLinear SVM (Gradient Descent) OVR Validation Accuracy: {linear_svm_ovr_accuracy:.4f}")
print(f"Linear SVM (Gradient Descent) OVR Training Time: {linear_svm_ovr_training_time:.2f} seconds.")

Starting Linear SVM OVR training...
Training Linear SVM for digit 0...
Training Linear SVM for digit 1...
Training Linear SVM for digit 2...
Training Linear SVM for digit 3...
Training Linear SVM for digit 4...
Training Linear SVM for digit 5...
Training Linear SVM for digit 6...
Training Linear SVM for digit 7...
Training Linear SVM for digit 8...
Training Linear SVM for digit 9...
Linear SVM OVR training finished in 48.07 seconds.
Making predictions on the validation set using Linear SVM OVR...

Linear SVM (Gradient Descent) OVR Validation Accuracy: 0.8980
Linear SVM (Gradient Descent) OVR Training Time: 48.07 seconds.


## k-Means Clustering + Label Mapping

Implement the k-Means clustering algorithm, including initialization of centroids, the iterative assignment of points to clusters, and updating of centroids until convergence.


In [None]:
class KMeans:
    def __init__(self, n_clusters=10, max_iter=300, random_state=None):
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.centroids = None
        self.labels = None
        self.random_state = random_state

    def fit(self, X):
        if self.random_state:
            np.random.seed(self.random_state)

        # 1. Initialize centroids randomly from the data points
        # Randomly select n_clusters indices from X
        initial_indices = np.random.choice(X.shape[0], self.n_clusters, replace=False)
        self.centroids = X[initial_indices]

        for i in range(self.max_iter):
            # 2. Assign each data point to the closest centroid
            distances = np.sqrt(((X - self.centroids[:, np.newaxis])**2).sum(axis=2))
            self.labels = np.argmin(distances, axis=0)

            # 3. Update centroids
            new_centroids = np.array([X[self.labels == k].mean(axis=0) if np.any(self.labels == k) else self.centroids[k]
                                      for k in range(self.n_clusters)])

            # Check for convergence
            if np.allclose(self.centroids, new_centroids, atol=1e-4):
                print(f"K-Means converged after {i+1} iterations.")
                break

            self.centroids = new_centroids
        else:
            print(f"K-Means did not converge after {self.max_iter} iterations.")

        return self

    def predict(self, X):
        # Assign each data point to the closest centroid
        distances = np.sqrt(((X - self.centroids[:, np.newaxis])**2).sum(axis=2))
        return np.argmin(distances, axis=0)

print("KMeans class defined with __init__ and fit methods.")

KMeans class defined with __init__ and fit methods.


In [None]:
import time

# Initialize and fit the KMeans model
kmeans_model = KMeans(n_clusters=num_classes, random_state=42) # num_classes is 10 for MNIST

print("Starting k-Means training...")
kmeans_training_start_time = time.time()
kmeans_model.fit(X_train)
kmeans_training_time = time.time() - kmeans_training_start_time
print(f"k-Means training finished in {kmeans_training_time:.2f} seconds.")

# Map clusters to true labels
# For each cluster, find the most frequent true label among its members
cluster_to_label_map = np.zeros(num_classes, dtype=int)
for i in range(num_classes):
    # Get true labels for all data points assigned to cluster i
    labels_in_cluster = y_train[kmeans_model.labels == i]
    if len(labels_in_cluster) > 0:
        # Find the most frequent label in this cluster
        unique_labels, counts = np.unique(labels_in_cluster, return_counts=True)
        cluster_to_label_map[i] = unique_labels[np.argmax(counts)]
    else:
        # Handle empty clusters, e.g., assign an arbitrary label or skip
        cluster_to_label_map[i] = -1 # Indicate unassigned/empty cluster

print("Cluster to label mapping created:", cluster_to_label_map)

# Predict on the validation set
print("Making predictions on the validation set using k-Means...")
kmeans_prediction_start_time = time.time()
y_pred_val_kmeans_clusters = kmeans_model.predict(X_val)
y_pred_val_kmeans = np.array([cluster_to_label_map[cluster_id] for cluster_id in y_pred_val_kmeans_clusters])
kmeans_prediction_time = time.time() - kmeans_prediction_start_time

# Evaluate accuracy (ignoring unassigned clusters for evaluation if any)
# For simplicity, we assume all clusters are mapped to a valid label here

kmeans_accuracy = np.mean(y_val == y_pred_val_kmeans)

print(f"\nk-Means Clustering + Label Mapping Validation Accuracy: {kmeans_accuracy:.4f}")
print(f"k-Means Clustering + Label Mapping Training Time: {kmeans_training_time:.2f} seconds.")
print(f"k-Means Clustering + Label Mapping Prediction Time: {kmeans_prediction_time:.2f} seconds.")

Starting k-Means training...
K-Means converged after 44 iterations.
k-Means training finished in 10.48 seconds.
Cluster to label mapping created: [4 9 1 8 3 1 7 2 6 0]
Making predictions on the validation set using k-Means...

k-Means Clustering + Label Mapping Validation Accuracy: 0.5818
k-Means Clustering + Label Mapping Training Time: 10.48 seconds.
k-Means Clustering + Label Mapping Prediction Time: 0.05 seconds.


## XGBoost (from Scratch)

Implement the core `Node` and `DecisionTree` classes for XGBoost, handling splitting criteria, gain calculation, and leaf value computation based on gradients and Hessians.


In [1]:
import numpy as np

class Node:
    def __init__(self, feature_idx=None, threshold=None, value=None, left=None, right=None, gain=None):
        # For decision node
        self.feature_idx = feature_idx
        self.threshold = threshold
        self.left = left
        self.right = right
        self.gain = gain
        # For leaf node
        self.value = value

print("Node class defined for XGBoost Decision Tree.")

Node class defined for XGBoost Decision Tree.


In [2]:
import numpy as np

class DecisionTree:
    def __init__(self, max_depth=5, min_samples_split=2, reg_lambda=1.0, reg_alpha=0.0):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.reg_lambda = reg_lambda
        self.reg_alpha = reg_alpha
        self.root = None

    # (Placeholder for future methods: _find_best_split, _calculate_leaf_value, _grow_tree, fit, predict_one_sample, predict)

print("DecisionTree class defined with constructor.")

DecisionTree class defined with constructor.


In [3]:
import numpy as np

class DecisionTree:
    def __init__(self, max_depth=5, min_samples_split=2, reg_lambda=1.0, reg_alpha=0.0):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.reg_lambda = reg_lambda
        self.reg_alpha = reg_alpha
        self.root = None

    def _calculate_gain(self, G_L, H_L, G_R, H_R):
        # Gain calculation for XGBoost
        # G_L, H_L: sum of gradients and hessians in left child
        # G_R, H_R: sum of gradients and hessians in right child
        # reg_lambda: L2 regularization term
        # reg_alpha: L1 regularization term

        # Loss for a node (before/after split) using L1 and L2 regularization
        def loss_term(G, H):
            return -0.5 * ((G**2) / (H + self.reg_lambda)) if (H + self.reg_lambda) > 0 else 0

        gain = loss_term(G_L, H_L) + loss_term(G_R, H_R) - loss_term(G_L + G_R, H_L + H_R)
        # Optionally, add gamma for tree complexity regularization (not explicitly in subtask but common)
        # gain -= self.gamma # Tree complexity cost
        return gain

    def _find_best_split(self, X, g, h):
        best_gain = -np.inf
        best_feature_idx = None
        best_threshold = None

        # Sum of gradients and hessians for the current node
        G_node = np.sum(g)
        H_node = np.sum(h)

        # Check if splitting is even possible (e.g., enough samples)
        if X.shape[0] < self.min_samples_split:
            return None, None, None

        for feature_idx in range(X.shape[1]):
            # Sort unique thresholds for the current feature
            thresholds = np.unique(X[:, feature_idx])
            # If there's only one unique value, no split possible for this feature
            if len(thresholds) <= 1:
                continue

            for threshold in thresholds:
                # Split data based on the current feature and threshold
                left_indices = X[:, feature_idx] < threshold
                right_indices = X[:, feature_idx] >= threshold

                # Ensure minimum samples in each split
                if np.sum(left_indices) < self.min_samples_split or np.sum(right_indices) < self.min_samples_split:
                    continue

                # Calculate sums of gradients and hessians for left and right children
                G_L = np.sum(g[left_indices])
                H_L = np.sum(h[left_indices])
                G_R = np.sum(g[right_indices])
                H_R = np.sum(h[right_indices])

                # Calculate gain
                current_gain = self._calculate_gain(G_L, H_L, G_R, H_R)

                # Update best split if current_gain is better
                if current_gain > best_gain:
                    best_gain = current_gain
                    best_feature_idx = feature_idx
                    best_threshold = threshold

        # A split is only considered if it results in a positive gain after complexity cost
        # Here, we assume gain includes complexity cost implicitly if self.gamma is used in _calculate_gain
        # For now, let's just return if best_gain is significantly positive
        if best_gain <= 0: # A common heuristic is to require positive gain
            return None, None, None

        return best_feature_idx, best_threshold, best_gain

    # (Placeholder for future methods: _calculate_leaf_value, _grow_tree, fit, predict_one_sample, predict)

print("DecisionTree class updated with _calculate_gain and _find_best_split methods.")

DecisionTree class updated with _calculate_gain and _find_best_split methods.


In [4]:
import numpy as np

class DecisionTree:
    def __init__(self, max_depth=5, min_samples_split=2, reg_lambda=1.0, reg_alpha=0.0):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.reg_lambda = reg_lambda
        self.reg_alpha = reg_alpha
        self.root = None

    def _calculate_gain(self, G_L, H_L, G_R, H_R):
        # Gain calculation for XGBoost
        # G_L, H_L: sum of gradients and hessians in left child
        # G_R, H_R: sum of gradients and hessians in right child
        # reg_lambda: L2 regularization term
        # reg_alpha: L1 regularization term

        # Loss for a node (before/after split) using L1 and L2 regularization
        def loss_term(G, H):
            return -0.5 * ((G**2) / (H + self.reg_lambda)) if (H + self.reg_lambda) > 0 else 0

        gain = loss_term(G_L, H_L) + loss_term(G_R, H_R) - loss_term(G_L + G_R, H_L + H_R)
        # Optionally, add gamma for tree complexity regularization (not explicitly in subtask but common)
        # gain -= self.gamma # Tree complexity cost
        return gain

    def _find_best_split(self, X, g, h):
        best_gain = -np.inf
        best_feature_idx = None
        best_threshold = None

        # Sum of gradients and hessians for the current node
        G_node = np.sum(g)
        H_node = np.sum(h)

        # Check if splitting is even possible (e.g., enough samples)
        if X.shape[0] < self.min_samples_split:
            return None, None, None

        for feature_idx in range(X.shape[1]):
            # Sort unique thresholds for the current feature
            thresholds = np.unique(X[:, feature_idx])
            # If there's only one unique value, no split possible for this feature
            if len(thresholds) <= 1:
                continue

            for threshold in thresholds:
                # Split data based on the current feature and threshold
                left_indices = X[:, feature_idx] < threshold
                right_indices = X[:, feature_idx] >= threshold

                # Ensure minimum samples in each split
                if np.sum(left_indices) < self.min_samples_split or np.sum(right_indices) < self.min_samples_split:
                    continue

                # Calculate sums of gradients and hessians for left and right children
                G_L = np.sum(g[left_indices])
                H_L = np.sum(h[left_indices])
                G_R = np.sum(g[right_indices])
                H_R = np.sum(h[right_indices])

                # Calculate gain
                current_gain = self._calculate_gain(G_L, H_L, G_R, H_R)

                # Update best split if current_gain is better
                if current_gain > best_gain:
                    best_gain = current_gain
                    best_feature_idx = feature_idx
                    best_threshold = threshold

        # A split is only considered if it results in a positive gain after complexity cost
        # Here, we assume gain includes complexity cost implicitly if self.gamma is used in _calculate_gain
        # For now, let's just return if best_gain is significantly positive
        if best_gain <= 0: # A common heuristic is to require positive gain
            return None, None, None

        return best_feature_idx, best_threshold, best_gain

    def _calculate_leaf_value(self, g, h):
        # Sum of gradients and hessians for the current leaf node
        G = np.sum(g)
        H = np.sum(h)

        # Apply L1 regularization (reg_alpha)
        if self.reg_alpha > 0:
            if G > self.reg_alpha:
                G -= self.reg_alpha
            elif G < -self.reg_alpha:
                G += self.reg_alpha
            else:
                G = 0 # If |G| <= reg_alpha, G effectively becomes 0 due to L1 regularization

        # Calculate leaf value (gamma) with L2 regularization (reg_lambda)
        if (H + self.reg_lambda) > 0:
            leaf_value = -G / (H + self.reg_lambda)
        else:
            leaf_value = 0 # Avoid division by zero, though H should generally be positive

        return leaf_value

    # (Placeholder for future methods: _grow_tree, fit, predict_one_sample, predict)

print("DecisionTree class updated with _calculate_leaf_value method.")

DecisionTree class updated with _calculate_leaf_value method.


In [5]:
import numpy as np

class DecisionTree:
    def __init__(self, max_depth=5, min_samples_split=2, reg_lambda=1.0, reg_alpha=0.0):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.reg_lambda = reg_lambda
        self.reg_alpha = reg_alpha
        self.root = None

    def _calculate_gain(self, G_L, H_L, G_R, H_R):
        # Gain calculation for XGBoost
        def loss_term(G, H):
            return -0.5 * ((G**2) / (H + self.reg_lambda)) if (H + self.reg_lambda) > 0 else 0

        gain = loss_term(G_L, H_L) + loss_term(G_R, H_R) - loss_term(G_L + G_R, H_L + H_R)
        return gain

    def _find_best_split(self, X, g, h):
        best_gain = -np.inf
        best_feature_idx = None
        best_threshold = None

        if X.shape[0] < self.min_samples_split:
            return None, None, None, None

        for feature_idx in range(X.shape[1]):
            thresholds = np.unique(X[:, feature_idx])
            if len(thresholds) <= 1:
                continue

            for threshold in thresholds:
                left_indices = X[:, feature_idx] < threshold
                right_indices = X[:, feature_idx] >= threshold

                if np.sum(left_indices) < self.min_samples_split or np.sum(right_indices) < self.min_samples_split:
                    continue

                G_L = np.sum(g[left_indices])
                H_L = np.sum(h[left_indices])
                G_R = np.sum(g[right_indices])
                H_R = np.sum(h[right_indices])

                current_gain = self._calculate_gain(G_L, H_L, G_R, H_R)

                if current_gain > best_gain:
                    best_gain = current_gain
                    best_feature_idx = feature_idx
                    best_threshold = threshold

        # Return best_gain as well to check for stopping conditions later
        if best_gain <= 0 or np.isinf(best_gain):
            return None, None, None, None

        return best_feature_idx, best_threshold, best_gain

    def _calculate_leaf_value(self, g, h):
        G = np.sum(g)
        H = np.sum(h)

        if self.reg_alpha > 0:
            if G > self.reg_alpha:
                G -= self.reg_alpha
            elif G < -self.reg_alpha:
                G += self.reg_alpha
            else:
                G = 0

        if (H + self.reg_lambda) > 0:
            leaf_value = -G / (H + self.reg_lambda)
        else:
            leaf_value = 0

        return leaf_value

    def _grow_tree(self, X, g, h, depth):
        # Stopping conditions
        if depth >= self.max_depth or X.shape[0] < self.min_samples_split:
            leaf_value = self._calculate_leaf_value(g, h)
            return Node(value=leaf_value)

        feature_idx, threshold, gain = self._find_best_split(X, g, h)

        # If no beneficial split is found, create a leaf node
        if feature_idx is None or gain <= 0:
            leaf_value = self._calculate_leaf_value(g, h)
            return Node(value=leaf_value)

        # Split data
        left_indices = X[:, feature_idx] < threshold
        right_indices = X[:, feature_idx] >= threshold

        # Recursively grow left and right subtrees
        left_child = self._grow_tree(X[left_indices], g[left_indices], h[left_indices], depth + 1)
        right_child = self._grow_tree(X[right_indices], g[right_indices], h[right_indices], depth + 1)

        return Node(feature_idx=feature_idx, threshold=threshold, left=left_child, right=right_child, gain=gain)

    # (Placeholder for future methods: fit, predict_one_sample, predict)

print("DecisionTree class updated with _grow_tree method.")


DecisionTree class updated with _grow_tree method.


In [6]:
import numpy as np

class DecisionTree:
    def __init__(self, max_depth=5, min_samples_split=2, reg_lambda=1.0, reg_alpha=0.0):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.reg_lambda = reg_lambda
        self.reg_alpha = reg_alpha
        self.root = None

    def _calculate_gain(self, G_L, H_L, G_R, H_R):
        def loss_term(G, H):
            return -0.5 * ((G**2) / (H + self.reg_lambda)) if (H + self.reg_lambda) > 0 else 0

        gain = loss_term(G_L, H_L) + loss_term(G_R, H_R) - loss_term(G_L + G_R, H_L + H_R)
        return gain

    def _find_best_split(self, X, g, h):
        best_gain = -np.inf
        best_feature_idx = None
        best_threshold = None

        if X.shape[0] < self.min_samples_split:
            return None, None, None, None

        for feature_idx in range(X.shape[1]):
            thresholds = np.unique(X[:, feature_idx])
            if len(thresholds) <= 1:
                continue

            for threshold in thresholds:
                left_indices = X[:, feature_idx] < threshold
                right_indices = X[:, feature_idx] >= threshold

                if np.sum(left_indices) < self.min_samples_split or np.sum(right_indices) < self.min_samples_split:
                    continue

                G_L = np.sum(g[left_indices])
                H_L = np.sum(h[left_indices])
                G_R = np.sum(g[right_indices])
                H_R = np.sum(h[right_indices])

                current_gain = self._calculate_gain(G_L, H_L, G_R, H_R)

                if current_gain > best_gain:
                    best_gain = current_gain
                    best_feature_idx = feature_idx
                    best_threshold = threshold

        if best_gain <= 0 or np.isinf(best_gain):
            return None, None, None, None

        return best_feature_idx, best_threshold, best_gain

    def _calculate_leaf_value(self, g, h):
        G = np.sum(g)
        H = np.sum(h)

        if self.reg_alpha > 0:
            if G > self.reg_alpha:
                G -= self.reg_alpha
            elif G < -self.reg_alpha:
                G += self.reg_alpha
            else:
                G = 0

        if (H + self.reg_lambda) > 0:
            leaf_value = -G / (H + self.reg_lambda)
        else:
            leaf_value = 0

        return leaf_value

    def _grow_tree(self, X, g, h, depth):
        if depth >= self.max_depth or X.shape[0] < self.min_samples_split:
            leaf_value = self._calculate_leaf_value(g, h)
            return Node(value=leaf_value)

        feature_idx, threshold, gain = self._find_best_split(X, g, h)

        if feature_idx is None or gain <= 0:
            leaf_value = self._calculate_leaf_value(g, h)
            return Node(value=leaf_value)

        left_indices = X[:, feature_idx] < threshold
        right_indices = X[:, feature_idx] >= threshold

        left_child = self._grow_tree(X[left_indices], g[left_indices], h[left_indices], depth + 1)
        right_child = self._grow_tree(X[right_indices], g[right_indices], h[right_indices], depth + 1)

        return Node(feature_idx=feature_idx, threshold=threshold, left=left_child, right=right_child, gain=gain)

    def fit(self, X, g, h):
        self.root = self._grow_tree(X, g, h, depth=0)
        print("Decision tree fitted with gradients and Hessians.")

    # (Placeholder for future methods: predict_one_sample, predict)

print("DecisionTree class updated with fit method.")

DecisionTree class updated with fit method.


In [7]:
import numpy as np

class DecisionTree:
    def __init__(self, max_depth=5, min_samples_split=2, reg_lambda=1.0, reg_alpha=0.0):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.reg_lambda = reg_lambda
        self.reg_alpha = reg_alpha
        self.root = None

    def _calculate_gain(self, G_L, H_L, G_R, H_R):
        def loss_term(G, H):
            return -0.5 * ((G**2) / (H + self.reg_lambda)) if (H + self.reg_lambda) > 0 else 0

        gain = loss_term(G_L, H_L) + loss_term(G_R, H_R) - loss_term(G_L + G_R, H_L + H_R)
        return gain

    def _find_best_split(self, X, g, h):
        best_gain = -np.inf
        best_feature_idx = None
        best_threshold = None

        if X.shape[0] < self.min_samples_split:
            return None, None, None, None

        for feature_idx in range(X.shape[1]):
            thresholds = np.unique(X[:, feature_idx])
            if len(thresholds) <= 1:
                continue

            for threshold in thresholds:
                left_indices = X[:, feature_idx] < threshold
                right_indices = X[:, feature_idx] >= threshold

                if np.sum(left_indices) < self.min_samples_split or np.sum(right_indices) < self.min_samples_split:
                    continue

                G_L = np.sum(g[left_indices])
                H_L = np.sum(h[left_indices])
                G_R = np.sum(g[right_indices])
                H_R = np.sum(h[right_indices])

                current_gain = self._calculate_gain(G_L, H_L, G_R, H_R)

                if current_gain > best_gain:
                    best_gain = current_gain
                    best_feature_idx = feature_idx
                    best_threshold = threshold

        if best_gain <= 0 or np.isinf(best_gain):
            return None, None, None, None

        return best_feature_idx, best_threshold, best_gain

    def _calculate_leaf_value(self, g, h):
        G = np.sum(g)
        H = np.sum(h)

        if self.reg_alpha > 0:
            if G > self.reg_alpha:
                G -= self.reg_alpha
            elif G < -self.reg_alpha:
                G += self.reg_alpha
            else:
                G = 0

        if (H + self.reg_lambda) > 0:
            leaf_value = -G / (H + self.reg_lambda)
        else:
            leaf_value = 0

        return leaf_value

    def _grow_tree(self, X, g, h, depth):
        if depth >= self.max_depth or X.shape[0] < self.min_samples_split:
            leaf_value = self._calculate_leaf_value(g, h)
            return Node(value=leaf_value)

        feature_idx, threshold, gain = self._find_best_split(X, g, h)

        if feature_idx is None or gain <= 0:
            leaf_value = self._calculate_leaf_value(g, h)
            return Node(value=leaf_value)

        left_indices = X[:, feature_idx] < threshold
        right_indices = X[:, feature_idx] >= threshold

        left_child = self._grow_tree(X[left_indices], g[left_indices], h[left_indices], depth + 1)
        right_child = self._grow_tree(X[right_indices], g[right_indices], h[right_indices], depth + 1)

        return Node(feature_idx=feature_idx, threshold=threshold, left=left_child, right=right_child, gain=gain)

    def fit(self, X, g, h):
        self.root = self._grow_tree(X, g, h, depth=0)
        print("Decision tree fitted with gradients and Hessians.")

    def predict_one_sample(self, x):
        node = self.root
        while node.value is None: # While it's not a leaf node
            if x[node.feature_idx] < node.threshold:
                node = node.left
            else:
                node = node.right
        return node.value

    # (Placeholder for future method: predict)

print("DecisionTree class updated with predict_one_sample method.")

DecisionTree class updated with predict_one_sample method.


In [8]:
import numpy as np

class DecisionTree:
    def __init__(self, max_depth=5, min_samples_split=2, reg_lambda=1.0, reg_alpha=0.0):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.reg_lambda = reg_lambda
        self.reg_alpha = reg_alpha
        self.root = None

    def _calculate_gain(self, G_L, H_L, G_R, H_R):
        def loss_term(G, H):
            return -0.5 * ((G**2) / (H + self.reg_lambda)) if (H + self.reg_lambda) > 0 else 0

        gain = loss_term(G_L, H_L) + loss_term(G_R, H_R) - loss_term(G_L + G_R, H_L + H_R)
        return gain

    def _find_best_split(self, X, g, h):
        best_gain = -np.inf
        best_feature_idx = None
        best_threshold = None

        if X.shape[0] < self.min_samples_split:
            return None, None, None, None

        for feature_idx in range(X.shape[1]):
            thresholds = np.unique(X[:, feature_idx])
            if len(thresholds) <= 1:
                continue

            for threshold in thresholds:
                left_indices = X[:, feature_idx] < threshold
                right_indices = X[:, feature_idx] >= threshold

                if np.sum(left_indices) < self.min_samples_split or np.sum(right_indices) < self.min_samples_split:
                    continue

                G_L = np.sum(g[left_indices])
                H_L = np.sum(h[left_indices])
                G_R = np.sum(g[right_indices])
                H_R = np.sum(h[right_indices])

                current_gain = self._calculate_gain(G_L, H_L, G_R, H_R)

                if current_gain > best_gain:
                    best_gain = current_gain
                    best_feature_idx = feature_idx
                    best_threshold = threshold

        if best_gain <= 0 or np.isinf(best_gain):
            return None, None, None, None

        return best_feature_idx, best_threshold, best_gain

    def _calculate_leaf_value(self, g, h):
        G = np.sum(g)
        H = np.sum(h)

        if self.reg_alpha > 0:
            if G > self.reg_alpha:
                G -= self.reg_alpha
            elif G < -self.reg_alpha:
                G += self.reg_alpha
            else:
                G = 0

        if (H + self.reg_lambda) > 0:
            leaf_value = -G / (H + self.reg_lambda)
        else:
            leaf_value = 0

        return leaf_value

    def _grow_tree(self, X, g, h, depth):
        if depth >= self.max_depth or X.shape[0] < self.min_samples_split:
            leaf_value = self._calculate_leaf_value(g, h)
            return Node(value=leaf_value)

        feature_idx, threshold, gain = self._find_best_split(X, g, h)

        if feature_idx is None or gain <= 0:
            leaf_value = self._calculate_leaf_value(g, h)
            return Node(value=leaf_value)

        left_indices = X[:, feature_idx] < threshold
        right_indices = X[:, feature_idx] >= threshold

        left_child = self._grow_tree(X[left_indices], g[left_indices], h[left_indices], depth + 1)
        right_child = self._grow_tree(X[right_indices], g[right_indices], h[right_indices], depth + 1)

        return Node(feature_idx=feature_idx, threshold=threshold, left=left_child, right=right_child, gain=gain)

    def fit(self, X, g, h):
        self.root = self._grow_tree(X, g, h, depth=0)
        # print("Decision tree fitted with gradients and Hessians.") # Removed print to avoid clutter if many trees

    def predict_one_sample(self, x):
        node = self.root
        while node.value is None: # While it's not a leaf node
            if x[node.feature_idx] < node.threshold:
                node = node.left
            else:
                node = node.right
        return node.value

    def predict(self, X):
        return np.array([self.predict_one_sample(x) for x in X])

print("DecisionTree class updated with predict method.")

DecisionTree class updated with predict method.


## Implement XGBoost Class: Initialization and Loss Function

Define the `XGBoost` class, including its constructor to initialize parameters like the number of estimators and learning rate, and methods to compute gradients and Hessians for multiclass classification.


In [9]:
import numpy as np

class XGBoost:
    def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=5, min_samples_split=2, reg_lambda=1.0, reg_alpha=0.0, num_classes=10):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.reg_lambda = reg_lambda
        self.reg_alpha = reg_alpha
        self.num_classes = num_classes
        self.models = [] # List to store decision trees for each class
        # Initial prediction (before any trees are added) is log-odds ratio, usually initialized to 0 for all classes
        # For multiclass, this will be an array of zeros for each class.
        self.initial_prediction = np.zeros(self.num_classes)

    def softmax(self, scores):
        # Subtract max for numerical stability
        exp_scores = np.exp(scores - np.max(scores, axis=1, keepdims=True))
        return exp_scores / np.sum(exp_scores, axis=1, keepdims=True)

    def _get_gradients_hessians(self, y_true, y_pred_proba):
        # For multiclass logistic regression (softmax loss)
        # y_true needs to be one-hot encoded
        num_samples = y_true.shape[0]

        # Gradients (first-order derivatives)
        # g_k = y_pred_proba_k - y_true_k
        gradients = y_pred_proba - y_true

        # Hessians (second-order derivatives)
        # h_k = y_pred_proba_k * (1 - y_pred_proba_k) (for binary)
        # For multiclass, diagonal hessians are p_k * (1 - p_k)
        # Off-diagonal are -p_j * p_k
        # XGBoost typically uses diagonal Hessians for simplicity and efficiency
        # h_k = p_k * (1 - p_k) for each class k
        hessians = y_pred_proba * (1 - y_pred_proba)

        return gradients, hessians

print("XGBoost class initialized with constructor, softmax, and _get_gradients_hessians methods.")

XGBoost class initialized with constructor, softmax, and _get_gradients_hessians methods.


In [10]:
import numpy as np
import time

class XGBoost:
    def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=5, min_samples_split=2, reg_lambda=1.0, reg_alpha=0.0, num_classes=10):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.reg_lambda = reg_lambda
        self.reg_alpha = reg_alpha
        self.num_classes = num_classes
        self.models = [] # List to store lists of decision trees, one list per class
        self.initial_prediction = np.zeros(self.num_classes)

    def softmax(self, scores):
        exp_scores = np.exp(scores - np.max(scores, axis=1, keepdims=True))
        return exp_scores / np.sum(exp_scores, axis=1, keepdims=True)

    def _get_gradients_hessians(self, y_true_one_hot, y_pred_proba):
        gradients = y_pred_proba - y_true_one_hot
        # For multiclass, diagonal hessians are p_k * (1 - p_k)
        # Using an approximation or more simply, for logistic regression,
        # H_ii = p_i * (1 - p_i). XGBoost often uses this for simplicity.
        hessians = y_pred_proba * (1 - y_pred_proba)
        return gradients, hessians

    def fit(self, X, y):
        start_time = time.time()

        # Convert y to one-hot encoding for multiclass loss calculation
        y_true_one_hot = np.zeros((y.size, self.num_classes))
        y_true_one_hot[np.arange(y.size), y] = 1

        # Initialize raw predictions (logits) to zeros
        # shape: (num_samples, num_classes)
        current_raw_predictions = np.zeros((X.shape[0], self.num_classes))

        # Initialize models list, one list of trees per class
        self.models = [[] for _ in range(self.num_classes)]

        print("Starting XGBoost training...")
        for estimator_idx in range(self.n_estimators):
            # Compute current probabilities
            y_pred_proba = self.softmax(current_raw_predictions)

            # Compute gradients and Hessians for all classes
            gradients, hessians = self._get_gradients_hessians(y_true_one_hot, y_pred_proba)

            for class_idx in range(self.num_classes):
                # Create and train a DecisionTree for the current class using its specific
                # gradients and Hessians
                tree = DecisionTree(
                    max_depth=self.max_depth,
                    min_samples_split=self.min_samples_split,
                    reg_lambda=self.reg_lambda,
                    reg_alpha=self.reg_alpha
                )
                # Pass gradients and Hessians specific to this class
                tree.fit(X, gradients[:, class_idx], hessians[:, class_idx])
                self.models[class_idx].append(tree)

                # Update raw predictions for the current class using the new tree's predictions
                # and the learning rate (shrinkage)
                current_raw_predictions[:, class_idx] += self.learning_rate * tree.predict(X)

            if (estimator_idx + 1) % 10 == 0 or estimator_idx == self.n_estimators - 1:
                print(f"  Estimator {estimator_idx + 1}/{self.n_estimators} trained.")

        self.training_time = time.time() - start_time
        print(f"XGBoost training finished in {self.training_time:.2f} seconds.")


    # (Placeholder for future method: predict)

print("XGBoost class updated with fit method.")

XGBoost class updated with fit method.


## Consolidate All Model Results (Actual & Planned)

Gather all accuracy and training time results for the models that have been executed successfully. Note the execution status and any errors encountered for XGBoost. Identify other models that were planned but not yet implemented. This consolidation will form the factual basis for the LaTeX report.


In [34]:
import numpy as np
import time
import pandas as pd

# --- Data Loading and Preprocessing ---
# Load the training and validation datasets, skipping bad lines
df_train = pd.read_csv('/content/MNIST_train.csv', on_bad_lines='skip')
df_val = pd.read_csv('/content/MNIST_validation.csv', on_bad_lines='skip')

X_train = df_train.drop(columns=['label', 'even']).values
y_train = df_train['label'].values

X_val = df_val.drop(columns=['label', 'even']).values
y_val = df_val['label'].values

# Normalize pixel values to [0, 1]
X_train = X_train / 255.0
X_val = X_val / 255.0

# Convert feature arrays to float and label arrays to int
X_train = X_train.astype('float32')
X_val = X_val.astype('float32')
y_train = y_train.astype('int32')
y_val = y_val.astype('int32')

# --- Node class definition ---
class Node:
    def __init__(self, feature_idx=None, threshold=None, value=None, left=None, right=None, gain=None):
        self.feature_idx = feature_idx
        self.threshold = threshold
        self.left = left
        self.right = right
        self.gain = gain
        self.value = value

# --- DecisionTree class definition ---
class DecisionTree:
    def __init__(self, max_depth=5, min_samples_split=2, reg_lambda=1.0, reg_alpha=0.0):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.reg_lambda = reg_lambda
        self.reg_alpha = reg_alpha
        self.root = None

    def _calculate_gain(self, G_L, H_L, G_R, H_R):
        def loss_term(G, H):
            return -0.5 * ((G**2) / (H + self.reg_lambda)) if (H + self.reg_lambda) > 0 else 0

        gain = loss_term(G_L, H_L) + loss_term(G_R, H_R) - loss_term(G_L + G_R, H_L + H_R)
        return gain

    def _find_best_split(self, X, g, h):
        best_gain = -np.inf
        best_feature_idx = None
        best_threshold = None

        if X.shape[0] < self.min_samples_split:
            # Corrected: Return 3 None values if split is not possible
            return None, None, None

        for feature_idx in range(X.shape[1]):
            thresholds = np.unique(X[:, feature_idx])
            if len(thresholds) <= 1:
                continue

            for threshold in thresholds:
                left_indices = X[:, feature_idx] < threshold
                right_indices = X[:, feature_idx] >= threshold

                if np.sum(left_indices) < self.min_samples_split or np.sum(right_indices) < self.min_samples_split:
                    continue

                G_L = np.sum(g[left_indices])
                H_L = np.sum(h[left_indices])
                G_R = np.sum(g[right_indices])
                H_R = np.sum(h[right_indices])

                current_gain = self._calculate_gain(G_L, H_L, G_R, H_R)

                if current_gain > best_gain:
                    best_gain = current_gain
                    best_feature_idx = feature_idx
                    best_threshold = threshold

        if best_gain <= 0 or np.isinf(best_gain):
            # Corrected: Return 3 None values if no beneficial split is found
            return None, None, None

        # Corrected: Always return 3 values
        return best_feature_idx, best_threshold, best_gain

    def _calculate_leaf_value(self, g, h):
        G = np.sum(g)
        H = np.sum(h)

        if self.reg_alpha > 0:
            if G > self.reg_alpha:
                G -= self.reg_alpha
            elif G < -self.reg_alpha:
                G += self.reg_alpha
            else:
                G = 0

        if (H + self.reg_lambda) > 0:
            leaf_value = -G / (H + self.reg_lambda)
        else:
            leaf_value = 0

        return leaf_value

    def _grow_tree(self, X, g, h, depth):
        if depth >= self.max_depth or X.shape[0] < self.min_samples_split:
            leaf_value = self._calculate_leaf_value(g, h)
            return Node(value=leaf_value)

        # Corrected: Expect 3 values from _find_best_split
        feature_idx, threshold, gain = self._find_best_split(X, g, h)

        if feature_idx is None or gain <= 0:
            leaf_value = self._calculate_leaf_value(g, h)
            return Node(value=leaf_value)

        left_indices = X[:, feature_idx] < threshold
        right_indices = X[:, feature_idx] >= threshold

        left_child = self._grow_tree(X[left_indices], g[left_indices], h[left_indices], depth + 1)
        right_child = self._grow_tree(X[right_indices], g[right_indices], h[right_indices], depth + 1)

        return Node(feature_idx=feature_idx, threshold=threshold, left=left_child, right=right_child, gain=gain)

    def fit(self, X, g, h):
        self.root = self._grow_tree(X, g, h, depth=0)

    def predict_one_sample(self, x):
        node = self.root
        while node.value is None:
            if node.feature_idx is None: # Added this check to prevent infinite loop for malformed trees
                return 0 # Or raise an error
            if x[node.feature_idx] < node.threshold:
                node = node.left
            else:
                node = node.right
        return node.value

    def predict(self, X):
        return np.array([self.predict_one_sample(x) for x in X])

# --- XGBoost class definition ---
class XGBoost:
    def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=5, min_samples_split=2, reg_lambda=1.0, reg_alpha=0.0, num_classes=10):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.reg_lambda = reg_lambda
        self.reg_alpha = reg_alpha
        self.num_classes = num_classes
        self.models = [] # List to store lists of decision trees, one list per class
        self.initial_prediction = np.zeros(self.num_classes)
        self.training_time = 0.0

    def softmax(self, scores):
        exp_scores = np.exp(scores - np.max(scores, axis=1, keepdims=True))
        return exp_scores / np.sum(exp_scores, axis=1, keepdims=True)

    def _get_gradients_hessians(self, y_true_one_hot, y_pred_proba):
        gradients = y_pred_proba - y_true_one_hot
        hessians = y_pred_proba * (1 - y_pred_proba)
        return gradients, hessians

    def fit(self, X, y):
        start_time = time.time()

        y_true_one_hot = np.zeros((y.size, self.num_classes))
        y_true_one_hot[np.arange(y.size), y] = 1

        current_raw_predictions = np.zeros((X.shape[0], self.num_classes))

        self.models = [[] for _ in range(self.num_classes)]

        print("Starting XGBoost training...")
        for estimator_idx in range(self.n_estimators):
            y_pred_proba = self.softmax(current_raw_predictions)
            gradients, hessians = self._get_gradients_hessians(y_true_one_hot, y_pred_proba)

            for class_idx in range(self.num_classes):
                tree = DecisionTree(
                    max_depth=self.max_depth,
                    min_samples_split=self.min_samples_split,
                    reg_lambda=self.reg_lambda,
                    reg_alpha=self.reg_alpha
                )
                tree.fit(X, gradients[:, class_idx], hessians[:, class_idx])
                self.models[class_idx].append(tree)
                current_raw_predictions[:, class_idx] += self.learning_rate * tree.predict(X)

            if (estimator_idx + 1) % 10 == 0 or estimator_idx == self.n_estimators - 1:
                print(f"  Estimator {estimator_idx + 1}/{self.n_estimators} trained.")

        self.training_time = time.time() - start_time
        print(f"XGBoost training finished in {self.training_time:.2f} seconds.")

    def predict_proba(self, X):
        raw_predictions = np.zeros((X.shape[0], self.num_classes))

        for class_idx in range(self.num_classes):
            for tree in self.models[class_idx]:
                raw_predictions[:, class_idx] += self.learning_rate * tree.predict(X)

        return self.softmax(raw_predictions)

    def predict(self, X):
        probabilities = self.predict_proba(X)
        return np.argmax(probabilities, axis=1)


# Create an instance of the XGBoost model
num_classes_mnist = len(np.unique(y_train))
xgb_model = XGBoost(
    n_estimators=50,
    learning_rate=0.1,
    max_depth=3,
    min_samples_split=2,
    reg_lambda=1.0,
    reg_alpha=0.0,
    num_classes=num_classes_mnist
)

# Train the model
xgb_model.fit(X_train, y_train)

# Make predictions on the validation set
print("Making predictions on the validation set using XGBoost...")
prediction_start_time = time.time()
y_pred_val_xgb = xgb_model.predict(X_val)
xgb_prediction_time = time.time() - prediction_start_time

# Evaluate the model
xgb_accuracy = np.mean(y_val == y_pred_val_xgb)

print(f"\nXGBoost Validation Accuracy: {xgb_accuracy:.4f}")
print(f"XGBoost Training Time: {xgb_model.training_time:.2f} seconds.")
print(f"XGBoost Prediction Time: {xgb_prediction_time:.2f} seconds.")

Starting XGBoost training...


KeyboardInterrupt: 