In [1]:
import os
import scipy
import numpy as np
import pandas as pd
import sympy as sp

from matplotlib import pyplot as plt

%matplotlib inline

# 4.1

In [2]:
class Moth():
    def __init__(self, X = [85, 196, 341, 578], num_iteration=100) -> None:
        self.X = np.array(X)
        self.sum_ = np.sum(X)
        self.n = np.zeros(9)
        self.p = 1/3*np.ones(3)
        self.num_iteration = num_iteration

    def E_step(self, x, p):
        dom_p1 = p[0]**2+2*p[0]*p[1]+2*p[0]*p[2]
        dom_p2 = p[1]**2+2*p[1]*p[2]
        dom_p3 = dom_p2+p[2]**2

        n_cc = x[0]*p[0]**2/dom_p1
        n_ci = 2*x[0]*p[0]*p[1]/dom_p1
        n_ct = 2*x[0]*p[0]*p[2]/dom_p1
        n_ii = x[1]*p[1]**2/dom_p2
        n_it = 2*x[1]*p[1]*p[2]/dom_p2
        n_tt = x[2]
        n_uii = x[3]*p[1]**2/dom_p3
        n_uit = 2*x[3]*p[1]*p[2]/dom_p3
        n_utt = x[3]*p[2]**2/dom_p3

        n = np.array([n_cc, n_ci, n_ct, n_ii, n_it, n_tt, n_uii, n_uit, n_utt])
        
        return n
    
    def M_step(self, x, n):
        sum_ = np.sum(x)
        p_c = (2*n[0]+n[1]+n[2])/(2*sum_)
        p_i = (2*n[3]+n[4]+n[1]+2*n[6]+n[7])/(2*sum_)
        p_t = (2*n[5]+n[4]+n[2]+2*n[8]+n[7])/(2*sum_)
        p = np.array([p_c, p_i, p_t])

        return p
    
    def l(self, x, p):
        return x[0]*np.log(p[0]**2 + 2*p[0]*p[1] + 2*p[0]*p[2]) + x[1]*np.log(p[1]**2+2*p[1]*p[2]) + 2*x[2]*np.log(p[2]) + 2*x[3]*np.log(p[1]+p[2])
    
    def grad_Q(self, n, p):
        dp_c = (2*n[0]+n[1]+n[2])/p[0] - (n[2]+n[4]+n[7]+2*(n[5]+n[8]))/p[2]
        dp_i = (n[1]+2*(n[3]+n[6])+n[4]+n[7])/p[1] - (n[2]+n[4]+n[7]+2*(n[5]+n[8]))/p[2]
        return np.array([dp_c, dp_i])
    
    def hessian_Q(self, n, p):
        dp_cc = -(2*n[0]+n[1]+n[2])/(p[0]**2) - (n[2]+n[4]+n[7]+2*(n[5]+n[8]))/(p[2]**2)
        dp_ci = -(n[2]+n[4]+n[7]+2*(n[5]+n[8]))/(p[2]**2)
        dp_ii = -(n[1]+2*(n[3]+n[6])+n[4]+n[7])/(p[1]**2) - (n[2]+n[4]+n[7]+2*(n[5]+n[8]))/(p[2]**2)
        return np.array([[dp_cc, dp_ci],[dp_ci, dp_ii]])
    
    def StepHalvingBacktracking(self, x, p0, D):
        t_tmp = 1
        p_dim = p0.shape[0]
        l0 = self.l(x, p0)
        tmp_p = np.zeros(p0.shape)
        tmp_p[:2] = p0[:2] + t_tmp*D
        tmp_p[2] = 1 - np.sum(tmp_p[:2])
        
        while((np.sum(tmp_p >= 0)<p_dim) or (np.sum(tmp_p <= 1)<p_dim)):
            t_tmp = 0.5*t_tmp
            tmp_p = np.zeros(p0.shape)
            tmp_p[:2] = p0[:2] + t_tmp*D
            tmp_p[2] = 1 - np.sum(tmp_p[:2])

        while(self.l(x, tmp_p) < l0):
            t_tmp = 0.5*t_tmp
            tmp_p = np.zeros(p0.shape)
            tmp_p[:2] = p0[:2] + t_tmp*D
            tmp_p[2] = 1 - np.sum(tmp_p[:2])

        return tmp_p
    
    def gradient_M_step(self, x, n, p):
        tmp_p = p.copy()
        hessian_inv = np.linalg.inv(self.hessian_Q(n, tmp_p))
        grad = self.grad_Q(n, tmp_p)
        delta__nt = -hessian_inv@grad
        tmp_p = self.StepHalvingBacktracking(x, tmp_p, delta__nt)

        return tmp_p

    def EM_algorithm(self, X, epsilon=1e-10, inplace=False):
        n, p = self.n, self.p
        for i in range(1, self.num_iteration+1, 1):
            tmp_n = self.E_step(X, p)
            tmp_p = self.M_step(X, tmp_n)
            delta_n = tmp_n - n
            delta_p = tmp_p - p
            p, n = tmp_p, tmp_n
            error = np.sqrt(np.sum(delta_n**2)) + np.sqrt(np.sum(delta_p**2))
            if(error < epsilon):
                break
        if(error >= epsilon):
            print("算法没收敛")
        if(inplace):
            self.p, self.n = p, n

        return p
    
    def BootStrap_p(self, num_bootstrap):
        P = np.zeros((num_bootstrap, self.p.shape[0]))
        for i in range(num_bootstrap):
            n_c = np.random.binomial(self.sum_, self.X[0]/self.sum_, size=None)
            n_i = np.random.binomial(self.sum_-n_c, self.X[1]/(self.sum_-n_c), size=None)
            n_t = np.random.binomial(self.sum_-n_c-n_i, self.X[2]/(self.sum_-n_c-n_i), size=None)
            n_u = self.sum_-n_c-n_t-n_i

            x_generated = np.array([n_c, n_i, n_t, n_u])
            p = self.EM_algorithm(x_generated, inplace=False)
            P[i] = p
            
        return P
    
    def cal_p_statistic(self, num_bootstrap):
        P = self.BootStrap_p(num_bootstrap)
        p_mean = np.mean(P, axis=0)
        p_std = np.std(P, axis=0)
        cov = np.cov(P.T)
        cor = np.corrcoef(P.T)
        return p_mean, p_std, cov, cor
    
    def EM_gradient_algorithm(self, X, epsilon=1e-10, inplace=False):
        n, p = self.n, self.p
        for i in range(1, self.num_iteration+1, 1):
            tmp_n = self.E_step(X, p)
            tmp_p = self.gradient_M_step(X, tmp_n, p)
            delta_n = tmp_n - n
            delta_p = tmp_p - p
            p, n = tmp_p, tmp_n
            error = np.sqrt(np.sum(delta_n**2)) + np.sqrt(np.sum(delta_p**2))
            if(error < epsilon):
                break
        if(error >= epsilon):
            print("算法没收敛")
        if(inplace):
            self.p, self.n = p, n
        return p

