# Important Memo #

Things to change when moving to the cloud:
- "Initialization of the Spark Context" -> conf, setMaster
- Creating the Dataframe -> ressource_path -> "Ressources/fruits-360_dataset/fruits-360/Training"

# Imports #

In [1]:
# Standard Libraries
import os
import pandas as pd
import numpy as np
#import scipy.stats as st
from scipy.cluster.vq import kmeans, vq
#import random
import re
import time as tm
from time import time
#from varname import argname2
import matplotlib.pyplot as plt

# Pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType,StringType
from pyspark.sql.functions import udf
#from pyspark.ml.clustering import KMeans

# Image Preprocessing 
import cv2 as cv

# Machine Learning for Dimensions Reduction
#from sklearn.feature_extraction.text import TfidfVectorizer, TfidfTransformer
from sklearn.decomposition import PCA
from sklearn.cluster import MiniBatchKMeans
from sklearn.preprocessing import StandardScaler

# Set-ups #
## Initialization of the Spark context ##

In [2]:
# Start of the notebook-wide timer (read again in the last cells)
t_omega = time()

# Executor resources are part of the SparkConf handed to the SparkContext:
# they are read when the context starts, so they are set here rather than on
# `spark.conf` afterwards. The property names are `spark.executor.*`
# (`spark.sql.executor.*` is not a Spark property).
conf = (
    SparkConf()
    .setMaster("local")
    .setAppName("Test")
    .set("spark.executor.memory", "6g")
    .set("spark.executor.cores", "4")
)
sc = SparkContext(conf=conf)
spark = SparkSession.builder.appName("p8").getOrCreate()

21/09/30 17:23:13 WARN Utils: Your hostname, muninn-System-Product-Name resolves to a loopback address: 127.0.1.1; using 192.168.1.73 instead (on interface enp0s31f6)
21/09/30 17:23:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/09/30 17:23:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Variables ##

In [3]:
# Paths and shared objects used by the rest of the notebook. They are plain
# module-level names: a `global` statement only has an effect inside a
# function, so none is needed here.

# Get the paths

# All
image_path = 'Ressources/Local_Test/All_Images/'

# Local Test
ressource_path = 'Ressources/Local_Test/'
#image_path = 'Ressources/Local_Test/Images/'
preprocessed_path = 'Ressources/Local_Test/Preprocessed Images/'

# Make a directory for the preprocessed images
os.makedirs(preprocessed_path, exist_ok=True)

# Make a list of all fruit types (one entry of `image_path` per fruit type)
fruit_type_list = os.listdir(image_path)

# Create Dataframes for later
data_df = pd.DataFrame()
des_df = pd.DataFrame()

# Regex for later (raw string: "\." in a normal string is an invalid escape)
dot_split = re.compile(r"\.")

## Functions to be used ##

We will be using quite a number of functions to pipeline the image preprocessing, so we will define them all here.

### DataFrame Related Functions ###

In [4]:
# Fill the dataframe with fruits
def fill_df(fruit_type, image):
    """Build the index record describing one image file.

    Args:
        fruit_type: Name of the fruit type the image belongs to.
        image: File name of the image (for example ``'r_1_100.jpg'``).

    Returns:
        A dict with the keys ``'ID'``, ``'Image_name'`` and
        ``'Type_of_fruit'``. ``'Image_name'`` is the part of ``image`` before
        its first dot and ``'ID'`` is ``'<Image_name>_<fruit_type>'``.
    """
    # Same result as splitting on a literal dot and keeping the first piece,
    # without depending on a module-level regex.
    image_name = image.partition(".")[0]
    return {
        'ID': f'{image_name}_{fruit_type}',
        'Image_name': image_name,
        'Type_of_fruit': fruit_type,
    }

### Preprocessing ###

#### Individual Image Preprocessing ####

##### Find the Path of the Image From the Dataframe and Load the Image #####

In [5]:
# Build the path of an image from its name and fruit type, then read it
def find_path_image(image_name, fruit_type):
    """Read ``<image_path><fruit_type>/<image_name>.jpg`` with OpenCV.

    Args:
        image_name: File name of the image without the ``.jpg`` extension.
        fruit_type: Name of the sub-directory of ``image_path`` to look in.

    Returns:
        Whatever ``cv.imread`` returns for that path.
    """
    location = "".join((image_path, fruit_type, '/', image_name, '.jpg'))
    return cv.imread(location)

##### Grey Scale #####

