In [None]:
import torch
import numpy as np
from scipy import stats
from Data import *
from utils import *
from torch.utils.data import TensorDataset,DataLoader
import matplotlib.pyplot as plt

%matplotlib inline
%config InlineBackend.figure_format='retina'
plt.style.use("classic")
import warnings
warnings.filterwarnings("ignore")


In [1]:
def repeated_experimental_indicators(all_params, true_params):
    mse = torch.mean((all_params - true_params) ** 2)
    mae = torch.mean(torch.abs(all_params - true_params))
    bias = torch.mean((all_params.mean(0) - true_params) ** 2)
    mse_sd = torch.std((all_params - true_params) ** 2)
    mae_sd = torch.std(torch.abs(all_params - true_params))
    bias_sd = torch.std((all_params.mean(0) - true_params) ** 2)
    
    res = {
        "MSE": mse.item(),
        "MSE_SD": mse_sd.item(),
        "MAE": mae.item(),
        "MAE_SD": mae_sd.item(),
        "Bias": bias.item(),
        "Bias_SD": bias_sd.item()
    }
    return res



class VBHMMs:
    def __init__(self, Y, X, K,beta0,Z,bei=3.0):
        self.Y = Y
        self.X = X
        self.N = X.shape[0]  
        self.K = K
        self.d = X.shape[1]
        self.alpha_pi = 1 / self.K * torch.ones([K])
        self.alpha_A = torch.ones([K])
        self.mu_0 = torch.zeros([self.d, 1])
        self.Sigma0 = torch.eye(self.d)
        self.p_k = 1 / K * torch.ones([self.N, self.K])
        self.Sigma = 1 * torch.eye(self.d)
        self.pi_p = 1 / self.K * torch.ones([self.K])
        self.A_p = 1 / self.K * torch.ones([self.K, self.K])
        self.X_p = torch.softmax(torch.nn.functional.one_hot(Z.long().reshape(-1),K)*bei,dim=1)
        self.mu = beta0.clone()+torch.distributions.Normal(0,0.1).sample([K,beta.shape[1]])
        self.sigma = torch.eye(1).unsqueeze(0).repeat([K, self.d, self.d])
        self.mu_1 = torch.ones([K, self.d])
        self.lambd_1 = torch.ones([K, self.d])
        r = 1.0  
        delta = 1.0  
        gamma_dist = torch.distributions.gamma.Gamma(r, 1.0 / delta)
        self.lambd = gamma_dist.sample([K, self.d])
    def update_pi(self):
        p1 = self.X_p[0]
        alpha_pi_star = p1 + self.alpha_pi
        self.pi_p = alpha_pi_star
    def update_A(self):
        for j in range(self.K):
            p_n_1 = self.X_p[:-1, j].view(-1, 1)
            p_n = self.X_p[1:, :]
            alpha_A_j_star = torch.sum(p_n_1 * p_n, 0) + self.alpha_A
            self.A_p[j] = alpha_A_j_star
    def update_beta(self):
        for k in range(self.K):
            ig = stats.invgauss(self.mu_1.numpy(), self.lambd_1.numpy())
            term1 = 0
            term2 = 0
            for n in range(self.N):
                x_n = self.X[n].view(-1, 1)
                y_n = self.Y[n]
                term1 += 1 / sigma ** 2 * self.X_p[n, k] * x_n.mm(x_n.T)
                term2 += 1 / sigma ** 2 * self.X_p[n, k] * y_n * x_n.T
            self.sigma[k] = torch.inverse(
                term1 + 1 / sigma ** 2 * torch.diag(torch.from_numpy(ig.mean()))[k] + torch.eye(self.d)*0.0001
            )
            self.mu[k] = term2.mm(self.sigma[k]).view(-1)

    def update_Z(self, num_smaple=10000):
        pi_dist = torch.distributions.Dirichlet(self.pi_p)
        pi_samples = pi_dist.sample([num_smaple])
        E_log_pi = torch.mean(torch.log(pi_samples), 0)
        A_dist = torch.distributions.Dirichlet(self.A_p)
        A_samples = A_dist.sample([num_smaple])
        E_log_A = torch.mean(torch.log(A_samples), 0)
        y1 = self.Y[0]
        x1 = self.X[0]
        beta = np.ones((num_smaple,3,self.d))
        for j in range(num_smaple):
            for i in range(self.K):
                beta[j,i,:] = np.random.multivariate_normal(self.mu[i,:],self.sigma[i,:,:])
        beta = torch.tensor(beta,dtype=torch.float32)
        torch.bmm(beta,x1.view(-1, 1).repeat([num_smaple, 1, 1]))
        normal_y1 = torch.distributions.Normal(
            beta.bmm(x1.view(-1, 1).repeat([num_smaple, 1, 1])), sigma
        )
        E_log_p_y1 = normal_y1.log_prob(y1).mean(0).view(-1)
        p1 = E_log_pi + E_log_p_y1 + (E_log_A * self.X_p[1]).sum(1)
        self.X_p[0] = torch.distributions.Multinomial(1, logits=p1).mean

        for n in range(1, self.N - 1):
            yn = self.Y[n]
            xn = self.X[n]

            normal_yn = torch.distributions.Normal(
                beta.bmm(xn.view(-1, 1).repeat([num_smaple, 1, 1])), sigma
            )
            E_log_p_yn = normal_yn.log_prob(yn).mean(0).view(-1)
          
            pn = (
                torch.sum(E_log_A.T * (self.X_p[n - 1]) + E_log_A * self.X_p[n + 1], 1)
                + E_log_p_yn
            )
            
            self.X_p[n] = torch.distributions.Multinomial(1, logits=pn).mean

       
        yN = self.Y[0]
        xN = self.X[0]
        
        normal_yN = torch.distributions.Normal(
            beta.bmm(xN.view(-1, 1).repeat([num_smaple, 1, 1])), sigma
        )
        E_log_p_yN = normal_yN.log_prob(yN).mean(0).view(-1)
        pN = torch.sum(E_log_A * (self.X_p[self.N - 1].T), 0) + E_log_p_yN
        self.X_p[self.N - 1] = torch.distributions.Multinomial(1, logits=pN).mean

    def update_xi(self):
        for k in range(self.K):
            for j in range(self.d):
                self.mu_1[k, j] = (
                    self.lambd[j, k]
                    * sigma ** 2
                    / (self.mu[k, j] ** 2 + self.sigma[k, j, j])
                )
        self.lambd_1 = self.lambd

    def update(self, iters=1):
        self.mus = torch.zeros([iters, self.K, self.d])
        for i in range(iters):
            self.update_pi()
            self.update_A()
            self.update_beta()
            self.update_Z()
            self.mus[i] = self.mu