In [3]:
X = [85, 196, 341, 578]
moth = Moth(X, num_iteration=100)

In [4]:
MLE_EM = moth.EM_algorithm(X, inplace=True)
print("EM算法得到的p的MLE为: p_c = %f, p_i = %f, p_t = %f"%(MLE_EM[0], MLE_EM[1], MLE_EM[2]))

EM算法得到的p的MLE为: p_c = 0.036067, p_i = 0.195799, p_t = 0.768134


In [5]:
num_iterations = 10000
p_mean, p_std, p_cov, p_cor = moth.cal_p_statistic(num_iterations)

In [6]:
std_p_c, std_p_i, std_p_t = np.sqrt(p_cov[0,0]), np.sqrt(p_cov[1,1]), np.sqrt(p_cov[2,2])
print("Boot strapping采样%d次后得到:"%(num_iterations))
print("p_c, p_i, p_t的标准差分别为%f, %f, %f"%(std_p_c, std_p_i, std_p_t))
print("p的相关系数矩阵为:\n", p_cor)
print("可见, corr(p_c, p_i) = %f, corr(p_c, p_t) = %f, corr(p_i, p_t) = %f"%(p_cor[0,1], p_cor[0,2], p_cor[1,2]))

Boot strapping采样10000次后得到:
p_c, p_i, p_t的标准差分别为0.003849, 0.011013, 0.011399
p的相关系数矩阵为:
 [[ 1.         -0.07258155 -0.26749594]
 [-0.07258155  1.         -0.94160231]
 [-0.26749594 -0.94160231  1.        ]]
