In [None]:
import numpy as np

# KMeans Class
class KMeans:
    """K-Means clustering (Lloyd's algorithm) started from a random partition.

    Training alternates two steps until the centroids stop moving:

    1. update  - every centroid becomes the mean of the samples assigned to it;
    2. assign  - every sample is moved to the cluster of its nearest centroid.

    Randomness comes from the global ``np.random`` state, so callers can make
    a run reproducible with ``np.random.seed(...)``.
    """

    def train(self, hyperparameters, X, y=None):
        """Cluster the rows of ``X`` into ``K`` groups.

        Args:
            hyperparameters: Mapping providing
                'K' (positive integer, number of clusters),
                'tau' (non-negative number, convergence threshold on the
                Frobenius norm of the centroid displacement) and
                'max_iterations' (positive integer).
            X: Array-like of shape (n_samples, n_features).
            y: Unused here; accepted so the signature matches subclasses.

        Returns:
            Tuple ``(centroids, labels)``: ``centroids`` is a float array of
            shape (K, n_features) and ``labels`` a float array of shape
            (n_samples,) holding the cluster index of every sample.

        Raises:
            ValueError: If a hyperparameter is invalid, or ``X`` is not a
                non-empty 2-D array.
        """
        K, tau, max_iterations = self._validate_hyperparameters(hyperparameters)
        X = self._validate_samples(X)
        n_samples = X.shape[0]

        # Random initial partition; the training loop refines it.
        labels = np.random.randint(0, K, size=n_samples)
        start_centroids = self._pick_random_samples(X, K)
        movable = np.ones(n_samples, dtype=bool)  # every sample may change cluster
        return self._fit(X, labels, movable, start_centroids, tau, max_iterations)

    @staticmethod
    def _validate_hyperparameters(hyperparameters):
        """Return ``(K, tau, max_iterations)`` or raise ValueError if invalid."""
        K = hyperparameters.get('K')
        tau = hyperparameters.get('tau')
        max_iterations = hyperparameters.get('max_iterations')
        # NumPy scalar types are accepted alongside the built-in ones.
        if not isinstance(K, (int, np.integer)) or K <= 0:
            raise ValueError("Number of clusters K must be a positive integer.")
        if not isinstance(tau, (float, int, np.floating, np.integer)) or tau < 0:
            raise ValueError("Convergence threshold tau must be a non-negative number.")
        if not isinstance(max_iterations, (int, np.integer)) or max_iterations <= 0:
            raise ValueError("Maximum iterations must be a positive integer.")
        return int(K), float(tau), int(max_iterations)

    @staticmethod
    def _validate_samples(X):
        """Return ``X`` as a 2-D float array, rejecting empty or non-2-D input."""
        X = np.asarray(X, dtype=float)
        if X.ndim != 2 or X.shape[0] == 0:
            raise ValueError("X must be a non-empty 2-D array of shape (n_samples, n_features).")
        return X

    @staticmethod
    def _pick_random_samples(X, K):
        """Return K rows of ``X`` drawn at random (with replacement) as a new array."""
        return X[np.random.randint(0, X.shape[0], size=K)]

    @staticmethod
    def _update_centroids(X, labels, fallback):
        """Return the per-cluster means of ``X``.

        A cluster with no members keeps its row from ``fallback`` instead of
        collapsing to the origin. ``fallback`` itself is not modified.
        """
        centroids = np.array(fallback, dtype=float, copy=True)
        for k in range(centroids.shape[0]):
            members = X[labels == k]
            if members.shape[0] > 0:
                centroids[k] = members.mean(axis=0)
        return centroids

    @staticmethod
    def _nearest_centroid(X, centroids):
        """Return, for each row of ``X``, the index of the closest centroid."""
        # (n, 1, d) - (1, K, d) -> (n, K, d); squared Euclidean distance per pair.
        deltas = X[:, np.newaxis, :] - centroids[np.newaxis, :, :]
        return np.argmin((deltas ** 2).sum(axis=2), axis=1)

    def _fit(self, X, labels, movable, centroids, tau, max_iterations):
        """Run update/assign iterations and return ``(centroids, labels)``.

        Args:
            X: Float array of shape (n_samples, n_features).
            labels: Integer array with the starting cluster of every sample;
                updated in place for movable samples.
            movable: Boolean mask, True where a sample may change cluster.
            centroids: Starting centroids of shape (K, n_features); used as the
                reference for the first displacement and as the position of
                clusters that have no members.
            tau: Stop once the centroid displacement norm is <= tau.
            max_iterations: Upper bound on the number of iterations.
        """
        convergence = False
        for iteration in range(max_iterations):
            prev_centroids = centroids
            centroids = self._update_centroids(X, labels, prev_centroids)
            if movable.any():
                labels[movable] = self._nearest_centroid(X[movable], centroids)

            # `<=` (not `<`) so that the permitted value tau == 0 can be reached.
            centroid_shift = np.linalg.norm(centroids - prev_centroids)
            if centroid_shift <= tau:
                convergence = True
                print(f"Converged after {iteration + 1} iterations.")
                break

        if not convergence:
            print(f"Reached maximum iterations: {max_iterations}.")

        # Labels are returned as floats, matching the historical return type.
        return centroids, labels.astype(float)


