### **Code implementation for DFL in partial index tracking**

**1. Install & Import Packages**

In [None]:
# Install packages for CvxpyLayer, Gurobipy

!pip install cvxpylayers
!pip install gurobipy



In [None]:
# Import packages

import pandas as pd
import numpy as np
import cvxpy as cp
from tqdm import tqdm
from cvxpylayers.torch import CvxpyLayer
import argparse
import torch
import operator
from copy import deepcopy
import random
from functools import reduce
import gurobipy as gp

**2. Util functions**

In [None]:
class View(torch.nn.Module):

    def __init__(self, shape):

        super().__init__()
        self.shape = shape

    def __repr__(self):

        return f'View{self.shape}'

    def forward(self, input):
        """
        Reshapes the input according to the shape saved in the view data structure.
        """

        batch_size = input.shape[:-1]
        shape = (*batch_size, *self.shape)
        out = input.view(shape)

        return out

def dense_nn(
    num_features,
    num_targets,
    num_layers,
    intermediate_size=10,
    activation='relu',
    output_activation='sigmoid',
):
    """
    Construct feedforward neural networks
    :param num_features: input dimension of networks
    :param num_targets: output dimension of networks
    :return: Feedforward networks model

    """
    if num_layers > 1:

        # The number of nodes
        if intermediate_size is None:
            intermediate_size = max(num_features, num_targets)

        # Activation function
        if activation == 'relu':
            activation_fn = torch.nn.ReLU
        elif activation == 'sigmoid':
            activation_fn = torch.nn.Sigmoid
        else:
            raise Exception('Invalid activation function: ' + str(activation))

        # Stack layers
        net_layers = [torch.nn.Linear(num_features, intermediate_size), activation_fn()]
        for _ in range(num_layers - 2):
            net_layers.append(torch.nn.Linear(intermediate_size, intermediate_size))
            net_layers.append(activation_fn())
        if not isinstance(num_targets, tuple):
            net_layers.append(torch.nn.Linear(intermediate_size, num_targets))
        else:
            net_layers.append(torch.nn.Linear(intermediate_size, reduce(operator.mul, num_targets, 1)))
            net_layers.append(View(num_targets))
    else:

        # Stack layers
        if not isinstance(num_targets, tuple):
            net_layers = [torch.nn.Linear(num_features, num_targets)]
        else:
            net_layers = [torch.nn.Linear(num_features, reduce(operator.mul, num_targets, 1)), View(num_targets)]

    # Output activation function
    if output_activation == 'relu':
        net_layers.append(torch.nn.ReLU())
    elif output_activation == 'sigmoid':
        net_layers.append(torch.nn.Sigmoid())
    elif output_activation == 'tanh':
        net_layers.append(torch.nn.Tanh())
    elif output_activation == 'softmax':
        net_layers.append(torch.nn.Softmax(dim=-1))

    return torch.nn.Sequential(*net_layers)

**2. Index tracking problem class**

You can download data at the following link:

https://github.com/Bridge-PO/ICAIF_25

Before running codes, pleas upload Xs.npy, Ys.npy, cap_weights.npy files in folder in Colab Webpage

`Data structure`

Processed with KOSPI30 from 2015-01-02 ~ 2024-12-30

[1] Xs -> [2074 (Total days), 30 (Num stocks), 28 (Num features)]

Feature list (Wang et al. 2020)

1) Return i days before rebalancing day (i=1,2,3,...,10)

2) Cumulative return over the past period j (j= week, month, quarter, year)

3) Mean and variance of returns over the past week

4) Mean and variance of prices over the past period (3day,week,month,quarter,year)

5) Current return

6) Current price

[2] Ys -> [2074 (Total days), 30 (Num stocks)] -> Cumulative return for next 19 days

[3] cap_weight -> [2074 (Total days), 30 (Num stocks)] -> Market cap weight

***Reference***

Wang, K., Wilder, B., Perrault, A., & Tambe, M. (2020). Automatically learning compact quality-aware surrogates for optimization problems. Advances in Neural Information Processing Systems, 33, 9586-9596.

