In [None]:
# Result Format
# {
#     Classifier1: {
#         Error_rate1: [
#             accuracy_iter1,
#             accuracy_iter2,
#             ...
#         ],
#         Error_rate2:[
#             ...
#         ]
#         ...
#     },
#     Classifier2: {
#     ...      
#     },
#     ...
# }

In [2]:
# GaussianModel
import os
import numpy as np
import cv2
import warnings
import time
import numpy as np
import random as rd
import matplotlib.pyplot as plt
from scipy import stats
from scipy.stats import multivariate_normal, mode
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.mixture import GaussianMixture
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report, accuracy_score

class GaussianMixtureWithLabelingError:
    def __init__(self, n_components=2, n_sub_mixtures=1, max_iters=10, tol=1e-3):
        self.n_components = n_components
        self.n_sub_mixtures = n_sub_mixtures
        self.max_iters = max_iters
        self.tol = tol

    def fit(self, x, init_label):
        self.mu_vectors = []
        self.cov_matrices = []
        self.probs = []
        self.alphas = []
        num_samples = init_label.shape[0]
        for c in range(self.n_components):
            idx = np.nonzero(init_label == c)[0]
            xc = x[idx,:]
            gmm = GaussianMixture(n_components=self.n_sub_mixtures, max_iter=self.max_iters)
            gmm.fit(xc)
            muv = gmm.means_
            covs = gmm.covariances_
            pk = gmm.weights_
            prob = len(idx) / num_samples
            pk = pk*prob
            for m in range(self.n_sub_mixtures):
                self.mu_vectors.append(muv[m, :])
                self.cov_matrices.append(covs[m, :, :])
                self.probs.append(pk[m])
            alpha = np.ones((self.n_components)) / self.n_components
            self.alphas.append(alpha)


        llr_old = -1e100
        for k in range(self.max_iters):
            probx = np.zeros((self.n_components*self.n_sub_mixtures, num_samples))
            for c in range(self.n_components):
                for m in range(self.n_sub_mixtures):
                    factor = 0
                    for c2 in range(self.n_components):
                        factor += (self.alphas[c][c2] * (init_label == c2))
                    idcm = c*self.n_sub_mixtures + m
                    probx[idcm, :] = multivariate_normal.pdf(x, self.mu_vectors[idcm],self.cov_matrices[idcm]) * self.probs[idcm]*factor

            llr_new = np.log(probx.sum(0)).sum()
            probx_sum = probx.sum(0)
            probx /= probx_sum
            probs_c =[]
            for c in range(self.n_components):
                prob_c = 0
                for m in range(self.n_sub_mixtures):
                    idcm = c*self.n_sub_mixtures + m
                    self.probs[idcm] = probx[idcm, :].sum() / probx.sum()
                    self.mu_vectors[idcm] = (x * (probx[idcm, :]).reshape(-1, 1)).sum(0) / (probx[idcm, :]).sum()
                    diff_vec = x - self.mu_vectors[idcm]
                    diff_vec2 = diff_vec * (probx[idcm, :]).reshape(-1, 1)
                    self.cov_matrices[idcm] = np.dot(diff_vec.T, diff_vec2) / (probx[idcm, :]).sum()
                    prob_c += probx[idcm, :]
                probs_c.append(prob_c)
            for c2 in range(self.n_components):
                for c in range(self.n_components):
                    self.alphas[c][c2] = (probs_c[c] * (init_label == c2)).sum() / (probs_c[c].sum())
            if (llr_new - llr_old) / np.abs(llr_new) < self.tol:
                break
            else:
                llr_old = llr_new
        else:
            print("Maximum number of iteration reaches")
            
    def predict(self, x: np.array): # predict with no initial labels
        num_samples = x.shape[0]
        probx = np.zeros((self.n_components,  num_samples))
        idcm = 0
        for c in range(self.n_components): # for each cluster
            for m in range(self.n_sub_mixtures): # for all sub class
                prob_c_m = multivariate_normal.pdf(x, self.mu_vectors[idcm], self.cov_matrices[idcm])
                probx[c,:] += prob_c_m*self.probs[idcm]
                idcm += 1
        probx_max = probx.max(0)
        label = 0
        for c in range(self.n_components):
            label += c*(probx[c,:]==probx_max)
        return label
    
    def compProb(self, x):
        num_samples = x.shape[0]
        probx = np.zeros((self.n_components, num_samples))
        # probc = np.zeros((self.n_components, num_samples))
        idcm = 0
        for c in range(self.n_components):
            for m in range(self.n_sub_mixtures):
                probx[c, :] += multivariate_normal.pdf(x, self.mu_vectors[idcm], self.cov_matrices[idcm]) * self.probs[idcm]
                #probx[c,:] *= self.probs[idcm]
                idcm += 1
        sum_prob = np.sum(probx,axis=0, keepdims=True)
        probx /= sum_prob
        return probx # classs x pixel

