In [43]:
from google.colab import drive
drive.mount("/content/drive")

path_final = "/content/drive/MyDrive/LTSSUD"
import os

if (os.path.isdir(path_final) == True):
  %cd "/content/drive/MyDrive/LTSSUD"

!ls

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/LTSSUD
Proposal.gdoc	test.csv   Untitled0.ipynb
sequence.ipynb	train.csv  winequality-red.csv


In [44]:
import numpy as np
import pandas as pd
import math

from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import train_test_split
from sklearn.impute  import SimpleImputer
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

In [45]:
class XGBoostTree:
    def __init__(self, max_depth=3, min_samples_split=2, min_impurity=1e-7, gamma = 0):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_impurity = min_impurity
        self.gamma = gamma
        self.tree = {}

    def calculate_gradient(self, y_true, y_pred):
        p = np.exp(y_pred) / (1 + np.exp(y_pred))
        return p - y_true

    def calculate_hessian(self, y_true, y_pred):
        p = np.exp(y_pred) / (1 + np.exp(y_pred))
        return p * (1 - p)

    def split_data(self, X, feature_index, split_value):
        left_indices = X[:, feature_index] <= split_value
        right_indices = X[:, feature_index] > split_value
        return left_indices, right_indices

    def similarity(self, y_true, p, i = 2):
        numerator = np.sum(y_true) ** i
        denominator = np.sum(p * (1 - p)) + self.min_impurity
        return numerator / denominator

    def find_best_split(self, X, y, props):
        best_gain = -np.inf
        best_split_feature = None
        best_split_value = None

        num_samples, num_features = X.shape

        for feature_index in range(num_features):
            feature_values = X[:, feature_index]
            unique_values = np.unique(feature_values)

            for value in unique_values:
                left_indices, right_indices = self.split_data(X, feature_index, value)
                if len(left_indices) < self.min_samples_split or len(right_indices) < self.min_samples_split:
                    continue

                y_left = y[left_indices]
                y_right = y[right_indices]
                p_left = props[left_indices]
                p_right = props[right_indices]

                gain = self.similarity(y_left, p_left) + self.similarity(y_right, p_right) - self.similarity(y, props)

                if gain > best_gain:
                    best_gain = gain
                    best_split_feature = feature_index
                    best_split_value = value
        if(best_gain - self.gamma < 0):
            best_split_feature = None
            best_split_value = None

        return best_split_feature, best_split_value

    def create_leaf_node(self, y, props):
        return self.similarity(y, props, 1)

    def build_tree(self, X, y, props, depth=0):
        if depth >= self.max_depth or len(X) < self.min_samples_split:
            return self.create_leaf_node(y, props)

        split_feature, split_value = self.find_best_split(X, y, props)

        if split_feature is None:
            return self.create_leaf_node(y, props)

        left_indices, right_indices = self.split_data(X, split_feature, split_value)
        left_child = self.build_tree(X[left_indices], y[left_indices], props[left_indices], depth + 1)
        right_child = self.build_tree(X[right_indices], y[right_indices], props[right_indices], depth + 1)

        self.tree = {
            'split_feature': split_feature,
            'split_value': split_value,
            'left_child': left_child,
            'right_child': right_child
        }

        return self.tree

    def fit(self, X, y, props):
        self.tree = self.build_tree(X, y, props)

    def predict(self, X):
        return np.array([self._traverse_tree(x, self.tree) for x in X])

    def _traverse_tree(self, x, node):
        if isinstance(node, dict):
            split_feature = node['split_feature']
            split_value = node['split_value']
            if x[split_feature] <= split_value:
                return self._traverse_tree(x, node['left_child'])
            else:
                return self._traverse_tree(x, node['right_child'])
        else:
            return node

In [46]:
# Define the loss function (e.g., mean squared error)
def mean_squared_error(y_true, y_pred):
    return np.mean((y_true - y_pred) ** 2)


def residuals(y_true, y_pred):
    return (y_true - y_pred)