In [None]:
class IndexTracking():

  def __init__(self, n_train, n_test, k):
    """
    Initialize index tracking problem
    :param n_train: number of training data
    :param n_test: number of test data
    :param k: Cardinality k
    """

    # Load data
    self.Xs = torch.tensor(np.load('Xs.npy'))
    self.Ys = torch.tensor(np.load('Ys.npy'))
    self.cap_weights = torch.tensor(np.load('cap_weights.npy'))
    self.k = k

    # Train, test split
    self.n_train = n_train
    self.n_test = n_test

    idxs = list(range(n_train + n_test))
    self.train_idxs = idxs[:self.n_train]
    self.test_idxs = idxs[self.n_train:]
    self.n_stock = self.Xs.shape[1]

  def _load_semi_definite_problem(self):
    """
    Make dictionary of semi-definite index tracking problems
    :return: Dictionary of index tracking problems
    """

    prob_dict = []

    # Train set
    for idx in tqdm(self.train_idxs, desc="Train problem generation>>>"):
      cvxpy_prob = self._create_cvxpy_problem(self.cap_weights[idx])
      prob_dict.append(cvxpy_prob)

    return prob_dict

  def _create_cvxpy_problem(self, cap_weight):
    """
    For designated trading day, make cvxpy problem
    :param cap_weight: Market cap weight for designated trading day
    :return: cvxpy problem
    """

    # Semi-definite relaxation with Cvxpy
    W = cp.Variable((self.n_stock, self.n_stock), PSD = True)
    R_hat = cp.Parameter((self.n_stock, self.n_stock))
    objective = cp.Minimize((cp.trace(R_hat@W)-2*cp.sum(W@R_hat@cap_weight)))
    constraints = [cp.sum(W) == 1,
                   cp.sum(cp.abs(W)) <= self.k * cp.trace(W),
                   W>>0]
    problem = cp.Problem(objective, constraints)

    return CvxpyLayer(problem, parameters=[R_hat], variables=[W])

  def _get_objective(self, Yhats, Y, cap_weight, data_partition=None, data_num=None, loss_type='surrogate'):
    """
    Get decision objective value of index tracking for true Y, prediction Yhats
    If loss_type is 'relaxation', solve semi-definite formulation
    If loss_type is 'surrogate', solve basic formulation
    :param Yhats: Predicted return
    :param Y: Asset returns
    :param cap_weight: Market cap weight
    :param data_partition: 'train', 'val', 'test'
    :param data_num: index of data (Only for train and test)
    :param loss_type: 'relaxation', 'surrogate'
    :return: Objective value
    """

    if loss_type == 'relaxation': # Solve semi-definite formulation

      # Make r to R (=rr^T)
      Y_sq_hats = torch.outer(Yhats, Yhats)
      Y_sq = torch.outer(Y, Y)

      problem = self.prob_dict[data_num]
      W = problem(Y_sq_hats, solver_args = {'max_iters':int(5e3),'eps_abs':1e-5,'eps_rel':1e-5})[0]

      obj = torch.trace(Y_sq@W)-2*torch.sum(W@Y_sq@cap_weight)

      return obj

    elif loss_type == 'surrogate': # Solve basic formulation

      target = (Y @ cap_weight).sum()

      w = cp.Variable(Y.shape[0])
      z = cp.Variable(Y.shape[0], boolean=True)

      # Get optimal portfolio under prediction Yhats
      objective = cp.Minimize((w @ Yhats - target) ** 2)
      constraints = [cp.sum(w) == 1,
                    cp.sum(z) == self.k,
                    w <= z,
                    w >= -z]
      problem = cp.Problem(objective, constraints)
      problem.solve(solver='GUROBI')
      w_dfl = w.value

      # Evaluate obtained portfoilo under true return Y
      new_objective = cp.Minimize((w @ Y - target) ** 2)
      new_problem = cp.Problem(new_objective, constraints)
      new_problem.solve(solver='GUROBI')

      # Decision Loss
      obj = (w_dfl @ Y.detach().numpy() - target) ** 2 - new_problem.value

      return obj

  def _loss_fn(self, Yhats, Ys, aux, data_partition, data_num, loss_type):
    """
    Loss function
    :param Yhats: Predicted return
    :param Ys: Actual return
    :param aux: Market cap weight
    :param data_partition: 'train', 'val', 'test'
    :param data_num: index of data
    :param loss_type: 'relaxation', 'surrogate'
    :return: Loss value
    """

    dflalpha = 1.0

    if loss_type == 'relaxation':
      obj = self._get_objective(Yhats, Ys, aux, data_partition, data_num, loss_type)

    elif loss_type == 'surrogate':
      surrogate_input = torch.cat([Yhats, Ys], dim=-1)
      obj = self.surro_model(surrogate_input)

    loss = dflalpha * obj + (Yhats - Ys).square().mean()

    return loss

  def _get_surrogate_loss(self, n_samples):
    """
    Get surrogate loss
    :param n_samples: number of samples per Y
    :return: Surrogate loss
    """

    # Get data
    _, Y_train, Y_train_aux = self.get_train_data()

    # Sample Yhats around Y, and accumulate objective value
    Y_samples_list = []
    Y_trains_list = []
    objs_list = []

    bar = tqdm(range(Y_train.shape[0]), desc='Generating Samples')
    for idx in bar:
      y = Y_train[idx]

      noise = torch.FloatTensor(n_samples, y.shape[0]).uniform_(-0.1, 0.1)
      y_samples = y.unsqueeze(0) + noise

      for i in range(n_samples):
        y_sample = y_samples[i]
        obj = self._get_objective(y_sample,
                                  y,
                                  Y_train_aux[idx])

        Y_samples_list.append(y_sample)
        Y_trains_list.append(y)
        objs_list.append(obj)

    Y_samples = torch.stack(Y_samples_list)
    Y_trains = torch.stack(Y_trains_list)
    Objs = torch.tensor(objs_list).float().unsqueeze(1)

    # Learn Surrogate Model
    # Simple Networks Model
    def surrogate_nn():

      return torch.nn.Sequential(
          torch.nn.Linear(Y_samples.shape[1] * 2, 64),
          torch.nn.ReLU(),
          torch.nn.Linear(64, 64),
          torch.nn.ReLU(),
          torch.nn.Linear(64, 1),
          torch.nn.ReLU()
      )

    surro_model = surrogate_nn()
    optimizer = torch.optim.Adam(surro_model.parameters(), lr=0.001)
    mse = torch.nn.MSELoss()

    inputs = torch.cat([Y_samples, Y_trains], dim=1)  # [N, 2*dim]

    bar = tqdm(range(100))
    for epoch in bar:
        surro_model.train()
        optimizer.zero_grad()
        preds = surro_model(inputs)
        loss = mse(preds, Objs)
        bar.set_description(f'Loss of Surrogate Model: {loss}')
        loss.backward()
        optimizer.step()

    return surro_model

  def get_train_data(self):
    """
    Get train data
    :return: Train data
    """
    return self.Xs[self.train_idxs,:self.n_stock,:], self.Ys[self.train_idxs,:self.n_stock], self.cap_weights[self.train_idxs,:self.n_stock]

  def get_test_data(self):
    """
    Get test data
    :return: Test data
    """
    return self.Xs[self.test_idxs,:self.n_stock,:], self.Ys[self.test_idxs,:self.n_stock], self.cap_weights[self.test_idxs,:self.n_stock]

