# Improve Architecture of MCDCNN

In [None]:
import itertools

import numpy as np
from tensorflow import keras
import tensorflow as tf
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import process_map, thread_map

import src.LoadData as LoadData
import src.TrainProdecure as TrainProdecure

In [None]:
# General Configurations:
# Number of epochs each candidate model is trained for on each dataset.
EPOCHS_PER_TRAINING = 10
# DATASETS = LoadData.get_all_datasets_test_train_np_arrays('../datasets/')
DATASETS = LoadData.get_mcdcnn_datasets_test_train_np_arrays('../datasets/')
ds_names = list(DATASETS.keys())

# Convert both splits of every dataset to tensors once, up front, so the
# conversion is not repeated for each configuration that is trained later.
for key in DATASETS:
    train_x, train_y = DATASETS[key]["train_data"]
    DATASETS[key]["train_data"] = (tf.convert_to_tensor(train_x), tf.convert_to_tensor(train_y))
    # Read the *test* split here; reading "train_data" again would make every
    # later evaluation run on the training data.
    test_x, test_y = DATASETS[key]["test_data"]
    DATASETS[key]["test_data"] = (tf.convert_to_tensor(test_x), tf.convert_to_tensor(test_y))

ds_names

## Build MCDCNN depending on some configuration

In [None]:
def build_MCDCNN(input_size, output_size,
                 conv_config=(8, 4), kernel_size=5, padding_method="same", dense_config=(732, ),
                 conv_activation="sigmoid", dense_activation="sigmoid",
                 pooling_method=keras.layers.MaxPool1D, pooling_size=2,
                 dropout_rate=0.5, use_batchnorm=False):
    """
    Build and compile a sequential MCDCNN model whose architecture is defined by the parameters.

    The model is a stack of (Conv1D -> [BatchNorm] -> [Dropout] -> Pooling) blocks, followed by
    Flatten, a stack of (Dense -> [BatchNorm] -> [Dropout]) blocks and a softmax output layer.
    It is compiled with sparse categorical cross-entropy, the Adam optimizer and an accuracy metric.

    :param input_size: Length of the input; the model input has shape (input_size, 1)
    :param output_size: Number of units of the softmax output layer
    :param conv_config: List of filters for each convolutional layer -> [filters, ...]
    :param kernel_size: Size of the kernel for the convolutional layers
    :param padding_method: The method used for padding the input (do not use "valid" padding) -> "same"  | "causal"
    :param dense_config: List of units for each dense layer -> [units, ...]
    :param conv_activation: Activation function for the convolutional layers
    :param dense_activation: Activation function for the dense layers
    :param pooling_method: The pooling applied after each convolutional layer -> "avg_pool" | "max_pool",
        or a pooling layer class (e.g. keras.layers.MaxPool1D) that accepts a ``pool_size`` argument
    :param pooling_size: The size of the pooling window
    :param dropout_rate: The rate of the dropout layers. If dropout_rate=0, no dropout layers will be added
    :param use_batchnorm: If True, batch normalization layers will be added after each convolutional and dense layer
    :raises ValueError: If dropout_rate is outside [0, 1), if conv_config or dense_config is empty,
        or if pooling_method is neither a known name nor a callable
    :return: The compiled keras.Sequential model
    """
    if not 0 <= dropout_rate < 1:
        raise ValueError(f"Dropout rate must be between 0 and 1 but is set to {dropout_rate}")
    if not conv_config:
        raise ValueError(f"Convolutional configuration must not be empty but is set to {conv_config}")
    if not dense_config:
        raise ValueError(f"Dense configuration must not be empty but is set to {dense_config}")

    # Resolve the pooling layer once. Names are looked up explicitly so that a
    # typo (or the class-valued default) no longer falls back to average pooling.
    pooling_layers_by_name = {
        "max_pool": keras.layers.MaxPooling1D,
        "avg_pool": keras.layers.AveragePooling1D,
    }
    if isinstance(pooling_method, str):
        pooling_layer_cls = pooling_layers_by_name.get(pooling_method)
    else:
        pooling_layer_cls = pooling_method
    if not callable(pooling_layer_cls):
        raise ValueError(
            f"Pooling method must be one of {sorted(pooling_layers_by_name)} or a pooling layer class "
            f"but is set to {pooling_method}"
        )

    model = keras.Sequential()
    # The explicit Input layer fixes the input shape, so the convolutional
    # layers below do not need (and are not given) an `input_shape` argument.
    model.add(keras.layers.Input((input_size, 1)))

    # Convolutional Layers
    for filters in conv_config:
        model.add(keras.layers.Conv1D(
            filters=filters,
            kernel_size=kernel_size,
            activation=conv_activation,
            padding=padding_method,
            kernel_initializer=tf.keras.initializers.GlorotUniform(),
        ))

        if use_batchnorm:
            model.add(keras.layers.BatchNormalization())
        if dropout_rate > 0:
            model.add(keras.layers.Dropout(dropout_rate))

        model.add(pooling_layer_cls(pool_size=pooling_size))

    # Flatten Layer to feed Dense Layers
    model.add(keras.layers.Flatten())

    # Dense Layers
    for units in dense_config:
        model.add(keras.layers.Dense(
            units,
            activation=dense_activation,
            kernel_initializer=tf.keras.initializers.GlorotUniform(),
        ))
        if use_batchnorm:
            model.add(keras.layers.BatchNormalization())
        if dropout_rate > 0:
            model.add(keras.layers.Dropout(dropout_rate))

    # Output Layer
    model.add(keras.layers.Dense(
        output_size,
        activation="softmax",
        kernel_initializer=tf.keras.initializers.GlorotUniform(),
    ))

    model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