In [6]:
# Greyscale conversion step of the preprocessing pipeline
def grey_scale(image):
    """Return ``image`` converted with OpenCV's ``COLOR_BGR2GRAY`` code."""
    return cv.cvtColor(image, cv.COLOR_BGR2GRAY)

##### Exposition and Contrast #####
Function from Stack Overflow user nathancy: https://stackoverflow.com/questions/57030125/automatically-adjusting-brightness-of-image-with-opencv

In [7]:
def automatic_brightness_and_contrast(image, clip_hist_percent=25):
    """Stretch brightness and contrast by clipping both ends of the histogram.

    The histogram of channel 0 is accumulated, ``clip_hist_percent`` percent
    of the pixels are discarded in total (half at the dark end, half at the
    bright end) and the remaining grey range is mapped linearly onto 0-255
    with ``cv.convertScaleAbs``.

    Args:
        image: Image to adjust; its channel 0 is used for the histogram.
        clip_hist_percent: Total percentage of pixels to clip (both ends
            together).

    Returns:
        The rescaled image. When clipping leaves no usable grey range (for
        example a nearly uniform image), the image is passed through
        ``cv.convertScaleAbs`` with ``alpha=1`` and ``beta=0`` instead of
        dividing by zero or applying a negative scale.
    """
    # Calculate grayscale histogram
    hist = cv.calcHist([image], [0], None, [256], [0, 256])

    # Cumulative pixel count per grey level (float64 keeps the counts exact)
    accumulator = np.cumsum(hist.ravel().astype(np.float64))

    # Number of pixels to drop at each end
    maximum = accumulator[-1]
    clip_count = clip_hist_percent * (maximum / 100.0) / 2.0

    # Left cut: first grey level whose cumulative count reaches clip_count
    minimum_gray = int(np.searchsorted(accumulator, clip_count, side='left'))

    # Right cut: last grey level whose cumulative count is still below
    # maximum - clip_count (-1 when there is no such level)
    maximum_gray = int(
        np.searchsorted(accumulator, maximum - clip_count, side='left')
    ) - 1

    if maximum_gray <= minimum_gray:
        # Degenerate range: nothing sensible to stretch
        alpha, beta = 1.0, 0.0
    else:
        alpha = 255 / (maximum_gray - minimum_gray)
        beta = -minimum_gray * alpha

    return cv.convertScaleAbs(image, alpha=alpha, beta=beta)

##### Noise #####

In [8]:
# Filtering step of the preprocessing pipeline
def filtering_image(image):
    """Convolve ``image`` with a fixed 3x3 kernel using ``cv.filter2D``.

    NOTE(review): the kernel's coefficients alternate in sign (+1 / -1) and
    sum to 1, so it is not a normalized box (averaging) kernel; confirm that
    this is the filter intended for noise reduction.

    Args:
        image: Image to filter.

    Returns:
        The filtered image as returned by ``cv.filter2D``.
    """
    checkerboard_kernel = np.array(
        [
            [1, -1, 1],
            [-1, 1, -1],
            [1, -1, 1],
        ],
        dtype=np.float32,
    )

    # ddepth=-1: per the OpenCV documentation, the output keeps the depth of
    # the source image
    same_depth = -1
    return cv.filter2D(image, same_depth, checkerboard_kernel)

##### Create Descriptors #####

In [9]:
# Keypoint detection and description with ORB
def create_descriptors(image):
    """Run ORB (``nfeatures=1200``, ``edgeThreshold=6``) on ``image``.

    Args:
        image: Image to analyse.

    Returns:
        The ``(keypoints, descriptors)`` pair produced by
        ``detectAndCompute`` (called without a mask).
    """
    detector = cv.ORB_create(nfeatures=1200, edgeThreshold=6)
    keypoints, descriptors = detector.detectAndCompute(image, None)
    return keypoints, descriptors

##### Save Descriptors in Dictionary #####

In [10]:
# Pair an image ID with its descriptors
def create_des_dict(ID, descriptors):
    """Return ``{'ID': ID, 'descriptors': descriptors}`` for one image."""
    return {'ID': ID, 'descriptors': descriptors}

#### Compile All These Steps ####

