### Import the libraries

In [1]:
# general
import os
import random
import numpy as np
import pandas as pd
from numpy.lib.npyio import save
import math
import csv
import time

# visualization
import matplotlib.pyplot as plt
from  matplotlib.widgets import Button
import matplotlib
import seaborn as sns
%matplotlib

# for images handling & manipulation

from skimage.io import imread
from skimage import measure

from scipy.ndimage import gaussian_filter
from scipy.ndimage import laplace

from PIL import Image, ImageEnhance

Using matplotlib backend: Qt5Agg


In [2]:
# Directory configuration (all paths are relative to the notebook's working directory).
# DATASET_DIR     - input images; the test loop opens DATASET_DIR + "mdb<NNN>.pgm"
# OUTPUT_DIR      - original/segmented images are read back from here for labelling
# TEST_RESULT_DIR - one "<username>.csv" with the tester's verdicts is written here
DATASET_DIR = "dataset/mias/all-mias/"
OUTPUT_DIR = "output/"
TEST_RESULT_DIR = "test_results/"

### Class to handle the button clicks

In [3]:
class TestEngine:
    """Record a tester's verdicts on segmentation results in a CSV file.

    One instance owns one open CSV file (``TEST_RESULT_DIR + username + '.csv'``).
    The ``success`` / ``anomaly`` / ``fail`` methods are meant to be used as
    matplotlib ``Button.on_clicked`` callbacks: each call appends one row made of
    the currently stored image name, the algorithm parameters, the tester name
    and a result code (1 = acceptable, 0 = anomaly, -1 = unacceptable).
    """

    # Column order of the CSV file; every written row follows this order.
    HEADER = ['org_img_name',
              'timestep',
              'iter_inner',
              'iter_outer',
              'lmda',
              'alfa',
              'epsilon',
              'sigma',
              'username',
              'result']

    # Result codes stored in the 'result' column.
    RESULT_SUCCESS = 1
    RESULT_ANOMALY = 0
    RESULT_FAIL = -1

    # Defaults used for a row until update_data() / update_img_name() is called.
    org_img_name = ''
    timestep = 7  # time step
    iter_inner = 10
    iter_outer = 20
    lmda = 5  # coefficient of the weighted length term L(phi)
    alfa = -3  # coefficient of the weighted area term A(phi)
    epsilon = 1.2  # parameter that specifies the width of the DiracDelta function
    sigma = 0.8  # scale parameter in Gaussian kernel

    def __init__(self, username):
        """Open (and truncate) the tester's CSV file and write the header row.

        :param username: tester name; used for the file name and stored in every row
        """
        self.username = username

        # make sure the target directory exists so open() does not fail on a fresh checkout
        if TEST_RESULT_DIR:
            os.makedirs(TEST_RESULT_DIR, exist_ok=True)
        self.file = open(TEST_RESULT_DIR + username + '.csv', 'w', newline='')
        # create the csv writer
        self.writer = csv.writer(self.file)
        self.writer.writerow(self.HEADER)
        self.file.flush()

    def update_data(self, org_img_name, timestep, iter_inner, iter_outer, lmda, alfa, epsilon, sigma):
        """Store the image name and parameter values to be written with the next verdict.

        The tester name is deliberately left untouched: it is fixed at construction
        time and must not be re-read from a notebook-level global.
        """
        self.org_img_name = org_img_name
        self.timestep = timestep
        self.iter_inner = iter_inner
        self.iter_outer = iter_outer
        self.lmda = lmda
        self.alfa = alfa
        self.epsilon = epsilon
        self.sigma = sigma

    def update_img_name(self, img_name):
        """Change only the image name written with the next verdict."""
        self.org_img_name = img_name

    def terminate(self):
        """Close the CSV file; no further verdicts can be written afterwards."""
        self.file.close()

    def _write_result(self, result):
        """Append one CSV row for the current image/parameters with the given result code."""
        self.writer.writerow([
            self.org_img_name,
            self.timestep,
            self.iter_inner,
            self.iter_outer,
            self.lmda,
            self.alfa,
            self.epsilon,
            self.sigma,
            self.username,
            result,
        ])
        # flush immediately so a verdict survives a kernel crash or a forgotten terminate()
        self.file.flush()

    def success(self, event):
        """Button callback: record the current image as acceptable (result 1)."""
        self._write_result(self.RESULT_SUCCESS)

    def anomaly(self, event):
        """Button callback: record the current image as an anomaly (result 0)."""
        self._write_result(self.RESULT_ANOMALY)

    def fail(self, event):
        """Button callback: record the current image as unacceptable (result -1)."""
        self._write_result(self.RESULT_FAIL)

