In [13]:
import numpy as np
import pandas as pd
from collections import defaultdict
from sklearn.model_selection import train_test_split

In [26]:
class HalfLifeRegressionModel:
    def __init__(self, feature_columns, alpha=0.01, lambda_=0.1, eta=0.01, epsilon=1e-8):
        self.theta = np.random.randn(len(feature_columns))
        self.feature_columns = feature_columns
        self.alpha = alpha
        self.lambda_ = lambda_
        self.eta = eta
        self.epsilon = epsilon

    def _find_h_hat(self, x):
        theta_x = np.clip(np.dot(self.theta, x ), -10, 10 )
        h_hat = 2 ** theta_x
        return h_hat

    def _find_p_hat( self, h_hat, Delta ):
        p_hat = 2 ** np.clip( -Delta / h_hat, -10, 10 )  # Prevent underflow/overflow
        return p_hat
    
    def _p_clip(self, p_hat):
        # bound min/max model predictions (helps with loss optimization)
        return min(max(p_hat, 0.0001), .9999)

    def _cost_function(self, X, p, Delta):
        D = len(X)
        total_cost = 0

        for t in range(D):
            h_hat = self._find_h_hat(X[t])
            p_hat = self._find_p_hat(h_hat, Delta[t])
            h = -Delta[t] / np.log2(p[t] + self.epsilon)
            total_cost += (p[t] - p_hat) ** 2 + self.alpha * (h - h_hat) ** 2

        total_cost += self.lambda_ * np.sum(self.theta ** 2)
        return total_cost / D  # Average cost per instance

    def train(self, dataframe_x, dataframe_y, n_iter=10000, tolerance=1e-5, print_iter = 1000 ):
        X = dataframe_x[self.feature_columns].values
        p = dataframe_y.values
        Delta = dataframe_x['t'].values
        grad_accumulation = np.zeros_like(self.theta)
        cost_history = []

        for iteration in range(n_iter):
            grad_theta = np.zeros_like(self.theta)
            cost = self._cost_function(X, p, Delta)

            for t in range(len(X)):
                h_hat = self._find_h_hat(X[t])
                p_hat = self._find_p_hat(h_hat, Delta[t])

                # Compute the gradients for each theta
                for k in range(len(self.theta)):
                    term1 = 2 * (p[t] - p_hat) * np.log(2) * p_hat * (2 ** (-Delta[t] / h_hat)) * X[t][k]
                    term2 = 2 * self.alpha * (h_hat + Delta[t] / np.log2(p[t])) * np.log(2) * h_hat * X[t][k]
                    term3 = 2 * self.lambda_ * self.theta[k]
                    grad_theta[k] += term1 + term2 - term3

            # Update the accumulated gradient
            grad_accumulation += grad_theta ** 2

            # Update theta using AdaGrad adjustment
            adjusted_eta = self.eta / (np.sqrt(grad_accumulation) + self.epsilon)
            self.theta -= adjusted_eta * grad_theta / len(X)

            cost_history.append(cost)
            
            if iteration % print_iter == 0:
                print(f"Iteration {iteration}, Loss: {cost}")

            # Check for convergence
            if iteration > 0 and np.abs(cost_history[-1] - cost_history[-2]) < tolerance:
                break

        return cost_history

    def predict(self, row):
        x = np.array([row[feature] for feature in self.feature_columns])
        h_hat = self._find_h_hat(x)
        p_hat = self._p_clip( self._find_p_hat(h_hat, row['t']) )
        return p_hat, h_hat

In [77]:
import numpy as np
from collections import defaultdict