In [11]:
def preprocessing_image(ID, data_df=data_df, preprocessed_path=preprocessed_path):
    """Preprocess one image and save the result and its ORB descriptors.

    The image is loaded, converted to greyscale, contrast-stretched and
    filtered. The processed image is written to
    ``<preprocessed_path>/<fruit_type>/<image_name>.jpg`` and its descriptors
    to ``<image_name>_preprocessed.csv`` in the same directory.

    Args:
        ID: Image identifier of the form ``'<image_name>_<fruit_type>'``; the
            fruit type is whatever follows the last underscore.
        data_df: Unused; kept so existing calls keep working. The default is
            the value the module-level ``data_df`` had when this function was
            defined.
        preprocessed_path: Root directory for the preprocessed output. The
            default is bound when this function is defined.

    Returns:
        ``{'ID': ID, 'descriptors': descriptors}`` on success, or ``None``
        when the image could not be read or converted, or when ORB found no
        descriptors. The caller filters out ``None`` results.
    """
    # Getting the image's name and fruit type: split on the LAST underscore
    # only, without building a regex from the (unescaped) fruit type.
    image_name, separator, fruit_type = ID.rpartition("_")
    if not separator:
        # No underscore at all: both parts are the whole ID, as before
        image_name = ID

    # Preprocessing in itself
    image = find_path_image(image_name, fruit_type)
    if image is None:
        # cv.imread returns None for a missing or unreadable file; there is
        # no shape to report in that case.
        print(image_name)
        print(fruit_type)
        return None
    try:
        image = grey_scale(image)
    except cv.error:
        print(image_name)
        print(fruit_type)
        print(image.shape)
        return None
    image = automatic_brightness_and_contrast(image)
    image = filtering_image(image)

    # Saving the image
    output_dir = os.path.join(preprocessed_path, fruit_type)
    cv.imwrite(os.path.join(output_dir, f"{image_name}.jpg"), image)

    # Creating and saving descriptors
    keypoints, descriptors = create_descriptors(image)
    if descriptors is None:
        # ORB found no keypoint: nothing to save or to cluster later
        return None
    np.savetxt(
        os.path.join(output_dir, f"{image_name}_preprocessed.csv"),
        descriptors,
        delimiter=',',
    )
    return create_des_dict(ID, descriptors)

### Reducing Dimension ###

#### Stack Descriptors in a List ####

In [12]:
# Stacking vertically the descriptors in a list to cluster them later
def stack_descriptors():
    """Stack the descriptors of every preprocessed image into one array.

    Reads the module-level ``descriptor_list``, a list of dicts that each
    hold an image's descriptor array under the key ``'descriptors'``.

    The arrays are stacked with a single ``np.vstack`` on the driver. The
    earlier batching by 200 images produced no batch at all for fewer than
    200 images and skipped the trailing images when the count modulo 200 was
    199, and each batch was collected back to the driver anyway.

    Returns:
        A ``(descriptors, des_list)`` tuple: ``descriptors`` is the float
        array of all descriptor rows stacked vertically in list order, and
        ``des_list`` is the list of per-image descriptor arrays.

    Raises:
        ValueError: If ``descriptor_list`` is empty.
    """
    print('Stacking Descriptors...')
    t0 = time()

    # One descriptor array per image, in the order of descriptor_list
    des_list = [entry['descriptors'] for entry in descriptor_list]
    if not des_list:
        raise ValueError("No descriptors to stack: 'descriptor_list' is empty.")

    # K-Means working only on float, we convert the stacked descriptors
    descriptors = np.vstack(des_list).astype(float)

    t1 = time()
    print("Time taken to stack descriptors : {} s or {} min.".format(t1-t0, (t1-t0)/60))

    return descriptors, des_list

#### Creation of the Bag of Visual Words ####

In [13]:
# Clustering the descriptors making Bags of Visual Words
def create_BoVW(descriptors, des_list):
    """Cluster all descriptors into a visual vocabulary with MiniBatchKMeans.

    Args:
        descriptors: Array of all descriptor rows to cluster.
        des_list: List with one entry per image; only its length is used, to
            choose the number of clusters as ``round(sqrt(len(des_list)))``.

    Returns:
        A ``(voc, k)`` tuple: the cluster centres and the number of clusters
        that was requested.
    """
    print('Clustering descriptors...')
    k = int(round(np.sqrt(len(des_list)), 0))
    print('Estimated number of clusters : {}'.format(k))
    t0 = time()

    # `fit` takes the data only: its 2nd and 3rd positional parameters are
    # `y` (ignored) and `sample_weight`, not the cluster count.
    clusterer = MiniBatchKMeans(n_clusters=k, init_size=3*k, random_state=0)
    clusterer.fit(descriptors)

    voc = clusterer.cluster_centers_

    t1 = time()
    elapsed = t1 - t0
    print('Clustering time : {:.2f} s.'.format(elapsed))

    return voc, k

