In [None]:
from collections import Counter
import numpy as np
import matplotlib.pyplot as plt

### Linear Regression

In [None]:
# Linear Regression Normal Equation

# Closed-form least-squares solution: theta = (X^T X)^-1 X^T y.
# NOTE(review): X and y are not defined in the visible cells -- presumably
# supplied by an earlier cell; confirm before running.
theta_best = np.linalg.inv(X.T @ X) @ X.T @ y


# Linear Regression with MSE

class LinearRegression:
    """Linear model fitted by full-batch gradient descent on the mean squared error.

    Args:
        learning_rate: Step size used for every parameter update.
        n_iters: Number of gradient-descent steps performed by ``fit``.
    """

    def __init__(self, learning_rate=.001, n_iters=1000):
        self.lr, self.n_iters = learning_rate, n_iters
        # Both parameters stay unset until ``fit`` is called.
        self.weights, self.bias = None, None

    def fit(self, X, y):
        """Learn ``weights`` and ``bias`` from ``X`` (n_samples, n_features) and targets ``y``."""
        n_samples, n_features = X.shape
        self.weights, self.bias = np.zeros(n_features), 0
        # Constant factor shared by both MSE gradients (the "2 / n" term).
        grad_scale = (1/n_samples) * 2
        for _ in range(self.n_iters):
            # Residuals of the current model; both gradients are built from them.
            residual = self.predict(X) - y
            self.weights -= self.lr * (grad_scale * np.dot(X.T, residual))
            self.bias -= self.lr * (grad_scale * np.sum(residual))

    def predict(self, X):
        """Return the linear response ``X . weights + bias`` for every row of ``X``."""
        return np.dot(X, self.weights) + self.bias

### Logistic Regression

In [None]:
# Logistic Regression with Log Loss

class LogisticRegression:
    """Binary logistic-regression classifier trained by full-batch gradient descent on log loss.

    Args:
        learning_rate: Step size used for every parameter update.
        n_iters: Number of gradient-descent steps performed by ``fit``.
    """

    def __init__(self, learning_rate=0.001, n_iters=1000):
        self.lr = learning_rate
        self.n_iters = n_iters
        # Parameters are created by ``fit``.
        self.weights = None
        self.bias = None

    def fit(self, X, y):
        """Learn ``weights`` and ``bias`` from ``X`` (n_samples, n_features) and 0/1 labels ``y``."""
        n_samples, n_features = X.shape
        self.weights = np.zeros(n_features)
        self.bias = 0
        for _ in range(self.n_iters):
            y_pred = self._sigmoid(np.dot(X, self.weights) + self.bias)
            error = y_pred - y
            # Gradient of the mean log loss: (1/n) * X^T (p - y) and (1/n) * sum(p - y).
            # Unlike MSE there is no factor of 2.
            dw = np.dot(X.T, error) / n_samples
            db = np.sum(error) / n_samples
            self.weights -= self.lr * dw
            # The step must be scaled by the learning rate; scaling by the bias
            # itself would leave a zero-initialised bias stuck at zero.
            self.bias -= self.lr * db

    def predict(self, X):
        """Return a list of 0/1 class labels, one per row of ``X`` (1 when p > 0.5)."""
        probabilities = self._sigmoid(np.dot(X, self.weights) + self.bias)
        return [1 if p > 0.5 else 0 for p in probabilities]

    def _sigmoid(self, x):
        """Logistic function 1 / (1 + e^-x), applied element-wise."""
        return 1 / (1 + np.exp(-x))

### Support Vector Machine

In [None]:
# SVM