**3. Load Data**

In [None]:
# Argument for indextracking problem
problem_kwargs = {'n_train': 50, # Days
                  'n_test': 50,  # Days
                  'k': 6} # Cardinality

problem = IndexTracking(**problem_kwargs)

# Get data [day,stock,feature]
X_train, Y_train, Y_train_aux = problem.get_train_data()
X_test, Y_test, Y_test_aux = problem.get_test_data()

**4-1. DFL: Semi-definite Relaxation approach**

In [None]:
# Load semi-definite problems for training
problem.prob_dict = problem._load_semi_definite_problem()

# Hyperparameters
n_layers = 2
n_nodes = 10
epochs = 100
batch_size = 32
learning_rate = 0.001

# Load predictive model
ipdim, opdim = problem.Xs.shape[-1], 1

model_1 = dense_nn(
    num_features=ipdim,
    num_targets=opdim,
    num_layers=n_layers,
    intermediate_size=n_nodes,
    output_activation='tanh'
)
optimizer = torch.optim.Adam(model_1.parameters(), lr=learning_rate)

# Learning
bar = tqdm(range(epochs), desc='Training')
for epoch in bar:

    losses = []
    for i in random.sample(range(len(X_train)), min(batch_size, len(X_train))):
        pred = model_1(X_train[i]).squeeze()
        losses.append(problem._loss_fn(pred,
                                        Y_train[i],
                                        aux=Y_train_aux[i],
                                        data_partition='train',
                                        data_num=i,
                                       loss_type='relaxation'))
    losses = torch.stack(losses)
    loss = losses.nansum()/(len(losses)-losses.isnan().sum())
    bar.set_description(f'Loss of Predictive Model : {loss}')
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

