In [None]:
from abc import ABC, abstractmethod, abstractstaticmethod

import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow import keras
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.layers import (
    Conv2D,
    Dense,
    Dropout,
    Flatten,
    Input,
    MaxPool2D,
    RandomContrast,
    RandomFlip,
    RandomRotation, # zamula
    RandomTranslation,
    RandomZoom
)
import keras_tuner as kt

In [None]:
# check if GPU is detected:
len(tf.config.list_physical_devices('GPU')) > 0

In [None]:
# project:

In [None]:
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
x_train = x_train/255.0  # normalization
x_test = x_test/255.0
x_train, x_validation, y_train, y_validation = train_test_split(x_train, y_train, train_size=0.8)

In [None]:
def plot_accuracy(history, model_name):
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title('model accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'val'], loc='upper left')
    plt.title(model_name)
    plt.show()

def plot_loss(history, model_name):
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'val'], loc='upper left')
    plt.title(model_name)
    plt.show()
def plot_history(history, model_name):
    plot_accuracy(history, model_name)
    plot_loss(history, model_name)

In [None]:
cifar_names = {
    0: "airplane",
    1: "automobile",
    2: "bird",
    3: "cat",
    4: "deer",
    5: "dog",
    6: "frog",
    7: "horse",
    8: "ship",
    9: "truck",
}


In [None]:
class HPConfiguration(ABC):
    
    @abstractstaticmethod
    def build_hp_model(hp):
        pass
    
    @abstractstaticmethod
    def get_tuner():
        pass
    
    @abstractstaticmethod
    def get_callbacks():
        pass
    
    @abstractstaticmethod
    def get_tuner_callbacks():
        pass


In [None]:
# neural networks

class SuperSimpleConvModel(keras.Model, HPConfiguration):
    def __init__(self, 
                 n_filters, 
                 dense_units,
                 **kwargs):
        super().__init__(kwargs)

        self.conv2D_1 = Conv2D(filters=n_filters,kernel_size=(4,4),input_shape=(32,32,3),activation='relu')
        self.max_pool2D_1 = MaxPool2D(pool_size=(2,2))
        self.flatten = Flatten()
        self.dense_1 = Dense(dense_units,activation='relu')
        self.dense_2 = Dense(10,activation='softmax')
        
    def call(self, inputs, training=True):
        #print(inputs)
        x =  self.conv2D_1(inputs)
        x = self.max_pool2D_1(x)
        
        x = self.flatten(x)
        x = self.dense_1(x)
        x = self.dense_2(x)
        return x
   
    def build_hp_model(hp):
        n_filters = hp.Int("n_filters", min_value=4, max_value=32, step=2, sampling="log")
        dense_units = hp.Int("dense_units", min_value=16, max_value=256, step=2, sampling="log")
        model = SuperSimpleConvModel(
            n_filters=n_filters, dense_units=dense_units
        )
        model.compile(loss='sparse_categorical_crossentropy',optimizer='adam',metrics=['accuracy'])
        return model

    def get_tuner():
        return kt.Hyperband(SuperSimpleConvModel.build_hp_model,
                            objective='val_accuracy',
                            #overwrite=True,
                            max_epochs=50,
                            factor=3,
                            directory='tuner/SuperSimpleConvModel',
                            project_name='model'
                           )
    def get_callbacks():
        return [
            tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20),
            #tf.keras.callbacks.ModelCheckpoint(filepath="models/SuperSimpleConvModel", save_best_only=True)
        ]
    
    def get_tuner_callbacks():
        return [
            tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20),
        ]
    
    def get_best_model():
        tuner = SuperSimpleConvModel.get_tuner()
        best_parameters = tuner.get_best_hyperparameters(num_trials=1)[0]
        return SuperSimpleConvModel.build_hp_model(best_parameters)
    
    
