In [None]:
import os 
import numpy as np
import matplotlib.pyplot as plt
import glob

from numpy import sqrt, pi, exp, transpose, matmul
from numpy.linalg import det, inv

%matplotlib inline

In [None]:
# load a test image and the corresponding ground truth mask
files = glob.glob("apples/*.jpg")
color_images = []
for f in files:
    im = plt.imread(f) / 255
    h,w,d = im.shape
    color_images.append(im.reshape(d, h*w)) # reshape to flatten pixels

files = glob.glob("apples/*.png")
masks = []
for f in files:
    im = plt.imread(f)
    im = im[:,:,1] # images are already in binary; arbitrarily picking one channel to use for our binary values
    h,w =im.shape
    masks.append(im.reshape(1,h*w)) # reshape to flatten pixels

# prepare data
labeled_data = []
rgb_apple = []
rgb_non_apple = []
for i in range(len(color_images)):
    # appending the mask to our data as our label
    labeled = np.append(color_images[i], masks[i], axis=0)
    labeled_data.append(labeled)

    # get apple pixels - ie pixels wjere label = 1; reshape to flatten 
    apple = labeled[:, np.where(labeled[3,:] == 1)][:3]
    d, t, n = apple.shape
    rgb_apple.append(apple.reshape(d,n))

    # get non-apple pixels - ie pixels where label = 0; reshape to flatten
    non_apple = labeled[:, np.where(labeled[3,:] == 0)][:3]
    d, t, n = non_apple.shape
    rgb_non_apple.append(non_apple.reshape(d,n))


rgb_apple = np.concatenate((rgb_apple[0], rgb_apple[1], rgb_apple[2]), axis=1)
rgb_non_apple = np.concatenate((rgb_non_apple[0], rgb_non_apple[1], rgb_non_apple[2]), axis=1)

In [None]:
# subroutine to return gaussian probabilities given the multivariate Gaussian 
def getMultiGaussProb(x, mean, cov, D):
    """
    Returns the likelihood that each data point in x belongs to the Gaussian described by mean and 
    var.

    x -- dx1 vector containing our data point
    mean -- dx1 vector containing the d means for this Gaussian
    cov -- dxd covariance matrix for this Gaussian
    D -- scalar representing number of dimensions in the multivariate Gaussian 

    Returns: 
    prob -- scalar contianing the likelihood that each data point in data belongs to the mutivariate Gaussian 
    distribution defined by mean and cov.
    """

    return(1 / ((2 * np.pi)**(D / 2) * np.linalg.det(cov)**(1/2)) * np.exp((-0.5 * (x - mean).T @ inv(cov) @ (x - mean))))
def getMixGaussLogLike(data, label, mixGaussEst): 
    """
    Calculate the log likelihood for the whole dataset under a mixture of Gaussians model.
    
    Keyword arguments:
    data -- d by n matrix containing data points.
    mixGaussEst -- dict containing the mixture of gaussians parameters.
    label -- scalar representing the label we are computing the log likelihood for  

    Returns: 
    logLike -- scalar containing the log likelihood.
    
    """
    
    data = np.atleast_2d(data)                                                                         
    # find total number of data items                                                                  
    nDims, nData = data.shape                                                                          
    
    # initialize log likelihoods                                                                       
    logLike = 0;        
    # initialize container for additive likelihoods 
    like = 0                                                                               
                                                                                                       
    # run through each data item                                                                       
    for cData in range(nData):    
        thisData = data[:, cData]                                                                          
        # loop through each Gaussian in our mixture of Gaussians , compute likelihood by summing the weighted likelihoods for each Gaussian
        for k in range(mixGaussEst['k']):                                                         
            like += mixGaussEst['weight'][k, label] * getMultiGaussProb(thisData, mixGaussEst['mean'][:,k, label], mixGaussEst['cov'][:,:,k, label], nDims)

        # add to total log like                                                                        
        logLike = logLike + np.log(like)                                                     
                                                                                                       
    return(logLike)   

def getMixGaussLike(data, label, mixGaussEst): 
    """
    Calculate the likelihood for the whole dataset under a mixture of Gaussians model.
    
    Keyword arguments:
    data -- d by n matrix containing data points.
    mixGaussEst -- dict containing the mixture of gaussians parameters.
    label -- scalar representing the label we are computing the log likelihood for  

    Returns: 
    L -- scalar containing the log likelihood.
    
    """
    
    data = np.atleast_2d(data)                                                                         
    # find total number of data items                                                                  
    nDims, nData = data.shape                                                                          
    
    # initialize log likelihoods                                                                       
    L = 0;        
    # initialize container for additive likelihoods 
    like = 0                                                                               
                                                                                                       
    # run through each data item                                                                       
    for cData in range(nData):    
        thisData = data[:, cData]                                                                          
        # loop through each Gaussian in our mixture of Gaussians , compute likelihood by summing the weighted likelihoods for each Gaussian
        for k in range(mixGaussEst['k']):                                                         
            like += mixGaussEst['weight'][k, label] * getMultiGaussProb(thisData, mixGaussEst['mean'][:,k, label], mixGaussEst['cov'][:,:,k, label], nDims)

        # add to total log like                                                                        
        L += like                                                   
                                                                                                       
    return(L)  

