In [None]:
import numpy as np
import math
import matplotlib.pyplot as plt
import random

In [None]:
colorMaps1 = {
    0: '#000000',
    1: '#ff0000',
    2: '#0000ff',
    3: '#00ff00',
    7: '#00ff00',
    4: '#ff6100',
    5: '#0000ff',
    6: '#ff0000'
}


def strToHexWithRatio(string, ratio):
    string = string.replace('#', '')
    myHex = int(string, 16)
    myHex = int(ratio * myHex)
    myStr = '{:0>6x}'.format(myHex)
    myStr = '#' + myStr
    return myStr

In [None]:
# k个分类
# M为每个分布的样本个数
# mean为平均值矩阵组成的一个list
# cov为协方差矩阵组成的list
def genGMM(k, mean, cov, M):
    X = np.random.multivariate_normal(mean[0], cov[0], M).T
    Y = []
    Y.append(0 * np.ones((1, M)))
    for i in range(1, k):
        X = np.c_[X, np.random.multivariate_normal(mean[i], cov[i], M).T]
        Y.append(i * np.ones((1, M)))

    # 归一化
    theMax = np.max(X, axis=1).reshape((2, -1))
    theMin = np.min(X, axis=1).reshape((2, -1))
    X = X - theMin
    X = X / (theMax - theMin)
    return X, Y


# 随机选取k个点
# points中
# 第一行是横坐标
# 第二行是纵坐标
# 第三行代表第i个分类
def genPoints(k):
    points = np.array([random.random(), random.random(), 0]).reshape((-1, 1))
    for i in range(1, k):
        temp = np.array([random.random(), random.random(), i]).reshape((-1, 1))
        points = np.c_[points, temp]
    return points


# 从X中随机选取k个点
def genPoints(k, X):
    temp = X.copy().T
    np.random.shuffle(temp)
    temp = temp.T
    index = random.randint(0, k * M - k - 1)
    points = temp[:, index:index + k].reshape(2, -1)
    return points


# k-means算法的一次迭代
def kmeans(k, M, points):
    theMax = 10 * np.ones((1, k * M))
    label = -1 * np.ones((1, k * M))
    for i in range(k):
        temp = np.sum((X - points[0:2, i].reshape(2, -1))**2, axis=0)
        temp = temp.reshape((1, -1))
        label[temp < theMax] = i
        theMax[temp < theMax] = temp[temp < theMax]
    newPoints = points.copy()
    label.reshape((1, -1))
    #     print(X.shape)
    #     print((label==i).shape)

    #     a = X[0].reshape(1,-1)
    #     print(a.shape)
    #     print(a[label==i])

    for i in range(k):
        if label.all() != i:
            continue
        a = X[0].reshape(1, -1)
        xSum = np.sum(a[label == i]) / np.sum(label == i)
        b = X[1].reshape(1, -1)
        ySum = np.sum(b[label == i]) / np.sum(label == i)
        newPoints[0][i] = xSum
        newPoints[1][i] = ySum

    varSum = 0
    for i in range(label.size):
        x1 = (X[0, i] - points[0, label[0, i].astype('int')])**2
        x2 = (X[1, i] - points[1, label[0, i].astype('int')])**2
        varSum += (x1 + x2)**0.5
    varSum /= k * M
    return newPoints, label, varSum


# alpha.shape = (1, k)
def getAlpha(X, label, k, M):
    alpha = np.ones((1, k))
    for i in range(k):
        alpha[0, i] = np.sum(label == i) / (k * M)
    return alpha


# sigma.shape = (2, 2*k)
def getSigma(X, label):
    sigma = np.ones((2, 2 * k))
    tempX = X[0].reshape((1, -1))
    tempY = X[1].reshape((1, -1))
    flag = True
    for i in range(k):
        temp1 = tempX[label == i].reshape((1, -1))
        temp2 = tempY[label == i].reshape((1, -1))
        temp = np.r_[temp1, temp2]
        if flag:
            sigma = np.cov(temp)
            flag = False
        else:
            sigma = np.c_[sigma, np.cov(temp)]
    #     print(sigma)
    return sigma


