In [86]:
import cvxpy as cp
import numpy as np

# Risk functions module
#
# This module defines the financial risk measures to be used in the optimization layer of the E2E
# problem.
#


def p_var(z: cp.Expression, c: float, x: np.ndarray) -> cp.Expression:
    """
    Compute the squared error for the given input.

    :param z: A cvxpy expression (decision variable)
    :param c: A constant threshold or target value
    :param x: A numpy array (weights or features)
    :return: The squared error expression
    """
    return cp.square(x @ z - c)


def p_mad(z: cp.Expression, c: float, x: np.ndarray) -> cp.Expression:
    """
    Compute the mean absolute deviation for the given input.

    :param z: A cvxpy expression (decision variable)
    :param c: A constant threshold or target value
    :param x: A numpy array (weights or features)
    :return: The absolute deviation expression
    """
    return cp.abs(x @ z - c)


# Define test data
z = cp.Variable(3)  # Decision variable (portfolio weights)
c = 0.02  # Centering parameter (expected return)
x = np.array(
    [
        [0.05, 0.02, -0.01],  # Realized returns for multiple scenarios
        [0.03, -0.01, 0.04],
        [-0.02, 0.01, 0.01],
    ]
)
if __name__ == "__main__":

    # Test variance function (p_var)
    print("\nTesting p_var...")
    var_expr = p_var(z, c, x[0])  # Apply p_var to the first row of x
    objective_var = cp.Minimize(var_expr)  # Minimize the variance
    constraints = [
        cp.sum(z) == 1,
        z >= 0,
    ]  # Portfolio constraints: sum of weights = 1, weights >= 0
    problem_var = cp.Problem(objective_var, constraints)
    var_opt_value = problem_var.solve()

    # Output results for variance minimization
    print("Optimized portfolio weights (Variance):", z.value)
    print("Variance objective value:", var_opt_value)

    # Reinitialize decision variable for MAD problem
    z = cp.Variable(3)

    # Test MAD function (p_mad)
    print("\nTesting p_mad...")
    mad_expr = p_mad(z, c, x[0])  # Apply p_mad to the first row of x
    objective_mad = cp.Minimize(mad_expr)  # Minimize the MAD
    problem_mad = cp.Problem(objective_mad, constraints)
    mad_opt_value = problem_mad.solve()

    # Output results for MAD minimization
    print("Optimized portfolio weights (MAD):", z.value)
    print("MAD objective value:", mad_opt_value)



Testing p_var...
Optimized portfolio weights (Variance): [0.33333333 0.33333333 0.33333333]
Variance objective value: 0.0

Testing p_mad...
Optimized portfolio weights (MAD): [ 0.13355262  0.33328474 -0.66566742]
MAD objective value: 3.469446951953614e-18


In [87]:
import torch
from torch import Tensor


# Performance loss functions with type hints and improved comments
def single_period_loss(z_star: Tensor, y_perf: Tensor) -> Tensor:
    """
    Calculate the single-period loss based on the out-of-sample portfolio return.

    This function computes the out-of-sample portfolio return for a given portfolio over the next
    time step. It computes the loss as the negative return since optimization typically focuses
    on minimizing the loss, and maximizing returns translates into minimizing negative returns.

    :param z_star: Tensor of shape (n_y, 1) representing the optimal portfolio weights.
    :param y_perf: Tensor of shape (perf_period, n_y) representing the realized returns.
    :return: A scalar tensor representing the realized return at the first time step (negative).
    """
    # Calculate the portfolio return for the first time step and negate it (since we want to minimize loss)
    return -y_perf[0] @ z_star


def single_period_over_var_loss(z_star: Tensor, y_perf: Tensor) -> Tensor:
    """
    Calculate the loss as the portfolio return divided by the portfolio's volatility.

    This function computes the portfolio return at the first time step and divides it by the
    realized volatility (standard deviation) of the portfolio returns over the performance period.
    This provides a return-over-risk measure, which is often used in portfolio analysis.

    :param z_star: Tensor of shape (n_y, 1) representing the optimal portfolio weights.
    :param y_perf: Tensor of shape (perf_period, n_y) representing the realized returns.
    :return: A scalar tensor representing the return over realized volatility (negative).
    """
    # Calculate the portfolio returns over the entire performance period
    portfolio_returns = y_perf @ z_star
    # Calculate the standard deviation (volatility) of the portfolio returns, adding epsilon for numerical stability
    volatility = torch.std(portfolio_returns, unbiased=True) + 1e-6
    # Calculate the return at the first time step and divide by the volatility, then negate for loss
    return -portfolio_returns[0] / volatility


def sharpe_loss(z_star: Tensor, y_perf: Tensor) -> Tensor:
    """
    Calculate the loss based on the Sharpe ratio over a performance period.

    This function computes a simplified Sharpe ratio, which is the ratio of the mean portfolio
    return to its standard deviation (volatility) over the performance period. The loss is defined
    as the negative Sharpe ratio to allow for minimization.

    :param z_star: Tensor of shape (n_y, 1) representing the optimal portfolio weights.
    :param y_perf: Tensor of shape (perf_period, n_y) representing the realized returns.
    :return: A scalar tensor representing the negative Sharpe ratio.
    """
    # Calculate the portfolio returns over the entire performance period
    portfolio_returns = y_perf @ z_star
    # Calculate the mean return of the portfolio
    mean_return = torch.mean(portfolio_returns)
    # Calculate the standard deviation (volatility) of the portfolio returns, adding epsilon for numerical stability
    volatility = torch.std(portfolio_returns, unbiased=True) + 1e-6
    # Calculate the Sharpe ratio and negate it for loss
    return -mean_return / volatility


if __name__ == "__main__":
    # Example portfolio weights (3 assets)
    z_star = torch.tensor([0.3, 0.5, 0.2])
    # Realized returns for 3 assets over 3 periods
    y_perf = torch.tensor(
        [
            [0.01, 0.02, -0.01],
            [0.03, -0.01, 0.04],
            [0.02, 0.01, 0.01],
        ]
    )

    # Test the single-period loss function
    print("Testing single_period_loss...")
    loss_sp = single_period_loss(z_star, y_perf)
    print(f"Single period loss: {loss_sp.item()}")

    # Test the single-period-over-volatility loss function
    print("\nTesting single_period_over_var_loss...")
    loss_sp_var = single_period_over_var_loss(z_star, y_perf)
    print(f"Single period loss over volatility: {loss_sp_var.item()}")

    # Test the Sharpe ratio loss function
    print("\nTesting sharpe_loss...")
    loss_sharpe = sharpe_loss(z_star, y_perf)
    print(f"Sharpe ratio loss: {loss_sharpe.item()}")

Testing single_period_loss...
Single period loss: -0.010999999940395355

Testing single_period_over_var_loss...
Single period loss over volatility: -10.989008903503418

Testing sharpe_loss...
Sharpe ratio loss: -11.988011360168457


In [88]:
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset

####################################################################################################
# SlidingWindow Dataset to index data using a sliding window
####################################################################################################


