In [4]:
import os
from helper_functions import image_to_matrix, matrix_to_image, \
    flatten_image_matrix
import numpy as np
import numpy.random as random
from helper_functions import image_difference, default_convergence
import matplotlib.pyplot as plt
import unittest

In [2]:
class K_means_test(unittest.TestCase):
    def runTest(self):
        pass

    def test_initial_means(self, initial_means):
        image_file = 'images/Starry.png'
        image_values = image_to_matrix(image_file).reshape(-1, 3)
        m, n = image_values.shape
        for k in range(1, 10):
            means = initial_means(image_values, k)
            self.assertEqual(means.shape, (k, n),
                             msg=("Initialization for %d dimensional array "
                                  "with %d clusters returned an matrix of an incompatible dimension.") % (n, k))
            for mean in means:
                self.assertTrue(any(np.equal(image_values, mean).all(1)), 
                                msg=("Means should be points from given array"))

    def test_k_means_step(self, k_means_step):
        initial_means = [
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767]]),
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767],
                      [0.67450982, 0.52941179, 0.25490198]]),
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767],
                      [0.67450982, 0.52941179, 0.25490198],
                      [0.86666667, 0.8392157, 0.70588237]]),
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767],
                      [0.67450982, 0.52941179, 0.25490198],
                      [0.86666667, 0.8392157, 0.70588237], [0, 0, 0]]),
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767],
                      [0.67450982, 0.52941179, 0.25490198],
                      [0.86666667, 0.8392157, 0.70588237], [0, 0, 0],
                      [0.8392157, 0.80392158, 0.63921571]]),
        ]

        expected_new_means = [
            np.array([[0.80551694, 0.69010299, 0.17438512],
                      [0.33569541, 0.45309059, 0.52275014]]),
            np.array([[0.82325169, 0.83027274, 0.49915016],
                      [0.5706171,  0.70232249, 0.72329472],
                      [0.25756221, 0.35204852, 0.40436148]]),
            np.array([[0.81913559, 0.82433047, 0.48307031],
                      [0.56450876, 0.69757995, 0.71964568],
                      [0.25756221, 0.35204852, 0.40436148],
                      [0.80715786, 0.88434938, 0.8546841 ]]),
            np.array([[0.81913559, 0.82433047, 0.48307031],
                      [0.56450876, 0.69757995, 0.71964568],
                      [0.37062106, 0.48453161, 0.5107251 ],
                      [0.80715786, 0.88434938, 0.8546841 ],
                      [0.09686573, 0.16374335, 0.25318128]]),
            np.array([[0.89840523, 0.87403922, 0.52888891],
                      [0.5291115,  0.68206999, 0.76917248],
                      [0.36574703, 0.480494,   0.51145273],
                      [0.80699967, 0.8843717,  0.85660891],
                      [0.09686573, 0.16374335, 0.25318128],
                      [0.68161022, 0.74824015, 0.55152448]])
        ]

        expected_cluster_sums = [111069, 195753, 197783, 263443, 303357]

        k_min = 2
        k_max = 6
        image_file = 'images/Starry.png'
        image_values = image_to_matrix(image_file).reshape(-1, 3)
        m, n = image_values.shape
        for i, k in enumerate(range(k_min, k_max + 1)):
            new_means, new_clusters = k_means_step(image_values, k=k, means=initial_means[k - k_min])
            self.assertTrue(new_means.shape == initial_means[k - k_min].shape,
                            msg="New means array are of an incorrect shape. Expected: %s got: %s" %
                                (initial_means[k - k_min].shape, new_means.shape))
            self.assertTrue(new_clusters.shape[0] == m,
                            msg="New clusters array are of an incorrect shape. Expected: %s got: %s" %
                                (m, new_clusters.shape))
            self.assertTrue(np.allclose(new_means, expected_new_means[i], atol = 1e-4),
                            msg="Incorrect new mean values.")
            self.assertTrue(np.sum(new_clusters) == expected_cluster_sums[i],
                            msg="Incorrect clusters prediction.")
        print_success_message()

    def test_k_means(self, k_means_cluster):
        """
        Testing your implementation
        of k-means on the segmented
        Starry reference images.
        """
        k_min = 2
        k_max = 6
        image_dir = 'images/'
        image_name = 'Starry.png'
        image_values = image_to_matrix(image_dir + image_name)
        # initial mean for each k value
        initial_means = [
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767]]),
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767],
                      [0.67450982, 0.52941179, 0.25490198]]),
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767],
                      [0.67450982, 0.52941179, 0.25490198],
                      [0.86666667, 0.8392157, 0.70588237]]),
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767],
                      [0.67450982, 0.52941179, 0.25490198],
                      [0.86666667, 0.8392157, 0.70588237], [0, 0, 0]]),
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767],
                      [0.67450982, 0.52941179, 0.25490198],
                      [0.86666667, 0.8392157, 0.70588237], [0, 0, 0],
                      [0.8392157, 0.80392158, 0.63921571]]),
        ]
        # test different k values to find best
        for k in range(k_min, k_max + 1):
            updated_values = k_means_cluster(image_values, k,
                                             initial_means[k - k_min])
            ref_image = image_dir + 'k%d_%s' % (k, image_name)
            ref_values = image_to_matrix(ref_image)
            dist = image_difference(updated_values, ref_values)
            self.assertEqual(int(dist), 0, msg=("Clustering for %d clusters"
                                                + "produced unrealistic image segmentation.") % k)
        print_success_message()