### Prep for the testing

In [4]:
# list of test images: zero-padded 3-digit ids "001" .. "041"
images = [str(x).zfill(3) for x in range(1,42)] # range end is exclusive: range(1, 4) would give 3 images

# Parameter values considered for testing. The commented-out lists are the
# wider grids; the active assignments below keep a single value each.
# NOTE(review): outer_iter_vals / sigma_vals are not read by any other cell
# shown in this notebook.
# outer_iter_vals = [30, 35, 45]
# sigma_vals = [0.4, 0.7, 1.0]
# lmda_vals = [4.5, 5.5, 6.5]
# alfa_vals = [-2.5, -3.5, -4.5]
# epsilon_vals = [1.0, 1.2, 1.4]

outer_iter_vals = [45]
sigma_vals = [0.4]

In [5]:
# input user name
# The name selects the parameter preset and becomes part of every output file
# name, so it is normalised (trimmed, lower-cased) before it is validated.
VALID_USERNAMES = ('james', 'usama', 'yasir', 'nitin')
username = ''
while username not in VALID_USERNAMES:
    username = input("Enter your name for the record (james, yasir, usama, nitin): ").strip().lower()

# to handle button clicks
callback = TestEngine(username)

# plot config: two panels (original / segmented) with room below for the buttons
fig, ax = plt.subplots(nrows=1, ncols=2)
plt.subplots_adjust(bottom=0.3)

# display the feedback buttons
# (btn1..btn3 are kept as module-level names so the widgets stay referenced)
axButtonSuccess = plt.axes([0.15, 0.1, 0.2, 0.1]) # left, bottom, width, height
btn1 = Button(ax=axButtonSuccess,
              label='Acceptable',
              hovercolor='lightgreen')
btn1.on_clicked(callback.success)

axButtonAnomaly = plt.axes([0.4, 0.1, 0.2, 0.1]) # left, bottom, width, height
btn2 = Button(ax=axButtonAnomaly,
              label='Anomaly',
              hovercolor='beige')
btn2.on_clicked(callback.anomaly)

axButtonFail = plt.axes([0.65, 0.1, 0.2, 0.1]) # left, bottom, width, height
btn3 = Button(ax=axButtonFail,
              label='Unacceptable',
              hovercolor='lightcoral')
btn3.on_clicked(callback.fail)

Enter your name for the record (james, yasir, usama, nitin): yasir


0

### Algorithm from Other File

In [6]:
def left_align(img):
    """
    Determines whether the breast is aligned to the right or left side of the image
    by comparing the mean gray level of the left and right halves. Mirrors the image
    horizontally if the right half is brighter, so the result is left-aligned.

    The halves are derived from the actual image width (the middle column of an
    odd-width image counts towards the right half); for a 256x256 input this is the
    same comparison as columns 0-127 against columns 128-255.

    Parameters:
        img: The image as a 2-D numpy array, or anything np.asarray can convert
             (e.g. a grayscale PIL image).

    Assumptions:
        - The half of the image on which the majority of the breast region lies has a higher mean than the other half
        - The input image is LCC view

    Returns:
        numpy array with the left aligned image (mirrored horizontally when needed).
    """
    pixels = np.asarray(img)
    half = pixels.shape[1] // 2
    if half == 0:
        # fewer than two columns: there are no two halves to compare
        return pixels
    if np.mean(pixels[:, :half]) < np.mean(pixels[:, half:]):
        return pixels[:, ::-1]
    return pixels

