diff --git a/machine_learning/gradient_descent.py b/machine_learning/gradient_descent.py index 95463faf5635..eb0faff35910 100644 --- a/machine_learning/gradient_descent.py +++ b/machine_learning/gradient_descent.py @@ -1,139 +1,197 @@ """ -Implementation of gradient descent algorithm for minimizing cost of a linear hypothesis -function. +Gradient descent helpers for a simple linear hypothesis function. + +Time complexity: O(iterations * n_samples * n_features) +Space complexity: O(n_features) """ +from __future__ import annotations + +from collections.abc import Sequence + import numpy as np -# List of input, output pairs -train_data = ( - ((5, 2, 3), 15), - ((6, 5, 9), 25), - ((11, 12, 13), 41), - ((1, 1, 1), 8), - ((11, 12, 13), 41), +# List of input, output pairs (bias term handled separately) +train_data: tuple[tuple[tuple[float, ...], float], ...] = ( + ((5.0, 2.0, 3.0), 15.0), + ((6.0, 5.0, 9.0), 25.0), + ((11.0, 12.0, 13.0), 41.0), + ((1.0, 1.0, 1.0), 8.0), + ((11.0, 12.0, 13.0), 41.0), +) +test_data: tuple[tuple[tuple[float, ...], float], ...] = ( + ((515.0, 22.0, 13.0), 555.0), + ((61.0, 35.0, 49.0), 150.0), ) -test_data = (((515, 22, 13), 555), ((61, 35, 49), 150)) -parameter_vector = [2, 4, 1, 5] -m = len(train_data) +parameter_vector: list[float] = [2.0, 4.0, 1.0, 5.0] LEARNING_RATE = 0.009 -def _error(example_no, data_set="train"): - """ - :param data_set: train data or test data - :param example_no: example number whose error has to be checked - :return: error in example pointed by example number. - """ - return calculate_hypothesis_value(example_no, data_set) - output( - example_no, data_set - ) +def _get_dataset(data_set: str) -> tuple[tuple[tuple[float, ...], float], ...]: + """Return the requested dataset or raise for unknown keys.""" + if data_set == "train": + return train_data + if data_set == "test": + return test_data + msg = "data_set must be 'train' or 'test'" + raise ValueError(msg) -def _hypothesis_value(data_input_tuple): +def predict_from_parameters( + parameters: Sequence[float], features: Sequence[float] +) -> float: """ - Calculates hypothesis function value for a given input - :param data_input_tuple: Input tuple of a particular example - :return: Value of hypothesis function at that point. - Note that there is an 'biased input' whose value is fixed as 1. - It is not explicitly mentioned in input data.. But, ML hypothesis functions use it. - So, we have to take care of it separately. Line 36 takes care of it. + Evaluate the linear hypothesis, treating the first coefficient as the bias term. + + >>> predict_from_parameters([1.0, 2.0, -1.0], (3.0, 0.5)) + 6.5 """ - hyp_val = 0 - for i in range(len(parameter_vector) - 1): - hyp_val += data_input_tuple[i] * parameter_vector[i + 1] - hyp_val += parameter_vector[0] - return hyp_val + if len(parameters) != len(features) + 1: + raise ValueError("parameters must include a bias term and match feature count") + return float(parameters[0] + np.dot(parameters[1:], features)) -def output(example_no, data_set): +def output(example_no: int, data_set: str = "train") -> float: """ - :param data_set: test data or train data - :param example_no: example whose output is to be fetched - :return: output for that example + Retrieve the label for an example from the requested dataset. 
+ + >>> output(0, data_set=\"train\") + 15.0 """ - if data_set == "train": - return train_data[example_no][1] - elif data_set == "test": - return test_data[example_no][1] - return None + dataset = _get_dataset(data_set) + return dataset[example_no][1] -def calculate_hypothesis_value(example_no, data_set): +def calculate_hypothesis_value( + example_no: int, + data_set: str = "train", + parameters: Sequence[float] | None = None, +) -> float: """ - Calculates hypothesis value for a given example - :param data_set: test data or train_data - :param example_no: example whose hypothesis value is to be calculated - :return: hypothesis value for that example + Calculate the hypothesis value for a specific example. + + >>> calculate_hypothesis_value(0, parameters=[2.0, 1.0, 0.0, 0.0]) + 7.0 """ - if data_set == "train": - return _hypothesis_value(train_data[example_no][0]) - elif data_set == "test": - return _hypothesis_value(test_data[example_no][0]) - return None + dataset = _get_dataset(data_set) + params = parameter_vector if parameters is None else parameters + return predict_from_parameters(params, dataset[example_no][0]) -def summation_of_cost_derivative(index, end=m): +def _error( + example_no: int, data_set: str = "train", parameters: Sequence[float] | None = None +) -> float: + """Compute the prediction error for one example.""" + return calculate_hypothesis_value(example_no, data_set, parameters) - output( + example_no, data_set + ) + + +def summation_of_cost_derivative( + index: int, + end: int | None = None, + parameters: Sequence[float] | None = None, + data_set: str = "train", + dataset: Sequence[tuple[Sequence[float], float]] | None = None, +) -> float: """ - Calculates the sum of cost function derivative - :param index: index wrt derivative is being calculated - :param end: value where summation ends, default is m, number of examples - :return: Returns the summation of cost derivative - Note: If index is -1, this means we are calculating summation wrt to biased - parameter. + Calculate the summed derivative of the cost function for a parameter index. + + ``index=-1`` represents the bias term. """ - summation_value = 0 - for i in range(end): + working_dataset = _get_dataset(data_set) if dataset is None else dataset + params = parameter_vector if parameters is None else parameters + limit = len(working_dataset) if end is None else end + + summation_value = 0.0 + for i in range(limit): + features, label = working_dataset[i] + error = predict_from_parameters(params, features) - label if index == -1: - summation_value += _error(i) + summation_value += error else: - summation_value += _error(i) * train_data[i][0][index] + summation_value += error * features[index] return summation_value -def get_cost_derivative(index): +def get_cost_derivative( + index: int, + data_set: str = "train", + parameters: Sequence[float] | None = None, + dataset: Sequence[tuple[Sequence[float], float]] | None = None, +) -> float: """ - :param index: index of the parameter vector wrt to derivative is to be calculated - :return: derivative wrt to that index - Note: If index is -1, this means we are calculating summation wrt to biased - parameter. + Return the average cost derivative for one parameter. + + ``index=-1`` represents the bias term. 
""" - cost_derivative_value = summation_of_cost_derivative(index, m) / m - return cost_derivative_value + working_dataset = _get_dataset(data_set) if dataset is None else dataset + return summation_of_cost_derivative( + index, len(working_dataset), parameters, data_set, working_dataset + ) / len(working_dataset) -def run_gradient_descent(): +def batch_gradient_descent_step( + parameters: Sequence[float], + learning_rate: float, + data: Sequence[tuple[Sequence[float], float]] | None = None, +) -> list[float]: + """ + Perform one batch gradient descent step. + + >>> dataset = (((1.0, 0.0, 0.0), 1.0), ((0.0, 1.0, 0.0), 1.0)) + >>> batch_gradient_descent_step([0.0, 0.0, 0.0, 0.0], 0.1, dataset) + [0.1, 0.05, 0.05, 0.0] + """ + dataset = train_data if data is None else data + updated_parameters: list[float] = [] + for i, parameter in enumerate(parameters): + cost_derivative = get_cost_derivative( + i - 1, data_set="train", parameters=parameters, dataset=dataset + ) + updated_parameters.append(parameter - learning_rate * cost_derivative) + return updated_parameters + + +def run_gradient_descent( + learning_rate: float = LEARNING_RATE, + max_iterations: int = 10_000, + atol: float = 2e-6, + rtol: float = 0.0, +) -> tuple[list[float], int]: + """ + Repeatedly apply gradient descent until the parameter vector stabilizes. + + >>> params, iterations = run_gradient_descent(max_iterations=5) + >>> len(params) + 4 + >>> iterations >= 1 + True + """ global parameter_vector - # Tune these values to set a tolerance value for predicted output - absolute_error_limit = 0.000002 - relative_error_limit = 0 - j = 0 - while True: - j += 1 - temp_parameter_vector = [0, 0, 0, 0] - for i in range(len(parameter_vector)): - cost_derivative = get_cost_derivative(i - 1) - temp_parameter_vector[i] = ( - parameter_vector[i] - LEARNING_RATE * cost_derivative - ) - if np.allclose( - parameter_vector, - temp_parameter_vector, - atol=absolute_error_limit, - rtol=relative_error_limit, - ): + iterations = 0 + current_parameters = parameter_vector[:] + for iteration in range(1, max_iterations + 1): + iterations = iteration + next_parameters = batch_gradient_descent_step(current_parameters, learning_rate) + if np.allclose(current_parameters, next_parameters, atol=atol, rtol=rtol): + current_parameters = next_parameters break - parameter_vector = temp_parameter_vector - print(("Number of iterations:", j)) + current_parameters = next_parameters + + parameter_vector = current_parameters + return current_parameters, iterations -def test_gradient_descent(): +def test_gradient_descent() -> None: + """Run a quick prediction check against the test dataset.""" + params, iterations = run_gradient_descent() + print(f"Converged in {iterations} iterations -> {params}") for i in range(len(test_data)): print(("Actual output value:", output(i, "test"))) print(("Hypothesis output:", calculate_hypothesis_value(i, "test"))) if __name__ == "__main__": - run_gradient_descent() print("\nTesting gradient descent for a linear hypothesis function.\n") test_gradient_descent() diff --git a/machine_learning/k_means_clust.py b/machine_learning/k_means_clust.py index a55153628f9c..5b9fc7b83be2 100644 --- a/machine_learning/k_means_clust.py +++ b/machine_learning/k_means_clust.py @@ -1,57 +1,22 @@ -"""README, Author - Anurag Kumar(mailto:anuragkumarak95@gmail.com) -Requirements: - - sklearn - - numpy - - matplotlib -Python: - - 3.5 -Inputs: - - X , a 2D numpy array of features. - - k , number of clusters to create. 
- - initial_centroids , initial centroid values generated by utility function(mentioned - in usage). - - maxiter , maximum number of iterations to process. - - heterogeneity , empty list that will be filled with heterogeneity values if passed - to kmeans func. -Usage: - 1. define 'k' value, 'X' features array and 'heterogeneity' empty list - 2. create initial_centroids, - initial_centroids = get_initial_centroids( - X, - k, - seed=0 # seed value for initial centroid generation, - # None for randomness(default=None) - ) - 3. find centroids and clusters using kmeans function. - centroids, cluster_assignment = kmeans( - X, - k, - initial_centroids, - maxiter=400, - record_heterogeneity=heterogeneity, - verbose=True # whether to print logs in console or not.(default=False) - ) - 4. Plot the loss function and heterogeneity values for every iteration saved in - heterogeneity list. - plot_heterogeneity( - heterogeneity, - k - ) - 5. Plot the labeled 3D data points with centroids. - plot_kmeans( - X, - centroids, - cluster_assignment - ) - 6. Transfers Dataframe into excel format it must have feature called - 'Clust' with k means clustering numbers in it. """ +Utilities for a minimal K-Means clustering workflow. + +The implementation uses Euclidean distance, stops once assignments stabilize, and +offers helpers for plotting as well as a small reporting utility for clustered +DataFrames. +Time complexity: O(maxiter * n_samples * k * n_features) +Space complexity: O(n_samples * n_features + k * n_features) +""" + +import os +import tempfile import warnings +from collections.abc import Sequence import numpy as np import pandas as pd -from matplotlib import pyplot as plt +from numpy.typing import NDArray from sklearn.metrics import pairwise_distances warnings.filterwarnings("ignore") @@ -59,8 +24,25 @@ TAG = "K-MEANS-CLUST/ " -def get_initial_centroids(data, k, seed=None): - """Randomly choose k data points as initial centroids""" +def get_initial_centroids( + data: NDArray[np.floating], k: int, seed: int | None = None +) -> NDArray[np.floating]: + """Randomly choose ``k`` rows as initial centroids. + + >>> data = np.arange(12).reshape(6, 2) + >>> centroids = get_initial_centroids(data, k=2, seed=0) + >>> centroids.shape + (2, 2) + >>> set(map(tuple, centroids.tolist())).issubset( + ... set(map(tuple, data.tolist())) + ... ) + True + """ + if k <= 0: + raise ValueError("k must be positive") + if k > len(data): + raise ValueError("k cannot exceed the number of data points") + # useful for obtaining consistent results rng = np.random.default_rng(seed) n = data.shape[0] # number of data points @@ -73,31 +55,47 @@ def get_initial_centroids(data, k, seed=None): # it will carry a nonzero weight in the TF-IDF vector of the centroid. centroids = data[rand_indices, :] - return centroids + return np.asarray(centroids) -def centroid_pairwise_dist(x, centroids): +def centroid_pairwise_dist( + x: NDArray[np.floating], centroids: NDArray[np.floating] +) -> NDArray[np.floating]: + """Compute Euclidean distances between every row in ``x`` and each centroid.""" return pairwise_distances(x, centroids, metric="euclidean") -def assign_clusters(data, centroids): - # Compute distances between each data point and the set of centroids: - # Fill in the blank (RHS only) +def assign_clusters( + data: NDArray[np.floating], centroids: NDArray[np.floating] +) -> np.ndarray: + """Assign each row to its closest centroid. 
+ + >>> data = np.array([[0.0, 0.0], [0.0, 1.0], [5.0, 5.0]]) + >>> assign_clusters(data, np.array([[0.0, 0.0], [4.0, 4.0]])).tolist() + [0, 0, 1] + """ distances_from_centroids = centroid_pairwise_dist(data, centroids) - # Compute cluster assignments for each data point: - # Fill in the blank (RHS only) cluster_assignment = np.argmin(distances_from_centroids, axis=1) return cluster_assignment -def revise_centroids(data, k, cluster_assignment): - new_centroids = [] +def revise_centroids( + data: NDArray[np.floating], k: int, cluster_assignment: np.ndarray +) -> NDArray[np.floating]: + """Recompute centroids as the mean of the assigned samples. + + >>> data = np.array([[0.0, 0.0], [0.0, 1.0], [5.0, 5.0]]) + >>> np.allclose( + ... revise_centroids(data, 2, np.array([0, 0, 1])), + ... np.array([[0.0, 0.5], [5.0, 5.0]]), + ... ) + True + """ + new_centroids: list[NDArray[np.floating]] = [] for i in range(k): - # Select all data points that belong to cluster i. Fill in the blank (RHS only) member_data_points = data[cluster_assignment == i] - # Compute the mean of the data points. Fill in the blank (RHS only) centroid = member_data_points.mean(axis=0) new_centroids.append(centroid) new_centroids = np.array(new_centroids) @@ -105,14 +103,24 @@ def revise_centroids(data, k, cluster_assignment): return new_centroids -def compute_heterogeneity(data, k, centroids, cluster_assignment): +def compute_heterogeneity( + data: NDArray[np.floating], + k: int, + centroids: NDArray[np.floating], + cluster_assignment: np.ndarray, +) -> float: + """Return the within-cluster sum of squared distances. + + >>> data = np.array([[0.0, 0.0], [0.0, 1.0], [5.0, 5.0]]) + >>> centroids = np.array([[0.0, 0.5], [5.0, 5.0]]) + >>> float(compute_heterogeneity(data, 2, centroids, np.array([0, 0, 1]))) + 0.5 + """ heterogeneity = 0.0 for i in range(k): - # Select all data points that belong to cluster i. Fill in the blank (RHS only) member_data_points = data[cluster_assignment == i, :] if member_data_points.shape[0] > 0: # check if i-th cluster is non-empty - # Compute distances from centroid to data points (RHS only) distances = pairwise_distances( member_data_points, [centroids[i]], metric="euclidean" ) @@ -123,6 +131,12 @@ def compute_heterogeneity(data, k, centroids, cluster_assignment): def plot_heterogeneity(heterogeneity, k): + from matplotlib import pyplot as plt + + # Matplotlib tries to create a config directory on import; fall back to a + # temporary location when the default is not writable (e.g. CI sandboxes). + os.environ.setdefault("MPLCONFIGDIR", tempfile.gettempdir()) + plt.figure(figsize=(7, 4)) plt.plot(heterogeneity, linewidth=4) plt.xlabel("# Iterations") @@ -133,6 +147,10 @@ def plot_heterogeneity(heterogeneity, k): def plot_kmeans(data, centroids, cluster_assignment): + from matplotlib import pyplot as plt + + os.environ.setdefault("MPLCONFIGDIR", tempfile.gettempdir()) + ax = plt.axes(projection="3d") ax.scatter(data[:, 0], data[:, 1], data[:, 2], c=cluster_assignment, cmap="viridis") ax.scatter( @@ -146,15 +164,34 @@ def plot_kmeans(data, centroids, cluster_assignment): def kmeans( - data, k, initial_centroids, maxiter=500, record_heterogeneity=None, verbose=False -): - """Runs k-means on given data and initial set of centroids. - maxiter: maximum number of iterations to run.(default=500) - record_heterogeneity: (optional) a list, to store the history of heterogeneity - as function of iterations - if None, do not store the history. 
- verbose: if True, print how many data points changed their cluster labels in - each iteration""" + data: NDArray[np.floating], + k: int, + initial_centroids: NDArray[np.floating], + maxiter: int = 500, + record_heterogeneity: list[float] | None = None, + verbose: bool = False, +) -> tuple[NDArray[np.floating], np.ndarray]: + """Run k-means on ``data`` starting from ``initial_centroids``. + + The algorithm stops early once all assignments stabilize. Heterogeneity values + are appended to ``record_heterogeneity`` when provided. + + >>> dataset = np.array([[0.0, 0.0], [0.0, 1.0], [5.0, 5.0]]) + >>> heterogeneity: list[float] = [] + >>> centroids, labels = kmeans( + ... dataset, + ... k=2, + ... initial_centroids=np.array([[0.0, 0.0], [5.0, 5.0]]), + ... maxiter=10, + ... record_heterogeneity=heterogeneity, + ... ) + >>> labels.tolist() + [0, 0, 1] + >>> [round(float(value), 3) for value in heterogeneity] + [0.5] + >>> np.allclose(centroids, np.array([[0.0, 0.5], [5.0, 5.0]])) + True + """ centroids = initial_centroids[:] prev_cluster_assignment = None @@ -216,37 +253,29 @@ def kmeans( def report_generator( - predicted: pd.DataFrame, clustering_variables: np.ndarray, fill_missing_report=None + predicted: pd.DataFrame, + clustering_variables: Sequence[str], + fill_missing_report: dict[str, float] | None = None, ) -> pd.DataFrame: """ - Generate a clustering report given these two arguments: - predicted - dataframe with predicted cluster column - fill_missing_report - dictionary of rules on how we are going to fill in missing - values for final generated report (not included in modelling); - >>> predicted = pd.DataFrame() - >>> predicted['numbers'] = [1, 2, 3] - >>> predicted['col1'] = [0.5, 2.5, 4.5] - >>> predicted['col2'] = [100, 200, 300] - >>> predicted['col3'] = [10, 20, 30] - >>> predicted['Cluster'] = [1, 1, 2] - >>> report_generator(predicted, ['col1', 'col2'], 0) - Features Type Mark 1 2 - 0 # of Customers ClusterSize False 2.000000 1.000000 - 1 % of Customers ClusterProportion False 0.666667 0.333333 - 2 col1 mean_with_zeros True 1.500000 4.500000 - 3 col2 mean_with_zeros True 150.000000 300.000000 - 4 numbers mean_with_zeros False 1.500000 3.000000 - .. ... ... ... ... ... - 99 dummy 5% False 1.000000 1.000000 - 100 dummy 95% False 1.000000 1.000000 - 101 dummy stdev False 0.000000 NaN - 102 dummy mode False 1.000000 1.000000 - 103 dummy median False 1.000000 1.000000 - - [104 rows x 5 columns] + Generate a clustering summary report for a labelled ``predicted`` DataFrame. + + This helper groups numeric columns by the ``Cluster`` label, computes summary + statistics, and marks the columns listed in ``clustering_variables``. + + >>> predicted = pd.DataFrame( + ... {'spend': [0.0, 50.0, 100.0], 'Cluster': [0, 0, 1]} + ... 
) + >>> report = report_generator(predicted, clustering_variables=['spend']) + >>> float(report.loc[report['Features'] == '# of Customers', 0].iloc[0]) + 2.0 + >>> float(report.loc[report['Features'] == '% of Customers', 1]) + 0.3333333333333333 + >>> bool(report.loc[report['Features'] == 'spend', 'Mark'].iloc[0]) + True """ # Fill missing values with given rules - if fill_missing_report: + if fill_missing_report is not None: predicted = predicted.fillna(value=fill_missing_report) predicted["dummy"] = 1 numeric_cols = predicted.select_dtypes(np.number).columns diff --git a/machine_learning/linear_regression.py b/machine_learning/linear_regression.py index 5b1e663116cc..630d6c90018e 100644 --- a/machine_learning/linear_regression.py +++ b/machine_learning/linear_regression.py @@ -1,11 +1,10 @@ """ Linear regression is the most basic type of regression commonly used for -predictive analysis. The idea is pretty simple: we have a dataset and we have -features associated with it. Features should be chosen very cautiously -as they determine how much our model will be able to make future predictions. -We try to set the weight of these features, over many iterations, so that they best -fit our dataset. In this particular code, I had used a CSGO dataset (ADR vs -Rating). We try to best fit a line through dataset and estimate the parameters. +predictive analysis. The algorithm iteratively updates a weight vector to fit a +line through the data using gradient descent. + +Time complexity: O(iterations * n_samples * n_features) +Space complexity: O(n_features) """ # /// script @@ -15,16 +14,14 @@ # "numpy", # ] # /// +from collections.abc import Sequence import httpx import numpy as np -def collect_dataset(): - """Collect dataset of CSGO - The dataset contains ADR vs Rating of a Player - :return : dataset obtained from the link, as matrix - """ +def collect_dataset() -> np.ndarray: + """Collect a small CSGO dataset (ADR vs Rating) as a NumPy array.""" response = httpx.get( "https://raw.githubusercontent.com/yashLadha/The_Math_of_Intelligence/" "master/Week1/ADRvsRating.csv", @@ -36,19 +33,19 @@ def collect_dataset(): item = item.split(",") data.append(item) data.pop(0) # This is for removing the labels from the list - dataset = np.matrix(data) + dataset = np.array(data, dtype=float) return dataset -def run_steep_gradient_descent(data_x, data_y, len_data, alpha, theta): - """Run steep gradient descent and updates the Feature vector accordingly_ - :param data_x : contains the dataset - :param data_y : contains the output associated with each data-entry - :param len_data : length of the data_ - :param alpha : Learning rate of the model - :param theta : Feature vector (weight's for our model) - ;param return : Updated Feature's, using - curr_features - alpha_ * gradient(w.r.t. feature) +def run_steep_gradient_descent( + data_x: np.ndarray, + data_y: np.ndarray, + len_data: int, + alpha: float, + theta: np.ndarray, +) -> np.ndarray: + """Run steep gradient descent and update the weight vector accordingly. 
+ >>> import numpy as np >>> data_x = np.array([[1, 2], [3, 4]]) >>> data_y = np.array([5, 6]) @@ -67,54 +64,58 @@ def run_steep_gradient_descent(data_x, data_y, len_data, alpha, theta): return theta -def sum_of_square_error(data_x, data_y, len_data, theta): - """Return sum of square error for error calculation - :param data_x : contains our dataset - :param data_y : contains the output (result vector) - :param len_data : len of the dataset - :param theta : contains the feature vector - :return : sum of square error computed from given feature's +def sum_of_square_error( + data_x: np.ndarray, data_y: np.ndarray, len_data: int, theta: np.ndarray +) -> float: + """Return sum of square error for error calculation. Example: >>> vc_x = np.array([[1.1], [2.1], [3.1]]) >>> vc_y = np.array([1.2, 2.2, 3.2]) - >>> round(sum_of_square_error(vc_x, vc_y, 3, np.array([1])),3) - np.float64(0.005) + >>> float(round(sum_of_square_error(vc_x, vc_y, 3, np.array([1])), 3)) + 0.005 """ prod = np.dot(theta, data_x.transpose()) prod -= data_y.transpose() sum_elem = np.sum(np.square(prod)) - error = sum_elem / (2 * len_data) + error: float = sum_elem / (2 * len_data) return error -def run_linear_regression(data_x, data_y): - """Implement Linear regression over the dataset - :param data_x : contains our dataset - :param data_y : contains the output (result vector) - :return : feature for line of best fit (Feature vector) +def run_linear_regression( + data_x: np.ndarray, + data_y: np.ndarray, + iterations: int = 100000, + alpha: float = 0.0001550, + verbose: bool = False, +) -> np.ndarray: + """Implement Linear Regression over the dataset using gradient descent. + + >>> data_x = np.array([[1.0, 0.0], [1.0, 1.0], [1.0, 2.0]]) + >>> data_y = np.array([1.0, 4.0, 7.0]) + >>> run_linear_regression( + ... data_x, data_y, iterations=2000, alpha=0.05, verbose=False + ... ) + array([[1., 3.]]) """ - iterations = 100000 - alpha = 0.0001550 - no_features = data_x.shape[1] - len_data = data_x.shape[0] - 1 + len_data = data_x.shape[0] theta = np.zeros((1, no_features)) for i in range(iterations): theta = run_steep_gradient_descent(data_x, data_y, len_data, alpha, theta) - error = sum_of_square_error(data_x, data_y, len_data, theta) - print(f"At Iteration {i + 1} - Error is {error:.5f}") + if verbose: + error = sum_of_square_error(data_x, data_y, len_data, theta) + print(f"At Iteration {i + 1} - Error is {error:.5f}") return theta -def mean_absolute_error(predicted_y, original_y): - """Return sum of square error for error calculation - :param predicted_y : contains the output of prediction (result vector) - :param original_y : contains values of expected outcome - :return : mean absolute error computed from given feature's +def mean_absolute_error( + predicted_y: Sequence[float], original_y: Sequence[float] +) -> float: + """Return the mean absolute error between two sequences. 
     >>> predicted_y = [3, -0.5, 2, 7]
     >>> original_y = [2.5, 0.0, 2, 8]
@@ -125,15 +126,15 @@ def mean_absolute_error(predicted_y, original_y):
     return total / len(original_y)
 
 
-def main():
-    """Driver function"""
+def main() -> None:
+    """Driver function for manual runs."""
     data = collect_dataset()
 
     len_data = data.shape[0]
     data_x = np.c_[np.ones(len_data), data[:, :-1]].astype(float)
     data_y = data[:, -1].astype(float)
 
-    theta = run_linear_regression(data_x, data_y)
+    theta = run_linear_regression(data_x, data_y, verbose=False)
     len_result = theta.shape[1]
     print("Resultant Feature vector : ")
     for i in range(len_result):
diff --git a/machine_learning/test_machine_learning_algorithms.py b/machine_learning/test_machine_learning_algorithms.py
new file mode 100644
index 000000000000..6261f5ccb477
--- /dev/null
+++ b/machine_learning/test_machine_learning_algorithms.py
@@ -0,0 +1,49 @@
+import numpy as np
+import pandas as pd
+import pytest
+
+from machine_learning.gradient_descent import batch_gradient_descent_step
+from machine_learning.k_means_clust import kmeans, report_generator
+from machine_learning.linear_regression import run_steep_gradient_descent
+
+
+def test_kmeans_converges_on_toy_dataset() -> None:
+    data = np.array([[0.0, 0.0], [0.0, 1.0], [5.0, 5.0]])
+    heterogeneity: list[float] = []
+    centroids, labels = kmeans(
+        data,
+        k=2,
+        initial_centroids=np.array([[0.0, 0.0], [5.0, 5.0]]),
+        maxiter=10,
+        record_heterogeneity=heterogeneity,
+    )
+
+    assert labels.tolist() == [0, 0, 1]
+    assert heterogeneity == [pytest.approx(0.5)]
+    assert np.allclose(centroids, np.array([[0.0, 0.5], [5.0, 5.0]]))
+
+
+def test_report_generator_marks_requested_features() -> None:
+    predicted = pd.DataFrame({"spend": [0.0, 50.0, 100.0], "Cluster": [0, 0, 1]})
+    report = report_generator(predicted, clustering_variables=["spend"])
+
+    cluster_sizes = report.loc[report["Features"] == "# of Customers", [0, 1]].iloc[0]
+    assert cluster_sizes[0] == pytest.approx(2)
+    assert cluster_sizes[1] == pytest.approx(1)
+    assert bool(report.loc[report["Features"] == "spend", "Mark"].iloc[0])
+
+
+def test_batch_gradient_descent_step_updates_parameters() -> None:
+    dataset = (((1.0, 0.0, 0.0), 1.0), ((0.0, 1.0, 0.0), 1.0))
+    updated = batch_gradient_descent_step([0.0, 0.0, 0.0, 0.0], 0.1, dataset)
+
+    assert updated == pytest.approx([0.1, 0.05, 0.05, 0.0])
+
+
+def test_run_steep_gradient_descent_matches_expected_step() -> None:
+    data_x = np.array([[1.0, 0.0], [1.0, 1.0], [1.0, 2.0]])
+    data_y = np.array([1.0, 4.0, 7.0])
+    theta = np.zeros(2)
+
+    new_theta = run_steep_gradient_descent(data_x, data_y, len(data_x), 0.05, theta)
+    assert np.allclose(new_theta, np.array([0.2, 0.3]))
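
Reviewer note (not part of the diff): a minimal end-to-end smoke sketch showing how the refactored helpers compose. It only calls functions whose signatures appear in the hunks above (run_gradient_descent, get_initial_centroids, kmeans, run_linear_regression); the machine_learning.* import paths mirror the new test module, and everything else here is an illustrative assumption rather than code from the PR.

    import numpy as np

    from machine_learning.gradient_descent import run_gradient_descent
    from machine_learning.k_means_clust import get_initial_centroids, kmeans
    from machine_learning.linear_regression import run_linear_regression

    # Gradient descent: fit the module-level toy dataset and report the result.
    params, iterations = run_gradient_descent(max_iterations=1_000)
    print(f"gradient descent finished after {iterations} iterations: {params}")

    # K-means: cluster four 2-D points starting from seeded random centroids.
    points = np.array([[0.0, 0.0], [0.0, 1.0], [5.0, 5.0], [5.0, 6.0]])
    initial = get_initial_centroids(points, k=2, seed=0)
    centroids, labels = kmeans(points, k=2, initial_centroids=initial, maxiter=50)
    print("k-means labels:", labels.tolist())

    # Linear regression: recover y = 1 + 3 * x from three noiseless samples
    # (same example as the run_linear_regression doctest above).
    data_x = np.array([[1.0, 0.0], [1.0, 1.0], [1.0, 2.0]])  # first column is the bias
    data_y = np.array([1.0, 4.0, 7.0])
    theta = run_linear_regression(data_x, data_y, iterations=2000, alpha=0.05)
    print("fitted theta:", theta)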