# mu.shape = (2, k)
def getMu(X, points, label, k):
    mu = np.ones((2, k))
    for i in range(k):
        A = X[0].reshape((1, -1))
        B = X[1].reshape((1, -1))
        muA = np.sum(A[label == i]) / np.sum(label == i)
        muB = np.sum(B[label == i]) / np.sum(label == i)
        mu[0, i] = muA
        mu[1, i] = muB
#     print(mu)
    return mu


# 概率密度
# phi.shape = (k, k*M)
def getPhi(X, mu, sigma, k, M):
    phi = np.ones((k, k * M))
    for i in range(k):
        mySigma = sigma[:, i * 2:i * 2 + 2].reshape((2, -1))
        for j in range(k * M):
            sub = X[:, j] - mu[:, i]
            inv = np.linalg.inv(mySigma)
            dot = np.dot(sub.T, inv)
            dot = np.dot(dot, sub)
            exp = math.exp(dot / -2)
            det = np.linalg.det(mySigma)
            coef = 1 / (2 * math.pi * det)
            phi[i, j] = coef * exp
    return phi


# 响应度
# gamma.shape = (k, k*M)
def getGamma(phi, alpha, k, M):
    gamma = np.ones((k, k * M))
    for i in range(k):
        for j in range(k * M):
            up = phi[i, j] * alpha[0, i]
            down = 0
            for index in range(k):
                down += alpha[0, index] * phi[index, j]
            gamma[i, j] = up / down
    return gamma


# 估计新的均值
def getNewMu(gamma, X, k):
    flag = True
    mu = np.ones((2, k))
    for i in range(k):
        up = np.sum(X * gamma[i].reshape((1, -1)), axis=1).reshape((2, 1))
        down = np.sum(gamma[i])
        if flag:
            mu = up / down
            flag = False
        else:
            mu = np.c_[mu, up / down]


#     print(mu)
#     print(mu.shape)
    return mu


def getNewSigma(gamma, X, mu, k, M):
    flag = True
    sigma = np.ones((2, 2 * k))
    for i in range(k):
        down = np.sum(gamma[i])
        up = np.zeros((2, 2))
        for j in range(k * M):
            temp = (X[:, j] - mu[:, i]).reshape((2, 1))
            up += np.dot(temp, temp.T) * gamma[i][j]
        if flag:
            sigma = up / down
            flag = False
        else:
            sigma = np.c_[sigma, up / down]
    return sigma


def getNewAlpha(gamma, k, M):
    alpha = np.sum(gamma, axis=1) / (k * M)
    alpha = alpha.reshape((1, -1))
    return alpha


def EStep(alpha, sigma, mu, k, M, X):
    phi = getPhi(X, mu, sigma, k, M)
    gamma = getGamma(phi, alpha, k, M)
    return gamma


def MStep(gamma, mu, k, M, X):
    newSigma = getNewSigma(gamma, X, mu, k, M)
    newMu = getNewMu(gamma, X, k)
    newAlpha = getNewAlpha(gamma, k, M)
    return newAlpha, newMu, newSigma


def EM(alpha, sigma, mu, k, M, X):
    newGamma = EStep(alpha, sigma, mu, k, M, X)
    newAlpha, newMu, newSigma = MStep(newGamma, mu, k, M, X)
    return newAlpha, newMu, newSigma


def control(X, mu, sigma, alpha, k, M):
    stop = False
    phi = getPhi(X, mu, sigma, k, M)
    a = alpha.T
    mul = phi * a
    Sum = np.sum(mul, axis=1) / (k * M)
    #     Mul = Sum.prod()
    Mul = np.log(Sum)
    Mul = np.sum(Sum)
    return Mul

In [None]:
k = 4
M = 400
mean = [[0, 0], [0, 1], [1, 0], [1, 1]]
cov = []
cov.append(0.04 * np.eye(2))
cov.append(0.04 * np.eye(2))
cov.append(0.04 * np.eye(2))
cov.append(0.04 * np.eye(2))

