In [31]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_spd_matrix


In [None]:
def generate_training_data_unfixed(m=100, n=2, noise=0.01):
    """
    Generates synthetic training data:
    - x_i ~ N(2,1)
    - y_i = Ax_i + b + noise
    """
    np.random.seed(42)
    
    X = np.random.normal(loc=2, scale=1, size=(m, n))
    A = np.random.normal(0, 1, size=(n,)) 
    b = np.random.normal(0, 1)
    eta_i = np.random.normal(0, noise, size=(m,))
    y = X @ A + b + eta_i
    true_coefficients = {'A': A, 'b': b}
    return X, y, true_coefficients

In [33]:
def generate_training_data_fixed(m=100, n=2, noise=0.01, model_type='linear', nonlinear_func=None):
    """
    Generates synthetic training data with fixed A and b coefficients for linear regression, polynomial regression, and nonlinear cases.
    """
    X = np.random.normal(loc=2, scale=1, size=(m, n))
    if model_type == 'linear':
        A = np.array([1.0, 2.0])[:n]  
        b = 1.0                       
        eta_i = np.random.normal(0, noise, size=(m,))
        y = X @ A + b + eta_i
        true_coefficients = {'A': A, 'b': b}
        
    elif model_type == 'polynomial':
        A_poly = np.array([1.0, 0.5, 0.3, 0.2])
        b = 1.0
        eta_i = np.random.normal(0, noise, size=(m,))
        X_poly = np.hstack([X, X**2])  
        y = X_poly @ A_poly + b + eta_i
        true_coefficients = {'A': A_poly, 'b': b}
        X = X_poly
    
    elif model_type == 'nonlinear':
        b = 1.0
        eta_i = np.random.normal(0, noise, size=(m,))
        y = nonlinear_func(X[:, 0]) + b + eta_i
        true_coefficients = {'function': nonlinear_func.__name__, 'b': b}
    
    return X, y, true_coefficients
    

In [34]:
# need model to fit true data, variable issue, have model have fit true data
# y_i = Ax_i + b + noise
# linear
# polynomial
# tradeoff: higher-deg is richer, but more parameters therefore need more data