def get_initial_means(array, k):
    """
    Picks k random points from the 2D array 
    (without replacement) to use as initial 
    cluster means

    params:
    array = numpy.ndarray[numpy.ndarray[float]] - m x n | datapoints x features

    k = int

    returns:
    initial_means = numpy.ndarray[numpy.ndarray[float]]
    """
    targets = [int(np.random.rand() * 10) for i in range(k)]
    return(array[targets])

def k_means_step(X, k, means):
    """
    A single update/step of the K-means algorithm
    Based on a input X and current mean estimate,
    predict clusters for each of the pixels and 
    calculate new means. 
    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n | pixels x features (already flattened)
    k = int
    means = numpy.ndarray[numpy.ndarray[float]] - k x n

    returns:
    (new_means, clusters)
    new_means = numpy.ndarray[numpy.ndarray[float]] - k x n
    clusters = numpy.ndarray[int] - m sized vector
    """
    
    clusters = np.array(
        [
        sorted([(c,np.linalg.norm((j-means[c]), 2)) 
                for c in range(k)],
                  key = lambda x: x[1])[0][0] 
        for j in X])
    # clusters = np.array(
    #     [
    #     sorted([(c,np.sqrt(np.power(np.sum(j-means[c]), 2)))
    #             for c in range(k)],
    #               key = lambda x: x[1])[0][0] 
    #     for j in X])
    
    new_means = np.array([np.mean(X[clusters == i, :], 0) for i in set(clusters)])
    return(new_means, clusters)


# 1D

In [5]:

def get_initial_means(array, k):
    """
    Picks k random points from the 2D array 
    (without replacement) to use as initial 
    cluster means

    params:
    array = numpy.ndarray[numpy.ndarray[float]] - m x n | datapoints x features

    k = int

    returns:
    initial_means = numpy.ndarray[numpy.ndarray[float]]
    """
    # targets = [int(np.random.rand() * 10) for i in range(k)]
    # return(array[targets])
    N = array.shape[0]
    indx = np.random.choice(N,size = k, replace = False)

    return(array[indx,:])


def k_means_step(X, k, means):
    """
    A single update/step of the K-means algorithm
    Based on a input X and current mean estimate,
    predict clusters for each of the pixels and 
    calculate new means. 
    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n | pixels x features (already flattened)
    k = int
    means = numpy.ndarray[numpy.ndarray[float]] - k x n

    returns:
    (new_means, clusters)
    new_means = numpy.ndarray[numpy.ndarray[float]] - k x n
    clusters = numpy.ndarray[int] - m sized vector
    """
    
    distance = compute_distance(X, means, k)
    y = np.argmin(distance, axis=1)
    
    new_mean = np.zeros((k, X.shape[1]))
    for k_ in range(k):
        new_mean[k_, :] = np.mean(X[y == k_, :], axis=0)
    
    return(new_mean, y)