可见, corr(p_c, p_i) = -0.072582, corr(p_c, p_t) = -0.267496, corr(p_i, p_t) = -0.941602


In [7]:
MLE_EM_grad = moth.EM_gradient_algorithm(X, inplace=False)
print("EM gradient算法得到的p的MLE为: p_c = %f, p_i = %f, p_t = %f"%(MLE_EM_grad[0], MLE_EM_grad[1], MLE_EM_grad[2]))

EM gradient算法得到的p的MLE为: p_c = 0.036067, p_i = 0.195799, p_t = 0.768134


# 4.2

In [8]:
class HIV():
    def __init__(self, X = [379,299,222,145,109,95,73,59,45,30,24,12,4,2,0,1,1], num_iteration=10000) -> None:
        from scipy.special import gamma
        self.X = np.array(X)
        self.sum_ = np.sum(X)
        self.alpha, self.beta, self.mu, self.lamda = 1/3, 1/3, 0.5, 0.5
        self.theta = (self.alpha, self.beta, self.mu, self.lamda)

        self.I = np.arange(0, self.X.shape[0], 1)
        self.zi = np.zeros(self.I.shape[0])
        self.ti = np.zeros(self.I.shape[0])
        self.pi = np.zeros(self.I.shape[0])

        self.F_gamma = gamma
        self.num_iteration = num_iteration

    def E_step(self, theta):
        alpha, beta, mu, lamda = theta

        I = np.arange(0, 17, 1)
        zi = alpha*((I==0).astype(np.int32))
        ti = beta*np.exp(-mu)*(mu**I)
        pi = (1-alpha-beta)*np.exp(-lamda)*(lamda**I)
        PI = zi+ti+pi

        zi /= PI
        ti /= PI
        pi /= PI

        return I, zi, ti, pi

    def M_step(self, x, p):
        I, zi, ti, pi = p

        sum_ = np.sum(x)
        alpha = np.sum(x[0]*zi)/sum_
        beta = np.sum(x*ti)/sum_
        mu = np.sum(I*x*ti)/np.sum(x*ti)
        lamda = np.sum(I*x*pi)/np.sum(x*pi)
        theta = (alpha, beta, mu, lamda)
        
        return theta

    def EM_algorithm(self, X, epsilon=1e-10, inplace=False):
        theta = (self.alpha, self.beta, self.mu, self.lamda)
        p = self.I, self.zi, self.ti, self.pi

        for i in range(1, self.num_iteration+1, 1):
            tmp_p = self.E_step(theta)
            tmp_theta = self.M_step(X, tmp_p)
            delta_theta = np.array(tmp_theta) - np.array(theta)
            delta_p = np.concatenate([tmp_p[j] - p[j] for j in range(len(tmp_p))])
            error = np.sqrt(np.sum(delta_theta**2)) + np.sqrt(np.sum(delta_p**2))
            p, theta = tmp_p, tmp_theta
            if(error < epsilon):
                break
        if(error >= epsilon):
            print("算法没收敛")
        if(inplace):
            self.I, self.zi, self.ti, self.pi = p
            self.theta = theta

        return theta
    
    def BootStrap_theta(self, num_bootstrap):
        P = np.zeros((num_bootstrap, len(self.theta)))
        for i in range(num_bootstrap):
            x_generated = np.zeros(self.X.shape)
            x_sampled = np.random.choice(np.arange(self.X.shape[0]), np.sum(self.X), p=self.X/np.sum(self.X))
            for j in range(x_generated.shape[0]):
                x_generated[j] = np.where(x_sampled == j)[0].shape[0]
            theta = self.EM_algorithm(x_generated, inplace=False)
            P[i] = np.array(theta)
            
        return P
    
    def cal_p_statistic(self, num_bootstrap):
        P = self.BootStrap_theta(num_bootstrap)
        p_mean = np.mean(P, axis=0)
        p_std = np.std(P, axis=0)
        cov = np.cov(P.T)
        cor = np.corrcoef(P.T)
        return p_mean, p_std, cov, cor

