In [None]:
import comet_ml
from datetime import datetime
import keras
import keras.backend as K
from keras.layers import Dense, Dropout
import numpy as np


def metrics_report(self, y_test: np.ndarray, y_pred: np.ndarray):
    """Compute confusion-matrix counts and derived scores for 0/1 labels.

    Both inputs are flattened, so column vectors of shape (n, 1) and flat
    vectors of shape (n,) are interchangeable.

    Args:
        y_test: Ground-truth labels (1 = positive class, 0 = negative class).
        y_pred: Predicted labels, same number of elements as ``y_test``.

    Returns:
        A dict with the keys ``accuracy``, ``f1_macro``, ``f1_1``, ``f1_0``,
        ``Precision``, ``Recall``, ``Specificity``, ``npv`` (floats rounded to
        4 decimals), ``TP``, ``FP``, ``FN``, ``TN``, ``total_samples`` (ints)
        and ``y_test_shape`` / ``y_pred_shape`` (shapes after flattening).
        A ratio whose denominator is zero is reported as 0.0.

    Raises:
        ValueError: If the flattened inputs differ in length.
    """
    # Cast to float so that `y - 1` cannot wrap around on unsigned integer
    # arrays (0 - 1 would become 255 for uint8 labels).
    y_test = np.ravel(y_test).astype(np.float64)
    y_pred = np.ravel(y_pred).astype(np.float64)

    if y_test.shape != y_pred.shape:
        raise ValueError(
            f'y_test and y_pred must have the same number of elements, '
            f'got {y_test.shape[0]} and {y_pred.shape[0]}'
        )

    def _ratio(numerator, denominator):
        # Rounded quotient; 0.0 when undefined instead of NaN plus a RuntimeWarning.
        if denominator == 0:
            return 0.0
        return float(round(np.float64(numerator) / np.float64(denominator), 4))

    true_positives = np.sum(y_test * y_pred)
    false_positives = np.sum(np.abs(y_test - 1) * y_pred)
    true_negatives = np.sum((y_test - 1) * (y_pred - 1))
    false_negatives = np.sum(y_test * np.abs(y_pred - 1))
    total = true_positives + true_negatives + false_positives + false_negatives

    accuracy = _ratio(true_positives + true_negatives, total)
    precision = _ratio(true_positives, true_positives + false_positives)
    recall = _ratio(true_positives, true_positives + false_negatives)
    specificity = _ratio(true_negatives, true_negatives + false_positives)
    npv = _ratio(true_negatives, true_negatives + false_negatives)
    # The F1 scores are derived from the already-rounded component scores.
    f1_1 = _ratio(2 * (precision * recall), precision + recall)
    f1_0 = _ratio(2 * (specificity * npv), specificity + npv)
    f1_macro = _ratio(f1_1 + f1_0, 2)

    scores_dict = dict(
        accuracy = accuracy, f1_macro = f1_macro,
        f1_1 = f1_1, f1_0 = f1_0,
        Precision = precision, Recall = recall,
        Specificity = specificity, npv = npv,
        TP = int(true_positives), FP = int(false_positives), FN = int(false_negatives),
        TN = int(true_negatives),
        y_test_shape = y_test.shape,
        y_pred_shape = y_pred.shape,
        total_samples = int(round(total)),
    )

    return scores_dict


def create_model():
    """Add two Dense layers and a Dropout layer to ``self.model``, then compile it.

    The first Dense layer takes its input width from the number of columns of
    ``self.split_sets['X_train']``. Initializer, regularizer, loss and
    optimizer are read from ``self.hypermethods``; the layer width,
    activation and dropout rate come from ``layer_dict``. The model summary
    is printed after compilation.

    NOTE(review): ``self`` and ``layer_dict`` are neither parameters nor
    locals of this function, so they must resolve from an enclosing or global
    scope. This looks like an excerpt of a method whose original signature
    and surrounding loop are not shown here -- confirm before calling.
    """
    self.model.add(Dense(units = layer_dict['units'],
                         input_shape = (self.split_sets['X_train'].shape[1],),
                         activation = layer_dict['activation'],
                         kernel_initializer = self.hypermethods['initializer'],
                         kernel_regularizer = self.hypermethods['regularizer']))

    self.model.add(Dense(units = layer_dict['units'],
                         activation = layer_dict['activation'],
                         kernel_initializer = self.hypermethods['initializer'],
                         kernel_regularizer = self.hypermethods['regularizer']))

    self.model.add(Dropout(rate = layer_dict['rate']))

    self.model.compile(loss = self.hypermethods['loss'],
                       optimizer = self.hypermethods['optimizer'],
                       metrics = ["accuracy"],
    )

    # `summary()` prints the table itself and returns None, so wrapping it in
    # print() only appended a stray "None" line.
    self.model.summary()


# Load the train/test feature tables of the selected split.
# Backslashes are written as `\\` so the literal holds no invalid escape
# sequences (`\M`, `\A`, `\S`); the resulting path string is unchanged.
_splits_dir = f'{PROJECT_DIR}/DATA\\MODELS_DATA\\AFDB_CUDB_MITDB\\Ann\\AnnSegments_Clean\\AFvsNoAF_Oversampled_AB_B_T\\Splits\\{SPLIT_ID}'
X_train = pd.read_csv(f'{_splits_dir}/features_{SEGMENT_LENGTH.lower()}_train.csv')
X_test = pd.read_csv(f'{_splits_dir}/features_{SEGMENT_LENGTH.lower()}_test.csv')