# SemiSupervisedKMeans Class (inherits from KMeans)
class SemiSupervisedKMeans(KMeans):
    """K-Means in which some samples carry a fixed, known cluster label.

    Labeled samples never change cluster and always contribute to their
    cluster's centroid; only unlabeled samples are reassigned during training.
    """

    def train(self, hyperparameters, X, y=None):
        """Cluster ``X`` while honouring the known labels in ``y``.

        Args:
            hyperparameters: Mapping with 'K', 'tau' and 'max_iterations'
                (same meaning and validation as in ``KMeans.train``).
            X: Array-like of shape (n_samples, n_features).
            y: Optional array-like of length n_samples. ``-1`` marks an
                unlabeled sample; any other entry is the cluster index
                (0 <= index < K) the sample is pinned to. ``None`` means no
                sample is labeled, which reduces to plain K-Means.

        Returns:
            Tuple ``(centroids, labels)`` with the same layout as
            ``KMeans.train``; labeled samples keep the label given in ``y``.

        Raises:
            ValueError: If a hyperparameter is invalid, ``X`` is not a
                non-empty 2-D array, ``y`` has the wrong length, or ``y``
                holds a label outside ``[0, K)`` other than ``-1``.
        """
        K, tau, max_iterations = self._validate_hyperparameters(hyperparameters)
        X = self._validate_samples(X)
        n_samples = X.shape[0]

        # Unlabeled samples start in a random cluster.
        labels = np.random.randint(0, K, size=n_samples)
        centroids = self._pick_random_samples(X, K)
        labeled_mask = np.zeros(n_samples, dtype=bool)

        if y is not None:
            y = np.asarray(y).reshape(-1)
            if y.shape[0] != n_samples:
                raise ValueError("y must contain exactly one entry per sample in X.")
            labeled_mask = y != -1
            known = y[labeled_mask].astype(int)
            if np.any((known < 0) | (known >= K)):
                raise ValueError("Labels in y must be -1 (unlabeled) or integers in [0, K).")
            labels[labeled_mask] = known

            if labeled_mask.any() and not labeled_mask.all():
                # Seed the centroids from the labeled samples, then give every
                # unlabeled sample a first assignment based on those seeds.
                centroids = self._update_centroids(X[labeled_mask], known, centroids)
                labels[~labeled_mask] = self._nearest_centroid(X[~labeled_mask], centroids)

        return self._fit(X, labels, ~labeled_mask, centroids, tau, max_iterations)


Converged after 2 iterations.
Converged after 2 iterations.
Centroids from K-Means: [[-0.04386439 -0.05971545]
 [-0.01432383 -0.06994851]
 [ 0.06182498  0.13776796]]
Centroids from Semi-Supervised K-Means: [[0. 0.]
 [0. 0.]
 [0. 0.]]


In [None]:
import numpy as np