def perform_contrast(img, factor=1.0):
    """
    Adjusts the contrast of the given image by a specific factor.

    Parameters:
        img: PIL image to be adjusted
        factor: enhancement factor handed to ImageEnhance.Contrast.enhance.
                The default of 1.0 leaves the contrast as it is, which is what
                this step did before the factor became a parameter.
    Assumptions:
        None
    Returns:
        PIL image with the contrast adjusted.
    """
    enhancer = ImageEnhance.Contrast(img)
    return enhancer.enhance(factor)

def remove_bar(img):
    """
    Finds the width of the black bar on the left of the image by scanning row 1 from
    the left until a pixel brighter than the mean grey value of the whole image is
    found. Then crops the image to remove the bar.

    If no pixel in row 1 is brighter than the mean (e.g. a uniform image), no bar
    can be located and the image is returned uncropped instead of running past the
    right edge.

    Parameters:
        img: 2-D numpy array of the image to be adjusted (needs at least two rows,
             because row index 1 is scanned)
    Assumptions:
        - There is no blank space at the top of the image (not even 1px)
        - The image is left-aligned
        - The black bar is darker than the mean grey level of the image, and the pectoral muscle region is brighter.
    Returns:
        Numpy array of the cropped image (all rows, columns from the end of the bar onwards)
    """
    threshold = np.mean(img)  # loop invariant: compute once, not per scanned pixel
    n_cols = img.shape[1]

    width = 0
    while width < n_cols and img[1, width] <= threshold:
        width += 1

    if width == n_cols:
        # nothing brighter than the mean in the scanned row: leave the image as is
        return img
    return img[:, width:]

def preprocess_image(img):
    """
    Runs the full preprocessing chain on one image: resize to 256x256,
    left-align, strip the dark bar on the left, then linearly rescale the grey
    levels so the darkest pixel maps to 0 and the brightest to 255.

    Parameters:
        img: PIL image to be adjusted
    Assumptions:
        - There is no blank space at the top of the image (not even 1px).
        - We want to optimize for speed over precision.
        - The input image is LCC view.
        - The half of the image on which the majority of the breast region lies has a higher mean than the other half.

    Returns:
        Numpy array (float values in [0, 255]) ready for the level set algorithm.
    """
    resized = img.resize((256, 256))
    aligned = left_align(resized)
    cropped = remove_bar(aligned)

    # stretch the observed grey-level range onto the full 0-255 range
    darkest, brightest = np.min(cropped), np.max(cropped)
    return np.interp(cropped, [darkest, brightest], [0, 255])

def initialise_params(preprocessed_img):
    """
    Return a dictionary containing all the parameters needed for the algorithm.

    The values depend on the notebook-level ``username`` (compared case-insensitively):
    every tester evaluates a different parameter preset. The dictionary keys match
    the keyword arguments of ``find_lsf``.

    :param preprocessed_img: Input image that has been preprocessed. It will be passed as a parameter
    to the level set algorithm
    :return: dict with keys img, initial_lsf, timestep, iter_inner, iter_outer, lmda,
    alfa, epsilon, sigma and potential_function
    :raises ValueError: if ``username`` has no preset (previously the function fell
    through and returned None, which only failed later at the first ``.get`` call)
    """
    # Per-tester deviations from the shared base values defined further down.
    presets = {
        'james': {'iter_inner': 10, 'iter_outer': 30},             # default
        'usama': {},                                               # high number of iteration combo
        'yasir': {'lmda': 6.5, 'alfa': -4.5, 'epsilon': 1.5},      # increased lmda, alfa and epsilon
        'nitin': {'sigma': 0.85},                                  # increased sigma
    }
    tester = username.lower()
    if tester not in presets:
        raise ValueError("No parameter preset defined for user %r" % (username,))

    # initialize LSF as binary step function
    c0 = 2
    initial_lsf = c0 * np.ones(preprocessed_img.shape)
    # generate the initial region R0 as a single 10x10 square in the top left corner
    initial_lsf[0:10, 0:10] = -c0

    params = {
        'img': preprocessed_img,
        'initial_lsf': initial_lsf,
        'timestep': 7,  # time step
        'iter_inner': 15,
        'iter_outer': 40,
        'lmda': 5.5,  # coefficient of the weighted length term L(phi)
        'alfa': -3.5,  # coefficient of the weighted area term A(phi)
        'epsilon': 1.2,  # parameter that specifies the width of the DiracDelta function
        'sigma': 0.6,  # scale parameter in Gaussian kernel
        'potential_function': DOUBLE_WELL,
    }
    params.update(presets[tester])
    return params
    
