🌿Preprocessing

In [1]:
import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import pickle

# load data
mnist_path = "/home/ajay2425/rclass/mnist_dataset/mnist.npz"
with np.load(mnist_path) as data:
    x_train = data["x_train"]
    y_train = data["y_train"]
print(f"x_train shape: {x_train.shape}, y_train shape: {y_train.shape}")

# 1. Flatten
x_train_flat = x_train.reshape(x_train.shape[0], -1)
print(f"1. x_train_flat shape: {x_train_flat.shape}, y_train shape: {y_train.shape}")

# 2. Subsets
subset_size = 2000
x_train_subset = x_train_flat[:subset_size]
y_train_subset = y_train[:subset_size]
print(f"2. x_train_subset shape: {x_train_subset.shape}")

# 3. PCA
n_components = 175
d = 1 # degree
pca = PCA(n_components=n_components)
x_train_pca = pca.fit_transform(x_train_subset)
print(f"3. x_train_pca shape: {x_train_pca.shape}")
variance = np.sum(pca.explained_variance_ratio_)
print(f"variance = {variance}")

# Save the trained PCA model
pca_model_path = "/home/ajay2425/rclass/models/models_grb/trained_pca.pkl" # main
# pca_model_path = "/home/ajay2425/rclass/models/add/trained_pca.pkl" # temp.
with open(pca_model_path, "wb") as file:
    pickle.dump(pca, file)
print(f"(train pca model saved)")


# 4. Normalize
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler(feature_range=(0, 1))
x_train_norm = scaler.fit_transform(x_train_pca)
print(f"4. x_train_norm shape: {x_train_norm.shape}")

# # 4. Binarize
# threshold_value = 0
# x_train_norm = (x_train_pca > threshold_value).astype(int)
# print(f"4. x_train_norm shape: {x_train_norm.shape}")
# # print(x_train_norm)

x_train shape: (60000, 28, 28), y_train shape: (60000,)
1. x_train_flat shape: (60000, 784), y_train shape: (60000,)
2. x_train_subset shape: (2000, 784)
3. x_train_pca shape: (2000, 200)
variance = 0.971538391077148
(train pca model saved)
4. x_train_norm shape: (2000, 200)


🌿Check Directory

In [2]:
import os

# Path to the directory
models_dir = "/home/ajay2425/rclass/models/models_grb/" # main
# pca_model_path = "/home/ajay2425/rclass/models/add/trained_pca.pkl" # temp.
print(f"Using existing directory: {models_dir}")

# Check if the directory exists
if os.path.exists(models_dir):
    print("Directory already exists.")
else:
    print("Directory does not exist!")

Using existing directory: /home/ajay2425/rclass/models/models_grb/
Directory already exists.


🌿Generate multi-Indices

In [3]:
import time
# Start timer
start_time = time.time()

def r_multi_indices(n, d):
    if n == 1:
        yield (d,)
    else:
        for k in range(d + 1):
            for c in r_multi_indices(n - 1, k):
                yield (d - k, *c)

def generate_multi_indices(n, d):
    from itertools import chain
    return list(chain(*[list(r_multi_indices(n, _)) for _ in range(d + 1)]))

c = generate_multi_indices(n_components, d)
print(f"no_of_coeff =", len(c))

no_of_coeff = 201


🌿Generate Polynomials

In [4]:
import numpy as np

def construct_G_H_matrices(x_train_norm, n, d):
    num_data_points = x_train_norm.shape[0]
    multi_indices = generate_multi_indices(n, d)
    num_coefficients = len(multi_indices)

    # Initialize G and H matrices
    G = []
    H = []

    # Construct G and H using multi-indices
    for i in range(num_data_points):
        G_row = []
        H_row = []
        for idx in multi_indices:
            term = np.prod([x_train_norm[i, k] ** idx[k] for k in range(n)])
            G_row.append(term)
            H_row.append(term)
        G.append(G_row)
        H.append(H_row)

    # Convert G and H to NumPy arrays
    G = np.array(G)
    H = np.array(H)

    # # Normalize G and H row-wise for numerical stability
    # G = G / np.linalg.norm(G, axis=1, keepdims=True)
    # H = H / np.linalg.norm(H, axis=1, keepdims=True)

    return G, H, multi_indices

🌿Problem Setup

In [5]:
from gurobipy import Model, GRB, Env