def build_weak_learner(X, y, props, min_impurity, gamma):
    # Create the weak learner (decision tree) and fit it to the gradients and Hessians
    model = XGBoostTree(min_impurity = min_impurity, gamma = gamma)  # Adjust the hyperparameters as needed
    model.fit(X, y, props)  # Divide gradients by Hessians to account for second-order effects
    return model

# Define the XGBoost model
class XGBoostModel:
    def __init__(self, n_estimators, learning_rate, min_impurity = 1e-7, gamma = 0):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.initial_prediction = 0
        self.min = None
        self.max = None
        self.min_impurity = min_impurity
        self.gamma = gamma
        self.models = []

    def fit(self, X, y):
        # Convert data to DMatrix format
        #data = pd.concat([X, y], axis=1)
        data = X
        #dmatrix = DMatrix(data.values)
        self.min = np.min(y)
        self.max = np.max(y)
        y = (y - self.min)/(self.max - self.min)

        classes, counts = np.unique(y, return_counts=True)
        #dominant_class = classes[np.argmax(counts)]
        predictions = np.full(len(y), counts[np.argmax(counts)]/np.sum(counts))
        self.initial_prediction = counts[np.argmax(counts)]/np.sum(counts)
        #print(self.initial_prediction)
        # Build the models in a loop
        for _ in tqdm(range(self.n_estimators)):
            props = predictions
            residual = residuals(y, predictions)

            # Fit a weak learner (e.g., a decision tree) to the gradients and Hessians
            model = build_weak_learner(data, residual, props, self.min_impurity, self.gamma)  # Implement your own weak learner

            # Update the predictions using the learning rate and the predictions of the weak learner
            t = np.log(predictions/(1-predictions)) + self.learning_rate * model.predict(data)
            predictions = np.exp(t) / (1 + np.exp(t))

            # Add the model to the ensemble
            self.models.append(model)

    def predict(self, X):
        # Make predictions by aggregating the predictions of all models in the ensemble
        predictions = np.full(len(X),self.initial_prediction)
        #print(predictions)
        for model in self.models:
            t = np.log(predictions/(1-predictions)) + self.learning_rate * model.predict(X)
            predictions = np.exp(t) / (1 + np.exp(t))
        predictions = predictions*(self.max - self.min) + self.min
        return np.around(predictions)

In [47]:
# Example usage
from sklearn import preprocessing
le = preprocessing.LabelEncoder()
dat = pd.read_csv('winequality-red.csv')
dat = dat.dropna(subset=['quality'])
y = dat['quality']= le.fit_transform(dat['quality'])
X = dat.drop(['quality'], axis=1).select_dtypes(exclude=['object'])
train_X, test_X, train_y, test_y = train_test_split(X, y, test_size=0.25)
train_X.fillna(0, inplace = True)
test_X.fillna(0, inplace = True)
my_imputer = SimpleImputer()
train_X = my_imputer.fit_transform(train_X)
test_X = my_imputer.transform(test_X)
#train_X = np.rot90(train_X)
#test_X = np.rot90(test_X)

In [48]:
# Create and train the XGBoost model
xgb_model = XGBoostModel(n_estimators=100, learning_rate=0.1, min_impurity = 0, gamma = 0)
xgb_model.fit(train_X, train_y)

100%|██████████| 100/100 [00:29<00:00,  3.38it/s]


In [49]:
# Make predictions
y_pred = xgb_model.predict(test_X)

In [50]:
from sklearn.metrics import accuracy_score
print("accuracy_score : " + str(accuracy_score(test_y, y_pred)))

accuracy_score : 0.6075


In [51]:
from xgboost import XGBClassifier
xgb = XGBClassifier(n_estimators = 100, learning_rate = 0.1, max_depth=3)
xgb.fit(train_X, train_y)
y_pred = xgb.predict(test_X)

In [52]:
print("accuracy_score : " + str(accuracy_score(test_y, y_pred)))

accuracy_score : 0.62
