In [1]:
import pandas as pd 
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


In [96]:
# Read the price table from ./close.csv (first column used as the index and
# parsed as dates), then take first differences of the log prices. The first
# row of a diff is always NaN, so it is dropped.
df_prc = pd.read_csv("./close.csv", parse_dates=True, index_col=0)
df_logret = np.log(df_prc).diff().iloc[1:]

In [97]:
# Copy the log-return table into a tensor of the default floating dtype
# (rows and columns keep the DataFrame's layout).
x_logret = torch.tensor(df_logret.values, dtype=torch.get_default_dtype())

In [None]:
# Rolling-window sum over `lookback` rows, implemented as a 1-D convolution
# with an all-ones kernel.
lookback = 5
ts = nn.Conv1d(
    in_channels=1,
    out_channels=1,
    kernel_size=lookback,
    padding=lookback - 1,
    padding_mode='zeros',
    bias=False,
)
nn.init.ones_(ts.weight)  # all-ones kernel => each output is the plain sum of its window

# Conv1d takes (batch, channels, length): every column of x_logret becomes one
# batch item with a single channel, and the rows form the length axis.
conv_out = ts(x_logret.T[:, None, :])  # (n_columns, 1, n_rows + lookback - 1)

# Zero padding is applied on both sides, so the trailing lookback-1 outputs are
# windows running past the end of the data. Keep only the first n_rows outputs.
# Slicing by length (instead of `:-lookback+1`) also stays correct for
# lookback == 1, where `:-lookback+1` would be `:0` and return nothing.
# Note: the first lookback-1 rows are partial sums (padded with zeros), not NaN.
x_logret_ts = conv_out[:, 0, :x_logret.shape[0]].T  # back to (n_rows, n_columns)

In [114]:
# pandas rolling-window sum over `lookback` rows, returned as a NumPy array.
# The first lookback-1 rows are NaN because those windows are not yet full
# (visible in the output below).
df_logret.rolling(lookback).sum().values

array([[        nan,         nan,         nan, ...,         nan,
                nan,         nan],
       [        nan,         nan,         nan, ...,         nan,
                nan,         nan],
       [        nan,         nan,         nan, ...,         nan,
                nan,         nan],
       ...,
       [ 0.04993695, -0.00423729,  0.02188727, ..., -0.01344881,
         0.04082199,  0.03623591],
       [ 0.05454558,  0.00211193,  0.00415801, ..., -0.00956945,
         0.0453557 , -0.02848491],
       [ 0.03979891,  0.00423729,  0.00311042, ..., -0.01151644,
         0.0453557 , -0.03111641]], shape=(2464, 214))

In [115]:
# Display the Conv1d-based rolling sum.
# NOTE(review): unlike the pandas result, its first rows are partial sums
# (the convolution pads with zeros) rather than NaN; later rows match.
x_logret_ts

tensor([[-0.0122,  0.0080,  0.0159,  ...,  0.0174, -0.0338,  0.0346],
        [-0.0122, -0.0040,  0.0159,  ...,  0.0513, -0.0013,  0.0832],
        [-0.0030,  0.0159,  0.0301,  ...,  0.0921, -0.0129,  0.0514],
        ...,
        [ 0.0499, -0.0042,  0.0219,  ..., -0.0134,  0.0408,  0.0362],
        [ 0.0545,  0.0021,  0.0042,  ..., -0.0096,  0.0454, -0.0285],
        [ 0.0398,  0.0042,  0.0031,  ..., -0.0115,  0.0454, -0.0311]],
       grad_fn=<PermuteBackward0>)

In [116]:
# Display the raw log-return tensor (presumably to compare against the
# rolling sums shown above).
x_logret

tensor([[-0.0122,  0.0080,  0.0159,  ...,  0.0174, -0.0338,  0.0346],
        [ 0.0000, -0.0120,  0.0000,  ...,  0.0339,  0.0325,  0.0486],
        [ 0.0092,  0.0199,  0.0141,  ...,  0.0408, -0.0116, -0.0318],
        ...,
        [ 0.0115,  0.0085,  0.0104,  ..., -0.0019, -0.0061,  0.0059],
        [-0.0014,  0.0063, -0.0062,  ...,  0.0058,  0.0061, -0.0574],
        [-0.0072, -0.0021,  0.0021,  ..., -0.0039,  0.0000, -0.0063]])

In [68]:

class MinVar(nn.Module):
    """Minimum-variance portfolio fitted by gradient descent in the eigenbasis of ``cov``.

    The covariance matrix is eigendecomposed once. The trainable weights live in
    eigenvector coordinates (``w_e``), are mapped to instrument coordinates with
    the eigenvectors (``w_i = V @ w_e``) and are rescaled so that the instrument
    weights sum to 1. The training loss is
    ``log(sum_k eigenvalue_k * w_e_k**2) + relu(||w_i||_1 - gross_exposure_constraint)``.

    Args:
        cov: Square covariance matrix as a 2-D tensor (one row/column per instrument).
        gross_exposure_constraint: Upper bound on the L1 norm of the instrument
            weights; any excess is added to the loss as a penalty.
        use_latent: If True, the eigen-space weights are produced from a
            ``latent_dim``-sized vector through a learned linear map ``A``.
        cuda: Request GPU execution. When CUDA is not available a warning is
            issued and the model stays on the CPU instead of raising.
        latent_dim: Size of the latent vector when ``use_latent`` is True.
    """

    def __init__(self, cov, gross_exposure_constraint, use_latent=False, cuda=True, latent_dim=30):
        super().__init__()
        # Only use the GPU when it was requested AND is actually present, so the
        # default arguments also work on CPU-only machines.
        use_cuda = bool(cuda) and torch.cuda.is_available()
        if cuda and not use_cuda:
            import warnings
            warnings.warn("CUDA requested but not available; MinVar falls back to CPU.")
        self.enable_cuda = use_cuda
        self.cov = cov.cuda() if use_cuda else cov
        num_instrument = cov.shape[0]
        self.use_latent = use_latent
        if use_latent:
            self.latent = nn.Linear(in_features=latent_dim, out_features=num_instrument, bias=False) # A
            self.fc = nn.Linear(in_features=latent_dim, out_features=1, bias=False) # w_e
        else:
            self.fc = nn.Linear(in_features=num_instrument, out_features=1, bias=False) # w_e
        # Small random start, uniform in [-0.5, 0.5) / num_instrument.
        self.fc.weight.data = (torch.rand_like(self.fc.weight) - 0.5) / num_instrument
        self.gross_exp_constr = gross_exposure_constraint

        # eigh returns eigenvalues in ascending order; reorder to descending and
        # permute the eigenvector columns accordingly.
        eigenvalues, eigenvectors = torch.linalg.eigh(self.cov)
        sorted_indices = torch.argsort(eigenvalues, descending=True)
        self.eigenvalues = eigenvalues[sorted_indices]
        self.eigenvectors = eigenvectors[:, sorted_indices]

    @property
    def portfolio_weight(self): # w_p = v * A * w_e
        """Return ``(w_e, w_i)``: normalized weights in eigen space and instrument space.

        Both are column vectors of shape ``(num_instrument, 1)`` divided by the
        same coefficient, chosen so that the instrument-space weights sum to 1.
        """
        if self.use_latent:
            # Project the latent vector through A; without the `else` below this
            # projection would be overwritten by the raw latent vector.
            weight_in_eigenvalue_space = self.latent(self.fc.weight).T
        else:
            weight_in_eigenvalue_space = self.fc.weight.T
        weight_in_instrument_space = self.eigenvectors @ weight_in_eigenvalue_space
        norm_coef = self.normalize_coeff(weight_in_instrument_space)
        return weight_in_eigenvalue_space / norm_coef, weight_in_instrument_space / norm_coef

    def get_weight(self):
        """Return the normalized instrument-space weights as a NumPy array on the CPU."""
        return self.portfolio_weight[1].cpu().detach().numpy()

    @staticmethod
    def normalize_coeff(x):
        """Return the sum of ``x``; dividing by it makes the weights sum to 1.

        NOTE(review): nothing guards against a sum of (near) zero.
        """
        # 1x Long
        return x.sum()

    def _normalize_weight(self):
        """Rescale the stored parameters in place (not called by ``fit`` at present)."""
        # Prevents divergence
        if self.use_latent:
            self.latent.weight.data = self.latent.weight / torch.linalg.matrix_norm(self.latent.weight, ord='fro')
        # self.fc.weight.data = self.fc.weight / torch.linalg.vector_norm(self.fc.weight, ord=2)
        self.fc.weight.data = self.fc.weight / self.normalize_coeff(self.fc.weight)

    def fit(self, n_epochs=10000, print_step=1000, learning_rate=1e-4):
        """Minimize log-variance plus the gross-exposure penalty with Adagrad.

        Args:
            n_epochs: Number of gradient steps.
            print_step: Every ``print_step`` epochs the progress line is kept
                (a newline is printed); a falsy value keeps overwriting one line.
            learning_rate: Adagrad learning rate.
        """
        if self.enable_cuda:
            self.cuda()
        optimizer = torch.optim.Adagrad(self.parameters(), lr=learning_rate) # Momentum-less optimizer for barrier method
        for e in range(n_epochs):
            # Clear gradients from the previous step; backward() accumulates
            # into .grad, so without this every step would use the running sum
            # of all past gradients.
            optimizer.zero_grad()

            w_e, w_i = self.portfolio_weight
            gross_exposure = torch.linalg.vector_norm(w_i, ord=1)
            # In the eigenbasis the covariance is diagonal, so the variance is
            # a weighted sum of squared eigen-space weights.
            portfolio_variance = self.eigenvalues @ (w_e.square())
            # Penalty is zero while the constraint holds, linear in the excess otherwise.
            barrier_loss = torch.relu(gross_exposure - self.gross_exp_constr)
            loss = torch.log(portfolio_variance) + barrier_loss

            loss.backward()
            optimizer.step()
            # self._normalize_weight()
            print(f'{str(e).zfill(5)}/{n_epochs}, variance={portfolio_variance.item():.2e}, gross_exposure = {gross_exposure.item():.2e}, barrier_loss={barrier_loss:.2e}, loss={loss.item():.2e}, normalization={torch.sum(self.fc.weight):.2e}     ', end='\r', flush=True)
            if print_step and e%print_step == 0 :
                print()
        print("\nTrain Finished")