class GaussianMixtureWithNoLabelingError:
    def __init__(self,
                n_components:int=2,
                n_sub_mixtures:int=1,
                max_iters:int=10,
                tol:int=1e-3,
                min_portion:int=0.01):
        """
        n_components: the number of clusters
        n_sub_mixtures: the number of sub mixtures within a cluster
        max_iters: maximum number of iteration for model fitting
        tol: error tolerance
        min_portion: portion of sub_mixtures within cluser
        """
        self.n_components = n_components
        if isinstance(n_sub_mixtures, int):
            self.n_sub_mixtures = np.ones((n_components),'int') * n_sub_mixtures
            self.num_mixtures = n_components * n_sub_mixtures

        elif isinstance(n_sub_mixtures, list) or isinstance(n_sub_mixtures, np.ndarray):
            self.n_sub_mixtures = n_sub_mixtures
            self.num_mixtures = 0
            for c in range(n_components):
                self.num_mixtures += self.n_sub_mixtures[c]
        else:
            raise TypeError()

        self.max_iters = max_iters
        self.tol = tol
        self.min_portion = min_portion
        self.fitted = False

    def fit(self, x:np.ndarray, labels: np.ndarray):
        """
        Fit models
        x: MxN 2-D arrray samples where M is the number of samples and N is the number of bands
        labels: M 1-D array of the class labels
        """
        self.mu_vectors = []
        self.cov_matrices = []
        self.probs = []
        num_samples = labels.shape[0]
        t1 = time.time()
        for c in range(self.n_components):
            idx = np.where(labels == c)[0]
            xc = x[idx,:]  # sample with label c
            gmm = GaussianMixture(n_components=self.n_sub_mixtures[c],
                                    tol=self.tol,
                                    max_iter=self.max_iters) # gmm on lable c
            gmm.fit(xc)
            muv = gmm.means_
            covs = gmm.covariances_
            pk = gmm.weights_
            prob = len(idx) / num_samples
            pk = pk*prob
            for m in range(self.n_sub_mixtures[c]):
                self.mu_vectors.append(muv[m, :])
                self.cov_matrices.append(covs[m, :, :])
                self.probs.append(pk[m])




    def predict(self, x: np.array): # predict with no initial labels
        """
        assign each row of x to a single cluster
        x: MxN numpy array of samples
        return assignment
        """
        num_samples = x.shape[0]
        probx = np.zeros((self.n_components,  num_samples))
        idcm = 0
        for c in range(self.n_components): # for each cluster
            for m in range(self.n_sub_mixtures[c]): # for all sub class
                prob_c_m = multivariate_normal.pdf(x,self.mu_vectors[idcm],self.cov_matrices[idcm])
                probx[c,:] +=  prob_c_m*self.probs[idcm]
                idcm += 1
        probx_max = probx.max(0)
        label = 0
        for c in range(self.n_components):
            label += c*(probx[c,:]==probx_max)
        return label

    def compProb(self, x):
        num_samples = x.shape[0]
        probx = np.zeros((self.n_components, num_samples))
        # probc = np.zeros((self.n_components, num_samples))
        idcm = 0
        for c in range(self.n_components):
            for m in range(self.n_sub_mixtures[c]):
                probx[c, :] += multivariate_normal.pdf(x, self.mu_vectors[idcm], self.cov_matrices[idcm]) * self.probs[idcm] 
                #probx[c,:] *= self.probs[idcm]
                idcm += 1
        sum_prob = np.sum(probx,axis=0, keepdims=True)
        probx /= sum_prob
        return probx # classs x pixel

    def saveModel(self, target_files):
        if not self.fitted:
            warnings.warn("Unable to save the model. The model has not been fitted. Run .fit()")
        else:
            model_parameters = [self.n_components, self.n_sub_mixtures, self.num_mixtures, self.max_iters,
                                self.tol, self.min_portion, self.fitted,
                                self.probs, self.mu_vectors, self.cov_matrices]
            np.save(target_files, model_parameters, allow_pickle=True)

    def loadMode(self, model_file):
        self.n_components, self.n_sub_mixtures, self.num_mixtures, self.max_iters,\
        self.tol, self.min_portion, self.fitted,\
        self.probs, self.mu_vectors, self.cov_matrices = np.load(model_file, allow_pickle=True)

