In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
import tensorflow_datasets as tfds
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, ClassifierMixin

class CNNModel(BaseEstimator, ClassifierMixin):
  """Scikit-learn style wrapper around a Keras ``Sequential`` 1D-CNN classifier.

  The network is assembled from three consecutive parts (convolution,
  intermediate, dense). Each part is given as a list of layer names plus a
  parallel list of hyperparameter dicts. A final 2-unit softmax layer is always
  appended and the model is compiled with sparse categorical cross-entropy.

  Parameters
  ----------
  input_shape : tuple
      Shape passed to ``layers.Input``.
  layer_metadata : list of dict
      One entry per known layer, shaped like
      ``{'layer': name, 'hyperparameter': [{'param': ..., 'type': ..., 'default': ...}, ...]}``.
      Used to fill in missing hyperparameters and to replace values whose type
      does not match ``'type'`` with the declared default.
  convolution_part, intermediate_part, dense_part : list of str
      Layer names for each part. Supported names are ``'conv1d'`` and
      ``'maxpooling1d'`` (convolution), ``'flatten'`` (intermediate) and
      ``'dense'`` (dense).
  convolution_hyperparameter, intermediate_hyperparameter, dense_hyperparameter : list of dict
      Hyperparameters for the layer at the same position in the matching part.
      These dicts are never modified by this class.
  optimizer : str or Keras optimizer
      Forwarded to ``model.compile``.

  Notes
  -----
  The Keras model is built eagerly in ``__init__`` and stored in ``self.model``.
  If any layer name is not supported, ``build_model`` prints a message and
  returns ``None``, so ``self.model`` is ``None`` in that case.
  """

  def __init__(
      self,
      input_shape,
      layer_metadata,
      convolution_part=['conv1d'],
      intermediate_part=['flatten'],
      dense_part=['dense'],
      convolution_hyperparameter=[{'filters':16, 'kernel_size':3, 'activation':'relu'}],
      intermediate_hyperparameter=[{}],
      dense_hyperparameter=[{'units':32, 'activation':'relu'}],
      optimizer='adam'
  ):
    # The list/dict defaults above are shared between calls. They are kept for
    # signature compatibility and are safe because build_model only reads them.
    self.input_shape = input_shape
    self.layer_metadata = layer_metadata
    self.convolution_part = convolution_part
    self.intermediate_part = intermediate_part
    self.dense_part = dense_part
    self.convolution_hyperparameter = convolution_hyperparameter
    self.intermediate_hyperparameter = intermediate_hyperparameter
    self.dense_hyperparameter = dense_hyperparameter
    self.optimizer = optimizer
    self.model = self.build_model(input_shape, convolution_part, intermediate_part, dense_part, convolution_hyperparameter, intermediate_hyperparameter, dense_hyperparameter, optimizer)

  def build_model(
      self,
      input_shape,
      convolution_part,
      intermediate_part,
      dense_part,
      convolution_hyperparameter,
      intermediate_hyperparameter,
      dense_hyperparameter,
      optimizer,
  ):
    """Assemble and compile the Keras model described by the arguments.

    Each ``*_part`` list is zipped with its ``*_hyperparameter`` list, so extra
    entries in the longer of the two are ignored.

    Returns
    -------
    The compiled ``Sequential`` model, or ``None`` (after printing a message)
    when a layer name is not supported by its part.
    """

    def fix_hyperparameter_type(layer, hyperparameter, layer_metadata, warnings=True):
      """Return a copy of ``hyperparameter`` completed/validated against ``layer_metadata``.

      For every parameter declared for ``layer``, a missing value is filled with
      the declared default, and a value whose exact type differs from the
      declared type is replaced by the default (with an optional warning).
      """
      # Work on a shallow copy: the input dict may be one of __init__'s shared
      # default arguments or a dict reused across several candidate schemas.
      fixed = dict(hyperparameter)
      for layer_info in layer_metadata:
        if(layer == layer_info['layer']):
          for params in layer_info['hyperparameter']:
            val = fixed.get(params['param'], params['default'])
            if(type(val) != params['type']):
              if(warnings): print("Hyperparameter ", params['param'], " Have Invalid Data Type! Using Default..")
              fixed[params['param']] = params['default']
            else:
              fixed[params['param']] = val
      return fixed

    model = models.Sequential()
    model.add(layers.Input(input_shape))

    # Convolution Part
    for layer, hyperparameter in zip(convolution_part, convolution_hyperparameter):
      hyperparameter = fix_hyperparameter_type(layer, hyperparameter, self.layer_metadata)

      if(layer == 'conv1d'):
        model.add(layers.Conv1D(filters=hyperparameter['filters'], kernel_size=hyperparameter['kernel_size'], activation=hyperparameter['activation']))
      elif(layer == 'maxpooling1d'):
        model.add(layers.MaxPooling1D(pool_size=hyperparameter['pool_size']))
      else:
        print("'Convolution Part' Layer Invalid!")
        return None

    # Intermediate Part
    for layer, hyperparameter in zip(intermediate_part, intermediate_hyperparameter):
      # Flatten takes no hyperparameters; the call is kept for its type warnings.
      hyperparameter = fix_hyperparameter_type(layer, hyperparameter, self.layer_metadata)

      if(layer == 'flatten'):
        model.add(layers.Flatten())
      else:
        print("'Intermediate Part' Layer Invalid!")
        return None

    # Dense Part
    for layer, hyperparameter in zip(dense_part, dense_hyperparameter):
      hyperparameter = fix_hyperparameter_type(layer, hyperparameter, self.layer_metadata)

      if(layer == 'dense'):
        model.add(layers.Dense(units=hyperparameter['units'], activation=hyperparameter['activation']))
      else:
        print("'Dense Part' Layer Invalid!")
        return None

    # Output head: fixed at two classes.
    model.add(layers.Dense(2, activation='softmax'))

    # Compile the model
    model.compile(optimizer=optimizer,
                loss='sparse_categorical_crossentropy',
                metrics=[
                    'accuracy'
                ])

    return model

  def fit(self, X_train, y_train, X_test, y_test, epochs, verbose=1, callbacks=None):
    """Train the wrapped Keras model, validating on ``(X_test, y_test)`` each epoch.

    ``callbacks`` defaults to no callbacks. Note that this signature differs
    from the usual scikit-learn ``fit(X, y)``.
    """
    if callbacks is None:
      callbacks = []
    self.model.fit(X_train, y_train, epochs=epochs, validation_data=(X_test, y_test), verbose=verbose, callbacks=callbacks)

  def score(self, X, y):
    """Return the inverse of the evaluation loss on ``(X, y)`` (higher is better).

    This overrides ``ClassifierMixin.score``; it does not return accuracy.
    """
    loss, _ = self.model.evaluate(X, y, verbose=0)
    # The tiny epsilon guards against division by zero for a perfect loss.
    loss_inverse = 1/(loss+1e-20)

    return loss_inverse

  def evaluate(self, X, y, verbose=1):
    """Evaluate on ``(X, y)`` and return ``(loss, accuracy)``.

    When ``verbose`` is non-zero, both values are printed multiplied by 100
    with a trailing percent sign.
    """
    # Evaluate the model on the test set
    loss, accuracy = self.model.evaluate(X, y, verbose=0)

    if(verbose != 0):
      print("Loss: %.3f%%" % (loss*100) )
      print("Accuracy: %.3f%%" % (accuracy*100) )

    return loss, accuracy

  def summary(self):
    """Print the Keras summary of the wrapped model."""
    self.model.summary()

  def predict(self, X):
    """Return the wrapped model's raw ``predict`` output for ``X`` (softmax outputs, not labels)."""
    return self.model.predict(X, verbose=0)

  def save(self, path, save_format='keras'):
    """Save the wrapped model to ``path`` in the requested ``save_format``."""
    # Pass save_format by keyword: the second positional parameter of Keras
    # Model.save is `overwrite`, not the format.
    return self.model.save(path, save_format=save_format)

