In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

from sklearn import datasets
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.decomposition import PCA#主成分分析
from sklearn.mixture import GaussianMixture,BayesianGaussianMixture
from sklearn.naive_bayes import GaussianNB

from scipy.stats import multivariate_normal

from collections import Counter
import math

In [42]:
class GMM():
    def __init__(self,n_components = 5):
        self.model = None
        self.k = n_components
        
#     def GMM_component(self, X, theta, param, c):
# #         print(1)
#         for i in range(param['dim']):
#             for j in range(param['dim']):
#                 if np.isnan(theta['sigma'][c,i,j]):
#                     theta['sigma'][c,i,j] = 0
#         theta['sigma'][c, ...] += self.regularization
#         return theta['pi'][c]*multivariate_normal(theta['mu'][c], theta['sigma'][c, ...], allow_singular = True).pdf(X)
 
#     def E_step(self, X, theta, param):
#         '''
#         E步：更新隐变量概率分布q(Z)。
#         '''
#         q = np.zeros((param['k'], len(X)))
#         for i in range(param['k']):
#             q[i, :] = self.GMM_component(X, theta, param, i)
#         q /= q.sum(axis=0)
#         return q

#     def M_step(self, X, q, theta, param):
#         '''
#         M步：使用q(Z)更新GMM参数。
#         '''
#         pi_temp = q.sum(axis=1); pi_temp /= len(X) # 计算pi
#         mu_temp = q.dot(X); mu_temp /= q.sum(axis=1)[:, None] # 计算mu
#         sigma_temp = np.zeros((param['k'], param['dim'], param['dim']))
#         for i in range(param['k']):
#             ys = X - mu_temp[i, :]
#             sigma_temp[i] = np.sum(q[i, :, None, None]*np.matmul(ys[..., None], ys[:, None, :]), axis=0)
#         sigma_temp /= np.sum(q, axis=1)[:, None, None] # 计算sigma
#         theta['pi'] = pi_temp; theta['mu'] = mu_temp; theta['sigma'] = sigma_temp
#         return theta

#     def likelihood(self, X, theta, param):
#         '''
#         计算GMM的对数似然。
#         '''
#         ll = 0
#         for i in range(param['k']):
#             ll += self.GMM_component(X, theta, param, i)
#         ll = np.log(ll).sum()
#         return ll

#     def EM_GMM(self, X, theta, param, eps=1e-5, max_iter=100):
#         '''
#         高斯混合模型的EM算法求解
#             theta: GMM模型参数; param: 其它系数; eps: 计算精度; max_iter: 最大迭代次数
#             返回对数似然和参数theta，theta是包含pi、mu、sigma的Python字典
#         '''
#         for i in range(max_iter):
#             ll_old = 0
#             # E-step
#             q = self.E_step(X, theta, param)
#             # M-step
#             theta = self.M_step(X, q, theta, param)
#             ll_new = self.likelihood(X, theta, param)
#             if np.abs(ll_new - ll_old) < eps:
#                 break;
#             else:
#                 ll_old = ll_new
#         return theta

    def EM_GMM(self, X, theta, param, eps=1e-5, max_iter=100):
        P_mat = np.zeros((len(X), param['k']))  # 概率矩阵
        for i in range(max_iter):# 迭代次数
            for k in range(param['k']):
                theta['sigma'][k, ...] += self.regularization
                P_mat[:, k] = theta['pi'][k]*multivariate_normal(theta['mu'][k], theta['sigma'][k, ...], allow_singular = True).pdf(X)
                
            totol_N = P_mat.sum(axis=1)  # 计算各样本出现的总频率
            totol_N[totol_N == 0] = param['k']# 如果某一样本在各类中的出现频率和为0，则使用K来代替，相当于分配等概率
            P_mat /= totol_N.reshape(-1, 1)
           
            #### M-step，更新参数 ####
            for k in range(param['k']):
                Nk = np.sum(P_mat[:, k], axis=0)  # 类出现的频率
                theta['pi'][k] = Nk / len(X)
                theta['mu'][k] = (1 / Nk) * np.sum(X *P_mat[:, k].reshape(-1, 1), axis=0)  # 该类的新均值
                theta['sigma'][k] = (1 / Nk) * np.dot((P_mat[:, k].reshape(-1, 1)* (X - theta['mu'][k])).T,
                                                          (X - theta['mu'][k]))
                
   # 分类别求出数学期望和标准差
    def train(self, X, y):
        theta = {}; param = {}
        param['k'] = self.k; param['N'] = X.shape[0]; param['dim'] = X.shape[1]#这里的N是整个训练样本的样本数，在实际计算时取相同标签的样本数
        theta['pi'] = np.ones(param['k'])/param['k']                 # 均匀初始化
        theta['mu'] = np.random.random((param['k'],param['dim']))    # 随机初始化
        theta['sigma'] = np.array([np.eye(param['dim'])]*param['k']) # 初始化为单位正定矩阵
        self.regularization = np.dot(np.eye(param['dim']),0.001)
  
        labels = list(set(y))#标签的列表
        data = {label:[] for label in labels}#{0.0: [], 1.0: []}
        for f, label in zip(X, y):
            data[label].append(f)#print(data)#形成一个字典，根据标签将训练样本进行分类
       
        self.model = {label:{} for label in range(len(labels))}        
        for i in range(len(labels)):
            self.model[i] = self.EM_GMM(data[i],theta,param,eps=1e-5)
            