class SVM:
    """Linear soft-margin SVM trained by per-sample sub-gradient descent.

    Minimises ``lambda * ||w||^2 + max(0, 1 - y * (w . x + b))`` one sample at a
    time. The decision function is ``w . x + b``.

    Args:
        lr: Step size used for every update.
        lambda_param: Weight of the L2 regularisation term.
        n_iters: Number of passes over the training set.
    """

    def __init__(self, lr=0.001, lambda_param=0.01, n_iters=201):
        self.lr = lr
        self.lambda_param = lambda_param
        self.n_iters = n_iters
        # Parameters are created by ``fit``.
        self.w = None
        self.b = None

    def fit(self, X, y):
        """Learn ``w`` and ``b`` from ``X`` (n_samples, n_features) and labels ``y``.

        Labels ``<= 0`` are treated as class -1, everything else as class +1.
        """
        y_ = np.where(np.asarray(y) <= 0, -1, 1)
        _, n_features = X.shape
        self.w = np.zeros(n_features)
        self.b = 0
        for _ in range(self.n_iters):
            for idx, x_i in enumerate(X):
                margin_satisfied = y_[idx] * (np.dot(x_i, self.w) + self.b) >= 1
                # The regularisation term always contributes to the weight gradient.
                dw = 2 * self.lambda_param * self.w
                if margin_satisfied:
                    db = 0
                else:
                    # Sub-gradient of the hinge term for f(x) = w . x + b:
                    # d/dw = -y * x and d/db = -y.
                    dw = dw - y_[idx] * x_i
                    db = -y_[idx]
                self.w -= self.lr * dw
                self.b -= self.lr * db

    def predict(self, X):
        """Return ``sign(X . w + b)`` per row: -1.0, 0.0 (on the boundary) or 1.0."""
        linear_output = np.dot(X, self.w) + self.b
        return np.sign(linear_output)

### Decision Tree

In [None]:
# Decision Tree Classifier 
# Implemented with information gain + early stopping + min_samples_split + max_depth + max_features

def entropy(y):
    """Return the Shannon entropy (in bits) of the labels ``y`` attached to a node.

    ``y`` is passed to ``np.bincount``, so it must hold non-negative integers.
    """
    counts = np.bincount(y)
    # Classes that never occur contribute nothing (and log2(0) is undefined), so drop them.
    probs = counts[counts > 0] / len(y)
    return -np.sum(probs * np.log2(probs))


class Node:
    """
    One node of a decision tree.

    An internal node records the split it applies (``feature`` index and
    ``threshold``) and references its ``left`` and ``right`` children.
    A leaf node instead carries ``value``, the class label it predicts;
    ``value`` is keyword-only and is what marks a node as a leaf.
    """
    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        self.feature, self.threshold = feature, threshold
        self.left, self.right = left, right
        self.value = value

    def is_leaf_node(self) -> bool:
        """Return True when this node stores a prediction (``value`` is set)."""
        if self.value is None:
            return False
        return True