In [None]:
from sklearn.metrics import f1_score
from sklearn.metrics import roc_auc_score

class MetricsCallback(tf.keras.callbacks.Callback):
    """Keras callback that scores the model on a held-out set after every epoch.

    After each epoch the model predicts on ``validation_data`` and two metrics
    are stored in ``self.logs``: ``'f1score'`` (micro-averaged F1 of the
    arg-max class labels) and ``'roc_auc'``. Only the latest epoch's values are
    kept.

    Parameters
    ----------
    validation_data : tuple
        ``(X_val, y_val)``. ``y_val`` is compared directly against arg-max
        class indices, so it is expected to hold integer class labels.
    """

    def __init__(self, validation_data=()):
        super().__init__()
        self.validation_data = validation_data
        self.logs = {}

    def on_epoch_end(self, epoch, logs=None):
        """Compute F1 and ROC-AUC on the validation data and store them in ``self.logs``.

        Raises
        ------
        RuntimeError
            If the callback was created without validation data.
        """
        if not self.validation_data:
            raise RuntimeError("Requires validation_data.")

        X_val, y_val = self.validation_data
        y_prob = np.asarray(self.model.predict(X_val, verbose=0))

        # Collapse per-class scores to hard class labels for F1.
        y_pred = np.argmax(y_prob, axis=1)

        # Compute Metrics score
        f1score = f1_score(y_val, y_pred, average='micro')

        # ROC-AUC is a ranking metric: give it the positive-class score, not
        # thresholded labels, otherwise the curve collapses to a single point.
        # For any other output width keep the previous label-based input.
        if y_prob.ndim == 2 and y_prob.shape[1] == 2:
            y_score = y_prob[:, 1]
        else:
            y_score = y_pred
        roc_auc = roc_auc_score(y_val, y_score, average='micro')

        # Add Metrics score to logs
        self.logs['roc_auc'] = roc_auc
        self.logs['f1score'] = f1score

In [None]:
# Seed TensorFlow and Python's built-in `random` module with the same fixed value.
# NOTE(review): NumPy's global RNG is not seeded here -- confirm whether
# prep_data or anything downstream relies on it.
tf.random.set_seed(37)
random.seed(37)

