In [4]:
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import time
import psutil

# Function to get CPU utilization and memory consumption
def get_system_status():
    cpu_percent = psutil.cpu_percent()
    memory_percent = psutil.virtual_memory().percent
    return cpu_percent, memory_percent

# Node class for the Decision Tree
class Node:
    def __init__(self, feature_index=None, threshold=None, left=None, right=None, value=None):
        self.feature_index = feature_index  # Index of feature to split on
        self.threshold = threshold          # Threshold value to split on
        self.left = left                    # Left subtree
        self.right = right                  # Right subtree
        self.value = value                  # Class label for leaf nodes

# Decision Tree class
class DecisionTree:
    def __init__(self, max_depth=None):
        self.max_depth = max_depth
        self.root = None

    def fit(self, X, y):
        self.root = self._build_tree(X, y, depth=0)

    def _build_tree(self, X, y, depth):
        num_samples, num_features = X.shape
        unique_classes, counts = np.unique(y, return_counts=True)
        most_common_class = unique_classes[np.argmax(counts)]

        # Stopping criteria
        if depth == self.max_depth or len(np.unique(y)) == 1 or num_samples <= 1:
            return Node(value=most_common_class)

        # Find the best split
        best_split = self._find_best_split(X, y)

        if best_split is not None:
            feature_index, threshold = best_split
            left_mask = X[:, feature_index] <= threshold
            right_mask = ~left_mask

            left_subtree = self._build_tree(X[left_mask], y[left_mask], depth + 1)
            right_subtree = self._build_tree(X[right_mask], y[right_mask], depth + 1)

            return Node(feature_index=feature_index, threshold=threshold, left=left_subtree, right=right_subtree)

        return Node(value=most_common_class)

    def _find_best_split(self, X, y):
        num_samples, num_features = X.shape

        if num_samples <= 1:
            return None

        # Calculate the Gini impurity for the current node
        current_gini = self._gini_impurity(y)

        best_gini = 1
        best_split = None

        for feature_index in range(num_features):
            thresholds = np.unique(X[:, feature_index])
            for threshold in thresholds:
                left_mask = X[:, feature_index] <= threshold
                right_mask = ~left_mask

                if np.sum(left_mask) > 0 and np.sum(right_mask) > 0:
                    left_gini = self._gini_impurity(y[left_mask])
                    right_gini = self._gini_impurity(y[right_mask])

                    weighted_gini = (np.sum(left_mask) / num_samples) * left_gini + \
                                    (np.sum(right_mask) / num_samples) * right_gini

                    if weighted_gini < best_gini:
                        best_gini = weighted_gini
                        best_split = (feature_index, threshold)

        return best_split

    def _gini_impurity(self, y):
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / len(y)
        gini = 1 - np.sum(probabilities**2)
        return gini

    def predict(self, X):
        return np.array([self._predict_tree(x, self.root) for x in X])

    def _predict_tree(self, x, node):
        if node is None:
            return None  # Handle case where the node is None

        if node.value is not None:
            return node.value

        if x[node.feature_index] <= node.threshold:
            return self._predict_tree(x, node.left)
        else:
            return self._predict_tree(x, node.right)

# Random Forest class
class RandomForest:
    def __init__(self, n_estimators=100, max_depth=None):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.estimators = [DecisionTree(max_depth=max_depth) for _ in range(n_estimators)]

# Random Forest class
class RandomForest:
    def __init__(self, n_estimators=100, max_depth=None):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.estimators = [DecisionTree(max_depth=max_depth) for _ in range(n_estimators)]
        self.training_times = []  # List to store training times

    def fit(self, X, y):
        start_time = time.time()
        for estimator in self.estimators:
            bootstrap_indices = np.random.choice(len(X), len(X), replace=True)
            X_bootstrap, y_bootstrap = X[bootstrap_indices], y[bootstrap_indices]
            estimator.fit(X_bootstrap, y_bootstrap)
        end_time = time.time()
        training_time = end_time - start_time
        self.training_times.append(training_time)
        print(f"Time taken for training: {training_time:.2f} seconds")

    def predict(self, X):
        predictions = np.array([estimator.predict(X) for estimator in self.estimators])

        result = []
        for i in range(predictions.shape[1]):
            column = predictions[:, i]
            valid_values = [val for val in column if val is not None]
            if valid_values:
                result.append(np.bincount(np.array(valid_values).astype(int)).argmax())
            else:
                result.append(0)

        return np.array(result)