In [9]:
data = np.array([379,299,222,145,109,95,73,59,45,30,24,12,4,2,0,1,1])
hiv = HIV(data)

In [10]:
theta_pred = hiv.EM_algorithm(data, inplace=True)
print("预测的alpha = %f, beta = %f, mu = %f, lambda = %f"%theta_pred)

预测的alpha = 0.122166, beta = 0.562542, mu = 1.467475, lambda = 5.938889


In [11]:
num_iterations = 2000
theta_mean, theta_std, theta_cov, theta_cor = hiv.cal_p_statistic(num_iterations)

In [12]:
std_alpha, std_beta, std_mu, std_lambda = np.sqrt(theta_cov[0,0]), np.sqrt(theta_cov[1,1]), np.sqrt(theta_cov[2,2]), np.sqrt(theta_cov[3,3])
print("Boot strapping采样%d次后得到:"%(num_iterations))
print("alpha, beta, mu, lambda的标准差分别为%f, %f, %f, %f"%(std_alpha, std_beta, std_mu, std_lambda))
print("theta的相关系数矩阵为:\n", theta_cor)
print("可见,\ncorr(alpha, beta) = %f,\tcorr(alpha, mu) = %f,\tcorr(alpha, lambda) = %f,\ncorr(beta, mu) = %f,\tcorr(beta, lambda) = %f,\tcorr(mu, lambda) = %f"%(theta_cor[0,1], theta_cor[0,2], theta_cor[0,3], theta_cor[1,2], theta_cor[1,3], theta_cor[2,3]))

Boot strapping采样2000次后得到:
alpha, beta, mu, lambda的标准差分别为0.044971, 0.089553, 1.290534, 1.426267
theta的相关系数矩阵为:
 [[ 1.         -0.60861803  0.41425872 -0.42970013]
 [-0.60861803  1.         -0.9426542   0.96090041]
 [ 0.41425872 -0.9426542   1.         -0.97836479]
 [-0.42970013  0.96090041 -0.97836479  1.        ]]
可见,
corr(alpha, beta) = -0.608618,	corr(alpha, mu) = 0.414259,	corr(alpha, lambda) = -0.429700,
corr(beta, mu) = -0.942654,	corr(beta, lambda) = 0.960900,	corr(mu, lambda) = -0.978365


# 4.3