def compute_distance(X, centroids, k):
        distance = np.zeros((X.shape[0], k))
        for k in range(k):
            row_norm = np.linalg.norm(X - centroids[k, :], axis=1)
            distance[:, k] = np.square(row_norm)
        return distance


def k_means_segment(image_values, k=3, initial_means=None):

    not_converged = True
    steps = 0
    max_ittr = 250
    X = image_values.reshape((-1,3))
    old_mean = get_initial_means(X, k=k)

    while not_converged and steps < max_ittr: 
        new_mean, y = k_means_step(X, k, old_mean)

        if np.all(new_mean == old_mean):
            not_converged = False
        old_mean = new_mean
        steps += 1

    im_val = X
    for i in range(k):
        im_val[y == i] = old_mean[i] 
    
    return im_val.reshape(image_values.shape)

In [None]:
centroids = get_initial_means(X, k=k)
X = 

distance = np.zeros((X.shape[0], 5))
for k in range(k):
    row_norm = np.linalg.norm(X - centroids[k, :], axis=1)
    distance[:, k] = np.square(row_norm)


In [6]:
k = 5
k_min = 2
k_max = 6
image_dir = 'images/'
image_name = 'Starry.png'
image_values = image_to_matrix(image_dir + image_name)

In [31]:
updated = k_means_segment(image_values=image_values, k=k)


In [32]:
image_dir = 'images/'
image_name = 'Starry.png'
ref_image = image_dir + 'k%d_%s' % (k, image_name)
ref_values = image_to_matrix(ref_image)
dist = image_difference(updated, ref_values)
dist

0.0

In [8]:
def pairwise_dist(x, y):  # [5 pts]
    np.random.seed(1)
    """
    Args:
        x: N x D numpy array
        y: M x D numpy array
    Return:
        dist: N x M array, where dist2[i, j] is the euclidean distance between
        x[i, :] and y[j, :]
    """
    x = np.asarray(x[:,np.newaxis,:])
    y = np.asarray(y[np.newaxis,:,:])
    dist = np.sum(np.abs(y-x)**2, axis=-1)**(1./2)
    return dist

In [9]:
# distance = compute_distance(X, MU, k)
# pairwise_dist(X, )
# y = np.argmin(distance, axis=1)
# X[y == k]
idx = np.array([np.argmin(i) for i in pairwise_dist(X, MU)])


sigma = np.array([np.cov(X[idx == k].T) for k in range(MU.shape[0])])

NameError: name 'MU' is not defined

In [235]:
sigma = covariance
mu = mean
x = X[:5]
body = np.zeros((mu.shape[0], x.shape[0], x.shape[1]))
for k in range(mu.shape[0]):
    body[k] = np.array([x[m] - mu[k] for m in range(x.shape[0])])

In [10]:
sigma = covariance
mu = mean
X = image_values.reshape((-1,3))
x = X[:5]
body = x - mu
TTTT = np.einsum('ij,ji->i', np.dot(body, np.linalg.pinv(sigma)) , body.T)

In [13]:
exponent = (-1/2)*TTTT

1/ (np.sqrt((2*np.pi)**x.shape[0] *(np.linalg.det(sigma))))

dinom = 1/(np.sqrt((2*np.pi)**x.shape[0] *(np.linalg.det(sigma))))

prob = dinom*np.exp(exponent)

np.sum(prob)

0.8056402030255776

In [8]:
image_file = 'images/Starry.png'
image_matrix = image_to_matrix(image_file)
image_matrix = image_matrix.reshape(-1, 3)
m, n = image_matrix.shape
mean = np.array([0.0627451, 0.10980392, 0.54901963])
covariance = np.array([[0.28756526, 0.13084501, -0.09662368],
                        [0.13084501, 0.11177602, -0.02345659],
                        [-0.09662368, -0.02345659, 0.11303925]])