# use double-well potential in Eq. (16), which is good for both edge and region based models
DOUBLE_WELL = 'double-well'

# use single well potential p1(s)=0.5*(s-1)^2, which is good for region-based model
SINGLE_WELL = 'single-well'

def find_lsf(img: np.ndarray, initial_lsf: np.ndarray, timestep=1, iter_inner=10, iter_outer=30, lmda=5,
             alfa=-3, epsilon=1.5, sigma=0.8, potential_function=DOUBLE_WELL):
    """
    Evolve a level set function (LSF) on the given image and return the final LSF.

    The evolution runs ``iter_outer`` rounds of ``iter_inner`` DRLSE steps each,
    followed by one refinement round of 10 steps with the area term switched off
    (alfa = 0).

    :param img: Input image as a 2-D grey scale array with values in the range 0-255
    :param initial_lsf: Array of the same shape as img that contains the seed points for the LSF.
    :param timestep: Time Step
    :param iter_inner: Number of drlse steps per outer iteration
    :param iter_outer: How many iterations to run the iter_inner
    :param lmda: coefficient of the weighted length term L(phi)
    :param alfa: coefficient of the weighted area term A(phi)
    :param epsilon: parameter that specifies the width of the DiracDelta function
    :param sigma: scale parameter in Gaussian kernel. Currently unused: the Gaussian
        smoothing of the image is disabled, the edge indicator is computed on the
        unsmoothed image.
    :param potential_function: The potential function to use in drlse algorithm. Should be SINGLE_WELL or DOUBLE_WELL;
        any other value falls back to DOUBLE_WELL.
    :return: the evolved level set function, an array of the same shape as img
    :raises ValueError: if img is not 2-D, if the shapes of img and initial_lsf differ,
        or if the image maximum is <= 1 (data apparently not in the 0-255 range)
    """
    if len(img.shape) != 2:
        raise ValueError("Input image should be a gray scale one")

    # compare the actual shapes, not just the number of dimensions: a mismatch would
    # otherwise surface later as a broadcasting error (or silently broadcast)
    if img.shape != initial_lsf.shape:
        raise ValueError("Input image and the initial LSF should be in the same shape")

    if np.max(img) <= 1:
        raise ValueError("Please make sure the image data is in the range [0, 255]")

    # parameters
    mu = 0.2 / timestep  # coefficient of the distance regularization term R(phi)

    # Gaussian smoothing (gaussian_filter(img, sigma)) is intentionally not applied here.
    img_smooth = np.array(img, dtype='float32')
    [Iy, Ix] = np.gradient(img_smooth)
    f = np.square(Ix) + np.square(Iy)
    g = 1 / (1 + f)  # edge indicator function.

    # start from a copy so the caller's initial LSF is left untouched
    phi = initial_lsf.copy()

    if potential_function != SINGLE_WELL:
        potential_function = DOUBLE_WELL  # default choice of potential function

    # start level set evolution
    for _ in range(iter_outer):
        phi = drlse_edge(phi, g, lmda, mu, alfa, epsilon, timestep, iter_inner, potential_function)

    # refine the zero level contour by further level set evolution with alfa=0
    alfa = 0
    iter_refine = 10
    phi = drlse_edge(phi, g, lmda, mu, alfa, epsilon, timestep, iter_refine, potential_function)
    return phi

