In [2]:
import numpy as np
import math
import functools

# Linear Regression
### Time: O(2mn+2mn²+n³)
### Space: O(n * m)

In [20]:
class MyLinearRegression:
    def __init__(self, fit_intercept=True):
        self.fit_intercept = fit_intercept
        self.coefficients = None
        self.intercept = None

    def fit(self, X, y):
        if self.fit_intercept:
            X = np.concatenate([np.ones((X.shape[0], 1)), X], axis=1)

        self.coefficients = np.linalg.inv(X.T @ X) @ X.T @ y
        if self.fit_intercept:
            self.intercept = self.coefficients[0]
            self.coefficients = self.coefficients[1:]
        return self

    def predict(self, X):
        if self.fit_intercept:
            X = np.concatenate([np.ones((X.shape[0], 1)), X], axis=1)
        return X @ np.concatenate([[self.intercept], self.coefficients])

# Logistic Regression
### Time: O(n_iterations * n_samples * n_features)
### Space: O(n_samples * n_features) 

In [41]:
class Logistic_regression:
    def __init__(self, learning_rate=0.01, n_iters=1000):
        self.learning_rate = learning_rate
        self.n_iters = n_iters
        self.weights = None
        self.bias = None
        self.losses = []

    def _sigmoid(self, x):
        return 1 / (1 + np.exp(-x))
    
    def compute_loss(self, y_true, y_pred):
        # binary cross entropy
        epsilon = 1e-9
        y1 = y_true * np.log(y_pred + epsilon)
        y2 = (1 - y_true) * np.log(1 - y_pred + epsilon)
        return -np.mean(y1+y2)

    def fit(self, X, y):
        n_samples, n_features = X.shape
        self.weights = np.zeros(n_features)
        self.bias = 0

        #Gradient Descent
        for _ in range(self.n_iters):
            linear_model = np.dot(X, self.weights) + self.bias
            y_predicted = self._sigmoid(linear_model)

            dw = (1 / n_samples) * np.dot(X.T, (y_predicted - y))
            db = (1 / n_samples) * np.sum(y_predicted - y)

            self.weights -= self.learning_rate * dw 
            self.bias -= self.learning_rate * db     
        return self
    
    def predict(self, X, threshold = 0.5):
        linear_model = np.dot(X, self.weights) + self.bias
        y_predicted = self._sigmoid(linear_model)
        return [1 if i > threshold else 0 for i in y_predicted]

In [62]:
def eval_metrics(predictions, y_test):
    if len(predictions) != len(y_test):
        return ValueError
    # Calculate Accuracy, Precision, Recall, Specificity
    TP = 0
    TN = 0 
    FP = 0 
    FN = 0 
    for i in range(len(predictions)):
        if predictions[i] == y_test[i]:
            if predictions[i] == 1:
                TP += 1
            else:
                TN += 1
        else:
            if predictions[i] == 1:
                FP += 1
            else:
                FN += 1
                
    # Accuracy = (TP+TN) / (TP+TN+FP+FN)
    Accuracy = (TP+TN) / (TP+TN+FP+FN)

    # Precision
    Precision = TP / (TP+FP)

    # Recall
    Recall = TP / (TP+FN)

    # Specificity
    Specificity = TN/(TN+FP)

    return Accuracy, Precision, Recall, Specificity

# K Nearest Neighbors
### n= number of training examples, m=number of test samples, d= number of dimensions of the data, k= number of neighbors
### Time: training = O(1), predict w/ heap/argsort  = O(m * n * log k)
### Space: train: O(n * d), predict: O(m*d)

In [94]:
import heapq
from collections import Counter
from time import time