####################
class SimpleConvModel(keras.Model, HPConfiguration):
    def __init__(self, 
                 n_filters_1,
                 n_filters_2,
                 dense_units_1,
                 **kwargs):
        super().__init__(kwargs)

        self.conv2D_1 = Conv2D(filters=n_filters_1,kernel_size=(4,4),input_shape=(32,32,3),activation='relu')
        self.max_pool2D_1 = MaxPool2D(pool_size=(2,2))
        self.conv2D_2 = Conv2D(filters=n_filters_2,kernel_size=(4,4),input_shape=(32,32,3),activation='relu')
        self.max_pool2D_2 = MaxPool2D(pool_size=(2,2))
        self.flatten = Flatten()
        self.dense_1 = Dense(dense_units_1,activation='relu')
        self.dense_2 = Dense(10,activation='softmax')

    def call(self, inputs, training=True):
        #print(inputs)
        x =  self.conv2D_1(inputs)
        x = self.max_pool2D_1(x)
        x = self.conv2D_2(x)
        x = self.max_pool2D_2(x)

        x = self.flatten(x)
        x = self.dense_1(x)
        x = self.dense_2(x)
        return x

    def build_hp_model(hp):
        n_filters_1 = hp.Int("n_filters_1", min_value=4, max_value=32, step=2, sampling="log")
        n_filters_2 = hp.Int("n_filters_2", min_value=2, max_value=32, step=2, sampling="log")
        dense_units_1 = hp.Int("dense_units_1", min_value=16, max_value=256, step=2, sampling="log")
        model = SimpleConvModel(
            n_filters_1=n_filters_1, n_filters_2=n_filters_2, dense_units_1=dense_units_1
        )
        model.compile(loss='sparse_categorical_crossentropy',optimizer='adam',metrics=['accuracy'])
        return model

    def get_tuner():
        return kt.Hyperband(SimpleConvModel.build_hp_model,
                            objective='val_accuracy',
                            #overwrite=True,
                            max_epochs=50,
                            factor=3,
                            directory='tuner/SimpleConvModel',
                            project_name='model'
                           )
    def get_callbacks():
        return [
            tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20),
            #tf.keras.callbacks.ModelCheckpoint(filepath="models/SimpleConvModel", save_best_only=True)
        ]
    
    def get_tuner_callbacks():
        return [
            tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20),
        ]        

In [None]:
model_classes = [SuperSimpleConvModel, SimpleConvModel] # tutaj dorzuć trzeci model do listy

# HYPERPARAMETER TUNING
for model_class in model_classes:
    print(model_class)
    # break
    tuner = model_class.get_tuner()
    #print(tuner)
    tuner.search(x_train, y_train, batch_size=256, epochs=50, validation_data=(x_validation, y_validation), callbacks=model_class.get_tuner_callbacks())
    best_hps=tuner.get_best_hyperparameters(num_trials=1)[0]

In [None]:
# accuracy for every model (for training, validation and test set)
accuracy_results = {
    model_class.__name__ + "_" + str_type: [] for model_class in model_classes for str_type in ["train", "validation", "test"]
}

print(accuracy_results)

In [None]:
model_classes = [SimpleConvModel]
#model_classes = [SuperSimpleConvModel, SimpleConvModel]

for model_class in model_classes:
    tuner = model_class.get_tuner()
    best_hps=tuner.get_best_hyperparameters(num_trials=1)[0]
    print(best_hps.values)
    #best_hps.
    
    
    for replication in range(4):
        print(f'Starting {replication} iteration')
        best_model = model_class.build_hp_model(best_hps)
        print(best_model)
        history = best_model.fit(x_train, y_train, epochs=200, validation_data=(x_validation, y_validation), batch_size=256, callbacks=model_class.get_callbacks(), verbose=0)
   # TODO: dorzucić wczytywanie wag najlepszego modelu (optymalizacja pod kątem max validation_accuracy)     
        print(f"model name: {model_class.__name__}")
        print(f"iteration: {replication}")
        train_loss, train_accuracy = best_model.evaluate(x=x_train, y=y_train)
        accuracy_results[model_class.__name__ + "_train"].append(train_accuracy)
        validation_loss, validation_accuracy = best_model.evaluate(x=x_validation, y=y_validation)
        accuracy_results[model_class.__name__ + "_validation"].append(validation_accuracy)
        test_loss, test_accuracy = best_model.evaluate(x=x_test, y=y_test)
        accuracy_results[model_class.__name__ + "_test"].append(test_accuracy)
        print(f"train accuracy: {train_accuracy}")
        print(f"validation accuracy: {validation_accuracy}")
        print(f"test accuracy: {test_accuracy}")
        plot_history(history, model_class.__name__)
        
        print('============ NEW ITERATION ============')

print(accuracy_results)

In [None]:
# wyniki accuracy results
for model_type, accuracy_list in accuracy_results.items():
    avg = np.average(accuracy_list)
    std = np.std(accuracy_list)
    minimal = np.min(accuracy_list)
    maximal = np.max(accuracy_list)
    print(f"mode: {model_type}, avg: {avg}, std: {std}, min: {minimal}, max: {maximal}")
    print('==============')

In [None]:
#for model_type, accuracy_list in accuracy_results.items():
#    plt.plot(accuracy_list, label=model_type)
#    #plt.violinplot(accuracy_list)
#plt.legend()
#y_min = min(min(accuracy_list) for accuracy_list in accuracy_results.values())
#y_max = max(max(accuracy_list) for accuracy_list in accuracy_results.values()) 
#plt.ylim(y_min, y_max*1.2)
##plt.show()