def drlse_edge(phi_0, g, lmda, mu, alfa, epsilon, timestep, iters, potential_function):  # Updated Level Set Function
    """
    Run ``iters`` update steps of the edge-based level set evolution and return the
    updated level set function. ``phi_0`` itself is not modified (a copy is evolved).

    Equation numbers in the comments refer to the paper this implementation
    follows (the paper is not cited in this file).

    :param phi_0: level set function to be updated by level set evolution
    :param g: edge indicator function
    :param lmda: weight of the weighted length term
    :param mu: weight of distance regularization term
    :param alfa: weight of the weighted area term
    :param epsilon: width of Dirac Delta function
    :param timestep: time step
    :param iters: number of iterations
    :param potential_function: choice of potential function in distance regularization term.
        Two choices are provided: SINGLE_WELL ('single-well') or DOUBLE_WELL ('double-well'),
        which correspond to the potential functions p1 (single-well) and p2 (double-well),
        respectively.
    :return: the evolved level set function
    :raises Exception: if potential_function is neither SINGLE_WELL nor DOUBLE_WELL
    """
    phi = phi_0.copy()
    # gradient of the edge indicator; constant over all iterations
    [vy, vx] = np.gradient(g)
    for k in range(iters):
        # re-impose the boundary condition before taking derivatives
        phi = neumann_bound_cond(phi)
        [phi_y, phi_x] = np.gradient(phi)
        s = np.sqrt(np.square(phi_x) + np.square(phi_y))
        delta = 1e-10
        n_x = phi_x / (s + delta)  # add a small positive number to avoid division by zero
        n_y = phi_y / (s + delta)
        curvature = div(n_x, n_y)

        if potential_function == SINGLE_WELL:
            # compute distance regularization term in equation (13) with the single-well potential p1.
            dist_reg_term = laplace(phi, mode='nearest') - curvature
        elif potential_function == DOUBLE_WELL:
            # compute the distance regularization term in equation (13) with the double-well potential p2.
            dist_reg_term = dist_reg_p2(phi)
        else:
            raise Exception('Error: Wrong choice of potential function. Please input the string "single-well" or "double-well" in the drlse_edge function.')
        dirac_phi = dirac(phi, epsilon)
        area_term = dirac_phi * g  # balloon/pressure force
        edge_term = dirac_phi * (vx * n_x + vy * n_y) + dirac_phi * g * curvature
        # explicit update step: weighted sum of the three terms scaled by the time step
        phi += timestep * (mu * dist_reg_term + lmda * edge_term + alfa * area_term)
    return phi


def dist_reg_p2(phi):
    """
    Compute the distance regularization term with the double-well potential p2 in equation (16).

    :param phi: level set function (2-D array)
    :return: array of the same shape as phi holding the regularization term
    """
    [phi_y, phi_x] = np.gradient(phi)
    # s = gradient magnitude of phi
    s = np.sqrt(np.square(phi_x) + np.square(phi_y))
    # masks selecting the two pieces of the piecewise definition: 0 <= s <= 1 and s > 1
    a = (s >= 0) & (s <= 1)
    b = (s > 1)
    # compute first order derivative of the double-well potential p2 in equation (16)
    ps = a * np.sin(2 * np.pi * s) / (2 * np.pi) + b * (s - 1)
    # compute d_p(s)=p'(s)/s in equation (10). As s-->0, we have d_p(s)-->1 according to equation (18);
    # zeros in numerator and denominator are replaced by 1 so that 0/0 evaluates to 1
    dps = ((ps != 0) * ps + (ps == 0)) / ((s != 0) * s + (s == 0))
    return div(dps * phi_x - phi_x, dps * phi_y - phi_y) + laplace(phi, mode='nearest')