#### Extract and Scale Image Features ####

In [14]:
# Build one visual-word histogram per image, then standardise the histograms
def image_features(voc, k, des_list):
    """Turn per-image descriptors into scaled bag-of-visual-words features.

    Args:
        voc: Code book (one row per visual word) passed to ``vq``.
        k: Number of histogram bins per image.
        des_list: List of per-image descriptor arrays.

    Returns:
        The ``float32`` histogram matrix of shape ``(len(des_list), k)`` after
        ``StandardScaler().fit_transform``.
    """
    print('Extracting and scaling image features...')
    started = time()

    # Count, for every image, how many of its descriptors fall on each word
    histograms = np.zeros((len(des_list), k), 'float32')
    for row, image_descriptors in enumerate(des_list):
        codes, _distances = vq(image_descriptors, voc)
        histograms[row] = np.bincount(codes, minlength=k)

    # Scaling of the features
    scaled_img_features = StandardScaler().fit_transform(histograms)

    finished = time()
    print("Time taken to extract and scale image features : {} s or {} min.".format(finished-started, (finished-started)/60))

    return scaled_img_features

#### Apply the PCA ####

In [15]:
# Testing a dimension reduction by PCA on the features
def test_pca(scaled_img_features):
    """Find the smallest number of PCA components explaining > 95 % variance.

    A PCA is fitted for every component count from 2 up to
    ``min(n_samples, n_features)`` and its cumulative explained-variance
    ratio and run time are recorded.

    Args:
        scaled_img_features: 2-D feature matrix (rows are samples).

    Returns:
        A ``(best_component, PCA_df)`` tuple. ``PCA_df`` has the columns
        ``'Components'``, ``'Variance'`` and ``'Time'`` and is indexed by the
        component count. ``best_component`` is the first count whose variance
        exceeds 0.95, or the largest tested count when none does.

    Raises:
        ValueError: If fewer than 2 components can be tested.
    """
    print('Testing Optimal Number of Components for the PCA')

    # PCA cannot extract more components than min(n_samples, n_features)
    n_samples, n_features = scaled_img_features.shape
    max_components = min(n_samples, n_features)
    if max_components < 2:
        raise ValueError(
            "At least 2 samples and 2 features are needed to test the PCA, "
            "got shape {}.".format(scaled_img_features.shape)
        )

    rows = []
    for n_components in range(2, max_components + 1):
        t0 = time()
        pca = PCA(n_components=n_components)
        # The projection itself is not kept, but it is part of the timing
        pca.fit_transform(scaled_img_features)
        pca_var = pca.explained_variance_ratio_.sum()
        t1 = time()
        rows.append({
            'Components': n_components,
            'Variance': pca_var,
            'Time': t1 - t0,
        })

    # Build the frame once instead of growing it cell by cell
    PCA_df = pd.DataFrame(
        rows,
        columns=['Components', 'Variance', 'Time'],
        index=[row['Components'] for row in rows],
    )

    above_threshold = PCA_df.index[PCA_df['Variance'] > 0.95]
    if len(above_threshold):
        best_component = int(above_threshold[0])
    else:
        # No tested count exceeds the threshold: keep the most we can
        best_component = int(PCA_df.index[-1])
        print("No tested number of components explains more than 95% of the variance.")
    print("Done. Optimal Number of Components is {}.".format(best_component))

    return best_component, PCA_df

In [16]:
# Plotting the results of the test
def plotting_test(best_component, PCA_df):
    """Plot the explained variance against the number of PCA components.

    Dashed red lines mark the 0.95 variance level and ``best_component``.

    Args:
        best_component: Component count to highlight.
        PCA_df: DataFrame with the columns ``'Components'`` and
            ``'Variance'``.
    """
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(PCA_df['Components'], PCA_df['Variance'])
    ax.text(1, 1, f'Best number of components = {best_component}')
    # Draw on this Axes explicitly rather than on pyplot's "current" axes
    ax.axhline(y=0.95, color='r', linestyle='--')
    ax.axvline(x=best_component, color='r', linestyle='--')
    ax.set(
        title='Analysis of Variance Explained over Number of Components',
        xlabel='Number of Components',
        ylabel='Variance Explained',
    )
    plt.show()