# Split off the target column; what remains in X_* is the feature matrix.
y_train = X_train.pop('AF Clean Class')
y_test = X_test.pop('AF Clean Class')

# 'Clean Class' is removed from the features as well and kept separately.
episodes_train = X_train.pop('Clean Class')
episodes_test = X_test.pop('Clean Class')

# Data selection

# Column groups are detected on the training table and applied to both tables.
beat_cols = [col for col in X_train.columns if re.match(r'Beat\d', col)]
rr_cols = [col for col in X_train.columns if re.match(r'RR\d', col)]
rrbpm_cols = [col for col in X_train.columns if re.match(r'RRBPM\d', col)]

# RRS_ID selects which RR representation to keep: drop the other one.
if RRS_ID == 'RR':
    for _frame in (X_train, X_test):
        _frame.drop(rrbpm_cols, inplace = True, axis = 1)
elif RRS_ID == 'RRBPM':
    for _frame in (X_train, X_test):
        _frame.drop(rr_cols, inplace = True, axis = 1)

# BEATS_ID == 'NB' removes the Beat* columns.
if BEATS_ID == 'NB':
    for _frame in (X_train, X_test):
        _frame.drop(beat_cols, inplace = True, axis = 1)

In [None]:
# Hyperparameter grid: every key maps to the list of candidate values that
# is expanded into a full Cartesian product below.
_n_features = X_train.shape[1]


def _relu_dense(units):
    """Return the spec of one ReLU dense layer with `units` truncated to int."""
    return {'units': int(units), 'type': 'dense', 'activation': 'relu'}


search_space = {
    # Three candidate stacks, with layer widths derived from the feature count.
    'layers_list': [
        [
            _relu_dense(_n_features),
            _relu_dense(_n_features / 2),
        ],
        [
            _relu_dense(_n_features),
            _relu_dense(_n_features * 2 / 3),
            _relu_dense(_n_features * 1 / 3),
        ],
        [
            _relu_dense(_n_features * 2),
            _relu_dense(_n_features),
            _relu_dense(_n_features / 2),
        ],
    ],
    'batch_size': [10000, 5000, 1000],
    'learning_rate': [0.2, 0.1, 0.03],
    'optimizer': ['Adam', 'Nadam', 'RMSProp'],
    'loss': ['CategoricalHinge', 'CategoricalCrossentropy', 'Poisson'],
    # The regularizer is stored as a source string here, not as an object.
    'regularizer': ['tf.keras.regularizers.l1_l2(l1=0.01, l2=0.01)', None],
    'initializer': ['VarianceScaling', 'GlorotNormal'],
    'patience': [500],
    'decay': [1e-2],
    'early_stopping_flag': [True],
    'validation_split': [0.05],
    'epochs': [5],
}

# combinations: one dict per point of the grid, keys in search_space order
keys = tuple(search_space.keys())
values = tuple(search_space.values())
combinations_dicts = [
    dict(zip(keys, combination))
    for combination in itertools.product(*values)
]

# tabular combinations: one row per model, named <EXPERIMENT_ID>_<1-based index>
model_names = [
    f'{EXPERIMENT_ID}_{number}'
    for number in range(1, len(combinations_dicts) + 1)
]
combinations_df = pd.DataFrame(combinations_dicts)
combinations_df.insert(0, 'Model', model_names)
combinations_df['Trained'] = 'No'

# save
# NOTE(review): `.to_string` is referenced without being called -- confirm it
# is a property/attribute of `metadata_search_space_dir` and not a method.
search_space_path = f'{metadata_search_space_dir.to_string}\\search_space_{EXPERIMENT_ID}.xlsx'
combinations_df.to_excel(search_space_path, index = False)



# Register an early-stopping callback that watches the validation loss
# (lower is better); the patience comes from the callbacks configuration.
early_stopping = keras.callbacks.EarlyStopping(
    monitor = 'val_loss',
    mode = 'min',
    verbose = 10,
    patience = callbacks_dict_of_dicts['early_stopping']['patience'],
)
self.callbacks.append(early_stopping)

            # Same early-stopping registration, reading the patience from the
            # instance-level callbacks configuration.
            early_stopping = keras.callbacks.EarlyStopping(
                monitor = 'val_loss',
                mode = 'min',
                verbose = 10,
                patience = self.callbacks_dict_of_dicts['early_stopping']['patience'],
            )
            self.callbacks.append(early_stopping)


with self.comet_experiment.train():

    fit_metadata_tracker = self.model.fit(x = self.split_sets['X_train'],
                                          y = self.split_sets['y_train'],
                                          epochs = self.hyperparameters['epochs'],
                                          batch_size = self.hyperparameters['batch_size'],
                                          validation_split = self.hyperparameters['validation_split'],
                                          callbacks = self.callbacks,
                                          verbose = 10,
                                          class_weight = self.split_sets_metadata['class_weight'],
                                          workers = -1
                                          )

return self.metrics_report(y_pred = self.model.predict_classes(self.split_sets['X_test']), y_test = self.split_sets['y_test'])


