In [None]:
#### code for he method


def nonconvex_f(beta, X, y):
    """Evaluate the bounded cost ``0.5 * sum(r**2 / (1 + r**2))``.

    ``r = y - X * beta`` is the residual, where ``X * beta`` is an
    element-wise (broadcast) product, not a matrix product.  Each term lies
    in ``[0, 1)``, so large residuals contribute at most 0.5 to the cost.
    """
    r2 = (y - X * beta) ** 2
    return 0.5 * np.sum(r2 / (1 + r2))

def nonconvex_grad_f(beta, X, y):
    """Return the derivative with respect to ``beta`` of the cost
    ``0.5 * sum(r**2 / (1 + r**2))`` with ``r = y - X * beta``.

    The product ``X * beta`` is element-wise, so ``beta`` acts as a single
    scalar slope and the result is a scalar.

    Derivation: ``d/dr [0.5 * r**2 / (1 + r**2)] = r / (1 + r**2)**2`` and
    ``dr/dbeta = -X``.  The denominator is squared; dropping the square gives
    the gradient of ``0.5 * log(1 + r**2)``, a different objective.
    """
    residual = y - X * beta
    factor = residual / (1 + residual ** 2) ** 2
    return -np.sum(X * factor)

def nonconvex_hessian_f(beta, X, y):
    """Return the second derivative with respect to ``beta`` of the cost
    ``0.5 * sum(r**2 / (1 + r**2))`` with ``r = y - X * beta``, as a
    ``(1, 1)`` array.

    Derivation: the first derivative is ``-sum(X * r / (1 + r**2)**2)``;
    differentiating once more (``dr/dbeta = -X``) gives
    ``sum(X**2 * (1 - 3 * r**2) / (1 + r**2)**3)``.  A term is therefore
    negative wherever ``|r| > 1 / sqrt(3)``.
    """
    residual_sq = (y - X * beta) ** 2
    numerator = 1 - 3 * residual_sq
    denominator = (1 + residual_sq) ** 3
    second_derivative = np.sum(X ** 2 * numerator / denominator)
    # Wrapped as a 1x1 matrix so matrix routines (e.g. eigenvalue checks) accept it.
    return np.array([[second_derivative]])

def sgd_with_bound(f, grad_f, start_point, end_point, X, y,
                   learning_rate=0.01, iterations=1000, tol=1e-6):
    """Run gradient steps from ``start_point`` until a stop rule fires.

    Each step moves against ``grad_f(x, X, y)`` scaled by ``learning_rate``.
    The loop ends when the proposed point is >= ``end_point`` in every
    coordinate, when the step length falls below ``tol``, or after
    ``iterations`` steps.  A proposed point that triggers either stop rule is
    discarded, so the returned point is the last accepted one.

    Only the upper side (``end_point``) is checked; nothing stops the iterate
    from moving below ``start_point``.

    Returns:
        ``(x, f(x, X, y))`` for the last accepted point ``x``.
    """
    current = np.array(start_point, dtype=float)
    upper = np.array(end_point, dtype=float)
    for _ in range(iterations):
        candidate = current - learning_rate * grad_f(current, X, y)
        if np.all(candidate >= upper) or np.linalg.norm(candidate - current) < tol:
            break
        current = candidate
    return current, f(current, X, y)

def rearrange_bounds(bounds):
    """Convert ``(lower_bounds, upper_bounds)`` into a list of ``(low, high)`` pairs.

    ``bounds`` must unpack into exactly two sequences; pairing stops at the
    shorter of the two.
    """
    lows, highs = bounds
    return [(low, high) for low, high in zip(lows, highs)]

def de_search_method3(f, bounds, X, y, maxiters=100):
    """Minimise ``f(beta, X, y)`` over ``beta`` with ``differential_evolution``.

    ``bounds`` is given as ``(lower_bounds, upper_bounds)`` and converted to
    per-dimension pairs by ``rearrange_bounds``.  ``maxiters`` is forwarded as
    the optimiser's ``maxiter``.

    NOTE(review): ``differential_evolution`` is presumably
    ``scipy.optimize.differential_evolution``; its import is not visible here.

    Returns:
        ``(result.x, result.fun)`` from the optimiser result.
    """
    def objective(beta):
        # Bind the data so the optimiser only sees the free parameter.
        return f(beta, X, y)

    outcome = differential_evolution(
        objective,
        rearrange_bounds(bounds),
        maxiter=maxiters,
    )
    return outcome.x, outcome.fun

