In [3]:
import numpy as np

In [4]:
from abc import ABC, abstractmethod

class SupervisedMachineLearningModel(ABC):
    @abstractmethod
    def train(self, X, y):
        pass

    @abstractmethod
    def predict(self, X):
        pass

### Linear Regression

#### Data
```
 ----------
|  X |  y  |
 ----------

y is continous value
```

#### 1. Model

```
y_pred = wX + b

where,
X = Independent variable
y_pred = Dependent variable
w = weight or slope
b = intercept or bias
```

#### 2. Loss

```
J(w) = (1 / (2 * m)) * Σ[(y_pred(i) - y(i))^2]

where,
m is the number of training examples.
y(i) is the actual output for the i-th example.
y_pred(i) is the predicted output for the i-th example.
```

#### 3. Gradient

```
∂J(w) / ∂wj = (1 / m) * Σ[(y_pred(i) - y(i)) * Xj(i)]

Compute the error term (y_pred(i) - y(i)).
Multiply the error by the corresponding feature Xj(i).
Take the average over all examples.
```

#### 4. Gradient Descent Update

```
wj = wj - α * ∂J(w) / ∂wj

where,
∂J(w) / ∂wj is gradient
```


In [15]:
class LinearRegression(SupervisedMachineLearningModel):
    def __init__(self, learning_rate: float = 0.01, epochs: int = 1000) -> None:
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.weights = None

    def train(self, X, y, verbose=False) -> None:
        """Train the linear regression model."""
        m, n = X.shape
        # Initialize weights (including bias as part of weights)
        self.weights = np.zeros(n + 1)

        # Add bias as the first column of X
        X = np.c_[np.ones(m), X]

        for epoch in range(self.epochs):
            # Predictions
            y_pred = X @ self.weights

            # Error calculation
            error = y_pred - y

            # Loss
            loss = (1 / (2 * m)) * np.sum(error ** 2)

            # Gradient calculation
            gradient = (1 / m) * (X.T @ error)

            # Update weights
            self.weights -= self.learning_rate * gradient

            if verbose:
                # Display
                print(f"iteration: ({epoch+1}/{self.epochs}) - error: {error} - loss: {loss} - gradient - {gradient}")

    def predict(self, X) -> np.ndarray:
        """Predict using the linear regression model."""
        m = X.shape[0]
        # Add bias as the first column of X
        X = np.c_[np.ones(m), X]
        return X @ self.weights

### Logistic Regression

#### Data

```
 ----------
|  X |  y  |
 ----------

y is discrete value
```

#### 1. Model

```
y_pred = sigmoid(X @ weights)

where,
sigmoid = 1/(1+exp(-z))

z = w0 + w1*x1 + w2*x2 + ... + wn*xn
w0 is bias term
```

#### 2. Loss

```
Cost = -(1/m) * Σ [y * log(y_pred) + (1 - y) * log(1 - y_pred)]
```

#### 3. Gradient Calculation

```
# Error
error = y_pred - y

# Gradient
gradient = (1 / m) * (X.T @ error)

# Weight update
weights -= learning_rate * gradient
```