class knn_classifier:
    def __init__(self, k=5, algorithm = 'heapq'):
        self.k = k
        self.algorithm = algorithm

    def fit(self, X_train, y_train):
        self.X_train = X_train
        self.y_train = y_train

    def euclidean_distance(self, x1, x2):
        return np.sqrt(np.sum((x1 - x2)**2))

    def predict(self, X_test):
        y_pred = []
        if self.algorithm == 'heapq':
            for x_test in X_test:
                heap = []
                for i, x_train in enumerate(self.X_train):
                    distances = self.euclidean_distance(x_test, x_train)
                    if len(heap) < self.k:
                        heapq.heappush(heap, (-distances, self.y_train[i]))
                    else:
                        heapq.heappushpop(heap, (-distances, self.y_train[i]))
                nearest_labels = [label  for j,label in heap]
                most_common_label = Counter(nearest_labels).most_common(1)[0][0]
                y_pred.append(most_common_label)
        else: 
            for x_test in X_test:
                distances = [self.euclidean_distance(x_test, x_train) for x_train in self.X_train]
                nearest_indices = np.argsort(distances)[:self.k]
                nearest_labels = [self.y_train[i] for i in nearest_indices]
                most_common_label = np.argmax(np.bincount(nearest_labels))
                y_pred.append(most_common_label)
        return np.array(y_pred)

# Decision Tree
### n= number of points in the Training set d=dimentionality of the data
### Training Time: O(n*log(n)*d) 
### Space: O(n * d)

In [91]:
import numpy as np
from collections import Counter

class TreeNode():
    def __init__(self, data, feature_idx, feature_val, prediction_probs, information_gain) -> None:
        self.data = data
        self.feature_idx = feature_idx
        self.feature_val = feature_val
        self.prediction_probs = prediction_probs
        self.information_gain = information_gain
        self.feature_importance = self.data.shape[0] * self.information_gain
        self.left = None
        self.right = None

    def node_def(self) -> str:

        if (self.left or self.right):
            return f"NODE | Information Gain = {self.information_gain} | Split IF X[{self.feature_idx}] < {self.feature_val} THEN left O/W right"
        else:
            unique_values, value_counts = np.unique(self.data[:,-1], return_counts=True)
            output = ", ".join([f"{value}->{count}" for value, count in zip(unique_values, value_counts)])            
            return f"LEAF | Label Counts = {output} | Pred Probs = {self.prediction_probs}"
        