#             dpgmm = GaussianMixture(n_components=param['k']).fit(data[i])
#             self.model[i]['pi'] = dpgmm.weights_
#             self.model[i]['mu'] = dpgmm.means_
#             self.model[i]['sigma'] = dpgmm.covariances_
#         print(self.model)
        return self.model

    # 计算概率
    def calculate_probabilities(self, input_data):
        probabilities = {}
        dim = np.size(input_data)
        
        for label, value in self.model.items():#value是一个字典，表示特定标签的模型参数
            pp = 0.0
            for i in range(self.k):
                mu,sigma = value['mu'][i],value['sigma'][i]
                pp += multivariate_normal(mu, sigma, allow_singular = True).pdf(input_data)*value['pi'][i]
            probabilities[label] = pp
            
        return probabilities
    
    # 类别
    def predict(self, X_test):
        label = list(range(X_test.shape[0]))
        for i in range(X_test.shape[0]):#每个样本迭代一次
            label[i] = sorted(self.calculate_probabilities(X_test[i,:]).items(), key=lambda x: x[-1])[-1][0]
        
        return label
    
    
iris = datasets.load_iris()
X=iris.data
y=iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)
print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)


model = GMM(n_components=5)
model.train(X_train, y_train)
y_pred = model.predict(X_test)

print("IRIS:Number of mislabeled points out of a total %d points : %d, Acc: %f%%"
      % (X_test.shape[0], (y_test != y_pred).sum(),100*(y_test == y_pred).sum()/X_test.shape[0]))


(120, 4) (30, 4) (120,) (30,)
IRIS:Number of mislabeled points out of a total 30 points : 3, Acc: 90.000000%


In [2]:
class GMM:
    def __init__(self):
        self.model = {}
        self.k = 3
        
    def EM_GMM(self,X,theta,param,eps=1e-5,max_iter=1000):#eps: 计算精度; max_iter: 最大迭代次数
        dpgmm = BayesianGaussianMixture(n_components=param['k']).fit(X)
        theta['pi'] = dpgmm.weights_
        theta['mu'] = dpgmm.means_
        theta['sigma'] = dpgmm.covariances_
        return theta 

    # 分类别求出数学期望和标准差
    def train(self, X, y):
        theta = {}; param = {}
        param['k'] = self.k; param['N'] = X.shape[0]; param['dim'] = X.shape[1]#这里的N是整个训练样本的样本数，在实际计算时取相同标签的样本数
        theta['pi'] = np.ones(param['k'])/param['k']                 # 均匀初始化
        theta['mu'] = np.random.random((param['k'],param['dim']))    # 随机初始化
        theta['sigma'] = np.array([np.eye(param['dim'])]*param['k']) # 初始化为单位正定矩阵
        self.regularization = np.dot(np.eye(param['dim']),0.001)
  
        labels = list(set(y))#标签的列表
        data = {label:[] for label in labels}#{0.0: [], 1.0: []}
        for f, label in zip(X, y):
            data[label].append(f)#print(data)#形成一个字典，根据标签将训练样本进行分类
       
        self.model = {label:{} for label in range(len(labels))}
        
        for i in range(len(labels)):
            dpgmm = BayesianGaussianMixture(n_components=param['k']).fit(data[i])
            self.model[i]['pi'] = dpgmm.weights_
            self.model[i]['mu'] = dpgmm.means_
            self.model[i]['sigma'] = dpgmm.covariances_

        return self.model

    # 计算概率
    def calculate_probabilities(self, input_data):
        probabilities = {}
        dim = np.size(input_data)
        
        for label, value in self.model.items():#value是一个字典，表示特定标签的模型参数
            pp = 0.0
            for i in range(self.k):
                mu,sigma = value['mu'][i],value['sigma'][i]
                pp += multivariate_normal(mu,sigma).pdf(input_data)*value['pi'][i]
            probabilities[label] = pp
            
        return probabilities
    
    # 类别
    def predict(self, X_test):
        label = list(range(X_test.shape[0]))
        for i in range(X_test.shape[0]):#每个样本迭代一次
            label[i] = sorted(self.calculate_probabilities(X_test[i,:]).items(), key=lambda x: x[-1])[-1][0]
        
        return label
    

