In [1]:
from scipy.stats import wishart, random_correlation
import numpy as np
import cvxpy as cp
from cvxpylayers.torch import CvxpyLayer
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm


In [2]:
# Problem generation parameters
seed = 42
n_vars = 5
n_context = 4
wishart_df = 50
datset_size = 100
test_proportion = 0.3

# Optimisation problem parameters
risk_limit = 0.4
robustness_parameter = 1
penalty_coefficient = 1

# Prediction model hyperparameters
hidden_size = 10

# Training hyperparameters
learning_rate = 0.1
epochs = 200
batch_size = 32


In [3]:
class ConditionalProcessGenerator:
    
    def __init__(self, n_vars, n_context, seed):
        self.n_vars = n_vars
        self.n_context = n_context
        self.seed = seed
        
        np.random.seed(self.seed)
        self.correlation = self.generate_correlation()
        self.conditional_variance_weights = [np.random.rand(self.n_context) for _ in range(n_vars)]
        self.conditional_variance_weights = [weights/np.sum(weights) for weights in self.conditional_variance_weights]
        
    def generate_correlation(self):
        np.random.seed(self.seed)
        generated_correlation_eig = np.random.rand(n_vars)
        generated_correlation_eig = generated_correlation_eig/np.sum(generated_correlation_eig)
        generated_correlation_eig *= n_vars
        correlation = random_correlation.rvs(generated_correlation_eig)
        return correlation
    
    def conditional_covariance(self, context):
        conditional_variances = np.array([context @ weights for weights in self.conditional_variance_weights])
        inverse_square_root_variances = np.sqrt(1/conditional_variances)
        matrix_of_isqv = np.diag(inverse_square_root_variances) # matrix of inverse square root variances as diagonal
        conditional_covariance = matrix_of_isqv@self.correlation@matrix_of_isqv
        return conditional_covariance
    
    
    def generate_context(self, size, lb = 0.1, ub = 1.):
        # generates context z for problem
        # z should be > 0 so the conditional variances make sense (convex combinations)
        assert lb > 0, "the lower bound of the generated context has to be greater than 0"
        
        np.random.seed(self.seed)
        z = np.random.uniform(low =lb, high = ub, size = (size,n_context))
        
        return z
    
    def generate_dataset(self, size, lb = 0.1, ub = 1., wishart_df = 100):
        np.random.seed(self.seed)
        
        z = self.generate_context(size = size, lb = lb, ub = ub)
        
        conditional_covariances = np.apply_along_axis(dgp.conditional_covariance, 1, z)
        
        wishart_outcomes = [wishart(df = wishart_df, scale=conditional_covariances[i,...]).rvs()/wishart_df for 
                            i in range(size)
                           ]
        
        wishart_outcomes = np.array(wishart_outcomes)
        
        # Fixed objective throughout
        c = np.random.rand(n_vars,1)
        
        return z, wishart_outcomes, c
    
## Build prediction model that outputs symmetric matrices and in this case a vector for the cost function
class NNMatrixOutput(nn.Module):
    def __init__(self, n_context, hidden_size, n_vars):
        super(NNMatrixOutput, self).__init__()
        assert n_vars >= 2, "not designed for outputing scalars"
        self.n_vars = n_vars
        self.latent = nn.Linear(n_context, hidden_size)
        self.relu = nn.ReLU()
        self.diagonal = nn.Linear(hidden_size, n_vars)
        
        n_lower_triangular_elements = int(n_vars*(n_vars - 1)/2)
        self.lower_triangular = nn.Linear(hidden_size,n_lower_triangular_elements)
        
        self.c_predict = nn.Linear(hidden_size, n_vars)

    def forward(self, z):
        
        z = self.latent(z)
        z = self.relu(z)
        diagonal = self.diagonal(z)
        lower_triangular = self.lower_triangular(z)
        
        # construct predicted matrix 
        matrix = torch.diag_embed(diagonal)
        tind = torch.tril_indices(diagonal.shape[-1],diagonal.shape[-1],offset = -1)

        matrix[...,tind[0],tind[1]] = lower_triangular
        matrix[...,tind[1],tind[0]] = lower_triangular
        
        # predict conditional cost vector
        chat = self.c_predict(z).unsqueeze(-1)

        return {"Sigma" : matrix, "c" : chat}
    