Train problem generation>>>: 100%|██████████| 50/50 [00:12<00:00,  4.02it/s]
Loss of Predictive Model : 0.01693020388484001: 100%|██████████| 100/100 [03:29<00:00,  2.09s/it]


**4-1. DFL: Surrogate Approach**

In [None]:
# Traing surrogate loss model
problem.surro_model = problem._get_surrogate_loss(n_samples = 50)

# Hyperparameters
n_layers = 2
n_nodes = 10
epochs = 100
batch_size = 32
learning_rate = 0.001

# Load predictive model
ipdim, opdim = problem.Xs.shape[-1], 1

model_2 = dense_nn(
    num_features=ipdim,
    num_targets=opdim,
    num_layers=n_layers,
    intermediate_size=n_nodes,
    output_activation='tanh'
)
optimizer = torch.optim.Adam(model_2.parameters(), lr=learning_rate)

# Learning
bar = tqdm(range(epochs), desc='Training')
for epoch in bar:

    losses = []
    for i in random.sample(range(len(X_train)), min(batch_size, len(X_train))):
        pred = model_2(X_train[i]).squeeze()
        losses.append(problem._loss_fn(pred,
                                        Y_train[i],
                                        aux=Y_train_aux[i],
                                        data_partition='train',
                                        data_num=i,
                                       loss_type='surrogate'))
    losses = torch.stack(losses)
    loss = losses.nansum()/(len(losses)-losses.isnan().sum())
    bar.set_description(f'Loss of Predictive Model : {loss}')
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

Generating Samples: 100%|██████████| 50/50 [01:15<00:00,  1.51s/it]
Loss of Surrogate Model: 0.00017575877427589148: 100%|██████████| 100/100 [00:00<00:00, 104.73it/s]
Loss of Predictive Model : 0.017560550943017006: 100%|██████████| 100/100 [00:04<00:00, 23.62it/s]


**4. Result**

In [None]:
# Over the test set, you can check mean of decision loss for two kinds of approaches

obj1_list = []
obj2_list = []

bar = tqdm(range(len(X_test)))
for i in bar:

  pred1 = model_1(X_test[i]).squeeze().detach().numpy()
  pred2 = model_2(X_test[i]).squeeze().detach().numpy()

  obj1 = problem._get_objective(pred1, Y_test[0], Y_test_aux[0])
  obj1_list.append(obj1)

  obj2 = problem._get_objective(pred2, Y_test[0], Y_test_aux[0])
  obj2_list.append(obj2)

  bar.set_description(f'Decision loss of Relaxation Approach : {np.mean(obj1_list)}, Decision loss of Surrogate Approach : {np.mean(obj2_list)}')

Decision loss of Relaxation Approach : 0.04512812942266464, Decision loss of Surrogate Approach : 0.0341593436896801: 100%|██████████| 50/50 [00:03<00:00, 14.25it/s]