In [55]:
class LogisticRegression(SupervisedMachineLearningModel):
    def __init__(self, learning_rate: float = 0.01, epochs: int = 1000, threshold: float = 0.5):
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.threshold = threshold
        self.weights = None
        self.mean = None
        self.std = None

    @staticmethod
    def __sigmoid(z):
        """Compute the sigmoid function."""
        return np.clip(1 / (1 + np.exp(-z)), 1e-10, 1 - 1e-10)

    def _standardize(self, X):
        """Standardize features (zero mean and unit variance)."""
        if self.mean is None or self.std is None:
            self.mean = np.mean(X, axis=0)
            self.std = np.std(X, axis=0)
        return (X - self.mean) / (self.std + 1e-10)

    def train(self, X, y, verbose=False):
        """Train the logistic regression model."""
        m, n = X.shape
        X = np.c_[np.ones(m), X]  # Add bias term
        X = self._standardize(X)  # Standardize the data
        self.weights = np.random.randn(n + 1) * 0.01  # Small random initialization

        for epoch in range(self.epochs):
            z = X @ self.weights  # Compute linear combination
            y_pred = self.__sigmoid(z)  # Apply sigmoid
            error = y_pred - y  # Calculate error
            loss = -(1 / m) * np.sum(y * np.log(np.clip(y_pred, 1e-10, 1 - 1e-10)) + (1 - y) * np.log(np.clip(1 - y_pred, 1e-10, 1 - 1e-10)))
            gradient = (1 / m) * (X.T @ error)  # Compute gradient

            if np.any(np.isnan(gradient)):
                print(f"NaN in gradient at epoch {epoch+1}")
                break

            self.weights -= self.learning_rate * gradient  # Update weights

            if verbose:
                print(f"Epoch {epoch+1}/{self.epochs} - Loss: {loss:.6f} - Weights: {self.weights}")

    def predict(self, X):
        """Make predictions using the logistic regression model."""
        X = np.c_[np.ones(X.shape[0]), X]  # Add bias term
        X = self._standardize(X)  # Standardize the data
        probabilities = self.__sigmoid(X @ self.weights)  # Get predicted probabilities
        return (probabilities >= self.threshold).astype(int)  # Apply threshold

### **1. Decision Tree Overview**
A decision tree is a tree-like model used for classification and regression. It splits data into subsets based on feature values, using a tree structure to make decisions.

---

#### **2. Key Concepts**

##### **2.1 Root Node**
- The topmost node in a tree.
- Represents the entire dataset and is split into subsets.

##### **2.2 Internal Nodes**
- Nodes where the dataset is further split based on conditions on features.

##### **2.3 Leaf Nodes**
- Terminal nodes that represent a class label (for classification) or a value (for regression).

##### **2.4 Splitting Criteria**
The choice of feature and threshold to split the data is determined by a splitting criterion.

---

#### **3. Splitting Criteria**

##### **3.1 Classification**

##### **Information Gain (IG)**  
- Measures the reduction in entropy after a split.
- Formula:  
  ```
  IG = Entropy(parent) - [Weighted average of Entropy(children)]
  ```

##### **Entropy**  
- Measures the impurity of the dataset.  
- Formula:  
  ```
  Entropy = -∑(p * log2(p))
  ```  
  where `p` is the proportion of data points in each class.

##### **Gini Index**  
- Another metric for impurity, ranging from 0 (pure) to 0.5 (most impure for binary classification).  
- Formula:  
  ```
  Gini = 1 - ∑(p^2)
  ```

#### **3.2 Regression**

###### **Mean Squared Error (MSE)**  
- Measures the variance within the data at a node.  
- Formula:  
  ```
  MSE = (1 / N) * ∑(y - y_mean)^2
  ```  
  where `N` is the number of samples at the node, and `y_mean` is the average value of the target variable at that node.

###### **Mean Absolute Error (MAE)**  
- Measures the average absolute deviation.  
- Formula:  
  ```
  MAE = (1 / N) * ∑|y - y_mean|
  ```

---

#### **4. Pruning**
Pruning reduces the size of the tree to prevent overfitting.

##### **4.1 Pre-Pruning**
- Stop splitting when certain criteria (e.g., depth or minimum samples per leaf) are met.

##### **4.2 Post-Pruning**
- Build the full tree and then remove branches that have low importance.

---

#### **5. Advantages**
- Simple to understand and interpret.
- Requires little data preprocessing.
- Handles both numerical and categorical features.

---

#### **6. Disadvantages**
- Prone to overfitting (especially with deep trees).
- Can be sensitive to small changes in data (unstable splits).
- Biased towards features with many levels (e.g., categorical features with high cardinality).

---

#### **7. Decision Tree Model Equation**
The model prediction is computed as:
- **For Classification:** The class of the majority samples in the leaf node.  
  ```
  y_pred = majority_class(leaf_node)
  ```

- **For Regression:** The average of the target values in the leaf node.  
  ```
  y_pred = mean(target_values_in_leaf_node)
  ```

---

#### **8. Common Hyperparameters**
- `max_depth`: Maximum depth of the tree.
- `min_samples_split`: Minimum number of samples required to split an internal node.
- `min_samples_leaf`: Minimum number of samples required to be in a leaf node.
- `criterion`: Metric used to evaluate splits (e.g., Gini, Entropy, MSE).