class DecisionTree:
    """Classification tree grown greedily by maximising information gain.

    Args:
        min_samples_split: A node with fewer samples than this becomes a leaf.
        max_depth: Nodes at this depth become leaves.
        max_features: Number of features sampled (without replacement) as split
            candidates at every node; a falsy value means "use all features".
    """

    def __init__(self, min_samples_split=2, max_depth=100, max_features=None):
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.max_features = max_features
        self.root = None

    def fit(self, X, y):
        """Grow the tree from ``X`` (n_samples, n_features) and integer labels ``y``."""
        # The resolved feature count is written back to ``max_features`` (capped at
        # the number of columns), so it persists across later ``fit`` calls.
        self.max_features = X.shape[1] if not self.max_features else min(X.shape[1], self.max_features)
        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        """Recursively grow the tree and return the root ``Node`` of the subtree."""
        n_samples, n_features = X.shape
        n_labels = len(np.unique(y))

        # Stopping criteria: depth limit reached, node is pure, or too few samples.
        if depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split:
            return Node(value=self._most_common_label(y))

        # Greedy search over a random subset of the features.
        feature_idxs = np.random.choice(n_features, self.max_features, replace=False)
        best_feature, best_threshold = self._best_criteria(X, y, feature_idxs)

        # None of the sampled features can separate the samples (every candidate
        # column is constant): splitting would leave one child empty, so stop here.
        if best_feature is None:
            return Node(value=self._most_common_label(y))

        left_idxs, right_idxs = self._split(X[:, best_feature], best_threshold)
        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth+1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth+1)
        return Node(best_feature, best_threshold, left, right)

    def _most_common_label(self, y):
        """Return the label that occurs most often in ``y``."""
        return Counter(y).most_common(1)[0][0]

    def _best_criteria(self, X, y, feature_idxs):
        """Find the (feature, threshold) pair with the highest information gain.

        For each feature in ``feature_idxs`` every distinct value except the
        largest is tried as a threshold. Returns ``(None, None)`` when no
        feature offers a split that puts samples on both sides.
        """
        split_idx, split_threshold, best_inf_gain = None, None, -1
        for feat_idx in feature_idxs:
            X_col = X[:, feat_idx]
            # ``np.unique`` returns sorted values; a threshold equal to the column
            # maximum sends every sample left, so it is never a usable split.
            thresholds = np.unique(X_col)[:-1]
            for thresh in thresholds:
                inf_gain = self._information_gain(y, X_col, thresh)
                if inf_gain > best_inf_gain:
                    best_inf_gain = inf_gain
                    split_idx = feat_idx
                    split_threshold = thresh

        return split_idx, split_threshold

    def _information_gain(self, y, X_column, split_threshold):
        """Return parent entropy minus the size-weighted entropy of the two children.

        A split that leaves either child empty yields a gain of 0.
        """
        parent_entropy = entropy(y)

        left_idxs, right_idxs = self._split(X_column, split_threshold)
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0

        n, n_left, n_right = len(y), len(left_idxs), len(right_idxs)
        entropy_left, entropy_right = entropy(y[left_idxs]), entropy(y[right_idxs])
        child_entropy = (n_left/n) * entropy_left + (n_right/n) * entropy_right

        return parent_entropy - child_entropy

    def _split(self, X_column, split_threshold):
        """Return index arrays of samples with value ``<=`` and ``>`` the threshold."""
        left_idxs = np.argwhere(X_column <= split_threshold).flatten()
        right_idxs = np.argwhere(X_column > split_threshold).flatten()
        return left_idxs, right_idxs

    def predict(self, X):
        """Return an array with the predicted label for every row of ``X``."""
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node):
        """Follow the splits from ``node`` down to a leaf and return its value."""
        if node.is_leaf_node():
            return node.value

        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

### Random Forest

In [None]:
# Random Forest
# Use DecisionTree instances as weak learners

def bootstrap_sample(X, y):
    """Return a resample of ``(X, y)`` of the original size, drawn with replacement."""
    n_rows = X.shape[0]
    # ``replace`` defaults to True, so rows may be picked more than once.
    picks = np.random.choice(n_rows, size=n_rows)
    return X[picks], y[picks]

def most_common_label(y):
    """Plurality vote: return the label that appears most often in ``y``."""
    label, _count = Counter(y).most_common(1)[0]
    return label

class RandomForest:
    """Ensemble of DecisionTree instances, each fitted on a bootstrap sample.

    Takes the number of trees plus the DecisionTree hyperparameters, which are
    forwarded unchanged to every tree. Predictions are combined by plurality vote.
    """

    def __init__(self, n_trees=100, min_samples_split=2, max_depth=100, max_features=None):
        self.n_trees = n_trees
        self.min_samples_split, self.max_depth = min_samples_split, max_depth
        self.max_features = max_features
        self.trees = []

    def fit(self, X, y):
        """Discard any previous trees and fit ``n_trees`` new ones on resampled data."""
        self.trees = []
        for _ in range(self.n_trees):
            tree = DecisionTree(
                min_samples_split=self.min_samples_split,
                max_depth=self.max_depth,
                max_features=self.max_features,
            )
            tree.fit(*bootstrap_sample(X, y))
            self.trees.append(tree)

    def predict(self, X):
        """Return the most common per-tree prediction for every row of ``X``."""
        # Rows are trees, columns are samples ...
        votes_by_tree = np.array([tree.predict(X) for tree in self.trees])
        # ... swapped so that each row holds all votes cast for one sample.
        votes_by_sample = votes_by_tree.swapaxes(0, 1)
        return np.array([most_common_label(votes) for votes in votes_by_sample])

### K-Nearest Neighbors

In [None]:
# KNN Classifier

def euclidean_distance(x1, x2):
    """Return the Euclidean (L2) distance between ``x1`` and ``x2``."""
    delta = x1 - x2
    return np.sqrt(np.sum(delta ** 2))