# Suppress Gurobi logs globally
env = Env(empty=True)
env.setParam("OutputFlag", 0)  # Suppress Gurobi logs
env.start()

# Initialize the Gurobi model with suppressed logs
model = Model("Rational_Function_Optimization", env=env)
multi_indices = generate_multi_indices(n_components, d)
num_coefficients = len(multi_indices)

🌿Feasibility Check

In [6]:
from gurobipy import Model, GRB, quicksum
import numpy as np

def check_feasibility(z, x_train_norm, y_binary, G, H, num_coefficients):

    delta = 1e-6  # Threshold for positivity constraint
    n_samples = x_train_norm.shape[0]

    # Initialize Gurobi model
    model = Model("constraints")
    model.setParam("OutputFlag", 0)  # Suppress Gurobi logs
    # model.setParam("Seed", 42)  # Fix solver seed for consistency
    # model.setParam("Threads", 1)  # Disable multi-threading for consistency

    # Define variables
    alpha = model.addVars(num_coefficients, lb=-GRB.INFINITY, name="alpha")
    beta = model.addVars(num_coefficients, lb=-GRB.INFINITY, name="beta")
    theta = model.addVar(lb=0, name="theta")
    intermediate_vars = model.addVars(n_samples, 3, lb=-GRB.INFINITY, name="intermediate")

    # Add constraints for each sample
    for i in range(n_samples):
        # Auxiliary variables for linearization
        G_x = quicksum(alpha[j] * G[i, j] for j in range(num_coefficients))  # αᵀG(xᵢ)
        H_x = quicksum(beta[j] * H[i, j] for j in range(num_coefficients))   # βᵀH(xᵢ)
        f_x = y_binary[i]  # Binary label for the sample

        # Define intermediate variables to simplify nonlinear terms
        model.addConstr(intermediate_vars[i, 0] == (f_x - z) * H_x, name=f"term1_sample_{i}")  # (f(xᵢ) - z)·βᵀH(xᵢ)
        model.addConstr(intermediate_vars[i, 1] == (-(f_x + z)) * H_x, name=f"term2_sample_{i}")  # (-(f(xᵢ) + z))·βᵀH(xᵢ)
        model.addConstr(intermediate_vars[i, 2] == H_x, name=f"positivity_term_sample_{i}")  # βᵀH(xᵢ)

        # Constraint 1: (f(xᵢ) - z)·βᵀH(xᵢ) - αᵀG(xᵢ) ≤ θ
        model.addConstr(intermediate_vars[i, 0] - G_x <= theta, name=f"upper_bound_sample_{i}")

        # Constraint 2: αᵀG(xᵢ) + (-(f(xᵢ) + z))·βᵀH(xᵢ) ≤ θ
        model.addConstr(G_x + intermediate_vars[i, 1] <= theta, name=f"lower_bound_sample_{i}")

        # Constraint 3: βᵀH(xᵢ) ≥ δ
        model.addConstr(intermediate_vars[i, 2] >= delta, name=f"positivity_sample_{i}")

    # Set objective
    model.setObjective(theta, GRB.MINIMIZE)

    # Solve the model
    model.optimize()

    # Extract results
    if model.status == GRB.OPTIMAL:
        optimal_alpha = [alpha[j].X for j in range(num_coefficients)]
        optimal_beta = [beta[j].X for j in range(num_coefficients)]
        optimal_theta = theta.X
        print(f"Optimal solution found: Theta = {optimal_theta}")
        return True, optimal_alpha, optimal_beta, optimal_theta
    else:
        print(f"Model not feasible or no solution found. Status: {model.status}")
        return False, None, None, None

🌿Bisection Loop

In [7]:
def bisection_loop(x_train_norm, y_binary, uL, uH, precision, model, num_coefficients, G, H, delta):
    z_values = []
    optimal_alpha, optimal_beta, optimal_theta = None, None, None
    
    print("Starting bisection loop...")
    print(f"Initial bounds: uL={uL}, uH={uH}, precision={precision}")

    while uH - uL > precision:
        z = (uL + uH) / 2  # Midpoint of bounds
        z_values.append(z)
        print(f"Testing z = {z}...")

        # Feasibility Check
        feasible, alpha_coefficients, beta_coefficients, theta = check_feasibility(
            z, x_train_norm, y_binary, G, H, num_coefficients
        )

        # for Debugging feasibility results
        if feasible:
            print(f"z = {z} is feasible.")
            uH = z
            optimal_alpha, optimal_beta, optimal_theta = alpha_coefficients, beta_coefficients, theta
        else:
            print(f"z = {z} is not feasible.")
            uL = z

    print("Bisection loop completed.")
    print(f"Optimal z: {uH}")
    print(f"Optimal theta: {optimal_theta}")
    return uH, optimal_alpha, optimal_beta, optimal_theta, z_values