In [19]:
def initialize_parameters(X, k):
    """
    Return initial values for training of the GMM
    Set component mean to a random
    pixel's value (without replacement),
    based on the mean calculate covariance matrices,
    and set each component mixing coefficient (PIs)
    to a uniform values
    (e.g. 4 components -> [0.25,0.25,0.25,0.25]).
    
    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    k = int
    
    returns:
    (MU, SIGMA, PI)
    MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    PI = numpy.ndarray[float] - k 
    """
    MU = get_initial_means(X, k)
    SIGMA = compute_sigma(X, MU)
    PI = np.array([1/k for t in range(1, k+1)])
    return(MU,SIGMA,PI)

def compute_sigma(X, MU):
    """
    Calculate covariance matrix, based in given X and MU values
    
    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    
    returns:
    SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    """
    body = np.zeros((MU.shape[0], X.shape[0], X.shape[1]))
    for k in range(MU.shape[0]):
        body[k] = np.array([X[m] - MU[k] for m in range(X.shape[0])])
    sigma = np.array([np.matmul(body[k].T, body[k]) for k in range(body.shape[0])])/X.shape[0]
    return sigma

def prob(x, mu, sigma):
    """Calculate the probability of x (a single
    data point or an array of data points) under the
    component with the given mean and covariance.
    The function is intended to compute multivariate
    normal distribution, which is given by N(x;MU,SIGMA).

    params:
    x = numpy.ndarray[float] (for single datapoint) a
        or numpy.ndarray[numpy.ndarray[float]] (for array of datapoints)
    mu = numpy.ndarray[float]
    sigma = numpy.ndarray[numpy.ndarray[float]]

    returns:
    probability = float (for single datapoint) 
                or numpy.ndarray[float] (for array of datapoints)
    """
    probib = None
    bridge = x
    if len(x.shape) == 1 :
        bridge = np.empty((1,len(x)))
        bridge = np.array([x])

    body = bridge - mu
    exponent = -1/2 * np.einsum('ij,ji->i', np.dot(body, np.linalg.pinv(sigma)), body.T)
    dinom = 1/(np.sqrt((2*np.pi)**bridge.shape[1] *(np.linalg.det(sigma))))
    probib = (dinom)*np.exp(exponent)
    if len(x.shape) == 1:
        return probib[0]
    
    return probib

In [20]:
k = 5

MU,SIGMA,PI = initialize_parameters(X, k) 
log_likelihood = 0

for i in range(MU.shape[0], X.shape[0], MU.shape[0]):
    weighted_sum = 0
    for j in range(k):
        weighted_sum += PI[j] * prob(X[i], mu=MU[j], sigma=SIGMA[j])
    log_likelihood += np.log(weighted_sum)
    

log_likelihood

146343.1039485228

In [53]:
# weighted = np.zeros((k, X.shape[0]))
weighted = 0
for j in range(k):
    weighted += PI[j] *prob(X, mu=MU[j], sigma=SIGMA[j])
np.log(np.sum(weighted))

# np.log(np.sum(np.sum(weighted, axis=1)))

In [61]:
n_comp_min_bic = None
n_comp_max_likelihood = None
min_bic = 0
max_likelihood = 0
for k, means in enumerate(comp_means, start=1):
    pi, mu, sigma = train_model(image_matrix, k=k, convergence_function=default_convergence ,means=means)

    bic = bayes_info_criterion(image_matrix, pi, mu, sigma, k)

    if bic < min_bic:
        min_bic = bic
        n_comp_min_bic = k
    likelihood = loglikelihood(image_matrix, pi, mu, sigma, mu.shape[0])
    if likelihood > max_likelihood:
        max_likelihood = likelihood
        n_comp_max_likelihood = k

NameError: name 'comp_means' is not defined

# 2D

In [149]:
#export
def k_means_segment(image_values, k=3, initial_means=None):
    not_converged = True
    steps = 0
    max_ittr = 20000
    X = image_values.reshape((-1,3))
    old_mean = get_initial_means(X, k=k)
    while not_converged and steps < max_ittr: 
        
        clusters = [[] for i in range(k)]
        for dp_indx, dp in enumerate(X):
            distance = [dist(dp, old_mean[c]) for c in range(k)]
            clusters[np.argmin(distance)].append(dp_indx)
                
        new_mean = np.zeros((k, X.shape[1]))
        for cluster_idx, cluster in enumerate(clusters):
            cluster_mean = np.mean(X[cluster], axis=0)
            new_mean[cluster_idx] = cluster_mean

        if dist(new_mean, old_mean) == 0:
            not_converged = False

        old_mean = new_mean
        steps += 1

    im_val = X
    for i in range(k):
        im_val[clusters == i] = old_mean[i] 

    return im_val