class HalfLifeRegressionModelOptimized:
    def __init__(self, feature_columns, alpha=0.01, lambda_=0.1, eta=0.001, epsilon=1e-7):
        self.theta = np.random.randn(len(feature_columns))
        self.feature_columns = feature_columns
        self.alpha = alpha
        self.lambda_ = lambda_
        self.eta = eta
        self.epsilon = epsilon

    def _find_h_hat(self, X):        
        theta_x = np.clip(X.dot(self.theta), -10, 10)
        h_hat = 2 ** theta_x
        return h_hat

    def _find_p_hat(self, h_hat, Delta):
        p_hat = 2 ** np.clip(-Delta / h_hat, -10, 10)  # Prevent underflow/overflow
        return p_hat
    
    def _p_clip(self, p_hat):
        # bound min/max model predictions (helps with loss optimization)
        return min(max(p_hat, 0.0001), .9999)

    def _cost_function(self, X, p, Delta):
        h_hat = self._find_h_hat(X)
        p_hat = self._find_p_hat(h_hat, Delta)
        h = -Delta / np.log2(p + self.epsilon)
        total_cost = np.mean((p - p_hat) ** 2 + self.alpha * (h - h_hat) ** 2)
        total_cost += self.lambda_ * np.sum(self.theta ** 2)
        return total_cost

    def train(self, dataframe_x, dataframe_y, n_iter=100000, tolerance=1e-5, print_iter=1000):
        X = dataframe_x[self.feature_columns].values
        p = dataframe_y.values.flatten()
        Delta = dataframe_x['t'].values
        grad_accumulation = np.zeros_like(self.theta)
        cost_history = []

        for iteration in range(n_iter):
            h_hat = self._find_h_hat(X)
            p_hat = self._find_p_hat(h_hat, Delta)
            grad_theta = -2 * X.T.dot((p - p_hat) * np.log(2) * p_hat * (2 ** (-Delta / h_hat)) + 
                                      self.alpha * (h_hat + Delta / np.log2(p)) * np.log(2) * h_hat) + 2 * self.lambda_ * self.theta
            grad_accumulation += grad_theta ** 2

            # Update theta using AdaGrad adjustment
            adjusted_eta = self.eta / (np.sqrt(grad_accumulation) + self.epsilon)
            self.theta -= adjusted_eta * grad_theta / len(X)

            cost = self._cost_function(X, p, Delta)
            cost_history.append(cost)

            if iteration % print_iter == 0:
                print(f"Iteration {iteration}, Loss: {cost}")

            # Check for convergence
            if iteration > 0 and np.abs(cost_history[-1] - cost_history[-2]) < tolerance:
                break

        return cost_history

    def predict(self, row):
        x = np.array([row[feature] for feature in self.feature_columns])
        h_hat = self._find_h_hat(x[np.newaxis, :])[0]
        p_hat = self._p_clip( self._find_p_hat(h_hat, row['t']) )
        
        return p_hat, h_hat

In [78]:
data      = pd.read_csv( 'subset_1000.csv' )
pred_vars = [ 'right', 'wrong', 'bias', 't' ]

X_train, X_test, Y_train, Y_test = train_test_split( data[ pred_vars ], 
                                                     data[ 'p' ], 
                                                     test_size    = 0.30,
                                                     random_state = 2 )

In [79]:
%%time

model = HalfLifeRegressionModelOptimized(feature_columns=pred_vars)
cost_history = model.train(X_train, Y_train)

Iteration 0, Loss: 244223960.65429652
Iteration 1000, Loss: 244223960.99009675
Iteration 2000, Loss: 244223961.1340904
Iteration 3000, Loss: 244223961.24438635
Iteration 4000, Loss: 244223961.33723107
Iteration 5000, Loss: 244223961.41892144
Iteration 6000, Loss: 244223961.49268788
Iteration 7000, Loss: 244223961.56044954
Iteration 8000, Loss: 244223961.62345713
Iteration 9000, Loss: 244223961.68257928
Iteration 10000, Loss: 244223961.73844877
Iteration 11000, Loss: 244223961.79154298
Iteration 12000, Loss: 244223961.84223297
Iteration 13000, Loss: 244223961.89081392
Iteration 14000, Loss: 244223961.93752486
Iteration 15000, Loss: 244223961.98256314
Iteration 16000, Loss: 244223962.02609357
Iteration 17000, Loss: 244223962.0682553
Iteration 18000, Loss: 244223962.10916772
Iteration 19000, Loss: 244223962.1489336
Iteration 20000, Loss: 244223962.18764228
Iteration 21000, Loss: 244223962.2253722
Iteration 22000, Loss: 244223962.2621925
Iteration 23000, Loss: 244223962.29816464
Iteration 

In [64]:
# Hacer predicciones en el mismo conjunto de datos
predictions = []
for _, row in X_test.iterrows():
    predicted_p, predicted_h = model.predict(row)
    predictions.append((predicted_p, predicted_h))

# Agregar las predicciones al dataframe para comparar
X_test['predicted_p'] = [pred[0] for pred in predictions]
X_test['predicted_h'] = [pred[1] for pred in predictions]

# X_test.head(10)

In [65]:
Y_test

847    0.9999
874    0.9999
471    0.0001
476    0.0001
764    0.9999
        ...  
353    0.9999
236    0.9999
581    0.9999
324    0.9999
988    0.9999
Name: p, Length: 300, dtype: float64

In [66]:
Y_pred = X_test[ 'predicted_p' ]

In [67]:
def calculate_mae(y_true, y_pred):
    return np.mean(np.abs(np.array(y_true) - np.array(y_pred)))

In [68]:
calculate_mae( Y_test, Y_pred )

0.19123345334377023

In [70]:
calculate_roc_auc( Y_test, Y_pred )

0.0