In [3]:
#Function

def get_directories(path: str) -> list:
    # Get a list of directory names in the specified path
    directories = [entry.name for entry in os.scandir(path) if entry.is_dir()]
    return directories

def get_files(path: str) -> list:
    # Get a list of file names in the specified path
    files = os.listdir(path)
    return files


def get_pixels(image: list, n: int=20, lower_thr: int=0, upper_thr: int=255) -> list:
    # Get a list of pixel from image with size n * n (didn't check if list have n * n size)
    height, width, _ = image.shape
    mid_height, mid_width = int(height/2), int(width/2)
    pixels = []
    for i in range(max(0, mid_height - n), min(len(image), mid_height + n)):
        for j in range(max(0, mid_width - n), min(len(image[0]), mid_width + n)):
            pixel = np.array(image[i][j], dtype=int)
            mean = pixel.mean()
            if mean <= lower_thr or mean >= upper_thr:
                continue 
            pixels.append(pixel)
    return pixels
    
def get_ImgNameLabel(image_dir: str):
    # Get Img, Name and Label from path
    # path:
    #   class1:
    #       img1 ...
    #   ...
    
    all_class_name = get_directories(image_dir)
    all_img, all_name, label = [], [], []

    for i in range(len(all_class_name)):
        class_path = image_dir + "/" + all_class_name[i]
        all_file = get_files(class_path)
        for j, file in enumerate(all_file):
            image = np.array(cv2.imread(class_path + "/" + file))   
            all_img.append(image)
            all_name.append(file)
            label.append(i)
    return np.array(all_img, dtype=object), np.array(all_name), np.array(label)

def get_Rpixels(image: list, n: int=20, lower_thr: int=20, upper_thr: int=255) -> list:
    # Get a list of pixel from image with size n * n
    pixels = get_pixels(image, n, lower_thr, upper_thr)
    i = 1
    while (len(pixels) < n**2):
        pixels = get_pixels(image, n + i, lower_thr, upper_thr)
        i += 1
    return pixels[:n**2]

def get_Dataset_Pixel(x: np.ndarray, label: np.ndarray, require_n: int = 10, num_classes: int = 5) -> (np.ndarray, np.ndarray):
    # Get a dataset in pixel format
    new_x, lab = [], []
        
    for cls in range(num_classes):
        idx = np.nonzero(label == cls)
        class_images = x[idx]
        for img in class_images:
            s_img = get_Rpixels(img, require_n)
            if len(s_img) != require_n ** 2:
                continue
            new_x += s_img
            lab += [cls for _ in range(len(s_img))]

    return np.array(new_x), np.array(lab)

def get_Dataset_Img(x: np.ndarray, label: np.ndarray, require_n: int = 10, num_classes: int = 5) -> (np.ndarray, np.ndarray):
    # Get a dataset in Image format
    new_x, lab = [], []
    
    for cls in range(num_classes):
        idx = np.nonzero(label == cls)
        class_images = x[idx]
        
        for img in class_images:
            s_img = get_Rpixels(img, require_n, 20, 255)
            if len(s_img) != require_n ** 2:
                continue
            new_x.append(np.array(s_img).reshape(-1))
            lab.append(cls)
    
    return np.array(new_x), np.array(lab)