In [17]:
# Final PCA, run with the number of components selected beforehand
def doing_pca(scaled_img_features, best_component):
    """Project the features onto ``best_component`` principal components.

    Prints the summed explained-variance ratio and the time taken.

    Args:
        scaled_img_features: Feature matrix to reduce.
        best_component: Value passed to ``PCA(n_components=...)``.

    Returns:
        The result of ``PCA.fit_transform`` on ``scaled_img_features``.
    """
    print('Operating Optimized PCA...')
    started = time()
    reducer = PCA(n_components=best_component)
    X_PCA = reducer.fit_transform(scaled_img_features)
    explained = reducer.explained_variance_ratio_.sum()
    elapsed = time() - started

    summary = 'PCA with {} components explains {:.2f} variance and is done in {:.2f} s.'
    print(summary.format(best_component, explained, elapsed))

    return X_PCA

#### Save Reduced Features in Dataframe ####

In [18]:
# Saving the PCA Reduced Features
def save_features(X_PCA):
    """Store the PCA-reduced features in ``des_df`` and on disk.

    Row ``n`` of ``X_PCA`` is stored in the ``'PCA_Reduced_Image_Features'``
    cell of row ``n`` of the module-level ``des_df``. It is also written to
    ``<preprocessed_path>/<fruit type>/<image name>_reduced.csv``. The fruit
    type and image name are looked up in ``data_df`` by the row's ``'ID'``,
    because images dropped during preprocessing make the rows of ``des_df``
    and ``data_df`` differ in position. Finally ``des_df`` is written to
    ``descriptors_list.csv`` under ``ressource_path``.

    Args:
        X_PCA: 2-D array with one row of reduced features per row of
            ``des_df``.

    Raises:
        ValueError: If ``X_PCA`` and ``des_df`` do not have the same number
            of rows.
    """
    print('Saving results...')
    column = 'PCA_Reduced_Image_Features'

    if len(X_PCA) != len(des_df):
        raise ValueError(
            "X_PCA has {} rows but des_df has {}.".format(len(X_PCA), len(des_df))
        )

    # One feature vector per cell: build an object Series instead of
    # assigning arrays cell by cell through .loc
    des_df[column] = pd.Series(list(X_PCA), index=des_df.index, dtype=object)

    fruits_by_id = data_df.set_index('ID')
    for position, image_id in enumerate(des_df['ID']):
        fruit_folder = fruits_by_id.at[image_id, 'Type_of_fruit']
        fruit_image = dot_split.split(fruits_by_id.at[image_id, 'Image_name'])[0]
        np.savetxt(f"{preprocessed_path}/{fruit_folder}/{fruit_image}_reduced.csv", X_PCA[position], delimiter=',')

    des_df.to_csv(ressource_path + 'descriptors_list.csv', index=False)

#### Compile All These Steps ####

In [19]:
# Whole dimension-reduction pipeline, from stacked descriptors to saved features
def reduce_dimensions():
    """Chain the dimension-reduction steps and save the result.

    Steps, in order: stack the descriptors, build the visual vocabulary,
    compute the scaled image features, pick the number of PCA components,
    plot that test, run the final PCA and save the reduced features.
    """
    stacked, per_image = stack_descriptors()
    vocabulary, n_words = create_BoVW(stacked, per_image)
    features = image_features(vocabulary, n_words, per_image)
    n_components, variance_table = test_pca(features)
    plotting_test(n_components, variance_table)
    reduced = doing_pca(features, n_components)
    save_features(reduced)
    print("Features saved in the file 'descriptors_list.csv'.")

## Creating the Dataframe ##

In [None]:
# For each fruit type: list its images, make the matching preprocessed
# directory and collect one index record per image. The records are added to
# the dataframe in one go (DataFrame.append was removed in pandas 2.0 and
# copied the whole frame on every call).
t0 = time()

fruit_records = []
for fruit_type in fruit_type_list:
    image_type_path = image_path + fruit_type
    fruit_list_rdd = sc.parallelize(os.listdir(image_type_path))
    os.makedirs(preprocessed_path + fruit_type, exist_ok=True)
    # collect() runs immediately, so the lambda sees the current fruit_type
    fruits_to_append = fruit_list_rdd.map(lambda x: fill_df(fruit_type, x)).collect()
    fruit_records.extend(filter(None, fruits_to_append))

new_fruits = pd.DataFrame(fruit_records)
if data_df.empty:
    data_df = new_fruits
else:
    data_df = pd.concat([data_df, new_fruits], ignore_index=True, sort=True)