In [56]:
class DecisionTree(SupervisedMachineLearningModel):
    def __init__(self, max_depth=None, criterion='gini'):
        self.max_depth = max_depth
        self.criterion = criterion
        self.tree = None

    def __class_probabilities(self, y):
        """Compute the class probabilities for a given set of labels."""
        unique, counts = np.unique(y, return_counts=True)
        return counts / len(y)

    def __entropy(self, y):
        """Compute the entropy of a dataset."""
        probabilities = self.__class_probabilities(y)
        return -np.sum(probabilities * np.log2(probabilities + 1e-10))

    def __gini(self, y):
        """Compute the Gini index of a dataset."""
        probabilities = self.__class_probabilities(y)
        return 1 - np.sum(probabilities ** 2)

    def __impurity(self, y, criterion):
        """Compute the impurity of a dataset based on the specified criterion."""
        if criterion == 'entropy':
            return self.__entropy(y)
        elif criterion == 'gini':
            return self.__gini(y)
        else:
            raise ValueError(f"Unsupported criterion: {criterion}, choose between 'entropy' and 'gini'.")

    def __split(self, X, y, feature_index, threshold):
        """Split the dataset based on a feature and threshold."""
        left_indices = X[:, feature_index] < threshold
        right_indices = ~left_indices
        return left_indices, right_indices

    def __best_split(self, X, y):
        """Find the best split for a dataset."""
        best_impurity = float('inf')
        best_feature_index, best_threshold = None, None

        for feature_index in range(X.shape[1]):
            thresholds = np.unique(X[:, feature_index])
            for threshold in thresholds:
                left_indices, right_indices = self.__split(X, y, feature_index, threshold)
                if len(left_indices) == 0 or len(right_indices) == 0:
                    continue

                left_impurity = self.__impurity(y[left_indices], self.criterion)
                right_impurity = self.__impurity(y[right_indices], self.criterion)
                weighted_impurity = (len(left_indices) / len(y)) * left_impurity + (len(right_indices) / len(y)) * right_impurity

                print(f"Feature: {feature_index}, Threshold: {threshold}, Weighted Impurity: {weighted_impurity}")

                if weighted_impurity < best_impurity:
                    best_impurity = weighted_impurity
                    best_feature_index = feature_index
                    best_threshold = threshold

        return best_feature_index, best_threshold

    def __build_tree(self, X, y, depth):
        """Recursively build the decision tree."""
        if (len(np.unique(y)) == 1) or (depth == 0):
            leaf_class = np.bincount(y).argmax()  # Majority class
            print(f"Stopping with leaf class: {leaf_class}")
            return {'class': leaf_class}

        feature_index, threshold = self.__best_split(X, y)
        print(f"Best split: feature_index={feature_index}, threshold={threshold}")

        if feature_index is None:
            leaf_class = np.bincount(y).argmax()  # Majority class
            print(f"Stopping with leaf class: {leaf_class}")
            return {'class': leaf_class}

        left_indices, right_indices = self.__split(X, y, feature_index, threshold)
        left_subtree = self.__build_tree(X[left_indices], y[left_indices], depth - 1)
        right_subtree = self.__build_tree(X[right_indices], y[right_indices], depth - 1)
        return {
            'feature_index': feature_index,
            'threshold': threshold,
            'left': left_subtree,
            'right': right_subtree
        }

    def train(self, X, y):
        """Train the decision tree."""
        self.tree = self.__build_tree(X, y, self.max_depth)

    def __predict_sample(self, x, tree):
        """Recursively traverse the decision tree for a single sample."""
        if 'class' in tree:
            return tree['class']

        feature_index = tree['feature_index']
        threshold = tree['threshold']
        if x[feature_index] <= threshold:
            return self.__predict_sample(x, tree['left'])
        else:
            return self.__predict_sample(x, tree['right'])

    def predict(self, X):
        """Predict using the decision tree."""
        return np.array([self.__predict_sample(x, self.tree) for x in X])

### Test