## Define Configurations to try

In [None]:
# Search space: every key is a keyword argument of the model builder and maps
# to the list of candidate values that will be tried for it.
PARAMETER_CONFIGURATIONS = {
    "conv_config": [
        [8,], [8, 4], [8, 8], [4, 4, 4], [8, 8, 8], [16], [16, 16],
    ],
    "kernel_size": [3, 5, 7],
    "padding_method": ["same", "causal"],
    "dense_config": [
        [732, ], [732, 732], [732, 732, 732],
        [512], [100], [10],
        [512, 512], [100, 100], [10, 10],
        [100, 100, 100], [10, 10, 10],
    ],
    "conv_activation": ["sigmoid", "relu"],
    "dense_activation": ["sigmoid", "relu"],
    "pooling_method": ["avg_pool", "max_pool"],
    "pooling_size": [2, 4],
}

# Full grid: one dict per element of the cartesian product of all candidate
# lists. Iterating the dict yields its keys in the same order as .values().
all_configs = [
    dict(zip(PARAMETER_CONFIGURATIONS, combination))
    for combination in itertools.product(*PARAMETER_CONFIGURATIONS.values())
]
print(f"Number of configurations to try: {len(all_configs)}")

## Train all configurations and save the best ones

In [None]:
def get_config_str(conv_config=(8, 4), kernel_size=5, padding_method="same", dense_config=(732, ),
                 conv_activation="sigmoid", dense_activation="sigmoid",
                 pooling_method=keras.layers.MaxPool1D, pooling_size=2,
                 dropout_rate=0.5, use_batchnorm=False):
    """
    Render a model configuration as a single descriptive string.

    The string has three parts joined by "--": the convolutional part, the dense part,
    and the pooling part followed by optional dropout / batch-norm suffixes.
    The parameters mirror the architecture keyword arguments of the model builder.

    :return: The formatted configuration string
    """
    conv_part = f"{conv_config}-filters_{kernel_size}-kernel_{padding_method}({conv_activation})"
    dense_part = f"{dense_config}-dense_layers({dense_activation})"
    pooling_part = f"{pooling_method}({pooling_size})"

    # The dropout rate is appended only when dropout is active; the batch-norm
    # marker only when batch normalization is enabled.
    dropout_suffix = dropout_rate if dropout_rate > 0 else ''
    batchnorm_suffix = '_batchnorm' if use_batchnorm else ''

    return f"{conv_part}--{dense_part}--{pooling_part}{dropout_suffix}{batchnorm_suffix}"

In [None]:
def train_and_evaluate_config(config):
    """
    Train one freshly built model per dataset for the given configuration.

    :param config: Dict of keyword arguments shared by get_config_str and build_MCDCNN
    :return: Tuple (mean test accuracy over all datasets, configuration string, config)
    """
    config_string = get_config_str(**config)
    print(f"Training model with config: {config_string}")

    progress = tqdm(ds_names, unit="dataset", desc="Training the model on each dataset...")
    test_accuracies = []
    for dataset_name in progress:
        progress.set_postfix_str(f"Dataset: {dataset_name}")

        dataset = DATASETS[dataset_name]
        x_train, y_train = dataset["train_data"]
        x_test, y_test = dataset["test_data"]

        # Input length comes from axis 1 of the training inputs; the number of
        # output units is the number of distinct labels in the training split.
        model = build_MCDCNN(
            input_size=x_train.shape[1],
            output_size=len(np.unique(y_train)),
            **config,
        )

        # Only the third element of the 4-tuple is used here; it is treated as
        # the test accuracy (the other elements are ignored).
        _, _, accuracy, _ = TrainProdecure.train_single_model(
            model, x_train, y_train, x_test, y_test,
            epochs=EPOCHS_PER_TRAINING,
            model_name=f"{dataset_name}_{config_string}",
            dataset_name=dataset_name,
        )
        test_accuracies.append(accuracy)

    return np.mean(test_accuracies), config_string, config

In [None]:
# Evaluate every configuration in a pool of 5 worker processes. Each call
# trains one model per dataset, so a task is coarse-grained and is dispatched
# individually: chunksize=1 is the executor default, and stating it explicitly
# avoids the warning tqdm emits for long iterables when chunksize is not set.
results = process_map(train_and_evaluate_config, all_configs, max_workers=5, chunksize=1)

In [None]:
# Order the results in place by their first element, highest value first.
results.sort(reverse=True, key=lambda result: result[0])

In [None]:
# Each result is (mean_accuracy, config_string, config): the second element is
# the formatted string and the third is the keyword-argument dict.
highest_accuracy, best_config_string, best_config = results[0]

# The worker processes return no model object, so rebuild the winning
# architecture (untrained) for the first dataset to have a model to inspect.
_reference_x, _reference_y = DATASETS[ds_names[0]]["train_data"]
best_model = build_MCDCNN(
    input_size=_reference_x.shape[1],
    output_size=len(np.unique(_reference_y)),
    **best_config,
)

In [None]:
# Report the top-ranked result: its accuracy, its configuration rendered as a
# string, and the layer-by-layer summary of the associated model.
best_config_description = get_config_str(**best_config)
print(f"Best model has accuracy {highest_accuracy} and config {best_config_description}")
best_model.summary()