In [3]:
import torch
import torch.nn as nn

In [None]:

class LagTransformer(nn.Module):
    """Top-level model shell; currently registers no sub-modules.

    The constructor accepts the hyper-parameters below but does not use or
    store any of them yet: it only initialises the ``nn.Module`` base class.

    Args:
        d_model: Unused for now.
        n_heads: Unused for now.
        dropout: Unused for now.
        hidden_dim: Unused for now.
        max_length: Unused for now.
        n_blocks: Unused for now.
        device: Unused for now.
    """

    def __init__(
        self,
        d_model,
        n_heads,
        dropout,
        hidden_dim,
        max_length,
        n_blocks,
        device,
    ):
        # No layers are built here; only the nn.Module machinery is set up.
        super().__init__()
        
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention (no output projection).

    Queries, keys and values are each projected with a bias-free linear
    layer, split into ``n_heads`` heads of size ``d_model // n_heads``,
    attended independently, and concatenated back to ``d_model`` features.

    Args:
        d_model: Feature size of the inputs and of the output.
        n_heads: Number of attention heads; must divide ``d_model``.
        dropout: Dropout probability applied to the attention weights.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``n_heads``.
    """

    def __init__(self, d_model, n_heads, dropout):
        # nn.Module must be initialised before any sub-module is assigned;
        # without this call the nn.Linear assignments raise AttributeError.
        super().__init__()
        if d_model % n_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by n_heads ({n_heads})"
            )
        self.d_model, self.n_heads = d_model, n_heads
        self.dropout = nn.Dropout(dropout)
        self.query = nn.Linear(d_model, d_model, bias=False)
        self.key = nn.Linear(d_model, d_model, bias=False)
        self.values = nn.Linear(d_model, d_model, bias=False)
        self.softmax = nn.Softmax(-1)

    def forward(self, q, k, v, mask=None):
        """Attend from ``q`` to ``k``/``v``.

        Args:
            q: Query tensor of shape ``(B, N_q, d_model)``.
            k: Key tensor of shape ``(B, N_k, d_model)``.
            v: Value tensor of shape ``(B, N_k, d_model)``.
            mask: Optional tensor broadcastable to ``(B, n_heads, N_q, N_k)``;
                positions where ``mask == 0`` are excluded from attention.

        Returns:
            Tensor of shape ``(B, N_q, d_model)``.
        """
        head_dim = self.d_model // self.n_heads
        B = q.shape[0]  # batch size comes from the query (was an undefined `src`)

        # (B, N, d_model) -> (B, n_heads, N, head_dim)
        Q = self.query(q).view(B, -1, self.n_heads, head_dim).permute(0, 2, 1, 3)
        K = self.key(k).view(B, -1, self.n_heads, head_dim).permute(0, 2, 1, 3)
        V = self.values(v).view(B, -1, self.n_heads, head_dim).permute(0, 2, 1, 3)

        # Scale by sqrt(head_dim) so the logits do not grow with the head
        # size and saturate the softmax.
        energy = (Q @ K.transpose(-2, -1)) / head_dim ** 0.5
        if mask is not None:
            # Use the dtype's own minimum: a literal such as -1e20 is not
            # representable in half precision.
            energy = energy.masked_fill(mask == 0, torch.finfo(energy.dtype).min)

        A = self.dropout(self.softmax(energy))
        # (B, n_heads, N_q, head_dim) -> (B, N_q, d_model): concatenate heads.
        C = (A @ V).permute(0, 2, 1, 3).reshape(B, -1, self.d_model)
        return C

class DecoderLayer(nn.Module):
    """One self-attention + feed-forward block with post-layer-norm residuals.

    Args:
        d_model: Feature size of the input and output.
        n_heads: Number of attention heads passed to ``MultiHeadAttention``.
        dropout: Dropout probability, used on both sub-layer outputs and
            inside the feed-forward network.
        hidden_dim: Width of the feed-forward hidden layer.
    """

    def __init__(self, d_model, n_heads, dropout, hidden_dim):
        # Required before assigning sub-modules; otherwise nn.Module raises
        # AttributeError on the first module assignment.
        super().__init__()
        self.d_model, self.n_heads = d_model, n_heads
        self.dropout = nn.Dropout(dropout)
        self.mha = MultiHeadAttention(d_model, n_heads, dropout)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, d_model),
        )
        self.ln1, self.ln2 = nn.LayerNorm(d_model), nn.LayerNorm(d_model)

    def forward(self, src, src_mask=None):
        """Apply self-attention then the feed-forward network.

        Args:
            src: Input tensor whose last dimension is ``d_model``.
            src_mask: Optional attention mask forwarded to the attention
                module (positions equal to 0 are masked out there).

        Returns:
            Tensor with the same shape as ``src``.
        """
        # The mask parameter is `src_mask`; the previous code referenced an
        # undefined name `mask`.
        src_mha = self.mha(src, src, src, src_mask)
        # Dropout regularises the sub-layer output; the residual path itself
        # is left untouched.
        src = self.ln1(src + self.dropout(src_mha))
        src_ffn = self.ffn(src)
        out = self.ln2(src + self.dropout(src_ffn))
        return out
class Decoder(nn.Module):
    """Stack of ``DecoderLayer`` blocks with learned positional embeddings.

    Args:
        d_model: Feature size of the input and output.
        n_heads: Number of attention heads per layer.
        dropout: Dropout probability.
        hidden_dim: Width of each layer's feed-forward hidden layer.
        max_length: Largest sequence length the positional table supports.
        n_blocks: Number of stacked layers.
        device: Accepted for interface compatibility; position indices are
            created on the device of the input tensor instead.
    """

    def __init__(self, d_model, n_heads, dropout, hidden_dim, max_length, n_blocks, device):
        # Required before assigning sub-modules; otherwise nn.Module raises
        # AttributeError on the first module assignment.
        super().__init__()
        self.d_model, self.n_heads = d_model, n_heads
        self.dropout = nn.Dropout(dropout)
        self.layers = nn.ModuleList(
            [DecoderLayer(d_model, n_heads, dropout, hidden_dim) for _ in range(n_blocks)]
        )
        self.pos_emb = nn.Embedding(max_length, d_model)

    def forward(self, src, src_mask=None):
        """Add positional embeddings to ``src`` and run it through every layer.

        Args:
            src: Tensor of shape ``(B, N, d_model)``; the features are the
                lag values, so no token embedding is applied here.
            src_mask: Optional attention mask forwarded to every layer.

        Returns:
            Tensor of shape ``(B, N, d_model)``.

        Raises:
            ValueError: If ``N`` exceeds the positional table size.
        """
        N = src.shape[1]
        if N > self.pos_emb.num_embeddings:
            raise ValueError(
                f"sequence length {N} exceeds max_length "
                f"{self.pos_emb.num_embeddings}"
            )
        # Look the positions up in the embedding table (the raw indices were
        # previously added to `src` directly) and build them on the same
        # device as the input. The (N, d_model) result broadcasts over B.
        positions = torch.arange(N, device=src.device)
        src = self.dropout(src + self.pos_emb(positions))
        for layer in self.layers:
            src = layer(src, src_mask)
        return src


def lag_emb(src, lagged_list):
    """Build a matrix of lagged copies of a 1-D series.

    For a series of length ``N`` and ``d = len(lagged_list)`` integer lags,
    returns an ``(N, d)`` array with
    ``out[i, j] = src[max(i - lagged_list[j], 0)]``: indices that would fall
    before the start of the series are clamped to the first element.

    Args:
        src: 1-D sequence of values (anything ``np.asarray`` accepts).
        lagged_list: Iterable of integer lags.

    Returns:
        ``np.ndarray`` of shape ``(N, d)`` with the dtype of ``src``.

    Raises:
        IndexError: If a lag is negative enough that ``i - lag`` runs past
            the end of the series.
    """
    # Imported locally so the function does not depend on a module-level
    # numpy import having been executed before it is called.
    import numpy as np

    src = np.asarray(src)
    lags = np.asarray(list(lagged_list), dtype=np.intp)

    # idx[i, j] = i - lags[j], built for all rows and lags at once.
    idx = np.arange(src.shape[0])[:, None] - lags[None, :]
    # Clamp look-backs that precede the series to its first element.
    np.maximum(idx, 0, out=idx)
    return src[idx]

 

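## II) Metrics
#### - MAE: mean absolute error (for point models)
#### - MSE and RMSE: mean squared error and its square root (for point models)
#### - CRPS: continuous ranked probability score (for probabilistic models)
#### - MASE: mean absolute scaled error
#### - Sharpe ratio (financial point of view)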

In [None]:
import numpy as np

def mae(y_pred: np.ndarray, y_true: np.ndarray):
    """Return the mean absolute error between predictions and targets.

    Both inputs are converted with ``np.asarray`` and subtracted
    element-wise; the mean is taken over every element of the result.
    """
    gap = np.asarray(y_pred) - np.asarray(y_true)
    return np.mean(np.absolute(gap))

def MSE(y_pred: np.ndarray, y_true: np.ndarray):
    """Return the mean squared error between predictions and targets.

    Both inputs are converted with ``np.asarray`` and subtracted
    element-wise; the squared differences are averaged over every element.
    """
    residual = np.asarray(y_pred) - np.asarray(y_true)
    return np.mean(np.square(residual))

def RMSE(y_pred: np.ndarray, y_true: np.ndarray):
    """Return the root mean squared error between predictions and targets.

    Computed as the square root of ``MSE(y_pred, y_true)``.
    """
    return np.sqrt(MSE(y_pred, y_true))
def CRPS(y_pred, y_true):
    """Return the mean sample-based CRPS over all observations.

    Args:
        y_pred: Array-like of shape ``(n_obs, k_sample)`` holding ``k_sample``
            draws per observation.
        y_true: Array-like of shape ``(n_obs,)`` holding the realised values.

    For each observation the score is ``mean|x - y| - S / k**2`` where
    ``S = sum_i (2*i - k - 1) * x_(i)`` over the sorted samples ``x_(i)``
    (``i = 1..k``); ``S / k**2`` equals half the mean absolute difference
    between all sample pairs. The per-observation scores are then averaged.
    """
    samples = np.asarray(y_pred)
    targets = np.asarray(y_true)
    n_samples = samples.shape[1]

    # Average distance between each sample and the realised value.
    abs_err = np.abs(samples - targets[:, None]).mean(axis=1)

    # Sorting lets the pairwise spread be written as a weighted sum with
    # weights 2*i - k - 1 for the i-th smallest sample.
    ranks = np.arange(1, n_samples + 1)
    weights = 2 * ranks - n_samples - 1
    ordered = np.sort(samples, axis=1)
    spread = np.sum(ordered * weights[None, :], axis=1)

    per_obs = abs_err - spread / (n_samples ** 2)
    return np.mean(per_obs)

def sharpe_ratio(y_pred, y_true, risk_free_rate, N_day, number_days=252):
    """Return an annualised Sharpe-style ratio of ``y_true / y_pred - 1``.

    Args:
        y_pred: Array-like used as the denominator of the per-step return.
        y_true: Array-like used as the numerator of the per-step return.
        risk_free_rate: Rate converted to a per-step rate by compounding over
            ``number_days * N_day`` steps.
        N_day: Multiplied with ``number_days`` to give the number of steps
            used for annualisation (presumably steps per day; confirm with
            the caller).
        number_days: Defaults to 252.

    Returns:
        ``mean(excess) * steps / (std(excess) * sqrt(steps))`` where
        ``steps = number_days * N_day``, or ``np.nan`` when ``y_true`` is
        empty or the standard deviation of the excess return is not positive.
    """
    predicted = np.asarray(y_pred, dtype=float)
    realised = np.asarray(y_true, dtype=float)

    # Nothing to evaluate on an empty series.
    if realised.shape[0] == 0:
        return np.nan

    steps = number_days * N_day
    per_step_rf = (1.0 + risk_free_rate) ** (1.0 / steps) - 1.0
    excess = (realised / predicted - 1.0) - per_step_rf

    spread = np.std(excess)
    # A non-positive spread would make the ratio undefined.
    if spread <= 0:
        return np.nan

    return (np.mean(excess) * steps) / (spread * np.sqrt(steps))