def generate_uniform_start_end_pairs(start_point, end_point, n):
    """Split the segment from ``start_point`` to ``end_point`` into consecutive pairs.

    ``n`` evenly spaced points (both endpoints included) are placed on the
    segment, and each neighbouring pair is returned as ``[left, right]``.
    The result therefore has ``n - 1`` entries, and is empty for ``n <= 1``.
    """
    origin = np.array(start_point)
    span = np.array(end_point) - origin
    knots = [origin + frac * span for frac in np.linspace(0, 1, n)]
    return [[left, right] for left, right in zip(knots, knots[1:])]

def process_point_with_no_zoom_in(f, grad_f, hessian_f, global_search, pt, X, y,
                                  learning_rate=0.01, iterations=1000, tol=1e-6):
    """Run a local descent on one segment, with a global-search fallback.

    ``pt`` is a ``(start, end)`` pair handed to ``sgd_with_bound``.  The
    Hessian at the point it returns is then inspected: if any eigenvalue is
    negative, ``global_search`` is also run over the fixed box
    ``([-100], [100])`` with ``maxiters=100`` and its result is appended.

    Returns:
        ``(points, results)``: parallel lists with one entry (local result
        only) or two entries (local result, then global-search result).
    """
    seg_start, seg_end = pt
    local_pt, local_val = sgd_with_bound(
        f, grad_f, seg_start, seg_end, X, y, learning_rate, iterations, tol)
    points = [local_pt]
    results = [local_val]

    # Non-negative Hessian eigenvalues are used as the local-convexity test.
    curvature = np.linalg.eigvalsh(hessian_f(local_pt, X, y))
    if np.all(curvature >= 0):
        return points, results

    ds_point, ds_min = global_search(f, ([-100], [100]), X, y, maxiters=100)
    points.append(ds_point)
    results.append(ds_min)
    return points, results

# Process pool created once when this cell runs; sgd_opt_global_search submits
# its per-segment jobs to it. Nothing in this view shuts it down.
# NOTE(review): ProcessPoolExecutor is presumably concurrent.futures.ProcessPoolExecutor;
# its import is not visible here.
executor = ProcessPoolExecutor()

def sgd_opt_global_search(start_intervals, end_intervals, n, f, grad_f, hessian_f, global_search,
                          X, y, learning_rate=0.01, max_iterations=1000, tol=1e-6):
    """Submit one ``process_point_with_no_zoom_in`` job per segment to ``executor``.

    The range from ``start_intervals`` to ``end_intervals`` is cut by
    ``generate_uniform_start_end_pairs`` using ``n`` points, which yields
    ``n - 1`` segments.  Each job gets an iteration budget of
    ``int(max_iterations / n)``.

    Returns:
        The list of futures, in segment order.  The function does not wait
        for them; each resolves to the ``(points, results)`` pair produced by
        ``process_point_with_no_zoom_in``.
    """
    per_segment_iters = int(max_iterations / n)
    segments = generate_uniform_start_end_pairs(start_intervals, end_intervals, n)
    futures = []
    for segment in segments:
        job = executor.submit(
            process_point_with_no_zoom_in, f, grad_f, hessian_f,
            global_search, segment, X, y, learning_rate, per_segment_iters, tol)
        futures.append(job)
    return futures

In [None]:
import numpy as np