data_df = data_df[['ID','Image_name', 'Type_of_fruit']]
data_df.to_csv(ressource_path + 'fruits_index.csv', index=False)

t1 = time()
print("Time taken : {} s or {} min.".format(t1-t0, (t1-t0)/60))

In [None]:
# Preprocessing the images listed in data_df and collecting their descriptors
t0 = time()
descriptor_list = []

# Splitting the task in several to avoid tasks being too large:
# one Spark job per fruit type
for fruit_type in fruit_type_list:
    fruit_type_masque = data_df[data_df['Type_of_fruit'] == fruit_type]
    images_list_rdd = sc.parallelize(fruit_type_masque['ID'].tolist())
    fruit_type_descriptor_list = images_list_rdd.map(lambda ID: preprocessing_image(ID)).collect()
    # Images that could not be preprocessed come back as None
    descriptor_list.extend(filter(None, fruit_type_descriptor_list))

# DataFrame.append was removed in pandas 2.0; build the new rows and concat
new_descriptors = pd.DataFrame(descriptor_list)
if des_df.empty:
    des_df = new_descriptors
else:
    des_df = pd.concat([des_df, new_descriptors], ignore_index=True)
des_df.to_csv(ressource_path + 'descriptors_list.csv', index=False)

t1 = time()
print("Time taken : {} s or {} min.".format(t1-t0, (t1-t0)/60))

In [None]:
# Run only the first step of reduce_dimensions() here; the remaining steps
# of that pipeline are kept commented out below.
descriptors, des_list = stack_descriptors()
#voc, k = create_BoVW(descriptors, des_list)
#scaled_img_features = image_features(voc, k, des_list)
#best_component, PCA_df = test_pca(scaled_img_features)
#plotting_test(best_component, PCA_df)
#X_PCA = doing_pca(scaled_img_features, best_component)
#save_features(X_PCA)
#print("Features saved in the file 'descriptors_list.csv'.")

In [None]:
# Clustering the descriptors making Bags of Visual Words with Pyspark 
from pyspark.ml.clustering import KMeans
from pyspark import Row
from pyspark.sql.types import FloatType, ArrayType,StructType,StructField

def create_BoVW(descriptors, des_list):
    """Cluster all descriptors into a visual vocabulary with Spark ML KMeans.

    Spark-based counterpart of the scikit-learn ``create_BoVW``; defining it
    replaces that function under the same name.

    Args:
        descriptors: NumPy array of all descriptor rows to cluster.
        des_list: List with one entry per image; only its length is used, to
            choose the number of clusters as ``round(sqrt(len(des_list)))``
            (at least 2).

    Returns:
        A ``(voc, k)`` tuple: the cluster centres as a NumPy array and the
        number of clusters that was requested.
    """
    print('Clustering descriptors...')
    # Spark ML's KMeans requires k > 1
    k = max(2, int(round(np.sqrt(len(des_list)), 0)))
    print('Estimated number of clusters : {}'.format(k))
    t0 = time()

    # One row per descriptor, held in a single array<float> column
    descriptors_data = spark.createDataFrame(
        descriptors.tolist(),
        ArrayType(FloatType(), containsNull=False),
    ).toDF("features")
    print(f"Spark's DataFrame  Done in {time()-t0}s.")
    # show() prints the rows itself and returns None, so no print() around it
    descriptors_data.show(5)

    # Fixed seed for reproducible centres, like random_state=0 in the
    # scikit-learn variant
    kmeans = KMeans(k=k, seed=0)
    model = kmeans.fit(descriptors_data)
    voc = np.asarray(model.clusterCenters())

    t1 = time()
    elapsed = t1 - t0
    print('Clustering time : {:.2f}s.'.format(elapsed))

    return voc, k

# Try the Spark-based clustering on the stacked descriptors and inspect the
# type, length and shape of the returned vocabulary
voc_test, k_test = create_BoVW(descriptors, des_list)
print(type(voc_test))
print(len(voc_test))
print(voc_test.shape)

In [None]:
# Total run time of the notebook, measured from `t_omega` (taken at the top
# of the Spark context initialisation cell)
t_zeta = time()
total_time = t_zeta-t_omega
print(f"Total elapsed time for the notebook is : \n{total_time} s, \nor {total_time/60} min.")

In [None]:
# Scratch test: print a counter once per second, 12 times. end="\r" emits a
# carriage return instead of a newline, so that each message can overwrite
# the previous one on the same output line.
for i in range(12):
    print("testing {}.".format(i), end="\r")
    tm.sleep(1)