In [1]:
%run "./2_Methods.ipynb"

In [16]:
def normalized_tanh(x, beta=1.0):
    return (x * K.tanh(beta * x) + 1) / 2 # Alternative to Sigmoid? #not yet tested

# Custom Binary Crossentropy Loss function which penalizes false positives more than false negatives:
class WeightedBinaryCrossentropy(tf.keras.losses.Loss):
    def __init__(self, false_positive_penalty=0.7, false_negative_penalty=0.3):
        super().__init__()
        self.false_positive_penalty = false_positive_penalty        
        self.false_negative_penalty = false_negative_penalty

    def call(self, y_true, y_pred):        
        y_true = tf.dtypes.cast(y_true, tf.float32)
        
        bce_loss = tf.keras.losses.binary_crossentropy(y_true, y_pred, from_logits=False)

        false_positive = tf.reduce_sum((1 - y_true) * tf.round(y_pred))
        false_negative = tf.reduce_sum(y_true * (1 - tf.round(y_pred)))

        fp_penalty = self.false_positive_penalty * false_positive
        fn_penalty = self.false_negative_penalty * false_negative

        loss = bce_loss + fp_penalty + fn_penalty

        return loss


In [12]:
class Classifier:      
    DEFAULT_EPOCHS=350
    
    def __init__(self, autoencoder, optimizer_name, learning_rate, loss_function_name,
                 metrics=DEFAULT_METRICS, test_train_val_ratios=(0.8, 0.1, 0.1), 
                 random_state = None, epochs=DEFAULT_EPOCHS, early_stopping_patience=None,
                 cross_validation_split=None, verbose=None, init=True):        
        if len(test_train_val_ratios) != 3 or sum(test_train_val_ratios) != 1:
            raise ValueError(f'The test, train and validation ratio must be a tule of three numbers whose sum equals 1.0! (Specified: {test_train_val_ratios})')

        self.verbose = verbose
        
        self.random_state = random_state
        
        self.autoencoder = autoencoder
        self.feature_count = autoencoder.input_data.shape[1]

        self.optimizer_name = optimizer_name
        self.learning_rate = learning_rate
        self.loss_function_name = loss_function_name
        self.metrics = metrics
        
        self.max_epochs = epochs
        self.test_train_val_ratios = test_train_val_ratios
        self.early_stopping_patience = early_stopping_patience
        self.cross_validation_split = cross_validation_split

        self.model = None
        self.fit_history = None
        self.train_evaluation = None
        self.test_evaluation = None
        self.positive_sample_size = 1_000        
        self.negative_sample_size = 20_000
        self.epochs = None

        self.kfold = None
        self.kfold_evaluations = None
        
        self.__reset_state()

        self.initialized = False

        if init:
            self.initialize()
            
    def __reset_state(self):
        if self.random_state == None: 
            self.random_state = random.randrange(1000000000)
        reset_random_state(self.random_state)

    def __create_model(self):
        return tf.keras.Sequential([
            tf.keras.layers.Dense(self.feature_count, activation="relu"),
            tf.keras.layers.Dense(self.feature_count/2, activation="relu"),
            tf.keras.layers.Dense(self.feature_count/4, activation="relu"),
            tf.keras.layers.Dense(1, activation="sigmoid")
        ])

    def __create_model_experimental(self):
        return tf.keras.Sequential([
            tf.keras.layers.Dense(self.feature_count, activation=tf.keras.layers.LeakyReLU()), #try leakyrelu
            tf.keras.layers.Dense(self.feature_count/2, activation=tf.keras.layers.LeakyReLU()),
            tf.keras.layers.Dense(self.feature_count/4, activation=tf.keras.layers.LeakyReLU()),
            tf.keras.layers.Dense(1, activation=normalized_tanh) #try tanh
        ])

    def __train(self, data, epochs=DEFAULT_EPOCHS, verbose=False):        
        X_train, X_test_val, y_train, y_test_val = data
        X_test, X_val, y_test, y_val = self.__split_test_val_data(X_test_val, y_test_val)
        
        callbacks = []
        if self.early_stopping_patience != None:            
            callbacks.append(tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=self.early_stopping_patience, restore_best_weights=True))

        if self.cross_validation_split == 1:           
            X_train = np.concatenate((X_train, X_test), axis=0)
            y_train = np.concatenate((y_train, y_test), axis=0)
            
        self.fit_history = self.model.fit(X_train, y_train, epochs=epochs, 
                                          validation_data=(X_val, y_val),
                                          callbacks=callbacks, 
                                          verbose=verbose)     
        self.epochs = len(self.fit_history.epoch)

        train_evaluation = self.model.evaluate(X_train, y_train, return_dict=True, verbose=None)
        test_evaluation = self.model.evaluate(X_test, y_test, return_dict=True, verbose=None) if self.cross_validation_split != 1 else None

        return (train_evaluation, test_evaluation)

    def __split_dataset(self, X, y):
        return train_test_split(X, y, test_size=self.test_train_val_ratios[1], shuffle=True, random_state=random_state) # returns (X_train, X_test, y_train, y_test)

    def __split_test_val_data(self, X, y):
        test_split = self.test_train_val_ratios[1]
        val_split = self.test_train_val_ratios[2]
        val_split = val_split / (test_split + val_split)

        return train_test_split(X, y, test_size=val_split, shuffle=True, random_state=random_state) # returns (X_test, X_val, y_test, y_val)

    def __compile_output(self):
        meta_info = {
            'optimizer': self.optimizer_name,
            'learning rate': self.learning_rate,
            'loss function': self.loss_function_name,
            'random seed': self.random_state,
            'epochs (max)': self.max_epochs,
            'early stopping': self.early_stopping_patience,
            'epochs trained': self.epochs,
            'early stopping patience': self.early_stopping_patience,
        }
        
        if self.cross_validation_split == None or self.cross_validation_split == 1:
            self.output = {
                    'meta-info': meta_info,
                    'testing': self.train_evaluation,
                    'training': self.test_evaluation,
                    'history': self.fit_history,
                }
        else: 
            meta_info['k-fold split'] = self.cross_validation_split
            self.output = {
                'meta-info': meta_info,
                'k-fold means testing': self.kfold_train_evaluation, #sum average over all train evals in kfold_evaluations
                'k-fold means training': self.kfold_test_evaluation, #sum average over all test evals in kfold_evaluations
                'k-fold histories': self.kfold_histories,
                'k-fold evaluations': self.kfold_evaluations
            }

    def __get_loss_function(self, loss_function_name):
        if loss_function_name == 'WeightedBinaryCrossentropy':
            return WeightedBinaryCrossentropy()
        else:
            return tf.keras.losses.get(loss_function_name)

    def predict(self, X, verbose=None):
        return self.model.predict(self.autoencoder.predict(X, verbose=None), verbose=verbose)
            
    def initialize(self):
        if self.initialized:
            raise ValueError(f'Cannot initialize already initialized autoencoder!')
            
        y_pos_value = positive_class
        y_neg_value = negative_class
        if self.loss_function_name == 'CosineSimilarity':
            y_pos_value = float(y_pos_value)
            y_neg_value = float(y_neg_value)

        positive_X = self.autoencoder.generate_positive(self.positive_sample_size)
        negative_X = self.autoencoder.generate_negative(self.negative_sample_size)
        
        self.X = np.concatenate([positive_X, negative_X])
        self.y = np.concatenate([[y_pos_value for i in range(positive_X.shape[0])], 
                                 [y_neg_value for i in range(negative_X.shape[0])]])

        if self.cross_validation_split == None or self.cross_validation_split == 1:
            self.model = self.__create_model()
            self.optimizer = globals()[self.optimizer_name](self.learning_rate)
            self.loss_function = self.__get_loss_function(self.loss_function_name)
                
            self.model.compile(
                optimizer=self.optimizer,
                loss=self.loss_function,
                metrics=self.metrics
            )

            (self.train_evaluation, self.test_evaluation) = self.__train(self.__split_dataset(self.X, self.y), epochs=self.max_epochs, verbose=self.verbose)

        else:
            best_loss = None
            best_model = None
            
            self.kfold = KFold(n_splits=self.cross_validation_split, shuffle=True, random_state=self.random_state)
            self.kfold_evaluations = []
            self.kfold_histories = []
            kfold_test_evaluation = []
            kfold_train_evaluation = []

            for train_idx, test_idx in self.kfold.split(self.X):
                X_train, X_test = self.X[train_idx], self.X[test_idx]
                y_train, y_test = self.y[train_idx], self.y[test_idx]

                self.model = self.__create_model()
                optimizer = globals()[self.optimizer_name](self.learning_rate)
                loss_function = self.__get_loss_function(self.loss_function_name)
                self.model.compile(
                    optimizer=optimizer,
                    loss=loss_function,
                    metrics=self.metrics
                )
            
                (train_evaluation, test_evaluation) = self.__train((X_train, X_test, y_train, y_test), epochs=self.max_epochs, verbose=self.verbose)
                
                kfold_train_evaluation.append(train_evaluation)
                kfold_test_evaluation.append(test_evaluation)
                self.kfold_evaluations.append({
                    'training': train_evaluation,
                    'testing': test_evaluation
                })
                self.kfold_histories.append(
                    self.fit_history
                )
                self.kfold_train_evaluation = dict_mean(kfold_train_evaluation)
                self.kfold_test_evaluation = dict_mean(kfold_test_evaluation)

                test_loss = test_evaluation["loss"]

                if best_loss == None or test_loss < best_loss:
                    best_loss = test_loss
                    best_model = self.model

            self.fit_history = None
            self.train_evaluation = None
            self.test_evaluation = None
            self.model = best_model

        self.__compile_output()
        self.initialized = True

    def print_evaluation(self, show_history=False):
        print(f'Classifier Model:')
        print(f'  autoencoder: {self.autoencoder.optimizer_name} (lr={self.autoencoder.learning_rate}) - {self.autoencoder.loss_function_name} - seed={self.autoencoder.random_state}')
        print(f'    euclidean dist (augmented): {self.autoencoder.euclidean_dist_positive_class}')
        print(f'    euclidean dist delta: {self.autoencoder.euclidean_dist_delta}')
        print(f'    euclidean dist ratio: {self.autoencoder.euclidean_dist_ratio}')
        print(f'  optimizer: {self.optimizer_name}')
        print(f'  learning rate: {self.learning_rate}')
        print(f'  loss function: {self.loss_function_name}')
        print(f'  epochs: {self.epochs}')
        print(f'  random seed: {self.random_state}')
        print(f'  early stopping patience:  {self.early_stopping_patience}')
        print(f'  positive sample size: {self.positive_sample_size}')
        print(f'  negative sample size: {self.negative_sample_size}')
        if self.cross_validation_split == None or self.cross_validation_split == 1:
            print(f'Training Results: ')
            print(f'  true positives: {self.train_evaluation["true positives"]}'), 
            print(f'  true negatives: {self.train_evaluation["true negatives"]}'), 
            print(f'  false positives: {self.train_evaluation["false positives"]}'), 
            print(f'  false negatives: {self.train_evaluation["false negatives"]}'), 
            
            print(f'  accuracy: {self.train_evaluation["accuracy"]}'), 
            print(f'  precision: {self.train_evaluation["precision"]}'), 
            print(f'  recall: {self.train_evaluation["recall"]}'), 
            print(f'  specificity: {self.train_evaluation["specificity"]}'), 

            print(f'  loss: {self.train_evaluation["loss"]}'), 
            print(f'  area under curve: {self.train_evaluation["area-under-curve"]}'), 
            print(f'  precision recall curve: {self.train_evaluation["precision-recall-curve"]}'), 
            
            if self.test_evaluation != None:
                print(f'Test Results: ')
                print(f'  true positives: {self.test_evaluation["true positives"]}'), 
                print(f'  true negatives: {self.test_evaluation["true negatives"]}'), 
                print(f'  false positives: {self.test_evaluation["false positives"]}'), 
                print(f'  false negatives: {self.test_evaluation["false negatives"]}'), 
                
                print(f'  accuracy: {self.test_evaluation["accuracy"]}'), 
                print(f'  precision: {self.test_evaluation["precision"]}'), 
                print(f'  recall: {self.test_evaluation["recall"]}'), 
                print(f'  specificity: {self.test_evaluation["specificity"]}'), 
    
                print(f'  loss: {self.test_evaluation["loss"]}'), 
                print(f'  area under curve: {self.test_evaluation["area-under-curve"]}'), 
                print(f'  precision recall curve: {self.test_evaluation["precision-recall-curve"]}'), 
        else: 
            print(f'  cross-validation-split: {self.cross_validation_split}')
            print(f'K-Fold Mean Training Results: ')
            print(f'  true positives: {self.kfold_train_evaluation["true positives"]}'), 
            print(f'  true negatives: {self.kfold_train_evaluation["true negatives"]}'), 
            print(f'  false positives: {self.kfold_train_evaluation["false positives"]}'), 
            print(f'  false negatives: {self.kfold_train_evaluation["false negatives"]}'), 
            
            print(f'  accuracy: {self.kfold_train_evaluation["accuracy"]}'), 
            print(f'  precision: {self.kfold_train_evaluation["precision"]}'), 
            print(f'  recall: {self.kfold_train_evaluation["recall"]}'), 
            print(f'  specificity: {self.kfold_train_evaluation["specificity"]}'), 

            print(f'  loss: {self.kfold_train_evaluation["loss"]}'), 
            print(f'  area under curve: {self.kfold_train_evaluation["area-under-curve"]}'), 
            print(f'  precision recall curve: {self.kfold_train_evaluation["precision-recall-curve"]}'), 
            
            print(f'K-Fold Mean Test Results: ')
            print(f'  true positives: {self.kfold_test_evaluation["true positives"]}'), 
            print(f'  true negatives: {self.kfold_test_evaluation["true negatives"]}'), 
            print(f'  false positives: {self.kfold_test_evaluation["false positives"]}'), 
            print(f'  false negatives: {self.kfold_test_evaluation["false negatives"]}'), 
            
            print(f'  accuracy: {self.kfold_test_evaluation["accuracy"]}'), 
            print(f'  precision: {self.kfold_test_evaluation["precision"]}'), 
            print(f'  recall: {self.kfold_test_evaluation["recall"]}'), 
            print(f'  specificity: {self.kfold_test_evaluation["specificity"]}'), 

            print(f'  loss: {self.kfold_test_evaluation["loss"]}'), 
            print(f'  area under curve: {self.kfold_test_evaluation["area-under-curve"]}'), 
            print(f'  precision recall curve: {self.kfold_test_evaluation["precision-recall-curve"]}'),
        print(f' ')
            
        if show_history:
            if self.cross_validation_split == None or self.cross_validation_split == 1:
                plot_training_history(self.fit_history)
            else: 
                for idx, history in enumerate(self.kfold_histories):
                    print(f'Fold {idx+1}/{len(self.kfold_histories)}')
                    plot_training_history(history)
            