# convex with beta tuning
# -----------------------------------------------------------
# Full Pipeline with SGD to learn beta
# -----------------------------------------------------------
class FullPipeline:
    """Imputer -> encoder -> classifier pipeline with a trainable encoder ``beta``.

    The three stages (``DifferentiableImputer``, ``DifferentiableEncoder``,
    ``SimpleClassifier``) are defined outside this block.  This class relies
    only on ``imputer.impute`` / ``imputer.alpha``, ``encoder.encode`` /
    ``encoder.beta`` and ``model.predict`` / ``model.weights`` / ``model.bias``.
    """

    def __init__(self, mean_val, median_val, num_categories=3):
        """Build the three stages and print the initial ``beta`` and ``alpha``."""
        self.imputer = DifferentiableImputer(mean_val, median_val)
        self.encoder = DifferentiableEncoder(num_categories)
        # NOTE(review): input_dim is fixed at 5, presumably 1 numeric column +
        # num_categories one-hot columns + 1 ordinal column for the default
        # num_categories=3 -- confirm before using another category count.
        self.model   = SimpleClassifier(input_dim=5)

        # Print beta values
        print(f"Initial Encoder beta: {self.encoder.beta}")
        print(f"Imputer alpha: {self.imputer.alpha}")

    def forward(self, x_num, x_cat):
        """Impute ``x_num``, encode ``x_cat``, stack them column-wise and predict."""
        # 1) Impute -- the result is treated as 1-D and turned into a column below.
        x_num_imp = self.imputer.impute(x_num)

        # 2) Encode -- the result is treated as a 2-D block of feature columns.
        x_cat_enc = self.encoder.encode(x_cat)

        # 3) Combine: numeric column first, encoded columns after it.
        x_full = np.hstack([x_num_imp[:, np.newaxis], x_cat_enc])

        # 4) Predict
        return self.model.predict(x_full)

    def compute_loss(self, y_pred, y_true, epsilon=1e-10):
        """Mean binary cross-entropy, with ``y_pred`` clipped to ``[epsilon, 1 - epsilon]``.

        The clipping keeps both logarithms finite when a prediction is exactly 0 or 1.
        """
        y_pred = np.clip(y_pred, epsilon, 1 - epsilon)
        return -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))

    def compute_gradients(self, x_full, y_pred, y_true):
        """Return ``(grad_w, grad_b)`` computed from the error ``y_pred - y_true``.

        NOTE(review): this is the cross-entropy gradient only if the classifier
        output is a sigmoid of a linear score; ``SimpleClassifier`` is not
        visible here -- confirm.
        """
        error = y_pred - y_true
        grad_w = np.dot(x_full.T, error) / len(y_true)  # one entry per feature column
        grad_b = np.mean(error)
        return grad_w, grad_b

    def compute_beta_gradient(self, x_num, x_cat, y_pred, y_true):
        """Return a scalar surrogate gradient for the encoder's ``beta``.

        The value is the mean of the prediction error multiplied by the last
        encoded column.  It reuses the error term "for simplicity" (as the
        original comment put it) and does not differentiate through the
        encoder, so it is a heuristic rather than an exact derivative.

        ``x_num`` is kept in the signature for callers but is not used: the
        previous implementation stacked it into a feature matrix that was
        never read.
        """
        x_cat_enc = self.encoder.encode(x_cat)
        # Only the last encoded column takes part in the update.
        return np.dot(y_pred - y_true, x_cat_enc[:, -1]) / len(y_true)

    def sgd_step(self, grad_w, grad_b, grad_beta, lr):
        """Apply one gradient step to the weights, the bias and the encoder ``beta``.

        Passing ``grad_beta=0`` leaves ``beta`` unchanged.
        """
        self.model.weights -= lr * grad_w
        self.model.bias    -= lr * grad_b
        self.encoder.beta   -= lr * grad_beta

