In [437]:
import torch
from heavylight import LightModel
from matplotlib import pyplot

In [438]:
spots = torch.FloatTensor(
    [
        -0.00217,
        -0.00163,
        -0.00119,
        -0.00025,
        0.0006,
        0.00177,
        0.00302,
        0.00437,
        0.00558,
        0.00676,
        0.0077,
        0.00849,
        0.00925,
        0.00991,
        0.01048,
        0.01087,
        0.01121,
        0.01152,
        0.01179,
        0.01204,
    ]
)
zcbs = (1 + spots) ** (-torch.arange(1, spots.size(0) + 1))
# r0 = 0.04 but as a torch tensor
r0 = torch.tensor(0.04)
a = torch.tensor(0.2)
b = torch.tensor(0.041)
sigma = torch.tensor(0.05)

In [440]:
class CIRModel(LightModel):
    """
    Currently uses Euler discretization to simulate the CIR model.
    """

    def __init__(self, a, b, sigma, r_init, init_time: int, num_sims: int, dt: float):
        super(CIRModel, self).__init__()
        self.a = a
        self.b = b
        self.sigma = sigma
        self.r_init = r_init
        self.init_time = init_time
        self.num_sims = num_sims
        self.dt = torch.tensor(dt)

    def r(self, t: int):
        assert t >= self.init_time
        if t == self.init_time:
            return self.r_init.expand(self.num_sims)
        r_prev_non_neg = torch.clamp(self.r(t-1), min=0)
        return self.r(t-1) + self.a * (self.b - r_prev_non_neg) * self.dt + self.sigma * torch.sqrt(r_prev_non_neg) * torch.randn_like(self.r_init) * torch.sqrt(self.dt)
    
    def discount(self, t: int):
        assert t >= self.init_time
        if t == self.init_time:
            return torch.ones(self.num_sims)
        return self.discount(t-1) * torch.exp(-self.dt * 0.5 * (self.r(t-1) + self.r(t)))

In [421]:
## TODO: integrate with the CIR, this is from someone else's code but not currently working with my CIR to make a CIR++ model

class ShiftedModel:
    def __init__(self, basemodel: CIR, zcbs: torch.Tensor):
        self.zcbs = zcbs
        self.basemodel = basemodel
        self.xzcbsbase = self.basemodel.init_zcb_price(
            torch.arange(0, zcbs.size(0) + 1)
        )
        self.zcbsbase = self.xzcbsbase[1:]
        self.xzcbs = torch.cat((self.zcbs.data.new(1).fill_(1.0), self.zcbs), dim=0)

    def simulate(self, num_sims, num_years, steps_per_year=12):
        r, d_base = self.basemodel.simulate(
            num_sims, num_years, steps_per_year=steps_per_year
        )
        numYears_plus_one = r.size(0)
        d = d_base * (
            self.xzcbs[:numYears_plus_one].unsqueeze(1)
            / self.xzcbsbase[:numYears_plus_one].unsqueeze(1)
        )
        return r, d

## Multivariate sampler

In [448]:
class MultivariateSampler(LightModel):
    def __init__(self, covariance_matrix: torch.Tensor, num_samples: int):
        self.m = MultivariateNormal(
            torch.zeros(covariance_matrix.size(0)),
            covariance_matrix
        )
        self.num_samples = num_samples

    def sample(self, t: int):
        return self.m.sample(torch.Size([self.num_samples])).T
    
    def dWtx(self, t: int):
        return self.sample(t)[0]
    def dWtS(self, t: int):
        return self.sample(t)[1]
    def dWtR(self, t: int):
        return self.sample(t)[2]

from torch.distributions import MultivariateNormal

covariance_matrix = torch.tensor([[1.0, 0.1, 0.0], [0.1, 1.0, 0.0], [0.0, 0.0, 1.0]])
m = MultivariateNormal(torch.tensor([0.0, 0.0, 0.0]), covariance_matrix)

# m.sample()  # normally distributed with mean=`[0,0,0]` and covariance_matrix=`I`
# how to sample to have 1000 observationns of each type
ms = MultivariateSampler(covariance_matrix, 10)
print(ms.sample(1))

tensor([[ 0.4818,  0.5948, -0.6709, -2.4159,  0.8579,  0.2487,  1.2829, -0.5595,
          0.1507,  0.4305],
        [ 0.5153,  0.2828, -0.2487, -0.6782,  0.6893,  0.0753,  0.8473,  0.0270,
         -1.0455, -1.2770],
        [-0.8772, -1.0671,  1.1152, -0.1365,  1.8599,  0.2909, -1.1435,  0.3790,
          0.8732, -0.8800]])


In [422]:
# TODO: This is not done
class HestonModel(LightModel):

    def __init__(self, kappa, theta, epsilon, r_init, v_init, init_time: int, num_sims: int, dt: float):
        super(HestonModel, self).__init__()
        self.kappa = kappa
        self.theta = theta
        self.epsilon = epsilon
        self.r_init = r_init
        self.v_init = v_init
        self.init_time = init_time
        self.num_sims = num_sims
        self.dt = torch.tensor(dt)

    def r(self, t: int):
        assert t >= self.init_time
        if t == self.init_time:
            return self.r_init.expand(self.num_sims)
        r_prev_non_neg = torch.clamp(self.r(t-1), min=0)
        return self.r(t-1) + self.a * (self.b - r_prev_non_neg) * self.dt + self.sigma * torch.sqrt(r_prev_non_neg) * torch.randn_like(self.r(t-1)) * torch.sqrt(self.dt)
    
    def discount(self, t: int):
        assert t >= self.init_time
        if t == self.init_time:
            return torch.ones(self.num_sims)
        return self.discount(t-1) * torch.exp(-self.dt * 0.5 * (self.r(t-1) + self.r(t)))


In [517]:
cir_model = CIRModel(a, b, sigma, r0, 0, 1000, 1 / 12)
print(cir_model.discount(10).mean())
# cir_model.ResetCache()
print(cir_model.discount(10).mean())

tensor(0.9671)
tensor(0.9671)


## step through it

We are going to take the basemodel and
