In [None]:
from standard_functions import *
# Data lives in a "Data" folder next to the current working directory's parent.
path = os.getcwd()
data_path = os.path.join(os.path.dirname(path), 'Data')

BloombergData = pd.read_csv(os.path.join(data_path, "BloombergData_Swap_Features.csv"))
preData = pd.read_csv(os.path.join(data_path, "TestData_Swap_Features_pre.csv"))
postData = pd.read_csv(os.path.join(data_path, "TestData_Swap_Features_post.csv"))

# Every feature value is divided by this constant before it is fed to the model.
# NOTE(review): presumably converts rates quoted in percent to decimals - confirm.
RATE_SCALE = 100


def _features_to_tensor(frame):
    """Extract, scale and tensorize the feature columns of ``frame``.

    The first two columns are skipped (``iloc[:, 2:]``); everything after them
    is treated as numeric features.

    Args:
        frame: pandas DataFrame whose columns from index 2 onwards are numeric.

    Returns:
        Tuple ``(raw, scaled, tensor)``:
        raw    - 2-D NumPy array of the feature columns,
        scaled - ``raw / RATE_SCALE`` as a 2-D NumPy array,
        tensor - ``scaled`` cast to float32 and wrapped in a torch tensor.
    """
    raw = np.array(frame.iloc[:, 2:])
    # One vectorized division instead of a Python-level loop over rows.
    scaled = raw / RATE_SCALE
    # astype always copies, so the tensor never shares memory with `scaled`.
    tensor = torch.from_numpy(scaled.astype(np.float32))
    return raw, scaled, tensor


# Full in-sample data set.
insample, insample_scaled, insample_tensor = _features_to_tensor(BloombergData)

# Train / validation split of the in-sample data.
# NOTE(review): 0.1 is presumably the validation fraction - confirm against
# train_validation_split.
X_train1, X_val1 = train_validation_split(BloombergData.reset_index(drop=True), 0.1)
X_train2, X_train_scaled, X_train_tensor = _features_to_tensor(X_train1)
X_val2, X_val_scaled, X_val_tensor = _features_to_tensor(X_val1)

# Out-of-sample test sets ("pre" and "post" files).
preX_test2, preX_test_scaled, preX_test_tensor = _features_to_tensor(preData)
postX_test2, postX_test_scaled, postX_test_tensor = _features_to_tensor(postData)


In [None]:
def forward_odesolution(G_output, sigma_1, sigma_2, sigma_3, rho12, rho13, rho23, mu, 
                        r, encoded_mat, N, model, plot = False):
    """Turn the network outputs into p, s and the arbitrage term.

    Steps, in order:
      1. Expand the per-sample quantities (mu, r) to one row per
         (sample, maturity) pair and flatten ``G_output`` the same way.
      2. Evaluate alpha, beta and gamma with the project helpers and reshape
         them back to one row per sample via ``reshape_wide(..., N)``.
      3. Obtain A and B from ``solve_ODE_constant``.
      4. Form ``p = exp(clamp(A - B * G_output, -80, 80))`` and
         ``s = (1 - p) / cumsum(p, dim=1)``.
      5. Evaluate ``arb_equation_3factor`` with ``dA = B**2 * gamma`` and
         ``dB = B * alpha + beta``.

    The number of maturities is read from the last dimension of ``G_output``
    and the maturity grid ``1..n`` is created on ``G_output``'s device.

    Args:
        G_output: decoder output with one column per maturity.
        sigma_1, sigma_2, sigma_3, rho12, rho13, rho23: per-sample outputs of
            the sigma network, forwarded to ``build_sigma_matrix_3factor``.
        mu: per-sample output of the mu network.
        r: per-sample output of the r network.
        encoded_mat: latent factors with the maturity appended, one row per
            (sample, maturity) pair.
        N: row count handed to ``reshape_wide``.
            NOTE(review): presumably the batch size - confirm.
        model: the network, forwarded to the alpha/gamma/arbitrage helpers.
        plot: currently unused; kept for interface compatibility.

    Returns:
        Tuple ``(s, p, arb_l, alpha, beta, gamma, A, B, row_std_devalpha,
        row_std_devbeta, row_std_devgamma, row_std_devA, row_std_devB)``.
        Each ``row_std_dev*`` is the detached sum over rows of the per-row
        standard deviation of the corresponding grid.
        NOTE(review): p and s look like discount factors and par swap rates -
        confirm.
    """
    # Maturity count comes from the data rather than a literal so that the
    # long-format tensors below always line up with G_output.
    n_maturities = G_output.shape[-1]

    sigma_long = build_sigma_matrix_3factor(sigma_1, sigma_2, sigma_3, rho12, rho13, rho23) 
    mu_long = mu.repeat_interleave(n_maturities, dim=0)
    G_long = G_output.reshape(-1, 1)
    r_long = r.repeat_interleave(n_maturities, dim=0)

    alpha = reshape_wide(alpha_fct_3factor(encoded_mat, model, mu_long, sigma_long, G_long), N)
    beta = reshape_wide(beta_fct(r_long, G_long), N)
    gamma = reshape_wide(gamma_fct_3factor(encoded_mat, model, sigma_long), N)

    # Maturity grid 1..n, on the same device as the network output.
    maturities = torch.arange(1, n_maturities + 1, dtype=torch.float32,
                              device=G_output.device)

    A, B = solve_ODE_constant(alpha, beta, gamma, maturities)

    # Clamp the exponent so exp() stays finite in float32.
    p = torch.exp(torch.clamp(A - B * G_output, min = -80, max = 80))
    p_cumsum = torch.cumsum(p, dim=1)

    s = (1 - p) / p_cumsum

    # Dispersion diagnostics: per-row std, summed over rows, detached from the graph.
    (row_std_devalpha, row_std_devbeta, row_std_devgamma,
     row_std_devA, row_std_devB) = (
        grid.std(dim=1).detach().sum() for grid in (alpha, beta, gamma, A, B)
    )

    # Sanity checks, in the same order as before.
    check_nan_inf(p, "P")
    check_nan_inf(B, "B")
    check_nan_inf(A, "A")
    check_nan_inf(G_output, "G_output")
    check_nan_inf(alpha, "alpha")
    check_nan_inf(beta, "beta")
    check_nan_inf(gamma, "gamma")
    check_nan_inf(s, "s")

    # Arbitrage loss
    dA = B**2 * gamma 
    dB = B * alpha + beta

    arb_l = arb_equation_3factor(G_output, sigma_1, sigma_2, sigma_3, rho12, rho13, rho23, mu, r, encoded_mat, model, dA, dB, B)

    return s, p, arb_l, alpha, beta, gamma, A, B, row_std_devalpha, row_std_devbeta, row_std_devgamma, row_std_devA, row_std_devB

In [None]:
class Autoencoder(nn.Module):
    """Linear encoder with four small heads: sigma, mu, r and a decoder.

    Sub-networks (all ``nn.Linear`` weights are Xavier-uniform initialised):
      * encoder: 8 -> 3, no bias.
      * sigma:   3 -> 10 -> 6 with ``centered_softmax`` in between, no bias.
                 The six outputs are split into sigma_1, sigma_2, sigma_3,
                 rho12, rho13, rho23.
      * mu:      3 -> 3, with bias (bias keeps PyTorch's default init).
      * r:       3 -> 4 -> 1 with ``centered_softmax`` in between, no bias.
      * decoder: 4 -> 10 -> 1 with ``centered_softmax`` in between, no bias.
                 Its input is the 3 latent factors plus one maturity value.

    Args:
        n_maturities: number of maturities (1, 2, ..., n_maturities) the
            decoder is evaluated on for every sample. Defaults to 30.
            NOTE(review): other code consuming these outputs may assume 30
            maturities - keep the default unless that code is updated too.
    """

    def __init__(self, n_maturities=30):
        super().__init__()

        self.n_maturities = n_maturities

        self.encoder = nn.Sequential(
            nn.Linear(8, 3, bias = False)
        )

        self.sigma = nn.Sequential(              
            nn.Linear(3, 10, bias = False),
            centered_softmax(),   
            nn.Linear(10, 6, bias = False)
        )

        self.mu = nn.Sequential(
            nn.Linear(3, 3, bias = True)
        )

        self.r = nn.Sequential(
            nn.Linear(3, 4, bias = False),
            centered_softmax(),
            nn.Linear(4, 1, bias = False)    
        )

        self.decoder = nn.Sequential(
            nn.Linear(4, 10, bias = False),
            centered_softmax(),
            nn.Linear(10, 1, bias = False)
        )

        # Xavier init applies to weight matrices only: xavier_uniform_ needs at
        # least 2 dimensions, so the 1-D bias of `mu` is left at its default.
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)

    def forward(self, x):
        """Run the encoder and all heads on a batch.

        Args:
            x: input batch; the encoder expects 8 features in the last
               dimension, i.e. shape (N, 8) for a 2-D batch.

        Returns:
            Tuple ``(G_output, sigma_1, sigma_2, sigma_3, rho12, rho13, rho23,
            mu, r, encoded_mat, encoded)`` where, for a batch of N rows and
            M = ``n_maturities``:
              G_output    - (N, M) decoder output, one column per maturity,
              sigma_*/rho - six (N, 1) slices of the sigma head,
              mu          - (N, 3),
              r           - (N, 1),
              encoded_mat - (N*M, 4): latent factors with the maturity
                            appended as last column,
              encoded     - (N, 3) latent factors.
        """
        n_mat = self.n_maturities

        # Encoder
        encoded = self.encoder(x)                                 # (N, 3)

        # Maturity grid 1..M repeated once per sample, on the input's device.
        grid = torch.arange(1, n_mat + 1, dtype=torch.float32, device=encoded.device)
        mat = grid.repeat(len(encoded))                           # (N*M,)
        x1 = encoded.repeat_interleave(n_mat, dim=0)              # (N*M, 3)
        encoded_mat = torch.cat([x1, mat.unsqueeze(1)], dim=1)    # (N*M, 4)

        # Decoder --> gives y for all maturities 
        G_output = self.decoder(encoded_mat).reshape(len(x), n_mat)

        # H network for sigma 
        sigma_1, sigma_2, sigma_3, rho12, rho13, rho23 = torch.split(self.sigma(encoded), 1, dim=1)

        # K network for mu
        mu = self.mu(encoded)

        # R network for rate 
        r = self.r(encoded)

        return G_output, sigma_1, sigma_2, sigma_3, rho12, rho13, rho23, mu, r, encoded_mat, encoded

In [None]:
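# NOTE: training is disabled in this cell; the cell below loads saved weights
# from models/<name>.pth instead. Uncomment the lines here to retrain.
# torch.manual_seed(4)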

# model = Autoencoder()   
# criterion = nn.MSELoss()

# optimizer = optim.Adam(model.parameters(), lr=0.005)
# lr_sched = LR_Scheduler(optimizer, percentage = 0.9, interval = 50)

# # Training
# num_epochs = 3597
# batch_size = 32
# data_loader = torch.utils.data.DataLoader(X_train_tensor, batch_size=batch_size, shuffle=True)

# swap_mats = [1, 2, 3, 5, 10, 15, 20, 30]
# std_alp_epoch, std_bet_epoch, std_gam_epoch, std_A_epoch, std_B_epoch, arb_losses_list, losses_list, vallosses_list = train_ae3(forward_odesolution, X_train_tensor, X_val_tensor, swap_mats, num_epochs, data_loader, model, criterion, optimizer, lr_sched = lr_sched)

# print(std_alp_epoch)
# print(std_bet_epoch)
# print(std_gam_epoch)
# print(std_A_epoch)
# print(std_B_epoch)

In [None]:
# Saving is disabled; uncomment the three commented lines to write a new checkpoint.
# os.makedirs("models", exist_ok=True)
name = "3factor_AF_constant"
# torch.save(model.state_dict(), f"models/{name}.pth")
# print(f"Model saved successfully as models/{name}.pth")

# Rebuild the architecture, then restore the stored weights into it.
model2 = Autoencoder()       # requires the Autoencoder class cell to have been run
# map_location="cpu": model2 lives on the CPU, and this lets a checkpoint that
# was saved from a CUDA session load on a machine without a GPU.
# NOTE(review): torch.load unpickles the file, so only load checkpoints you
# trust; consider weights_only=True if the installed torch version supports it.
model2.load_state_dict(torch.load(f"models/{name}.pth", map_location="cpu"))