In [5]:
def evaluate_classifier(classifier, true_positive_data=positive_class_data, true_negative_data=negative_class_data):
    true_pos_predictions, true_neg_predictions = evaluate_authentication(classifier, true_positive_data, true_negative_data)
    
    return {
        'optimizer': classifier.optimizer_name,
        'loss function': classifier.loss_function_name,
        'learning rate': classifier.learning_rate,
        'random state': classifier.random_state,
        'epochs (max)': classifier.max_epochs,
        'early stopping patience': classifier.early_stopping_patience,
        'epochs trained': classifier.epochs,
        'k-fold split': classifier.cross_validation_split,
        'positive sample size': classifier.positive_sample_size,
        'negative sample size': classifier.negative_sample_size,
    
        'true positives (train)': classifier.kfold_train_evaluation['true positives'], 
        'true positives (test)': classifier.kfold_test_evaluation['true positives'], 
        'true negatives (train)': classifier.kfold_train_evaluation['true negatives'], 
        'true negatives (test)': classifier.kfold_test_evaluation['true negatives'], 
        'false positives (train)': classifier.kfold_train_evaluation['false positives'], 
        'false positives (test)': classifier.kfold_test_evaluation['false positives'], 
        'false negatives (train)': classifier.kfold_train_evaluation['false negatives'], 
        'false negatives (test)': classifier.kfold_test_evaluation['false negatives'], 
        
        'accuracy (train)': classifier.kfold_train_evaluation['accuracy'], 
        'accuracy (test)': classifier.kfold_test_evaluation['accuracy'],
        'precision (train)': classifier.kfold_train_evaluation['precision'], 
        'precision (test)': classifier.kfold_test_evaluation['precision'],
        'recall (train)': classifier.kfold_train_evaluation['recall'], 
        'recall (test)': classifier.kfold_test_evaluation['recall'],
        'specificity (train)': classifier.kfold_train_evaluation['specificity'], 
        'specificity (test)': classifier.kfold_test_evaluation['specificity'],
    
        'loss (train)': classifier.kfold_train_evaluation['loss'], 
        'loss (test)': classifier.kfold_test_evaluation['loss'],
        'area-under-curve (train)': classifier.kfold_train_evaluation['area-under-curve'], 
        'area-under-curve (test)': classifier.kfold_test_evaluation['area-under-curve'],
        'precision-recall-curve (train)': classifier.kfold_train_evaluation['precision-recall-curve'], 
        'precision-recall-curve (test)': classifier.kfold_test_evaluation['precision-recall-curve'],
    
        'autoencoder': f'{classifier.autoencoder.optimizer_name} (lr={classifier.autoencoder.learning_rate}) - {classifier.autoencoder.loss_function_name} - seed={classifier.autoencoder.random_state}', 
        'ae l2 (augmented)': classifier.autoencoder.euclidean_dist_positive_class, 
        'ae l2 delta': classifier.autoencoder.euclidean_dist_delta, 
        'ae l2 ratio': classifier.autoencoder.euclidean_dist_ratio,

        'true positive predictions': true_pos_predictions,
        'true negative predictions': true_neg_predictions,   
    }