class GeneratedDataset(Dataset):
    def __init__(self, z, Sigma):
        self.z = torch.tensor(z).float()
        self.Sigma = torch.tensor(Sigma).float()

    def __len__(self):
        return self.z.shape[0]

    def __getitem__(self, idx):
        sample = {'z': self.z[idx], 'Sigma': self.Sigma[idx]}
        return sample
    

In [4]:
## Build parameterised optimisation problem
## Construct ROB_1 and ROB_2

# outer problem, ROB_1
x_det = cp.Variable((n_vars,1), name= "decision")
# Define parameters for problem (the variables of interest for optimisation)
c_param = [cp.Parameter((n_vars,1))]
Sigma_sqrt_param = [cp.Parameter((n_vars,n_vars))] # Needs to be defined this way for cvxpy DPP compliance https://www.cvxpy.org/tutorial/advanced/index.html

# construct problem
constraints = []
constraints += [cp.sum_squares(Sigma_sqrt_param[0]@x_det) <= risk_limit]
constraints += [cp.sum(x_det) <= 1]
constraints += [x_det >= 0]
objective = cp.Minimize(-c_param[0].T@x_det)

outer_problem = cp.Problem(objective,constraints)

ROB_1 = CvxpyLayer(outer_problem, parameters=c_param + Sigma_sqrt_param, variables=[x_det])

# inner problem, ROB_2
x_fixed = cp.Parameter((n_vars,1))
x_xt_fixed = cp.Parameter((n_vars,n_vars))

sigma_realisation = cp.Parameter((n_vars,n_vars))
sigma_addition = cp.Variable((n_vars,n_vars), symmetric = True)
sigma_worst_case_in_neighbourhood =cp.Variable((n_vars,n_vars))
constraints = []
constraints += [sigma_worst_case_in_neighbourhood == sigma_realisation + sigma_addition]
constraints += [cp.norm(sigma_addition,"fro") <= robustness_parameter]

objective_function = penalty_coefficient*(cp.sum(cp.multiply(x_xt_fixed,sigma_worst_case_in_neighbourhood))-risk_limit)
objective = cp.Maximize(objective_function)

inner_problem = cp.Problem(objective,constraints)

ROB_2 = CvxpyLayer(inner_problem, parameters=[x_xt_fixed,sigma_realisation], variables=[sigma_addition,sigma_worst_case_in_neighbourhood])



In [5]:
### Build experiment pipeline
dgp = ConditionalProcessGenerator(n_vars = n_vars, n_context = n_context, seed = seed)

# Generate dataset
z, Sigmas, c = dgp.generate_dataset(size = datset_size)
c = torch.tensor(c).float()
# Split into training and test sets and feed into dataloaders
z_train, z_test, Sigmas_train, Sigmas_test = train_test_split(z, Sigmas, 
                                                              test_size=test_proportion, 
                                                              random_state=seed)

train = GeneratedDataset(z_train, Sigmas_train)
test = GeneratedDataset(z_test, Sigmas_test)

train_dataloader = DataLoader(train, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test, batch_size=z_test.shape[0], shuffle=False)

In [6]:
# Initialise prediction model
torch.manual_seed(seed)

PR = NNMatrixOutput(n_context = n_context, hidden_size = hidden_size, n_vars=n_vars)

PR_no_robust = NNMatrixOutput(n_context = n_context, hidden_size = hidden_size, n_vars=n_vars)