In [13]:
class TripleNormal():
    def __init__(self, data, num_iteration = 1000) -> None:
        self.not_known = 0x7fffffff
        self.data_normal = np.array(data.fillna(self.not_known))
        self.not_known_idx1, self.not_known_idx2, self.not_known_idx3 = self.get_unknown_idx(self.data_normal)
        self.konwn0 = np.where(np.array(self.data_normal[:,0])!=self.not_known)
        self.konwn1 = np.where(np.array(self.data_normal[:,1])!=self.not_known)
        self.konwn2 = np.where(np.array(self.data_normal[:,2])!=self.not_known)
        self.known_all = np.intersect1d(self.konwn0, np.intersect1d(self.konwn1, self.konwn2))

        self.mean = np.mean(self.data_normal[self.known_all], axis=0, keepdims=True)
        self.cov= np.array([[1.0, 0.6, 1.2],
                            [0.6, 0.5, 0.5],
                            [1.2, 0.5, 3.0]])
        self.alpha = np.array([[2,4,6]])
        self.beta = np.array([[2,2,2]])

        self.num_iteration = num_iteration

    def get_unknown_idx(self, data):
        not_known_idx = np.where(data == self.not_known)
        not_knowns = [([], []), ([], []), ([], [])]
        tmp_i = -1
        tmp_js = []
        for i, raw in enumerate(not_known_idx[0]):
            if(raw!=tmp_i):
                if(tmp_i != -1):
                    tmp_len = len(tmp_js)
                    not_knowns[tmp_len-1][0].append(tmp_i)
                    not_knowns[tmp_len-1][1].append(tmp_js)
                tmp_i = raw
                tmp_js = []
            tmp_js.append(not_known_idx[1][i])
            if(i == not_known_idx[0].shape[0]-1):
                tmp_len = len(tmp_js)
                not_knowns[tmp_len-1][0].append(tmp_i)
                not_knowns[tmp_len-1][1].append(tmp_js)

        not_known_idx1 = not_knowns[0]
        not_known_idx2 = not_knowns[1]
        not_known_idx3 = not_knowns[2]

        return not_known_idx1, not_known_idx2, not_known_idx3
    
    def value(self, data, mean, cov):
        K = data.shape[1]
        cov_inv = np.linalg.inv(cov)
        alpha = self.alpha
        beta = self.beta

        tmp = data-mean
        ret1 = -1/2*np.sum(K*np.log(2*np.pi)+np.log(np.linalg.det(cov)) + np.sum(tmp*(cov_inv@(tmp.T)), axis=1))
        tmp2 = -(mean-alpha)/beta
        ret2 = np.sum(tmp2 - np.log(beta) - 2*np.log(1+np.exp(tmp2)))

        return ret1 + ret2
    
    def grad_sigma(self, data, mean, cov):
        N = data.shape[0]
        cov_inv = np.linalg.inv(cov)

        ret1 = -1/2*N*cov_inv
        tmp = data-mean
        ret2 = -1/2*(tmp.T@tmp)

        return ret1 + ret2
    
    def grad(self, data, mean, cov):
        K = data.shape[1]
        cov_inv = np.linalg.inv(cov)
        alpha = self.alpha
        beta = self.beta

        ret1 = -np.sum((data-mean)@cov_inv, axis=0, keepdims=True)
        ret2 = -1/beta + 2/(beta*(1+np.exp((mean-alpha)/beta)))

        return ret1 + ret2
    
    def hessian(self, data, mean, cov):
        K = data.shape[1]
        N = data.shape[0]
        cov_inv = np.linalg.inv(cov)
        alpha = self.alpha
        beta = self.beta

        ret1 = cov_inv*N
        tmp = np.exp((mean-alpha)/beta)
        ret2 = -2*np.diag((tmp/((beta*(1+tmp))**2)))

        return ret1 + ret2
    
    def cond_mean(self, data, mean, cov, unknown_indices, known_indices):
        known_cov = cov[np.ix_(known_indices, known_indices)]
        known_unknown_cov = cov[np.ix_(unknown_indices, known_indices, )]
        unknown_cov = cov[np.ix_(unknown_indices, unknown_indices)]
        known_values = data[known_indices]
        conditional_mean = mean[0, unknown_indices] + known_unknown_cov @ np.linalg.inv(known_cov) @ (known_values - mean[0, known_indices])

        return conditional_mean
    
    def cond_cov(self, data, mean, cov, unknown_indices, known_indices):
        known_cov = cov[known_indices, known_indices]
        known_unknown_cov = cov[known_indices, unknown_indices]
        unknown_cov = cov[unknown_indices, unknown_indices]
        conditional_cov = unknown_cov - known_unknown_cov @ np.linalg.inv(known_cov) @ known_unknown_cov.T

        return conditional_cov

    def E_step(self, data, mean, cov):
        tmp_data = data.copy()
        indices = np.arange(0,3,1).astype(np.int32)
        for i, raw in enumerate(self.not_known_idx1[0]):
            unknown_indices = np.array(self.not_known_idx1[1][i])
            known_indices = np.setdiff1d(indices, unknown_indices)
            cond_mean = self.cond_mean(tmp_data[raw], mean, cov, unknown_indices, known_indices)
            tmp_data[raw, unknown_indices] = cond_mean

        for i, raw in enumerate(self.not_known_idx2[0]):
            unknown_indices = np.array(self.not_known_idx2[1][i])
            known_indices = np.setdiff1d(indices, unknown_indices)
            cond_mean = self.cond_mean(tmp_data[raw], mean, cov, unknown_indices, known_indices)
            tmp_data[raw, unknown_indices] = cond_mean

        for i, raw in enumerate(self.not_known_idx3[0]):
            unknown_indices = np.array(self.not_known_idx3[1][i])
            known_indices = np.setdiff1d(indices, unknown_indices)
            cond_mean = self.cond_mean(tmp_data[raw], mean, cov, unknown_indices, known_indices)
            tmp_data[raw, unknown_indices] = cond_mean

        return tmp_data

    def M_step(self, data):
        mu = np.mean(data, axis=0, keepdims=True)
        cov = np.cov(data.T)

        return mu, cov
    
    def EM_algorithm(self, data, epsilon = 1e-10, inplace = False):
        tmp_data = np.array(data.fillna(self.not_known))
        mean = self.mean
        cov = self.cov
        for i in range(1, self.num_iteration+1, 1):
            tmp_data = self.E_step(tmp_data, mean, cov)
            tmp_mean, tmp_cov = self.M_step(tmp_data)
            delta_mean = tmp_mean - mean
            delta_cov = tmp_cov - cov
            error = np.sqrt(np.sum(delta_mean**2)) + np.sqrt(np.sum(delta_cov**2))
            mean, cov = tmp_mean, tmp_cov
            if(error < epsilon):
                break
        if(error >= epsilon):
            print("算法没收敛")
        if(inplace):
            self.mean, self.cov, = mean ,cov
            self.data = tmp_data

        return mean, cov
    
    def gradient_M_step(self, data, fit_sigma = False, delta = 1e-10, learning_rate = 0.01):
        mean = np.mean(data, axis=0)
        cov = np.cov(data.T)
        if(not fit_sigma): 
            cov = self.cov
        error = 1
        if(not fit_sigma):
            while(error > delta):
                grad = self.grad(data, mean, cov)
                hessian = self.hessian(data, mean, cov)
                hessian_inv = np.linalg.inv(hessian)
                delta_mean = -grad@hessian_inv
                mean = mean + delta_mean
                error = np.sqrt(np.sum(delta_mean**2))
        else:
            while(error > delta):
                grad = self.grad(data, mean, cov)
                grad_sigma = self.grad_sigma(data, mean, cov)
                print(grad_sigma)
                hessian = self.hessian(data, mean, cov)
                hessian_inv = np.linalg.inv(hessian)
                delta_mean = -grad@hessian_inv
                delta_sigma = learning_rate*grad_sigma
                mean = mean + delta_mean
                cov = cov + delta_sigma
                error = np.sqrt(np.sum(delta_mean**2) + np.sqrt(np.sum(delta_sigma.T@delta_sigma)))
                
        return mean, cov

    def EM_gradient_algorithm(self, data, epsilon=1e-10, inplace = False, fit_sigma = False):
        tmp_data = np.array(data.fillna(self.not_known))
        mean = self.mean
        cov = self.cov
        for i in range(1, self.num_iteration+1, 1):
            tmp_data = self.E_step(tmp_data, mean, cov)
            tmp_mean, tmp_cov = self.gradient_M_step(tmp_data, fit_sigma = fit_sigma)
            delta_mean = tmp_mean - mean
            delta_cov = tmp_cov - cov
            error = np.sqrt(np.sum(delta_mean**2)) + np.sqrt(np.sum(delta_cov**2))
            mean, cov = tmp_mean, tmp_cov
            if(error < epsilon):
                break
        if(error >= epsilon):
            print("算法没收敛")
        if(inplace):
            self.mean, self.cov, = mean ,cov
            self.data = tmp_data
        return mean, cov