def train_test_splitV2(X: np.ndarray, label: np.ndarray, num_test: int):
    # Split a data and make sure that each class in train data have a same size
    train_X, test_X, train_label, test_label = [], [], [], []
    n_class = len(np.unique(label))
    
    for cls in range(n_class):
        cls_idx = np.nonzero(label == cls)
        cls_X = X[cls_idx]
        if len(cls_X) < num_test:
            return
        train_X += cls_X[:num_test].tolist()
        train_label += [cls] * num_test
        test_X += cls_X[num_test:].tolist()
        test_label += [cls] * (len(cls_X[num_test:]))
        
    return np.array(train_X, dtype=object), np.array(test_X, dtype=object), np.array(train_label), np.array(test_label)

def plot_result(result: dict):
    err_range = result["err_range"]
    gmm = np.array([np.mean(result["GMM"][str(err)]) for err in err_range])
    gwl = np.array([np.mean(result["GWL"][str(err)]) for err in err_range])
    rfP = np.array([np.mean(result["RFP"][str(err)]) for err in err_range])
    rfI = np.array([np.mean(result["RFI"][str(err)]) for err in err_range])
    plt.errorbar(err_range, gmm, [1.96 * np.std(result["GMM"][str(err)])/np.sqrt(len(test_img)) for err in err_range], color="blue", label="Nolabelingerror-soft")
    plt.errorbar(err_range, gwl, [1.96 * np.std(result["GWL"][str(err)])/np.sqrt(len(test_img)) for err in err_range], color="red", label="Withlabelingerror-soft")
    plt.errorbar(err_range, rfP, [1.96 * np.std(result["RFP"][str(err)])/np.sqrt(len(test_img)) for err in err_range], color="green", label="Nolabelingerror-hard")
    plt.errorbar(err_range, rfI, [1.96 * np.std(result["RFI"][str(err)])/np.sqrt(len(test_img)) for err in err_range], color="orange", label="Withlabelingerror-hard")
    plt.xlabel("Label Error rate (%)")
    plt.ylabel("Accuracy (%)")
    plt.legend()
    plt.show()

In [5]:
# Compare
def ErrorLabelTestByErrRate(err_rate: int, softvote: bool = True):
    # Run test with specific error rate 
    
    #Prepare Data and Array for Output
    pred_gmm, pred_gwl, pred_rfP, pred_rfI = [], [], [], []
    err_per_stone = (np.random.rand(len(train_label)) < err_rate/100).astype("int")
    rdd_per_stone = np.random.choice(range(1, len(np.unique(all_label))), size=len(train_img))
    train_label_err = train_label * (1 - err_per_stone) + ((train_label + rdd_per_stone) % len(np.unique(all_label))) * err_per_stone

    # Get a dataset
    X_trainP, lab_trainP = get_Dataset_Pixel(train_img, train_label_err, 25)
    X_trainI, lab_trainI = get_Dataset_Img(train_img, train_label_err, 25)

    # Create a Classifier
    gmm = GaussianMixtureWithNoLabelingError(n_components=5, n_sub_mixtures=2, max_iters=80)
    gwl = GaussianMixtureWithLabelingError(n_components=5, n_sub_mixtures=2, max_iters=80)
    rfP = RandomForestClassifier(criterion="entropy")
    rfI = RandomForestClassifier(criterion="entropy")
    
    # Fit the model
    gmm.fit(X_trainP, lab_trainP)
    gwl.fit(X_trainP, lab_trainP)
    rfP.fit(X_trainP, lab_trainP)
    rfI.fit(X_trainI, lab_trainI)
            
    # Predict the Test data
    for i in range(len(test_img)):
        pixel = np.array(get_Rpixels(test_img[i], 25))
        image = [pixel.reshape(-1)]
        
        if softvote:
            pred_gmm.append(int(np.argmax(np.log(gmm.compProb(pixel)).sum(1))))
            pred_gwl.append(int(np.argmax(np.log(gwl.compProb(pixel)).sum(1))))
            pred_rfP.append(int(np.argmax(np.log((rfP.predict_proba(pixel) + 1).sum(0)))))
        else:
            pred_gmm.append(int(stats.mode(gmm.predict(np.array(pixel))).mode))
            pred_gwl.append(int(stats.mode(gwl.predict(np.array(pixel))).mode))
            pred_rfP.append(int(stats.mode(rfP.predict(np.array(pixel))).mode))
            
        pred_rfI.append(int(np.argmax(np.log(rfI.predict_proba(image) + 1))))
    return pred_gmm, pred_gwl, pred_rfP, pred_rfI