# -----------------------------------------------------------
# Training loop with SGD for beta first, then for classifier
# -----------------------------------------------------------
def train_with_sgd(model, x_num_batch, x_cat_batch, y_batch, num_epochs=30, learning_rate=0.01, beta_epochs=1000):
    """Train ``model`` on one full batch in two phases.

    Phase 1 runs ``beta_epochs`` steps that update the classifier weights,
    the bias and the encoder ``beta`` together.  Phase 2 runs ``num_epochs``
    steps that pass a zero beta-gradient, so only the classifier moves.
    Progress is printed every 10th epoch of each phase.  ``x_cat_batch`` is
    flattened with ``ravel()`` before use.
    """
    x_cat_batch = x_cat_batch.ravel()

    def forward_and_grads():
        # One forward pass plus the classifier gradients on the whole batch.
        # The feature matrix is rebuilt here because forward() does not return it.
        preds = model.forward(x_num_batch, x_cat_batch)
        batch_loss = model.compute_loss(preds, y_batch)
        features = np.hstack([
            model.imputer.impute(x_num_batch)[:, np.newaxis],
            model.encoder.encode(x_cat_batch),
        ])
        d_w, d_b = model.compute_gradients(features, preds, y_batch)
        return preds, batch_loss, d_w, d_b

    # Phase 1: joint updates including beta.
    for epoch in range(beta_epochs):
        y_pred, loss, grad_w, grad_b = forward_and_grads()
        grad_beta = model.compute_beta_gradient(x_num_batch, x_cat_batch, y_pred, y_batch)
        model.sgd_step(grad_w, grad_b, grad_beta, learning_rate)

        if (epoch + 1) % 10 == 0:
            print(f"Beta Optimization Epoch {epoch+1}, Loss = {loss:.4f}, Beta = {model.encoder.beta}")

    # Phase 2: classifier-only updates (beta gradient fixed at 0).
    for epoch in range(num_epochs):
        _, loss, grad_w, grad_b = forward_and_grads()
        model.sgd_step(grad_w, grad_b, 0, learning_rate)

        if (epoch + 1) % 10 == 0:
            print(f"Classifier Training Epoch {epoch+1}, Loss = {loss:.4f}")


# -----------------------------------------------------------
# Run it with beta training first
# -----------------------------------------------------------
# NOTE(review): mean_val, median_val, x_num, x_cat and y are not defined in
# this cell; they presumably come from an earlier notebook cell.
# Builds the pipeline (the constructor prints the initial beta and alpha), then
# trains it: 10 joint beta/classifier epochs followed by 30 classifier-only
# epochs, all at learning rate 0.05.
pipeline = FullPipeline(mean_val, median_val, num_categories=3)
train_with_sgd(pipeline, x_num, x_cat, y, num_epochs=30, learning_rate=0.05, beta_epochs=10)

In [None]:
import numpy as np


# non convex with no beta tuning

# Non-convex activation function: Swish
def swish(x, sigmoid_fn):
    """Swish activation: ``x`` multiplied by ``sigmoid_fn(x)``.

    The gate function is passed in by the caller rather than fixed here.
    """
    gate = sigmoid_fn(x)
    return x * gate

# Log-Cosh loss (convex in the residual; the objective becomes non-convex in the weights through the Swish activation)
def log_cosh_loss(y_true, y_pred):
    """Return the mean of ``log(cosh(y_pred - y_true))`` without overflowing.

    ``np.cosh`` overflows to ``inf`` once the absolute residual exceeds
    roughly 710, which would make the loss infinite.  The identity
    ``log(cosh(d)) = log(exp(d) + exp(-d)) - log(2)`` is used instead, with
    ``np.logaddexp`` evaluating the first term stably.
    """
    diff = np.asarray(y_pred - y_true, dtype=float)
    return np.mean(np.logaddexp(diff, -diff) - np.log(2.0))

# Modified SimpleClassifier
class NonConvexClassifier:
    """Linear score passed through a Swish activation, with random initial parameters."""

    def __init__(self, input_dim):
        """Draw ``input_dim`` weights and a length-1 bias from a standard normal.

        Weights are drawn before the bias, which fixes the values obtained
        under a given NumPy global seed.
        """
        self.weights = np.random.randn(input_dim)
        self.bias    = np.random.randn(1)

    def sigmoid(self, x):
        """Logistic function ``1 / (1 + exp(-x))``."""
        decay = np.exp(-x)
        return 1 / (1 + decay)

    def predict(self, x):
        """Return ``swish(x . weights + bias)`` using this object's ``sigmoid`` as the gate."""
        score = np.dot(x, self.weights) + self.bias
        return swish(score, self.sigmoid)

