In [176]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import random
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.metrics import f1_score, ConfusionMatrixDisplay, roc_auc_score, roc_curve
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import math
from scipy.optimize import minimize

In [177]:

column_names = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT', 'MEDV']
df = pd.read_csv('housing.csv', sep = '\s+', header = None, names = column_names) 

In [178]:
def clean_outliers(df, feature):
    #first quartile
    feature_q1 = df[feature].quantile(0.25)
    #third quartile
    feature_q3 = df[feature].quantile(0.75)
    #interquartile range
    feature_iqr = feature_q3 - feature_q1
    #mask to eliminate low outliers
    mask1 = df[feature]>feature_q1-1.5*feature_iqr
    #mask to eliminate high outliers
    mask3 = df[feature]<feature_q3+1.5*feature_iqr
    return df[mask1][mask3].reset_index()



In [179]:
def my_train_test_split(*arrays, test_size = 0.2, training_size = 0.8, random_state = None):
    if test_size+training_size!=1:
        raise 'Bad training/test split size'
    for i in arrays:
        if len(i)!=len(arrays[0]):
            raise 'Bad input object size'
    random.seed(random_state)
    array_size = len(arrays[0])
    num_test = math.floor(array_size*test_size)
    num_train = math.floor(array_size*training_size)
    num_train+=(array_size-num_train-num_test)
    return_list = []
    test_list = random.sample(range(array_size),num_test, )
    for i in arrays:
        if type(i)==pd.DataFrame:
            test_array = pd.DataFrame(columns = i.columns)
            train_array = pd.DataFrame(columns = i.columns)
        elif type(i)==pd.Series:
            test_array = pd.Series()
            train_array = pd.Series()
        test_array = i.iloc[test_list]
        train_array = i.drop(test_list)
        return_list.append(train_array)
        return_list.append(test_array)
    return return_list

In [180]:
def my_mse(a, b):
    if not isinstance(a, np.ndarray):
        a = np.array(a)
    if not isinstance(b, np.ndarray):
        b = np.array(b)
    return np.mean((a - b) ** 2)

def my_rmse(a, b):
    return math.sqrt(my_mse(a,b))

def my_mae(a,b):
    a = a.to_numpy()
    sum = 0
    for i in range(len(a)):
        sum+=abs(a[i]-b[i])
    sum/=len(a)
    return sum

def my_r2(a,b):
    a = a.to_numpy()
    y_bar = sum(a)/len(a)
    ss_res = 0
    ss_tot = 0
    for i in range(len(a)):
        ss_res+=(a[i]-b[i])**2
    for j in range(len(a)):
        ss_tot+=(a[j]-y_bar)**2
    return 1-(ss_res/ss_tot)