def fitMixGauss(data, k):
    """
    Estimate a k MoG model that would fit the data. Incremently plots the outcome.
               
    
    Keyword arguments:
    data -- list containing w d by n matrices with data points.
    k -- scalar representing the number of gaussians to use in the MoG model.

    
    Returns: 
    mixGaussEst -- dict containing the estimated MoG parameters.
    
    """
    
    #     MAIN E-M ROUTINE  
    #     In the E-M algorithm, we calculate a complete posterior distribution over                                  
    #     the (nData) hidden variables in the E-Step.  
    
    #     In the M-Step, we update the parameters of the Gaussians (mean, cov, w).   
    
    # set value for w, or number of states/labels
    w = len(data)
    nDims, nData = data[0].shape

    postHidden = np.zeros(shape=(k, w, nData))


    # we will initialize the values to random values
    mixGaussEst = dict()
    mixGaussEst['d'] = nDims
    mixGaussEst['k'] = k
    mixGaussEst['w'] =  data
    mixGaussEst['weight'] = (1 / k) * np.ones(shape=(k,w))
    mixGaussEst['mean'] = 2 * np.random.randn(nDims, k, w)
    mixGaussEst['cov'] = np.zeros(shape=(nDims, nDims, k, w))
    for label in range(w):
        for cGauss in range(k):
            mixGaussEst['cov'][:, :, cGauss, label] = 2.5 + 1.5 * np.random.uniform() * np.eye(nDims)

    # calculate current likelihood
    label = 1

    logLikeApple = getMixGaussLogLike(data[label], label, mixGaussEst)
    print('Log Likelihood Apple Iter 0 : {:4.3f}\n'.format(logLikeApple))

    label = 0

    logLikeNonApple = getMixGaussLogLike(data[label], label, mixGaussEst)
    print('Log Likelihood Non-Apple Iter 0 : {:4.3f}\n'.format(logLikeNonApple)) 

    # do 10 iterations to save time
    nIter = 10;

    logLikeVec = np.zeros(shape=(2 * nIter))
    boundVec = np.zeros(shape=(2 * nIter))


    for cIter in range(nIter):
        print('Iteration ', cIter)

        # ===================== =====================
        # Expectation step
        # ===================== =====================

        for cData in range(nData):

            # initialize array to hold likelihood s for each Gaussian k and each w
            like = np.zeros((k, w))
            # loop through each label value 
            for label in range(w):
                # loop through each Gaussian and calculate the likelihood for each 
                for cGauss in range(k):
                    like[cGauss, label] = mixGaussEst['weight'][cGauss, label] * getMultiGaussProb(data[label][:, cData], mixGaussEst['mean'][:,cGauss,label], mixGaussEst['cov'][:,:,cGauss, label], nDims)

            #  calculate the posterior for each k and each w by normalizing the likelihood
            postHidden[:, :, cData] = like / np.sum(like)
   
            
        # ===================== =====================
        # Maximization Step
        # ===================== =====================
        # for each state of the world (apple and non-apple)
        for label in range(w):
            # for each constituent Gaussian
            for cGauss in range(k):
                # Update weighting parameters mixGauss.weight based on the total
                # posterior probability associated with each Gaussian and each label.
                mixGaussEst['weight'][cGauss][label] =  np.sum(postHidden[cGauss, label, :]) / np.sum(postHidden)
            
                # Update mean parameters mixGauss.mean by weighted average
                # where weights are given by posterior probability associated with
                # Gaussian and label.  
                mixGaussEst['mean'][:,cGauss][label] =  0
                # loop through each data point
                for i in range(nData):
                    mixGaussEst['mean'][:,cGauss, label] += postHidden[cGauss,label,i] * data[label][:,i]
                
                mixGaussEst['mean'][:,cGauss, label] = mixGaussEst['mean'][:,cGauss, label] / np.sum(postHidden[cGauss, label, :])
                
                # Update covariance parameter based on weighted average of
                # square distance from update mean, where weights are given by
                # posterior probability associated with Gaussian and label
                mixGaussEst['cov'][:, :, cGauss, label] = 0 

                for i in range(nData):
                    sub = (data[label][:,i] - mixGaussEst['mean'][:,cGauss, label]).reshape(-1,1)
                    mixGaussEst['cov'][:, :, cGauss, label] += postHidden[cGauss, label, i] * sub @ sub.T
                
                mixGaussEst['cov'][:, :, cGauss, label] = mixGaussEst['cov'][:, :, cGauss, label] / np.sum(postHidden[cGauss, label, :])

        # calculate current likelihood
        label = 1

        logLikeApple = getMixGaussLogLike(data[label], label, mixGaussEst)
        print('Log Likelihood Apple Iter 0 : {:4.3f}\n'.format(logLikeApple))

        label = 0

        logLikeNonApple = getMixGaussLogLike(data[label], label, mixGaussEst)
        print('Log Likelihood Non-Apple Iter 0 : {:4.3f}\n'.format(logLikeNonApple))


    return mixGaussEst

