In [3]:
import sys
import warnings
import numpy as np
import pandas as pd

from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor

warnings.filterwarnings('ignore')
np.set_printoptions(threshold=sys.maxsize)

In [4]:
# Wine data --> Classification
wine_data = pd.read_csv('WineQT.csv')
wine_data.drop_duplicates(inplace=True)
wine_data = wine_data.drop(columns=['Id'])
# 1143 samples, 12 attributes including quality which is to be determined (3-8)

# Split the data into attributes and labels
cX = wine_data.iloc[:,wine_data.columns != 'quality'].to_numpy()
cY = wine_data['quality'].map({3:0, 4:0, 5:0, 6:1, 7:1, 8:1}).to_numpy()

# Handle missing values
imputer = SimpleImputer(strategy='mean')
cX = imputer.fit_transform(cX)

# Apply Normalization to the attributes
scaler = MinMaxScaler()
cX = scaler.fit_transform(cX)

# Apply Standardization to the attributes
scaler = StandardScaler()
cX = scaler.fit_transform(cX)

wine_data = list(zip(cX, cY))
cx_train, cx_test, cy_train, cy_test = train_test_split(cX, cY, train_size=0.8)

In [5]:
# Housing data --> Regression
housing_data = pd.read_csv('HousingData.csv')
housing_data.drop_duplicates(inplace=True)
# 506 samples, 14 attributes including MEDV which is to be determined (real)

# Split the data into attributes and labels
rX = housing_data.iloc[:,housing_data.columns != 'MEDV']
rY = np.array(housing_data['MEDV']).reshape(-1, 1)

# Handle missing values
imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
rX = imputer.fit_transform(rX)

# Apply Normalization to the attributes
scaler = MinMaxScaler()
rX = scaler.fit_transform(rX)

# Apply Standardization to the attributes
scaler = StandardScaler()
rX = scaler.fit_transform(rX)

housing_data = list(zip(rX, rY))
rx_train, rx_test, ry_train, ry_test = train_test_split(rX, rY, train_size=0.8)

In [6]:
class RandomForestClassifier:
    def __init__(self, n_trees, max_depth=None):
        self.trees = []
        self.n_trees = n_trees
        self.max_depth = max_depth

    def fit(self, x_train, y_train):
        for _ in range(self.n_trees):
            idx = np.random.choice(len(x_train), len(y_train), replace=True)
            x, y = x_train[idx], y_train[idx]

            tree = DecisionTreeClassifier(max_depth=self.max_depth)
            tree.fit(x, y)
            self.trees.append(tree)

    def predict(self, x_test):
        y_pred = np.array([tree.predict(x_test) for tree in self.trees])
        return np.apply_along_axis(lambda x: np.bincount(x.astype(int)).argmax(), axis=0, arr=y_pred)

In [7]:
model = RandomForestClassifier(3)
model.fit(cx_train, cy_train)
y_pred = model.predict(cx_test)
accuracy = np.mean(cy_test == y_pred)
print('accuracy: ', accuracy)

accuracy:  0.7510917030567685


In [8]:
class RandomForestRegressor:
    def __init__(self, n_trees, max_depth=None):
        self.trees = []
        self.n_trees = n_trees
        self.max_depth = max_depth

    def fit(self, x_train, y_train):
        for _ in range(self.n_trees):
            idx = np.random.choice(len(x_train), len(y_train), replace=True)
            x, y = x_train[idx], y_train[idx]

            tree = DecisionTreeRegressor(max_depth=self.max_depth)
            tree.fit(x, y)
            self.trees.append(tree)

    def predict(self, x_test):
        y_pred = np.array([tree.predict(x_test) for tree in self.trees])
        return np.mean(y_pred, axis=0)

In [9]:
model = RandomForestRegressor(3)
model.fit(rx_train, ry_train)
y_pred = model.predict(rx_test)
print('mse: ', mean_squared_error(ry_test, y_pred))
print('r2 score: ', r2_score(ry_test, y_pred))