#@background # concurrency breaks reproducability!
def train_classifier(classifier, index):
    start = time.time()    
    print(f'Starting initialization on classifier #{index}.')
    classifier.initialize()
    print(f'Finished training classifier #{index} after {(time.time() - start):.2f} seconds.')
    
    return evaluate_classifier(classifier)

def cycle_eval_classifiers(autoencoders, optimizers, loss_functions, learning_rates, random_states_count=20, cross_validation_split=5, output=None):     
    classifiers = []
    classifier_results = []
    
    autoencoders_count = len(autoencoders)
    optimizers_count = len(optimizers)
    loss_functions_count = len(loss_functions)
    learning_rates_count = len(learning_rates)

    total_training_cycles = autoencoders_count * optimizers_count * loss_functions_count * learning_rates_count * random_states_count

    print(f'Starting training & evaluation for {autoencoders_count*optimizers_count*loss_functions_count*learning_rates_count*random_states_count} classifiers:')

    for ae_idx in range(autoencoders_count):       
        autoencoder_cycles = ae_idx * optimizers_count * loss_functions_count * learning_rates_count * random_states_count
        ae = autoencoders[ae_idx]
        
        for opt_idx in range(optimizers_count):       
            optimizer_cycles = opt_idx * loss_functions_count * learning_rates_count * random_states_count
    
            for loss_idx in range(loss_functions_count):           
                loss_function_cycles = loss_idx * learning_rates_count * random_states_count
                
                for learn_idx in range(learning_rates_count):
                    learning_rate_cycles = learn_idx * random_states_count
    
                    for rand_idx in range(random_states_count):       
                        random_seed = random.randrange(1000000000)
                        current_cycle = 1+rand_idx+learning_rate_cycles+loss_function_cycles+optimizer_cycles+autoencoder_cycles
        
                        classifier = Classifier(ae, optimizers[opt_idx], learning_rates[learn_idx], 
                                    loss_functions[loss_idx], epochs=Classifier.DEFAULT_EPOCHS, 
                                    early_stopping_patience=20, random_state=random_seed,
                                    cross_validation_split=5, init=False)

                    
                        if output != None:
                            classifier_evaluation_row_to_csv(train_classifier(classifier, current_cycle), output[0], output[1])
                            #classifier.print_evaluation(show_history=False)
                            del classifier
                            gc.collect()
                        else:
                            classifier_results.append(train_classifier(classifier, current_cycle))
                            classifiers.append(classifier)
    
    if output == None:
        return classifier_results, classifiers
    else:
        return None, None