# Load MNIST data
mnist = fetch_openml('mnist_784', as_frame=True)
X, y = mnist.data.to_numpy().astype('float32'), mnist.target.astype('int')

# Normalize the pixel values to be in the range [0, 1]
X /= 255.0

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Add a bias term to the input features
X_train_bias = np.c_[np.ones((X_train.shape[0], 1)), X_train]
X_test_bias = np.c_[np.ones((X_test.shape[0], 1)), X_test]

# Initialize the Random Forest model
n_estimators = 100
max_depth = 10
rf_model = RandomForest(n_estimators=n_estimators, max_depth=max_depth)

# Monitoring overall system status
overall_cpu_utilization = []
overall_memory_utilization = []

# Training the Random Forest model
start_time = time.time()
log_list = []

for epoch in range(1):
    # Monitor system status and log
    cpu_percent, memory_percent = get_system_status()
    overall_cpu_utilization.append(cpu_percent)
    overall_memory_utilization.append(memory_percent)
    log_list.append({'Epoch': epoch + 1, 'CPU Utilization': cpu_percent, 'Memory Usage (%)': memory_percent})

# Calculate the time taken for training
training_time = time.time() - start_time
print(f"Time taken for training: {training_time:.2f} seconds")

# Print the overall CPU and memory utilization
print(f"Overall CPU Utilization: {np.mean(overall_cpu_utilization)}%")
print(f"Overall Memory Usage: {np.mean(overall_memory_utilization)}%")

# Make predictions on the test set
rf_predictions = rf_model.predict(X_test)

# Convert predictions to integers
rf_predictions = rf_predictions.astype(int)

# Evaluate accuracy
rf_accuracy = accuracy_score(y_test, rf_predictions)
print(f"Random Forest Accuracy: {rf_accuracy}")


Time taken for training: 0.01 seconds
Overall CPU Utilization: 19.3%
Overall Memory Usage: 66.6%
Random Forest Accuracy: 0.09592857142857143


In [2]:
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import time
import psutil

# Function to get CPU utilization and memory consumption
def get_system_status():
    cpu_percent = psutil.cpu_percent()
    memory_percent = psutil.virtual_memory().percent
    return cpu_percent, memory_percent

class KNN:
    def __init__(self, k=3):
        self.k = k
        self.X_train = None
        self.y_train = None

    def fit(self, X_train, y_train):
        self.X_train = X_train
        self.y_train = y_train

    def predict(self, X_test):
        predictions = [self._predict(x) for x in X_test]
        return np.array(predictions)

    def _predict(self, x):
        distances = [np.linalg.norm(x - x_train) for x_train in self.X_train]
        k_indices = np.argsort(distances)[:self.k]
        k_nearest_labels = [self.y_train[i] for i in k_indices]
        most_common = np.bincount(k_nearest_labels).argmax()
        return most_common

# Load MNIST data
mnist = fetch_openml('mnist_784', as_frame=True)
X, y = mnist.data.to_numpy().astype('float32'), mnist.target.astype('int')

# Normalize the pixel values to be in the range [0, 1]
X /= 255.0

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize the k-NN model
knn_model = KNN(k=3)

# Monitoring overall system status
overall_cpu_utilization = []
overall_memory_utilization = []

# Training the k-NN model
start_time = time.time()

knn_model.fit(X_train, y_train)

# Monitor system status and log at the start
cpu_percent, memory_percent = get_system_status()
overall_cpu_utilization.append(cpu_percent)
overall_memory_utilization.append(memory_percent)
log_list = [{'Algorithm': 'k-NN', 'CPU Utilization': cpu_percent, 'Memory Usage (%)': memory_percent, 'Training Time': 0.0}]

# Calculate the time taken for training at the end
end_time = time.time()
training_time = end_time - start_time
log_list[0]['Training Time'] = training_time

# Print the overall CPU and memory utilization
print(f"Overall CPU Utilization: {np.mean(overall_cpu_utilization)}%")
print(f"Overall Memory Usage: {np.mean(overall_memory_utilization)}%")

# Print the time taken for training
print(f"Time taken for training: {training_time:.2f} seconds")

# Make predictions on the test set
knn_predictions = knn_model.predict(X_test)

# Evaluate accuracy
knn_accuracy = accuracy_score(y_test, knn_predictions)
print(f"k-NN Accuracy: {knn_accuracy}")


Overall CPU Utilization: 18.4%
Overall Memory Usage: 66.4%
Time taken for training: 0.01 seconds


KeyError: 47232