# "Objectify" the Gaussian Mixture Code

In [13]:
import abc
import numpy as np
import numpy.random as npr

import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style("white")
%matplotlib inline

In [22]:
class Distribution(object):
    """
    A distribution supports a few operations:
      - it contains the value of the parameters (e.g. mean of the Gaussian)
      - it also has hyperparameters that specify priors on the parameters
      - it allows you to sample random variables
      - it computes the log likelihood of a set of data points
    """
    
    __metaclass__ = abc.ABCMeta
    
    @abc.abstractmethod
    def rvs(self, size=1):
        """
        Generate 'size' number of random variables from the dist.
        """
        pass
    
    @abc.abstractmethod
    def log_likelihood(self, X):
        """
        Compute the log likelihood of X.
        """
        pass  
    

class GibbsSampling(object):
    """
    Define an interface for any distribution that 
    supports Gibbs sampling.
    """
    __metaclass__ = abc.ABCMeta
    
    @abc.abstractmethod
    def resample(self, X):
        """
        Take in a set of observed datapoints and 
        update the parameters with a draw from their
        conditional distribution.
        
        :param X: NxD matrix, where D is the dimensionality 
                  of the rv's this distribution generates
        """
        pass
        
    

In [47]:
class Gaussian(Distribution, GibbsSampling):
    """
    Gaussian is parameterized by a mean parameter; 
    let's assume that the variance is fixed for now. 
    
    x ~ N(mu, sigma^2)
    mu ~ N(mu_0, eta_0^2)
    
    """
    def __init__(self, mu=None, sigmasq=1.0, mu_0=1.0, etasq_0=1.0):
        # Hyperparameters
        self.mu_0 = mu_0
        self.etasq_0 = etasq_0
        self.sigmasq = sigmasq
        
        # If mu is given, use it, otherwise sample prior
        if mu is not None:
            self.mu = mu
        else:
            self.resample(np.array([]))
        
    def rvs(self, size=1):
        """
        Sample x for given mu and sigma^2
        """
        return npr.normal(self.mu, np.sqrt(self.sigmasq), size=size)
        
    def log_likelihood(self, X):
        """
        Log likelihood, log p(x | mu, sigma^2)
        
        :param X: is N x 1 vector 
        """
        m, v = self.mu, self.sigmasq
        return -1./2 * np.log(2*np.pi*v)  -1./2 * (X-m)**2 / v
    
    def get_statistics(self, X):
        """
        Extract sufficient statistics of X for resampling.
        """
        M = X.shape[0]
        S = X.sum(axis=0)
        return M, S
    
    def resample(self, X):
        """
        Sample a new mean given the observed datapoints X
        and the prior, defined by mu_0 and eta^2_0
        """
        # Allow X to be a vector 
        X = X[:,None] if X.ndim == 1 else X
        assert X.shape[1] == 1
        
        # TODO: Write this in terms of "natural parameters"
        stats = self.get_statistics(X)
        v = 1./(1./self.etasq_0 + stats[0]/self.sigmasq)
        m = v * (self.mu_0/self.etasq_0 + stats[1]/self.sigmasq)
        
        # Sample from the conditional normal distribution
        self.mu = npr.normal(m, np.sqrt(v))
        

In [51]:
# Simple test. Generate from one Gaussian and resample another.
g_true = Gaussian(mu=0.0, sigmasq=1.0, mu_0=0., etasq_0=2.0)
g_test = Gaussian(sigmasq=1.0, mu_0=0., etasq_0=2.0)

# Generate data from a Gaussian with mean 2 and then resample
print("True mean: {0:.2f}".format(g_true.mu))
print("Initial Test mean: {0:.2f}".format(g_test.mu))
X = g_true.rvs(size=1000)

# Resample the test mean given X
g_test.resample(X)
print("Final Test mean: {0:.2f}".format(g_test.mu))