In [181]:
# Used as a week learned within the gradient boosting ensemble (builds iteratively which is why there is a max depth of 1)
# High bias but low variance, 
# Ideal for boosting methods that aim to iteratively reduce error by focusing on hard-to-predict instances.
class DecisionTree:
    def __init__(self, max_depth=1):
        self.max_depth = max_depth
        self.tree = None

    # Each node is a decision point for traversing through the decision tree
    # Follows decision tree logic
    class Node:
        def __init__(self, feature_index=None, threshold=None, left=None, right=None, value=None):
            # feature_Index and threshold are determining factors for where the split happens
            self.feature_index = feature_index
            self.threshold = threshold
            self.left = left
            self.right = right
            self.value = value

    def fit(self, X, y):
        self.tree = self._build_tree(X, y, depth=0)

    # Recursive binary splitting
    # Finds the best feature and threshold to split the data at each noted based on minimizing MSE
    def _build_tree(self, X, y, depth):
        # x - features, y - target, depth - depth of tree
        # Continues to split until the maximum specified depth is reached
        num_samples, num_features = X.shape
        # Stopping condition: if the current depth exceeds the max depth set during initialization or the dataset cannot be split further
        if depth >= self.max_depth or num_samples <= 1:
            # Create a leaf node with the calculated value
            leaf_value = self._calculate_leaf_value(y)
            # Usually averages the target values in regression trees
            return self.Node(value=leaf_value)

        # Finds the best features by iterating through and selecting lowest MSE, calling find best split to find best feature/threshold
        best_feature, best_threshold = self._find_best_split(X, y, num_samples, num_features)
        # In scenarios where the MSE cannot be further decreased (no more effective split found)
        if best_feature is None:
            # If no split can improve the outcome, create a leaf node
            return self.Node(value=self._calculate_leaf_value(y))
        
        # Split the dataset and recursively build left and right subtrees (<= left or > right)
        left_idxs, right_idxs = self._split(X[:, best_feature], best_threshold)
        # Recursively calls build tree to construct subtrees on left and right increasing depth by 1 each time
        left_subtree = self._build_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right_subtree = self._build_tree(X[right_idxs, :], y[right_idxs], depth + 1)
        return self.Node(feature_index=best_feature, threshold=best_threshold, left=left_subtree, right=right_subtree)

    def _find_best_split(self, X, y, num_samples, num_features):
        # Initializes variables to store best feature index
        best_feature, best_threshold = None, None
        # Used to insure any inital real MSE found is lower than this starting value
        best_mse = np.inf
        # Iterates through each feature in dataset X by index
        for feature_index in range(num_features):
            # extracts unique values of current features as potential thresholds for splitting, will change in future for midpoints or using minimizer
            thresholds = np.unique(X[:, feature_index])
            # SCIPI minimizer --- TODO
            # Iterates through each unique threshold in current thresholds
            for threshold in thresholds:
                # Calls split function to divide dataset into two subsets based on threshold, left - where features are <= threshold, right - > threshold
                left_idxs, right_idxs = self._split(X[:, feature_index], threshold)
                # Check for if one subset becomes empty (all features fall onto right or left side)
                if len(left_idxs) == 0 or len(right_idxs) == 0:
                    continue
                mse = self._calculate_mse(y[left_idxs], y[right_idxs])
                # If MSE of current split is lower than best MSE found so far... 
                # Updates best MSE with new value and records the feature/ threshold of this achieved MSE
                if mse < best_mse:
                    best_mse = mse
                    best_feature = feature_index
                    best_threshold = threshold
        return best_feature, best_threshold
    
    # Splits dataset into left/right based on threshold of given feature
    def _split(self, feature_values, threshold):
        left_idxs = np.where(feature_values <= threshold)[0]
        right_idxs = np.where(feature_values > threshold)[0]
        return left_idxs, right_idxs
    
    def _calculate_leaf_value(self, y):
        # The leaf value can be the mean of the target values, calculates that mean value for the leaf node
        return np.mean(y)

    def _calculate_mse(self, left_y, right_y):
        # Calculate the MSE of the left and right splits by weighted averages of variance
        total_left_mse = np.var(left_y) * len(left_y) if len(left_y) > 0 else 0
        total_right_mse = np.var(right_y) * len(right_y) if len(right_y) > 0 else 0
        total_mse = (total_left_mse + total_right_mse) / (len(left_y) + len(right_y))
        return total_mse
    
    def predict(self, X):
        # Predictions array to store predictions for each sample in X
        predictions = np.array([self._traverse_tree(x, self.tree) for x in X])
        return predictions

    def _traverse_tree(self, x, node):
        # Recursive method to traverse the tree for a single sample 'x' until a leaf node is reached
        if node.value is not None:
            return node.value
        if x[node.feature_index] <= node.threshold:
            return self._traverse_tree(x, node.left)
        else:
            return self._traverse_tree(x, node.right)

In [188]:
# X_train_np = X_train.to_numpy() if isinstance(X_train, pd.DataFrame) else X_train
# y_train_np = y_train.to_numpy() if isinstance(y_train, pd.Series) else y_train
# X_test_np = X_test.to_numpy() if isinstance(X_test, pd.DataFrame) else X_test


In [190]:
# Tree1 = DecisionTree()
# Tree1.fit(X_train_np, y_train_np)
# y_pred = Tree1.predict(X_test_np)
# my_r2(y_test, y_pred)

0.4386820326080767

In [None]:
# def _find_best_split(self, X, y, num_samples, num_features):
#     best_feature, best_threshold = None, None
#     best_mse = np.inf
#     # Function to calculate MSE for a given threshold
#     def mse_for_threshold(threshold, feature_index):
#         left_idxs, right_idxs = self._split(X[:, feature_index], threshold)
#         if len(left_idxs) == 0 or len(right_idxs) == 0:
#             return np.inf  # Return a large error if split is not feasible
#         return self._calculate_mse(y[left_idxs], y[right_idxs])