class SlidingWindow(Dataset):
    """Dataset class for creating a sliding window from time series data."""

    def __init__(
        self,
        X: pd.DataFrame,
        Y: pd.DataFrame,
        n_obs: int,
        perf_period: int,
        dtype: torch.dtype = torch.float32,
        device: torch.device = torch.device("cpu"),
    ) -> None:
        """
        Initialize the SlidingWindow dataset.

        :param X: DataFrame containing the complete feature dataset.
        :param Y: DataFrame containing the complete asset return dataset.
        :param n_obs: Number of observations in the sliding window.
        :param perf_period: Number of future observations used for out-of-sample performance evaluation.
        :param dtype: The desired data type for tensors (default is torch.float32).
        :param device: Device on which to place the tensors (e.g., 'cpu' or 'cuda' for GPU).
        """
        self.X = torch.tensor(
            X.values, dtype=dtype, device=device
        )  # Convert feature dataset to tensor
        self.Y = torch.tensor(
            Y.values, dtype=dtype, device=device
        )  # Convert asset return dataset to tensor
        self.n_obs = n_obs  # Number of observations in the sliding window
        self.perf_period = (
            perf_period  # Number of future observations for performance evaluation
        )

    def __getitem__(
        self, index: int
    ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Retrieve a single window of data.

        :param index: Index of the sliding window.
        :return: Tuple (x, y, y_perf):
            x: Features window of shape (n_obs + 1, n_x).
            y: Realizations window of shape (n_obs, n_y).
            y_perf: Future performance window of shape (perf_period, n_y).
        """
        # Retrieve features for the sliding window (n_obs + 1 observations)
        x = self.X[index : index + self.n_obs + 1]
        # Retrieve asset returns for the sliding window (n_obs observations)
        y = self.Y[index : index + self.n_obs]
        # Retrieve future performance data (perf_period observations)
        y_perf = self.Y[index + self.n_obs : index + self.n_obs + self.perf_period]
        return (x, y, y_perf)

    def __len__(self) -> int:
        """
        Return the number of windows that can be created from the dataset.

        :return: Length of the dataset, considering the sliding windows.
        """
        return (
            len(self.X) - self.n_obs - self.perf_period
        )  # Total number of sliding windows available


####################################################################################################
# Backtest class to store out-of-sample results
####################################################################################################


class Backtest:
    """Class to store out-of-sample results for a backtest."""

    def __init__(self, len_test: int, n_y: int, dates: pd.DatetimeIndex) -> None:
        """
        Initialize the Backtest object.

        :param len_test: Number of scenarios in the out-of-sample evaluation period.
        :param n_y: Number of assets in the portfolio.
        :param dates: DatetimeIndex containing the corresponding dates.
        """
        self.weights = np.zeros(
            (len_test, n_y)
        )  # Initialize portfolio weights over time
        self.rets = np.zeros(
            len_test
        )  # Initialize realized portfolio returns over time
        self.dates = dates[
            -len_test:
        ]  # Keep only the dates for the out-of-sample period

    def stats(self) -> None:
        """
        Compute and store the cumulative returns, mean return, volatility, and Sharpe ratio.

        This method calculates key performance metrics of the portfolio, including:
        - Cumulative returns (Total Return Index), which show the total growth of the portfolio over time.
        - Annualized mean return, which is an estimate of the average return the portfolio would achieve per year.
        - Volatility, which measures the risk by calculating the standard deviation of returns.
        - Sharpe ratio, which indicates the risk-adjusted return of the portfolio.
        """
        # Calculate cumulative returns (Total Return Index)
        tri = np.cumprod(self.rets + 1)
        # Calculate the annualized mean return using the final cumulative return and the number of periods
        self.mean = (tri[-1]) ** (1 / len(tri)) - 1
        # Calculate the volatility (standard deviation) of the portfolio returns
        self.vol = np.std(self.rets)
        # Calculate the Sharpe ratio (mean return divided by volatility)
        self.sharpe = self.mean / self.vol
        # Create a DataFrame containing realized returns and cumulative returns, indexed by dates
        if len(self.dates) == len(self.rets):
            self.rets = pd.DataFrame(
                {"Date": self.dates, "rets": self.rets, "tri": tri}
            ).set_index("Date")
        else:
            raise ValueError("Length of dates and returns must be equal.")


####################################################################################################
# InSample class to store in-sample results
####################################################################################################


class InSample:
    """Class to store the in-sample results of neural network training."""

    def __init__(self) -> None:
        """
        Initialize the InSample object.
        """
        self.loss = []  # List to hold training losses
        self.gamma = []  # List to hold gamma values (hyperparameter)
        self.delta = []  # List to hold delta values (hyperparameter)
        self.val_loss = []  # List to hold validation losses (optional)

    def df(self) -> pd.DataFrame:
        """
        Return a DataFrame containing the training statistics.

        :return: DataFrame with columns representing different metrics during training.
        """
        # Return a DataFrame based on available data, adjusting columns accordingly
        if not self.delta and not self.val_loss:
            return pd.DataFrame(
                list(zip(self.loss, self.gamma)), columns=["loss", "gamma"]
            )
        elif not self.delta:
            return pd.DataFrame(
                list(zip(self.loss, self.val_loss, self.gamma)),
                columns=["loss", "val_loss", "gamma"],
            )
        elif not self.val_loss:
            return pd.DataFrame(
                list(zip(self.loss, self.gamma, self.delta)),
                columns=["loss", "gamma", "delta"],
            )
        else:
            return pd.DataFrame(
                list(zip(self.loss, self.val_loss, self.gamma, self.delta)),
                columns=["loss", "val_loss", "gamma", "delta"],
            )


####################################################################################################
# CrossVal class to store cross-validation results
####################################################################################################


class CrossVal:
    """Class to store cross-validation results of neural network training."""

    def __init__(self) -> None:
        """
        Initialize the CrossVal object.
        """
        self.lr = []  # List to hold learning rates
        self.epochs = []  # List to hold the number of epochs in each run
        self.val_loss = []  # List to hold validation losses

    def df(self) -> pd.DataFrame:
        """
        Return a DataFrame containing the cross-validation statistics.

        :return: DataFrame with learning rate, epochs, and validation loss.
        """
        # Create and return a DataFrame with learning rates, epochs, and validation losses
        return pd.DataFrame(
            list(zip(self.lr, self.epochs, self.val_loss)),
            columns=["lr", "epochs", "val_loss"],
        )


####################################################################################################
# Test code for the Backtest class
####################################################################################################

if __name__ == "__main__":
    # Example usage
    X = pd.DataFrame(
        np.random.randn(100, 3)
    )  # Create feature dataset with 100 samples and 3 features
    Y = pd.DataFrame(
        np.random.randn(100, 2)
    )  # Create asset return dataset with 100 samples and 2 assets
    dates = pd.date_range(start="2020-01-01", periods=100, freq="D")

    # Check if GPU is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize SlidingWindow with given parameters
    n_obs = 10
    perf_period = 5
    sliding_window = SlidingWindow(X, Y, n_obs, perf_period, device=device)

    # Fetch a sample window
    print("Testing SlidingWindow...")
    x, y, y_perf = sliding_window[0]
    print(f"x (features): {x.shape}")
    print(f"y (realizations): {y.shape}")
    print(f"y_perf (performance window): {y_perf.shape}")

    # Initialize Backtest with given parameters
    len_test = 30
    backtest_obj = Backtest(len_test=len_test, n_y=2, dates=dates)

    # Simulate some portfolio returns
    backtest_obj.rets = np.random.randn(len_test)

    print("\nTesting Backtest...")
    backtest_obj.stats()
    print(backtest_obj.rets.head())
    print(f"Mean return: {backtest_obj.mean:.4f}")
    print(f"Volatility: {backtest_obj.vol:.4f}")
    print(f"Sharpe ratio: {backtest_obj.sharpe:.4f}")

    # Test Backtest stats calculation
    print("\nTesting Backtest stats calculation...")
    backtest_obj.rets = np.array([0.05, -0.02, 0.03, 0.04, -0.01])
    backtest_obj.dates = dates[
        -len(backtest_obj.rets) :
    ]  # Adjust dates to match returns length
    backtest_obj.stats()
    print(backtest_obj.rets.head())
    print(f"Mean return: {backtest_obj.mean:.4f}")
    print(f"Volatility: {backtest_obj.vol:.4f}")
    print(f"Sharpe ratio: {backtest_obj.sharpe:.4f}")

Testing SlidingWindow...
x (features): torch.Size([11, 3])
y (realizations): torch.Size([10, 2])
y_perf (performance window): torch.Size([5, 2])

Testing Backtest...
                rets       tri
Date                          
2020-03-11  0.568441  1.568441
2020-03-12  0.599594  2.508869
2020-03-13 -0.507123  1.236564
2020-03-14 -0.236355  0.944296
2020-03-15 -1.187913 -0.177445
Mean return: -0.3105
Volatility: 1.0426
Sharpe ratio: -0.2978

Testing Backtest stats calculation...
            rets       tri
Date                      
2020-04-05  0.05  1.050000
2020-04-06 -0.02  1.029000
2020-04-07  0.03  1.059870
2020-04-08  0.04  1.102265
2020-04-09 -0.01  1.091242
Mean return: 0.0176
Volatility: 0.0279
Sharpe ratio: 0.6324


In [89]:
# DataLoad module
#
####################################################################################################
# Import libraries
####################################################################################################
import torch
import torch.nn as nn
import pandas as pd
import pandas_datareader as pdr
import numpy as np
from alpha_vantage.timeseries import TimeSeries
import time
import statsmodels.api as sm


####################################################################################################
# TrainTest class
####################################################################################################
class TrainTest:
    def __init__(self, data: pd.DataFrame, n_obs: int, split: list[float]) -> None:
        """
        Object to hold the training, validation, and testing datasets.

        :param data: pandas DataFrame with time series data.
        :param n_obs: Number of observations per batch.
        :param split: List of ratios that control the partition of data into training, testing, and validation sets.
        """
        self.data: pd.DataFrame = data  # Store the input data as a DataFrame
        self.n_obs: int = n_obs  # Set the number of observations per batch
        self.split: list[float] = (
            split  # Set the split ratios for training, validation, and testing
        )

        n_obs_tot: int = self.data.shape[
            0
        ]  # Calculate the total number of observations in the dataset
        numel: np.ndarray = n_obs_tot * np.cumsum(
            split
        )  # Calculate the cumulative number of elements based on split ratios
        self.numel: list[int] = [
            round(i) for i in numel
        ]  # Round the cumulative elements to get the indices for splits

    def split_update(self, split: list[float]) -> None:
        """
        Update the list outlining the split ratio of training, validation, and testing datasets.

        :param split: List of ratios that control the partition of data into training, testing, and validation sets.
        """
        self.split: list[float] = (
            split  # Update the split ratios with the new list provided
        )
        n_obs_tot: int = self.data.shape[
            0
        ]  # Calculate the total number of observations in the dataset
        numel: np.ndarray = n_obs_tot * np.cumsum(
            split
        )  # Calculate the cumulative number of elements based on new split ratios
        self.numel: list[int] = [
            round(i) for i in numel
        ]  # Round the cumulative elements to get the indices for splits

    def train(self) -> pd.DataFrame:
        """
        Return the training subset of observations.

        :return: pandas DataFrame containing the training data subset.
        """
        return self.data[
            : self.numel[0]
        ]  # Return the data from the start up to the end of the training set

    def test(self) -> pd.DataFrame:
        """
        Return the test subset of observations.

        :return: pandas DataFrame containing the test data subset.
        """
        return self.data[
            self.numel[0] - self.n_obs : self.numel[1]
        ]  # Return the data for the test set, including overlap
if __name__ == "__main__":
    # Generate synthetic data for testing
    n_tot = 1000  # Total number of observations
    n_features = 5  # Number of features
    split_ratios = [0.7, 0.3]  # 70% training, 30% testing
    n_obs = 50  # Number of observations per batch

    # Create a synthetic DataFrame with random data
    data = pd.DataFrame(
        np.random.randn(n_tot, n_features),
        columns=[f"Feature_{i}" for i in range(n_features)],
    )

    # Initialize TrainTest object
    train_test_obj = TrainTest(data=data, n_obs=n_obs, split=split_ratios)

    # Test the training data split
    train_data = train_test_obj.train()
    print("Training Data:")
    print(train_data.head())
    print(f"Number of training observations: {len(train_data)}")

    # Test the test data split
    test_data = train_test_obj.test()
    print("\nTest Data:")
    print(test_data.head())
    print(f"Number of test observations: {len(test_data)}")

    # Update split ratios and test again
    new_split_ratios = [0.6, 0.4]  # Update split ratios
    train_test_obj.split_update(split=new_split_ratios)

    # Test the updated training data split
    updated_train_data = train_test_obj.train()
    print("\nUpdated Training Data:")
    print(updated_train_data.head())
    print(f"Number of updated training observations: {len(updated_train_data)}")

    # Test the updated test data split
    updated_test_data = train_test_obj.test()
    print("\nUpdated Test Data:")
    print(updated_test_data.head())
    print(f"Number of updated test observations: {len(updated_test_data)}")

Training Data:
   Feature_0  Feature_1  Feature_2  Feature_3  Feature_4
0  -0.966517   0.417190  -0.688667  -0.062657  -0.563649
1  -0.410931  -0.367224  -0.706757   0.302450  -0.627928
2   0.015687  -0.728732  -1.534299   0.912435   0.499527
3   2.288682  -1.043973  -0.208960  -0.128507   0.989369
4  -0.507108  -1.171042   1.207809  -0.316650   0.878644
Number of training observations: 700

Test Data:
     Feature_0  Feature_1  Feature_2  Feature_3  Feature_4
650  -0.769562   0.547303   1.364082  -1.378803  -0.196991
651  -0.610782   0.142851   0.010018   1.345381  -0.108405
652   1.374784  -0.315100   0.926482   0.228663   1.729885
653  -1.477015  -1.163858   1.831622  -0.733265  -0.669803
654  -0.875724   1.148446  -1.253458   0.022450  -0.265622
Number of test observations: 350

Updated Training Data:
   Feature_0  Feature_1  Feature_2  Feature_3  Feature_4
0  -0.966517   0.417190  -0.688667  -0.062657  -0.563649
1  -0.410931  -0.367224  -0.706757   0.302450  -0.627928
2   0.015687

In [90]:
####################################################################################################
# Generate linear synthetic data
####################################################################################################
def synthetic(
    n_x: int = 5,
    n_y: int = 10,
    n_tot: int = 1200,
    n_obs: int = 104,
    split: list[float] = [0.6, 0.4],
    set_seed: int = 100,
) -> tuple[TrainTest, TrainTest]:
    """
    Generates synthetic (normally-distributed) asset and factor data.

    :param n_x: Number of features.
    :param n_y: Number of assets.
    :param n_tot: Number of observations in the whole dataset.
    :param n_obs: Number of observations per batch.
    :param split: List of floats representing train-validation-test split percentages (must sum up to one).
    :param set_seed: Integer seed for replicability of the numpy RNG.

    :return: Tuple of TrainTest objects for features and asset data split into train, validation, and test subsets.
    """
    np.random.seed(set_seed)  # Set the random seed for reproducibility

    # 'True' prediction bias and weights
    a: np.ndarray = (
        np.sort(np.random.rand(n_y) / 250) + 0.0001
    )  # Generate small bias terms for each asset
    b: np.ndarray = (
        np.random.randn(n_x, n_y) / 5
    )  # Generate random weights for linear relationships between features and assets
    c: np.ndarray = np.random.randn(
        int((n_x + 1) / 2), n_y
    )  # Generate additional random weights for auxiliary features

    # Noise standard deviation
    s: np.ndarray = (
        np.sort(np.random.rand(n_y)) / 20 + 0.02
    )  # Generate small standard deviations for noise for each asset

    # Synthetic features
    X: np.ndarray = (
        np.random.randn(n_tot, n_x) / 50
    )  # Generate synthetic features from a normal distribution
    X2: np.ndarray = (
        np.random.randn(n_tot, int((n_x + 1) / 2)) / 50
    )  # Generate auxiliary features from a normal distribution

    # Synthetic outputs
    Y: np.ndarray = (
        a + X @ b + X2 @ c + s * np.random.randn(n_tot, n_y)
    )  # Generate synthetic outputs based on linear combinations of features and noise

    X: pd.DataFrame = pd.DataFrame(X)  # Convert features to a pandas DataFrame
    Y: pd.DataFrame = pd.DataFrame(Y)  # Convert outputs to a pandas DataFrame

    # Partition dataset into training and testing sets
    return TrainTest(X, n_obs, split), TrainTest(
        Y, n_obs, split
    )  # Return TrainTest objects for features and outputs


####################################################################################################
# Test code for synthetic data generation
####################################################################################################
if __name__ == "__main__":
    # Parameters for synthetic data generation
    n_x = 5  # Number of features
    n_y = 10  # Number of assets
    n_tot = 1200  # Total number of observations
    n_obs = 104  # Number of observations per batch
    split = [0.6, 0.4]  # Split ratios for training and testing
    set_seed = 100  # Random seed for reproducibility

    # Generate synthetic data
    train_test_features, train_test_outputs = synthetic(
        n_x, n_y, n_tot, n_obs, split, set_seed
    )

    # Test the generated feature data
    train_features = train_test_features.train()
    test_features = train_test_features.test()
    print("Training Features:")
    print(train_features.head())
    print(f"Number of training feature observations: {len(train_features)}")
    print("\nTest Features:")
    print(test_features.head())
    print(f"Number of test feature observations: {len(test_features)}")

    # Test the generated output data
    train_outputs = train_test_outputs.train()
    test_outputs = train_test_outputs.test()
    print("\nTraining Outputs:")
    print(train_outputs.head())
    print(f"Number of training output observations: {len(train_outputs)}")
    print("\nTest Outputs:")
    print(test_outputs.head())
    print(f"Number of test output observations: {len(test_outputs)}")

    # Verify the shape of the generated data
    assert (
        train_features.shape[1] == n_x
    ), "Number of features in training set does not match expected value."
    assert (
        train_outputs.shape[1] == n_y
    ), "Number of assets in training set does not match expected value."
    assert (
        test_features.shape[1] == n_x
    ), "Number of features in test set does not match expected value."
    assert (
        test_outputs.shape[1] == n_y
    ), "Number of assets in test set does not match expected value."

Training Features:
          0         1         2         3         4
0  0.037531 -0.007538  0.036639  0.000060 -0.001520
1  0.000079 -0.003700 -0.049743 -0.034093 -0.022725
2 -0.059466  0.000666 -0.004978 -0.009004  0.002649
3  0.000444  0.006347 -0.015048 -0.025928  0.001903
4 -0.008474 -0.023720 -0.007309 -0.025420  0.031723
Number of training feature observations: 720

Test Features:
            0         1         2         3         4
616  0.014443  0.017019 -0.003300 -0.056563 -0.017051
617  0.018964 -0.016458 -0.003853 -0.011657 -0.022333
618 -0.022941  0.020247 -0.012068  0.003381  0.019936
619  0.006151  0.024943 -0.006490 -0.010648 -0.017412
620 -0.031169 -0.034504  0.012386  0.022285 -0.043454
Number of test feature observations: 584

Training Outputs:
          0         1         2         3         4         5         6  \
0  0.020391 -0.024384 -0.050628 -0.016314 -0.044117  0.017402  0.090213   
1 -0.001323  0.039438  0.068975 -0.035036  0.059205 -0.009392 -0.091626   

In [91]:
####################################################################################################
# Generate non-linear synthetic data
####################################################################################################
import numpy as np
import pandas as pd
import unittest


def synthetic_nl(
    n_x: int = 5,
    n_y: int = 10,
    n_tot: int = 1200,
    n_obs: int = 104,
    split: list[float] = [0.6, 0.4],
    set_seed: int = 100,
) -> tuple[TrainTest, TrainTest]:
    """
    Generates synthetic (normally-distributed) factor data and mixes them using a quadratic model
    of linear, squared, and cross products to produce the asset data.

    :param n_x: Number of features.
    :param n_y: Number of assets.
    :param n_tot: Number of observations in the whole dataset.
    :param n_obs: Number of observations per batch.
    :param split: List of floats representing train-validation-test split percentages (must sum up to one).
    :param set_seed: Integer seed for replicability of the numpy RNG.
    :return: Tuple of TrainTest objects for features and asset data split into train, validation, and test subsets.
    """
    # Set the random seed for reproducibility
    np.random.seed(set_seed)

    # Generate 'True' prediction bias and weights
    a: np.ndarray = (
        np.sort(np.random.rand(n_y) / 200) + 0.0005
    )  # Bias terms for each asset
    b: np.ndarray = np.random.randn(n_x, n_y) / 4  # Linear relationship weights
    c: np.ndarray = np.random.randn(
        int((n_x + 1) / 2), n_y
    )  # Auxiliary feature weights
    d: np.ndarray = np.random.randn(n_x**2, n_y) / n_x  # Cross-product weights

    # Generate noise standard deviation for each asset
    s: np.ndarray = np.sort(np.random.rand(n_y)) / 20 + 0.02

    # Generate synthetic features
    X: np.ndarray = np.random.randn(n_tot, n_x) / 50  # Main features
    X2: np.ndarray = (
        np.random.randn(n_tot, int((n_x + 1) / 2)) / 50
    )  # Auxiliary features
    X_cross: np.ndarray = 100 * (X[:, :, None] * X[:, None, :]).reshape(
        n_tot, n_x**2
    )  # Cross-product features
    X_cross = X_cross - X_cross.mean(axis=0)  # Center cross-product features

    # Generate synthetic outputs
    Y: np.ndarray = a + X @ b + X2 @ c + X_cross @ d + s * np.random.randn(n_tot, n_y)

    # Convert features and outputs to pandas DataFrames
    X_df: pd.DataFrame = pd.DataFrame(X)
    Y_df: pd.DataFrame = pd.DataFrame(Y)

    # Partition dataset into training and testing sets
    return TrainTest(X_df, n_obs, split), TrainTest(Y_df, n_obs, split)


####################################################################################################
# Unit Test for synthetic_nl function
####################################################################################################
def test_synthetic_nl() -> None:
    n_x = 5
    n_y = 10
    n_tot = 1200
    n_obs = 104
    split = [0.6, 0.4]
    set_seed = 100

    features, outputs = synthetic_nl(
        n_x=n_x,
        n_y=n_y,
        n_tot=n_tot,
        n_obs=n_obs,
        split=split,
        set_seed=set_seed,
    )

    assert isinstance(
        features, TrainTest
    ), "Features should be an instance of TrainTest."
    assert isinstance(outputs, TrainTest), "Outputs should be an instance of TrainTest."
    assert features.data.shape == (n_tot, n_x), "Features data shape mismatch."
    assert outputs.data.shape == (n_tot, n_y), "Outputs data shape mismatch."
    expected_train_len = int(n_tot * split[0])
    assert (
        len(features.train()) == expected_train_len
    ), "Training split size mismatch for features."
    assert (
        len(outputs.train()) == expected_train_len
    ), "Training split size mismatch for outputs."

    # Test reproducibility
    features_1, outputs_1 = synthetic_nl(
        n_x=n_x,
        n_y=n_y,
        n_tot=n_tot,
        n_obs=n_obs,
        split=split,
        set_seed=set_seed,
    )
    features_2, outputs_2 = synthetic_nl(
        n_x=n_x,
        n_y=n_y,
        n_tot=n_tot,
        n_obs=n_obs,
        split=split,
        set_seed=set_seed,
    )
    pd.testing.assert_frame_equal(
        features_1.data, features_2.data, "Feature data mismatch for reproducibility."
    )
    pd.testing.assert_frame_equal(
        outputs_1.data, outputs_2.data, "Output data mismatch for reproducibility."
    )


if __name__ == "__main__":
    test_synthetic_nl()
    print("All tests passed.")

All tests passed.


In [92]:
####################################################################################################
# Generate non-linear synthetic data
####################################################################################################
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from typing import List, Tuple, Optional
import time
import statsmodels.api as sm
import pandas_datareader as pdr
from alpha_vantage.timeseries import TimeSeries


# Function to generate synthetic data using a 3-layer neural network
def synthetic_NN(
    n_x: int = 5,
    n_y: int = 10,
    n_tot: int = 1200,
    n_obs: int = 104,
    split: List[float] = [0.6, 0.4],
    set_seed: int = 45678,
) -> Tuple["TrainTest", "TrainTest"]:
    """
    Generates synthetic (normally-distributed) factor data and mixes them using a 3-layer neural network.

    :param n_x: Number of features.
    :param n_y: Number of assets.
    :param n_tot: Number of observations in the whole dataset.
    :param n_obs: Number of observations per batch.
    :param split: List of floats representing train-validation-test split percentages (must sum up to one).
    :param set_seed: Integer seed for reproducibility.
    :return: Tuple of TrainTest objects for feature and asset data.
    """
    np.random.seed(set_seed)  # Set random seed for reproducibility

    # Generate synthetic features (n_tot samples with n_x features each)
    X: np.ndarray = (
        np.random.randn(n_tot, n_x) * 10 + 0.5
    )  # Scale and shift features to have varied range

    # Initialize neural network
    synth: synthetic3layer = synthetic3layer(
        n_x, n_y, set_seed
    ).double()  # Create an instance of the 3-layer neural network

    # Generate synthetic outputs using the neural network
    Y: torch.Tensor = synth(
        torch.from_numpy(X)
    )  # Convert X to a torch tensor and pass it through the neural network

    # Convert synthetic features and outputs to pandas DataFrames
    X_df: pd.DataFrame = pd.DataFrame(X)  # Convert features to DataFrame
    Y_df: pd.DataFrame = (
        pd.DataFrame(Y.detach().numpy()) / 10
    )  # Convert outputs to DataFrame and scale down

    # Return TrainTest objects for features and outputs
    return TrainTest(X_df, n_obs, split), TrainTest(Y_df, n_obs, split)


####################################################################################################
# Neural Network Module
####################################################################################################
class synthetic3layer(nn.Module):
    def __init__(self, n_x: int, n_y: int, set_seed: int) -> None:
        """
        Initialize a 3-layer neural network to synthesize data.

        :param n_x: Number of input features.
        :param n_y: Number of output features.
        :param set_seed: Integer seed for reproducibility.
        """
        super().__init__()  # Call the parent class (nn.Module) initializer
        torch.manual_seed(
            set_seed
        )  # Set random seed for torch to ensure reproducible weights

        # Define a neural network with 3 hidden layers
        self.pred_layer: nn.Sequential = nn.Sequential(
            nn.Linear(
                n_x, int(0.5 * (n_x + n_y))
            ),  # First linear layer to project input features to an intermediate size
            nn.ReLU(),  # ReLU activation function to introduce non-linearity
            nn.Linear(
                int(0.5 * (n_x + n_y)), int(0.6 * (n_x + n_y))
            ),  # Second linear layer to another intermediate size
            nn.ReLU(),  # ReLU activation function
            nn.Linear(
                int(0.6 * (n_x + n_y)), n_y
            ),  # Third linear layer to project to output size
            nn.ReLU(),  # ReLU activation function
            nn.Linear(n_y, n_y),  # Final linear layer to produce the final outputs
        )

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """
        Perform the forward pass of the neural network to generate synthetic outputs.

        :param X: Features. (n_obs x n_x) torch tensor with feature time series data.
        :return: Synthetically generated output. (n_obs x n_y) torch tensor of outputs.
        """
        # Apply the prediction layer to each input tensor in the batch and stack the results
        return torch.stack([self.pred_layer(x_t) for x_t in X])


####################################################################################################
# Test code to detect bugs
####################################################################################################
def test_synthetic_nn() -> None:
    try:
        # Define parameters for the test
        n_x = 5
        n_y = 10
        n_tot = 1200
        n_obs = 104
        split = [0.6, 0.4]
        set_seed = 45678

        # Generate synthetic data
        features, outputs = synthetic_NN(n_x, n_y, n_tot, n_obs, split, set_seed)

        # Check if generated features and outputs are pandas DataFrames
        assert isinstance(
            features.data, pd.DataFrame
        ), "Features should be a pandas DataFrame."
        assert isinstance(
            outputs.data, pd.DataFrame
        ), "Outputs should be a pandas DataFrame."

        # Check the dimensions of the generated data
        assert features.data.shape == (
            n_tot,
            n_x,
        ), f"Expected features shape {(n_tot, n_x)}, but got {features.data.shape}."
        assert outputs.data.shape == (
            n_tot,
            n_y,
        ), f"Expected outputs shape {(n_tot, n_y)}, but got {outputs.data.shape}."

        # Check reproducibility
        features_1, outputs_1 = synthetic_NN(n_x, n_y, n_tot, n_obs, split, set_seed)
        features_2, outputs_2 = synthetic_NN(n_x, n_y, n_tot, n_obs, split, set_seed)
        pd.testing.assert_frame_equal(
            features_1.data,
            features_2.data,
            "Feature data mismatch for reproducibility.",
        )
        pd.testing.assert_frame_equal(
            outputs_1.data, outputs_2.data, "Output data mismatch for reproducibility."
        )

        print("All tests passed successfully.")
    except AssertionError as e:
        print(f"Test failed: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


# Run the test function
if __name__ == "__main__":
    test_synthetic_nn()

All tests passed successfully.


In [93]:
from typing import Callable
import cvxpy as cp
from cvxpylayers.torch import CvxpyLayer

#---------------------------------------------------------------------------------------------------
# base_mod: CvxpyLayer that declares the portfolio optimization problem
#---------------------------------------------------------------------------------------------------
def base_mod(n_y: int, n_obs: int, prisk: Callable) -> CvxpyLayer:
    """
    Base optimization problem declared as a CvxpyLayer object.

    :param n_y: Number of assets.
    :param n_obs: Number of scenarios in the dataset.
    :param prisk: Portfolio risk function.
    :return: CvxpyLayer representing the optimization layer.
    """
    # Variables
    z: cp.Variable = cp.Variable((n_y, 1), nonneg=True)  # Portfolio weights (long-only positions)

    # Parameters
    y_hat: cp.Parameter = cp.Parameter(n_y)  # Predicted outcomes (e.g., expected returns)
    
    # Constraints
    constraints: list[cp.Constraint] = [
        cp.sum(z) == 1  # Budget constraint: sum of weights equals 1
    ]

    # Objective function
    objective: cp.Minimize = cp.Minimize(-y_hat @ z)  # Maximize returns by minimizing negative expected returns

    # Construct optimization problem and differentiable layer
    problem: cp.Problem = cp.Problem(objective, constraints)

    return CvxpyLayer(problem, parameters=[y_hat], variables=[z])

#---------------------------------------------------------------------------------------------------
# nominal: CvxpyLayer that declares the nominal portfolio optimization problem
#---------------------------------------------------------------------------------------------------
def nominal(n_y: int, n_obs: int, prisk: Callable) -> CvxpyLayer:
    """
    Nominal optimization problem declared as a CvxpyLayer object.

    :param n_y: Number of assets.
    :param n_obs: Number of scenarios in the dataset.
    :param prisk: Portfolio risk function.
    :return: CvxpyLayer representing the optimization layer.
    """
    # Variables
    z: cp.Variable = cp.Variable((n_y, 1), nonneg=True)  # Portfolio weights (long-only positions)
    c_aux: cp.Variable = cp.Variable()  # Auxiliary variable for risk calculation
    obj_aux: cp.Variable = cp.Variable(n_obs)  # Objective auxiliary variable for each scenario
    mu_aux: cp.Variable = cp.Variable()  # Expected portfolio return

    # Parameters
    ep: cp.Parameter = cp.Parameter((n_obs, n_y))  # Scenario matrix of residuals
    y_hat: cp.Parameter = cp.Parameter(n_y)  # Predicted outcomes (e.g., expected returns)
    gamma: cp.Parameter = cp.Parameter(nonneg=True)  # Risk aversion coefficient
    
    # Constraints
    constraints: list[cp.Constraint] = [
        cp.sum(z) == 1,  # Budget constraint: sum of weights equals 1
        mu_aux == y_hat @ z  # Calculate expected return
    ]
    for i in range(n_obs):
        constraints.append(obj_aux[i] >= prisk(z, c_aux, ep[i]))  # Add risk constraints for each scenario

    # Objective function
    objective: cp.Minimize = cp.Minimize((1 / n_obs) * cp.sum(obj_aux) - gamma * mu_aux)  # Minimize risk-adjusted return

    # Construct optimization problem and differentiable layer
    problem: cp.Problem = cp.Problem(objective, constraints)

    return CvxpyLayer(problem, parameters=[ep, y_hat, gamma], variables=[z])

In [94]:
import pandas as pd
import time
from alpha_vantage.timeseries import TimeSeries
import pandas_datareader.data as pdr

####################################################################################################
# Option 4: Factors from Kenneth French's data library and asset data from AlphaVantage
# https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/data_library.html
# https://www.alphavantage.co
####################################################################################################


def AV(
    start: str,
    end: str,
    split: list[float],
    freq: str = "weekly",
    n_obs: int = 104,
    n_y: int | None = None,
    use_cache: bool = False,
    save_results: bool = False,
    AV_key: str | None = "YDNA9HH8P2IW985M",
) -> tuple[TrainTest, TrainTest]:
    """
    Load data from Kenneth French's data library and from AlphaVantage.

    :param start: Start date of time series.
    :param end: End date of time series.
    :param split: List of floats representing train-validation-test split percentages.
    :param freq: Data frequency (daily, weekly, monthly). Default is 'weekly'.
    :param n_obs: Number of observations per batch. Default is 104.
    :param n_y: Number of features to select. If None, the maximum number (8) is used. Default is None.
    :param use_cache: Whether to load cached data or download new data. Default is False.
    :param save_results: Whether to save the data for future use. Default is False.
    :param AV_key: AlphaVantage API key for accessing their data. Default is 'YDNA9HH8P2IW985M'.
    :return: Tuple containing TrainTest objects for features and asset data.
    """
    if use_cache:
        # Load cached data
        X: pd.DataFrame = pd.read_pickle(f"./cache/factor_{freq}.pkl")
        Y: pd.DataFrame = pd.read_pickle(f"./cache/asset_{freq}.pkl")
    else:
        # Define list of tickers to be downloaded
        tick_list: list[str] = [
            "AAPL",
            "MSFT",
            "AMZN",
            "C",
            "JPM",
            "BAC",
            "XOM",
            "HAL",
            "MCD",
            "WMT",
            "COST",
            "CAT",
            "LMT",
            "JNJ",
            "PFE",
            "DIS",
            "VZ",
            "T",
            "ED",
            "NEM",
        ]

        # Select the first n_y tickers if specified
        if n_y is not None:
            tick_list = tick_list[:n_y]

        # Ensure API key is provided
        if AV_key is None:
            print(
                """A personal AlphaVantage API key is required to load the asset pricing data.
                If you do not have a key, you can get one from www.alphavantage.co (free for academic users)"""
            )
            AV_key = input("Enter your AlphaVantage API key: ")

        # Initialize TimeSeries object to interact with AlphaVantage API
        ts: TimeSeries = TimeSeries(
            key=AV_key, output_format="pandas", indexing_type="date"
        )

        # Download asset data from AlphaVantage
        Y_list: list[pd.Series] = []
        for tick in tick_list:
            data, _ = ts.get_daily_adjusted(symbol=tick, outputsize="full")
            data = data["5. adjusted close"]
            Y_list.append(data)
            time.sleep(12.5)  # AlphaVantage rate limit to avoid API ban
        Y: pd.DataFrame = pd.concat(Y_list, axis=1)
        Y = Y[::-1]
        Y = Y.loc["1999-01-01":end].pct_change()
        Y = Y.loc[start:end]
        Y.columns = tick_list

        # Download factor data from Kenneth French's library
        dl_freq: str = "_daily"
        X: pd.DataFrame = pdr.get_data_famafrench(
            f"F-F_Research_Data_5_Factors_2x3{dl_freq}", start=start, end=end
        )[0]
        rf_df: pd.Series = X["RF"]
        X = X.drop(["RF"], axis=1)
        mom_df: pd.DataFrame = pdr.get_data_famafrench(
            f"F-F_Momentum_Factor{dl_freq}", start=start, end=end
        )[0]
        st_df: pd.DataFrame = pdr.get_data_famafrench(
            f"F-F_ST_Reversal_Factor{dl_freq}", start=start, end=end
        )[0]
        lt_df: pd.DataFrame = pdr.get_data_famafrench(
            f"F-F_LT_Reversal_Factor{dl_freq}", start=start, end=end
        )[0]

        # Concatenate all factors into a single DataFrame
        X = pd.concat([X, mom_df, st_df, lt_df], axis=1) / 100

        # Convert daily returns to weekly returns if specified
        if freq in ["weekly", "_weekly"]:
            Y = Y.resample("W-FRI").agg(lambda x: (x + 1).prod() - 1)
            X = X.resample("W-FRI").agg(lambda x: (x + 1).prod() - 1)

        # Save the data if requested
        if save_results:
            X.to_pickle(f"./cache/factor_{freq}.pkl")
            Y.to_pickle(f"./cache/asset_{freq}.pkl")

    # Partition dataset into training and testing sets. Lag the data by one observation
    return TrainTest(X[:-1], n_obs, split), TrainTest(Y[1:], n_obs, split)


####################################################################################################
# Test code to verify functionality and detect bugs
####################################################################################################
def test_AV() -> None:
    """
    Test the AV function to ensure correctness of generated data.

    :return: None
    """
    try:
        # Define parameters for the test
        start = "2020-01-01"
        end = "2023-01-01"
        split = [0.7, 0.3]
        freq = "weekly"
        n_obs = 104
        n_y = 5
        use_cache = False
        save_results = False
        AV_key = "YDNA9HH8P2IW985M"

        # Generate synthetic data
        features, outputs = AV(
            start, end, split, freq, n_obs, n_y, use_cache, save_results, AV_key
        )

        # Check if generated features and outputs are pandas DataFrames
        assert isinstance(
            features.data, pd.DataFrame
        ), "Features should be a pandas DataFrame."
        assert isinstance(
            outputs.data, pd.DataFrame
        ), "Outputs should be a pandas DataFrame."

        # Check the dimensions of the generated data
        assert (
            features.data.shape[0] == outputs.data.shape[0]
        ), "Features and outputs should have matching number of rows."

        print("All tests passed successfully.")
    except AssertionError as e:
        print(f"Test failed: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


# Run the test function
if __name__ == "__main__":
    test_AV()

An unexpected error occurred: Thank you for using Alpha Vantage! This is a premium endpoint. You may subscribe to any of the premium plans at https://www.alphavantage.co/premium/ to instantly unlock all premium endpoints


In [95]:
import pandas as pd
import statsmodels.api as sm
import numpy as np


####################################################################################################
# Statistical Analysis Function
####################################################################################################
def statanalysis(X: pd.DataFrame, Y: pd.DataFrame) -> pd.DataFrame:
    """
    Conduct a pairwise statistical significance analysis of each feature in X against each asset in Y.

    :param X: DataFrame containing the time series of features (independent variables).
    :param Y: DataFrame containing the time series of asset returns (dependent variables).
    :return: DataFrame containing p-values obtained from regressing each individual feature against each individual asset.
    """
    # Initialize an empty DataFrame to store p-values
    stats: pd.DataFrame = pd.DataFrame(columns=X.columns, index=Y.columns)

    # Iterate over each asset (Y) and each feature (X) to perform OLS regression
    # OLS (Ordinary Least Squares) is used to estimate the relationship between each feature and asset return.
    # For each asset, regress it against each feature with a constant term to obtain the p-value.
    for ticker in Y.columns:
        for feature in X.columns:
            # Perform OLS regression and store the p-value of the feature
            stats.loc[ticker, feature] = (
                sm.OLS(Y[ticker].values, sm.add_constant(X[feature]).values)
                .fit()
                .pvalues[1]
            )

    # Convert p-values to float and round to two decimal places
    return stats.astype(float).round(2)


####################################################################################################
# Test code to verify functionality and avoid bugs
####################################################################################################
def test_statanalysis() -> None:
    """
    Test the statanalysis function to ensure it correctly calculates p-values for feature-asset relationships.

    :return: None
    """
    try:
        # Generate random data for testing
        np.random.seed(42)
        X_test = pd.DataFrame(
            np.random.randn(100, 5), columns=[f"Feature_{i+1}" for i in range(5)]
        )
        Y_test = pd.DataFrame(
            np.random.randn(100, 3), columns=[f"Asset_{i+1}" for i in range(3)]
        )

        # Run the statistical analysis
        stats_result = statanalysis(X_test, Y_test)

        # Check if the result is a DataFrame
        assert isinstance(
            stats_result, pd.DataFrame
        ), "The result should be a pandas DataFrame."

        # Check if the dimensions of the result match expectations
        assert stats_result.shape == (
            3,
            5,
        ), f"Expected shape (3, 5), but got {stats_result.shape}."

        # Check if all p-values are between 0 and 1
        assert (
            stats_result.map(lambda x: 0 <= x <= 1).all().all()
        ), "All p-values should be between 0 and 1."

        print("All tests passed successfully.")
    except AssertionError as e:
        print(f"Test failed: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


# Run the test function
if __name__ == "__main__":
    test_statanalysis()

All tests passed successfully.


In [96]:
import psutil

num_cores: int = (
    psutil.cpu_count()
)  # Get the number of CPU cores available in the system.
torch.set_num_threads(
    num_cores
)  # Set the number of threads for PyTorch to utilize all CPU cores.
if psutil.MACOS:
    num_cores = 0  # Set num_cores to 0 on macOS systems (specific behavior).

num_cores

16

In [97]:
from typing import Callable
import cvxpy as cp
from cvxpylayers.torch import CvxpyLayer


# ---------------------------------------------------------------------------------------------------
# base_mod: CvxpyLayer that declares the portfolio optimization problem
# ---------------------------------------------------------------------------------------------------
def base_mod(n_y: int, n_obs: int, prisk: Callable) -> CvxpyLayer:
    """
    Base optimization problem declared as a CvxpyLayer object.

    :param n_y: Number of assets.
    :param n_obs: Number of scenarios in the dataset.
    :param prisk: Portfolio risk function.
    :return: CvxpyLayer representing the optimization layer.
    """
    # Variables
    z: cp.Variable = cp.Variable(
        (n_y, 1), nonneg=True
    )  # Portfolio weights (long-only positions)

    # Parameters
    y_hat: cp.Parameter = cp.Parameter(
        n_y
    )  # Predicted outcomes (e.g., expected returns)

    # Constraints
    constraints: list[cp.Constraint] = [
        cp.sum(z) == 1  # Budget constraint: sum of weights equals 1
    ]

    # Objective function
    objective: cp.Minimize = cp.Minimize(
        -y_hat @ z
    )  # Maximize returns by minimizing negative expected returns

    # Construct optimization problem and differentiable layer
    problem: cp.Problem = cp.Problem(objective, constraints)

    return CvxpyLayer(problem, parameters=[y_hat], variables=[z])


# ---------------------------------------------------------------------------------------------------
# nominal: CvxpyLayer that declares the nominal portfolio optimization problem
# ---------------------------------------------------------------------------------------------------
def nominal(n_y: int, n_obs: int, prisk: Callable) -> CvxpyLayer:
    """
    Nominal optimization problem declared as a CvxpyLayer object.

    :param n_y: Number of assets.
    :param n_obs: Number of scenarios in the dataset.
    :param prisk: Portfolio risk function.
    :return: CvxpyLayer representing the optimization layer.
    """
    # Variables
    z: cp.Variable = cp.Variable(
        (n_y, 1), nonneg=True
    )  # Portfolio weights (long-only positions)
    c_aux: cp.Variable = cp.Variable()  # Auxiliary variable for risk calculation
    obj_aux: cp.Variable = cp.Variable(
        n_obs
    )  # Objective auxiliary variable for each scenario
    mu_aux: cp.Variable = cp.Variable()  # Expected portfolio return

    # Parameters
    ep: cp.Parameter = cp.Parameter((n_obs, n_y))  # Scenario matrix of residuals
    y_hat: cp.Parameter = cp.Parameter(
        n_y
    )  # Predicted outcomes (e.g., expected returns)
    gamma: cp.Parameter = cp.Parameter(nonneg=True)  # Risk aversion coefficient

    # Constraints
    constraints: list[cp.Constraint] = [
        cp.sum(z) == 1,  # Budget constraint: sum of weights equals 1
        mu_aux == y_hat @ z,  # Calculate expected return
    ]
    for i in range(n_obs):
        constraints.append(
            obj_aux[i] >= prisk(z, c_aux, ep[i])
        )  # Add risk constraints for each scenario

    # Objective function
    objective: cp.Minimize = cp.Minimize(
        (1 / n_obs) * cp.sum(obj_aux) - gamma * mu_aux
    )  # Minimize risk-adjusted return

    # Construct optimization problem and differentiable layer
    problem: cp.Problem = cp.Problem(objective, constraints)

    return CvxpyLayer(problem, parameters=[ep, y_hat, gamma], variables=[z])

In [98]:
# ---------------------------------------------------------------------------------------------------
# Total Variation: sum_t abs(p_t - q_t) <= delta
# ---------------------------------------------------------------------------------------------------
def tv(n_y: int, n_obs: int, prisk: Callable) -> CvxpyLayer:
    """
    DRO layer using the 'Total Variation' distance to define the probability ambiguity set.
    From Ben-Tal et al. (2013).

    :param n_y: Number of assets.
    :param n_obs: Number of scenarios in the dataset.
    :param prisk: Portfolio risk function.
    :return: CvxpyLayer representing the optimization layer.
    """
    # Variables
    z: cp.Variable = cp.Variable(
        (n_y, 1), nonneg=True
    )  # Decision variable representing portfolio weights with nonnegative constraint.
    c_aux: cp.Variable = (
        cp.Variable()
    )  # Auxiliary variable used in risk function calculation.
    lambda_aux: cp.Variable = cp.Variable(
        nonneg=True
    )  # Auxiliary variable representing Lagrange multiplier for the Total Variation constraint (nonnegative).
    eta_aux: cp.Variable = (
        cp.Variable()
    )  # Auxiliary variable representing an additional term in the optimization.
    beta_aux: cp.Variable = cp.Variable(
        n_obs
    )  # Auxiliary variable representing scenario-specific values for optimization.
    mu_aux: cp.Variable = (
        cp.Variable()
    )  # Auxiliary variable representing the expected return of the portfolio.

    # Parameters
    ep: cp.Parameter = cp.Parameter(
        (n_obs, n_y)
    )  # Parameter representing the different scenarios for asset returns.
    y_hat: cp.Parameter = cp.Parameter(
        n_y
    )  # Parameter representing the expected returns for each asset.
    gamma: cp.Parameter = cp.Parameter(
        nonneg=True
    )  # Parameter representing the risk-aversion coefficient (nonnegative).
    delta: cp.Parameter = cp.Parameter(
        nonneg=True
    )  # Parameter representing the size of the ambiguity set for Total Variation (nonnegative).

    # Constraints
    constraints: list[cp.Constraint] = [
        cp.sum(z) == 1,
        beta_aux >= -lambda_aux,
        mu_aux == y_hat @ z,
    ]  # Constraints for portfolio weights, auxiliary variables, and expected return.
    for i in range(n_obs):
        constraints.append(
            beta_aux[i] >= prisk(z, c_aux, ep[i]) - eta_aux
        )  # Constraint to enforce risk bounds for each scenario.
        constraints.append(
            lambda_aux >= prisk(z, c_aux, ep[i]) - eta_aux
        )  # Constraint for Lagrange multiplier to handle variation.

    # Objective function
    objective: cp.Minimize = cp.Minimize(
        eta_aux + delta * lambda_aux + (1 / n_obs) * cp.sum(beta_aux) - gamma * mu_aux
    )  # Objective to minimize the sum of risk terms and maximize expected return.

    # Construct optimization problem and differentiable layer
    problem: cp.Problem = cp.Problem(
        objective, constraints
    )  # Define the optimization problem with the objective and constraints.

    return CvxpyLayer(
        problem, parameters=[ep, y_hat, gamma, delta], variables=[z]
    )  # Return the CvxpyLayer that represents the optimization problem.


# ---------------------------------------------------------------------------------------------------
# Hellinger distance: sum_t (sqrt(p_t) - sqrt(q_t))^2 <= delta
# ---------------------------------------------------------------------------------------------------
def hellinger(n_y: int, n_obs: int, prisk: Callable) -> CvxpyLayer:
    """
    DRO layer using the Hellinger distance to define the probability ambiguity set.
    From Ben-Tal et al. (2013).

    :param n_y: Number of assets.
    :param n_obs: Number of scenarios in the dataset.
    :param prisk: Portfolio risk function.
    :return: CvxpyLayer representing the optimization layer.
    """
    # Variables
    z: cp.Variable = cp.Variable(
        (n_y, 1), nonneg=True
    )  # Decision variable representing portfolio weights with nonnegative constraint.
    c_aux: cp.Variable = (
        cp.Variable()
    )  # Auxiliary variable used in risk function calculation.
    lambda_aux: cp.Variable = cp.Variable(
        nonneg=True
    )  # Auxiliary variable representing Lagrange multiplier for Hellinger constraint (nonnegative).
    xi_aux: cp.Variable = (
        cp.Variable()
    )  # Auxiliary variable representing an additional term in the optimization.
    beta_aux: cp.Variable = cp.Variable(
        n_obs, nonneg=True
    )  # Scenario-specific auxiliary variable for optimization with nonnegative constraint.
    tau_aux: cp.Variable = cp.Variable(
        n_obs, nonneg=True
    )  # Auxiliary variable for scenario-specific terms, nonnegative.
    mu_aux: cp.Variable = (
        cp.Variable()
    )  # Auxiliary variable representing the expected return of the portfolio.

    # Parameters
    ep: cp.Parameter = cp.Parameter(
        (n_obs, n_y)
    )  # Parameter representing the different scenarios for asset returns.
    y_hat: cp.Parameter = cp.Parameter(
        n_y
    )  # Parameter representing the expected returns for each asset.
    gamma: cp.Parameter = cp.Parameter(
        nonneg=True
    )  # Parameter representing the risk-aversion coefficient (nonnegative).
    delta: cp.Parameter = cp.Parameter(
        nonneg=True
    )  # Parameter representing the size of the ambiguity set for Hellinger distance (nonnegative).

    # Constraints
    constraints: list[cp.Constraint] = [
        cp.sum(z) == 1,
        mu_aux == y_hat @ z,
    ]  # Constraints to ensure portfolio weights sum to 1 and compute expected return.
    for i in range(n_obs):
        constraints.append(
            xi_aux + lambda_aux >= prisk(z, c_aux, ep[i]) + tau_aux[i]
        )  # Constraint to handle Hellinger distance using auxiliary variables.
        constraints.append(
            beta_aux[i] >= cp.quad_over_lin(lambda_aux, tau_aux[i])
        )  # Quadratic constraint for scenario-specific optimization.

    # Objective function
    objective: cp.Minimize = cp.Minimize(
        xi_aux
        + (delta - 1) * lambda_aux
        + (1 / n_obs) * cp.sum(beta_aux)
        - gamma * mu_aux
    )  # Objective to minimize the sum of risk terms and maximize expected return.

    # Construct optimization problem and differentiable layer
    problem: cp.Problem = cp.Problem(
        objective, constraints
    )  # Define the optimization problem with the objective and constraints.

    return CvxpyLayer(
        problem, parameters=[ep, y_hat, gamma, delta], variables=[z]
    )  # Return the CvxpyLayer that represents the optimization problem.

In [None]:
####################################################################################################
# E2E neural network module
####################################################################################################
class e2e_net(nn.Module):
    """End-to-end DRO learning neural net module."""

    def __init__(
        self,
        n_x: int,
        n_y: int,
        n_obs: int,
        opt_layer: str = 'nominal',
        prisk: str = 'p_var',
        perf_loss: str = 'sharpe_loss',
        pred_model: str = 'linear',
        pred_loss_factor: Optional[float] = 0.5,
        perf_period: int = 13,
        train_pred: bool = True,
        train_gamma: bool = True,
        train_delta: bool = True,
        set_seed: Optional[int] = None,
        cache_path: str = './cache/'
    ) -> None:
        """
        Initializes the end-to-end learning neural net module.

        :param n_x: Number of input features in the prediction model.
        :param n_y: Number of outputs from the prediction model.
        :param n_obs: Number of scenarios for residuals calculation.
        :param opt_layer: Optimization layer type ('nominal', 'base_mod', etc.).
        :param prisk: Portfolio risk function used in the optimization layer.
        :param perf_loss: Performance loss function.
        :param pred_model: Type of prediction model ('linear', '2layer', '3layer').
        :param pred_loss_factor: Trade-off between prediction loss and performance loss.
        :param perf_period: Number of lookahead realizations for performance loss.
        :param train_pred: Whether to train the prediction layer.
        :param train_gamma: Whether to train the risk appetite parameter gamma.
        :param train_delta: Whether to train the robustness parameter delta.
        :param set_seed: Random seed for replicability.
        :param cache_path: Path to cache model data.
        """
        super().__init__()

        # Set random seed for replicability
        if set_seed is not None:
            torch.manual_seed(set_seed)
            self.seed: Optional[int] = set_seed

        self.n_x: int = n_x
        self.n_y: int = n_y
        self.n_obs: int = n_obs

        # Prediction loss function
        if pred_loss_factor is not None:
            self.pred_loss_factor: float = pred_loss_factor
            self.pred_loss: Optional[nn.Module] = torch.nn.MSELoss()
        else:
            self.pred_loss: Optional[nn.Module] = None

        # Define performance loss
        #self.perf_loss: Callable = eval('lf.' + perf_loss)
        if perf_loss == 'sharpe_loss':
            self.perf_loss: Callable = lf.sharpe_loss
        elif perf_loss == 'sortino_loss':
            self.perf_loss = lf.sortino_loss
        self.perf_period: int = perf_period

        # Register 'gamma' (risk-return trade-off parameter)
        self.gamma: nn.Parameter = nn.Parameter(torch.FloatTensor(1).uniform_(0.02, 0.1))
        self.gamma.requires_grad = train_gamma
        self.gamma_init: float = self.gamma.item()

        # Determine model type and optionally register 'delta'
        if opt_layer == 'nominal':
            self.model_type: str = 'nom'
        elif opt_layer == 'base_mod':
            self.gamma.requires_grad = False
            self.model_type: str = 'base_mod'
        else:
            # Register 'delta' (ambiguity sizing parameter) for DRO layer
            if opt_layer == 'hellinger':
                ub: float = (1 - 1 / (n_obs**0.5)) / 2
                lb: float = (1 - 1 / (n_obs**0.5)) / 10
            else:
                ub = (1 - 1 / n_obs) / 2
                lb = (1 - 1 / n_obs) / 10
            self.delta: nn.Parameter = nn.Parameter(torch.FloatTensor(1).uniform_(lb, ub))
            self.delta.requires_grad = train_delta
            self.delta_init: float = self.delta.item()
            self.model_type = 'dro'

        # Prediction model
        self.pred_model: str = pred_model
        if pred_model == 'linear':
            self.pred_layer: nn.Module = nn.Linear(n_x, n_y)
            self.pred_layer.weight.requires_grad = train_pred
            self.pred_layer.bias.requires_grad = train_pred
        elif pred_model == '2layer':
            self.pred_layer = nn.Sequential(
                nn.Linear(n_x, int(0.5 * (n_x + n_y))),
                nn.ReLU(),
                nn.Linear(int(0.5 * (n_x + n_y)), n_y),
                nn.ReLU(),
                nn.Linear(n_y, n_y),
            )
        elif pred_model == '3layer':
            self.pred_layer = nn.Sequential(
                nn.Linear(n_x, int(0.5 * (n_x + n_y))),
                nn.ReLU(),
                nn.Linear(int(0.5 * (n_x + n_y)), int(0.6 * (n_x + n_y))),
                nn.ReLU(),
                nn.Linear(int(0.6 * (n_x + n_y)), n_y),
                nn.ReLU(),
                nn.Linear(n_y, n_y),
            )

        # Optimization model
        self.opt_layer: CvxpyLayer = eval(opt_layer)(n_y, n_obs, eval('rf.' + prisk))

        # Cache path for storing model data
        self.cache_path: str = cache_path

        # Store initial model state
        if train_gamma and train_delta:
            self.init_state_path: str = f"{cache_path}{self.model_type}_initial_state_{pred_model}"
        elif train_delta and not train_gamma:
            self.init_state_path = f"{cache_path}{self.model_type}_initial_state_{pred_model}_TrainGamma{train_gamma}"
        elif train_gamma and not train_delta:
            self.init_state_path = f"{cache_path}{self.model_type}_initial_state_{pred_model}_TrainDelta{train_delta}"
        else:
            self.init_state_path = f"{cache_path}{self.model_type}_initial_state_{pred_model}_TrainGamma{train_gamma}_TrainDelta{train_delta}"
        torch.save(self.state_dict(), self.init_state_path)
    # -----------------------------------------------------------------------------------------------
    # forward: forward pass of the e2e neural net
    # -----------------------------------------------------------------------------------------------
    def forward(self, X, Y):
        """Forward pass of the NN module

        The inputs 'X' are passed through the prediction layer to yield predictions 'Y_hat'. The
        residuals from prediction are then calcuclated as 'ep = Y - Y_hat'. Finally, the residuals
        are passed to the optimization layer to find the optimal decision z_star.

        Inputs
        X: Features. ([n_obs+1] x n_x) torch tensor with feature timeseries data
        Y: Realizations. (n_obs x n_y) torch tensor with asset timeseries data

        Other
        ep: Residuals. (n_obs x n_y) matrix of the residual between realizations and predictions

        Outputs
        y_hat: Prediction. (n_y x 1) vector of outputs of the prediction layer
        z_star: Optimal solution. (n_y x 1) vector of asset weights
        """
        # Multiple predictions Y_hat from X
        Y_hat = torch.stack([self.pred_layer(x_t) for x_t in X])

        # Calculate residuals and process them
        ep = Y - Y_hat[:-1]
        y_hat = Y_hat[-1]

        # Optimization solver arguments (from CVXPY for ECOS/SCS solver)
        solver_args = {"solve_method": "ECOS", "max_iters": 120, "abstol": 1e-7}
        # solver_args = {'solve_method': 'SCS', 'eps': 1e-7, 'acceleration_lookback': 5,
        # 'max_iters':20000}

        # Optimize z per scenario
        # Determine whether nominal or dro model
        if self.model_type == "nom":
            (z_star,) = self.opt_layer(ep, y_hat, self.gamma, solver_args=solver_args)
        elif self.model_type == "dro":
            (z_star,) = self.opt_layer(
                ep, y_hat, self.gamma, self.delta, solver_args=solver_args
            )
        elif self.model_type == "base_mod":
            (z_star,) = self.opt_layer(y_hat, solver_args=solver_args)

        return z_star, y_hat

    # -----------------------------------------------------------------------------------------------
    # net_train: Train the e2e neural net
    # -----------------------------------------------------------------------------------------------
    def net_train(self, train_set, val_set=None, epochs=None, lr=None):
        """Neural net training module

        Inputs
        train_set: SlidingWindow object containing features x, realizations y and performance
        realizations y_perf
        val_set: SlidingWindow object containing features x, realizations y and performance
        realizations y_perf
        epochs: Number of training epochs
        lr: learning rate

        Output
        Trained model
        (Optional) val_loss: Validation loss
        """

        # Assign number of epochs and learning rate
        if epochs is None:
            epochs = self.epochs
        if lr is None:
            lr = self.lr

        # Define the optimizer and its parameters
        optimizer = torch.optim.Adam(self.parameters(), lr=lr)

        # Number of elements in training set
        n_train = len(train_set)

        # Train the neural network
        for epoch in range(epochs):

            # TRAINING: forward + backward pass
            train_loss = 0
            optimizer.zero_grad()
            for t, (x, y, y_perf) in enumerate(train_set):

                # Forward pass: predict and optimize
                z_star, y_hat = self(x.squeeze(), y.squeeze())

                # Loss function
                if self.pred_loss is None:
                    loss = (1 / n_train) * self.perf_loss(z_star, y_perf.squeeze())
                else:
                    loss = (1 / n_train) * (
                        self.perf_loss(z_star, y_perf.squeeze())
                        + (self.pred_loss_factor / self.n_y)
                        * self.pred_loss(y_hat, y_perf.squeeze()[0])
                    )

                # Backward pass: backpropagation
                loss.backward()

                # Accumulate loss of the fully trained model
                train_loss += loss.item()

            # Update parameters
            optimizer.step()

            # Ensure that gamma, delta > 0 after taking a descent step
            for name, param in self.named_parameters():
                if name == "gamma":
                    param.data.clamp_(0.0001)
                if name == "delta":
                    param.data.clamp_(0.0001)

        # Compute and return the validation loss of the model
        if val_set is not None:

            # Number of elements in validation set
            n_val = len(val_set)

            val_loss = 0
            with torch.no_grad():
                for t, (x, y, y_perf) in enumerate(val_set):

                    # Predict and optimize
                    z_val, y_val = self(x.squeeze(), y.squeeze())

                    # Loss function
                    if self.pred_loss_factor is None:
                        loss = (1 / n_val) * self.perf_loss(z_val, y_perf.squeeze())
                    else:
                        loss = (1 / n_val) * (
                            self.perf_loss(z_val, y_perf.squeeze())
                            + (self.pred_loss_factor / self.n_y)
                            * self.pred_loss(y_val, y_perf.squeeze()[0])
                        )

                    # Accumulate loss
                    val_loss += loss.item()

            return val_loss

    # -----------------------------------------------------------------------------------------------
    # net_cv: Cross validation of the e2e neural net for hyperparameter tuning
    # -----------------------------------------------------------------------------------------------
    def net_cv(self, X, Y, lr_list, epoch_list, n_val=4):
        """Neural net cross-validation module

        Inputs
        X: Features. TrainTest object of feature timeseries data
        Y: Realizations. TrainTest object of asset time series data
        epochs: number of training passes
        lr_list: List of candidate learning rates
        epoch_list: List of candidate number of epochs
        n_val: Number of validation folds from the training dataset

        Output
        Trained model
        """
        results = pc.CrossVal()
        X_temp = dl.TrainTest(X.train(), X.n_obs, [1, 0])
        Y_temp = dl.TrainTest(Y.train(), Y.n_obs, [1, 0])
        for epochs in epoch_list:
            for lr in lr_list:

                # Train the neural network
                print("================================================")
                print(f"Training E2E {self.model_type} model: lr={lr}, epochs={epochs}")

                val_loss_tot = []
                for i in range(n_val - 1, -1, -1):

                    # Partition training dataset into training and validation subset
                    split = [round(1 - 0.2 * (i + 1), 2), 0.2]
                    X_temp.split_update(split)
                    Y_temp.split_update(split)

                    # Construct training and validation DataLoader objects
                    train_set = DataLoader(
                        pc.SlidingWindow(
                            X_temp.train(), Y_temp.train(), self.n_obs, self.perf_period
                        )
                    )
                    val_set = DataLoader(
                        pc.SlidingWindow(
                            X_temp.test(), Y_temp.test(), self.n_obs, self.perf_period
                        )
                    )

                    # Reset learnable parameters gamma and delta
                    self.load_state_dict(torch.load(self.init_state_path))

                    if self.pred_model == "linear":
                        # Initialize the prediction layer weights to OLS regression weights
                        X_train, Y_train = X_temp.train(), Y_temp.train()
                        X_train.insert(0, "ones", 1.0)

                        X_train = Variable(
                            torch.tensor(X_train.values, dtype=torch.double)
                        )
                        Y_train = Variable(
                            torch.tensor(Y_train.values, dtype=torch.double)
                        )

                        Theta = torch.inverse(X_train.T @ X_train) @ (
                            X_train.T @ Y_train
                        )
                        Theta = Theta.T
                        del X_train, Y_train

                        with torch.no_grad():
                            self.pred_layer.bias.copy_(Theta[:, 0])
                            self.pred_layer.weight.copy_(Theta[:, 1:])

                    val_loss = self.net_train(
                        train_set, val_set=val_set, lr=lr, epochs=epochs
                    )
                    val_loss_tot.append(val_loss)

                    print(f"Fold: {n_val-i} / {n_val}, val_loss: {val_loss}")

                # Store results
                results.val_loss.append(np.mean(val_loss_tot))
                results.lr.append(lr)
                results.epochs.append(epochs)
                print("================================================")

        # Convert results to dataframe
        self.cv_results = results.df()
        self.cv_results.to_pickle(self.init_state_path + "_results.pkl")

        # Select and store the optimal hyperparameters
        idx = self.cv_results.val_loss.idxmin()
        self.lr = self.cv_results.lr[idx]
        self.epochs = self.cv_results.epochs[idx]

        # Print optimal parameters
        print(
            f"CV E2E {self.model_type} with hyperparameters: lr={self.lr}, epochs={self.epochs}"
        )

    # -----------------------------------------------------------------------------------------------
    # net_roll_test: Test the e2e neural net
    # -----------------------------------------------------------------------------------------------
    def net_roll_test(self, X, Y, n_roll=4, lr=None, epochs=None):
        """Neural net rolling window out-of-sample test

        Inputs
        X: Features. ([n_obs+1] x n_x) torch tensor with feature timeseries data
        Y: Realizations. (n_obs x n_y) torch tensor with asset timeseries data
        n_roll: Number of training periods (i.e., number of times to retrain the model)
        lr: Learning rate for test. If 'None', the optimal learning rate is loaded
        epochs: Number of epochs for test. If 'None', the optimal # of epochs is loaded

        Output
        self.portfolio: add the backtest results to the e2e_net object
        """

        # Declare backtest object to hold the test results
        portfolio = pc.backtest(
            len(Y.test()) - Y.n_obs, self.n_y, Y.test().index[Y.n_obs :]
        )

        # Store trained gamma and delta values
        if self.model_type == "nom":
            self.gamma_trained = []
        elif self.model_type == "dro":
            self.gamma_trained = []
            self.delta_trained = []

        # Store the squared L2-norm of the prediction weights and their difference from OLS weights
        if self.pred_model == "linear":
            self.theta_L2 = []
            self.theta_dist_L2 = []

        # Store initial train/test split
        init_split = Y.split

        # Window size
        win_size = init_split[1] / n_roll

        split = [0, 0]
        t = 0
        for i in range(n_roll):

            print(f"Out-of-sample window: {i+1} / {n_roll}")

            split[0] = init_split[0] + win_size * i
            if i < n_roll - 1:
                split[1] = win_size
            else:
                split[1] = 1 - split[0]

            X.split_update(split), Y.split_update(split)
            train_set = DataLoader(
                pc.SlidingWindow(X.train(), Y.train(), self.n_obs, self.perf_period)
            )
            test_set = DataLoader(pc.SlidingWindow(X.test(), Y.test(), self.n_obs, 0))

            # Reset learnable parameters gamma and delta
            self.load_state_dict(torch.load(self.init_state_path))

            if self.pred_model == "linear":
                # Initialize the prediction layer weights to OLS regression weights
                X_train, Y_train = X.train(), Y.train()
                X_train.insert(0, "ones", 1.0)

                X_train = Variable(torch.tensor(X_train.values, dtype=torch.double))
                Y_train = Variable(torch.tensor(Y_train.values, dtype=torch.double))

                Theta = torch.inverse(X_train.T @ X_train) @ (X_train.T @ Y_train)
                Theta = Theta.T
                del X_train, Y_train

                with torch.no_grad():
                    self.pred_layer.bias.copy_(Theta[:, 0])
                    self.pred_layer.weight.copy_(Theta[:, 1:])

            # Train model using all available data preceding the test window
            self.net_train(train_set, lr=lr, epochs=epochs)

            # Store trained values of gamma and delta
            if self.model_type == "nom":
                self.gamma_trained.append(self.gamma.item())
            elif self.model_type == "dro":
                self.gamma_trained.append(self.gamma.item())
                self.delta_trained.append(self.delta.item())

            # Store the squared L2 norm of theta and distance between theta and OLS weights
            if self.pred_model == "linear":
                theta_L2 = torch.sum(self.pred_layer.weight**2, axis=()) + torch.sum(
                    self.pred_layer.bias**2, axis=()
                )
                theta_dist_L2 = torch.sum(
                    (self.pred_layer.weight - Theta[:, 1:]) ** 2, axis=()
                ) + torch.sum((self.pred_layer.bias - Theta[:, 0]) ** 2, axis=())
                self.theta_L2.append(theta_L2)
                self.theta_dist_L2.append(theta_dist_L2)

            # Test model
            with torch.no_grad():
                for j, (x, y, y_perf) in enumerate(test_set):

                    # Predict and optimize
                    z_star, _ = self(x.squeeze(), y.squeeze())

                    # Store portfolio weights and returns for each time step 't'
                    portfolio.weights[t] = z_star.squeeze()
                    portfolio.rets[t] = y_perf.squeeze() @ portfolio.weights[t]
                    t += 1

        # Reset dataset
        X, Y = X.split_update(init_split), Y.split_update(init_split)

        # Calculate the portfolio statistics using the realized portfolio returns
        portfolio.stats()

        self.portfolio = portfolio

    # -----------------------------------------------------------------------------------------------
    # load_cv_results: Load cross validation results
    # -----------------------------------------------------------------------------------------------
    def load_cv_results(self, cv_results):
        """Load cross validation results

        Inputs
        cv_results: pd.dataframe containing the cross validation results

        Outputs
        self.lr: Load the optimal learning rate
        self.epochs: Load the optimal number of epochs
        """

        # Store the cross validation results within the object
        self.cv_results = cv_results

        # Select and store the optimal hyperparameters
        idx = cv_results.val_loss.idxmin()
        self.lr = cv_results.lr[idx]
        self.epochs = cv_results.epochs[idx]