class KNN:
    """k-nearest-neighbours classifier using Euclidean distance and plurality voting."""

    def __init__(self, k):
        self.k = k

    def fit(self, X, y):
        """Store the training samples and labels; nothing is learned up front."""
        self.X_train, self.y_train = X, y

    def predict(self, X):
        """Return an array with the predicted label for every row of ``X``."""
        return np.array([self._predict(sample) for sample in X])

    def _predict(self, x):
        """Return the most common label among the ``k`` training rows closest to ``x``."""
        # Euclidean distance from x to every stored training row.
        dists = [np.sqrt(np.sum((x - row) ** 2)) for row in self.X_train]
        # Indices of the k nearest neighbors.
        nearest = np.argsort(dists)[:self.k]
        votes = Counter(self.y_train[i] for i in nearest)
        return votes.most_common(1)[0][0]

### Naive Bayes

In [None]:
# Naive Bayes
# Implemented with log-trick and Gaussian PDF from scratch

class NaiveBayes:
    """Gaussian naive Bayes classifier evaluated entirely in log space."""

    # Fraction of the largest per-feature variance that is added to every class
    # variance, so a feature that is constant within a class never yields a
    # zero variance (and therefore a division by zero).
    _VAR_SMOOTHING = 1e-9

    def fit(self, X, y):
        """Estimate per-class feature means, variances and class priors.

        ``X`` is indexed as (n_samples, n_features); ``y`` holds one label per row.
        """
        n_samples, n_features = X.shape
        self._classes = np.unique(y)
        n_classes = len(self._classes)

        # One row per class, one column per feature.
        self._means = np.zeros((n_classes, n_features), dtype=np.float64)
        self._vars = np.zeros((n_classes, n_features), dtype=np.float64)
        self._priors = np.zeros(n_classes, dtype=np.float64)

        # Scale the smoothing term to the data; fall back to the bare constant
        # when the data has no spread at all (or the scale is not a number).
        largest_var = np.var(X, axis=0).max() if n_features else 0.0
        epsilon = self._VAR_SMOOTHING * largest_var
        if not epsilon > 0:
            epsilon = self._VAR_SMOOTHING

        for idx, c in enumerate(self._classes):
            X_c = X[y == c]                                 # samples of class `c`
            self._means[idx, :] = np.mean(X_c, axis=0)
            self._vars[idx, :] = np.var(X_c, axis=0) + epsilon
            self._priors[idx] = X_c.shape[0] / n_samples

    def predict(self, X):
        """Return an array with the most probable class for every row of ``X``."""
        return np.array([self._predict(x) for x in X])

    def _predict(self, x):
        """Return the class with the highest log-posterior for one sample ``x``."""
        log_posteriors = [
            np.log(self._priors[idx]) + np.sum(self._log_gaussian_pdf(idx, x))
            for idx in range(len(self._classes))
        ]
        return self._classes[np.argmax(log_posteriors)]

    def _log_gaussian_pdf(self, class_idx, x):
        """Return the per-feature log of the Gaussian density of ``x`` under a class.

        Computed directly in log space: exponentiating first underflows to 0 for
        samples far from the class mean, which would turn into ``log(0) = -inf``.
        """
        means = self._means[class_idx, :]
        variances = self._vars[class_idx, :]
        return -0.5 * np.log(2 * np.pi * variances) - ((x - means) ** 2) / (2 * variances)

    def _gaussian_pdf(self, class_idx, x):
        """Return the per-feature Gaussian density of ``x`` under a class."""
        means = self._means[class_idx, :]
        variances = self._vars[class_idx, :]

        # Gaussian PDF formula
        numerator = np.exp(-((x - means) ** 2) / (2 * variances))
        denominator = np.sqrt(2 * np.pi * variances)
        return numerator / denominator

### K-Means

In [None]:
# KMeans
# Doesn't implement a separate transform method for test data; instead, the model is retrained on the given data each time predict is called

def euclidean_distance(x1, x2):
    """Return the L2 norm of the difference between ``x1`` and ``x2``."""
    gap = x1 - x2
    sum_of_squares = np.sum(gap ** 2)
    return np.sqrt(sum_of_squares)


