# <span style='font-family: CMU Sans Serif, sans-serif;'> Feed-Forward Neural-Networks  </span> 

## <span style='font-family: CMU Sans Serif, sans-serif;'> Workflow  </span> 

Below we look at different feed-forward achitectures. To find the *optimal models* within each achitectures we implement grid search with successive handling to stop poorly performing models early. We use a pythonian approach to building the models (*i.e. we create modules for each model*). For architecture we will define the following hyper-parameters.

| Hyper-parameter               | Variable                                                           |
|-------------------------------|--------------------------------------------------------------------|
| Number of epochs              | $E$                                                                |
| Amount of hidden layers       | $R$                                                                |
| Amount of neuron per layer    | $n^{(r)}$                                                          |
| Learning rate                 | $\alpha$                                                           |
| Mini-batch size               | $m$                                                                |
| $L_1$ or $L_2$ regularization | $L_1$ or $L_2$                                                     |
| Regularization factor         | $\lambda$                                                          |
| Optimization algorithm        | $\text{Adam}$, or $\text{RMSprop}$, or $\text{SGD}$ with momentum. |
| Momentum in $\text{SGD}$      | $\mu$                                                              |
| Activation function           | $\text{ReLU}$, $\text{PReLU}$, $\text{Leaky ReLU}$                 |
| Dropout (bool)                | $\text{Dropout}^{T}$ or $\text{Dropout}^{F}$                       |
| Dropout parameter             | $p$                                                                |
| Last hidden layer $\tanh$     | $\text{True}$ or $\text{False}$                                    |

## <span style='font-family: CMU Sans Serif, sans-serif;'> Package Import  </span> 

In [14]:
# Data handling
from typing import Union, List, Dict, Optional
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import pandas as pd
import numpy as np
import inspect
import random
import json
import uuid
import os


# Neural networks
import tensorflow as tf
from tensorflow.keras.utils import plot_model

import keras.optimizers
import keras.losses
import keras.metrics

## <span style='font-family: CMU Sans Serif, sans-serif;'> Data </span> 

### <span style='font-family: CMU Sans Serif, sans-serif;'> Import  </span> 

Below we import the main dataframe and the list of remaining primary features.

In [15]:
# Import main data
data_gfd_usa = pd.read_csv('../data__clean/usa__gfd__cleaned_v2.csv')

# Import primary features remaining
with open('../data__clean/listPrimaryFeaturesRemaining.json', 'r') as f:
    list_primary_features = json.load(f)

### <span style='font-family: CMU Sans Serif, sans-serif;'> Train/Test Split  </span> 

We need to split data into test and train for training and model validation. This is done below with the scikit-learn package.

In [16]:
# Extract features and labels from data
data_features = data_gfd_usa[list_primary_features]
data_labels = data_gfd_usa['ret_exc_lead1m']

# Split features and labels into test and train
data_train_features, data_test_features, data_train_labels, data_test_labels = train_test_split(data_features, data_labels, test_size=0.2)

# <span style='font-family: CMU Sans Serif, sans-serif;'> Building Modules  </span> 