In [14]:
data_normal = pd.read_csv("trivariatenormal.dat", sep=" ")
data_normal.head(5)

Unnamed: 0,x1,x2,x3
0,,2.67,
1,0.44,2.73,7.68
2,1.36,2.96,
3,-1.76,0.31,9.71
4,,,9.49


In [15]:
tripnormal = TripleNormal(data_normal)

In [16]:
mu, sigma = tripnormal.EM_algorithm(data_normal, inplace = False)
print("预测的mu为: ", mu)
print("预测的sigma为:\n",sigma)

预测的mu为:  [[0.87883229 2.85094528 9.02942437]]
预测的sigma为:
 [[1.3511929  0.95683854 1.28486395]
 [0.95683854 0.73441359 0.69129236]
 [1.28486395 0.69129236 2.36967305]]


In [17]:
mu_grad, sigma_grad = tripnormal.EM_gradient_algorithm(data_normal, inplace = False)
print("EM gradient方法预测的mu为: ", mu_grad)
print("预测的sigma为:\n",sigma_grad)

EM gradient方法预测的mu为:  [[0.8790627  2.8457106  9.03311281]]
预测的sigma为:
 [[1.  0.6 1.2]
 [0.6 0.5 0.5]
 [1.2 0.5 3. ]]


# 4.4