class KMeans:
    """K-means clustering (Lloyd's algorithm) with randomly chosen initial centroids.

    Args:
        K: Number of clusters.
        max_iters: Maximum number of assign/update rounds run by ``predict``.
        plot_steps: When true, ``plot`` is called after every assignment step
            and after every centroid update.
    """

    def __init__(self, K=5, max_iters=100, plot_steps=False):
        self.K = K
        self.max_iters = max_iters
        self.plot_steps = plot_steps

        # List of sample indices for each cluster
        self.clusters = [[] for _ in range(self.K)]

        # The centers (mean feature vector) for each cluster
        self.centroids = []

    def predict(self, X):
        """Cluster ``X`` (n_samples, n_features) and return one cluster index per sample.

        The model is fitted from scratch on ``X`` on every call. The returned
        labels are stored in a float array.
        """
        self.X = X
        self.n_samples, self.n_features = X.shape

        # Initialize centroids with K distinct samples
        random_sample_idxs = np.random.choice(self.n_samples, self.K, replace=False)
        self.centroids = [self.X[idx] for idx in random_sample_idxs]

        # Optimize clusters
        for _ in range(self.max_iters):
            # Assign samples to closest centroids (create clusters)
            self.clusters = self._create_clusters(self.centroids)

            if self.plot_steps:
                self.plot()

            # Calculate new centroids from the clusters
            centroids_old = self.centroids
            self.centroids = self._get_centroids(self.clusters)

            # Stop as soon as no centroid moved
            if self._is_converged(centroids_old, self.centroids):
                break

            # Show the updated centroids as well
            if self.plot_steps:
                self.plot()

        # Classify samples as the index of their clusters
        return self._get_cluster_labels(self.clusters)

    def _create_clusters(self, centroids):
        """Return, for each cluster, the list of sample indices closest to its centroid."""
        centroids = np.asarray(centroids, dtype=float)
        clusters = [[] for _ in range(self.K)]
        for idx, sample in enumerate(self.X):
            clusters[self._closest_centroid(sample, centroids)].append(idx)
        return clusters

    def _closest_centroid(self, sample, centroids):
        """Return the index of the centroid nearest to ``sample`` (first one on ties)."""
        distances = np.linalg.norm(np.asarray(centroids, dtype=float) - sample, axis=1)
        return np.argmin(distances)

    def _get_centroids(self, clusters):
        """Return a (K, n_features) array holding the mean of every cluster.

        An empty cluster keeps its current centroid: the mean of zero samples
        is NaN, and a NaN centroid would corrupt every later distance.
        """
        centroids = np.zeros((self.K, self.n_features))
        for cluster_idx, cluster in enumerate(clusters):
            if cluster:
                # Each cluster is a list of sample indices into self.X
                centroids[cluster_idx] = np.mean(self.X[cluster], axis=0)
            else:
                centroids[cluster_idx] = self.centroids[cluster_idx]
        return centroids

    def _is_converged(self, centroids_old, centroids_new):
        """Return True when no centroid moved between two consecutive rounds."""
        old = np.asarray(centroids_old, dtype=float)
        new = np.asarray(centroids_new, dtype=float)
        return np.linalg.norm(old - new, axis=1).sum() == 0

    def _get_cluster_labels(self, clusters):
        """Return an array in which every sample carries the index of its cluster."""
        labels = np.empty(self.n_samples)

        for cluster_idx, cluster in enumerate(clusters):
            for sample_index in cluster:
                labels[sample_index] = cluster_idx
        return labels

    def plot(self):
        """Scatter-plot every cluster's samples and mark the centroids with black crosses.

        Each point set is unpacked straight into ``ax.scatter`` -- presumably
        this expects two-feature data; confirm before using it on other shapes.
        """
        fig, ax = plt.subplots(figsize=(12, 8))

        for i, index in enumerate(self.clusters):
            point = self.X[index].T
            ax.scatter(*point)

        for point in self.centroids:
            ax.scatter(*point, marker="x", color="black", linewidth=2)

        plt.show()