True mean: 0.00
Initial Test mean: -1.70
Final Test mean: 0.07


In [55]:
class Categorical(Distribution, GibbsSampling):
    """
    x ~ Cat(pi);   pi=[0,1]^K and \sum_k pi_k = 1
    pi ~ Dir(alpha)
    """
    def __init__(self, K, pi=None, alpha=1.0):
        # Hyperparameters
        self.K = K
        self.alpha = alpha * np.ones(K)
        
        # If mu is given, use it, otherwise sample prior
        if pi is not None:
            self.pi = pi
        else:
            self.resample(np.array([]))
        
    def rvs(self, size=1):
        """
        Sample x for given pi. Return 'size' rolls of K-sided die.
        """
        return npr.choice(self.K, p=self.pi, size=size)
        
    def log_likelihood(self, X):
        """
        Log likelihood, log p(X | pi)
        
        :param X: is N x 1 vector 
        """
        assert X.dtype == int
        lp += np.log(self.pi[X])
        
    def get_statistics(self, X):
        """
        Extract sufficient statistics of X for resampling.
        """
        X = X.astype(int)
        M = np.bincount(X, minlength=self.K)
        return (M,)
    
    def resample(self, X):
        """
        Sample a new mean given the observed datapoints X
        and the prior, defined by mu_0 and eta^2_0
        """
        stats = self.get_statistics(X)
        self.pi = npr.dirichlet(self.alpha + stats[0])

In [57]:
# Simple test: generate from one categorical and resample another
c_true = Categorical(K=6, alpha=1.0)
c_test = Categorical(K=6, alpha=1.0)

np.set_printoptions(precision=3)
print("True pi: ", c_true.pi)
print("Initial Test pi: ", c_test.pi)

X = c_true.rvs(size=1000)

c_test.resample(X)
print("Final Test pi: ", c_test.pi)


True pi:  [ 0.307  0.028  0.191  0.022  0.298  0.154]
Initial Test pi:  [ 0.049  0.048  0.336  0.184  0.376  0.007]
Final Test pi:  [ 0.321  0.022  0.206  0.009  0.301  0.141]


In [58]:
# Define a Labels class that contains the 
# class assignments for each datapoint.
# This won't be a proper distribution, but 
# it will suffice. 
class Labels(object):
    def __init__(X, K, mixture_distns, prior_distn):
        """
        :param X:  The Nx1 dataset of Gaussian observations
        :param K:  The number of classes
        
        :param mixture_distns: A list of K Distribution objects
        :param prior_distn:    A Categorical Distribution object
        """
        # Store the data
        self.X, self.K = X, K
        self.N = X.shape[0]
        
        # Store the distributions
        assert len(mixture_distns) == K
        self.mixture_distns = mixture_distns

        assert isinstance(prior_distn, Categorical)
        self.prior_distn = prior_distn
        
        # Initialize the class labels with a 
        # draw from the prior.
        self.Z = npr.choice(prior_distn.pi)
        
    def resample(self):
        """
        Update Z by sampling from the conditional distribution
        of the labels given the mixture distns and the prior distn.
        """
        # TODO: Do this!
        pass

In [None]:
# Now we can define a Gaussian mixture model
class GaussianMixtureModel(object):
    def __init__(self, K, sigmasq, mu_0, etasq_0, alpha):
        self.K = K
        
        # Make the mixture distributions
        self.mixture_distns = \
            [Gaussian(sigmasq=sigmasq, mu_0=mu_0, etasq_0=etasq_0) 
             for _ in range(K)]
            
        # Make the prior distribution on labels
        self.prior_distn = Categorical(alpha=alpha)
        
        # Make a list for data to be added to
        self.labels_list = []
        
    def add_data(self, X):
        self.labels_list.append(
            Labels(X, self.K, self.mixture_distns, self.prior_distn))
        
    def resample_model(self):
        # TODO: Run the steps of the Gibbs sampler
        pass