# Initialise optimisers
optimiser = torch.optim.Adam(PR.parameters(), lr = learning_rate)

optimiser_no_robust = torch.optim.Adam(PR_no_robust.parameters(), lr = learning_rate)

# Training loop
for epoch in tqdm(range(epochs)):
    for batch in train_dataloader:
        optimiser.zero_grad()
    
        pred = PR(batch["z"])
        
        x_det_opt, = ROB_1(pred["c"],pred["Sigma"]) # here we predict the square root of the matrix
        x_xt_opt = x_det_opt @ x_det_opt.permute(0,2,1)
        
        wc_opt = ROB_2(x_xt_opt,batch["Sigma"])
        worst_case_sigma = wc_opt[1]
        
        loss = 0 
        loss_p = (-c.T@x_det_opt).view(-1,1)
        loss += loss_p
        
        loss_violation = 0
        
        violation = torch.sum((worst_case_sigma@x_xt_opt).diagonal(offset=0, dim1=-2, dim2=-1),axis = -1) - risk_limit
        loss_violation = (nn.ReLU()(violation)).view(-1,1)
        loss += loss_violation
        
        loss = torch.sum(loss)
        
        loss.backward()
        optimiser.step()
            
# Training loop for no robust case
for epoch in tqdm(range(epochs)):
    for batch in train_dataloader:
        
        # Optimise a network without robust regulariser
        optimiser_no_robust.zero_grad()
    
        pred_nr = PR_no_robust(batch["z"])
        
        x_det_opt, = ROB_1(pred_nr["c"],pred_nr["Sigma"]) # here we predict the square root of the matrix
        x_xt_opt = x_det_opt @ x_det_opt.permute(0,2,1)
        
        
        loss = 0 
        loss_p = (-c.T@x_det_opt).view(-1,1)
        loss += loss_p
        
        loss_violation = 0
        
        violation = torch.sum((batch["Sigma"]@x_xt_opt).diagonal(offset=0, dim1=-2, dim2=-1),axis = -1) - risk_limit
        loss_violation = (nn.ReLU()(violation)).view(-1,1)
        loss += loss_violation
        
        loss = torch.sum(loss)
        
        loss.backward()
        optimiser_no_robust.step()
        


  0%|          | 0/200 [00:00<?, ?it/s]

  0%|          | 0/200 [00:00<?, ?it/s]

In [7]:
# Evaluation
with torch.no_grad():
    for batch in test_dataloader:
        
        pred = PR(batch["z"])
        pred_nr = PR_no_robust(batch["z"])
        
        x_ro, = ROB_1(pred["c"],pred["Sigma"])
        xxt_ro = x_ro @ x_ro.permute(0,2,1)
        violation_ro = torch.sum((batch["Sigma"]@xxt_ro).diagonal(offset=0, dim1=-2, dim2=-1),axis = -1) - risk_limit

        x_nr, = ROB_1(pred_nr["c"],pred_nr["Sigma"])
        xxt_nr = x_nr @ x_nr.permute(0,2,1)
        violation_nr = torch.sum((batch["Sigma"]@xxt_nr).diagonal(offset=0, dim1=-2, dim2=-1),axis = -1) - risk_limit

        

In [8]:
print(f"Number of constraint violations in test set with ROB_2: {torch.sum(violation_ro >= 0).numpy()} out of {z_test.shape[0]}")

Number of constraint violations in test set with ROB_2: 0 out of 30


In [9]:
print(f"Number of constraint violations in test set without ROB_2: {torch.sum(violation_nr >= 0).numpy()} out of {z_test.shape[0]}")

Number of constraint violations in test set without ROB_2: 13 out of 30


In [10]:
average_increase = torch.mean(((-c.T@x_nr) - (-c.T@x_ro))/(-c.T@x_nr)).numpy()*100
print(f"The average change in the objective is", "%.1f" % average_increase,"%.")

The average change in the objective is 37.3 %.