🌿Train (one-vs-all)

In [8]:
from sklearn.preprocessing import LabelBinarizer
import pickle
from gurobipy import *

# import sys
# # Redirect stdout to a log file
# sys.stdout = open('gurobi_output.log', 'w')

#------------------------------------------------------------------------
# Binarize the labels for one-vs-all classification
lb = LabelBinarizer()
y_binarized = lb.fit_transform(y_train_subset)
# print(f"y_binarized shape: {y_binarized.shape}")

# Define bisection parameters
uL = 0.0  # Lower bound
uH = 10.0  # Upper bound
precision = 1e-8  # Precision for bisection loop
delta = 1e-6  # Threshold for positivity constraint

#------------------------------------------------------------------------
# Train models for each digit
for digit in range(10):
    print(f"Training classifier for digit {digit}...")

    # # Extract binary labels for the current digit (one-vs-all)
    # y_binary = y_binarized[:, digit]
    # Assign -1 to the current digit & 1 to others
    # y_binary = np.where(y_binarized[:, digit] == 1, -1, 1)

    y_binary = np.where(y_binarized[:, digit] == 1, 1, -1)

    # Construct G and H matrices for the training data
    G, H, multi_indices = construct_G_H_matrices(x_train_norm, n_components, d)

    # Initialize the Gurobi model
    model = Model(f"digit_{digit}_classifier")
    model.setParam('OutputFlag', 0)  # Suppress output during training

    # Run bisection loop to find optimal coefficients
    optimal_z, optimal_alpha, optimal_beta, optimal_theta, z_values = bisection_loop(
        x_train_norm, y_binary, uL, uH, precision, model, len(multi_indices), G, H, delta
    )

    # Check if a feasible solution was found
    if optimal_alpha is None or optimal_beta is None or optimal_theta is None:
        print(f"No feasible solution found for digit {digit}. Skipping...")
        continue

    #------------------------------------------------------------------------
    # Save the model parameters
    model_data = {
        "alpha": optimal_alpha,
        "beta": optimal_beta,
        "theta": optimal_theta,
        "z": optimal_z,
        "n_components": n_components,
        "degree": d,
        "multi_indices": multi_indices
    }
    with open(f"{models_dir}/classifier_{digit}.pkl", "wb") as file:
        pickle.dump(model_data, file)

    print(f"Model for digit {digit} saved successfully!")

#Summary
print("Training complete. Models saved in:", models_dir)

# Stop timer
training_time = time.time() - start_time
print(f"Total Training Time: {training_time:.2f} seconds")

# # Restore stdout
# sys.stdout.close()
# sys.stdout = sys.__stdout__

Training classifier for digit 0...
Set parameter Username
Academic license - for non-commercial use only - expires 2026-01-12
Starting bisection loop...
Initial bounds: uL=0.0, uH=10.0, precision=1e-08
Testing z = 5.0...
Optimal solution found: Theta = 0.0
z = 5.0 is feasible.
Testing z = 2.5...
Optimal solution found: Theta = 0.0
z = 2.5 is feasible.
Testing z = 1.25...
Optimal solution found: Theta = 0.0
z = 1.25 is feasible.
Testing z = 0.625...
Optimal solution found: Theta = 0.0
z = 0.625 is feasible.
Testing z = 0.3125...
Optimal solution found: Theta = 0.0
z = 0.3125 is feasible.
Testing z = 0.15625...
Optimal solution found: Theta = 2.0554824248953458e-07
z = 0.15625 is feasible.
Testing z = 0.078125...
Optimal solution found: Theta = 4.176220786190078e-07
z = 0.078125 is feasible.
Testing z = 0.0390625...
Optimal solution found: Theta = 5.715917274078482e-07
z = 0.0390625 is feasible.
Testing z = 0.01953125...
Optimal solution found: Theta = 6.72719227726837e-07
z = 0.01953125