def div(nx: np.ndarray, ny: np.ndarray) -> np.ndarray:
    """
    Divergence of the 2-D vector field (nx, ny): the derivative of nx along
    axis 1 (columns) plus the derivative of ny along axis 0 (rows).
    """
    _unused_rows, d_nx_along_cols = np.gradient(nx)
    d_ny_along_rows, _unused_cols = np.gradient(ny)
    divergence = d_nx_along_cols + d_ny_along_rows
    return divergence


def dirac(x: np.ndarray, sigma: np.ndarray) -> np.ndarray:
    """
    Smoothed Dirac delta of half-width sigma evaluated element-wise on x:
    (1 / (2*sigma)) * (1 + cos(pi * x / sigma)) where -sigma <= x <= sigma,
    multiplied by zero everywhere else.

    In this file it is called with the scalar ``epsilon`` as sigma.
    """
    inside_support = (x >= -sigma) & (x <= sigma)
    bump = (1 / 2 / sigma) * (1 + np.cos(np.pi * x / sigma))
    return bump * inside_support


def neumann_bound_cond(f):
    """
    Return a copy of f whose border is overwritten with values taken two cells
    further inside (rows/columns 2 and -3), which is how this code imposes the
    Neumann boundary condition. The input array is left unchanged.
    """
    out = f.copy()
    border = [0, -1]    # first and last index along an axis
    inner = [2, -3]     # the indices the border values are copied from

    # four corners; the right-hand side is materialised before the assignment
    out[np.ix_(border, border)] = out[np.ix_(inner, inner)]
    # top and bottom rows (corners excluded)
    out[border, 1:-1] = out[inner, 1:-1]
    # left and right columns (corners excluded)
    out[1:-1, border] = out[1:-1, inner]
    return out

def save_segmented_image(phi: np.ndarray, img: np.ndarray, file_name):
    """
    Black out the region to the left of the segmentation boundary and save both the
    segmented and the untouched image into OUTPUT_DIR.

    Only the first contour returned by ``measure.find_contours(phi, 0)`` is used.
    For every image row that contour passes through, all pixels from column 0 up to
    (but excluding) the rightmost contour column of that row are set to 0.

    Note: ``img`` is modified in place (and returned); a copy taken beforehand is
    what gets saved/returned as the original image.

    :params phi: finalised level set function (provide the boundary drawn)
    :params img: img to be segmented
    :params file_name: file name used to save the image; the files are written as
        OUTPUT_DIR + username + "_segmented_" + file_name and
        OUTPUT_DIR + username + "_original_" + file_name
    :return: tuple (segmented image, original image) for visualisation purposes
    :raises ValueError: if phi has no zero-level contour, i.e. there is no boundary
        to segment along
    """
    original_img = img.copy()

    contours = measure.find_contours(phi, 0)
    if len(contours) == 0:
        raise ValueError("No zero-level contour found in phi for " + str(file_name))
    boundary = contours[0]

    # contour points are (row, col) floats: rows are rounded, columns truncated
    rows = np.round(boundary[:, 0]).astype(int)
    cols = boundary[:, 1].astype(int)

    # rightmost contour column seen at each row (0 where the contour never passes)
    rightmost = np.zeros(rows.max() + 1, dtype=int)
    np.maximum.at(rightmost, rows, cols)

    for row, col in enumerate(rightmost):
        img[row, :col] = 0

    im = Image.fromarray(img)
    im = im.convert('L')
    org_im = Image.fromarray(original_img)
    org_im = org_im.convert('L')

    # make sure the target directory exists before writing
    if OUTPUT_DIR:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
    im.save(OUTPUT_DIR + username + "_segmented_" + file_name)
    org_im.save(OUTPUT_DIR + username + "_original_" + file_name)
    return img, original_img

### The actual testing

In [None]:
# Step 1: segment every test image with the tester's parameter preset and save
# the original/segmented pair to disk.
print('Step 1: Running segmentation on all images. Please wait as this will take some time..')