In [2]:
#BP
from sklearn.neural_network import MLPRegressor
#linear
from sklearn.linear_model import LinearRegression
#
from sklearn.ensemble import RandomForestRegressor

In [None]:
# 产生模拟数据
sigma = 0.4
K = 3
p =30
beta = torch.zeros([K, p])

beta[0, :4] = torch.tensor([0.5, -2, 2, -1])
beta[1, :4] = torch.tensor([1, -2, 1.5, -1.5])
beta[2, :4] = torch.tensor([1.5, -1.5, 1, -2])

N = 300
# 初始概率
pi = torch.tensor([0.6, 0.3, 0.1])
# 转移概率论矩阵
A = torch.tensor([[0.2, 0.3, 0.5], [0.1, 0.6, 0.3], [0.5, 0.4, 0.1]])
batch_size=8
T=100
all_beta=torch.zeros([T,K,p])
all_prob=torch.zeros([T,K,K])
all_model=[]
n_test = 10
num_index=4
VBHMMs_pred=torch.zeros([T,n_test])
LSTM_pred=torch.zeros([T,n_test])
BP_pred=torch.zeros([T,n_test])
LM_pred=torch.zeros([T,n_test])
pred_result={"VBHMMs":torch.zeros([T,num_index]),
             "LSTM":torch.zeros([T,num_index]),
             "BP":torch.zeros([T,num_index]),
             "LM":torch.zeros([T,num_index]),
             }