class DecisionTree():
    def __init__(self, max_depth=4, min_samples_leaf=1, min_information_gain=0.0, numb_of_features_splitting=None, amount_of_say=None):
        self.max_depth = max_depth
        self.min_samples_leaf = min_samples_leaf
        self.min_information_gain = min_information_gain
        self.numb_of_features_splitting = numb_of_features_splitting
        self.amount_of_say = amount_of_say

    def _entropy(self, class_probabilities: list) -> float:
        return sum([-p * np.log2(p) for p in class_probabilities if p>0])
    
    def _class_probabilities(self, labels: list) -> list:
        total_count = len(labels)
        return [label_count / total_count for label_count in Counter(labels).values()]

    def _data_entropy(self, labels: list) -> float:
        return self._entropy(self._class_probabilities(labels))
    
    def _partition_entropy(self, subsets: list) -> float:
        """subsets = list of label lists (EX: [[1,0,0], [1,1,1])"""
        total_count = sum([len(subset) for subset in subsets])
        return sum([self._data_entropy(subset) * (len(subset) / total_count) for subset in subsets])
    
    def _split(self, data: np.array, feature_idx: int, feature_val: float) -> tuple:
        
        mask_below_threshold = data[:, feature_idx] < feature_val
        group1 = data[mask_below_threshold]
        group2 = data[~mask_below_threshold]

        return group1, group2
    
    def _select_features_to_use(self, data: np.array) -> list:
        feature_idx = list(range(data.shape[1]-1))

        if self.numb_of_features_splitting == "sqrt":
            feature_idx_to_use = np.random.choice(feature_idx, size=int(np.sqrt(len(feature_idx))))
        elif self.numb_of_features_splitting == "log":
            feature_idx_to_use = np.random.choice(feature_idx, size=int(np.log2(len(feature_idx))))
        else:
            feature_idx_to_use = feature_idx

        return feature_idx_to_use
        
    def _find_best_split(self, data: np.array) -> tuple:
        """
        Finds the best split (with the lowest entropy) given data
        Returns 2 splitted groups and split information
        """
        min_part_entropy = 1e9
        feature_idx_to_use =  self._select_features_to_use(data)

        for idx in feature_idx_to_use:
            feature_vals = np.percentile(data[:, idx], q=np.arange(25, 100, 25))
            for feature_val in feature_vals:
                g1, g2, = self._split(data, idx, feature_val)
                part_entropy = self._partition_entropy([g1[:, -1], g2[:, -1]])
                if part_entropy < min_part_entropy:
                    min_part_entropy = part_entropy
                    min_entropy_feature_idx = idx
                    min_entropy_feature_val = feature_val
                    g1_min, g2_min = g1, g2

        return g1_min, g2_min, min_entropy_feature_idx, min_entropy_feature_val, min_part_entropy

    def _find_label_probs(self, data: np.array) -> np.array:
        labels_as_integers = data[:,-1].astype(int)
        # Calculate the total number of labels
        total_labels = len(labels_as_integers)
        # Calculate the ratios (probabilities) for each label
        label_probabilities = np.zeros(len(self.labels_in_train), dtype=float)

        # Populate the label_probabilities array based on the specific labels
        for i, label in enumerate(self.labels_in_train):
            label_index = np.where(labels_as_integers == i)[0]
            if len(label_index) > 0:
                label_probabilities[i] = len(label_index) / total_labels

        return label_probabilities

    def _create_tree(self, data: np.array, current_depth: int) -> TreeNode:
        # Check if the max depth has been reached (stopping criteria)
        if current_depth > self.max_depth:
            return None
        
        # Find best split
        split_1_data, split_2_data, split_feature_idx, split_feature_val, split_entropy = self._find_best_split(data)
        
        # Find label probs for the node
        label_probabilities = self._find_label_probs(data)

        # Calculate information gain
        node_entropy = self._entropy(label_probabilities)
        information_gain = node_entropy - split_entropy
        
        # Create node
        node = TreeNode(data, split_feature_idx, split_feature_val, label_probabilities, information_gain)

        # Check if the min_samples_leaf has been satisfied (stopping criteria)
        if self.min_samples_leaf > split_1_data.shape[0] or self.min_samples_leaf > split_2_data.shape[0]:
            return node
        # Check if the min_information_gain has been satisfied (stopping criteria)
        elif information_gain < self.min_information_gain:
            return node

        current_depth += 1
        node.left = self._create_tree(split_1_data, current_depth)
        node.right = self._create_tree(split_2_data, current_depth)
        
        return node
    
    def _predict_one_sample(self, X: np.array) -> np.array:
        """Returns prediction for 1 dim array"""
        node = self.tree

        # Finds the leaf which X belongs
        while node:
            pred_probs = node.prediction_probs
            if X[node.feature_idx] < node.feature_val:
                node = node.left
            else:
                node = node.right

        return pred_probs

    def train(self, X_train: np.array, Y_train: np.array) -> None:
        """
        Trains the model with given X and Y datasets
        """

        # Concat features and labels
        self.labels_in_train = np.unique(Y_train)
        train_data = np.concatenate((X_train, np.reshape(Y_train, (-1, 1))), axis=1)

        # Start creating the tree
        self.tree = self._create_tree(data=train_data, current_depth=0)

        # Calculate feature importance
        self.feature_importances = dict.fromkeys(range(X_train.shape[1]), 0)
        self._calculate_feature_importance(self.tree)
        # Normalize the feature importance values
        self.feature_importances = {k: v / total for total in (sum(self.feature_importances.values()),) for k, v in self.feature_importances.items()}

    def predict_proba(self, X_set: np.array) -> np.array:
        """Returns the predicted probs for a given data set"""
        pred_probs = np.apply_along_axis(self._predict_one_sample, 1, X_set)
        return pred_probs

    def predict(self, X_set: np.array) -> np.array:
        """Returns the predicted labels for a given data set"""
        pred_probs = self.predict_proba(X_set)
        preds = np.argmax(pred_probs, axis=1)
        return preds    
        
    def _print_recursive(self, node: TreeNode, level=0) -> None:
        if node != None:
            self._print_recursive(node.left, level + 1)
            print('    ' * 4 * level + '-> ' + node.node_def())
            self._print_recursive(node.right, level + 1)

    def print_tree(self) -> None:
        self._print_recursive(node=self.tree)

    def _calculate_feature_importance(self, node):
        """Calculates the feature importance by visiting each node in the tree recursively"""
        if node != None:
            self.feature_importances[node.feature_idx] += node.feature_importance
            self._calculate_feature_importance(node.left)
            self._calculate_feature_importance(node.right) 