In [None]:
# dodać powtarzalnośc wyników 
# (czyli dla każdego modelu ze znalezionymi hyperparametrami należy puścić uczenie 5 razy 
# i zobaczyć jaka jest średnia i odchykebue standarowe)

# sprawdzić jakie klasy są najczęściej mylone i przygotować model 
# do rozpoznawania tylko tych mylących się klas. Połączyć następnie w całośc i sprawdzić wyniki

In [None]:
# testing augmentation impact (na jakiejś jednej dowolnej klasie żeby sprawdzić różne warianty jak wpływają na wynik)
# czyli np. 
# 1) 

In [None]:
# przygotować pretrenowane modele i sprawdzić wyniki (tutaj raczej nie trzeba wstawiać augmentacji danych).
# 

In [None]:
# augmentations variants (sprawdzanie która augmentacja ma największy wpływ)

augmentation_random_flip =keras.Sequential([
    RandomFlip("horizontal")
], name='augmentation_random_flip')

augmentation_random_zoom =keras.Sequential([
    RandomZoom(0.2)
], name='augmentation_random_zoom')

augmentation_random_rotation = keras.Sequential([
    RandomRotation(0.2)
], name='augmentation_random_rotation')


augmentation_random_translation = keras.Sequential([
    RandomTranslation(0.1, 0.1)
], name='augmentation_random_translation')

augmentation_random_contrast = keras.Sequential([
    RandomContrast(0.1)
], name='augmentation_random_contrast')

augmentation_combined = keras.Sequential([
    RandomFlip("horizontal"),
    RandomZoom(0.2),
    RandomRotation(0.2),
    RandomTranslation(0.1, 0.1),
    RandomContrast(0.1)
], name='augmentation_combined')

augmentation_random_flip_translation = keras.Sequential([
    RandomFlip("horizontal"),
    RandomTranslation(0.1, 0.1),
], name='augmentation_random_flip_translation')

augmentation_random_flip_translation_zoom = keras.Sequential([
    RandomFlip("horizontal"),
    RandomTranslation(0.1, 0.1),
    RandomZoom(0.2)
], name='augmentation_random_flip_translation_zoom')

In [None]:
augmentations = [
    #augmentation_random_flip,
    #augmentation_random_zoom,
    #augmentation_random_rotation,
    #augmentation_random_translation,
    #augmentation_random_contrast,
    augmentation_combined
    #augmentation_random_flip_translation,
    #augmentation_random_flip_translation_zoom
]

augmentation_results = {
    augmentation.name + "_" + str_type: [] for augmentation in augmentations for str_type in ["train", "validation", "test"]
}
print(augmentation_results)
for augmentation in augmentations:
    for replication in range(5):
        model = keras.Sequential([
        augmentation,
        SimpleConvModel(n_filters_1=32,
                       n_filters_2=32,
                       dense_units_1=64)])
        model.compile(loss='sparse_categorical_crossentropy',optimizer='adam',metrics=['accuracy'])
        history = model.fit(x_train, 
                            y_train, 
                            epochs=400, 
                            validation_data=(x_validation, y_validation), 
                            batch_size=256, 
                            callbacks=SimpleConvModel.get_callbacks()
                            #callbacks=EarlyStopping(patience=50,monitor='val_loss')
                           )
        print(f"augmentation name: {augmentation.name}")
        print(f"iteration: {replication}")
        train_loss, train_accuracy = model.evaluate(x=x_train, y=y_train)
        augmentation_results[augmentation.name + "_train"].append(train_accuracy)
        validation_loss, validation_accuracy = model.evaluate(x=x_validation, y=y_validation)
        augmentation_results[augmentation.name + "_validation"].append(validation_accuracy)
        test_loss, test_accuracy = model.evaluate(x=x_test, y=y_test)
        augmentation_results[augmentation.name + "_test"].append(test_accuracy)
        print(f"train accuracy: {train_accuracy}")
        print(f"validation accuracy: {validation_accuracy}")
        print(f"test accuracy: {test_accuracy}")
        plot_history(history, augmentation.name)
        
        print('============ NEW ITERATION ============')

In [None]:
augmentation_results

In [None]:
# printing augmentation results
for augmentation_name, accuracy_list in augmentation_results.items():
    avg = np.average(accuracy_list)
    std = np.std(accuracy_list)
    minimal = np.min(accuracy_list)
    maximal = np.max(accuracy_list)
    
    print(f"mode: {augmentation_name}, avg: {avg}, std: {std}, min: {minimal}, max: {maximal}")
    print('===')

In [None]:
augmentation_results