for t in range(T):

    torch.random.manual_seed(t)
    X = torch.distributions.Normal(0, 2).sample([N, p])
    Y, Z = generate_data(X, beta, sigma, pi, A, N)


    X_train = X[:-n_test]
    X_test = X[-n_test:]
    y_train = Y[:-n_test]
    y_test = Y[-n_test:]
    z_train=Z[:-n_test]
    z_test = Z[-n_test:]


    torch.random.manual_seed(t)
    model = VBHMMs(y_train, X_train, K,beta,z_train,5.)

    mu_0=model.mu
    mu_1=model.mu+1
    MAX_iter = 100
    cnt=0
    while torch.mean((mu_0-mu_1)**2)>1e-6 and cnt<MAX_iter:
        mu_0=model.mu.clone()
        model.update(1)
        model.mu=model.mu[model.mu[:,0].argsort()]
        mu_1=model.mu.clone()
        cnt+=1
    Dir = torch.distributions.Dirichlet(model.A_p)
    transition_probs = Dir.mean
    all_beta[t]=model.mu
    all_prob[t]=transition_probs
    all_model.append(model)

    all_beta[t]=model.mu
    all_prob[t]=transition_probs
    all_model.append(model)
    #VBHMMs
    labels_train = model.X_p.argmax(1)
    Dir = torch.distributions.Dirichlet(model.A_p)
    transition_probs = Dir.mean.numpy()

    y_test_pred = predict_obs(
        model, labels_train[-1].item(), transition_probs, X_test, n_test, 1
    )
    VBHMMs_pred[t]=y_test_pred

    #LSTM
    data_set=TensorDataset(X_train.unsqueeze(1),y_train)
    data_laoder=DataLoader(data_set,batch_size=batch_size,shuffle=True)

    lr=1e-3
    num_epoch=50
    hidden_size=10
    setup_seed(t)
    lstm_model = LSTM(p,hidden_size,False)
    loss_list=lstm_model.train_step(data_laoder,lr,num_epoch)
    lstm_pred=lstm_model(X_test).detach()
    LSTM_pred[t]=lstm_pred.view(-1)

    #BP
    bp=MLPRegressor(hidden_layer_sizes=[20],random_state=0,max_iter=int(1e4))
    bp.fit(X_train,y_train)
    bp_pred=torch.from_numpy(bp.predict(X_test)).float()
    BP_pred[t]=bp_pred

    #LinearRegression
    lm=RandomForestRegressor(n_estimators=100,random_state=0)
    lm.fit(X_train,y_train)
    lm_pred=torch.from_numpy(lm.predict(X_test)).float()
    LM_pred[t]=lm_pred.view(-1)

    #result
    pred_result["VBHMMs"][t]=torch.FloatTensor(list(evaluate_model(y_test,VBHMMs_pred[t]).values()))
    pred_result["LSTM"][t]=torch.FloatTensor(list(evaluate_model(y_test,LSTM_pred[t]).values()))
    pred_result["BP"][t]=torch.FloatTensor(list(evaluate_model(y_test,BP_pred[t]).values()))
    pred_result["LM"][t]=torch.FloatTensor(list(evaluate_model(y_test,LM_pred[t]).values()))

    print("{}/{} iter = {} finished!".format(t+1,T,cnt))

# 原来的代码
# pred_result["VBHMMs"]=dict(zip(['MAPE', 'RMSE', 'MAE', 'R2'],pred_result["VBHMMs"].mean(0).tolist()))
# pred_result["LSTM"]=dict(zip(['MAPE', 'RMSE', 'MAE', 'R2'],pred_result["LSTM"].mean(0).tolist()))
# pred_result["BP"]=dict(zip(['MAPE', 'RMSE', 'MAE', 'R2'],pred_result["BP"].mean(0).tolist()))
# pred_result["LM"]=dict(zip(['MAPE', 'RMSE', 'MAE', 'R2'],pred_result["LM"].mean(0).tolist()))

# 修改后的代码
metrics = ['MAPE', 'RMSE', 'MAE', 'R2']
for method in ['VBHMMs', 'LSTM', 'BP', 'LM']:
    means = pred_result[method].mean(0)
    stds = pred_result[method].std(0)
    pred_result[method] = {metric: {'mean': mean.item(), 'std': std.item()} for metric, mean, std in zip(metrics, means, stds)}

# 打印结果
for key, value in pred_result.items():
    print(key, value)

print("beta:")
print(repeated_experimental_indicators(all_beta,beta))
print("A")
print(repeated_experimental_indicators(all_prob,A))

