In [None]:
import glob
import os
import random
import shutil
from collections import Counter
from os import listdir
from random import randint

import cv2
import matplotlib.pyplot as plt
import numpy as np
import tensorflow
from numpy import asarray
from PIL import Image as PImage
from skimage import color, io
from sklearn.cluster import KMeans, DBSCAN
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.utils import shuffle
from tensorflow import keras
from tensorflow.keras import backend as K
from tensorflow.keras import layers, optimizers
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model


### Build stacked autoencoder

In [None]:
#Define the autoencoder model
def SAE(X_train, X_test):
    """Build, train and return a dense stacked autoencoder.

    Architecture (all ReLU): input -> 128 -> 64 -> 32 (bottleneck, named
    ``encoder_layer``) -> 64 -> 128 -> input size. The model is trained to
    reproduce its own input under an MSE loss with Adam (learning rate
    0.001) for 20 epochs in batches of 50, and the resulting loss curves
    are plotted with ``visualise``.

    Parameters
    ----------
    X_train : sequence of 1-D samples
        Training samples; used as both the input and the target.
    X_test : sequence of 1-D samples
        Validation samples; used as both the input and the target.

    Returns
    -------
    The fitted Keras ``Model``.
    """
    # Width of the input/output layers is taken from the first sample.
    n_features = X_train[0].shape[0]

    inputs = Input(shape=(n_features, ), name='input')

    # Encoder: progressively narrower layers down to the 32-unit bottleneck.
    hidden = Dense(128, activation='relu')(inputs)
    hidden = layers.Dense(64, activation='relu')(hidden)
    bottleneck = Dense(32, activation='relu', name='encoder_layer')(hidden)

    # Decoder: mirror of the encoder, back up to the input width.
    hidden = layers.Dense(64, activation='relu')(bottleneck)
    hidden = layers.Dense(128, activation='relu')(hidden)
    reconstruction = Dense(n_features, activation='relu')(hidden)

    autoencoder = Model(inputs=inputs, outputs=reconstruction)
    autoencoder.compile(
        optimizer=optimizers.Adam(learning_rate=0.001),
        loss='mse',
    )

    # The target equals the input: the network learns to reconstruct it.
    history = autoencoder.fit(
        X_train,
        X_train,
        epochs=20,
        batch_size=50,
        validation_data=(X_test, X_test),
        verbose=0,
    )
    visualise(history)

    return autoencoder

In [None]:
# plot loss
def visualise(history):
    """Plot the training and validation loss curves of a fit history.

    ``history`` must expose a ``history`` mapping with ``'loss'`` and
    ``'val_loss'`` entries (one value per epoch).
    """
    # Training curve first, validation second, to match the legend order.
    for metric in ('loss', 'val_loss'):
        plt.plot(history.history[metric])

    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'val'], loc='lower right')
    plt.show()

### Display reconstruction images

In [None]:
def decoded_img(X_test, class_name, autoencoder):
    """Show and save original/reconstruction pairs for random test images.

    Up to 10 randomly chosen samples are passed through ``autoencoder``.
    For each one a separate two-row figure is drawn (original on top,
    reconstruction below), saved as ``<class_name><i>.png`` in the current
    working directory, and displayed.

    Parameters
    ----------
    X_test : array of flattened images
        Each sample is reshaped to 100x100 for display.
    class_name : str
        Prefix of the saved file names.
    autoencoder : model exposing ``predict``
        Model used to reconstruct the samples.
    """
    n = 10

    # Only reconstruct the samples that are actually displayed. Slicing also
    # caps the count when the test split holds fewer than n samples, which
    # previously raised an IndexError.
    samples = shuffle(X_test)[:n]
    if len(samples) == 0:
        return
    reconstructions = autoencoder.predict(samples, verbose=1)

    for i in range(len(samples)):
        # One explicit figure per sample, so the saved file content does not
        # depend on whether the backend clears the implicit current figure
        # on plt.show().
        fig, (ax_original, ax_reconstructed) = plt.subplots(2, 1)

        # Display original
        ax_original.imshow(samples[i].reshape(100, 100), cmap='gray')
        # Display reconstruction
        ax_reconstructed.imshow(reconstructions[i].reshape(100, 100), cmap='gray')

        for ax in (ax_original, ax_reconstructed):
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)

        # Save before showing: some backends discard the figure on show().
        fig.savefig(class_name+str(i)+'.png')
        plt.show()
        plt.close(fig)

### Feature extraction