In [None]:
initial_means = [
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767]]),
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767],
                      [0.67450982, 0.52941179, 0.25490198]]),
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767],
                      [0.67450982, 0.52941179, 0.25490198],
                      [0.86666667, 0.8392157, 0.70588237]]),
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767],
                      [0.67450982, 0.52941179, 0.25490198],
                      [0.86666667, 0.8392157, 0.70588237], [0, 0, 0]]),
            np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767],
                      [0.67450982, 0.52941179, 0.25490198],
                      [0.86666667, 0.8392157, 0.70588237], [0, 0, 0],
                      [0.8392157, 0.80392158, 0.63921571]]),
        ]

k = 4 
# updated_values = k_means_segment(image_values, k, initial_means[k - 2])
# ref_image = image_dir + 'k%d_%s' % (k, image_name)
# ref_values = image_to_matrix(ref_image)
# dist = image_difference(updated_values, ref_values)

not_converged = True
steps = 0
max_ittr = 20000
old_mean = None
old_mean = get_initial_means()
while not_converged and steps < max_ittr: 
        steps += 1
        clusters = [[] for i in range(k)]
        for dp_indx, dp in enumerate(image_values):
                distance = [dist(dp, old_mean[c]) for c in range(k)]
                clusters[np.argmin(distance)].append(dp_indx)
                
        centroids = np.zeros((k, image_values.shape[1]))
        for cluster_idx, cluster in enumerate(clusters):
            cluster_mean = np.mean(image_values[cluster], axis=0)
            centroids[cluster_idx] = cluster_mean
        print(new_mean.shape, old_mean.shape)
        # if new_mean.shape[0] != k:
        #     old_mean = get_initial_means(image_values, k)
        #     print(f'BAD   {old_mean} in Step   {steps}')
        #     continue
        if steps > 2 and  np.sum(new_mean - old_mean) == 0:
                not_converged = False
        old_mean = new_mean
        print(steps)

im_val = image_values
for i in range(k):
        im_val[clusters == i] = old_mean[i] 


In [66]:
def pairwise_dist(x, y):  # [5 pts]
    np.random.seed(1)
    """
    Args:
        x: N x D numpy array
        y: M x D numpy array
    Return:
        dist: N x M array, where dist2[i, j] is the euclidean distance between
        x[i, :] and y[j, :]
    """
    x = np.asarray(x[:,np.newaxis,:])
    y = np.asarray(y[np.newaxis,:,:])
    dist = np.sum(np.abs(y-x)**2, axis=-1)**(1./2)
    return dist


In [63]:
image_dir = 'images/'
image_name = 'Starry.png'
ref_image = image_dir + 'k%d_%s' % (k, image_name)
ref_values = image_to_matrix(ref_image)
dist = image_difference(updated_values, ref_values)

TypeError: %d format: a number is required, not KMeans

In [20]:

from helper_functions import *

def get_initial_means(array, k):
    """
    Picks k random points from the 2D array
    (without replacement) to use as initial
    cluster means

    params:
    array = numpy.ndarray[numpy.ndarray[float]] - m x n | datapoints x features

    k = int

    returns:
    initial_means = numpy.ndarray[numpy.ndarray[float]]
    """
    
    N = array.shape[0]
    indx = np.random.choice(N,size = k, replace = False)

    return(array[indx,:])

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def compute_distance(X, centroids, k):
    #Used by ML course HW distance calculation.
    distance = np.zeros((X.shape[0], k))
    for k in range(k):
        row_norm = np.linalg.norm(X - centroids[k, :], axis=1)
        distance[:, k] = np.square(row_norm)
    return distance