mse:  9.0289651416122
r2 score:  0.894497505410796


In [10]:
class GradientBoostingClassifier():
    def __init__(self, n_est=30, learn_rate=0.01, max_depth=8):
        self.n_est = n_est
        self.learn_rate = learn_rate
        self.max_depth = max_depth
        self.models = []
        self.weights = []

    def get_gradient(self, X, y):
        y_pred = 0
        for w, est in zip(self.weights, self.models):
            y_pred += self.learn_rate * w * est.predict(X)
        return y - 1 / (1 + np.exp(-y_pred))
    
    def train(self, X, y):
        self.base_estimator = DecisionTreeRegressor(max_depth=self.max_depth)
        self.base_estimator.fit(X, y)
        self.models.append(self.base_estimator)
        self.weights.append(1.0)

        for _ in range(1, self.n_est):
            residuals = -self.get_gradient(X, y)

            model = DecisionTreeRegressor(max_depth=self.max_depth)
            model.fit(X, residuals)

            self.models.append(model)
            self.weights.append(self.learn_rate)

    def predict_proba(self, X):
        return 1 / (1 + np.exp(-np.sum([self.learn_rate * w * est.predict(X) for w, est in zip(self.weights, self.models)], axis=0)))

    def predict(self, X):
        y_pred = 0
        for w, est in zip(self.weights, self.models):
            y_pred += self.learn_rate * w * est.predict(X)
        result =  1 / (1 + np.exp(-y_pred))
        return (result >= 0.5).astype(int)

In [11]:
model = GradientBoostingClassifier()
model.train(cx_train, cy_train)
y_pred = model.predict(cx_test)
accuracy = np.mean(cy_test == y_pred)
print('accuracy: ', accuracy)

accuracy:  0.5458515283842795


In [12]:
class GradientBoostingRegressor:
    def __init__(self, max_depth=8, lr=0.01, n_est=1000):
        self.max_depth = max_depth
        self.learn_rate = lr
        self.n_est = n_est
        self.mean = 0

    def calculate_loss(self, y_true, y_pred):
        mse_loss = np.mean((y_true - y_pred)**2)
        return mse_loss
    
    def get_residuals(self, y_true, y_pred):
        residual = -(y_true - y_pred)
        return residual
    
    def get_base_model(self, x_train, y_train):
        model = DecisionTreeRegressor(max_depth=self.max_depth)
        model.fit(x_train, y_train)
        return model
    
    def train(self, x_train, y_train):
        self.models, self.losses = [], []
        self.mean = np.mean(y_train)
        temp = np.array([np.mean(y_train)] * len(y_train))
        y_pred = temp.reshape(len(temp), 1)

        for _ in range(self.n_est):
            loss = self.calculate_loss(y_train, y_pred)
            self.losses.append(loss)
            residuals = self.get_residuals(y_train, y_pred)
            model = self.get_base_model(x_train, residuals)
            r = (model.predict(x_train)).reshape(len(x_train), 1)
            y_pred -= self.learn_rate * r
            self.models.append(model)

    def predict(self, x_test):
        temp = np.array([self.mean] * len(x_test))
        y_pred = temp.reshape(len(temp), 1)

        for i in range(len(self.models)):
            temp = (self.models[i].predict(x_test)).reshape(len(x_test), 1)
            y_pred -= self.learn_rate * temp
        return y_pred

In [13]:
model = GradientBoostingRegressor()
model.train(rx_train, ry_train)
y_pred = model.predict(rx_test)
print('mse: ', mean_squared_error(ry_test, y_pred))
print('r2 score: ', r2_score(ry_test, y_pred))

mse:  9.813387374734722
r2 score:  0.8853316152885441


