In [None]:
import os
os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8'
import torch
import torchvision.transforms as transforms
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader
import numpy as np
from sklearn.preprocessing import StandardScaler
from scipy.stats import linregress, t, f
import matplotlib.pyplot as plt
import torch.nn as nn
from scipy.optimize import curve_fit
import torch.optim as optim
import random
import warnings
import json
device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")

In [None]:
def set_seed(seed):
    # Set seed for CPU
    torch.manual_seed(seed)
    # Set seed for GPU (if available)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)  # For multiple GPUs
    # Ensure deterministic behavior
    torch.use_deterministic_algorithms(True)

    # Set seed for NumPy
    np.random.seed(seed)

    # Set seed for Python's random module
    random.seed(seed)

In [None]:
set_seed(42)

# Load the training dataset to calculate mean and std
train_dataset = MNIST(root='./data', train=True, download=True, transform=transforms.ToTensor())
train_loader = DataLoader(train_dataset, batch_size=1000, shuffle=False)

# Extract data and labels
train_data = train_dataset.data.numpy()
train_labels = train_dataset.targets.numpy()

# Reshape and standardize the data
num_samples = train_data.shape[0]
train_data = train_data.reshape(num_samples, -1)  # Flatten the images

# Standardize Data
scaler_X = StandardScaler()
train_data = scaler_X.fit_transform(train_data)

# Calculate Sigma
train_data = train_data.T
Sigma = (1 / (train_data.shape[1] - 1)) * train_data @ train_data.T

# Calculate eigenvalues of Sigma
eigenvalues = np.linalg.eigvals(Sigma)

# Find the maximum eigenvalue
max_eigenvalue = np.abs(np.max(eigenvalues))

# Find the maximum learning rate
max_lr = 2 * train_data.shape[1] / (max_eigenvalue * (train_data.shape[1] - 1))

# Print the maximum learning rate
print(f"Maximum Learning Rate: {max_lr}")

# Calculate mean and std
mean = 0.
std = 0.
for images, _ in train_loader:
    batch_mean = images.mean()
    batch_std = images.std()
    mean += batch_mean
    std += batch_std

mean /= len(train_loader)
std /= len(train_loader)

print(f'Mean: {mean}, Std: {std}')

# Define the transform with normalization
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[mean.item()], std=[std.item()]),
    transforms.Lambda(lambda x: x.view(-1))  # Flatten the images
])

# Load the dataset with normalization
batch_size = 32
train_dataset = MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = MNIST(root='./data', train=False, download=True, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
class LogisticRegression(nn.Module):
    def __init__(self, input_dim):
        super(LogisticRegression, self).__init__()
        self.linear = nn.Linear(input_dim, 10)  # 10 classes for MNIST

    def forward(self, x):
        return self.linear(x)

In [None]:
def calculate_gradient_magnitude(model):
    # Calculate the magnitude of gradients for all parameters
    magnitude = 0.0
    for param in model.parameters():
        if param.grad is not None:
            magnitude += param.grad.norm().item() ** 2
    return magnitude ** 0.5

In [None]:
def exponential_test(loss, t_start):
  with warnings.catch_warnings():
      # Ignore all warnings in this block
      warnings.simplefilter("ignore")
      def exponential_decay(x, a, b, c):
        # Calculate the exponent
        exponent = -b * x
        return a * exponent + c

      x = np.arange(t_start, len(loss)+t_start)
      y = np.array(loss)
      p_exp, _ = curve_fit(exponential_decay, x, y) # Parameters of exponential decay fitting
      lin_slope, lin_intercept, r, p, se = linregress(x, y) # Parameters of linear fitting
      ss_exp = np.sum((y - exponential_decay(x, *p_exp))**2) # Residuals for exponential
      ss_lin = np.sum((y - (lin_slope*x + lin_intercept))**2) # Residuals for linear
      df1 = len(x) - 2 # Degrees of freedom for linear
      df2 = len(x) - 3 # Degrees of freedom for exponential
      f_stat = (ss_lin - ss_exp) / (df1 - df2) / (ss_exp / df2) # F-test of residuals
      p = 1 - f.cdf(f_stat, df1-df2, df2)
      return p

def linear_test(loss, t_start):
  x = np.arange(t_start, len(loss)+t_start)
  y = np.array(loss)
  slope, intercept, r_value, p_value, std_err = linregress(x, y)
  # Calculate the t-statistic
  t_stat = slope / std_err
  # Degrees of freedom
  df = len(x) - 2
  # Calculate the one-tailed p-value
  p = t.cdf(t_stat, df)
  return p

In [None]:
def train_exptest(model, max_lr, alpha=0.05, beta=0.33, epochs=10):
  model.to(device)
  model.train()
  criterion = nn.CrossEntropyLoss()
  optimizer = optim.SGD(model.parameters(), lr=max_lr)

  overall_losses = [] # Losses for entire training
  train_losses = [] # Losses for current fitting window
  exp_test = False # No exponential decay has been detected to begin with
  resultant_vector = None # Vector sum of gradients
  mags_sum = 0 # Sum of magnitudes of gradients
  t_start = 0 # Starting time point of fitting
  window = None # Window size of fitting calculations
  full_window = None # Full window (incorporating correction factor, if True) of fitting calculations
  curr_lr = max_lr # Current learning rate
  initial_loss = None # Initial loss value used in window size calculation
  correct = True # Window correction is enabled

  for epoch in range(epochs):
      for images, labels in train_loader:
          # Training loop
          optimizer.zero_grad()
          outputs = model(images.to(device))
          loss = criterion(outputs, labels.to(device))
          train_losses.append(loss.detach().item())
          overall_losses.append(loss.detach().item())
          loss.backward()
          optimizer.step()
          if initial_loss is None:
            initial_loss = loss.item()
          # Store gradients
          if window == None:
            window = round(2 * np.sqrt(2) * initial_loss / (max_lr * np.exp(1)))
          elif (len(train_losses) >=1) and (len(train_losses) < window) and (correct):
            curr_gradients = [p.grad for p in model.parameters() if p.grad is not None]
            curr_magnitude = calculate_gradient_magnitude(model)
            mags_sum += curr_magnitude
            vec = torch.cat([p.clone().detach().flatten() for p in curr_gradients if p is not None])
            if resultant_vector is None:
              resultant_vector = vec
            else:
              resultant_vector += vec
          elif (len(train_losses) == window) and (correct):
            correction = mags_sum / torch.norm(resultant_vector).item()
            full_window = round(window * correction)
          # Perform exponential and linear testing
          elif (len(train_losses) == full_window):
            if exp_test == False:
              p = exponential_test(train_losses, t_start)
              if p < alpha:
                exp_test = True # Exponential decay detected, turn off future detection
                t_start += len(train_losses)
                train_losses = [] # Reset losses over current fitting window
              else:
                curr_lr *= beta # Adjust learning rate
                optimizer = optim.SGD(model.parameters(), lr=curr_lr)
                window = round(2 * np.sqrt(2) * initial_loss / (curr_lr * np.exp(1))) # Recalculate window size
                train_losses = [] # Reset losses over current fitting window
                # Reset window correction parameters
                mags_sum = 0
                resultant_vector = None
            else:
                p = linear_test(train_losses, t_start)
                if p < alpha:
                  t_start += len(train_losses)
                  train_losses = [] # Reset losses over current fitting window
                else:
                  t_start += len(train_losses)
                  curr_lr *= beta # Adjust learning rate
                  optimizer = optim.SGD(model.parameters(), lr=curr_lr)
                  window = round(2 * np.sqrt(2) * initial_loss / (curr_lr * np.exp(1))) # Recalculate window size
                  train_losses = [] # Reset losses over curent fitting window
                  # Reset window correction parameters
                  mags_sum = 0
                  resultant_vector = None

  return overall_losses

In [None]:
def calculate_test_accuracy(model, test_dataloader):
    # Standard test accuracy calculation
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in test_dataloader:
            outputs = model(images.to(device))
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted.detach().cpu() == labels).sum().item()

    accuracy = correct / total
    return accuracy