class SGD:
    """""
    Stochastic Gradient Descent for Regression
    The model is trained on generated synthetic data to minimize mean 
        Model (Linear Case):
            - Prediction: ŷ_i = w_0 + w_1 x_i1 + ... + w_n x_in
            - Loss: f_i(w) = (1/2) (w^T x_{i,new} - y_i)^2
            - Objective: F(w) = (1/m) Σ f_i(w)
            - Gradient: ∇f_i(w) = (w^T x_{i,new} - y_i) x_{i,new}
        
        Model (Polynomial Case):
            - Prediction: ŷ_i = w_0 + w_1 x_i1 + w_2 x_i1^2
    """""
    def __init__(self, X, y, num_iterations=1000):
        """
        Initializes the SGD model with generated data.

        Args:
            X: The input data matrix of shape (m, n), where m is the number of samples 
                and n is the number of features.
            y: The output data vector of shape (m,).
            num_iterations: The number of iterations for running the SGD optimization. Defaults to 1000.
        """
        self.X = X 
        self.y = y 
        self.m, self.n = X.shape  
        
        self.X_new = np.hstack([np.ones((self.m, 1)), self.X])  
        self.n_new = self.n + 1  
        
        self.num_iterations = num_iterations
        self.w = np.zeros(self.n_new)  
        
        self.w_star = np.linalg.solve(self.X_new.T @ self.X_new, self.X_new.T @ self.y.flatten())
        self.F_star = self.F(self.w_star)
        print(self.w_star)
        
    def f_i(self, w, i):
        """
        Computes the loss for a single sample.

        Args:
            w: The parameter vector.
            i: The index of the current training sample.

        Returns:
            he loss for the i-th training sample.
        """
        return 0.5 * (self.X_new[i] @ w - self.y[i]) ** 2
    
    def grad_f_i(self, w, i):
        """
        Computes the gradient of the loss function with respect to parameters for a single sample.

        Args:
            w: The parameter vector.
            i: The index of the current training sample.

        Returns:
            The gradient of the loss function for the i-th sample.
        """
        return (self.X_new[i] @ w - self.y[i]) * self.X_new[i]
    

    def F(self, w):
        """
        Computes the average loss over all samples.

        Args:
            w: The parameter vector.

        Returns:
            float: The average loss over all samples.
        """
        F = (1/self.m) * sum(self.f_i(w, j) for j in range(self.m))
        return F

    def grad_F(self, w):
        """
        Computes the gradient of the objective function with respect to the parameters.

        Args:
            w: The parameter vector.

        Returns:
            The gradient of the objective function.
        """
        grad_F = (1/self.m) * sum(self.grad_f_i(w, i) for i in range(self.m))
        return grad_F
    
    def stochastic_grad(self):
        """
        Computes the stochastic gradient (using a random training sample).

        Returns:
            The stochastic gradient based on a randomly selected training sample.
        """
        i = np.random.randint(0, self.m)
        grad = self.grad_f_i(self.w, i)
        return grad 
    
    def mini_batch_grad(self):
        """
        Computes the mini-batch gradient (using a random subset of training samples).

        Args:
            batch_size: The number of samples in the mini-batch.

        Returns:
            The mini-batch gradient.
        """
        indices = np.random.choice(self.n, self.batch_size, replace=False)
        return (1 / self.batch_size) * sum(self.grad_f_i(self.w, i) for i in indices)
    
    def compute_L(self, num_samples=1000):
        """
        Computes the Lipschitz constant L of the gradient of the objective function.

        Args:
            num_samples: The number of random samples to estimate L. Default is 1000.

        Returns:
            The estimated Lipschitz constant.
        """
        L_vals = []
        d = self.X_new.shape[1]
        for _ in range(num_samples):
            w1, w2 = np.random.randn(d), np.random.randn(d)
            grad_diff = np.linalg.norm(self.grad_F(w1) - self.grad_F(w2), 2)
            w_diff = np.linalg.norm(w1 - w2, 2)
            
            if w_diff > 1e-8: 
                L_vals.append(grad_diff / w_diff)
        return max(L_vals) if L_vals else 1.0
    
    def compute_c(self):
        """
        Computes the constant c associated with strong convexity (Assumption 4.5).

        Args:
            num_samples (int): The number of random samples to estimate L. Default is 1000.

        Returns:
            float: The constant c.
        """
        H = (1/self.m) * (self.X_new.T @ self.X_new)
        eigenvalues = np.linalg.eigvalsh(H)
        c = min(eigenvalues)
        return c
    
    def estimate_parameters(self):
        """
        Estimates the parameters bounding the variance of the gradient updates.

        Returns:
            tuple: Estimated parameters (mu, mu_G, M, M_V, M_G).
        """
        mu = 1 
        mu_G = 1
        M = 3
        M_V = 1.5
        M_G = M_V + mu_G ** 2
        return mu, mu_G, M, M_V, M_G
            
        
    def compute_fixed_stepsize(self):
        """
        Computes the fixed stepsize for SGD using estimated parameters.

        Returns:
            float: The computed fixed stepsize for the SGD algorithm.
        """
        L = self.compute_L()
        c = self.compute_c()
        M, mu, mu_G, M_V, M_G = self.estimate_parameters()
        M_G = M_V + mu_G ** 2  
        alpha = mu / (L * M_G)
        print(f"Parameters: L = {L}, c = {c}, M_V = {M_V}, mu = {mu}, mu_G = {mu_G}, M_G = {M_G} \n Fixed Stepsize: alpha_bar = {alpha}")
        return alpha
    
#    def compute_diminishing_stepsize_params(self):
    def optimize(self, stepsize_type='fixed'):
        """
        Runs the SGD optimization process for a specified number of iterations.

        Args:
            stepsize_type: 'fixed' or 'diminishing' stepsize.

        Returns:
            tuple: Optimized parameters, the history of the objective function, gradient norms, and 
                   distance to the optimal solution.
        """
        alpha = self.compute_fixed_stepsize()
    
        w = self.w.copy()
        obj_history = [self.F(w)]
        grad_norm_history = [np.linalg.norm(self.grad_F(w))**2]
        dist_to_opt_history = [np.linalg.norm(w - self.w_star)**2]
    
        for k in range(self.num_iterations):
            alpha_k = alpha
        
            self.w = w
            g_k = self.stochastic_grad()
        
            w -= alpha_k * g_k
        
            obj_history.append(self.F(w))
            grad_norm_history.append(np.linalg.norm(self.grad_F(w))**2)
            dist_to_opt_history.append(np.linalg.norm(w - self.w_star)**2)
    
        return w, np.array(obj_history), np.array(grad_norm_history), np.array(dist_to_opt_history)

# mess around with step size
# mess around with (x,y) from different distribution
    

In [35]:
# generate random training data
# generate training data
# x_i has norm(2,0,1)
# for i=1,...,n
    # y = Ax_I + b + ita_i 