In [14]:
class AdaBoost:
    def __init__(self, n, attr, num=30, stub_depth=1):
        self.n = n
        self.attr = attr
        self.num = num
        self.stub_depth = stub_depth

        self.weights = np.ones(n) / n
        self.trees, self.alphas = [], []

    def fit(self, x_train, y_train):
        predicted = np.empty((self.n, self.num))
        
        for t in range(self.num):
            self.model = DecisionTreeClassifier(max_depth=self.stub_depth)
            self.model.fit(x_train, y_train, sample_weight=self.weights)
            y_pred = self.model.predict(x_train)

            epsilon = np.sum(self.weights * (y_pred != y_train)) / np.sum(self.weights)
            alpha = np.log((1-epsilon) / epsilon)
            self.weights = np.array([w*(1-epsilon)/epsilon if y_pred[i] != y_train[i] else w for i, w in enumerate(self.weights)])

            self.trees.append(self.model)
            self.alphas.append(alpha)
            predicted[:,t] = y_pred
        self.result = np.sign(np.dot(predicted, self.alphas))
        
    def predict(self, x_test):
        y_pred = np.zeros(len(x_test))
        for t, tree in enumerate(self.trees):
            curr_tree = tree.predict(x_test)
            y_pred += curr_tree*self.alphas[t]
        return np.sign(y_pred)

In [15]:
n, attr = cx_train.shape
model = AdaBoost(n, attr)
model.fit(cx_train, cy_train)
y_pred = model.predict(cx_test)
accuracy = np.mean(cy_test == y_pred)
print(accuracy)

0.5589519650655022


In [16]:
class AdaBoostR2:
    def __init__(self, n, attr, num=30, stub_depth=1):
        self.n = n
        self.attr = attr
        self.num = num
        self.stub_depth = stub_depth

        self.weights = np.ones(n) / n
        self.trees, self.betas = [], []
        self.fitted_values = np.empty((n, num))

    def weighted_median(self, values, weights):
        sorted_indices = values.argsort()
        values = values[sorted_indices]
        weights = weights[sorted_indices]
        weights_cumulative_sum = weights.cumsum()
        median_weight = np.argmax(weights_cumulative_sum >= sum(weights)/2)
        return values[median_weight]
    
    def fit(self, x_train, y_train):
        for t in range(self.num):
            idx = np.random.choice(np.arange(self.n), size=self.n, replace=True, p=self.weights)
            bootstrap_x, bootstrap_y = x_train[idx], y_train[idx]
            
            tree = DecisionTreeRegressor(max_depth=self.stub_depth)
            tree.fit(bootstrap_x, bootstrap_y)
            self.trees.append(tree)

            y_pred = tree.predict(x_train)
            self.fitted_values[:,t] = y_pred
            
            abs_error = np.abs(y_train - y_pred)
            max_abs_error = np.max(abs_error)
            loss_ratio = abs_error / max_abs_error
            
            avg_loss = np.sum(self.weights*loss_ratio)
            if avg_loss >= 0.5:
                self.num = t-1
                self.fitted_values = self.fitted_values[:,:t-1]
                self.trees = self.trees[:t-1]
                break
            
            beta_t = avg_loss/(1 - avg_loss)
            self.betas.append(beta_t)
            
            factor = np.sum(self.weights*beta_t**(1-loss_ratio))
            self.weights *= beta_t**(1-loss_ratio) / factor
            
        self.model_weights = np.log(1 / np.array(self.betas))
        self.pred_labels = np.array([self.weighted_median(self.fitted_values[n], self.model_weights) for n in range(self.n)])
        
    def predict(self, x_test):
        k = len(x_test)
        fitted_values = np.empty((k, self.num))
        for t, tree in enumerate(self.trees):
            fitted_values[:,t] = tree.predict(x_test)
        return np.array([self.weighted_median(fitted_values[n], self.model_weights) for n in range(k)]) 

In [17]:
n, attr = rx_train.shape
model = AdaBoostR2(n, attr)
model.fit(rx_train, ry_train.squeeze())
y_pred = model.predict(rx_test)
print('mse: ', mean_squared_error(ry_test, y_pred))
print('r2 score: ', r2_score(ry_test, y_pred))

mse:  31.65636755151532
r2 score:  0.6300987218429726
