In [52]:
import cv2
import numpy as np
# from PIL import Image
from matplotlib import pyplot as plt
from sklearn.manifold import SpectralEmbedding
import math
from  sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve
from sklearn.multiclass import OneVsRestClassifier
from sklearn.preprocessing import label_binarize
from sklearn.svm import SVC



class Preprocessing:
    """Ear-image pipeline helpers.

    The methods cover, in the order the notebook calls them: image loading and
    intensity normalisation, Gabor filter-bank creation, sub-window feature
    extraction with spectral-embedding reduction, sub-window selection with a
    floating forward search (SFFS), and per-sub-window 1-NN matchers.
    """

    def __init__(self):
        pass

    def read_image(self, image_dir="EarImages", person_ids=None, ear_positions=None):
        """Read every ear image, crop it, resize it and enhance its contrast.

        Each file ``<image_dir>/<person>_<position>.jpg`` is read as grayscale,
        cropped to rows 100:620 / columns 40:450, resized to 100 x 150
        (width x height, i.e. an array of shape (150, 100)), normalised to a
        target mean/variance and finally passed through CLAHE.

        Parameters
        ----------
        image_dir : str
            Directory holding the images (default ``"EarImages"``).
        person_ids : sequence of str, optional
            Person identifiers; defaults to ``'000'`` .. ``'010'``.
        ear_positions : sequence of str, optional
            Pose suffixes; defaults to the four poses used by the notebook.

        Returns
        -------
        list of numpy.ndarray
            One uint8 image per (person, position), ordered person-major.

        Raises
        ------
        FileNotFoundError
            If an expected image cannot be read.
        """
        if person_ids is None:
            person_ids = ['000', '001', '002', '003', '004', '005',
                          '006', '007', '008', '009', '010']
        if ear_positions is None:
            ear_positions = ['down_ear', 'front_ear', 'left_ear', 'up_ear']

        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        processed_images = []
        for person in person_ids:
            for position in ear_positions:
                path = image_dir + "/" + person + '_' + position + ".jpg"
                raw = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
                # cv2.imread signals failure by returning None, not by raising.
                if raw is None:
                    raise FileNotFoundError("Could not read image: " + path)
                cropped = raw[100:620, 40:450]
                resized = cv2.resize(cropped, dsize=(100, 150),
                                     interpolation=cv2.INTER_NEAREST)
                normalized = self._normalize_intensity(resized)
                processed_images.append(clahe.apply(normalized))

        if processed_images:
            print(processed_images[0].shape)
        return processed_images

    @staticmethod
    def _normalize_intensity(gray, target_mean=100, target_variance=100):
        """Map ``gray`` to the given target mean/variance and return uint8.

        Every pixel becomes ``target_mean +/- sqrt(target_variance *
        (p - mean)^2 / variance)`` (plus when the pixel is above the image
        mean). Results are clipped to [0, 255] before the uint8 cast so that
        outliers saturate instead of wrapping around; a constant image
        (zero variance) maps to ``target_mean`` everywhere.
        """
        pixels = np.asarray(gray, dtype=np.float64)
        mean = pixels.mean()
        variance = pixels.var()
        if variance == 0:
            normalized = np.full_like(pixels, float(target_mean))
        else:
            beta = np.sqrt(target_variance * np.square(pixels - mean) / variance)
            normalized = np.where(pixels > mean, target_mean + beta, target_mean - beta)
        # astype truncates toward zero, like the element-wise assignment it replaces.
        return np.clip(normalized, 0, 255).astype(np.uint8)

    def sub_window_creation(self, images, kernels, images_per_person=4, random_state=None):
        """Extract Gabor features for every sub-window and reduce them.

        A 50 x 50 window slides with step 11 over row offsets 0..99 and column
        offsets 0..44 (50 positions). For each position the window of every
        image is shrunk to 12 x 12, filtered with ``kernels`` and flattened.
        All feature vectors are then embedded together into 100 dimensions
        with ``SpectralEmbedding``.

        Parameters
        ----------
        images : sequence of numpy.ndarray
            Preprocessed images, ordered so that consecutive groups of
            ``images_per_person`` images belong to the same person.
        kernels : sequence of numpy.ndarray
            Gabor kernels, as returned by ``gabor_filter``.
        images_per_person : int
            Used to derive the label ``index // images_per_person``.
        random_state : int or None
            Forwarded to ``SpectralEmbedding`` for reproducible embeddings.

        Returns
        -------
        dict
            ``{sub_window_number: array of shape (len(images), 101)}`` where
            the last column holds the person label.

        Raises
        ------
        ValueError
            If ``images`` is empty.
        """
        n_images = len(images)
        if n_images == 0:
            raise ValueError("images must contain at least one image")

        gb_all_sw = []
        label = []
        for row in range(0, 100, 11):
            for col in range(0, 50, 11):
                for index, image in enumerate(images):
                    window = image[row:row + 50, col:col + 50]
                    window = cv2.resize(window, dsize=(12, 12),
                                        interpolation=cv2.INTER_NEAREST)
                    gb_all_sw.append(self.process(window, kernels))
                    label.append(index // images_per_person)
        print(len(gb_all_sw))
        print(len(gb_all_sw[0]))

        # LEM (Laplacian eigenmap) dimension reduction over all sub-windows at once.
        model = SpectralEmbedding(n_components=100, n_neighbors=10,
                                  random_state=random_state)
        reduced_sw = model.fit_transform(gb_all_sw)
        print('final', reduced_sw[0].shape)
        print('final', len(reduced_sw))

        # Rows were appended position-major, so every run of n_images rows is
        # one sub-window position across all images.
        labels = np.asarray(label)
        sub_win_dict = {}
        for sw_no, start in enumerate(range(0, reduced_sw.shape[0], n_images)):
            stop = start + n_images
            sub_win_dict[sw_no] = np.column_stack((reduced_sw[start:stop, :],
                                                   labels[start:stop]))
        return sub_win_dict

    def gabor_filter(self):
        """Create the Gabor kernel bank (4 orientations x 4 sigmas).

        Returns
        -------
        list of numpy.ndarray
            16 float32 kernels, each divided by 1.5 times its own sum.
        """
        kernels = []
        # NOTE(review): theta = pi appears to describe the same orientation as
        # theta = 0 for a Gabor kernel with psi = 0; presumably 3*pi/4 was
        # intended. Left unchanged pending confirmation.
        for theta in [0, np.pi / 4, np.pi / 2, np.pi]:
            for sigma in [5, 10, 15, 20]:
                kernel = cv2.getGaborKernel((50, 50), sigma, theta, 1.0, 0.25, 0,
                                            ktype=cv2.CV_32F)
                kernel /= 1.5 * kernel.sum()
                kernels.append(kernel)
        return kernels

    def process(self, img, filters):
        """Filter ``img`` with every kernel and concatenate the flat responses.

        Returns a 1-D float64 array of length ``len(filters) * img.size``
        (empty when ``filters`` is empty).
        """
        # Collect first and concatenate once instead of re-allocating per kernel.
        responses = [cv2.filter2D(img, cv2.CV_8U, kern).ravel() for kern in filters]
        if not responses:
            return np.array([])
        return np.concatenate(responses).astype(np.float64)

    @staticmethod
    def _equal_error_rate(fpr, tpr):
        """Return the mean of FPR and FNR at the ROC point where they are closest."""
        fpr = np.asarray(fpr, dtype=float)
        fnr = 1 - np.asarray(tpr, dtype=float)
        closest = np.argmin(np.abs(fpr - fnr))
        return np.mean((fpr[closest], fnr[closest]))

    def _subset_eer(self, in_data, subset, classes, random_state=None):
        """Estimate the EER of a 1-NN one-vs-rest classifier on ``subset``.

        The features of all sub-windows in ``subset`` are stacked column-wise,
        split 75/25 into train/validation, and the ROC of the binarised
        validation predictions is reduced to an equal error rate.
        """
        features = np.column_stack([in_data[name][:, :-1] for name in subset])
        # Multi-class problem, so labels are binarised for the ROC computation.
        labels = label_binarize(in_data[subset[0]][:, -1], classes=classes)
        X_train, X_valid, y_train, y_valid = train_test_split(
            features, labels, test_size=0.25, random_state=random_state)
        classifier = OneVsRestClassifier(KNeighborsClassifier(n_neighbors=1))
        classifier.fit(X_train, y_train)
        y_pred = classifier.predict(X_valid)
        all_fpr, all_tpr, _ = roc_curve(y_valid.ravel(), y_pred.ravel())
        return self._equal_error_rate(all_fpr, all_tpr)

    def SFFS(self, in_data, n_select=5, random_state=None):
        """Select sub-windows with sequential floating forward selection.

        Each round adds the unselected sub-window that gives the lowest EER
        together with the current selection (step 1). Then, once more than one
        sub-window is selected, it drops an earlier sub-window if doing so
        yields an EER strictly below that of the selection before the
        addition (step 2).

        Parameters
        ----------
        in_data : dict
            ``{sub_window: array}`` whose last column is the label, as built
            by ``sub_window_creation``.
        n_select : int
            Number of sub-windows to select (default 5).
        random_state : int or None
            Forwarded to ``train_test_split``; pass an int for repeatable runs.

        Returns
        -------
        list
            Keys of the selected sub-windows (fewer than ``n_select`` if
            ``in_data`` runs out of candidates).
        """
        selected_sw = []
        if not in_data:
            return selected_sw

        classes = np.unique(np.concatenate([data[:, -1] for data in in_data.values()]))
        prev_min_eer = 1  # EER of the selection at the end of the previous round

        while len(selected_sw) < n_select:
            # Step 1: inclusion of the best remaining sub-window.
            min_sw, min_eer = None, float("inf")
            for sw_name in in_data:
                if sw_name in selected_sw:
                    continue
                eer = self._subset_eer(in_data, [sw_name] + selected_sw,
                                       classes, random_state)
                if eer < min_eer:
                    min_eer, min_sw = eer, sw_name
            if min_sw is None:
                break  # every sub-window is already selected
            print(min_eer)
            print("Step 1 selected")
            print(min_sw)
            selected_sw.append(min_sw)

            if len(selected_sw) == 1:
                prev_min_eer = min_eer
                continue

            # Step 2: conditional exclusion. The sub-window just added is never
            # a candidate, otherwise the search could undo its own step.
            min_sw_2, min_eer_2 = None, float("inf")
            for sw in selected_sw[:-1]:
                remaining = [name for name in selected_sw if name != sw]
                eer = self._subset_eer(in_data, remaining, classes, random_state)
                if eer < min_eer_2:
                    min_eer_2, min_sw_2 = eer, sw
            if min_sw_2 is not None and min_eer_2 < prev_min_eer:
                selected_sw.remove(min_sw_2)
                prev_min_eer = min_eer_2
                print("Step 2 removed")
                print(min_sw_2)
            else:
                prev_min_eer = min_eer
        return selected_sw

    def create_matchers(self, inp_data, in_labels, features_per_window=100):
        """Fit one 1-NN classifier per block of ``features_per_window`` columns.

        Returns the fitted classifiers in column-block order.
        """
        classifier = []
        for start in range(0, inp_data.shape[1], features_per_window):
            print("Creating the Matcher : %d" % start)
            clf = KNeighborsClassifier(n_neighbors=1)
            clf.fit(inp_data[:, start:start + features_per_window], in_labels)
            classifier.append(clf)
        return classifier

    def fusion_test(self, classifiers, test_data, test_results, features_per_window=100):
        """Print each matcher's predictions next to the expected labels.

        ``classifiers[k]`` is applied to the k-th block of
        ``features_per_window`` columns of ``test_data``.

        NOTE(review): despite the name, no fusion of the individual
        predictions is computed here; the method only prints and returns None.
        """
        for c_no, start in enumerate(range(0, test_data.shape[1], features_per_window)):
            predictions = classifiers[c_no].predict(
                test_data[:, start:start + features_per_window])
            print(predictions)
            print(test_results)
        
        
            






In [53]:
def select_classification_data(sw_dict, chosen_sw):
    """Build a train/test split from the chosen sub-windows.

    The feature columns (all but the last) of every sub-window listed in
    ``chosen_sw`` are placed side by side, in the order given. Labels are read
    from the last column of the first chosen sub-window. The data is then
    split 80/20 with ``train_test_split``.

    Returns a dict with the keys ``"xtrain"``, ``"xtest"``, ``"ytrain"`` and
    ``"ytest"``.
    """
    first_window = sw_dict[chosen_sw[0]]
    label = first_window[:, -1]
    feature_blocks = [sw_dict[sw_no][:, :-1] for sw_no in chosen_sw]
    combined_data = np.column_stack(feature_blocks)
    X_train, X_test, y_train, y_test = train_test_split(combined_data, label, test_size=0.2)
    return dict(xtrain=X_train, xtest=X_test, ytrain=y_train, ytest=y_test)
    
        


obj = Preprocessing()
# Load and preprocess all ear images.
processed_image = obj.read_image()
# Create the Gabor filter bank.
kernel_bank = obj.gabor_filter()
# Feature extraction and transformation: one feature matrix per sub-window.
sub_windows_dict = obj.sub_window_creation(processed_image, kernel_bank)
print(sub_windows_dict[0].shape)
# Feature selection: pick the sub-windows to keep with SFFS.
chosen_sw = obj.SFFS(sub_windows_dict)
print(chosen_sw)


        
    



(150, 100)
2200
2304
final (100,)
final 2200
(44, 101)


  str(classes[c]))
  str(classes[c]))


