
This script is used to obtain an optimised network architecture. It uses Optuna to minimize the objective function.


In [None]:
!pip install --quiet optuna

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F
import torch.nn as nn
import numpy as np
import time
import matplotlib.pyplot as plt
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
import optuna
from optuna.trial import TrialState
from sklearn.model_selection import train_test_split
from datetime import datetime
import math

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
# Constants
n_trials = 500 # number of trials to perform
dimension = 2 # dimension of dataset
noise = False # add noise to the dataset
dataset_type = 'square' # 'circle' or 'square'
N = 3200 # number of points in the dataset
num_epochs = 100 # number of epochs before training is stopped
batch_size = 32
learning_rate = 0.001 # learning rate for training
slope = 0.01 # slope for Leaky functions
patience = 6

In [None]:
# Utils

def sumOne(u):
  u = Positive(u)
  u = u/torch.sum(u)
  return Positive(u)

def Positive(X):
    return torch.abs(X)

# From https://github.com/JiJingYu/delta_orthogonal_init_pytorch/blob/master/demo.py
def genOrthgonal(dim):
    a = torch.zeros((dim, dim)).normal_(0, 1)
    q, r = torch.linalg.qr(a)
    d = torch.diag(r, 0).sign()
    diag_size = d.size(0)
    d_exp = d.view(1, diag_size).expand(diag_size, diag_size)
    q.mul_(d_exp)
    return q

def makeDeltaOrthogonal(weights, gain):
    rows = weights.size(0)
    cols = weights.size(1)
    if rows > cols:
        print("In_filters should not be greater than out_filters.")
    weights.data.fill_(0)
    dim = max(rows, cols)
    q = genOrthgonal(dim)
    # mid1 = weights.size(2) // 2
    # mid2 = weights.size(3) // 2
    # weights[:, :, mid1, mid2] = q[:weights.size(0), :weights.size(1)]
    weigths = q
    weights.mul_(gain)

#Convolutional layers
def deconv_orth_dist(kernel, padding = 2, stride = 1):
    [o_c, i_c, w, h] = kernel.shape
    output = torch.conv2d(kernel, kernel, padding=padding)
    target = torch.zeros((o_c, o_c, output.shape[-2], output.shape[-1])).to(kernel.device)
    ct = int(np.floor(output.shape[-1]/2))
    target[:,:,ct,ct] = torch.eye(o_c).to(kernel.device)
    return torch.norm( output - target )

def deconv_orth_dist_2d(matrix):
    [w, h] = matrix.shape
    target = torch.eye(w)
    # print(f'Matrix: {matrix}')
    # print(f'Target: {target}')
    return torch.linalg.matrix_norm(matrix-target,2)


def conv_orth_dist(kernel, stride = 1):
    [o_c, i_c, w, h] = kernel.shape
    assert (w == h),"Do not support rectangular kernel"
    assert stride<w,"Please use matrix orthgonality instead"
    new_s = stride*(w-1) + w#np.int(2*(half+np.floor(half/stride))+1)
    temp = torch.eye(new_s*new_s*i_c).reshape((new_s*new_s*i_c, i_c, new_s,new_s)).to(kernel.device)
    out = (F.conv2d(temp, kernel, stride=stride)).reshape((new_s*new_s*i_c, -1))
    Vmat = out[np.floor(new_s**2/2).astype(int)::new_s**2, :]
    temp= np.zeros((i_c, i_c*new_s**2))
    for i in range(temp.shape[0]):temp[i,np.floor(new_s**2/2).astype(int)+new_s**2*i]=1
    return torch.norm( Vmat@torch.t(out) - torch.from_numpy(temp).float().to(kernel.device) )

#Fully connected layers
def orth_dist(mat, stride=None):
    mat = mat.reshape( (mat.shape[0], -1) )
    if mat.shape[0] < mat.shape[1]:
        mat = mat.permute(1,0)
    return torch.norm( torch.t(mat)@mat - torch.eye(mat.shape[1]).to(mat.device))

def power_method(A, A_t, u_init, k=1):
    u = u_init
    for i in range(k):
        v = A(u)
        v /= torch.sqrt(torch.sum(v**2))
        u = A_t(v)
        sigma = torch.sum(u * u)
        u /= torch.sqrt(torch.sum(u**2))
    return sigma, u[0] #so it returns a 3d tensor