X, Y = genGMM(k, mean, cov, M)
points = genPoints(k, X)
# points = np.array(np.meshgrid(np.linspace(0.25,0.75,2),np.linspace(0.25,0.75,2))).reshape((2,-1))
# points = np.r_[points,np.array([0,1,2,3]).reshape((1,-1))]

newPoints = points.copy()
label = np.array(Y)
label = label.reshape((1, -1))
print(getMu(X, points, label, k))
print((getSigma(X, label)))

In [None]:
# 画出样本点
plt.figure()
x = X[0, :].reshape((1, -1))
y = X[1, :].reshape((1, -1))
# plt.plot(x, y, '.', color='blue')
for i in range(k):
    plt.plot(x[label == i], y[label == i], '.')
plt.xlim((-0.05, 1.05))
plt.ylim((-0.05, 1.05))
plt.show()

In [None]:
varSum = float('inf')
res = points.copy()
for i in range(100):
    flag = True
    count = 0
    while (not (newPoints == points).all()) or flag:
        count = count + 1
        flag = False
        points = newPoints.copy()
        newPoints, label, newVarSum = kmeans(k, M, points)
    if newVarSum <= varSum:
        resLabel = label
        varSum = newVarSum
        res = newPoints

newPoints = res
label = resLabel

print("最终的", k, "个样本中心点为:")
print(res)
print(varSum)

In [None]:
# 画出结果
plt.figure()
x = X[0].reshape((1, -1))
y = X[1].reshape((1, -1))
pointX = newPoints[0]
pointY = newPoints[1]
for i in range(k):
    plt.plot(x[label == i], y[label == i], '.')
plt.plot(pointX, pointY, '+', color='black')
plt.xlim((-0.05, 1.05))
plt.ylim((-0.05, 1.05))
plt.show()

In [None]:
alpha = getAlpha(X, label, k, M)
sigma = getSigma(X, label)
mu = getMu(X, res, label, k)

newAlpha = alpha
newMu = mu
newSigma = sigma
oldLoss = control(X, mu, sigma, alpha, k, M)
newLoss = oldLoss

In [None]:
for i in range(100):
    newAlpha, newMu, newSigma = EM(alpha, sigma, mu, k, M, X)
    alpha = newAlpha
    mu = newMu
    sigma = newSigma
    oldLoss = newLoss
    newLoss = control(X, mu, sigma, alpha, k, M)
    print('newLoss =', newLoss)
    print('t =', abs(newLoss - oldLoss))
    if abs(newLoss - oldLoss) < 1e-6:
        print(i)
        print(abs(newLoss - oldLoss))
        break
print(newAlpha)
print(newMu)
print(newSigma)

In [None]:
x = X[0].reshape((1, -1))
y = X[1].reshape((1, -1))
pointX = mu[0]
pointY = mu[1]

phi = getPhi(X, mu, sigma, k, M)
label = np.argmax(phi, axis=0).reshape((1, -1))
maxLabel = np.max(phi, axis=0).reshape((1, -1))
maxLabel = maxLabel / np.max(maxLabel)

In [None]:
# 画出结果
plt.figure()
# for i in range(k):
#     plt.plot(x[label == i], y[label == i], '.')
for i in range(k * M):
    plt.plot(x[0, i],
             y[0, i],
             '.',
             color=strToHexWithRatio(colorMaps1[label[0, i]],
                                     maxLabel[0, i]**0.2))

plt.plot(pointX, pointY, '+', color='white')
plt.xlim((-0.05, 1.05))
plt.ylim((-0.05, 1.05))
plt.show()

In [None]:
# 画出结果
plt.figure()
# for i in range(k):
#     plt.plot(x[label == i], y[label == i], '.')
newLabel = label + 4 * np.ones_like(label)
for i in range(k * M):
    plt.plot(x[0, i],
             y[0, i],
             '.',
             color=strToHexWithRatio(colorMaps1[newLabel[0, i]], 1))

plt.plot(pointX, pointY, '+', color='black')
plt.xlim((-0.05, 1.05))
plt.ylim((-0.05, 1.05))
plt.show()