def TESTER(n_loop: int = 5, softvote: bool = True, err_range: list = list(range(0, 50+1, 5))):
    classifiers = ["GMM", "GWL", "RFP", "RFI"]
    result = {classifier: {str(error_rate): [0 for _ in range(n_loop)] for error_rate in err_range} for classifier in classifiers}
    for i in range(n_loop):
        for err_rate in err_range:
            pred_gmm, pred_gwl, pred_rfP, pred_rfI = ErrorLabelTestByErrRate(err_rate, softvote)
            result["GMM"][str(err_rate)][i] = accuracy_score(test_label, pred_gmm)
            result["GWL"][str(err_rate)][i] = accuracy_score(test_label, pred_gwl)
            result["RFP"][str(err_rate)][i] = accuracy_score(test_label, pred_rfP)
            result["RFI"][str(err_rate)][i] = accuracy_score(test_label, pred_rfI)
    result["err_range"] = err_range
    return result

In [None]:
# NORMAL RUN
stone_folder = "./Dataset/"
all_img, _, all_label = get_ImgNameLabel(stone_folder)
n_class = len(np.unique(all_label))
train_img, test_img, train_label, test_label = train_test_splitV2(all_img, all_label, 50)


result = TESTER(softvote=True)

In [None]:
# PLOT
plot_result(result)

In [None]:
import json
with open("result_OldDS_Soft.json", "w") as f:
    json.dump(result, f, indent=4)

In [None]:
stone_folder = "OldDS"
all_img, all_name, all_label = get_ImgNameLabel(stone_folder)
n_class = len(np.unique(all_label))
train_img, test_img, train_label, test_label = train_test_splitV2(all_img, all_label, 50)

In [None]:
import json
with open('result_ODS_Soft.json', 'r') as f:
    result_soft = json.load(f)

with open('result_ODS_Hard.json', 'r') as f:
    result_hard = json.load(f)

In [None]:
err_range = list(range(0, 51, 5))

gmm = np.array([np.mean(result_soft["GMM"][str(err)]) for err in err_range])
gwl = np.array([np.mean(result_soft["GWL"][str(err)]) for err in err_range])
gmm1 = np.array([np.mean(result_hard["GMM"][str(err)]) for err in err_range])
gwl1 = np.array([np.mean(result_hard["GWL"][str(err)]) for err in err_range])

plt.errorbar(err_range, gmm, [1.96 * np.std(result_soft["GMM"][str(err)])/np.sqrt(len(test_img)) for err in err_range], color="blue", label="Nolabelingerror-soft")
plt.errorbar(err_range, gwl, [1.96 * np.std(result_soft["GWL"][str(err)])/np.sqrt(len(test_img)) for err in err_range], color="red", label="Withlabelingerror-soft")
plt.errorbar(err_range, gmm1, [1.96 * np.std(result_hard["RFP"][str(err)])/np.sqrt(len(test_img)) for err in err_range], color="green", label="Nolabelingerror-hard")
plt.errorbar(err_range, gwl1, [1.96 * np.std(result_hard["RFI"][str(err)])/np.sqrt(len(test_img)) for err in err_range], color="orange", label="Withlabelingerror-hard")

plt.xlabel("Label Error rate (%)")
plt.ylabel("Accuracy (%)")
plt.legend()
plt.show()