def k_means_step(X, k, means):
    """
    A single update/step of the K-means algorithm
    Based on a input X and current mean estimate,
    predict clusters for each of the pixels and
    calculate new means.
    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n | pixels x features (already flattened)
    k = int
    means = numpy.ndarray[numpy.ndarray[float]] - k x n

    returns:
    (new_means, clusters)
    new_means = numpy.ndarray[numpy.ndarray[float]] - k x n
    clusters = numpy.ndarray[int] - m sized vector
    """

    distance = compute_distance(X, means, k)
    y = np.argmin(distance, axis=1)

    new_mean = np.zeros((k, X.shape[1]))
    for k_ in range(k):
        new_mean[k_, :] = np.mean(X[y == k_, :], axis=0)

    return(new_mean, y)

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def k_means_segment(image_values, k=3, initial_means=None):
    """
    Separate the provided RGB values into
    k separate clusters using the k-means algorithm,
    then return an updated version of the image
    with the original values replaced with
    the corresponding cluster values.

    params:
    image_values = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - r x c x ch
    k = int
    initial_means = numpy.ndarray[numpy.ndarray[float]] or None

    returns:
    updated_image_values = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - r x c x ch
    """
    not_converged = True
    steps = 0
    max_ittr = 250
    X = flatten_image_matrix(image_matrix=image_values)

    if (initial_means).any() is None:
        old_mean = get_initial_means(X, k=k)
    else:
        old_mean = initial_means

    while not_converged and steps < max_ittr:
        new_mean, y = k_means_step(X, k, old_mean)
        if np.all(new_mean == old_mean):
            not_converged = False
        old_mean = new_mean
        steps += 1

    im_val = X

    for i in range(k):
        im_val[y == i] = old_mean[i]

    return im_val.reshape(image_values.shape)

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def compute_sigma(X, MU):
    """
    Calculate covariance matrix, based in given X and MU values

    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    MU = numpy.ndarray[numpy.ndarray[float]] - k x n

    returns:
    SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    """
    body = np.zeros((MU.shape[0], X.shape[0], X.shape[1]))
    for k in range(MU.shape[0]):
        body[k] = np.array([X[m] - MU[k] for m in range(X.shape[0])])
    sigma = np.array([np.matmul(body[k].T, body[k]) for k in range(body.shape[0])])/X.shape[0]
    return sigma
########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def initialize_parameters(X, k):
    """
    Return initial values for training of the GMM
    Set component mean to a random
    pixel's value (without replacement),
    based on the mean calculate covariance matrices,
    and set each component mixing coefficient (PIs)
    to a uniform values
    (e.g. 4 components -> [0.25,0.25,0.25,0.25]).

    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    k = int

    returns:
    (MU, SIGMA, PI)
    MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    PI = numpy.ndarray[float] - k
    """
    MU = get_initial_means(X, k)
    SIGMA = compute_sigma(X, MU)
    PI = np.array([1/k for t in range(1, k+1)])
    return(MU,SIGMA,PI)

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def prob(x, mu, sigma):
    """Calculate the probability of x (a single
    data point or an array of data points) under the
    component with the given mean and covariance.
    The function is intended to compute multivariate
    normal distribution, which is given by N(x;MU,SIGMA).

    params:
    x = numpy.ndarray[float] (for single datapoint) a
        or numpy.ndarray[numpy.ndarray[float]] (for array of datapoints)
    mu = numpy.ndarray[float]
    sigma = numpy.ndarray[numpy.ndarray[float]]

    returns:
    probability = float (for single datapoint)
                or numpy.ndarray[float] (for array of datapoints)
    """
    probib = None
    bridge = x
    if len(x.shape) == 1 :
        bridge = np.empty((1,len(x)))
        bridge = np.array([x])
    body = bridge - mu
    exponent = -1/2 * np.einsum('ij,ji->i', np.dot(body, np.linalg.pinv(sigma)), body.T)
    dinom = 1/(np.sqrt((2*np.pi)**bridge.shape[1] *(np.linalg.det(sigma))))
    probib = (dinom)*np.exp(exponent)
    if len(x.shape) == 1:
        return probib[0]

    return probib

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def E_step(X,MU,SIGMA,PI,k):
    """
    E-step - Expectation
    Calculate responsibility for each
    of the data points, for the given
    MU, SIGMA and PI.

    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    PI = numpy.ndarray[float] - k
    k = int

    returns:
    responsibility = numpy.ndarray[numpy.ndarray[float]] - k x m
    """
    m, n = X.shape
    responsibility = np.zeros((k, m))
    responsibility = np.array([PI[i] * prob(X, mu=MU[i, :], sigma=SIGMA[i, :, :]) for i in range(k)])
    responsibility /= np.sum(responsibility, axis=0)
    return responsibility

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def M_step(X, r, k):
    """
    M-step - Maximization
    Calculate new MU, SIGMA and PI matrices
    based on the given responsibilities.

    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    r = numpy.ndarray[numpy.ndarray[float]] - k x m
    k = int

    returns:
    (new_MU, new_SIGMA, new_PI)
    new_MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    new_SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    new_PI = numpy.ndarray[float] - k
    """
    m, n = X.shape
    new_PI = np.mean(r, axis=1)
    new_MU = np.zeros((k, n))
    new_MU = np.array([np.sum(r[k].reshape(-1, 1) * X, axis=0) / np.sum(r[k]) for k in range(k)])
    new_SIGMA = np.zeros((k, n, n))
    for i in range(k):
        X_centered = X - new_MU[i]
        new_SIGMA[i] = (X_centered.T * r[i]).dot(X_centered) / np.sum(r[i])

    return (new_MU, new_SIGMA, new_PI)