def getPosterior(im, prior_apple, prior_non_apple):
    """
    Calculating the posterior probability that each pixle is an apple pixel.
    
    Keyword arguments:
    im -- matrix containing pixels of our image
    prior_apple -- scalar representing the prior probability on each pixel being an apple pixel
    prior_non_apple -- scalar representing the prior probability on each pixel not being an apple pixel
    
    Returns: 
    posterior_apple -- matrix represneting the posterior probability that each pixe; is an apple
    
    """
    # run through each pixel in the test images and classify them as being apple or non-apple
    imY, imX, imZ = im.shape
    posterior_apple = np.zeros([imY,imX])

    for cY in range(imY):
        print('Processing Row ',cY,'\n')
        for cX in range(imX): 
            #extract this pixel's data
            thisPixelData = np.double(im[cY,cX,:])
            thisPixelData = thisPixelData[:, np.newaxis]
            
            #calculate likelihood of this data given apple model
            like_apple = getMixGaussLike(thisPixelData,1,mixGaussEst)
            #calculate likelihood of this data given non apple model
            like_non_apple = getMixGaussLike(thisPixelData,0,mixGaussEst)

            # calculate posterior probability from likelihoods and 
            # priors using BAYES rule.
            posterior_apple[cY,cX]= like_apple * prior_apple / (like_apple * prior_apple + like_non_apple * prior_non_apple) # marginalization over the skin - summing over the likelihood prior for the two different conditions 

    return(posterior_apple)


def getROC(prob, mask, thresh):
    """
    Calculate the true positive rate and false positive rate for plotting an ROC curve.
               
    
    Keyword arguments:
    prob -- matrix containing the predicted probability to classify each pixe;
    mask -- scalar representing the number of gaussians to use in the MoG model.
    tresh -- matrix containing pixels for the ground truth mask 

    
    Returns: 
    tpr -- list containing true positive rates for each threshold value
    fpr -- list containing false positive rates for each threshold value 
    
    """

    # for each threshold value, compare the treshold to our predicted probability to classify each pixel 
    # then check against our mask to see if it's a true positive, false positive, true negative or false negative 
    imY, imX  = prob.shape

    # initialize containers for true positive rate and false positive rate points to plot in our roc plot
    tpr = []
    fpr = []

    for t in thresh:
        print('Processing threshold ', t)
        # initialize rates
        tp = 0
        tn = 0
        fp = 0
        fn = 0

        for cY in range(imY):
            for cX in range(imX): 

                # count true positives, negatives and false positives,  negatives 
                if prob[cY, cX] >= t:
                    if mask[cY, cX] == 1:
                        tp += 1
                    else:
                        fp += 1
                else:
                    if mask[cY, cX] == 0:
                        tn += 1
                    else:
                        fn += 1

        # calculate true positive rate and false positive rate for this threshold 
        tpr.append(tp / (tp + fn))     
        fpr.append(fp / (fp + tn))

    return(tpr, fpr)

In [None]:
#find the MoG estimate
data = [rgb_non_apple, rgb_apple]
mixGaussEst = fitMixGauss(data,2);

# load the test  images
files = glob.glob("testApples/*.jpg")
color_images = []
for f in files:
    im = plt.imread(f) / 255
    h,w,d = im.shape
    color_images.append(im) 

# assuming we have no prior knowledge whether an apple is present or not
prior_apple = 0.5
prior_non_apple = 0.5

# testing with only two images to save time 
posterior_apple = getPosterior(color_images[0], prior_apple, prior_non_apple)
posterior_apple_2 = getPosterior(color_images[2], prior_apple, prior_non_apple)

In [None]:
# set up plots.
f, (ax1, ax2) = plt.subplots(1, 2)
#show the image
ax1.imshow(color_images[2])
ax1.set_title('Image')
#show our prediction!
ax2.imshow(posterior_apple_2)
ax2.set_title('Result')
#plt.show()

In [None]:
# set up plots.
f, (ax1, ax2) = plt.subplots(1, 2)
#show the image
ax1.imshow(color_images[0])
ax1.set_title('Image')
#show our prediction!
ax2.imshow(posterior_apple)
ax2.set_title('Result')

plt.show()

In [None]:
# load the mask 
f = glob.glob("testApples/*.png")
mask = plt.imread(f[0])
mask = mask[:,:,1] 

# set thresholds
thresh = np.arange(0.0, 1.2, 0.2)

# calculate true positive rate and false positive rate 
tpr, fpr = getROC(posterior_apple, mask, thresh)


In [None]:
# plot the roc curve
plt.scatter(tpr, fpr)
plt.title('ROC Curve')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.xlim(0,1.1)
plt.ylim(0,1.1)