#     for feature_index in range(num_features):
#         # Initial guess for the threshold can be the mean of the feature values
#         initial_guess = np.mean(X[:, feature_index])
#         # Bounds for the minimizer to ensure it stays within min and max of feature values
#         bounds = [(np.min(X[:, feature_index]), np.max(X[:, feature_index]))]
#         # The minimization process for the current feature
#         result = minimize(mse_for_threshold, x0=initial_guess, args=(feature_index,), bounds=bounds, method='L-BFGS-B')
#         if result.fun < best_mse:
#             best_mse = result.fun
#             best_feature = feature_index
#             best_threshold = result.x[0]
#     return best_feature, best_threshold

In [182]:
class GradientBoost:
    def __init__(self, n_estimators: int = 25, max_depth: int = 1, learning_rate: int =.5):
        self.max_depth = max_depth # Max depth of the trees
        self.n_estimators = n_estimators # Number of trees
        self.learning_rate = learning_rate # Learning rate, step size for parameter update
        self.trees = [] # List of our trees
    
    def fit(self, X_train, y_train):
            # Convert X_train and y_train to NumPy arrays if they aren't already
        X_train = np.asarray(X_train)
        y_train = np.asarray(y_train)
        # To start all residuals = y_train
        residuals = np.copy(y_train)
        self.f_hat = 0 
        # Now time to make decision trees
        for i in range(self.n_estimators):
            # Build and Fit Tree to data
            tree = DecisionTree(max_depth=self.max_depth)
            tree.fit(X_train, residuals)
            # Save our tree
            self.trees.append(tree)
            # Make prediction
            f_hat_b = tree.predict(X_train)
            # Update f_hat
            self.f_hat += (self.learning_rate*f_hat_b) 
            # Update residuals
            residuals = residuals - (self.learning_rate*f_hat_b)
        return self
    
    def predict(self, X_test):
        y_hat = np.zeros((X_test.shape[0], ))
        for tree in self.trees:
            y_hat += self.learning_rate*tree.predict(X_test)
        return y_hat

In [183]:
df = clean_outliers(df, 'MEDV')

  return df[mask1][mask3].reset_index()


In [184]:
X = df[['RM',  'TAX', 'PTRATIO', 'LSTAT', 'INDUS']]
y = df['MEDV']
X_train, X_test, y_train, y_test = my_train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
model = GradientBoost(max_depth=2, n_estimators=20)
model.fit(X_train=X_train.to_numpy(), y_train=y_train.to_numpy())
y_pred = model.predict(X_test.to_numpy())
print(my_mse(y_test, y_pred))
print(my_mae(y_test, y_pred))
print(my_r2(y_test, y_pred))

In [None]:
param_grid = {
    'n_estimators': [10, 20, 50, 100],
    'max_depth': [1, 2, 3, 4],
    'learning_rate': [0.01, 0.1, 0.2, 0.5]
}

In [None]:
# Takes in dictionary of parameters to explore
def grid_search(X_train, y_train, X_test, y_test, param_grid):
    # Track lowest MSE seeen so far, set to infinity initially so every value is lower
    # Params stores best paramaters found through best score
    best_score = float('inf')
    best_params = {}

    # Nested loops to iterate over every combination of estimators depth and LR
    for n_estimators in param_grid['n_estimators']:
        for max_depth in param_grid['max_depth']:
            for learning_rate in param_grid['learning_rate']:
                # Initalize and fit model with every combination of parameters
                model = GradientBoost(n_estimators=n_estimators, max_depth=max_depth, learning_rate=learning_rate)
                model.fit(X_train, y_train)

                # Make predictions on the test set and calculate MSE
                y_pred = model.predict(X_test)
                score = my_mse(y_test, y_pred)

                # Update best_score and best_params if current model is better
                if score < best_score:
                    best_score = score
                    best_params = {'n_estimators': n_estimators, 'max_depth': max_depth, 'learning_rate': learning_rate}

    return best_params, best_score


In [None]:

X_train_np = np.asarray(X_train)
y_train_np = np.asarray(y_train)
X_test_np = np.asarray(X_test)
y_test_np = np.asarray(y_test)

In [None]:
best_params, best_score = grid_search(X_train_np, y_train_np, X_test_np, y_test_np, param_grid)

print("Best Parameters:", best_params)
print("Best MSE Score:", best_score)