In [None]:
def moving_average(data, window_size):
    data = np.array(data).astype(float)
    return np.convolve(data, np.ones(window_size) / window_size, mode='valid')

In [None]:
def convert_keys_to_str(d):
    """
    Recursively converts all non-string dictionary keys to strings.
    
    Parameters:
    - d (dict): The dictionary to convert.
    
    Returns:
    - dict: A new dictionary with all keys as strings.
    """
    new_dict = {}
    for key, value in d.items():
        # Convert key to string if it is not already a string
        new_key = str(key) if not isinstance(key, str) else key
        if isinstance(value, dict):
            # Recursively convert nested dictionaries
            new_dict[new_key] = convert_keys_to_str(value)
        elif isinstance(value, list):
            # Convert list elements if they are dictionaries
            new_dict[new_key] = [convert_keys_to_str(v) if isinstance(v, dict) else v for v in value]
        else:
            # Directly copy other values
            new_dict[new_key] = value
    return new_dict

def save_dict_to_file(dictionary, file_path):
    """
    Saves a dictionary to a text file.
    
    Parameters:
    - dictionary (dict): The dictionary to be saved.
    - file_path (str): The path to the file where the dictionary will be saved.
    """
    try:
        # Convert all keys to strings
        dictionary = convert_keys_to_str(dictionary)
        with open(file_path, 'w') as file:
            json.dump(dictionary, file, indent=4)
        print(f"Dictionary successfully saved to {file_path}")
    except Exception as e:
        print(f"An error occurred while saving the dictionary: {e}")


In [None]:
epochs = 5
num_trials = 5

train_losses_dict = {}
test_accuracies_dict = {}

for trial in range(num_trials):
  for beta in [0.33, 0.1, 0.05]:
    for alpha in [0.1, 0.05, 0.01]:
      set_seed(trial)

      # Initialize the model
      input_dim = 28 * 28
      model = LogisticRegression(input_dim)

      # Train with ExpTest
      train_losses_exptest = train_exptest(model, max_lr, alpha=alpha, beta=beta, epochs=epochs)
      test_accuracy_exptest = calculate_test_accuracy(model, test_loader)

      if train_losses_dict.get((alpha, beta)) is None:
        train_losses_dict[(alpha, beta)] = [train_losses_exptest]
      else:
        train_losses_dict[(alpha, beta)].append(train_losses_exptest)

      if test_accuracies_dict.get((alpha, beta)) is None:
        test_accuracies_dict[(alpha, beta)] = [test_accuracy_exptest]
      else:
        test_accuracies_dict[(alpha, beta)].append(test_accuracy_exptest)

      print(f"Test Accuracy (ExpTest): {test_accuracy_exptest}")
      plt.plot(moving_average(train_losses_exptest, round(len(train_dataset)/batch_size)))
      plt.show()

In [None]:
filename = "train_losses_hyperparameters.txt"
save_dict_to_file(train_losses_dict, filename)

filename = "test_losses_hyperparameters.txt"
save_dict_to_file(test_accuracies_dict, filename)
for beta in [0.33, 0.1, 0.05]:
    for alpha in [0.1, 0.05, 0.01]:
        print("Mean +/- STD of Test Accuracies ({}, {}):".format(alpha, beta), round(100*np.mean(test_accuracies_dict[(alpha, beta)]), 2), "+/-", round(100*np.std(test_accuracies_dict[(alpha, beta)]), 4))