# names of the parameters that are recorded in the CSV next to each verdict
RECORDED_PARAMS = ('timestep', 'iter_inner', 'iter_outer', 'lmda', 'alfa', 'epsilon', 'sigma')

# loop through the images
for image_num in images:
    filename = "mdb" + image_num + ".pgm"
    image_path = DATASET_DIR + filename

    # open + preprocess inside a context manager so the file handle is released
    with Image.open(image_path) as raw_img:
        contrasted_img = perform_contrast(raw_img)
        processed_img = preprocess_image(contrasted_img)

    # parameters for this run; the dict keys match find_lsf's keyword arguments
    params = initialise_params(processed_img)

    # update the callback so the verdict rows carry the parameters that were used
    callback.update_data(
        org_img_name=filename,
        **{key: params[key] for key in RECORDED_PARAMS}
    )

    # perform the segmentation
    phi = find_lsf(**params)

    img, original_img = save_segmented_image(phi, processed_img, filename)

# Step 2: show each original/segmented pair and wait for the tester's verdict.
print('Step 2: Thanks for waiting bossku. Please classify the images carefully now.')
for image_num in images:
    name = "mdb" + image_num + ".pgm"
    original_filename = username + "_original_" + name
    segmented_filename = username + "_segmented_" + name
    original_path = OUTPUT_DIR + original_filename
    segmented_path = OUTPUT_DIR + segmented_filename

    callback.update_img_name(name)

    # clear the panels first: otherwise every imshow stacks another image on the axes
    ax[0].clear()
    ax[1].clear()
    with Image.open(original_path) as original_img, Image.open(segmented_path) as segmented_img:
        ax[0].imshow(original_img)
        ax[1].imshow(segmented_img)
    ax[0].set_title('Original: ' + name)
    ax[1].set_title('Segmented')
    fig.canvas.draw_idle()

    # receive feedback
    # NOTE(review): waitforbuttonpress returns on ANY key or mouse press in the
    # figure, not only on a click on one of the three verdict buttons.
    plt.waitforbuttonpress()

print('Step 3: All done. You may push your branch and rehydrate now!')

Step 1: Running segmentation on all images. Please wait as this will take some time..
Step 2: Thanks for waiting bossku. Please classify the images carefully now.


### Analysis

In [None]:
def _percentage(part, whole):
    """Return part/whole as a percentage, or NaN when there is nothing to divide by."""
    return (part / whole) * 100 if whole else float('nan')


# close the labelling figure and the CSV file before reading the results back
plt.close()
callback.terminate()

df = pd.read_csv(TEST_RESULT_DIR+username+'.csv')

print('Tester:', username, '\n')

if df.empty:
    # no verdict was recorded: there is no first row to read the parameters from
    print('No results were recorded - nothing to analyse.')
else:
    # every row carries the same parameter set, so the first row is representative
    first_row = df.iloc[0]
    print('------ PARAMS -------')
    for column in ('timestep', 'iter_inner', 'iter_outer', 'lmda', 'alfa', 'epsilon', 'sigma'):
        print(column + ':', first_row[column])
    print('---------------------\n')

    row_count = df.shape[0]
    success_count = df[df.result == 1].shape[0]
    anomaly_count = df[df.result == 0].shape[0]
    fail_count = df[df.result == -1].shape[0]

    print('Total images tested: ', row_count)
    print('Acceptable: ', success_count)
    print('Unacceptable: ', fail_count)
    print('Anomalies: ', anomaly_count)

    # guarded divisions: the second denominator is 0 when every verdict is an anomaly
    accuracy_with_anomalies = _percentage(success_count, row_count)
    accuracy_without_anomalies = _percentage(success_count, row_count - anomaly_count)

    print()
    print('Accuracy (with anomalies):', accuracy_with_anomalies, '%')
    print('Accuracy (without anomalies):', accuracy_without_anomalies, '%')