########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def loglikelihood(X, PI, MU, SIGMA, k):
    """Calculate a log likelihood of the
    trained model based on the following
    formula for posterior probability:

    log(Pr(X | mixing, mean, stdev)) = sum((i=1 to m), log(sum((j=1 to k),
                                      mixing_j * N(x_i | mean_j,stdev_j))))

    Make sure you are using natural log, instead of log base 2 or base 10.

    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    PI = numpy.ndarray[float] - k
    k = int

    returns:
    log_likelihood = float
    """
    weighted = 0
    for j in range(k):
        weighted += PI[j] *prob(X, mu=MU[j], sigma=SIGMA[j])
    return np.sum(np.log(weighted))


########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def train_model(X, k, convergence_function, initial_values = None):
    """
    Train the mixture model using the
    expectation-maximization algorithm.
    E.g., iterate E and M steps from
    above until convergence.
    If the initial_values are None, initialize them.
    Else it's a tuple of the format (MU, SIGMA, PI).
    Convergence is reached when convergence_function
    returns terminate as True,
    see default convergence_function example
    in `helper_functions.py`

    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    k = int
    convergence_function = func
    initial_values = None or (MU, SIGMA, PI)

    returns:
    (new_MU, new_SIGMA, new_PI, responsibility)
    new_MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    new_SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    new_PI = numpy.ndarray[float] - k
    responsibility = numpy.ndarray[numpy.ndarray[float]] - k x m
    """

    MU, SIGMA, PI = initialize_parameters(X, k)
    if initial_values is not None:
        MU, SIGMA, PI = initial_values

    old_likelihood = 0
    new_likelihood = loglikelihood(X, PI, MU, SIGMA, k)
    count = 0
    convrged = False
    while not convrged:
        responsibility = E_step(X, MU, SIGMA, PI, k)
        MU, SIGMA, PI = M_step(X, responsibility, k)
        old_likelihood = new_likelihood
        new_likelihood = loglikelihood(X, PI, MU, SIGMA, k)
        count, convrged = convergence_function(old_likelihood, new_likelihood, count)

    return MU, SIGMA, PI, responsibility

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def cluster(r):
    """
    Based on a given responsibilities matrix
    return an array of cluster indices.
    Assign each datapoint to a cluster based,
    on component with a max-likelihood
    (maximum responsibility value).

    params:
    r = numpy.ndarray[numpy.ndarray[float]] - k x m - responsibility matrix

    return:
    clusters = numpy.ndarray[int] - m x 1
    """
    return(np.argmax(r, axis=0))

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def segment(X, MU, k, r):
    """
    Segment the X matrix into k components.
    Returns a matrix where each data point is
    replaced with its max-likelihood component mean.
    E.g., return the original matrix where each pixel's
    intensity replaced with its max-likelihood
    component mean. (the shape is still mxn, not
    original image size)

    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    k = int
    r = numpy.ndarray[numpy.ndarray[float]] - k x m - responsibility matrix

    returns:
    new_X = numpy.ndarray[numpy.ndarray[float]] - m x n
    """
    clusters = cluster(r)
    return(MU[clusters])

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def best_segment(X,k,iters):
    """Determine the best segmentation
    of the image by repeatedly
    training the model and
    calculating its likelihood.
    Return the segment with the
    highest likelihood.

    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    k = int
    iters = int

    returns:
    (likelihood, segment)
    likelihood = float
    segment = numpy.ndarray[numpy.ndarray[float]]
    """

    best_likelihood = 0
    best_segment = None

    for i in range(iters):
        MU, SIGMA, PI, responsibility = train_model(X, k, convergence_function=default_convergence)
        print(MU.shape)

        likelihood = loglikelihood(X, PI, MU, SIGMA, k)
        if likelihood > best_likelihood:
            best_likelihood = likelihood
            best_segment = segment(X,MU,k, responsibility)

    return (best_likelihood, best_segment)

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def bayes_info_criterion(X, PI, MU, SIGMA, k):
    """
    See description above
    params:
    X = numpy.ndarray[numpy.ndarray[float]] - m x n
    MU = numpy.ndarray[numpy.ndarray[float]] - k x n
    SIGMA = numpy.ndarray[numpy.ndarray[numpy.ndarray[float]]] - k x n x n
    PI = numpy.ndarray[float] - k
    k = int

    return:
    bayes_info_criterion = int
    """
    m, n = X.shape
    likelihood = loglikelihood(X, PI, MU, SIGMA, MU.shape[0])
    num_params = k * (n + n*(n+1)//2 + 1) - 1
    return np.log(m)* num_params -2*likelihood

########## DON'T WRITE ANY CODE OUTSIDE THE FUNCTION! ################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
##### CODE BELOW IS USED FOR RUNNING LOCAL TEST DON'T MODIFY IT ######͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃
################ END OF LOCAL TEST CODE SECTION ######################͏︆͏󠄃͏󠄌͏󠄍͏󠄂͏️͏󠄇͏︊͏︃

def BIC_likelihood_model_test(image_matrix, comp_means):
    """Returns the number of components
    corresponding to the minimum BIC
    and maximum likelihood with respect
    to image_matrix and comp_means.

    params:
    image_matrix = numpy.ndarray[numpy.ndarray[float]] - m x n
    comp_means = list(numpy.ndarray[numpy.ndarray[float]]) - list(k x n) (means for each value of k)

    returns:
    (n_comp_min_bic, n_comp_max_likelihood)
    n_comp_min_bic = int
    n_comp_max_likelihood = int
    """
    n_bic = 0
    n_max_likelihood = 0
    min_bic = np.inf
    max_likelihood = 0
    for k, means in enumerate(comp_means, start=1):
        sigma = compute_sigma(image_matrix, means)
        pi = np.array([1/k for t in range(1, k+1)])
        mu, sigma, pi, r = train_model(X = image_matrix, k=k, convergence_function=default_convergence , initial_values=(means, sigma, pi))
        bic = bayes_info_criterion(image_matrix, pi, mu, sigma, k)
        #minizizing cost
        if bic < min_bic:
            min_bic = bic
            n_bic = k
        #maximizing likelihood
        likelihood = loglikelihood(image_matrix, pi, mu, sigma, mu.shape[0])
        if likelihood > max_likelihood:
            max_likelihood = likelihood
            n_max_likelihood = k

    return n_bic, n_max_likelihood

def return_your_name():
    return("Kiavosh Peynabard")

In [21]:
X = image_values.reshape(-1, 3)
means = np.array([[0.90980393, 0.8392157, 0.65098041],
                      [0.83137256, 0.80784315, 0.69411767],
                      [0.67450982, 0.52941179, 0.25490198],
                      [0.86666667, 0.8392157, 0.70588237], [0, 0, 0],
                      [0.8392157, 0.80392158, 0.63921571]])
BIC_likelihood_model_test(X, [means,means,means])

(3, 3)