def classifier_evaluation_row_to_csv(evaluation, filename, path):    
    full_filename = f'{path}/{filename}.csv'
    if os.path.exists(full_filename) == False:
        with open(full_filename, 'w', newline='') as file:
            csv.DictWriter(file, delimiter=";", fieldnames=evaluation.keys()).writeheader()
     
    with open(full_filename, 'a', newline='') as file:
        csv.DictWriter(file, delimiter=";", fieldnames=evaluation.keys()).writerow(evaluation)

def classifier_evaluation_to_csv(evaluation, filename, path):    
    dataframe = pd.DataFrame.from_dict(evaluations)   
    timestamp = str(datetime.datetime.today()).replace(' ', '_').replace(':', '-').split(".")[0]
    
    dataframe.to_csv(f'{path}/{filename}__{timestamp}.csv', sep=';', decimal=',', index=False)
    

In [4]:
# reduced optimizers and loss functions through previous cyclic evaluations
def find_classifier_hyperparameters(autoencoders, direct_to_file=False):
    optimizers = [
        #'Adam',
        'AdamW',
        #'Lion',
        'Adadelta', # very slow learning!
        'Adagrad',
        'Adamax',
        'Ftrl',
        'Nadam',
        'RMSprop',
        'SGD',
    ]
    
    loss_functions = [
        'BinaryCrossentropy',
        'WeightedBinaryCrossentropy',
        'Huber',
        'MeanSquaredError',
        'Poisson',
        'Hinge',
        #'CosineSimilarity',
        #'MeanSquaredLogarithmicError',
        #'MeanAbsoluteError',
        #'MeanAbsolutePercentageError',
    ]

    EVALUATIONS_OUTPUT_PATH = "evaluations"
    if not os.path.exists(EVALUATIONS_OUTPUT_PATH):
        os.makedirs(EVALUATIONS_OUTPUT_PATH)

    filename = 'classifier_evaluations_phase-1'
    direct_output = (filename, EVALUATIONS_OUTPUT_PATH) if direct_to_file else None
    
    learning_rates = [0.05, 0.03, 0.01]        
    evaluation_results, classifiers = cycle_eval_classifiers(autoencoders, optimizers, loss_functions, learning_rates, random_states_count=1, output=direct_output)

    return evaluation_results, classifiers

In [None]:
# find_classifier_hyperparameters(autoencoders)