In [None]:
def Extract_features(path, class_name,autoencoder):
    """Encode every image in ``path`` with the autoencoder's bottleneck layer.

    Each file listed by ``os.listdir(path)`` is read as grayscale, resized to
    100x100, flattened, scaled to float32 in [0, 1] and passed through the
    sub-model that ends at the layer named ``encoder_layer``. The resulting
    matrix is saved to ``features_<class_name>.npy`` in the current working
    directory.

    Parameters
    ----------
    path : str
        Directory containing the images of one class.
    class_name : str
        Used in the progress message and in the output file name.
    autoencoder : Keras model
        Must contain a layer named ``encoder_layer``.

    Returns
    -------
    numpy.ndarray
        One row per readable image, in ``os.listdir`` order. Files OpenCV
        cannot decode are skipped with a printed warning. An empty array is
        returned when no image could be read.
    """
    # Sub-model that stops at the bottleneck layer.
    get_encoded_layers = Model(inputs=autoencoder.input, outputs=autoencoder.get_layer("encoder_layer").output)

    flattened_images = []
    for img in os.listdir(path):
        file_path = os.path.join(path, img)
        image = cv2.imread(file_path, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread returns None instead of raising for non-image or
            # corrupt files; cv2.resize would then fail with an opaque error.
            print('Skipping unreadable image file:', file_path)
            continue
        image = cv2.resize(image, (100, 100))
        # Scale exactly like the training data (float32 in [0, 1]); feeding
        # raw 0-255 integers puts the inputs far outside the range the
        # autoencoder was trained on.
        flattened_images.append(image.reshape(-1).astype('float32') / 255.)

    if flattened_images:
        # A single batched predict call instead of one call per image.
        batch = np.stack(flattened_images)
        features_vector = get_encoded_layers.predict(batch, verbose=0)
        features_vector = features_vector.reshape(len(batch), -1)
    else:
        features_vector = np.array([])

    print('Extract local features form class {} with shape {}'. format(class_name, features_vector.shape))

    # Save the features as a numpy array
    np.save('features_'+class_name +'.npy', features_vector)

    return features_vector


### PCA for dimensionality reduction

In [None]:
def pca_step(X,className):
    """Min-max scale ``X`` and project it onto its leading principal components.

    The number of components is chosen by PCA so that 95% of the variance
    of the rescaled data is explained; that number is printed together with
    ``className``.

    Returns the reduced data matrix.
    """
    # Rescale the features with MinMaxScaler before PCA.
    scaled = MinMaxScaler().fit_transform(X)

    # A fractional n_components asks PCA to keep enough components to reach
    # that share of explained variance.
    reducer = PCA(n_components=0.95).fit(scaled)
    reduced = reducer.transform(scaled)

    n_kept = reduced.shape[1]
    print('To get 95% of variance explained for class {} we need {} principal components.'. format(className, n_kept))

    return reduced

### DBSCAN cluster algorithm

In [None]:
#DBSCAN Parameter Estimation

def DBSCAN_algorithm(X, class_Item, min_samples=100, save_path='E:/subclasses_dbscan/'):
    """Cluster one class with DBSCAN and copy its images into per-cluster folders.

    Steps:
      1. Plot the k-distance graph (sorted distance of every point to its
         ``min_samples``-th nearest neighbour, the point itself included) so
         the user can read eps off the elbow.
      2. Prompt for eps on stdin and run DBSCAN.
      3. Create ``<save_path>/<class_Item>_<label>`` for every cluster label
         (``<class_Item>_noise`` for DBSCAN's noise label -1) and copy each
         image into the folder matching its label.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Feature vectors of the class.
    class_Item : str
        Class name; also the sub-folder of the module-level ``src_dir`` that
        holds the source ``*.png`` files.
    min_samples : int, default 100
        DBSCAN ``min_samples``; also the k of the k-distance graph so the
        plot matches the clustering that is actually run.
    save_path : str, default 'E:/subclasses_dbscan/'
        Root directory for the cluster folders (created if missing).

    Notes
    -----
    Images are paired with labels by position: the i-th ``*.png`` yielded by
    ``glob.iglob`` receives the i-th label. This assumes the rows of ``X``
    were produced in the same file order - TODO confirm against the feature
    extraction step.
    """
    # --- Step 1: k-distance graph -----------------------------------------
    # n_neighbors may not exceed the number of samples.
    k_neighbors = min(min_samples, len(X))
    neighbors_fit = NearestNeighbors(n_neighbors=k_neighbors).fit(X)
    distances, _ = neighbors_fit.kneighbors(X)

    # Last column = distance to the k-th neighbour (column 0 is the point
    # itself at distance 0 because the query set equals the fitted set).
    k_distances = np.sort(distances[:, -1])

    plt.figure(figsize=(5,5))
    plt.plot(k_distances)
    plt.title('K-distance Graph',fontsize=20)
    plt.xlabel('Data Points sorted by distance',fontsize=14)
    plt.ylabel('Epsilon',fontsize=14)
    plt.show()

    # --- Step 2: DBSCAN ----------------------------------------------------
    # eps is a real-valued radius; int() truncated it and rejected inputs
    # such as "0.5" outright.
    eps = float(input('Input eps based on the elbow in the k-distance graph: '))
    db = DBSCAN(eps=eps, min_samples=min_samples, metric='minkowski', algorithm='ball_tree', p=2).fit(X)

    # labels_ holds one cluster id per sample; -1 marks noise points.
    labels = db.labels_
    unique_labels = sorted(set(labels))

    # Noise is not a cluster, so it is excluded from the count.
    no_clusters_ = len(unique_labels) - (1 if -1 in unique_labels else 0)
    print('Estimated no. of clusters: %d' % no_clusters_)
    if -1 in unique_labels:
        print('Estimated no. of noise points: %d' % int(np.sum(labels == -1)))

    # --- Step 3: one folder per label ---------------------------------------
    folder_for_label = {}
    for label in unique_labels:
        suffix = 'noise' if label == -1 else str(label)
        folder = os.path.join(save_path, class_Item + '_' + suffix)
        # makedirs(exist_ok=True) tolerates re-runs and a missing save_path.
        os.makedirs(folder, exist_ok=True)
        folder_for_label[label] = folder

    # Copy every image into the folder that carries its own label.
    for img, label in zip(glob.iglob(os.path.join(src_dir, class_Item, '*.png')), labels):
        shutil.copy(img, folder_for_label[label])

### K-means cluster algorithm

In [None]:
def kmeans_algorithm (X, class_Item, k_value, save_path='E:/subclasses_kmeans/'):
    """Cluster one class with k-means and copy its images into per-cluster folders.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Feature vectors of the class.
    class_Item : str
        Class name; also the sub-folder of the module-level ``src_dir`` that
        holds the source ``*.png`` files.
    k_value : int
        Number of clusters.
    save_path : str, default 'E:/subclasses_kmeans/'
        Root directory for the cluster folders (created if missing).

    Notes
    -----
    Images are paired with labels by position: the i-th ``*.png`` yielded by
    ``glob.iglob`` receives the i-th label. This assumes the rows of ``X``
    were produced in the same file order - TODO confirm against the feature
    extraction step.
    """
    # Apply the k-means cluster algorithm to the class
    kmeans = KMeans(n_clusters=k_value, random_state=0).fit(X)
    print('The number of samples is: ',Counter(kmeans.labels_)) #display the total number of samples in each cluster

    # Create one sub-folder per cluster; index c holds the folder of label c.
    Clusters = []
    for c in range(k_value):
        Cluster_name = class_Item +'_'+ str(c)
        # makedirs(exist_ok=True) tolerates re-runs and a missing save_path.
        os.makedirs(os.path.join(save_path, Cluster_name), exist_ok=True)
        Clusters.append(Cluster_name)

    # Copy each image into the folder of its own label. Indexing with j - 1
    # shifted every label by one, so label 0 ended up in the last folder and
    # the folder names disagreed with the counts printed above.
    for img, j in zip(glob.iglob(os.path.join(src_dir, class_Item, '*.png')), kmeans.labels_):
        shutil.copy(img, os.path.join(save_path, Clusters[j]))
             

### Load image dataset and run the pipeline for each class

In [None]:
src_dir= ('E:/TrainingDataset') #data path
Folder_dir= os.listdir(src_dir)

# determine parameter k; the number of classes in the class decomposition component.
k=2

for class_name in Folder_dir:
    path = os.path.join(src_dir,class_name)

    # Only sub-directories are classes; stray files in src_dir would make
    # os.listdir(path) fail.
    if not os.path.isdir(path):
        continue

    image_list=[]

    print('Read images in class:', class_name)
    for img in os.listdir(path):
        # load the image, resize it and collect it in one list
        image= cv2.imread(os.path.join(path,img),cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread returns None for files it cannot decode.
            print('Skipping unreadable image file:', os.path.join(path,img))
            continue
        image=cv2.resize(image,(100,100))
        image_list.append(image)

    # A train/test split needs at least one sample on each side.
    if len(image_list) < 2:
        print('Not enough readable images in class {}; skipping.'.format(class_name))
        continue

    training_data=np.array(image_list)     #convert all data to array

    # Divide the data set into two groups: 80% training and 20% test.
    # train_test_split shuffles on its own, so no separate shuffle is needed.
    X_train,X_test= train_test_split(training_data, test_size =.20, shuffle  = True)

    # Scale to [0, 1] and flatten each image as required by the model.
    # Each array is flattened from its own shape; the test set was previously
    # reshaped using the already-flattened training array's shape.
    X_train = X_train.astype('float32') / 255.
    X_train = X_train.reshape(len(X_train), -1)
    X_test = X_test.astype('float32') / 255.
    X_test = X_test.reshape(len(X_test), -1)

    #check items in X_train and X_test
    print('x_train: ',X_train.shape)
    print('x_test:  ',X_test.shape)

    # Run stacked autoencoder model(SAE)
    print('Run stacked autoencoder model')
    autoencoder = SAE(X_train, X_test)

    #visualize the reconstruction images
    decoded_img(X_test, class_name, autoencoder)

    # Extract deep features from the images dataset
    print('Extract features:')
    features_vector= Extract_features(path, class_name,autoencoder)

    # apply PCA technique
    print ('Apply PCA for dimensionality reduction')
    pca_features= pca_step(features_vector,class_name)

    # Apply class decomposition approach with dbscan cluster algorithm
    DBSCAN_algorithm(pca_features, class_name)

    # Apply class decomposition approach with k-means cluster algorithm
    kmeans_algorithm(pca_features, class_name, k)
    print('====================================================')