In [95]:
class DecisionTreeClassifier:
    def __init__(self, max_depth=None, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None

    def fit(self, X, y):
        self.root = self._build_tree(X, y)

    def _build_tree(self, X, y, depth=0):
        num_samples, num_features = X.shape
        num_classes = len(set(y))

        # Check termination conditions
        if (self.max_depth is not None and depth >= self.max_depth) or num_samples < self.min_samples_split or num_classes == 1:
            return TreeNode(is_leaf=True, prediction=self._majority_class(y))

        # Find the best split
        best_feature, best_threshold = self._find_best_split(X, y)

        # Create the left and right child nodes
        left_mask = X[:, best_feature] < best_threshold
        right_mask = ~left_mask
        left_child = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        right_child = self._build_tree(X[right_mask], y[right_mask], depth + 1)

        # Create the current node
        return TreeNode(feature=best_feature, threshold=best_threshold, left=left_child, right=right_child)

    def _find_best_split(self, X, y):
        best_gain = 0
        best_feature = None
        best_threshold = None

        for feature in range(X.shape[1]):
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                gain = self._information_gain(X, y, feature, threshold)
                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_threshold = threshold

        return best_feature, best_threshold

    def _information_gain(self, X, y, feature, threshold):
        # Compute the entropy of the parent node
        parent_entropy = self._entropy(y)

        # Compute the entropy of the left and right child nodes
        left_mask = X[:, feature] < threshold
        right_mask = ~left_mask
        left_entropy = self._entropy(y[left_mask])
        right_entropy = self._entropy(y[right_mask])

        # Compute the information gain
        left_weight = np.sum(left_mask) / len(y)
        right_weight = np.sum(right_mask) / len(y)
        information_gain = parent_entropy - (left_weight * left_entropy + right_weight * right_entropy)

        return information_gain

    def _entropy(self, y):
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / len(y)
        return -np.sum(probabilities * np.log(probabilities))

    def _majority_class(self, y):
        return np.argmax(np.bincount(y))

    def predict(self, X):
        return [self._predict_single(x, self.root) for x in X]

    def _predict_single(self, x, node):
        if node.is_leaf:
            return node.prediction
        elif x[node.feature] < node.threshold:
            return self._predict_single(x, node.left)
        else:
            return self._predict_single(x, node.right)

class TreeNode:
    def __init__(self, feature=None, threshold=None, left=None, right=None, is_leaf=False, prediction=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.is_leaf = is_leaf
        self.prediction = prediction

In [114]:
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression

# Prepare the data
X = np.array([10, 20, 30, 40, 50])
y = np.array([100, 300, 500, 700, 900])

X_log = np.log(X).reshape(-1,1)
y_log = np.log(y)
print(X_log, y_log)
# Train the multiplicative regression model
model = LinearRegression()
model.fit(X_log, y_log)

# Interpret the coefficients
coefficients = np.exp(model.coef_)
print("Multiplicative coefficients:", coefficients)

# Make predictions
X_new = np.array([60, 70, 80])
X_new_log = np.log(X_new).reshape(-1,1)
y_new_log = model.predict(X_new_log)
y_new = np.exp(y_new_log)
print("Predicted values:", y_new)

[[2.30258509]
 [2.99573227]
 [3.40119738]
 [3.68887945]
 [3.91202301]] [4.60517019 5.70378247 6.2146081  6.55108034 6.80239476]
Multiplicative coefficients: [3.91186704]
Predicted values: [1228.25735218 1515.67393416 1818.47633529]
