In [None]:
%cd "."

In [None]:
import Clusterer, Global_feature_extractor, Local_features_extractor, Norms, Image, PCA_reduction, Distances, Autoencoder_train, Encoder_NN
from Dataset_loader import load_dataset
from Accuracy import accuracy_optimised, accuracy
import json, os, cv2, pickle
import numpy as np
from functools import reduce

In [None]:
import importlib
importlib.reload(Encoder_NN)
import Autoencoder_train

In [None]:
PIPELINE_PATHS = {
    "local_patch_extraction": ["SIFT", "A-KAZE"],
    "codebook_generation": ["MiniBatchKMeans", "KMedoids"],
    "feature_encoding_and_pooling": ["BoVW", "VLAD"],
    "dimentionality_reduction": [None, "PCA"]
}
DATASETS = [("IAM", None), ("TrigraphSlant", False), ("TrigraphSlant", True), ("ICDAR", "en"), ("ICDAR", "ar")]

In [None]:
pipline = [0, 0, 0, 0]

training_session = {
    "id": "Akane",
    "datasets": [0,1,2,3,4],
    "training_size": 7,
    "testing_size": 1
}

if not os.path.exists(training_session["id"]):
  os.mkdir(training_session["id"])

In [None]:
train_big_set, test_big_set = list(), list()

for choice in training_session["datasets"]:
    train_mini_set, test_mini_set = load_dataset(dataset=DATASETS[choice][0],
                                                 path="./dataset", 
                                                 size_train=training_session["training_size"], 
                                                 size_test=training_session["testing_size"],
                                                 parametre=DATASETS[choice][1])
    train_big_set.extend(train_mini_set)
    test_big_set.extend(test_mini_set)
    
_, _, images_train_set = map(list, zip(*train_big_set))
writers_test_set, images_names_test_set, images_test_set = map(list, zip(*test_big_set))

In [None]:
print("Number of training images:",len(images_train_set))
print("Number of testing images:",len(images_test_set))

In [None]:
network_configuration = {
    "shape_images": '?',
    "autoencoder_test_ration" : 0.3,
    "EPOCHS" : 25,
    "BS" :64,
    "latentDim": '?',
    "max_key_points" : 250
}

In [None]:
local_patch_extraction_methode = PIPELINE_PATHS["local_patch_extraction"][pipline[0]]

if local_patch_extraction_methode == "SIFT":
    local_features_detector = cv2.xfeatures2d.SIFT_create()
elif local_patch_extraction_methode == "A-KAZE":
    local_features_detector = cv2.AKAZE_create()

In [None]:
local_features_extractor

In [None]:
shapes_images = [8, 16, 32]
latentDims = [8, 16, 32, 64]

In [None]:
def generate_patchs(folder, local_features_detector, images, shapes_images, max_samples_by_image):
    images_patchs = list()
    shapes_images = sorted(shapes_images, reverse=True)
    
    retained_patches = list()
    
    for i, image in enumerate(images):
        key_points = local_features_detector.detect(image,None)
        retained_patches = list()
        for key_point in key_points:
            retained_mini = []
            
            y,x = int(key_point.pt[0]),int(key_point.pt[1])
            xm, ym = len(image[0]), len(image)
            
            max_height = shapes_images[0]
            cropped = image[x-max_height:x+max_height,y-max_height:y+max_height]
            if reduce(lambda x, y: x*y, np.shape(cropped))!=max_height*max_height*4: continue
            retained_mini.append(cropped)
            
            for shape_image in shapes_images[1:]:
                cropped = image[x-shape_image:x+shape_image,y-shape_image:y+shape_image]
                retained_mini.append(cropped)
                
            retained_patches.append(retained_mini)
                
        retained_patches = np.array(retained_patches)
        images_patchs.extend(retained_patches[np.random.choice(retained_patches.shape[0], min(max_samples_by_image,len(retained_patches)), replace=False)])
        if i!=0 and i%50==0: print("50 images treated")
    
    print("Saving the patchs")
    for i, shape_image in enumerate(shapes_images):
        patchs_pickle_path = str(folder)+"/pickle_patchs_"+str(shape_image)+"px.dat"
        with open(patchs_pickle_path, "wb") as f:
            pickle.dump([row[i] for row in images_patchs], f)

In [None]:
generate_patchs(folder=training_session["id"], 
                local_features_detector=local_features_detector, 
                images=images_train_set, 
                shapes_images=shapes_images, 
                max_samples_by_image=network_configuration["max_key_points"])

In [None]:
for shape_image in shapes_images:
    patchs_pickle_path = training_session["id"]+"/pickle_patchs_"+str(shape_image)+"px.dat"
    with open(patchs_pickle_path,'rb') as f:
        data = np.array(pickle.load(f))
        print(data.shape)
        print(len(data[0]))
        print(len(data[0][0]))
        print('*'*17)

In [None]:
mse_values = list()

for shape_image in shapes_images:
    network_configuration["shape_images"] = shape_image
    patchs_pickle_path = training_session["id"]+"/pickle_patchs_"+str(shape_image)+"px.dat"
    for latentDim in latentDims:
        network_configuration["latentDim"] = latentDim
        model_path = training_session["id"]+"/Encoder_model_"+str(shape_image)+"px_"+str(latentDim)+"elem.h5"
        
        autoencoder_builder = Autoencoder_train.Autoencoder_train(configuration=network_configuration,
                                                            data_path=patchs_pickle_path, 
                                                            model_path=model_path)
        
        mse_values.append(autoencoder_builder.train_network())

In [None]:
print(mse_values)

In [None]:
def get_descriptors(local_features_extractor_descriptor, images_train_set, mini_size_sample=12):
    images_pre_clustering = [Image.Image(image, local_feature_extractor=local_features_extractor_descriptor) for image in images_train_set]
    list_local_descriptors = []
    for image in images_pre_clustering:
        mini_list_local_descriptors = np.array(image.local_descriptors)
        list_local_descriptors.extend(mini_list_local_descriptors[np.random.choice(mini_list_local_descriptors.shape[0], min(mini_size_sample,len(mini_list_local_descriptors)), replace=False)])
    return list_local_descriptors

In [None]:
max_no_improvement = 500
clustering_algo = PIPELINE_PATHS["codebook_generation"][pipline[1]]

for shape_image in shapes_images:
    network_configuration["shape_images"] = shape_image
    for latentDim in latentDims:
        network_configuration["latentDim"] = latentDim
        model_path = training_session["id"]+"/Encoder_model_"+str(shape_image)+"px_"+str(latentDim)+"elem.h5"
        
        encoder = Encoder_NN.Encoder_NN((network_configuration["shape_images"]*2, network_configuration["shape_images"]*2),
                                         network_configuration["max_key_points"], 
                                         local_features_detector=local_features_detector)
        encoder.set_model(model_path=model_path)
        local_features_extractor_descriptor = Local_features_extractor.Local_feature_extractor(algorithm=encoder)
        descriptors_sample = get_descriptors(local_features_extractor_descriptor, images_train_set)
        
        Clusterer.Clusterer.choose_number_clusters_clustering(vectors=descriptors_sample, 
                                                        algo=clustering_algo,
                                                        max_no_improvement=max_no_improvement, 
                                                        test_values=range(2, 300, 50),
                                                        verbose=1)