def compute_spectral_norm(conv, u_init=None, im_size=(3, 32, 32), k=1):
    if u_init is None:
        with torch.no_grad():
            u_init = torch.randn(1, *im_size).to(conv.weight.device)
    u_init = u_init.to(conv.weight.device)
    with torch.no_grad():
        return power_method(lambda u: torch.nn.functional.conv2d(u, conv.weight, padding=tuple(v//2 for v in conv.weight.shape[2:])),
                lambda v: torch.nn.functional.conv_transpose2d(v, conv.weight, padding=tuple(v//2 for v in conv.weight.shape[2:])),
                u_init, k)

def conv_block(in_channels, out_channels, pool=False):
    convo = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1,bias=False)
    layers = [convo]
    if pool: layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)



In [None]:
class multiClassHingeLoss(nn.Module): # From paper Dynamical Systems
    def __init__(self, p=1, margin=0.1, device='cpu', size_average=True):
        super(multiClassHingeLoss, self).__init__()
        self.margin=margin
        self.size_average=size_average
        self.device = device

    def forward(self, output, y):
        output_y=output[torch.arange(0,y.size()[0]).long().to(self.device),y.data.to(self.device)].view(-1,1) #it is a (Batch Size x 1) tensor, having entries that are x[y]
        loss=output-output_y+self.margin #this has self.margin in position y and the difference between the entry of x and x[y] in the other positions
        #remove i=y items
        loss[torch.arange(0,y.size()[0]).long().to(self.device),y.data.to(self.device)]=0 #sets to 0 the entry in position y, instead of having self.margin
        #max(0,_)
        loss[loss<0]=0 #sets to 0 the entries of loss where we have negative numbers, i.e. those meeting the margin (there is a higher difference than the margin between x[y] and x[i])
        #sum up
        loss=torch.sum(loss)
        if(self.size_average):
            loss/=output.size()[0]
        return loss

In [None]:
# Functions used to generate datasets
num_points = N
def GenerateCircleDataset(noise=False):
  # Generate random points
  points = np.random.randn(num_points, dimension)

  # Calculate distances to the origin
  distances = np.linalg.norm(points, axis=1)

  # Generate labels: 0 if distance < 2/3, 1 if 2/3 < distance < 4/3, 2 if distance > 4/3
  labels = np.zeros_like(distances)
  labels[distances > 2/3] = 1
  labels[distances > 4/3] = 2

  # Convert numpy arrays to PyTorch tensors
  labels_tensor = torch.tensor(labels, dtype=torch.long)
  if noise:
    noise = 0.1*np.random.randn(num_points, dimension)
    noisy_points = points + noise
    points_tensor = torch.tensor(noisy_points, dtype=torch.float32)
  else:
    points_tensor = torch.tensor(points, dtype=torch.float32)

  return points_tensor, labels_tensor

def GenerateSquareDataset(noise=False):
    # Generate random points uniformly between -1 and 1
    points = np.random.uniform(low=-1.0, high=1.0, size=(num_points, dimension))

    # Initialize labels
    labels = np.zeros(num_points)

    # Assign labels based on quadrant
    for i, point in enumerate(points):
        if point[0] >= 0 and point[1] >= 0:
            labels[i] = 0  # Quadrant 1
        elif point[0] < 0 and point[1] >= 0:
            labels[i] = 1  # Quadrant 2
        elif point[0] < 0 and point[1] < 0:
            labels[i] = 2  # Quadrant 3
        else:
            labels[i] = 3  # Quadrant 4

    # Convert numpy arrays to PyTorch tensors
    labels_tensor = torch.tensor(labels, dtype=torch.long)

    if noise:
        # Add noise to the points
        noise = 0.1 * np.random.randn(num_points, dimension)
        noisy_points = points + noise
        points_tensor = torch.tensor(noisy_points, dtype=torch.float32)
    else:
        points_tensor = torch.tensor(points, dtype=torch.float32)

    return points_tensor, labels_tensor

In [None]:
# Generate training data

if dataset_type == 'circle':
  points_tensor, labels_tensor = GenerateCircleDataset(noise=noise)
elif dataset_type == 'square':
  points_tensor, labels_tensor = GenerateSquareDataset(noise=noise)

# Create a scatter plot
plt.axis('equal')
plt.scatter(points_tensor[:,0], points_tensor[:,1], c=labels_tensor)

# Set the title and labels
plt.title('Random Points')
plt.xlabel('X axis')
plt.ylabel('Y axis')


# Generate validation data
if dataset_type == 'circle':
  val_points_tensor, val_labels_tensor = GenerateCircleDataset(noise=False) # no noise on validation data
elif dataset_type == 'square':
  test_points_tensor, test_labels_tensor = GenerateSquareDataset(noise=False)


In [None]:
if dataset_type == 'circle':
    num_outputs = 3
elif dataset_type == 'square':
    num_outputs = 4
else:
    raise ValueError("Invalid dataset type")

class DynamicModel(nn.Module): # Fully connected model
    def __init__(self, trial):
        super(DynamicModel, self).__init__()
        self.layers = nn.ModuleList()
        n_layers = trial.suggest_int('n_layers', 2, 5)  # Suggesting between 2 and 5 layers

        in_features = dimension
        for i in range(n_layers):
            out_features = trial.suggest_int(f'n_units_l{i}', 2, 128)
            self.layers.append(nn.Linear(in_features, out_features))
            in_features = out_features

        self.layers.append(nn.Linear(out_features, num_outputs))  # Output layer

    def forward(self, x):
        for layer in self.layers[:-1]:
            x = F.relu(layer(x))
        x = self.layers[-1](x)
        return x

def objective(trial):
    # Create a dynamic model based on the trial's suggestions
    model = DynamicModel(trial).to(device)

    # Loss function and optimizer
    # criterion = nn.CrossEntropyLoss()
    criterion = multiClassHingeLoss(margin=margin)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Training loop
    for epoch in range(num_epochs):
        model.train()
        for i in range(0, N, batch_size):
          inputs = points_tensor[i:i+batch_size]
          targets = labels_tensor[i:i+batch_size]
          optimizer.zero_grad()
          outputs = model(inputs)
          loss = criterion(outputs, targets)
          loss.backward()
          optimizer.step()

    # Validation loop
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for i in range(0, N, batch_size):
            batch_features = val_points_tensor[i:i+batch_size]
            batch_labels = val_labels_tensor[i:i+batch_size]

            outputs = model(batch_features)
            loss = criterion(outputs, batch_labels)
            val_loss += loss.item()
    val_loss /= (N // batch_size)  # Average validation loss over batches

    return val_loss

class ExpansiveContractiveNet(nn.Module):
    def __init__(self, trial, nl, in_channels=dimension, sub_steps=1):
        super(ExpansiveContractiveNet, self).__init__()
        n_blocks = trial.suggest_int('n_blocks', 1, 20)
        self.nlayers = n_blocks*2 # how many total layers? = blocks * 2
        self.nl = nl  # activation function
        self.slope = 0.01
        self.sub_steps = sub_steps
        self.u = torch.nn.Parameter(torch.rand(self.nlayers))
        self.rescalings = nn.Parameter(torch.rand(1+self.nlayers//2))

        # Layers
        self.nodes_per_layer = trial.suggest_int('nodes_per_layer', 2, 64)

        self.lift = nn.Linear(in_channels, self.nodes_per_layer)

        self.inner_layers = nn.ModuleList([nn.Linear(self.nodes_per_layer, self.nodes_per_layer) for i in range(self.nlayers)])

        self.project = nn.Linear(self.nodes_per_layer, 4)

        self.rescalings = nn.Parameter(torch.rand(1+self.nlayers//2))

    def getDepth(self):
        return self.nlayers

    def getReg(self,):
        reg = 0
        for i in range(self.nlayers):
            reg += deconv_orth_dist_2d(self.inner_layers[i].weight)
        return reg

    def forward(self, x):
        self.rescalings.data[:-1] = torch.clip(self.rescalings.data[:-1],min=0,max=1)
        x = torch.relu(self.rescalings[-1] * self.lift(x))

        dts = Positive(self.u)

        for i in np.arange(0,self.nlayers,2):

          dte = dts[i]/self.sub_steps
          dtc = dts[i+1]/self.sub_steps
          Ae = self.inner_layers[i]
          Ac = self.inner_layers[i+1]

          for k in range(self.sub_steps):
                x = x + dte * self.rescalings[i//2] * self.nl(Ae(x), self.slope)
                x = x - dtc * self.nl(Ac(x), self.slope)

        x = self.project(x) # project to number of labels
        return x

def objective_lipschitz_constrained(trial):
    # Create a dynamic model based on the trial's suggestions
    # model = DynamicModel(trial).to(device)
    model = ExpansiveContractiveNet(trial, nl=F.relu).to(device)

    # Loss function and optimizer
    margin = 0.0013
    # criterion = nn.CrossEntropyLoss()
    criterion = multiClassHingeLoss(margin=margin)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)


    train_points, val_points, train_labels, val_labels = train_test_split(points_tensor, labels_tensor, test_size=0.2)

    best_val_loss = float('inf')
    epochs_without_improvement = 0

    gamma = .1 # used to scale the orthogonal constraint regulaiton

    for epoch in range(1000):
        model.train()
        for i in range(0, len(train_points), batch_size):
          inputs = train_points[i:i+batch_size]
          targets = train_labels[i:i+batch_size]

          optimizer.zero_grad()
          outputs = model(inputs)
          loss = criterion(outputs, targets)

          if True:
                loss += gamma * model.getReg()
          if math.isnan(loss.item()) or math.isinf(loss.item()):
              print("NaN loss detected, stopping training.")
              return -1
          loss.backward()
          optimizer.step()
          if True:
                with torch.no_grad():
                    refLip = 1

                    lift_norm = torch.linalg.matrix_norm(model.lift.weight.data,2)
                    model.lift.weight.data /= max(1,lift_norm/refLip)

                for q in range(model.getDepth()):
                  norm = torch.linalg.matrix_norm(model.inner_layers[q].weight.data,2)
                  model.inner_layers[q].weight.data /= max(1,norm/refLip)

        # Compute accuracy on test set
        model.eval()
        with torch.no_grad():
            norms = []
            nonort = []
            lift = []
            liftOrt = []

            n = torch.linalg.matrix_norm(model.lift.weight.data,2)
            lift.append(n.item())
            for s in range(model.getDepth()):
              n = torch.linalg.matrix_norm(model.inner_layers[s].weight.data,2)
              nonort.append(deconv_orth_dist_2d(model.inner_layers[s].weight).item())
            outputs = model(test_points_tensor)
            _, predicted = torch.max(outputs, 1) # Maximum along axis=1
            accuracy = (predicted == test_labels_tensor).float().mean()

            outputs = model(test_points_tensor)
            loss = criterion(outputs, test_labels_tensor)
            test_loss = loss.item()

        # Validation step
        val_loss = 0.0
        with torch.no_grad():
            for i in range(0, len(val_points), batch_size):
                inputs = val_points[i:i+batch_size]
                targets = val_labels[i:i+batch_size]
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                if torch.isnan(loss) or math.isinf(loss.item()):
                  print("NaN loss detected, stopping training.")
                  return -1, -1, epoch
                val_loss += loss.item()

        val_loss /= len(val_points) // batch_size
        # print(f'Time: {datetime.now().strftime("%H:%M:%S")}, Epoch [{epoch+1}/{1000}], Loss: {loss.item()}, Val Loss: {val_loss}, Epochs without improvement:{epochs_without_improvement}')

        # Check for early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                print(f'Early stopping triggered after {epoch+1} epochs')
                break
    return accuracy


In [None]:
study = optuna.create_study(direction='minimize')
study.optimize(objective_lipschitz_constrained, n_trials=n_trials)

print("Number of finished trials: ", len(study.trials))
print("Best trial:")
trial = study.best_trial

print("  Value: ", trial.value)
print("  Params: ")
for key, value in trial.params.items():
    print(f"    {key}: {value}")


In [None]:
optuna.visualization.plot_optimization_history(study)

In [None]:
optuna.visualization.plot_slice(study)

In [None]:
optuna.visualization.plot_contour(study)

In [None]:
# Extract trial values
values = [trial.value for trial in study.trials]

# Plot using matplotlib with a logarithmic scale
plt.figure(figsize=(10, 6))
plt.plot(values, marker='o')
plt.yscale('log')
plt.xlabel('Trial')
plt.ylabel('Objective Value')
plt.title('Optimization History (Logarithmic Scale)')
plt.grid(True)
plt.show()


In [None]:
# Compute the minimum value observed up to each trial
min_values = [min(values[:i+1]) for i in range(len(values))]

# Plot using matplotlib
plt.figure(figsize=(10, 6))
plt.scatter(range(len(values)), values, marker='.', c='blue', label='Trial values')
plt.plot(range(len(values)), values, linestyle='', color='blue', alpha=0.5)
plt.plot(range(len(min_values)), min_values, linestyle='-', color='red', label='Minimum loss value encountered in previous trials')
plt.yscale('log')
plt.xlabel('Trial')
plt.ylabel('Objective Value')
plt.title('Optimization History')
plt.legend()
plt.grid(True)
plt.show()