0.35
Step 1 selected
33


  str(classes[c]))


0.3
Step 1 selected
42
Step 2 removed
33


  str(classes[c]))
  str(classes[c]))
  str(classes[c]))
  str(classes[c]))


0.4
Step 1 selected
6
0.3
Step 1 selected
32
0.4
Step 1 selected
0


  str(classes[c]))
  str(classes[c]))
  str(classes[c]))
  str(classes[c]))


0.4
Step 1 selected
4
[42, 6, 32, 0, 4]


In [54]:
# Assemble a train/test split from the sub-windows chosen by SFFS.
classification_data = select_classification_data(sub_windows_dict,chosen_sw)
# Generate the matchers from the training split.
matchers = obj.create_matchers(classification_data["xtrain"],classification_data["ytrain"])
# Run the matchers on the test split; fusion_test prints predictions and the true labels.
obj.fusion_test(matchers,classification_data["xtest"],classification_data["ytest"])

Creating the Matcher : 0
Creating the Matcher : 100
Creating the Matcher : 200
Creating the Matcher : 300
Creating the Matcher : 400
[8. 2. 8. 1. 8. 9. 5. 8. 8.]
[6. 3. 6. 4. 5. 4. 3. 8. 6.]
[ 0. 10.  1.  2. 10.  0.  0.  4.  0.]
[6. 3. 6. 4. 5. 4. 3. 8. 6.]
[7. 7. 8. 5. 1. 9. 1. 1. 1.]
[6. 3. 6. 4. 5. 4. 3. 8. 6.]
[10. 10.  0.  1.  0.  0.  6.  2. 10.]
[6. 3. 6. 4. 5. 4. 3. 8. 6.]
[7. 1. 3. 6. 2. 9. 7. 8. 7.]
[6. 3. 6. 4. 5. 4. 3. 8. 6.]
