diff --git a/machine_learning/decision_tree_pruning.py b/machine_learning/decision_tree_pruning.py new file mode 100644 index 000000000000..742a1b3f4e64 --- /dev/null +++ b/machine_learning/decision_tree_pruning.py @@ -0,0 +1,733 @@ +""" +Enhanced Decision Tree with Pruning functionality. + +This implementation extends the basic decision tree with advanced pruning techniques +to reduce overfitting and improve generalization. It includes both pre-pruning +(constraints during tree building) and post-pruning (reduced error pruning and +cost-complexity pruning). + +Key features: +- Pre-pruning: Maximum depth, minimum samples per leaf, minimum impurity decrease +- Post-pruning: Reduced error pruning and cost-complexity pruning +- Support for both regression and classification +- Comprehensive validation and testing + +Reference: https://en.wikipedia.org/wiki/Decision_tree_pruning +""" + +import doctest +from typing import Literal + +import numpy as np + + +class DecisionTreePruning: + """ + Enhanced Decision Tree with pruning capabilities. + + This implementation provides both regression and classification decision trees + with various pruning techniques to prevent overfitting. + """ + + def __init__( + self, + max_depth: int | None = None, + min_samples_split: int = 2, + min_samples_leaf: int = 1, + min_impurity_decrease: float = 0.0, + pruning_method: Literal["none", "reduced_error", "cost_complexity"] = "none", + ccp_alpha: float = 0.0, + random_state: int | None = None, + ) -> None: + """ + Initialize Decision Tree with pruning parameters. + + Args: + max_depth: Maximum depth of the tree + min_samples_split: Minimum samples required to split a node + min_samples_leaf: Minimum samples required at a leaf node + min_impurity_decrease: Minimum impurity decrease for a split + pruning_method: Pruning method to use + ccp_alpha: Cost complexity pruning parameter + random_state: Random seed for reproducibility + + >>> tree = DecisionTreePruning(max_depth=5, min_samples_leaf=2) + >>> tree.max_depth + 5 + >>> tree.min_samples_leaf + 2 + """ + self.max_depth = max_depth + self.min_samples_split = min_samples_split + self.min_samples_leaf = min_samples_leaf + self.min_impurity_decrease = min_impurity_decrease + self.pruning_method = pruning_method + self.ccp_alpha = ccp_alpha + self.random_state = random_state + + # Tree structure + self.root_: TreeNode | None = None + self.n_features_: int | None = None + self.feature_names_: list[str] | None = None + + if random_state is not None: + self.rng_ = np.random.default_rng(random_state) + else: + self.rng_ = np.random.default_rng() + + def _mse(self, y: np.ndarray) -> float: + """ + Compute mean squared error for regression. + + Args: + y: Target values + + Returns: + Mean squared error + """ + if len(y) == 0: + return 0.0 + return np.mean((y - np.mean(y)) ** 2) + + def _gini(self, y: np.ndarray) -> float: + """ + Compute Gini impurity for classification. + + Args: + y: Target labels + + Returns: + Gini impurity + """ + if len(y) == 0: + return 0.0 + + _, counts = np.unique(y, return_counts=True) + probabilities = counts / len(y) + return 1 - np.sum(probabilities**2) + + def _entropy(self, y: np.ndarray) -> float: + """ + Compute entropy for classification. 
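For the impurity measures defined above, Gini and entropy can be checked by hand on a small label vector; a minimal standalone sketch (the toy labels are made up for illustration):

import numpy as np

y = np.array([0, 0, 1, 1, 1])             # 2 samples of class 0, 3 of class 1
_, counts = np.unique(y, return_counts=True)
p = counts / len(y)                       # class probabilities [0.4, 0.6]

gini = 1 - np.sum(p**2)                   # 1 - (0.16 + 0.36) = 0.48
entropy = -np.sum(p * np.log2(p))         # about 0.971 bits

print(f"gini={gini:.2f}, entropy={entropy:.3f}")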
+ + Args: + y: Target labels + + Returns: + Entropy + """ + if len(y) == 0: + return 0.0 + + _, counts = np.unique(y, return_counts=True) + probabilities = counts / len(y) + probabilities = probabilities[probabilities > 0] # Avoid log(0) + return -np.sum(probabilities * np.log2(probabilities)) + + def _find_best_split( + self, x: np.ndarray, y: np.ndarray, task_type: str + ) -> tuple[int, float, float]: + """ + Find the best split for the given data. + + Args: + x: Feature matrix + y: Target values + task_type: 'regression' or 'classification' + + Returns: + Tuple of (best_feature, best_threshold, best_impurity) + """ + best_feature = -1 + best_threshold = 0.0 + best_impurity = float("inf") + + n_features = x.shape[1] + current_impurity = self._mse(y) if task_type == "regression" else self._gini(y) + + for feature_idx in range(n_features): + # Get unique values for this feature + feature_values = np.unique(x[:, feature_idx]) + + for threshold in feature_values[:-1]: # Exclude the last value + # Split the data + left_mask = x[:, feature_idx] <= threshold + right_mask = ~left_mask + + if ( + np.sum(left_mask) < self.min_samples_leaf + or np.sum(right_mask) < self.min_samples_leaf + ): + continue + + # Calculate weighted impurity + left_impurity = ( + self._mse(y[left_mask]) + if task_type == "regression" + else self._gini(y[left_mask]) + ) + right_impurity = ( + self._mse(y[right_mask]) + if task_type == "regression" + else self._gini(y[right_mask]) + ) + + weighted_impurity = ( + np.sum(left_mask) * left_impurity + + np.sum(right_mask) * right_impurity + ) / len(y) + + # Check if this split improves impurity + impurity_decrease = current_impurity - weighted_impurity + if ( + impurity_decrease >= self.min_impurity_decrease + and weighted_impurity < best_impurity + ): + best_feature = feature_idx + best_threshold = threshold + best_impurity = weighted_impurity + + return best_feature, best_threshold, best_impurity + + def _build_tree( + self, + x: np.ndarray, + y: np.ndarray, + depth: int = 0, + task_type: str = "regression", + ) -> "TreeNode": + """ + Recursively build the decision tree. 
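The split search above ranks each candidate threshold by the weighted impurity of the two children and keeps the split only if the decrease clears `min_impurity_decrease`; a standalone sketch of that scoring for one feature column (toy values assumed):

import numpy as np

def gini(labels: np.ndarray) -> float:
    _, counts = np.unique(labels, return_counts=True)
    p = counts / len(labels)
    return float(1 - np.sum(p**2))

x_feat = np.array([1.0, 2.0, 3.0, 4.0])   # one feature column
y = np.array([0, 0, 1, 1])
threshold = 2.0

left, right = y[x_feat <= threshold], y[x_feat > threshold]
parent = gini(y)                                                         # 0.5
weighted = (len(left) * gini(left) + len(right) * gini(right)) / len(y)  # 0.0
print(f"impurity decrease = {parent - weighted:.2f}")                    # 0.50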
+ + Args: + x: Feature matrix + y: Target values + depth: Current depth + task_type: 'regression' or 'classification' + + Returns: + Root node of the subtree + """ + node = TreeNode() + + # Check stopping criteria + if ( + len(y) < self.min_samples_split + or (self.max_depth is not None and depth >= self.max_depth) + or len(np.unique(y)) == 1 + ): + node.is_leaf = True + node.value = ( + np.mean(y) if task_type == "regression" else self._most_common(y) + ) + node.samples = len(y) + return node + + # Find best split + best_feature, best_threshold, best_impurity = self._find_best_split( + x, y, task_type + ) + + # If no good split found, make it a leaf + if best_feature == -1: + node.is_leaf = True + node.value = ( + np.mean(y) if task_type == "regression" else self._most_common(y) + ) + node.samples = len(y) + return node + + # Split the data + left_mask = x[:, best_feature] <= best_threshold + right_mask = ~left_mask + + # Create internal node + node.is_leaf = False + node.feature = best_feature + node.threshold = best_threshold + node.samples = len(y) + node.impurity = best_impurity + + # Recursively build left and right subtrees + node.left = self._build_tree(x[left_mask], y[left_mask], depth + 1, task_type) + node.right = self._build_tree( + x[right_mask], y[right_mask], depth + 1, task_type + ) + + return node + + def _most_common(self, y: np.ndarray) -> int | float: + """ + Find the most common value in an array. + + Args: + y: Array of values + + Returns: + Most common value + """ + values, counts = np.unique(y, return_counts=True) + return values[np.argmax(counts)] + + def _reduced_error_pruning(self, x_val: np.ndarray, y_val: np.ndarray) -> None: + """ + Perform reduced error pruning on the tree. + + Args: + x_val: Validation feature matrix + y_val: Validation target values + """ + if self.root_ is None: + return + + # Get all internal nodes (post-order traversal) + internal_nodes = self._get_internal_nodes(self.root_) + + # Try pruning each internal node + improved = True + while improved: + improved = False + best_improvement = 0.0 + best_node = None + + for node in internal_nodes: + if node.is_leaf: + continue + + # Calculate validation error before pruning + predictions_before = self._predict_batch(x_val) + error_before = self._calculate_error(y_val, predictions_before) + + # Temporarily prune the node + original_left = node.left + original_right = node.right + original_is_leaf = node.is_leaf + original_value = node.value + + node.left = None + node.right = None + node.is_leaf = True + node.value = self._most_common(y_val) # Use validation set majority + + # Calculate validation error after pruning + predictions_after = self._predict_batch(x_val) + error_after = self._calculate_error(y_val, predictions_after) + + # Calculate improvement + improvement = error_before - error_after + + if improvement > best_improvement: + best_improvement = improvement + best_node = node + + # Restore the node + node.left = original_left + node.right = original_right + node.is_leaf = original_is_leaf + node.value = original_value + + # Apply the best pruning if it improves performance + if best_node is not None and best_improvement > 0: + best_node.left = None + best_node.right = None + best_node.is_leaf = True + best_node.value = self._most_common(y_val) + improved = True + # Remove from internal nodes list + internal_nodes = [node for node in internal_nodes if node != best_node] + + def _cost_complexity_pruning(self) -> None: + """ + Perform cost-complexity pruning using alpha parameter. 
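Reduced error pruning, as implemented above, collapses a subtree only when the validation error does not get worse; the core before/after comparison in isolation, with hypothetical prediction arrays standing in for the tree's outputs:

import numpy as np

def mse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    return float(np.mean((y_true - y_pred) ** 2))

y_val = np.array([1.0, 2.0, 3.0, 4.0])
pred_subtree = np.array([1.1, 2.3, 2.7, 4.2])      # predictions with the subtree kept
pred_pruned = np.full_like(y_val, y_val.mean())    # subtree collapsed to a single leaf

improvement = mse(y_val, pred_subtree) - mse(y_val, pred_pruned)
print(f"improvement={improvement:.3f}, prune={improvement > 0}")   # prune only if error drops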
+ """ + if self.root_ is None: + return + + # Calculate cost-complexity for each node + self._calculate_cost_complexity(self.root_) + + # Prune nodes with high cost-complexity + self._prune_high_cost_nodes(self.root_) + + def _calculate_cost_complexity(self, node: "TreeNode") -> float: + """ + Calculate cost-complexity for a node and its subtree. + + Args: + node: Current node + + Returns: + Cost-complexity value + """ + if node.is_leaf: + node.cost_complexity = 0.0 + return 0.0 + + # Calculate cost-complexity for children + left_cc = self._calculate_cost_complexity(node.left) if node.left else 0.0 + right_cc = self._calculate_cost_complexity(node.right) if node.right else 0.0 + + # Calculate total cost-complexity + total_cc = left_cc + right_cc + self.ccp_alpha + + # If pruning this subtree would be better, mark for pruning + if total_cc >= self.ccp_alpha: + node.cost_complexity = total_cc + else: + node.cost_complexity = 0.0 + + return node.cost_complexity + + def _prune_high_cost_nodes(self, node: "TreeNode") -> None: + """ + Prune nodes with high cost-complexity. + + Args: + node: Current node + """ + if node.is_leaf: + return + + if node.cost_complexity > self.ccp_alpha: + # Prune this subtree + node.left = None + node.right = None + node.is_leaf = True + node.value = 0.0 # Will be updated during fit + else: + # Recursively check children + if node.left: + self._prune_high_cost_nodes(node.left) + if node.right: + self._prune_high_cost_nodes(node.right) + + def _get_internal_nodes(self, node: "TreeNode") -> list["TreeNode"]: + """ + Get all internal nodes in the tree. + + Args: + node: Root node + + Returns: + List of internal nodes + """ + if node is None or node.is_leaf: + return [] + + nodes = [node] + if node.left: + nodes.extend(self._get_internal_nodes(node.left)) + if node.right: + nodes.extend(self._get_internal_nodes(node.right)) + return nodes + + def _predict_batch(self, x: np.ndarray) -> np.ndarray: + """ + Make predictions for a batch of samples. + + Args: + x: Feature matrix + + Returns: + Predictions + """ + if self.root_ is None: + raise ValueError("Model must be fitted before predict") + + predictions = np.zeros(len(x)) + for i, sample in enumerate(x): + predictions[i] = self._predict_single(sample, self.root_) + return predictions + + def _predict_single(self, sample: np.ndarray, node: "TreeNode") -> int | float: + """ + Make a prediction for a single sample. + + Args: + sample: Feature vector + node: Current node + + Returns: + Prediction + """ + if node.is_leaf: + if node.value is None: + raise ValueError("Leaf node must have a value") + return node.value + + if sample[node.feature] <= node.threshold: + if node.left is None: + raise ValueError("Non-leaf node must have left child") + return self._predict_single(sample, node.left) + else: + if node.right is None: + raise ValueError("Non-leaf node must have right child") + return self._predict_single(sample, node.right) + + def _calculate_error(self, y_true: np.ndarray, y_pred: np.ndarray) -> float: + """ + Calculate prediction error. + + Args: + y_true: True values + y_pred: Predicted values + + Returns: + Error value + """ + return np.mean((y_true - y_pred) ** 2) + + def fit( + self, + x: np.ndarray, + y: np.ndarray, + x_val: np.ndarray | None = None, + y_val: np.ndarray | None = None, + ) -> "DecisionTreePruning": + """ + Fit the decision tree with optional pruning. 
+ + Args: + x: Training feature matrix + y: Training target values + x_val: Validation feature matrix (for pruning) + y_val: Validation target values (for pruning) + + Returns: + Self for method chaining + """ + if x.ndim != 2: + raise ValueError("x must be 2-dimensional") + if len(x) != len(y): + raise ValueError("x and y must have the same length") + + self.n_features_ = x.shape[1] + + # Determine task type + task_type = ( + "classification" if np.issubdtype(y.dtype, np.integer) else "regression" + ) + + # Build the tree + self.root_ = self._build_tree(x, y, task_type=task_type) + + # Apply pruning if specified + if self.pruning_method == "reduced_error": + if x_val is None or y_val is None: + raise ValueError("Validation data required for reduced error pruning") + self._reduced_error_pruning(x_val, y_val) + elif self.pruning_method == "cost_complexity": + self._cost_complexity_pruning() + + return self + + def predict(self, x: np.ndarray) -> np.ndarray: + """ + Make predictions. + + Args: + x: Feature matrix + + Returns: + Predictions + """ + if self.root_ is None: + raise ValueError("Tree must be fitted before prediction") + + return self._predict_batch(x) + + def score(self, x: np.ndarray, y: np.ndarray) -> float: + """ + Calculate accuracy (for classification) or R² (for regression). + + Args: + x: Feature matrix + y: True values + + Returns: + Score + """ + predictions = self.predict(x) + + if np.issubdtype(y.dtype, np.integer): + # Classification: accuracy + return np.mean(predictions == y) + else: + # Regression: R² + ss_res = np.sum((y - predictions) ** 2) + ss_tot = np.sum((y - np.mean(y)) ** 2) + return 1 - (ss_res / ss_tot) + + +class TreeNode: + """ + Node class for decision tree. + """ + + def __init__(self) -> None: + """Initialize tree node.""" + self.is_leaf = True + self.feature: int | None = None + self.threshold: float | None = None + self.value: int | float | None = None + self.left: TreeNode | None = None + self.right: TreeNode | None = None + self.samples: int = 0 + self.impurity: float = 0.0 + self.cost_complexity: float = 0.0 + + +def generate_regression_data( + n_samples: int = 100, noise: float = 0.1, random_state: int = 42 +) -> tuple[np.ndarray, np.ndarray]: + """ + Generate regression data. + + Args: + n_samples: Number of samples + noise: Noise level + random_state: Random seed + + Returns: + Tuple of (x, y) + """ + rng = np.random.default_rng(random_state) + x = rng.standard_normal((n_samples, 2)) + y = x[:, 0] ** 2 + x[:, 1] ** 2 + noise * rng.standard_normal(n_samples) + return x, y + + +def generate_classification_data( + n_samples: int = 100, random_state: int = 42 +) -> tuple[np.ndarray, np.ndarray]: + """ + Generate classification data. + + Args: + n_samples: Number of samples + random_state: Random seed + + Returns: + Tuple of (x, y) + """ + rng = np.random.default_rng(random_state) + x = rng.standard_normal((n_samples, 2)) + y = ((x[:, 0] + x[:, 1]) > 0).astype(int) + return x, y + + +def compare_pruning_methods() -> None: + """ + Compare different pruning methods. 
+ """ + # Generate data + x, y = generate_regression_data(n_samples=200) + + # Split data + split_idx = int(0.7 * len(x)) + x_train, x_test = x[:split_idx], x[split_idx:] + y_train, y_test = y[:split_idx], y[split_idx:] + + # Further split training data for validation + val_split = int(0.5 * len(x_train)) + x_val, x_train = x_train[:val_split], x_train[val_split:] + y_val, y_train = y_train[:val_split], y_train[val_split:] + + print(f"Training set size: {len(x_train)}") + print(f"Validation set size: {len(x_val)}") + print(f"Test set size: {len(x_test)}") + + # Test different pruning methods + methods = [ + ("No Pruning", "none"), + ("Reduced Error Pruning", "reduced_error"), + ("Cost Complexity Pruning", "cost_complexity"), + ] + + for method_name, method in methods: + print(f"\n=== {method_name} ===") + + tree = DecisionTreePruning( + max_depth=10, + min_samples_leaf=2, + pruning_method=method, # type: ignore[arg-type] + ccp_alpha=0.01, + ) + + if method == "reduced_error": + tree.fit(x_train, y_train, x_val, y_val) + else: + tree.fit(x_train, y_train) + + train_score = tree.score(x_train, y_train) + test_score = tree.score(x_test, y_test) + + print(f"Training R²: {train_score:.4f}") + print(f"Test R²: {test_score:.4f}") + print(f"Overfitting gap: {train_score - test_score:.4f}") + + +def main() -> None: + """ + Demonstrate decision tree with pruning. + """ + print("=== Regression Example ===") + + # Generate regression data + x_reg, y_reg = generate_regression_data(n_samples=200, noise=0.1) + + # Split data + split_idx = int(0.8 * len(x_reg)) + x_train, x_test = x_reg[:split_idx], x_reg[split_idx:] + y_train, y_test = y_reg[:split_idx], y_reg[split_idx:] + + # Train tree with cost-complexity pruning + tree_reg = DecisionTreePruning( + max_depth=10, + min_samples_leaf=2, + pruning_method="cost_complexity", + ccp_alpha=0.01, + ) + tree_reg.fit(x_train, y_train) + + # Make predictions + train_score = tree_reg.score(x_train, y_train) + test_score = tree_reg.score(x_test, y_test) + + print(f"Training R²: {train_score:.4f}") + print(f"Test R²: {test_score:.4f}") + + print("\n=== Classification Example ===") + + # Generate classification data + x_cls, y_cls = generate_classification_data(n_samples=200) + + # Split data + split_idx = int(0.8 * len(x_cls)) + x_train, x_test = x_cls[:split_idx], x_cls[split_idx:] + y_train, y_test = y_cls[:split_idx], y_cls[split_idx:] + + # Train tree with reduced error pruning + val_split = int(0.5 * len(x_train)) + x_val, x_train = x_train[:val_split], x_train[val_split:] + y_val, y_train = y_train[:val_split], y_train[val_split:] + + tree_cls = DecisionTreePruning( + max_depth=10, min_samples_leaf=2, pruning_method="reduced_error" + ) + tree_cls.fit(x_train, y_train, x_val, y_val) + + # Make predictions + train_accuracy = tree_cls.score(x_train, y_train) + test_accuracy = tree_cls.score(x_test, y_test) + + print(f"Training accuracy: {train_accuracy:.4f}") + print(f"Test accuracy: {test_accuracy:.4f}") + + print("\n=== Pruning Methods Comparison ===") + compare_pruning_methods() + + +if __name__ == "__main__": + doctest.testmod() + main() diff --git a/machine_learning/logistic_regression_vectorized.py b/machine_learning/logistic_regression_vectorized.py new file mode 100644 index 000000000000..393352a5f0b8 --- /dev/null +++ b/machine_learning/logistic_regression_vectorized.py @@ -0,0 +1,547 @@ +""" +Vectorized Logistic Regression implementation from scratch using NumPy. 
+ +Logistic Regression is a classification algorithm that uses the logistic function +to model the probability of a binary or multi-class outcome. This implementation +includes full vectorization for efficient computation. + +Key features: +- Sigmoid activation function +- Binary and multi-class classification support +- Gradient descent optimization with vectorized operations +- Cost function computation +- Regularization (L1 and L2) +- Comprehensive testing and validation + +Reference: https://en.wikipedia.org/wiki/Logistic_regression +""" + +import doctest + +import numpy as np + + +class LogisticRegressionVectorized: + """ + Vectorized Logistic Regression implementation from scratch. + + This implementation uses full vectorization with NumPy for efficient + computation of gradients and predictions across all training examples. + """ + + def __init__( + self, + learning_rate: float = 0.01, + max_iterations: int = 1000, + tolerance: float = 1e-6, + regularization: str = "none", + lambda_reg: float = 0.1, + random_state: int | None = None, + ) -> None: + """ + Initialize Logistic Regression parameters. + + Args: + learning_rate: Learning rate for gradient descent + max_iterations: Maximum number of iterations + tolerance: Convergence tolerance + regularization: Type of regularization ('none', 'l1', 'l2') + lambda_reg: Regularization parameter + random_state: Random seed for reproducibility + + >>> lr = LogisticRegressionVectorized(learning_rate=0.1, max_iterations=100) + >>> lr.learning_rate + 0.1 + >>> lr.max_iterations + 100 + """ + self.learning_rate = learning_rate + self.max_iterations = max_iterations + self.tolerance = tolerance + self.regularization = regularization + self.lambda_reg = lambda_reg + self.random_state = random_state + + # Initialize parameters + self.weights_: np.ndarray | None = None + self.bias_: np.ndarray | float | None = None + self.cost_history_: list[float] = [] + self.n_classes_: int | None = None + self.classes_: np.ndarray | None = None + + if random_state is not None: + self.rng_ = np.random.default_rng(random_state) + else: + self.rng_ = np.random.default_rng() + + def _sigmoid(self, z: np.ndarray) -> np.ndarray: + """ + Compute the sigmoid function. + + Args: + z: Input values + + Returns: + Sigmoid values between 0 and 1 + + >>> lr = LogisticRegressionVectorized() + >>> z = np.array([0, 1, -1, 2]) + >>> sigmoid_values = lr._sigmoid(z) + >>> bool(np.all(sigmoid_values >= 0) and np.all(sigmoid_values <= 1)) + True + >>> bool(np.isclose(sigmoid_values[0], 0.5, atol=1e-6)) + True + """ + # Clip z to prevent overflow + z = np.clip(z, -500, 500) + return 1 / (1 + np.exp(-z)) + + def _softmax(self, z: np.ndarray) -> np.ndarray: + """ + Compute the softmax function for multi-class classification. + + Args: + z: Input values of shape (n_samples, n_classes) + + Returns: + Softmax probabilities of shape (n_samples, n_classes) + + >>> lr = LogisticRegressionVectorized() + >>> z = np.array([[1, 2, 3], [0, 0, 0]]) + >>> softmax_values = lr._softmax(z) + >>> np.allclose(np.sum(softmax_values, axis=1), 1.0) + True + """ + # Subtract max for numerical stability + z_shifted = z - np.max(z, axis=1, keepdims=True) + exp_z = np.exp(z_shifted) + return exp_z / np.sum(exp_z, axis=1, keepdims=True) + + def _compute_cost( + self, + x: np.ndarray, + y: np.ndarray, + weights: np.ndarray, + bias: np.ndarray | float, + is_multiclass: bool = False, + ) -> float: + """ + Compute the cost function. 
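The max-subtraction in `_softmax` above is the standard stability trick: it changes nothing mathematically but keeps `np.exp` from overflowing. A standalone check on deliberately large logits:

import numpy as np

z = np.array([[1000.0, 1001.0, 1002.0]])               # naive exp(z) would overflow
z_shifted = z - np.max(z, axis=1, keepdims=True)        # [[-2, -1, 0]]
softmax = np.exp(z_shifted) / np.sum(np.exp(z_shifted), axis=1, keepdims=True)
print(softmax)                                          # ~[[0.090, 0.245, 0.665]]
print(softmax.sum(axis=1))                              # rows still sum to 1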
+ + Args: + x: Feature matrix of shape (n_samples, n_features) + y: Target labels + weights: Model weights + bias: Model bias + is_multiclass: Whether this is multi-class classification + + Returns: + Cost value + + >>> lr = LogisticRegressionVectorized() + >>> x = np.array([[1, 2], [3, 4]]) + >>> y = np.array([0, 1]) + >>> weights = np.array([0.1, 0.2]) + >>> bias = 0.0 + >>> cost = lr._compute_cost(x, y, weights, bias) + >>> isinstance(cost, float) + True + """ + x.shape[0] + + # Compute predictions + z = np.dot(x, weights) + bias + + if is_multiclass: + # Multi-class: use softmax and cross-entropy + predictions = self._softmax(z) + # Avoid log(0) + predictions = np.clip(predictions, 1e-15, 1 - 1e-15) + cost = -np.mean(np.sum(y * np.log(predictions), axis=1)) + else: + # Binary: use sigmoid and binary cross-entropy + predictions = self._sigmoid(z) + predictions = np.clip(predictions, 1e-15, 1 - 1e-15) + cost = -np.mean(y * np.log(predictions) + (1 - y) * np.log(1 - predictions)) + + # Add regularization + if self.regularization == "l1": + cost += self.lambda_reg * np.sum(np.abs(weights)) + elif self.regularization == "l2": + cost += self.lambda_reg * np.sum(weights**2) + + return cost + + def _compute_gradients( + self, + x: np.ndarray, + y: np.ndarray, + weights: np.ndarray, + bias: np.ndarray | float, + is_multiclass: bool = False, + ) -> tuple[np.ndarray, np.ndarray | float]: + """ + Compute gradients using vectorized operations. + + Args: + x: Feature matrix of shape (n_samples, n_features) + y: Target labels + weights: Model weights + bias: Model bias + is_multiclass: Whether this is multi-class classification + + Returns: + Tuple of (weight_gradients, bias_gradient) + + >>> lr = LogisticRegressionVectorized() + >>> x = np.array([[1, 2], [3, 4]]) + >>> y = np.array([0, 1]) + >>> weights = np.array([0.1, 0.2]) + >>> bias = 0.0 + >>> grad_w, grad_b = lr._compute_gradients(x, y, weights, bias) + >>> grad_w.shape == weights.shape + True + >>> isinstance(grad_b, (float, np.floating)) + True + """ + n_samples = x.shape[0] + + # Compute predictions + z = np.dot(x, weights) + bias + + if is_multiclass: + # Multi-class: use softmax + predictions = self._softmax(z) + error = predictions - y + else: + # Binary: use sigmoid + predictions = self._sigmoid(z) + error = predictions - y + + # Compute gradients + weight_gradients = np.dot(x.T, error) / n_samples + bias_gradient = np.mean(error) + + # Add regularization gradients + if self.regularization == "l1": + weight_gradients += self.lambda_reg * np.sign(weights) + elif self.regularization == "l2": + weight_gradients += 2 * self.lambda_reg * weights + + return weight_gradients, bias_gradient + + def _prepare_multiclass_targets(self, y: np.ndarray) -> np.ndarray: + """ + Convert target labels to one-hot encoding for multi-class classification. + + Args: + y: Target labels + + Returns: + One-hot encoded targets + """ + self.classes_ = np.unique(y) + self.n_classes_ = len(self.classes_) + + # Create one-hot encoding + y_onehot = np.zeros((len(y), self.n_classes_)) + for i, class_label in enumerate(self.classes_): + y_onehot[y == class_label, i] = 1 + + return y_onehot + + def fit(self, x: np.ndarray, y: np.ndarray) -> "LogisticRegressionVectorized": + """ + Fit the logistic regression model. 
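The one-hot encoding built by `_prepare_multiclass_targets` can also be written as a single broadcast comparison; a standalone sketch on made-up labels:

import numpy as np

y = np.array([2, 0, 1, 2])
classes = np.unique(y)                      # [0, 1, 2]
y_onehot = (y[:, None] == classes).astype(float)
print(y_onehot)
# [[0. 0. 1.]
#  [1. 0. 0.]
#  [0. 1. 0.]
#  [0. 0. 1.]]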
+ + Args: + x: Feature matrix of shape (n_samples, n_features) + y: Target labels of shape (n_samples,) + + Returns: + Self for method chaining + + >>> lr = LogisticRegressionVectorized(max_iterations=10) + >>> x = np.array([[1, 2], [3, 4], [5, 6]]) + >>> y = np.array([0, 1, 0]) + >>> _ = lr.fit(x, y) + """ + if x.ndim != 2: + raise ValueError("x must be 2-dimensional") + if len(x) != len(y): + raise ValueError("x and y must have the same number of samples") + + _n_samples, n_features = x.shape + + # Determine if this is multi-class classification + unique_classes = np.unique(y) + is_multiclass = len(unique_classes) > 2 + + if is_multiclass: + y_encoded = self._prepare_multiclass_targets(y) + n_classes = self.n_classes_ + if n_classes is None: + raise ValueError("n_classes_ must be set for multiclass classification") + else: + y_encoded = y + n_classes = 1 + + # Initialize weights and bias + if is_multiclass: + self.weights_ = self.rng_.standard_normal((n_features, n_classes)) * 0.01 + self.bias_ = np.zeros(n_classes) + else: + self.weights_ = self.rng_.standard_normal(n_features) * 0.01 # type: ignore[assignment] + bias_value: np.ndarray | float = 0.0 # type: ignore[assignment] + self.bias_ = bias_value # type: ignore[assignment] + + # Type assertions to help mypy + assert self.weights_ is not None + assert self.bias_ is not None + + # Gradient descent + self.cost_history_ = [] + + for iteration in range(self.max_iterations): + # Compute cost + cost = self._compute_cost( + x, y_encoded, self.weights_, self.bias_, is_multiclass + ) + self.cost_history_.append(cost) + + # Compute gradients + weight_gradients, bias_gradient = self._compute_gradients( + x, y_encoded, self.weights_, self.bias_, is_multiclass + ) + + # Update parameters + self.weights_ -= self.learning_rate * weight_gradients + self.bias_ -= self.learning_rate * bias_gradient + + # Check for convergence + if ( + iteration > 0 + and abs(self.cost_history_[-1] - self.cost_history_[-2]) + < self.tolerance + ): + break + + return self + + def predict_proba(self, x: np.ndarray) -> np.ndarray: + """ + Predict class probabilities. + + Args: + x: Feature matrix of shape (n_samples, n_features) + + Returns: + Probability matrix of shape (n_samples, n_classes) for multi-class + or (n_samples,) for binary classification + + >>> lr = LogisticRegressionVectorized() + >>> x_train = np.array([[1, 2], [3, 4]]) + >>> y_train = np.array([0, 1]) + >>> _ = lr.fit(x_train, y_train) + >>> x_test = np.array([[1, 2], [3, 4]]) + >>> proba = lr.predict_proba(x_test) + >>> proba.shape[0] == x_test.shape[0] + True + """ + if self.weights_ is None: + raise ValueError("Model must be fitted before prediction") + + z = np.dot(x, self.weights_) + self.bias_ + + if self.n_classes_ is None or self.n_classes_ <= 2: + # Binary classification + return self._sigmoid(z) + else: + # Multi-class classification + return self._softmax(z) + + def predict(self, x: np.ndarray) -> np.ndarray: + """ + Predict class labels. 
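A possible end-to-end usage sketch, assuming the repository root is on the import path so the file resolves as `machine_learning.logistic_regression_vectorized`; the toy data is linearly separable by construction:

import numpy as np
from machine_learning.logistic_regression_vectorized import (
    LogisticRegressionVectorized,
)

rng = np.random.default_rng(42)
x = rng.standard_normal((100, 2))
y = (x[:, 0] + x[:, 1] > 0).astype(int)

model = LogisticRegressionVectorized(learning_rate=0.1, max_iterations=500)
model.fit(x, y)
print(f"training accuracy: {model.score(x, y):.3f}")
print(f"first probabilities: {model.predict_proba(x[:3])}")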
+ + Args: + x: Feature matrix of shape (n_samples, n_features) + + Returns: + Predicted class labels + + >>> lr = LogisticRegressionVectorized() + >>> x_train = np.array([[1, 2], [3, 4], [5, 6]]) + >>> y_train = np.array([0, 1, 0]) + >>> _ = lr.fit(x_train, y_train) + >>> x_test = np.array([[1, 2], [3, 4]]) + >>> predictions = lr.predict(x_test) + >>> len(predictions) == x_test.shape[0] + True + """ + probabilities = self.predict_proba(x) + + if self.n_classes_ is None or self.n_classes_ <= 2: + # Binary classification + predictions = (probabilities > 0.5).astype(int) + else: + # Multi-class classification + predictions = np.argmax(probabilities, axis=1) + # Convert back to original class labels + if self.classes_ is None: + raise ValueError("Model must be fitted before predict") + predictions = self.classes_[predictions] + + return predictions + + def score(self, x: np.ndarray, y: np.ndarray) -> float: + """ + Compute the accuracy score. + + Args: + x: Feature matrix + y: True labels + + Returns: + Accuracy score between 0 and 1 + + >>> lr = LogisticRegressionVectorized() + >>> x = np.array([[1, 2], [3, 4], [5, 6]]) + >>> y = np.array([0, 1, 0]) + >>> _ = lr.fit(x, y) + >>> score = lr.score(x, y) + >>> bool(0 <= score <= 1) + True + """ + predictions = self.predict(x) + return np.mean(predictions == y) + + +def generate_sample_data( + n_samples: int = 100, + n_features: int = 2, + n_classes: int = 2, + random_state: int = 42, +) -> tuple[np.ndarray, np.ndarray]: + """ + Generate sample data for testing. + + Args: + n_samples: Number of samples + n_features: Number of features + n_classes: Number of classes + random_state: Random seed + + Returns: + Tuple of (X, y) + """ + rng = np.random.default_rng(random_state) + + if n_classes == 2: + # Binary classification: linearly separable data + x = rng.standard_normal((n_samples, n_features)) + # Create a simple linear boundary + y = (x[:, 0] + x[:, 1] > 0).astype(int) + else: + # Multi-class classification + from sklearn.datasets import make_classification + + x, y = make_classification( + n_samples=n_samples, + n_features=n_features, + n_classes=n_classes, + n_redundant=0, + n_informative=n_features, + random_state=random_state, + ) + + return x, y + + +def compare_with_sklearn() -> None: + """ + Compare our implementation with scikit-learn's LogisticRegression. + """ + try: + from sklearn.linear_model import LogisticRegression as SklearnLR + from sklearn.metrics import accuracy_score + + # Generate data + x, y = generate_sample_data(n_samples=100, n_features=4, n_classes=2) + + # Split data + split_idx = int(0.8 * len(x)) + x_train, x_test = x[:split_idx], x[split_idx:] + y_train, y_test = y[:split_idx], y[split_idx:] + + # Our implementation + lr_ours = LogisticRegressionVectorized(max_iterations=1000, learning_rate=0.1) + lr_ours.fit(x_train, y_train) + lr_ours.predict(x_test) + accuracy_ours = lr_ours.score(x_test, y_test) + + # Scikit-learn implementation + lr_sklearn = SklearnLR(max_iter=1000, random_state=42) + lr_sklearn.fit(x_train, y_train) + predictions_sklearn = lr_sklearn.predict(x_test) + accuracy_sklearn = accuracy_score(y_test, predictions_sklearn) + + print(f"Our implementation accuracy: {accuracy_ours:.4f}") + print(f"Scikit-learn accuracy: {accuracy_sklearn:.4f}") + print(f"Difference: {abs(accuracy_ours - accuracy_sklearn):.4f}") + + except ImportError: + print("Scikit-learn not available for comparison") + + +def main() -> None: + """ + Demonstrate vectorized logistic regression implementation. 
+ """ + print("=== Binary Classification Example ===") + + # Generate binary classification data + x_binary, y_binary = generate_sample_data(n_samples=100, n_features=2, n_classes=2) + + print(f"Data shape: {x_binary.shape}") + print(f"Classes: {np.unique(y_binary)}") + + # Train model + lr_binary = LogisticRegressionVectorized(learning_rate=0.1, max_iterations=1000) + lr_binary.fit(x_binary, y_binary) + + # Make predictions + lr_binary.predict(x_binary) + probabilities = lr_binary.predict_proba(x_binary) + + print(f"Training accuracy: {lr_binary.score(x_binary, y_binary):.4f}") + print(f"Final cost: {lr_binary.cost_history_[-1]:.6f}") + print(f"Sample probabilities: {probabilities[:5]}") + + print("\n=== Multi-class Classification Example ===") + + # Generate multi-class data + x_multi, y_multi = generate_sample_data(n_samples=150, n_features=4, n_classes=3) + + print(f"Data shape: {x_multi.shape}") + print(f"Classes: {np.unique(y_multi)}") + + # Train model + lr_multi = LogisticRegressionVectorized(learning_rate=0.1, max_iterations=1000) + lr_multi.fit(x_multi, y_multi) + + # Make predictions + lr_multi.predict(x_multi) + probabilities_multi = lr_multi.predict_proba(x_multi) + + print(f"Training accuracy: {lr_multi.score(x_multi, y_multi):.4f}") + print(f"Final cost: {lr_multi.cost_history_[-1]:.6f}") + print(f"Sample probabilities shape: {probabilities_multi[:5].shape}") + + print("\n=== Comparison with Scikit-learn ===") + compare_with_sklearn() + + +if __name__ == "__main__": + doctest.testmod() + main() diff --git a/machine_learning/naive_bayes_laplace.py b/machine_learning/naive_bayes_laplace.py new file mode 100644 index 000000000000..4203d386b849 --- /dev/null +++ b/machine_learning/naive_bayes_laplace.py @@ -0,0 +1,667 @@ +""" +Naive Bayes Classifier with Laplace Smoothing implementation from scratch. + +Naive Bayes is a probabilistic classifier based on applying Bayes' theorem with +strong independence assumptions between features. This implementation includes +Laplace smoothing (also known as add-one smoothing) to handle zero probabilities +and improve generalization. + +Key features: +- Multinomial Naive Bayes with Laplace smoothing +- Support for both discrete and continuous features +- Gaussian Naive Bayes for continuous features +- Comprehensive probability calculations +- Robust handling of unseen features/values + +Reference: https://en.wikipedia.org/wiki/Naive_Bayes_classifier +""" + +import doctest + +import numpy as np + + +class NaiveBayesLaplace: + """ + Naive Bayes Classifier with Laplace Smoothing. + + This implementation provides both multinomial and Gaussian variants + of the Naive Bayes algorithm with Laplace smoothing for robust + probability estimation. + """ + + def __init__(self, alpha: float = 1.0, feature_type: str = "discrete") -> None: + """ + Initialize Naive Bayes classifier. 
+ + Args: + alpha: Laplace smoothing parameter (alpha > 0) + feature_type: Type of features ('discrete' or 'continuous') + + >>> nb = NaiveBayesLaplace(alpha=1.0, feature_type="discrete") + >>> nb.alpha + 1.0 + >>> nb.feature_type + 'discrete' + """ + self.alpha = alpha + self.feature_type = feature_type + + # Model parameters + self.classes_: np.ndarray | None = None + self.class_prior_: dict[int, float] = {} + self.feature_count_: dict[int, dict[int, dict[int, int]]] = {} + self.feature_log_prob_: dict[int, dict[int, dict[int, float]]] = {} + self.feature_mean_: dict[int, dict[int, float]] = {} + self.feature_var_: dict[int, dict[int, float]] = {} + self.n_features_: int | None = None + + def _check_input(self, x: np.ndarray, y: np.ndarray) -> None: + """ + Validate input data. + + Args: + x: Feature matrix + y: Target labels + + Raises: + ValueError: If input is invalid + """ + if x.ndim != 2: + raise ValueError("x must be 2-dimensional") + if len(x) != len(y): + raise ValueError("x and y must have the same length") + if self.alpha <= 0: + raise ValueError("Alpha must be positive") + if self.feature_type not in ["discrete", "continuous"]: + raise ValueError("feature_type must be 'discrete' or 'continuous'") + + def _compute_class_prior(self, y: np.ndarray) -> dict[int, float]: + """ + Compute prior probabilities for each class. + + Args: + y: Target labels + + Returns: + Dictionary mapping class to prior probability + + >>> nb = NaiveBayesLaplace() + >>> y = np.array([0, 1, 0, 1, 1]) + >>> prior = nb._compute_class_prior(y) + >>> len(prior) + 2 + >>> bool(np.isclose(sum(prior.values()), 1.0)) + True + """ + classes, counts = np.unique(y, return_counts=True) + total_samples = len(y) + + prior = {} + for class_label, count in zip(classes, counts): + prior[class_label] = count / total_samples + + return prior + + def _compute_feature_counts( + self, x: np.ndarray, y: np.ndarray + ) -> dict[int, dict[int, dict[int, int]]]: + """ + Compute feature counts for each class (for discrete features). + + Args: + x: Feature matrix + y: Target labels + + Returns: + Nested dictionary: class -> feature -> count + + >>> nb = NaiveBayesLaplace() + >>> x = np.array([[0, 1], [1, 0], [0, 1]]) + >>> y = np.array([0, 1, 0]) + >>> counts = nb._compute_feature_counts(x, y) + >>> int(counts[0][0][0]) # class 0, feature 0, value 0 + 2 + >>> int(counts[1][1][0]) # class 1, feature 1, value 0 + 1 + """ + feature_counts: dict[int, dict[int, dict[int, int]]] = {} + + for class_label in np.unique(y): + feature_counts[class_label] = {} + + # Get samples for this class + class_mask = y == class_label + x_class = x[class_mask] + + # Count occurrences of each feature value + for feature_idx in range(x.shape[1]): + feature_counts[class_label][feature_idx] = {} + + for feature_value in np.unique(x[:, feature_idx]): + count = np.sum(x_class[:, feature_idx] == feature_value) + feat_val_int = int(feature_value) + feature_counts[class_label][feature_idx][feat_val_int] = int(count) + + return feature_counts + + def _compute_feature_statistics( + self, x: np.ndarray, y: np.ndarray + ) -> tuple[dict[int, dict[int, float]], dict[int, dict[int, float]]]: + """ + Compute mean and variance for each feature in each class (continuous features). 
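For the continuous (Gaussian) variant, fitting reduces to per-class, per-feature means and variances; a standalone sketch of that bookkeeping on toy data, using the same small epsilon as below:

import numpy as np

x = np.array([[1.0, 2.0], [2.0, 3.0], [1.5, 2.5], [5.0, 6.0]])
y = np.array([0, 0, 0, 1])

for label in np.unique(y):
    x_c = x[y == label]
    mean = x_c.mean(axis=0)
    var = x_c.var(axis=0) + 1e-9     # epsilon guards against zero variance
    print(label, mean, var)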
+ + Args: + x: Feature matrix + y: Target labels + + Returns: + Tuple of (means, variances) dictionaries + + >>> nb = NaiveBayesLaplace(feature_type="continuous") + >>> x = np.array([[1.0, 2.0], [2.0, 3.0], [1.5, 2.5]]) + >>> y = np.array([0, 1, 0]) + >>> means, vars = nb._compute_feature_statistics(x, y) + >>> len(means) + 2 + >>> len(vars) + 2 + """ + means: dict[int, dict[int, float]] = {} + variances: dict[int, dict[int, float]] = {} + + for class_label in np.unique(y): + means[class_label] = {} + variances[class_label] = {} + + # Get samples for this class + class_mask = y == class_label + x_class = x[class_mask] + + # Compute mean and variance for each feature + for feature_idx in range(x.shape[1]): + feature_values = x_class[:, feature_idx] + means[class_label][feature_idx] = np.mean(feature_values) + # Add small epsilon to avoid division by zero + variances[class_label][feature_idx] = np.var(feature_values) + 1e-9 + + return means, variances + + def _compute_log_probabilities_discrete( + self, x: np.ndarray, y: np.ndarray + ) -> dict[int, dict[int, dict[int, float]]]: + """ + Compute log probabilities for discrete features with Laplace smoothing. + + Args: + x: Feature matrix + y: Target labels + + Returns: + Nested dictionary: class -> feature -> value -> log_probability + """ + feature_counts = self._compute_feature_counts(x, y) + log_probabilities: dict[int, dict[int, dict[int, float]]] = {} + + for class_label in np.unique(y): + log_probabilities[class_label] = {} + class_mask = y == class_label + n_class_samples = np.sum(class_mask) + + for feature_idx in range(x.shape[1]): + log_probabilities[class_label][feature_idx] = {} + + # Get all possible values for this feature + all_values = np.unique(x[:, feature_idx]) + + for feature_value in all_values: + # Count occurrences of this value in this class + count = feature_counts[class_label][feature_idx].get( + int(feature_value), 0 + ) + + # Apply Laplace smoothing formula + n_unique_values = len(all_values) + smoothed_prob = (count + self.alpha) / ( + n_class_samples + self.alpha * n_unique_values + ) + + # Store log probability + log_probabilities[class_label][feature_idx][feature_value] = np.log( + smoothed_prob + ) + + return log_probabilities + + def _gaussian_log_probability(self, x: float, mean: float, var: float) -> float: + """ + Compute log probability of x under Gaussian distribution. + + Args: + x: Input value + mean: Mean of Gaussian distribution + var: Variance of Gaussian distribution + + Returns: + Log probability + + >>> nb = NaiveBayesLaplace(feature_type="continuous") + >>> log_prob = nb._gaussian_log_probability(0.0, 0.0, 1.0) + >>> isinstance(log_prob, float) + True + """ + # Gaussian log probability: -0.5 * log(2*pi*var) - (x-mean)^2/(2*var) + return -0.5 * (np.log(2 * np.pi * var) + (x - mean) ** 2 / var) + + def fit(self, x: np.ndarray, y: np.ndarray) -> "NaiveBayesLaplace": + """ + Fit the Naive Bayes classifier. 
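The Gaussian term used above is the log of the normal density, -0.5 * (log(2*pi*var) + (x - mean)^2 / var); a quick standalone check at the peak of a standard normal:

import numpy as np

x_val, mean, var = 0.0, 0.0, 1.0
log_p = -0.5 * (np.log(2 * np.pi * var) + (x_val - mean) ** 2 / var)
print(log_p)              # log(1 / sqrt(2*pi)) ~ -0.9189
print(np.exp(log_p))      # ~0.3989, the standard normal density at its mode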
+ + Args: + x: Feature matrix of shape (n_samples, n_features) + y: Target labels of shape (n_samples,) + + Returns: + Self for method chaining + + >>> nb = NaiveBayesLaplace() + >>> x = np.array([[0, 1], [1, 0], [0, 1], [1, 1]]) + >>> y = np.array([0, 1, 0, 1]) + >>> _ = nb.fit(x, y) + """ + self._check_input(x, y) + + self.classes_ = np.unique(y) + self.n_features_ = x.shape[1] + + # Compute class priors + self.class_prior_ = self._compute_class_prior(y) + + if self.feature_type == "discrete": + # For discrete features: compute feature counts and log probabilities + self.feature_count_ = self._compute_feature_counts(x, y) + self.feature_log_prob_ = self._compute_log_probabilities_discrete(x, y) + + elif self.feature_type == "continuous": + # For continuous features: compute means and variances + self.feature_mean_, self.feature_var_ = self._compute_feature_statistics( + x, y + ) + + return self + + def _predict_log_proba_discrete(self, x: np.ndarray) -> np.ndarray: + """ + Predict log probabilities for discrete features. + + Args: + x: Feature matrix + + Returns: + Log probability matrix of shape (n_samples, n_classes) + """ + if self.classes_ is None: + raise ValueError("Model must be fitted before predict") + + n_samples = x.shape[0] + n_classes = len(self.classes_) + log_proba = np.zeros((n_samples, n_classes)) + + for i, class_label in enumerate(self.classes_): + # Start with log prior probability + log_proba[:, i] = np.log(self.class_prior_[class_label]) + + # Add log likelihood for each feature + for feature_idx in range(x.shape[1]): + for sample_idx in range(n_samples): + feature_value = x[sample_idx, feature_idx] + + # Get log probability for this feature value in this class + feature_value_int = int(feature_value) + if ( + feature_value_int + in self.feature_log_prob_[class_label][feature_idx] + ): + log_prob = self.feature_log_prob_[class_label][feature_idx][ + feature_value_int + ] + else: + # Unseen feature value: use Laplace smoothing + all_values = list( + self.feature_log_prob_[class_label][feature_idx].keys() + ) + n_unique_values = len(all_values) + 1 # +1 for the unseen value + + # Estimate class size from existing counts + class_samples = sum( + self.feature_count_[class_label][feature_idx].values() + ) + smoothed_prob = self.alpha / ( + class_samples + self.alpha * n_unique_values + ) + log_prob = np.log(smoothed_prob) + + log_proba[sample_idx, i] += log_prob + + return log_proba + + def _predict_log_proba_continuous(self, x: np.ndarray) -> np.ndarray: + """ + Predict log probabilities for continuous features. 
+ + Args: + x: Feature matrix + + Returns: + Log probability matrix of shape (n_samples, n_classes) + """ + if self.classes_ is None: + raise ValueError("Model must be fitted before predict") + + n_samples = x.shape[0] + n_classes = len(self.classes_) + log_proba = np.zeros((n_samples, n_classes)) + + for i, class_label in enumerate(self.classes_): + # Start with log prior probability + log_proba[:, i] = np.log(self.class_prior_[class_label]) + + # Add log likelihood for each feature + for feature_idx in range(x.shape[1]): + means = self.feature_mean_[class_label][feature_idx] + variances = self.feature_var_[class_label][feature_idx] + + # Compute Gaussian log probabilities for all samples + feature_values = x[:, feature_idx] + log_proba[:, i] += np.array( + [ + self._gaussian_log_probability(val, means, variances) + for val in feature_values + ] + ) + + return log_proba + + def predict_log_proba(self, x: np.ndarray) -> np.ndarray: + """ + Predict log probabilities for each class. + + Args: + x: Feature matrix of shape (n_samples, n_features) + + Returns: + Log probability matrix of shape (n_samples, n_classes) + + >>> nb = NaiveBayesLaplace() + >>> x_train = np.array([[0, 1], [1, 0], [0, 1], [1, 1]]) + >>> y_train = np.array([0, 1, 0, 1]) + >>> _ = nb.fit(x_train, y_train) + >>> x_test = np.array([[0, 1], [1, 0]]) + >>> log_proba = nb.predict_log_proba(x_test) + >>> log_proba.shape + (2, 2) + """ + if self.classes_ is None: + raise ValueError("Model must be fitted before prediction") + + if self.feature_type == "discrete": + return self._predict_log_proba_discrete(x) + else: + return self._predict_log_proba_continuous(x) + + def predict_proba(self, x: np.ndarray) -> np.ndarray: + """ + Predict class probabilities. + + Args: + x: Feature matrix of shape (n_samples, n_features) + + Returns: + Probability matrix of shape (n_samples, n_classes) + + >>> nb = NaiveBayesLaplace() + >>> x_train = np.array([[0, 1], [1, 0], [0, 1], [1, 1]]) + >>> y_train = np.array([0, 1, 0, 1]) + >>> _ = nb.fit(x_train, y_train) + >>> x_test = np.array([[0, 1], [1, 0]]) + >>> proba = nb.predict_proba(x_test) + >>> proba.shape + (2, 2) + >>> np.allclose(np.sum(proba, axis=1), 1.0) + True + """ + log_proba = self.predict_log_proba(x) + + # Convert log probabilities to probabilities using log-sum-exp trick + # for numerical stability + max_log_proba = np.max(log_proba, axis=1, keepdims=True) + exp_log_proba = np.exp(log_proba - max_log_proba) + proba = exp_log_proba / np.sum(exp_log_proba, axis=1, keepdims=True) + + return proba + + def predict(self, x: np.ndarray) -> np.ndarray: + """ + Predict class labels. + + Args: + x: Feature matrix of shape (n_samples, n_features) + + Returns: + Predicted class labels + + >>> nb = NaiveBayesLaplace() + >>> x_train = np.array([[0, 1], [1, 0], [0, 1], [1, 1]]) + >>> y_train = np.array([0, 1, 0, 1]) + >>> _ = nb.fit(x_train, y_train) + >>> x_test = np.array([[0, 1], [1, 0]]) + >>> predictions = nb.predict(x_test) + >>> len(predictions) == x_test.shape[0] + True + """ + if self.classes_ is None: + raise ValueError("Model must be fitted before predict") + + log_proba = self.predict_log_proba(x) + predictions = self.classes_[np.argmax(log_proba, axis=1)] + return predictions + + def score(self, x: np.ndarray, y: np.ndarray) -> float: + """ + Compute accuracy score. 
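`predict_proba` above normalizes joint log-probabilities with the log-sum-exp trick so that very negative values do not underflow to 0/0; a standalone sketch:

import numpy as np

log_proba = np.array([[-1000.0, -1001.0],     # naive exp would underflow to zeros
                      [-2.0, -1.0]])
shifted = log_proba - log_proba.max(axis=1, keepdims=True)
proba = np.exp(shifted) / np.exp(shifted).sum(axis=1, keepdims=True)
print(proba)                                  # ~[[0.731, 0.269], [0.269, 0.731]]
print(proba.sum(axis=1))                      # each row sums to 1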
+ + Args: + x: Feature matrix + y: True labels + + Returns: + Accuracy score between 0 and 1 + + >>> nb = NaiveBayesLaplace() + >>> x = np.array([[0, 1], [1, 0], [0, 1], [1, 1]]) + >>> y = np.array([0, 1, 0, 1]) + >>> _ = nb.fit(x, y) + >>> score = nb.score(x, y) + >>> bool(0 <= score <= 1) + True + """ + predictions = self.predict(x) + return np.mean(predictions == y) + + +def generate_discrete_data( + n_samples: int = 100, + n_features: int = 3, + n_classes: int = 2, + random_state: int = 42, +) -> tuple[np.ndarray, np.ndarray]: + """ + Generate discrete sample data for testing. + + Args: + n_samples: Number of samples + n_features: Number of features + n_classes: Number of classes + random_state: Random seed + + Returns: + Tuple of (x, y) + """ + rng = np.random.default_rng(random_state) + + # Generate random discrete features (0, 1, 2) + x = rng.integers(0, 3, size=(n_samples, n_features)) + + # Create simple decision rule for labels + y = np.sum(x, axis=1) % n_classes + + return x, y + + +def generate_continuous_data( + n_samples: int = 100, + n_features: int = 2, + n_classes: int = 2, + random_state: int = 42, +) -> tuple[np.ndarray, np.ndarray]: + """ + Generate continuous sample data for testing. + + Args: + n_samples: Number of samples + n_features: Number of features + n_classes: Number of classes + random_state: Random seed + + Returns: + Tuple of (x, y) + """ + rng = np.random.default_rng(random_state) + + # Generate continuous features with different means for different classes + x = rng.standard_normal((n_samples, n_features)) + y = rng.integers(0, n_classes, size=n_samples) + + # Add class-specific offsets + for class_label in range(n_classes): + mask = y == class_label + x[mask] += class_label * 2 # Separate classes by offset + + return x, y + + +def compare_with_sklearn() -> None: + """ + Compare our implementation with scikit-learn's NaiveBayes. 
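A possible usage sketch, assuming the repository root is on the import path so the file resolves as `machine_learning.naive_bayes_laplace`; the discrete toy data mirrors the generator defined above:

import numpy as np
from machine_learning.naive_bayes_laplace import NaiveBayesLaplace

rng = np.random.default_rng(0)
x = rng.integers(0, 3, size=(60, 3))          # discrete features in {0, 1, 2}
y = x.sum(axis=1) % 2

nb = NaiveBayesLaplace(alpha=1.0, feature_type="discrete")
nb.fit(x, y)
print(f"training accuracy: {nb.score(x, y):.3f}")
print(nb.predict(np.array([[9, 9, 9]])))      # unseen values fall back to smoothing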
+ """ + try: + from sklearn.metrics import accuracy_score + from sklearn.naive_bayes import GaussianNB, MultinomialNB + + print("=== Discrete Features Comparison ===") + x_disc, y_disc = generate_discrete_data(n_samples=100, n_features=4) + + # Split data + split_idx = int(0.8 * len(x_disc)) + x_train, x_test = x_disc[:split_idx], x_disc[split_idx:] + y_train, y_test = y_disc[:split_idx], y_disc[split_idx:] + + # Our implementation + nb_ours = NaiveBayesLaplace(alpha=1.0, feature_type="discrete") + nb_ours.fit(x_train, y_train) + nb_ours.predict(x_test) + accuracy_ours = nb_ours.score(x_test, y_test) + + # Scikit-learn implementation + nb_sklearn = MultinomialNB(alpha=1.0) + nb_sklearn.fit(x_train, y_train) + predictions_sklearn = nb_sklearn.predict(x_test) + accuracy_sklearn = accuracy_score(y_test, predictions_sklearn) + + print(f"Our implementation accuracy: {accuracy_ours:.4f}") + print(f"Scikit-learn accuracy: {accuracy_sklearn:.4f}") + print(f"Difference: {abs(accuracy_ours - accuracy_sklearn):.4f}") + + print("\n=== Continuous Features Comparison ===") + x_cont, y_cont = generate_continuous_data(n_samples=100, n_features=2) + + # Split data + split_idx = int(0.8 * len(x_cont)) + x_train, x_test = x_cont[:split_idx], x_cont[split_idx:] + y_train, y_test = y_cont[:split_idx], y_cont[split_idx:] + + # Our implementation + nb_ours_cont = NaiveBayesLaplace(alpha=1.0, feature_type="continuous") + nb_ours_cont.fit(x_train, y_train) + nb_ours_cont.predict(x_test) + accuracy_ours_cont = nb_ours_cont.score(x_test, y_test) + + # Scikit-learn implementation + nb_sklearn_cont = GaussianNB() + nb_sklearn_cont.fit(x_train, y_train) + predictions_sklearn_cont = nb_sklearn_cont.predict(x_test) + accuracy_sklearn_cont = accuracy_score(y_test, predictions_sklearn_cont) + + print(f"Our implementation accuracy: {accuracy_ours_cont:.4f}") + print(f"Scikit-learn accuracy: {accuracy_sklearn_cont:.4f}") + print(f"Difference: {abs(accuracy_ours_cont - accuracy_sklearn_cont):.4f}") + + except ImportError: + print("Scikit-learn not available for comparison") + + +def main() -> None: + """ + Demonstrate Naive Bayes with Laplace smoothing implementation. 
+ """ + print("=== Discrete Features Example ===") + + # Generate discrete data + x_disc, y_disc = generate_discrete_data(n_samples=100, n_features=3, n_classes=2) + + print(f"Data shape: {x_disc.shape}") + print(f"Classes: {np.unique(y_disc)}") + print(f"Feature values: {np.unique(x_disc)}") + + # Train model + nb_disc = NaiveBayesLaplace(alpha=1.0, feature_type="discrete") + nb_disc.fit(x_disc, y_disc) + + # Make predictions + nb_disc.predict(x_disc) + probabilities = nb_disc.predict_proba(x_disc) + + print(f"Training accuracy: {nb_disc.score(x_disc, y_disc):.4f}") + print(f"Sample probabilities: {probabilities[:5]}") + + # Test with unseen feature values + x_unseen = np.array([[5, 6, 7], [8, 9, 10]]) # Unseen values + predictions_unseen = nb_disc.predict(x_unseen) + print(f"Predictions on unseen data: {predictions_unseen}") + + print("\n=== Continuous Features Example ===") + + # Generate continuous data + x_cont, y_cont = generate_continuous_data(n_samples=100, n_features=2, n_classes=2) + + print(f"Data shape: {x_cont.shape}") + print(f"Classes: {np.unique(y_cont)}") + + # Train model + nb_cont = NaiveBayesLaplace(alpha=1.0, feature_type="continuous") + nb_cont.fit(x_cont, y_cont) + + # Make predictions + nb_cont.predict(x_cont) + probabilities_cont = nb_cont.predict_proba(x_cont) + + print(f"Training accuracy: {nb_cont.score(x_cont, y_cont):.4f}") + print(f"Sample probabilities: {probabilities_cont[:5]}") + + print("\n=== Comparison with Scikit-learn ===") + compare_with_sklearn() + + +if __name__ == "__main__": + doctest.testmod() + main() diff --git a/machine_learning/pca_from_scratch.py b/machine_learning/pca_from_scratch.py new file mode 100644 index 000000000000..ef9b01e88ae9 --- /dev/null +++ b/machine_learning/pca_from_scratch.py @@ -0,0 +1,333 @@ +""" +Principal Component Analysis (PCA) implemented from scratch using NumPy. + +PCA is a dimensionality reduction technique that transforms high-dimensional data +into a lower-dimensional representation while retaining as much variance as possible. + +This implementation includes: +- Data standardization (mean centering and scaling) +- Covariance matrix computation +- Eigenvalue decomposition to find principal components +- Dimensionality reduction with explained variance calculation +- Comparison with scikit-learn implementation + +Reference: https://en.wikipedia.org/wiki/Principal_component_analysis +""" + +import doctest + +import numpy as np + + +class PCAFromScratch: + """ + Principal Component Analysis implementation from scratch using NumPy. + + This class provides a complete PCA implementation without external ML libraries, + demonstrating the mathematical foundations of the algorithm. + """ + + def __init__(self, n_components: int | None = None) -> None: + """ + Initialize PCA with specified number of components. + + Args: + n_components: Number of principal components to retain. + If None, all components are retained. + + >>> pca = PCAFromScratch(n_components=2) + >>> pca.n_components + 2 + """ + self.n_components = n_components + self.components_: np.ndarray | None = None + self.explained_variance_: np.ndarray | None = None + self.explained_variance_ratio_: np.ndarray | None = None + self.mean_: np.ndarray | None = None + self.std_: np.ndarray | None = None + + def _standardize_data(self, x: np.ndarray) -> np.ndarray: + """ + Standardize the data by mean centering and scaling to unit variance. 
+ + Args: + x: Input data matrix of shape (n_samples, n_features) + + Returns: + Standardized data matrix + + >>> pca = PCAFromScratch() + >>> X = np.array([[1, 2], [3, 4], [5, 6]]) + >>> X_std = pca._standardize_data(X) + >>> np.allclose(X_std.mean(axis=0), 0, atol=1e-15) + True + >>> np.allclose(X_std.std(axis=0), 1, atol=1e-10) + True + """ + # Calculate mean and standard deviation + self.mean_ = np.mean(x, axis=0) + self.std_ = np.std(x, axis=0, ddof=0) # ddof=0 for population std + + # Avoid division by zero for constant features + self.std_[self.std_ == 0] = 1.0 + + # Standardize the data + x_standardized = (x - self.mean_) / self.std_ + + return x_standardized + + def _compute_covariance_matrix(self, x: np.ndarray) -> np.ndarray: + """ + Compute the covariance matrix of the standardized data. + + Args: + x: Standardized data matrix of shape (n_samples, n_features) + + Returns: + Covariance matrix of shape (n_features, n_features) + + >>> pca = PCAFromScratch() + >>> X = np.array([[1, 2], [2, 3], [3, 4]]) + >>> X_std = pca._standardize_data(X) + >>> cov_matrix = pca._compute_covariance_matrix(X_std) + >>> cov_matrix.shape + (2, 2) + >>> np.allclose(cov_matrix, cov_matrix.T) # Symmetric matrix + True + """ + n_samples = x.shape[0] + # Covariance matrix = (X^T * X) / (n_samples - 1) + covariance_matrix = np.dot(x.T, x) / (n_samples - 1) + return covariance_matrix + + def _eigenvalue_decomposition( + self, covariance_matrix: np.ndarray + ) -> tuple[np.ndarray, np.ndarray]: + """ + Perform eigenvalue decomposition on the covariance matrix. + + Args: + covariance_matrix: Covariance matrix of shape (n_features, n_features) + + Returns: + Tuple of (eigenvalues, eigenvectors) + + >>> pca = PCAFromScratch() + >>> cov_matrix = np.array([[2, 1], [1, 2]]) + >>> eigenvalues, eigenvectors = pca._eigenvalue_decomposition(cov_matrix) + >>> eigenvalues.shape + (2,) + >>> eigenvectors.shape + (2, 2) + """ + # Compute eigenvalues and eigenvectors + eigenvalues, eigenvectors = np.linalg.eigh(covariance_matrix) + + # Sort eigenvalues and eigenvectors in descending order + idx = np.argsort(eigenvalues)[::-1] + eigenvalues = eigenvalues[idx] + eigenvectors = eigenvectors[:, idx] + + return eigenvalues, eigenvectors + + def fit(self, x: np.ndarray) -> "PCAFromScratch": + """ + Fit PCA to the data. 
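The core of `fit` is a covariance matrix followed by a sorted eigendecomposition; a standalone sketch of those two steps on random data (np.linalg.eigh returns eigenvalues in ascending order, hence the re-sort):

import numpy as np

rng = np.random.default_rng(0)
x = rng.standard_normal((100, 3))
x_std = (x - x.mean(axis=0)) / x.std(axis=0)

cov = x_std.T @ x_std / (len(x_std) - 1)          # (n_features, n_features)
eigenvalues, eigenvectors = np.linalg.eigh(cov)   # ascending order
order = np.argsort(eigenvalues)[::-1]             # descending by explained variance
eigenvalues, eigenvectors = eigenvalues[order], eigenvectors[:, order]

components = eigenvectors[:, :2]                  # top two principal components
print(eigenvalues[:2] / eigenvalues.sum())        # explained variance ratio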
+ + Args: + x: Input data matrix of shape (n_samples, n_features) + + Returns: + Self for method chaining + + >>> pca = PCAFromScratch(n_components=2) + >>> X = np.random.randn(100, 4) + >>> fitted = pca.fit(X) + >>> isinstance(fitted, PCAFromScratch) + True + """ + if x.ndim != 2: + raise ValueError("Input data must be 2-dimensional") + + n_samples, n_features = x.shape + + # Set default number of components + if self.n_components is None: + self.n_components = min(n_samples, n_features) + elif self.n_components > min(n_samples, n_features): + msg = ( + f"n_components={self.n_components} cannot be larger than " + f"min(n_samples, n_features)={min(n_samples, n_features)}" + ) + raise ValueError(msg) + + # Standardize the data + x_standardized = self._standardize_data(x) + + # Compute covariance matrix + covariance_matrix = self._compute_covariance_matrix(x_standardized) + + # Perform eigenvalue decomposition + eigenvalues, eigenvectors = self._eigenvalue_decomposition(covariance_matrix) + + # Select the top n_components + self.components_ = eigenvectors[:, : self.n_components] + self.explained_variance_ = eigenvalues[: self.n_components] + + # Calculate explained variance ratio + total_variance = np.sum(eigenvalues) + self.explained_variance_ratio_ = self.explained_variance_ / total_variance + + return self + + def transform(self, x: np.ndarray) -> np.ndarray: + """ + Transform data using the fitted PCA. + + Args: + x: Input data matrix of shape (n_samples, n_features) + + Returns: + Transformed data matrix of shape (n_samples, n_components) + + >>> pca = PCAFromScratch(n_components=2) + >>> X = np.random.randn(50, 4) + >>> fitted = pca.fit(X) + >>> X_transformed = pca.transform(X) + >>> X_transformed.shape + (50, 2) + """ + if self.components_ is None: + raise ValueError("PCA must be fitted before transform") + + # Standardize the input data using the same parameters as during fit + x_standardized = (x - self.mean_) / self.std_ + + # Project data onto principal components + x_transformed = np.dot(x_standardized, self.components_) + + return x_transformed + + def fit_transform(self, x: np.ndarray) -> np.ndarray: + """ + Fit PCA and transform data in one step. + + Args: + x: Input data matrix of shape (n_samples, n_features) + + Returns: + Transformed data matrix of shape (n_samples, n_components) + + >>> pca = PCAFromScratch(n_components=2) + >>> X = np.random.randn(50, 4) + >>> X_transformed = pca.fit_transform(X) + >>> X_transformed.shape + (50, 2) + """ + return self.fit(x).transform(x) + + def inverse_transform(self, x_transformed: np.ndarray) -> np.ndarray: + """ + Transform data back to original space. + + Args: + x_transformed: Transformed data matrix of shape (n_samples, n_components) + + Returns: + Data in original space of shape (n_samples, n_features) + + >>> pca = PCAFromScratch(n_components=2) + >>> X = np.random.randn(50, 4) + >>> X_transformed = pca.fit_transform(X) + >>> X_reconstructed = pca.inverse_transform(X_transformed) + >>> X_reconstructed.shape + (50, 4) + """ + if self.components_ is None or self.mean_ is None or self.std_ is None: + raise ValueError("PCA must be fitted before inverse_transform") + + # Transform back to standardized space + x_standardized = np.dot(x_transformed, self.components_.T) + + # Denormalize to original space + x_original = (x_standardized * self.std_) + self.mean_ + + return x_original + + +def compare_with_sklearn() -> None: + """ + Compare our PCA implementation with scikit-learn's PCA. 
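A possible usage sketch, assuming the repository root is on the import path so the file resolves as `machine_learning.pca_from_scratch`; random data stands in for a real dataset:

import numpy as np
from machine_learning.pca_from_scratch import PCAFromScratch

rng = np.random.default_rng(42)
x = rng.standard_normal((100, 4))

pca = PCAFromScratch(n_components=2)
x_2d = pca.fit_transform(x)
x_back = pca.inverse_transform(x_2d)

print(x_2d.shape)                                          # (100, 2)
print(f"explained variance ratio: {pca.explained_variance_ratio_}")
print(f"reconstruction MSE: {np.mean((x - x_back) ** 2):.4f}")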
+ + This function demonstrates that our implementation produces results + very close to the scikit-learn implementation. + """ + from sklearn.datasets import make_blobs + from sklearn.decomposition import PCA + + # Generate sample data + x, _ = make_blobs(n_samples=100, centers=3, n_features=4, random_state=42) + + # Our implementation + pca_ours = PCAFromScratch(n_components=2) + x_transformed_ours = pca_ours.fit_transform(x) + + # Scikit-learn implementation + pca_sklearn = PCA(n_components=2, random_state=42) + x_transformed_sklearn = pca_sklearn.fit_transform(x) + + # Compare results (should be very similar, possibly with different signs) + print("Our PCA - First 5 rows:") + print(x_transformed_ours[:5]) + print("\nScikit-learn PCA - First 5 rows:") + print(x_transformed_sklearn[:5]) + + print(f"\nOur explained variance ratio: {pca_ours.explained_variance_ratio_}") + print(f"Sklearn explained variance ratio: {pca_sklearn.explained_variance_ratio_}") + + # Check if results are similar (within tolerance) + correlation = np.corrcoef( + x_transformed_ours.flatten(), x_transformed_sklearn.flatten() + )[0, 1] + print(f"\nCorrelation between implementations: {correlation:.6f}") + + +def main() -> None: + """ + Demonstrate PCA from scratch implementation. + """ + # Generate sample data + rng = np.random.default_rng(42) + n_samples, n_features = 100, 4 + x = rng.standard_normal((n_samples, n_features)) + + print("Original data shape:", x.shape) + print("Original data (first 5 rows):") + print(x[:5]) + + # Apply PCA + pca = PCAFromScratch(n_components=2) + x_transformed = pca.fit_transform(x) + + print(f"\nTransformed data shape: {x_transformed.shape}") + print("Transformed data (first 5 rows):") + print(x_transformed[:5]) + + print(f"\nExplained variance ratio: {pca.explained_variance_ratio_}") + if pca.explained_variance_ratio_ is not None: + print(f"Total variance explained: {np.sum(pca.explained_variance_ratio_):.4f}") + + # Demonstrate inverse transform + x_reconstructed = pca.inverse_transform(x_transformed) + reconstruction_error = np.mean((x - x_reconstructed) ** 2) + print(f"\nReconstruction error (MSE): {reconstruction_error:.6f}") + + # Compare with sklearn + print("\n" + "=" * 50) + print("Comparison with scikit-learn:") + compare_with_sklearn() + + +if __name__ == "__main__": + doctest.testmod() + main()