# Modified FullPipeline
class NonConvexPipeline:
    """Imputer -> encoder -> Swish classifier pipeline trained on a log-cosh loss.

    ``DifferentiableImputer`` and ``DifferentiableEncoder`` are defined
    outside this block.  The classifier is a ``NonConvexClassifier``, whose
    ``weights``, ``bias`` and ``sigmoid`` are used by ``compute_gradients``.
    """

    def __init__(self, mean_val, median_val, num_categories=3):
        self.imputer = DifferentiableImputer(mean_val, median_val)
        self.encoder = DifferentiableEncoder(num_categories)
        # 1 numeric + 3 one-hot + 1 ordinal = 5
        # NOTE(review): fixed at 5 regardless of num_categories -- confirm
        # before using another category count.
        self.model   = NonConvexClassifier(input_dim=5)

    def forward(self, x_num, x_cat):
        """Impute ``x_num``, encode ``x_cat``, stack them column-wise and predict."""
        # 1) Impute -- the result is treated as 1-D and turned into a column below.
        x_num_imp = self.imputer.impute(x_num)

        # 2) Encode -- the result is treated as a 2-D block of feature columns.
        x_cat_enc = self.encoder.encode(x_cat)

        # 3) Combine: numeric column first, encoded columns after it.
        x_full = np.hstack([x_num_imp[:, np.newaxis], x_cat_enc])

        # 4) Predict
        return self.model.predict(x_full)

    def compute_loss(self, y_pred, y_true, epsilon=1e-10):
        """Return the log-cosh loss; ``epsilon`` is accepted but unused."""
        return log_cosh_loss(y_true, y_pred)

    def compute_gradients(self, x_full, y_pred, y_true):
        """Return ``(grad_w, grad_b)`` of the mean log-cosh loss.

        With ``z = x_full . weights + bias`` and ``y_pred = z * sigmoid(z)``,
        the chain rule gives a per-sample factor of
        ``tanh(y_pred - y_true) * d(swish)/dz``.  The raw error
        ``y_pred - y_true`` on its own is the cross-entropy/sigmoid gradient,
        not the gradient of this loss.

        Must be called with the same weights and bias that produced
        ``y_pred``, because ``z`` is recomputed from the current parameters.
        """
        z = np.dot(x_full, self.model.weights) + self.model.bias
        s = self.model.sigmoid(z)
        # d/dz [z * sigmoid(z)] = sigmoid(z) + z * sigmoid(z) * (1 - sigmoid(z))
        dswish = s + z * s * (1 - s)
        # d/de log(cosh(e)) = tanh(e)
        delta = np.tanh(y_pred - y_true) * dswish
        grad_w = np.dot(x_full.T, delta) / len(y_true)  # one entry per feature column
        grad_b = np.mean(delta)
        return grad_w, grad_b

    def sgd_step(self, grad_w, grad_b, lr):
        """Apply one gradient step to the classifier weights and bias."""
        self.model.weights -= lr * grad_w
        self.model.bias    -= lr * grad_b

# Training loop remains the same
def train_with_sgd(model, x_num_batch, x_cat_batch, y_batch, num_epochs=30, learning_rate=0.01):
    """Run ``num_epochs`` full-batch gradient steps on ``model``.

    Each epoch does a forward pass, evaluates the loss, rebuilds the stacked
    feature matrix (``forward`` does not return it), computes the weight and
    bias gradients and applies one ``sgd_step``.  The loss is printed every
    10th epoch.  ``x_cat_batch`` is flattened with ``ravel()`` before use.
    """
    cat_flat = x_cat_batch.ravel()

    for epoch in range(num_epochs):
        preds = model.forward(x_num_batch, cat_flat)
        loss = model.compute_loss(preds, y_batch)

        # Same column layout as forward(): numeric column, then encoded columns.
        features = np.hstack([
            model.imputer.impute(x_num_batch)[:, np.newaxis],
            model.encoder.encode(cat_flat),
        ])

        grad_w, grad_b = model.compute_gradients(features, preds, y_batch)
        model.sgd_step(grad_w, grad_b, learning_rate)

        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}, Loss = {loss:.4f}")

# Run with the modified pipeline
# NOTE(review): mean_val, median_val, x_num, x_cat and y are not defined in
# this cell; they presumably come from an earlier notebook cell.
# Rebinds `pipeline` to the Swish/log-cosh variant and trains it for 30 epochs
# at learning rate 0.05, using the train_with_sgd defined in this cell.
pipeline = NonConvexPipeline(mean_val, median_val, num_categories=3)
train_with_sgd(pipeline, x_num, x_cat, y, num_epochs=30, learning_rate=0.05)