# generating (x_i,y_i) to be used for SGD
# different ways of generating synthetic data to make it robust
#X, y, true_params = generate_training_data_fixed(m=200, n=2, noise=0.01)
#sgd = SGD(X, y, num_iterations=1000)
#w_fixed, obj_fixed, grad_fixed, dist_fixed = sgd.optimize(stepsize_type='fixed')
#print("True parameters:")
#print(f"A: {true_params['A']}, b: {true_params['b']}")
#print(f"Learned parameters (fixed step size): w_0 (bias) = {w_fixed[0]}, w_1 = {w_fixed[1]}, w_2 = {w_fixed[2]}")

In [None]:
def run_experiments_with_fixed_parameters():
    #stepsizes = [0.001, 0.01, 0.1]
    num_steps = [100, 1000, 5000]
    noise_levels = [0.01, 0.1, 1.0]
    
    
    for j, n_steps in enumerate(num_steps):
        for k, noise in enumerate(noise_levels):
            X, y, true_params = generate_training_data_fixed(100, 2, noise, 'linear')
            sgd = SGD(X, y, num_iterations=n_steps)
            w, obj, grad, dist = sgd.optimize()
            label = f"Steps={n_steps}, Noise={noise}"
            print("Linear function:")
            print(f"{label}, Final Loss: {obj[-1]:.4f}")
            
    for j, n_steps in enumerate(num_steps):
        for k, noise in enumerate(noise_levels):
            X_poly, y_poly, true_params_poly = generate_training_data_fixed(100, 2, noise, 'polynomial')
            sgd_poly = SGD(X_poly, y_poly, num_iterations=n_steps)
            w_poly, obj_poly, grad_poly, dist_poly = sgd_poly.optimize()
            label = f"Steps={n_steps}, Noise={noise}"
            print("Polynomial function:")
            print(f"{label}, Final Loss: {obj_poly[-1]:.4f}")
    
    for j, n_steps in enumerate(num_steps):
        for k, noise in enumerate(noise_levels):
            X_nonlin, y_nonlin, true_params_nonlin = generate_training_data_fixed(
            100, 1, noise, 'nonlinear', nonlinear_func=np.cos
        )
            sgd_nonlin = SGD(X_nonlin, y_nonlin, num_iterations=n_step s)
            w_nonlin, obj_nonlin, grad_nonlin, dist_nonlin = sgd_nonlin.optimize()
            label = f"Steps={n_steps}, Noise={noise}"
            print("Nonlinear function")
            print(f"{label}, Final Loss: {obj_nonlin[-1]:.4f}")


In [37]:
run_experiments_with_fixed_parameters()

[1.00131685 1.00052578 1.99852314]
Parameters: L = 9.602295785336711, c = 0.08559861624666057, M_V = 1.5, mu = 1, mu_G = 3, M_G = 10.5 
 Fixed Stepsize: alpha_bar = 0.009918263024508118
Linear function:
Steps=100, Noise=0.01, Final Loss: 0.0165
[0.9662271  1.01081898 2.00643226]
Parameters: L = 10.49291146433752, c = 0.10236555925732833, M_V = 1.5, mu = 1, mu_G = 3, M_G = 10.5 
 Fixed Stepsize: alpha_bar = 0.009076422264857848
Linear function:
Steps=100, Noise=0.1, Final Loss: 0.0601
[1.07920236 1.04551454 1.93085495]
Parameters: L = 9.348227037501406, c = 0.10057344257421398, M_V = 1.5, mu = 1, mu_G = 3, M_G = 10.5 
 Fixed Stepsize: alpha_bar = 0.010187824370978314
Linear function:
Steps=100, Noise=1.0, Final Loss: 0.5094
[0.99842154 1.0000324  2.00165686]
Parameters: L = 10.176785504208413, c = 0.10972607904520881, M_V = 1.5, mu = 1, mu_G = 3, M_G = 10.5 
 Fixed Stepsize: alpha_bar = 0.009358367158147468
Linear function:
Steps=1000, Noise=0.01, Final Loss: 0.0009
[1.02500772 0.990969

In [38]:
#plt.figure(figsize=(15, 9))
#plt.subplot(3, 1, 1)
#plt.plot(obj_fixed - sgd.F_star, label='Fixed Stepsize')
#plt.ylabel('$F(w) - F(w_*)$')
#plt.title('Objective Function Decrease')
#plt.legend()
#plt.yscale('log')

#plt.subplot(3, 1, 2)
#plt.plot(grad_fixed, label='Fixed Stepsize')
#plt.ylabel('$||F(w)||^2$')
#plt.title('Gradient Norm Squared')
#plt.legend()
#plt.yscale('log')

#plt.subplot(3, 1, 3)
#plt.plot(dist_fixed, label='Fixed Stepsize')
#plt.xlabel('Iteration')
#plt.ylabel('$||w_k - w_*||^2$')
#plt.title('Distance to Optimum')
#plt.legend()
#plt.yscale('log')

#plt.tight_layout()
#plt.show()