class PCA:
    """PCA-based lossy compressor working on standardized features.

    ``train`` learns the per-feature mean/standard deviation and the leading
    right singular vectors of the standardized training data. ``predict``
    projects data onto those vectors and maps it back, returning the
    reconstruction in the original units.

    Attributes set by ``train``:
        mean_: per-feature mean of the training data.
        std_: per-feature (population) standard deviation of the training data.
        components_: array of shape (n_components, n_features) whose rows are
            the retained right singular vectors.
    """

    def __init__(self, n_components):
        self.n_components = n_components
        self.mean_ = None
        self.std_ = None
        self.components_ = None

    def _data_standardization(self, X, mean, std):
        """Return ``(X - mean) / std`` computed feature-wise.

        Features whose ``std`` is exactly zero (constant columns) are only
        centred: dividing by zero would inject inf/NaN into the SVD.
        """
        print(f"Performing Data Standardization: (Inputs include mean: {mean}, std: {std})")

        std = np.asarray(std, dtype=float)
        safe_std = np.where(std == 0, 1.0, std)
        # Broadcasting replaces the explicit d x d diagonal-matrix product.
        return (X - mean) / safe_std

    def _data_undo_standardization(self, X_standardized, mean, std):
        """Invert ``_data_standardization``: rescale by ``std`` and add ``mean`` back."""
        print(f"Performing Data Undo Standardization: (Inputs include mean: {mean}, std: {std})")

        # A zero-std feature is multiplied by 0 and therefore restored to its mean.
        return X_standardized * std + mean

    def train(self, X):
        """Learn standardization statistics and principal directions from ``X``.

        Args:
            X: Array-like of shape (n_samples, n_features).
        """
        X = np.asarray(X, dtype=float)

        # Compute the mean and standard deviation per feature
        self.mean_ = np.mean(X, axis=0)
        self.std_ = np.std(X, axis=0)

        # Standardize the data
        X_standardized = self._data_standardization(X, self.mean_, self.std_)

        # Rows of Vt are the principal directions, ordered by decreasing singular value.
        _, _, Vt = np.linalg.svd(X_standardized, full_matrices=False)
        self.components_ = Vt[:self.n_components]

    def predict(self, X):
        """Compress ``X`` onto the learned components and reconstruct it.

        Args:
            X: Array-like with the same feature layout used in ``train``.

        Returns:
            Array with the same shape as ``X`` holding the reconstruction in
            the original (un-standardized) units.

        Raises:
            ValueError: If the model has not been trained.
        """
        # Check if the model has been trained
        if self.mean_ is None or self.std_ is None or self.components_ is None:
            raise ValueError("Model has not been trained yet. Please train the model before prediction.")

        X = np.asarray(X, dtype=float)

        # Standardize the input X
        X_standardized = self._data_standardization(X, self.mean_, self.std_)

        # Data compression: coordinates of each sample along the retained components
        Xcompressed_standardized = X_standardized.dot(self.components_.T)

        # Data decompression: back to feature space; the shape always equals
        # X_standardized's, so no transpose fix-up is needed.
        Xdecompressed_standardized = Xcompressed_standardized.dot(self.components_)

        # Undo the standardization to return Xdecompressed in original units
        return self._data_undo_standardization(Xdecompressed_standardized, self.mean_, self.std_)


Performing Data Standardization: (Inputs include mean: [0.48331931 0.57266763 0.46773823 0.53104806 0.51521381], std: [0.29231337 0.28721236 0.31493959 0.26748174 0.29296276])
Performing Data Standardization: (Inputs include mean: [0.48331931 0.57266763 0.46773823 0.53104806 0.51521381], std: [0.29231337 0.28721236 0.31493959 0.26748174 0.29296276])
Performing Data Undo Standardization: (Inputs include mean: [0.48331931 0.57266763 0.46773823 0.53104806 0.51521381], std: [0.29231337 0.28721236 0.31493959 0.26748174 0.29296276])
Decompressed Data: [[0.79932575 0.54793866 0.4254587  0.25296237 0.8672916 ]
 [0.72641533 0.52250547 0.42233399 0.3519555  0.74578692]
 [0.56685095 0.41612752 0.39451781 0.62533458 0.41428127]
 [0.57121708 0.48133786 0.4210473  0.54816564 0.50392409]
 [0.97845885 0.31986713 0.31295499 0.33476638 0.7900372 ]
 [0.52097974 0.3435092  0.3691339  0.75094777 0.26461435]
 [0.42568307 0.75639119 0.54957583 0.38129928 0.68277475]
 [0.28053348 0.64184301 0.51691825 0.64987