What is below: (1 custom classes that can be used on all models, and (2) specific classes for each architecture (or complexities within architectures).

Below we define an optimizer class and a manager class we use to configure and manage the neural networks created. First we define some colors we would like to use when printing the output. 

In [17]:
RESET = "\033[0m"
CYAN  = "\033[1;36m"
MAG   = "\033[1;35m"
YEL   = "\033[1;33m"

### <span style='font-family: CMU Sans Serif, sans-serif;'> Get optimizer function  </span> 

To keep things (*semi*) simple only 3 optimizers can be used: adam, SGD, RMSprop.

In [29]:
def get_optimizer(optimizer: str = "adam", random_search = False, **kwags):
    dflt_vals = {
        "sgd": {"learning_rate": 0.01, "momentum": 0.0},
        "adam": {
            "learning_rate": 0.01,
            "beta_1": 0.9,
            "beta_2": 0.999,
            "epsilon": 1e-07,
        },
        "rmsprop": {
            "learning_rate": 0.01,
            "rho": 0.9,
            "momentum": 0.0,
            "epsilon": 1e-07,
            "centered": False,
        },
    }

    # Optimizer class for each optimizer
    optimizer_classes = {
        "adam": tf.keras.optimizers.Adam,
        "sgd": tf.keras.optimizers.SGD,
        "rmsprop": tf.keras.optimizers.RMSprop,
    }

    # Normalize optimizer name
    optimizer_name = optimizer.lower()

    # Check if optimizer is supported
    if optimizer_name not in dflt_vals:
        raise ValueError(f"Optimizer '{optimizer_name}' not supported.")

    # Get default values for chosen optimizer
    dflt = dflt_vals[optimizer_name]

    # Get optimizer params (dict)
    optimizer_params = {param: kwags.get(param, dflt[param]) for param in dflt}

    # Get optimizer class
    optimizer_class = optimizer_classes[optimizer_name]

    if not random_search: 
        # Return optimizer with given or default params
        print(
            f"{CYAN}INFO:{RESET} {f'[get_optimizer]':<25} {optimizer_name:<25} params: {optimizer_params}"
        )

    return optimizer_class(**optimizer_params)

### <span style='font-family: CMU Sans Serif, sans-serif;'> Model manager class  </span> 

In [142]:
# TODO:
#   See this chat for additional ideas on model manager class https://chatgpt.com/c/67e5e6d4-c9b0-8004-98c4-375eb9e89c38


class ModelManager:
    # Class-level attribute to store the names of every model
    _all_model_names = set()

    @classmethod
    def get_all_model_names(cls):
        return list(cls._all_model_names)

    @classmethod
    def remove_model_name(cls, model_name):
        cls._all_model_names.discard(model_name)

    def __init__(self, model, optimizer, model_name = None, random_search = False):

        self.model = model
        self.model_name = model_name if not random_search else None
        self.random_search = random_search
        self.optimizer = optimizer
        self.callbacks = []
        self.training_history = {}
        if not random_search:
            ModelManager._all_model_names.add(model_name)

    def compile_model(self, loss: str = "mse", metrics: list = ["mse"]):
        self.model.compile(optimizer=self.optimizer, loss=loss, metrics=metrics)
        if not self.random_search:
            params = inspect.getargvalues(inspect.currentframe()).locals
            params.pop("self", None)
            print(
                f"{CYAN}INFO:{RESET} {f'[compile_model]':<25} {self.model_name:<25} params: {params}"
            )

    def setup_callbacks(self, earlystop=False, checkpoint=False, **kwargs):
        # Default parameter values for the callbacks
        callback_map = {
            "earlystop": (
                EarlyStopping,
                {
                    "monitor": "val_loss",
                    "min_delta": 1e-05,
                    "patience": 10,
                    "verbose": 0,
                    "mode": "auto",
                    "baseline": None,
                    "restore_best_weights": False,
                    "start_from_epoch": 0,
                },
            ),
            "checkpoint": (
                ModelCheckpoint,
                {
                    "filepath": "test",  # Placeholder filepath
                    "monitor": "val_loss",
                    "verbose": 0,
                    "save_best_only": False,
                    "save_weights_only": False,
                    "mode": "auto",
                    "save_freq": "epoch",
                    "initial_value_threshold": None,
                },
            ),
        }

        # Start with an empty list of callbacks
        self.callbacks = []

        # EarlyStopping Callback
        if earlystop:
            # Extract the class and default params from the callback_map
            callback_class, default_params = callback_map["earlystop"]
            # Merge defaults with any kwargs passed by the user
            merged_params = {
                **default_params,
                **{k: v for k, v in kwargs.items() if k in default_params},
            }
            earlystop_callback = callback_class(**merged_params)
            self.callbacks.append(earlystop_callback)
            if not self.random_search:
                print(
                    f"{CYAN}INFO:{RESET} {f'[setup_callbacks]':<25} {self.model_name:<25} params: {merged_params}"
                )

        # ModelCheckpoint Callback
        if not self.random_search:
            if checkpoint:
                # Extract the class and default params from the callback_map
                callback_class, default_params = callback_map["checkpoint"]
                # Create the 'check_points' folder if it doesn't exist
                checkpoint_dir = "check_points"
                if not os.path.exists(checkpoint_dir):
                    os.makedirs(checkpoint_dir)

                # Merge defaults with any kwargs passed by the user
                merged_params = {
                    **default_params,
                    **{k: v for k, v in kwargs.items() if k in default_params},
                }
                # Use the model_name for the checkpoint file path
                merged_params["filepath"] = os.path.join(
                    checkpoint_dir, f"{self.model_name}_check_points.keras"
                )
                checkpoint_callback = callback_class(**merged_params)
                self.callbacks.append(checkpoint_callback)
                print(
                    f"{CYAN}INFO:{RESET} {f'[setup_callbacks]':<25} {self.model_name:<25} params: {merged_params}"
                )

    def train_model(
        self,
        x,
        y,
        epochs=10,
        batch_size=32,
        verbose=1,
        validation_split=0.2,
    ):
        if not self.random_search:
            params = inspect.getargvalues(inspect.currentframe()).locals
            params.pop("x", None)
            params.pop("y", None)
            params.pop("self", None)
            print(
                f"{CYAN}INFO:{RESET} {f'[train_model]':<25} {self.model_name:<25} params: {params}" 
            )

        history = self.model.fit(
            x,
            y,
            epochs=epochs,
            batch_size=batch_size,
            verbose=verbose,
            callbacks=self.callbacks,
            validation_split=validation_split,
        )

        self.training_history = history

### <span style='font-family: CMU Sans Serif, sans-serif;'> Random search optimizer  </span> 

We will now create a random search module for finding the *best* hyperparameters within each model/architecture. When defining seach space we define the type of each parameter: 'uncond' or 'cond'. This will determine if the search parameter is conditional on other parameters. If the search parameter is conditional, we check if the condition is met. 

First we need to define the distributions from which the numerical hyperparameteres are drawn. This is done below.

In [20]:
# Define log normal distribution 
def log_norm(min, max):
    log_min = np.log(min)
    log_max = np.log(max)
    mu = (log_min + log_max) / 2
    sigma = (log_max - log_min)/ 2
    return np.random.lognormal(mu, sigma)


Here we define a function which get random parameters specified by the search space.

In [148]:
# Get random parameter from search_space
def get_rand_param(param_info: dict, params: dict):
    # Check if conditional
    if param_info['type'] == 'cond':
        
        # Check if condition met.
        if param_info['condition'](params):

            # Check if distribution should be used for rand generation
            if type(param_info['param_space'][0]) == float:
                return log_norm(param_info['param_space'][0], param_info['param_space'][1])
            # If not select rand element from array
            else:
                return random.choice(param_info['param_space'])
        # If condition not met retun None
        else:
            return None
    # If not a conditional get rand
    else:
        # If float find rand in distrbution 
        if type(param_info['param_space'][0]) == float:
                return log_norm(param_info['param_space'][0], param_info['param_space'][1])
        # Else find rand element in array
        else:
            return random.choice(param_info['param_space'])

In [None]:
#TODO:
# Make it so i can parse constant values to functions (e.g. by creating a new dictionary)
# Define random search function
def random_search(n_trails: int, search_space: dict, model_constructor, x_train, y_train):
    best_val_loss = float('inf')  
    best_result = None
 
    for trail in range(n_trails):
        print(f"Trail: {trail + 1}")
        # Define dictionaries with the randomly generated params (sep separates by 'placement')
        params = {}
        sep_params = {}

        for param in search_space.keys():
            rand_param = get_rand_param(search_space[param], params)

            if rand_param is not None:
                params[param] = rand_param
                placement = search_space[param]['placement']

                if placement not in sep_params:
                    sep_params[placement] = {}

                sep_params[placement][param] = rand_param

        optimizer = get_optimizer(**sep_params.get('get_optimizer', {}), random_search=True) 
        model = model_constructor(**sep_params.get('model_constructor', {}), random_search=True)
        model_mgmt = ModelManager(model = model, optimizer=optimizer, random_search=True)
        model_mgmt.compile_model(metrics=['mse', 'mae'])
        model_mgmt.setup_callbacks(True, restore_best_weights = True)
        model_mgmt.train_model(x=x_train, y=y_train, epochs=100, verbose=0, **sep_params.get('train_model', {}))
        val_loss = model_mgmt.training_history.history['val_loss'][-1]
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_result = {**params, 'val_loss': val_loss}

    return best_result


# <span style='font-family: CMU Sans Serif, sans-serif;'> Building networks  </span> 

## <span style='font-family: CMU Sans Serif, sans-serif;'> Vanilla FFN  </span> 

### <span style='font-family: CMU Sans Serif, sans-serif;'> Vanilla FNN function  </span> 

Below we define a function for building vanilla feed forward neural networks.

We allow for varying the following parameters dynamically:
- number of neurons in each layer;
- type of activation function used in each layer.

In [None]:
#TODO:
# Make it so if only one argument is passed for layer_neurons and activations, then that becomes the standard for all the layers.
def build_vanilla_fnn(
    input_shape=(141,), layer_neurons=[32, 16, 1], activations=["relu", "tanh", None], random_search = False
):
    # Same length of inputs
    assert len(layer_neurons) == len(activations), "layer_neurons and activations must have same length."
    model_id = str(uuid.uuid4())
    model = tf.keras.Sequential()
    model.add(tf.keras.Input(shape=input_shape))

    for i, (units, activation) in enumerate(zip(layer_neurons[:-1], activations[:-1])):
        model.add(
            tf.keras.layers.Dense(
                units, activation=activation, name=f"{model_id}_dense_{i+1}"
            )
        )
    model.add(
        tf.keras.layers.Dense(
            layer_neurons[-1],
            activation=activations[-1],
            name=f"{model_id}_dense_{len(layer_neurons)}",
        )
    )
    if not random_search:
        params = inspect.getargvalues(inspect.currentframe()).locals
        params.pop("model_id", None)
        params.pop("model", None)
        params.pop("i", None)
        params.pop("units", None)
        params.pop("activation", None)

        print(
            f"{CYAN}INFO:{RESET} {f'[build_vanilla_fnn]':<51} params: {params}"
        )

    return model

### <span style='font-family: CMU Sans Serif, sans-serif;'> Creating a NN  </span> 

Lest try our creating a first neural network.

In [137]:
# Define test search space 
search_space = {
    'optimizer': {'type': 'uncond', 'param_space': ['adam', 'sgd', 'rmsprop'], 'placement': 'get_optimizer'},
    'batch_size': {'type': 'uncond', 'param_space': [32, 64, 128], 'placement': 'train_model'},
    'momentum': {'type': 'cond', 'condition': lambda params: params['optimizer'] in ['sgd', 'rmsprop'], 'param_space': [0.2, 0.7], 'placement': 'get_optimizer'}
}

In [147]:
random_search(3, search_space, build_vanilla_fnn, data_train_features, data_train_labels)

Trail: 1
Trail: 2
Trail: 3


{'optimizer': 'adam', 'batch_size': 32, 'val_loss': 0.016122953966259956}