In [1]:
# Use black formatter
# %load_ext lab_black

import numpy as np
from scipy.stats import multivariate_normal
from sklearn.metrics import accuracy_score

#### Generar un Dataset Sintético

In [2]:
class SyntheticDataset(object):
    def __init__(self, mu, var, p, n_samples):

        n_uniform = np.random.uniform(0, 1, n_samples)
        x = np.zeros(n_uniform.shape)
        y = np.zeros(n_uniform.shape)

        x[n_uniform <= p] = np.random.normal(mu[0], var[0], x[n_uniform <= p].shape[0])
        x[n_uniform > p] = np.random.normal(mu[1], var[1], x[n_uniform > p].shape[0])
        y[n_uniform <= p] = 0
        y[n_uniform > p] = 1

        self.x = x
        self.y = y
        self.mu = mu
        self.var = var

    def split(self, percentage):  # 0.8

        X = self.x
        y = self.y

        permuted_idxs = np.random.permutation(X.shape[0])
        train_idxs = permuted_idxs[0 : int(percentage * X.shape[0])]
        test_idxs = permuted_idxs[int(percentage * X.shape[0]) : X.shape[0]]

        X_train = X[train_idxs]
        X_test = X[test_idxs]

        y_train = y[train_idxs]
        y_test = y[test_idxs]

        return X_train, X_test, y_train, y_test

In [3]:
# Creamos el dataset

# Establecemos los parámetros de las GMM
mu = np.array([5, 10])
var = np.array([15, 2])
p = 0.25

dataset = SyntheticDataset(mu, var, p, 500)
x_train, x_test, y_train, y_test = dataset.split(0.8)

In [4]:
print("Train: ", x_train.shape)
print("Test: ", x_test.shape)

Train:  (400,)
Test:  (100,)


#### Modelo de Expectation Maximization

In [5]:
class BaseModel(object):
    def __init__(self):
        self.model = None

    def fit(self, X, Y):
        return NotImplemented

    def predict(self, X):
        return NotImplemented


class EMScalar(BaseModel):
    def fit(self, X, k, n_iter=100):
        # Los parámetros deben incluir al menos:
        #  - Alguna forma de detener la iteración
        #  - Los datos observados
        #  - La cantidad de distribuciones

        # Inicialización de parámetros
        self.k = k
        self.n_iter = n_iter
        n = X.shape[0]

        # Inicializar las probabilidades marginales de las clases P(z)
        p = np.full((k, 1), 1 / k)

        # Inicializar medias
        means = np.random.uniform(min(X), max(X), (k, 1))

        # Inicializar matrices covarianza
        covs = np.sum((np.tile(X, (k, 1)) - means.reshape(-1, 1)) ** 2, axis=1) / (
            n - 1
        )

        # Crear matrices place-holders para
        # p(x|z) para cada clase z (n x k), [p(x1|z1) p(x1|z2) p(x1|z3) ..]
        # Responsibilities
        Nij = np.zeros((n, k))
        Eij = np.zeros((n, k))

        # Calcular, con los parámetros iniciales, p(x|z) para todos los z
        for j in range(k):
            Nij[:, j] = multivariate_normal.pdf(X, means[j], covs[j])

        # Algoritmo de actualización
        for _ in range(n_iter):
            for j in range(k):
                # Responsibilities
                Eij[:, j] = (Nij[:, j] * p[j]) / (Nij @ p).reshape(-1)

                # Actualizar medias
                means[j] = (Eij[:, j] @ X) / np.sum(Eij[:, j])

                # Actualizar covarianzas
                covs[j] = (
                    Eij[:, j] @ ((X - means[j]) * (X - means[j])) / np.sum(Eij[:, j])
                )

                # Actualizar pesos de clases
                p[j] = np.sum(Eij[:, j]) / n

                # Actualizar p(x|z)
                Nij[:, j] = multivariate_normal.pdf(X, means[j], covs[j])

        # Al finalizar el loop, guardar el modelo en la clase
        self.model = {
            "means": means,
            "covs": covs,
            "p": p,
        }

    def predict(self, X):
        # Devuelve para cada observación la clase asignada
        n = X.shape[0]
        k = self.model["means"].shape[0]
        N = np.zeros((n, k))
        E = np.zeros((n, k))

        for i in range(k):
            N[:, i] = multivariate_normal.pdf(
                X, self.model["means"][i], self.model["covs"][i]
            )
        for i in range(k):
            E[:, i] = (self.model["p"][i, 0] * N[:, i]) / (N @ self.model["p"]).reshape(
                -1
            )
        idx = np.argmax(E, axis=1)
        return idx

#### Pruebas

In [6]:
em_scalar = EMScalar()
em_scalar.fit(x_train, 2)

In [7]:
predicted = em_scalar.predict(x_test)
predicted

array([1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1,
       1, 0, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
       0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1])

In [8]:
y_test.astype(int)

array([1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 1,
       1, 0, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0,
       0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1])

Como en este caso generamos un dataset sintético sabemos a que cluster pertenece cada punto. Gracias a esto podemos usar la métrica de accuracy para ver el porcentaje de aciertos de nuestro modelo.

In [9]:
accuracy_score(predicted, y_test)

0.95