# prep_data and df_raw are defined outside this cell; judging by the names the
# result is a train/validation split of features and labels -- TODO confirm.
X_train, X_val, y_train, y_val = prep_data(df_raw)

# Placeholder for the results log; save_result() below reads this global, so it
# must be rebound to a real object before save_result() is called.
log = None

# One dict per row of log_raw (defined outside this cell), restricted to the
# columns listed below.
schema_log = [
    row.to_dict()
    for index, row in
    log_raw[
        ['convolution_part', 'convolution_hyperparameter', 'dense_part', 'dense_hyperparameter', 'input_shape', 'layer_metadata']
    ].iterrows()
]

# Every axis of X_train except the first (presumably the sample axis -- confirm).
input_shape = X_train.shape[1:]
# Not referenced anywhere else in the visible code.
output_shape = 1

# Search space for the schema generation below.
# - 'n_conv' / 'n_dense': numbers of layers to try for each part.
# - 'conv_layer' / 'dense_layer': layer names that may appear in each part.
# - any other key: candidate values for the layer hyperparameter whose 'param'
#   name in layer_metadata equals that key (otherwise the metadata default is used).
param_grid = {
  'n_conv': [1, 2, 3],
  'n_dense':[1, 2, 3],
  'conv_layer':['conv1d', 'maxpooling1d'],
  'dense_layer':['dense'],
  'filters':[2, 4, 8],
  'kernel_size':[3],
  'units': [16, 32, 64],
  'activation': ['relu']
}

def save_result(save_state, target_path):
  """Write the module-level ``log`` object to ``target_path`` as CSV, without the index.

  NOTE(review): ``save_state`` is accepted but never used; the function reads
  the global ``log`` instead. ``log`` starts out as ``None`` in this cell, so it
  has to be rebound to an object exposing ``to_csv`` (presumably a pandas
  DataFrame -- confirm) before this function is called.
  """
  log.to_csv(target_path, index=False)

def get_schema_combination(part_name, n_layer, layer_values, layer_metadata, param_grid):
  """Enumerate every layer stack and hyperparameter assignment for one model part.

  For each depth in ``n_layer``, every ordered sequence of that many names
  drawn (with repetition) from ``layer_values`` is generated. Each layer in a
  sequence gets every combination of its hyperparameters: the candidate values
  come from ``param_grid`` when the parameter name is a key there, otherwise
  the single default declared in ``layer_metadata``.

  Returns a list of dicts of the form
  ``{part_name + '_part': [layer, ...], part_name + '_hyperparameter': [dict, ...]}``.
  """
  part_key = part_name + '_part'
  hyper_key = part_name + '_hyperparameter'
  schemas = []

  for depth in n_layer:
    for stack in itertools.product(layer_values, repeat=depth):
      per_layer_options = []

      for layer_name in stack:
        # Candidate values per hyperparameter of this layer, in metadata order.
        candidates = {}
        for meta in layer_metadata:
          if layer_name == meta['layer']:
            for spec in meta['hyperparameter']:
              candidates[spec['param']] = param_grid.get(spec['param'], [spec['default']])

        # Expand the candidates into one dict per concrete assignment.
        names = list(candidates)
        per_layer_options.append([
            dict(zip(names, chosen))
            for chosen in itertools.product(*candidates.values())
        ])

      # Cross the per-layer assignments to get full-stack assignments.
      schemas.extend(
          {part_key: list(stack), hyper_key: list(assignment)}
          for assignment in itertools.product(*per_layer_options)
      )

  return schemas

# All candidate convolution parts: every stack of 'conv_layer' names for each
# depth in 'n_conv', crossed with the hyperparameter choices of each layer.
# layer_metadata is defined outside this cell.
# NOTE(review): 'conv_shcema' is a typo for 'conv_schema'; the name is left
# unchanged because other cells may reference it.
conv_shcema = get_schema_combination(part_name='convolution',
                       n_layer = param_grid.get('n_conv', [1]),
                       layer_values = param_grid.get('conv_layer', ['conv1d']),
                       layer_metadata=layer_metadata,
                       param_grid=param_grid)

# All candidate dense parts, built the same way from 'n_dense' / 'dense_layer'.
dense_schema = get_schema_combination(part_name='dense',
                       n_layer = param_grid.get('n_dense', [1]),
                       layer_values = param_grid.get('dense_layer', ['dense']),
                       layer_metadata=layer_metadata,
                       param_grid=param_grid)

# Pair every convolution candidate with every dense candidate and merge the two
# dicts into one model schema. The keys do not collide because each side is
# prefixed with its own part name ('convolution_*' vs 'dense_*').
model_schema_combination = []
for schema in itertools.product(conv_shcema, dense_schema):
  model_schema = {}
  for schema_dict in schema:
    model_schema.update(schema_dict)

  model_schema_combination.append(model_schema)

# Quick sanity check: number of candidate models plus the first and last schema.
print(len(model_schema_combination))
print(model_schema_combination[0])
print(model_schema_combination[-1])