In [1]:
from collections import Counter
from typing import List

import numpy as np
import pandas as pd

np.random.seed(313)

"""
Tips for debugging:
- Use `print` to check the shape of your data. Shape mismatch is a common error.
- Use `ipdb` to debug your code
    - `ipdb.set_trace()` to set breakpoints and check the values of your variables in interactive mode
    - `python -m ipdb -c continue hw3.py` to run the entire script in debug mode. Once the script is paused, you can use `n` to step through the code line by line.
"""


# 1. Load datasets
def load_data() -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    DO NOT MODIFY THIS FUNCTION.
    """
    # Load iris dataset
    iris = pd.read_csv(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
        header=None,
    )
    iris.columns = [
        "sepal_length",
        "sepal_width",
        "petal_length",
        "petal_width",
        "class",
    ]

    # Load Boston housing dataset
    boston = pd.read_csv(
        "https://raw.githubusercontent.com/selva86/datasets/master/BostonHousing.csv"
    )

    return iris, boston


# 2. Preprocessing functions
def train_test_split(
    df: pd.DataFrame, target: str, test_size: float = 0.3
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    # Shuffle and split dataset into train and test sets
    df = df.sample(frac=1, random_state=42).reset_index(drop=True)
    split_idx = int(len(df) * (1 - test_size))
    train = df.iloc[:split_idx]
    test = df.iloc[split_idx:]

    # Split target and features
    X_train = train.drop(target, axis=1).values
    y_train = train[target].values
    X_test = test.drop(target, axis=1).values
    y_test = test[target].values

    return X_train, X_test, y_train, y_test


def normalize(X: np.ndarray) -> np.ndarray:
    # Normalize features to [0, 1]
    # You can try other normalization methods, e.g., z-score, etc.
    # TODO: 1%
    X_min = np.min(X, axis=0)
    X_max = np.max(X, axis=0)
    X_norm = (X - X_min) / (X_max - X_min)
    return X_norm
    raise NotImplementedError


def one_hot_encode(y: np.ndarray) -> np.ndarray:
    """
    One-hot encode labels.
    """
    unique_labels = np.unique(y)
    num_classes = len(unique_labels)
    encoded_labels = np.zeros((len(y), num_classes))
    for i, label in enumerate(unique_labels):
        encoded_labels[y == label, i] = 1
    return encoded_labels




def accuracy(y_true, y_pred):
    # y_true = np.argmax(y_true, axis=1 )
    print(f'y_true : {y_true}\ny_pred : {y_pred}' )
    correct = np.sum(y_true == y_pred)    
    total = len(y_true)
    accuracy = correct / total
    return accuracy


def mean_squared_error(y_true, y_pred):
    mse = np.mean((y_true - y_pred) ** 2)
    return mse


def encode_labels(y: np.ndarray) -> np.ndarray:
    """
    Encode labels to integers.
    """
    unique_labels = np.unique(y)
    label_map = {label: i for i, label in enumerate(unique_labels)}
    encoded_labels = np.array([label_map[label] for label in y])
    return encoded_labels





In [2]:

iris, boston = load_data()

# Iris dataset - Classification
X_train, X_test, y_train, y_test = train_test_split(iris, "class")
X_train, X_test = normalize(X_train), normalize(X_test)
# print(y_train)
y_train, y_test = encode_labels(y_train), encode_labels(y_test)
print(y_train, y_train.shape)


[1 0 2 1 1 0 1 2 1 1 2 0 0 0 0 1 2 1 1 2 0 2 0 2 2 2 2 2 0 0 0 0 1 0 0 2 1
 0 0 0 2 1 1 0 0 1 2 2 1 2 1 2 1 0 2 1 0 0 0 1 2 0 0 0 1 0 1 2 0 1 2 0 2 2
 1 1 2 1 0 1 2 0 0 1 1 0 2 0 0 1 1 2 1 2 2 1 0 0 2 2 0 0 0 1 2] (105,)


Linear model

In [28]:
class LinearModel:
    def __init__(self, learning_rate=0.01, iterations=15, model_type="linear") -> None:
        self.learning_rate = learning_rate
        self.iterations = iterations
        self.model_type = model_type

        assert model_type in ["linear", "logistic"], "model_type must be either 'linear' or 'logistic'"

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        X = np.insert(X, 0, 1, axis=1)
        n_classes = len(np.unique(y))
        n_features = X.shape[1]
        self.weights = np.zeros((n_features, n_classes))

        if self.model_type == "logistic":
            y = y.reshape(-1, 1)
            for iter in range(self.iterations):
                gradients = self._compute_gradients(X, y, iter )
                print(f'{gradients=}')
                self.weights -= self.learning_rate * gradients
        else:
            for iter in range(self.iterations):
                gradients = self._compute_gradients(X, y, iter)
                self.weights += self.learning_rate * gradients

    def predict(self, X: np.ndarray) -> np.ndarray:
        X = np.insert(X, 0, 1, axis=1)

        if self.model_type == "linear":
            return np.dot(X, self.weights)
        elif self.model_type == "logistic":
            prob = self._softmax(np.dot(X, self.weights))
            print("prob : " , prob)
            return np.argmax(prob, axis= 1)

    def _compute_gradients(self, X: np.ndarray, y: np.ndarray, iter ) -> np.ndarray:
        if self.model_type == "linear":
            predictions = np.dot(X, self.weights)
            error = predictions - y
            gradients = np.dot(X.T, error) / len(y)
        elif self.model_type == "logistic":
            Y = np.zeros((len(y), 3))
            for i in range(3):
                Y[y.flatten() == i, i] = 1
            
            predictions = self._softmax(np.dot(X, self.weights))
            error = predictions - Y
            gradients = np.dot(X.T, error) / len(y)

        return gradients

    def _softmax(self, z: np.ndarray) -> np.ndarray:
        exp = np.exp(z)
        return exp / np.sum(exp, axis=1, keepdims=True)


In [29]:
# driver code for linear model in iris dataset
logistic_regression = LinearModel(model_type="logistic")
logistic_regression.fit(X_train, y_train)
y_pred = logistic_regression.predict(X_test)

# print(f'y_pred = {y_pred}\ny_test = {y_test}')
print("Logistic Regression Accuracy:", accuracy(y_test, y_pred))


gradients=array([[-0.04761905,  0.01904762,  0.02857143],
       [ 0.06261023, -0.00246914, -0.06014109],
       [-0.07804233,  0.0505291 ,  0.02751323],
       [ 0.11665321, -0.02684961, -0.08980361],
       [ 0.11997354, -0.01812169, -0.10185185]])
gradients=array([[-0.04777279,  0.01897739,  0.02879541],
       [ 0.06248486, -0.00248711, -0.05999775],
       [-0.078081  ,  0.05048771,  0.02759329],
       [ 0.11649106, -0.02686159, -0.08962947],
       [ 0.11980938, -0.01813262, -0.10167676]])
gradients=array([[-0.04792521,  0.0189073 ,  0.02901791],
       [ 0.06236015, -0.00250507, -0.05985508],
       [-0.0781191 ,  0.0504464 ,  0.0276727 ],
       [ 0.11632964, -0.02687356, -0.08945608],
       [ 0.11964594, -0.01814355, -0.10150239]])
gradients=array([[-0.04807629,  0.01883736,  0.02923893],
       [ 0.06223609, -0.00252301, -0.05971308],
       [-0.07815664,  0.05040517,  0.02775147],
       [ 0.11616896, -0.02688552, -0.08928343],
       [ 0.11948322, -0.01815447, -0.10132875

In [4]:
X_train, X_test, y_train, y_test = train_test_split(boston, "medv")
X_train, X_test = normalize(X_train), normalize(X_test)

linear_regression = LinearModel(model_type="linear")
linear_regression.fit(X_train, y_train)
y_pred = linear_regression.predict(X_test)
print("Linear Regression MSE:", mean_squared_error(y_test, y_pred))

: 

: 

---------doesn't matter -----------

Decision tree

In [39]:

class DecisionTree:
    def __init__(self, max_depth: int = 5, model_type: str = "classifier"):
        self.max_depth = max_depth
        self.model_type = model_type

        assert model_type in [
            "classifier",
            "regressor",
        ], "model_type must be either 'classifier' or 'regressor'"

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        self.tree = self._build_tree(X, y, 0)

    def predict(self, X: np.ndarray) -> np.ndarray:
        return np.array([self._traverse_tree(x, self.tree) for x in X])

    def _build_tree(self, X: np.ndarray, y: np.ndarray, depth: int) -> dict:
        if depth >= self.max_depth or self._is_pure(y):
            return self._create_leaf(y)

        feature, threshold = self._find_best_split(X, y)
        mask = X[:, feature] <= threshold
        X_left, y_left = X[mask], y[mask]
        X_right, y_right = X[~mask], y[~mask]

        left_child = self._build_tree(X_left, y_left, depth + 1)
        right_child = self._build_tree(X_right, y_right, depth + 1)

        return {
            "feature": feature,
            "threshold": threshold,
            "left": left_child,
            "right": right_child,
        }

    def _is_pure(self, y: np.ndarray) -> bool:
        return len(set(y)) == 1

    def _create_leaf(self, y: np.ndarray):
        if self.model_type == "classifier":
            return np.bincount(y).argmax()
        else:
            return np.mean(y)

    def _find_best_split(self, X: np.ndarray, y: np.ndarray) -> tuple[int, float]:
        best_gini = float("inf")
        best_mse = float("inf")
        best_feature = None
        best_threshold = None

        for feature in range(X.shape[1]):
            sorted_indices = np.argsort(X[:, feature])
            for i in range(1, len(X)):
                if X[sorted_indices[i - 1], feature] != X[sorted_indices[i], feature]:
                    threshold = (
                        X[sorted_indices[i - 1], feature]
                        + X[sorted_indices[i], feature]
                    ) / 2
                    mask = X[:, feature] <= threshold
                    left_y, right_y = y[mask], y[~mask]

                    if self.model_type == "classifier":
                        gini = self._gini_index(left_y, right_y)
                        if gini < best_gini:
                            best_gini = gini
                            best_feature = feature
                            best_threshold = threshold
                    else:
                        mse = self._mse(left_y, right_y)
                        if mse < best_mse:
                            best_mse = mse
                            best_feature = feature
                            best_threshold = threshold

        return best_feature, best_threshold

    def _gini_index(self, left_y: np.ndarray, right_y: np.ndarray) -> float:
        unique_classes = np.unique(np.concatenate((left_y, right_y)))
        gini_index = 0.0

        for class_val in unique_classes:
            p_left = len(left_y[left_y == class_val]) / len(left_y)
            p_right = len(right_y[right_y == class_val]) / len(right_y)
            gini_left = 1 - p_left ** 2 - (1 - p_left) ** 2
            gini_right = 1 - p_right ** 2 - (1 - p_right) ** 2
            weighted_gini = (len(left_y) * gini_left + len(right_y) * gini_right) / (
                len(left_y) + len(right_y)
            )
            gini_index += weighted_gini

        return gini_index

    def _mse(self, left_y: np.ndarray, right_y: np.ndarray) -> float:
        mean_left = np.mean(left_y)
        mean_right = np.mean(right_y)
        mse_left = np.mean((left_y - mean_left) ** 2)
        mse_right = np.mean((right_y - mean_right) ** 2)
        weighted_mse = (len(left_y) * mse_left + len(right_y) * mse_right) / (len(left_y) + len(right_y) )

        return weighted_mse
    
    def _traverse_tree(self, x: np.ndarray, node: dict):
        if isinstance(node, dict):
            feature, threshold = node["feature"], node["threshold"]
            if x[feature] <= threshold:
                return self._traverse_tree(x, node["left"])
            else:
                return self._traverse_tree(x, node["right"])
        else:
            return node




In [40]:
decision_tree_classifier = DecisionTree(model_type="classifier")
decision_tree_classifier.fit(X_train, y_train)
y_pred = decision_tree_classifier.predict(X_test)
print("Decision Tree Classifier Accuracy:", accuracy(y_test, y_pred))

Decision Tree Classifier Accuracy: 0.8888888888888888


    boston 

In [34]:
X_train, X_test, y_train, y_test = train_test_split(boston, "medv")
X_train, X_test = normalize(X_train), normalize(X_test)

decision_tree_regressor = DecisionTree(model_type="regressor")
decision_tree_regressor.fit(X_train, y_train)
y_pred = decision_tree_regressor.predict(X_test)
print("Decision Tree Regressor MSE:", mean_squared_error(y_test, y_pred))

Decision Tree Regressor MSE: 28.341570306709183


random forest

In [41]:
from tqdm import tqdm

class RandomForest:
    def __init__(self, n_estimators: int = 100, max_depth: int = 5, model_type: str = "classifier"):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.model_type = model_type
        # Initialize a list of DecisionTree instances based on the specified number of estimators, max depth, and model type
        self.trees = [ DecisionTree(max_depth=max_depth, model_type=model_type) for _ in range(n_estimators)]

        assert model_type in ["classifier", "regressor"], "model_type must be either 'classifier' or 'regressor'"

    def fit(self, X: np.ndarray, y: np.ndarray, detail = False ) -> None:
        iter = 1 
        for tree in tqdm(self.trees):
            # Generate bootstrap indices by random sampling with replacement
            bootstrap_indices = np.random.choice(len(X), len(X), replace=True)
            bootstrap_X, bootstrap_y = X[bootstrap_indices], y[bootstrap_indices]
            
            # Fit each tree with the corresponding samples from X and y
            tree.fit(bootstrap_X, bootstrap_y)
            if (iter % 10 == 0 and detail):
                print(f'Finished {iter} iters')
            iter += 1

    def predict(self, X: np.ndarray) -> np.ndarray:
        predictions = []

        for tree in self.trees:
            # Predict the output for each tree
            tree_pred = tree.predict(X)
            predictions.append(tree_pred)

        if self.model_type == "classifier":
            # Majority voting for classification
            predictions = np.array(predictions)
            majority_votes = np.apply_along_axis(lambda x: np.argmax(np.bincount(x)), axis=0, arr=predictions)
            return majority_votes
        else:
            # Averaging for regression
            predictions = np.array(predictions)
            return np.mean(predictions, axis=0)


In [42]:
random_forest_regressor = RandomForest(model_type="regressor", n_estimators= 100)
random_forest_regressor.fit(X_train, y_train)
y_pred = random_forest_regressor.predict(X_test)
print("Random Forest Regressor MSE:", mean_squared_error(y_test, y_pred))

100%|██████████| 100/100 [00:00<00:00, 224.50it/s]

Random Forest Regressor MSE: 0.04437957998163452





In [44]:
X_train, X_test, y_train, y_test = train_test_split(iris, "class")
X_train, X_test = normalize(X_train), normalize(X_test)
y_train, y_test = encode_labels(y_train), encode_labels(y_test)

decision_tree_classifier = DecisionTree(model_type="classifier")
decision_tree_classifier.fit(X_train, y_train)
y_pred = decision_tree_classifier.predict(X_test)
print("Decision Tree Classifier Accuracy:", accuracy(y_test, y_pred))

random_forest_classifier = RandomForest(model_type="classifier")
random_forest_classifier.fit(X_train, y_train)
y_pred = random_forest_classifier.predict(X_test)
print("Random Forest Classifier Accuracy:", accuracy(y_test, y_pred))

X_train, X_test, y_train, y_test = train_test_split(boston, "medv")
X_train, X_test = normalize(X_train), normalize(X_test)

decision_tree_regressor = DecisionTree(model_type="regressor")
decision_tree_regressor.fit(X_train, y_train)
y_pred = decision_tree_regressor.predict(X_test)
print("Decision Tree Regressor MSE:", mean_squared_error(y_test, y_pred))

random_forest_regressor = RandomForest(model_type="regressor")
random_forest_regressor.fit(X_train, y_train)
y_pred = random_forest_regressor.predict(X_test)
print("Random Forest Regressor MSE:", mean_squared_error(y_test, y_pred))

Decision Tree Classifier Accuracy: 0.8888888888888888


100%|██████████| 100/100 [00:00<00:00, 276.41it/s]


Random Forest Classifier Accuracy: 0.9111111111111111
Decision Tree Regressor MSE: 28.341570306709183


100%|██████████| 100/100 [00:14<00:00,  7.10it/s]

Random Forest Regressor MSE: 23.60490294302168