## 混合高斯分类MNIST数据

In [None]:
from sklearn.decomposition import PCA#主成分分析
from sklearn.datasets import fetch_mldata
 
mnist = fetch_mldata('MNIST original',data_home="E:\scikit_learn_data")
X, y = mnist["data"], mnist["target"]

X_reduced = PCA(n_components=50).fit_transform(X)#特征提取
X_train_d, X_test_d, y_train_d, y_test_d = train_test_split(X_reduced, y, test_size=0.1, random_state=1)#random_state=1使数据集顺序打乱
# X_train_d = (X_train_d-np.min(X_train_d))/(np.max(X_train_d)-np.min(X_train_d))
# X_test_d = (X_test_d-np.min(X_test_d))/(np.max(X_test_d)-np.min(X_test_d))

model_d = GMM()
model_d.train(X_train_d, y_train_d)
y_pred_d = model_d.predict(X_test_d)

print("MNIST:Number of mislabeled points out of a total %d points : %d, Acc: %f%%"
      %(X_test_d.shape[0], (y_test_d != y_pred_d).sum(),100*(y_test_d == y_pred_d).sum()/X_test_d.shape[0]))



## 混合高斯分类CIFAR10数据

In [6]:
import pickle

def unpickle(file):
    with open(file, 'rb') as fo:
        dict = pickle.load(fo, encoding='bytes')
    return dict

cifar_batch = {}
cifar_batch[0] = unpickle(".\CIFAR10\data_batch_5")
cifar_batch[1] = unpickle(".\CIFAR10\data_batch_1")
cifar_batch[2] = unpickle(".\CIFAR10\data_batch_2")
cifar_batch[3] = unpickle(".\CIFAR10\data_batch_3")
cifar_batch[4] = unpickle(".\CIFAR10\data_batch_4")
cifar_test = unpickle(".\CIFAR10\\test_batch")
cifar_batch_meta = unpickle(".\CIFAR10\\batches.meta")

model_c = GMM()
for i in range(5):
    X_c,y_c = cifar_batch[i][b'data'],cifar_batch[i][b'labels']
    X_c = X_c/255#对图片数据进行归一化
    y_c = np.array(y_c)#将y_c转化为numpy数组
    X_reduced_c = PCA(n_components=30).fit_transform(X_c)#特征提取
#     X_reduced_c = (X_reduced_c-np.min(X_reduced_c))/(np.max(X_reduced_c)-np.min(X_reduced_c))
    model_c.train(X_reduced_c, y_c)

X_test,y_test = cifar_test[b'data'],cifar_test[b'labels']
X_test = X_test/255
y_test_c = np.array(y_test)
X_test_c = PCA(n_components=30).fit_transform(X_test)#特征提取
# X_test_c = (X_test_c-np.min(X_test_c))/(np.max(X_test_c)-np.min(X_test_c))#数据归一化
y_pred_c = model_c.predict(X_test_c)

print("CIFAR:Number of mislabeled points out of a total %d points : %d, Acc: %f%%"
      %(X_test_c.shape[0], (y_test_c != y_pred_c).sum(),100*(y_test_c == y_pred_c).sum()/X_test_c.shape[0]))



CIFAR:Number of mislabeled points out of a total 10000 points : 8608, Acc: 13.920000%


使用混合高斯对CIFAR10分类精度过低（挠头）