In [9]:
! pip install ipytest --quiet

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.6 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.5/1.6 MB[0m [31m14.2 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.6/1.6 MB[0m [31m27.6 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m19.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [11]:
def skip(func):
    """Decorator to skip executing functions"""
    def wrapper(*args, **kwargs):
        print(f"{func.__name__} is skipped")
    return wrapper

In [71]:
import ipytest
ipytest.autoconfig()

# linear_regression
def test_linear_regression():
    X = np.array([[1], [2], [3], [4], [5], [6], [7]])
    y = np.array([3, 5, 7, 9, 11, 13, 15])  # y = 2x + 1
    model = LinearRegression(learning_rate=0.01, epochs=1000)
    model.train(X[:4], y[:4], verbose=False)
    predictions = model.predict(X[5:])
    expected = y[5:]
    assert np.allclose(predictions, expected, rtol=1e-2, atol=1e-2, equal_nan=False), f"predictions {predictions} do not match expected {expected}"

# logistic_regression
def test_logistic_regression():
    X = np.array([[1], [2], [3], [4], [5], [6], [7], [8], [9], [10]])  # 10 samples, 1 feature
    y = np.array([1, 0, 1, 0, 1, 0, 1, 0, 1, 0])  # Binary labels alternating 1 and 0

    # Initialize the logistic regression model
    model = LogisticRegression(learning_rate=0.01, epochs=15000, threshold=0.5)

    # Train on the first 5 samples
    model.train(X[:5], y[:5], verbose=False)

    # Test on the remaining 5 samples
    predictions = model.predict(X[5:])
    expected = y[5:]

    # Assert if predictions match expected labels
    assert np.allclose(predictions, expected, rtol=1e-2, atol=1e-2, equal_nan=False), f"Predictions {predictions} do not match expected {expected}"

    # Output results
    accuracy = np.mean(predictions == expected)
    print(f"Model accuracy: {accuracy:.4f}")


# decision_tree
def test_decision_tree():
    X = np.array([
        [1, 2],
        [2, 3],
        [3, 4],
        [4, 5],
        [5, 6],
        [6, 7],
        [7, 8],
        [8, 9],
        [9, 10],
        [10, 11]
    ])
    y = np.array([0, 0, 0, 0, 1, 1, 1, 1, 1, 1])

    X_train, X_test = X[:7], X[7:]
    y_train, y_test = y[:7], y[7:]

    model = DecisionTree(max_depth=3)
    model.train(X_train, y_train)

    # Print the tree structure for debugging
    print("Trained Decision Tree Structure:")
    print(model.tree)

    # Make predictions and debug output
    y_pred = model.predict(X_test)
    print("Predictions on Test Set:", y_pred)
    print("Expected Labels:", y_test)

    assert len(y_pred) == len(y_test), "Number of predictions does not match the test set size."
    assert (y_pred == y_test).all(), (
        f"Predictions do not match expected labels. "
        f"Predicted: {y_pred}, Expected: {y_test}"
    )

    print("DecisionTree multi-feature test case passed successfully!")


if __name__ == "__main__":
    ipytest.run()

[32m.[0m[31mF[0m[32m.[0m[32m.[0m[32m.[0m[31m                                                                                        [100%][0m
[31m[1m_____________________________________ test_logistic_regression _____________________________________[0m

    [0m[94mdef[39;49;00m [92mtest_logistic_regression[39;49;00m():[90m[39;49;00m
        X = np.array([[[94m1[39;49;00m], [[94m2[39;49;00m], [[94m3[39;49;00m], [[94m4[39;49;00m], [[94m5[39;49;00m], [[94m6[39;49;00m], [[94m7[39;49;00m], [[94m8[39;49;00m], [[94m9[39;49;00m], [[94m10[39;49;00m]])  [90m# 10 samples, 1 feature[39;49;00m[90m[39;49;00m
        y = np.array([[94m1[39;49;00m, [94m0[39;49;00m, [94m1[39;49;00m, [94m0[39;49;00m, [94m1[39;49;00m, [94m0[39;49;00m, [94m1[39;49;00m, [94m0[39;49;00m, [94m1[39;49;00m, [94m0[39;49;00m])  [90m# Binary labels alternating 1 and 0[39;49;00m[90m[39;49;00m
    [90m[39;49;00m
        [90m# Initialize the logistic regression