In [18]:
class Coupling():
    def __init__(self, 
                 data = np.array([6.94, 5.50, 4.54, 2.14, 3.65, 3.40, 4.38, 10.24, 4.56, 9.42, 4.55, 4.15, 5.64, 10.23]), 
                 mask = np.array([1,0,0,0,1,1,1,0,0,0,1,1,0,1]),
                 a = 0.003,
                 b = 2.5,
                 num_iteration = 1000) -> None:
        from scipy.special import gammaincc, gamma
        self.data = data
        self.mask = mask

        self.a = a
        self.b = b
        self.theta = np.array([[self.a], [self.b]])

        self.num_iteration = num_iteration
        self.gammaincc = gammaincc
        self.gamma = gamma
        self.E_steps={"Integrate": self.E_step, "MonteCarol": self.MC_E_step}

    def value(self, theta, data):
        a = theta[0, 0]
        b = theta[1, 0]

        return np.sum(np.log(a) + np.log(b) + (b-1)*np.log(data) - a*data**b)
    
    def grad(self, theta, data):
        a = theta[0, 0]
        b = theta[1, 0]
        tmp = np.log(data)
        grad_a = np.sum(1/a - data**b)
        grad_b = np.sum(1/b + tmp - a*data**(b)*tmp)
        grad = np.array([[grad_a],[grad_b]])

        return grad
    
    def hessian(self, theta, data):
        a = theta[0, 0]
        b = theta[1, 0]
        tmp = np.log(data)
        grad_aa = np.sum(-1/a**2)
        grad_ab = np.sum(-data**(b)*tmp)
        grad_bb = np.sum(-1/b**2 - a*data**(b)*tmp**2)
        hessian = np.array([[grad_aa, grad_ab], [grad_ab, grad_bb]])

        return hessian
    
    def grad_b(self, theta, data):
        a = theta[0, 0]
        b = theta[1, 0]
        tmp = np.log(data)
        return np.sum(1/b + tmp - a*data**(b)*tmp)
    
    def grad_b2(self, theta, data):
        a = theta[0, 0]
        b = theta[1, 0]
        tmp = np.log(data)
        return np.sum(-1/b**2 - a*data**(b)*tmp**2)
    
    def E_step(self, data, mask, theta):
        tmp_data = data.copy()
        tmp_mask = mask.copy()

        a = theta[0, 0]
        b = theta[1, 0]
        idx = np.where(tmp_mask==1)

        tmp1 = 1+1/b
        tmp2 = a*tmp_data[idx]**b
        tmp_data[idx] = tmp_data[idx]*(tmp2**(-1/b))*(self.gammaincc(tmp1, tmp2)*self.gamma(tmp1))

        return tmp_data, tmp_mask
    
    def MC_E_step(self, data, mask, theta, sample_size = 10000):
        from numpy.random import default_rng
        tmp_data = data.copy()
        tmp_mask = mask.copy()
        idx = np.where(tmp_mask==1)
        a = theta[0, 0]
        b = theta[1, 0]
        lamda = (1/a)**(1/b)

        rng = default_rng()
        for i in idx[0]:
            threshold = tmp_data[i]
            count = 0
            samples = []
            while(count < sample_size):
                tmp_samples = lamda*rng.weibull(b, size=sample_size - count)
                tmp_samples = tmp_samples[np.where(tmp_samples>=threshold)]
                samples.append(tmp_samples)
                count += tmp_samples.shape[0]
            samples = np.concatenate(samples)
            tmp_data[i] = np.mean(samples)
        
        return tmp_data, tmp_mask
    
    def gradient_M_step(self, data, theta, delta = 1e-10):
        tmp_data = data.copy()
        a = theta[0, 0]
        b = theta[1, 0]

        a = 1/np.mean(tmp_data**b)
        tmp_theta = np.array([[a], [b]])
        error = 1
        while(error > delta):
            '''
            grad = self.grad(tmp_theta, tmp_data)
            hessian = self.hessian(tmp_theta, tmp_data)
            hessian_inv = np.linalg.inv(hessian)
            delta_theta = -grad.T@hessian_inv
            tmp_theta = tmp_theta + delta_theta
            error = np.sqrt(np.sum(delta_theta**2))
            print(error)
            '''
            grad = self.grad_b(tmp_theta, tmp_data)
            hessian = self.grad_b2(tmp_theta, tmp_data)
            hessian_inv = 1/hessian
            delta_b = -grad*hessian_inv
            tmp_b = tmp_theta[1,0] + delta_b
            error = np.sqrt((tmp_b-tmp_theta[1,0])**2)
            tmp_theta = np.array([[a],[tmp_b]])

        return tmp_theta
    
    def EM_algorithm(self, data, mask, epsilon=1e-10, inplace=False, E_step = "Integrate"):
        tmp_data = data.copy()
        theta = self.theta
        for i in range(1, self.num_iteration+1, 1):
            tmp_data, tmp_mask = self.E_steps[E_step](tmp_data, mask, theta)
            tmp_theta = self.gradient_M_step(tmp_data, theta)
            delta_theta = tmp_theta - theta
            error = np.sqrt(np.sum(delta_theta**2)) 
            theta = tmp_theta
            if(error < epsilon):
                break
        if(error >= epsilon):
            print("算法没收敛")
        if(inplace):
            self.theta = theta
            self.data = tmp_data
        return theta

In [19]:
data = np.array([6.94, 5.50, 4.54, 2.14, 3.65, 3.40, 4.38, 10.24, 4.56, 9.42, 4.55, 4.15, 5.64, 10.23])
mask = np.array([1,0,0,0,1,1,1,0,0,0,1,1,0,1])

In [20]:
coupling = Coupling(data, mask)

In [21]:
theta_coup = coupling.EM_algorithm(data, mask)
print("EM算法得到的预测值为, a = %f, b = %f"%(theta_coup[0,0], theta_coup[1,0]))

EM算法得到的预测值为, a = 0.013325, b = 2.484870
