In [20]:
import numpy as np
from scipy.stats import t, expon

class Data_generator:
    def __init__(self, _alpha_r=0, _beta_r=1, _d=1, _alpha_u=0, _beta_u=1, _gamma=1, _theta=0, __lambda=1):
        self.alpha_r = _alpha_r
        self.beta_r = _beta_r
        self.d = _d
        self.alpha_u = _alpha_u
        self.beta_u = _beta_u
        self.gamma = _gamma
        self.theta = _theta
        self._lambda = __lambda

    def gen_data(self, n=1000, T=100):
        r=np.zeros((T,n)) #r yield
        eps=np.zeros(n)
        u=np.zeros(n)
        for i in range(T):
            u = self.alpha_u+self.beta_u*u+self.gamma*eps**2+self.theta*np.where(eps<0,eps**2,0)+np.random.exponential(scale=1/self._lambda,size=n)
            eps=np.random.standard_t(df=self.d, size=n)*np.sqrt(u)
            r[i]=self.alpha_r+self.beta_r*u+eps
        return r
    
    def gen_data_full(self, n=1000, T=100):
        r=np.zeros((T,n)) #r yield
        eps_list=np.zeros((T,n))
        u_list=np.zeros((T,n))
        u=np.zeros(n)
        eps=np.zeros(n)
        for i in range(T):
            u = self.alpha_u+self.beta_u*u+self.gamma*eps**2+self.theta*np.where(eps<0,eps**2,0)+np.random.exponential(scale=1/self._lambda,size=n)
            eps=np.random.standard_t(df=self.d, size=n)*np.sqrt(u)
            u_list[i]=u
            eps_list[i]=eps
            r[i]=self.alpha_r+self.beta_r*u+eps
        return r, u_list, eps_list

def updaterr(r_t, eps_t, r_t1, eps_t1, alpha_r, beta_r, d, alpha_u, beta_u, gamma, theta, _lambda):
    u_t1 = (r_t1 - alpha_r - eps_t1) / beta_r
    w_t = alpha_u + beta_u * u_t1 + gamma * eps_t1 ** 2 + theta * (eps_t1 < 0) * eps_t1 ** 2
    nu_t = eps_t * np.sqrt(beta_r / (r_t - eps_t - alpha_r))
    eta_t = (r_t - eps_t - alpha_r) / beta_r - w_t
    p = t.pdf(nu_t, d) * expon.pdf(eta_t, scale=1 / _lambda) / np.sqrt(beta_r * (r_t - eps_t - alpha_r))
    return p

class TEST_SAMPLER:
    """test sampler"""
    samples = []
    def __init__(self, T, params):
        self.alpha_r, self.beta_r, self.d, self.alpha_u, self.beta_u,self.gamma, self.theta, self._lambda = params
        self.params = params
        self.T = T 
        
    def sample(self, sample_num:int, r):
        samples = np.zeros((self.T, sample_num))
        weights = np.ones(sample_num)
        for i in range(1, self.T): 
            for j in range(sample_num):
                samples[i][j] = self.policy(r[i][j], r[i-1][j], samples[i-1][j])
                weights[j] *= updaterr(r[i][j], samples[i][j], r[i-1][j], samples[i-1][j], *self.params)/self.policy_density(r[i][j], samples[i][j], r[i-1][j], samples[i-1][j])
        weights *= sample_num/np.sum(weights)
        return samples, weights

    def policy(self, rt, rt1, et1):
        """sampling policy"""
        return rt - self.alpha_r - expon.rvs(scale=0.8)

    def policy_density(self, rt, et, rt1, et1): 
        """pdf of policy"""
        return  expon.pdf(rt - self.alpha_r - et, scale=0.8)

In [44]:
import torch

class EM:
    def __init__(self, T, _alpha_r=0, _beta_r=1, _d=1, _alpha_u=0, _beta_u=1, _gamma=1, _theta=0, __lambda=1):
        self.alpha_r = torch.tensor(_alpha_r, dtype=torch.float32, requires_grad=True)
        self.beta_r = torch.tensor(_beta_r, dtype=torch.float32, requires_grad=True)
        self.alpha_u = torch.tensor(_alpha_u, dtype=torch.float32, requires_grad=True)
        self.beta_u = torch.tensor(_beta_u, dtype=torch.float32, requires_grad=True)
        self.gamma = torch.tensor(_gamma, dtype=torch.float32, requires_grad=True)
        self.theta = torch.tensor(_theta, dtype=torch.float32, requires_grad=True)
        self._lambda = torch.tensor(__lambda, dtype=torch.float32, requires_grad=True)
        self.d = torch.tensor(_d, dtype=torch.float32, requires_grad=True)
        self.parameters=[self.alpha_r, self.beta_r, self.alpha_u, self.beta_u, self.gamma, self.theta, self._lambda, self.d]
        self.T=T
        self.load_data()
    def call_sampler(self,n):
        #remember to add .item() at final version
        return torch.randn(10), torch.randn(10,self.T)
        #The below is currently not working
        params=(self.alpha_r.item(), self.beta_r.item(), self.d.item(), self.alpha_u.item(), self.beta_u.item(), self.gamma.item(), self.theta.item(), self._lambda.item())
        sampler = TEST_SAMPLER(self.T, params)
        samples, weights=sampler.sample(n, self.r)
        return weights,samples.T
    def load_data(self):
        DG = Data_generator(0.2, 0.2, 6.0, 0.6, 0.4, 0.1, 0.02, 2.5)
        self.r=torch.tensor(DG.gen_data(1, self.T))
    def upd_param(self,lr=0.01):
        with torch.no_grad():
            for param in self.parameters:
                param -= lr * param.grad #check + or -
                param.grad.zero_()
    def singularlikelihood(self,epsilon):
        #calculate log-likelihood of each set of epsilon, (n,T) -> (n,)
        return torch.randn(epsilon.shape[0])
    def log_total(self):
        n=10
        weights,epsilon=self.call_sampler(n)
        likelihood=self.singularlikelihood(epsilon)
        normed=likelihood-torch.sum(likelihood).item()/n+torch.log(weights) #item for removing normalization from gradients
        return torch.log(torch.sum(torch.exp(normed)))
    def optimize(self,num_steps=10):
        for _ in range(num_steps):
            likelihood=self.log_total()
            likelihood.backward()
            self.upd_param(lr=0.01)
    
        

    
    

EM_sampler=EM(10,0.2, 0.2, 6.0, 0.6, 0.4, 0.1, 0.02, 2.5)
EM_sampler.r.shape

torch.Size([10, 1])