# ⏰ **AN2DL 2022 - CHALLENGE 2** ⏰

# **Project utilities:**


*   Google Drive connection
*   Installation of tsaug for sequence augmentation
*   Metadata, variables and imports
*   Seed setting
*   Callbacks and folder creation



In [None]:
#@title **Loading data from gdrive to memory**
# Mount Google Drive under /content/drive so the project folder is reachable from the notebook.
from google.colab import drive
drive.mount('/content/drive')
# Move into the project folder: the relative paths used later (dataset files, 'trained_models')
# are resolved against the current working directory.
%cd /content/drive/MyDrive/ANNDL_Challenge_2

In [None]:
# Install tsaug; it is imported by the augmentation cells further down.
!pip install tsaug

In [None]:
#@title **Imports**
import warnings
import logging
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
from keras import Sequential
from keras.models import Model
from keras.layers import *
from keras.optimizers import Adam
import numpy as np
import os
import random
import pandas as pd
import scipy
from datetime import datetime
from google.colab import files
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score, precision_score, recall_score
from sklearn.preprocessing import LabelBinarizer

In [None]:
#@title **Metadata and variables**

# Short aliases for the Keras API and its layers module.
tfk = tf.keras
tfkl = tf.keras.layers

seed = 42  # seed used for the random number generators and for the train/val/test splits
test_percentage = 0.1  # passed as test_size to train_test_split
nclasses = 12  # number of target classes

# NOTE(review): this global is not read by any code visible here; perform_sliding_window
# has its own `kfold` parameter (default False) - confirm whether this flag is still needed.
kfold = False

In [None]:
#@title **Setting seed and/or suppressing warnings**
# Silence TensorFlow / Python warnings.
# NOTE(review): TF_CPP_MIN_LOG_LEVEL is set here after `import tensorflow` has already run
# in the imports cell - confirm it still has the intended effect.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
warnings.simplefilter(action='ignore', category=FutureWarning)
# Warning is the base category, so this filter also covers the FutureWarning one above.
warnings.simplefilter(action='ignore', category=Warning)
# This 'INFO' level is overridden by the two setLevel(ERROR) calls below.
tf.get_logger().setLevel('INFO')
tf.autograph.set_verbosity(0)

# Both calls set the same logger to the ERROR level (once via the logging constant, once by name).
tf.get_logger().setLevel(logging.ERROR)
tf.get_logger().setLevel('ERROR')
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

# Setting random seed for reproducibility (Python, NumPy and TensorFlow generators)
random.seed(seed)
# NOTE(review): PYTHONHASHSEED is set while the interpreter is already running - confirm it
# influences anything beyond subprocesses started from here.
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

In [None]:
#@title **Utility function to create folders and callbacks for training**
def create_folders_and_callbacks(model_name):
  """Create the experiment folders for a training run and build its Keras callbacks.

  The folder layout is `trained_models/<model_name>_<MM-DD_HH-MM-SS>/` with two
  sub-folders: `ckpts` (weight checkpoints) and `tb_logs` (TensorBoard logs).

  Args:
    model_name: prefix of the experiment folder name.

  Returns:
    A tuple `(callbacks, exp_dir)`: the list
    `[ModelCheckpoint, TensorBoard, EarlyStopping]` and the experiment folder path.
  """
  exps_dir = 'trained_models'
  now = datetime.now().strftime('%m-%d_%H-%M-%S')
  exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
  ckpt_dir = os.path.join(exp_dir, 'ckpts')
  tb_dir = os.path.join(exp_dir, 'tb_logs')

  # makedirs also creates the missing parents (exps_dir, exp_dir); exist_ok=True replaces
  # the separate "exists?" check, so there is no window between checking and creating.
  for directory in (ckpt_dir, tb_dir):
      os.makedirs(directory, exist_ok=True)

  callbacks = []

  # Model checkpoint
  # ----------------
  # The validation accuracy and the epoch number are part of the checkpoint filename.
  # Only weights are stored, and only when val_accuracy improves on the best value so far,
  # which starts at initial_value_threshold.
  ckpt_callback = tf.keras.callbacks.ModelCheckpoint(
      filepath=ckpt_dir + '/cp-{val_accuracy:.2f}-{epoch:02d}.ckpt',
      monitor='val_accuracy',
      save_weights_only=True,
      save_best_only=True,
      initial_value_threshold=0.65,
  )
  callbacks.append(ckpt_callback)

  # Visualize Learning on Tensorboard
  # ---------------------------------
  # Profiling is disabled (profile_batch=0); histogram_freq=1 requests weight histograms
  # every epoch.
  tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tb_dir,
                                               profile_batch=0,
                                               histogram_freq=1)
  callbacks.append(tb_callback)

  # Early Stopping
  # --------------
  # Stops after 20 epochs without val_accuracy improvement and restores the best weights.
  es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=20, restore_best_weights=True)
  callbacks.append(es_callback)

  return callbacks, exp_dir

# **Data preprocessing:**
*   Oversampling
*   Undersampling
*   Augmentation 
*   Creation of sliding windows with random validation/test sampling, optionally integrating augmentation and oversampling

In [None]:
#@title OneHot-to-categorical translation function

def to_numerical(y):
  """Translate one-hot encoded rows into integer class indices.

  Args:
    y: 2-D array-like; each row holds the scores/one-hot encoding of one sample.

  Returns:
    1-D array with, for every row, the column index of its largest value.
  """
  class_indices = np.argmax(y, axis=1)
  return class_indices

In [None]:
#@title Utility method to evaluate classes' weights

def get_num_elements(y):
    """Count how many labels fall in each class.

    Args:
        y: 1-D array of integer class labels (not one-hot).

    Returns:
        Float array of length `nclasses` (module-level constant) whose i-th entry is the
        number of entries of `y` equal to i.
    """
    labels = np.asarray(y)

    # One vectorized comparison per class instead of a Python loop over every sample.
    return np.array(
        [np.count_nonzero(labels == class_id) for class_id in range(nclasses)],
        dtype=float,
    )


def get_class_weights(y, for_fit=True):
  """Compute inverse-frequency class weights and print them with the class counts.

  The weight of class i is `num_samples / (nclasses * count_i)`, so a perfectly balanced
  dataset gets a weight of 1 for every class. A class with no samples gets an infinite
  weight (division by zero in NumPy).

  Args:
    y: 1-D array of integer class labels (not one-hot).
    for_fit: when True return a `{class_index: weight}` dict, otherwise return the
      weights as an array.

  Returns:
    Dict mapping each class index in `range(nclasses)` to its weight, or the weight
    array when `for_fit` is False.
  """
  num_samples = y.shape[0]
  elements = get_num_elements(y)

  weights = (1 / elements) * (num_samples / float(nclasses))

  print("Samples count: ", elements)
  print("Class weights: ", weights)

  if not for_fit:
    return weights

  # Built from nclasses instead of twelve hard-coded keys, so it follows the constant.
  return {i: weights[i] for i in range(nclasses)}

In [None]:
#@title Sort-back function

# Stratified sampling for validation/test shuffles the samples, but sliding windows must be
# built on samples in their original order. This function restores that order and reports
# where the kept samples are no longer contiguous.

def sort_back(X_to_sort, y_to_sort, X, y):
  """Reorder a shuffled subset so it follows the row order of the complete dataset.

  Args:
    X_to_sort: shuffled subset of the rows of `X`.
    y_to_sort: labels of `X_to_sort`; only its shape is used (labels are re-read from `y`).
    X: complete dataset the subset was sampled from.
    y: labels of `X`.

  Returns:
    `(X_sorted, y_sorted, holes)`: the subset and its labels in original order (float
    arrays shaped like the inputs), and an integer array of break markers. `holes` starts
    with 0 and ends with the subset size; in between it holds `r - 1` for every sorted
    position `r` whose original index is not the successor of the previous one.

  Raises:
    IndexError: if a row of `X_to_sort` has no match in `X`.
  """
  # Position of every subset row inside the complete dataset (first match wins).
  original_positions = [np.where(np.all(row == X, axis=1))[0][0] for row in X_to_sort]

  # Sorting once replaces repeatedly extracting and removing the minimum.
  ordered_positions = sorted(original_positions)

  X_sorted = np.empty(X_to_sort.shape)
  y_sorted = np.empty(y_to_sort.shape)
  holes = [0]

  for r, position in enumerate(ordered_positions):
    # A jump of more than one index means samples in between went to validation/test.
    if r != 0 and position > ordered_positions[r - 1] + 1:
      holes.append(r-1)
    X_sorted[r] = X[position]
    y_sorted[r] = y[position]

  holes.append(X_to_sort.shape[0])

  return X_sorted, y_sorted, np.array(holes)


In [None]:
#@title Undersampling/Oversampling parameters generator

def get_class_prob(y):
    """Return the empirical probability of every class.

    Args:
        y: labels, either one-hot (2-D, converted with `to_numerical`) or integer (1-D).

    Returns:
        Float array of length `nclasses`; entry i is the fraction of labels equal to i.
    """
    labels = to_numerical(y) if y.ndim == 2 else y
    total = labels.shape[0]

    probs = np.zeros(nclasses)
    for class_id in range(nclasses):
        hits = sum(1 for pos in range(total) if labels[pos] == class_id)
        # np.float64 keeps NumPy division semantics (nan instead of an exception when total is 0).
        probs[class_id] = np.float64(hits) / float(total)

    return probs

# Sampling parameters, read as module-level globals by oversample_classes() and
# undersampling_filter(): use them wisely. Both are exponents applied to the ratio
# (target class probability / actual class probability).
oversampling_coef = 0.9 # if equal to 0 the ratio becomes 1, so oversample_classes() asks for a single copy of every class
undersampling_coef = 2.0 # if equal to 0 the ratio becomes 1, so every deletion fraction returned by undersampling_filter() is 0

def oversample_classes(y_train, nclasses=nclasses):
    """
    Returns, for every class, how many copies of each of its examples should be kept.

    y_train: labels handed to get_class_prob() (one-hot or integer).
    nclasses: only used for the target probability 1 / nclasses; its default is the value
        the module-level `nclasses` had when this function was defined. The class
        probabilities themselves come from get_class_prob(), which reads the module-level
        constant.

    The result is an int64 tensor with one entry per class.
    """

    class_prob = get_class_prob(y_train)
    class_target_prob = 1 / nclasses
    prob_ratio = tf.cast(class_target_prob/class_prob, dtype=tf.float32)
    # soften the ratio: if oversampling_coef == 0 we recover the original distribution
    prob_ratio = prob_ratio ** oversampling_coef 
    # for classes with probability higher than class_target_prob we
    # want to return 1
    prob_ratio = tf.maximum(prob_ratio, 1) 
    # for low probability classes this number will be very large
    repeat_count = tf.floor(prob_ratio)
    # prob_ratio can be e.g. 1.9, which means that there is still a 90%
    # chance that we should return 2 instead of 1
    repeat_residual = prob_ratio - repeat_count # a number between 0-1
    # a single scalar uniform draw (shape []) is compared against the residual of every class
    residual_acceptance = tf.less_equal(
                        tf.random.uniform([], dtype=tf.float32), repeat_residual
    )

    residual_acceptance = tf.cast(residual_acceptance, tf.int64)
    repeat_count = tf.cast(repeat_count, dtype=tf.int64)
    return repeat_count + residual_acceptance

def undersampling_filter(y_train, nclasses=nclasses):
    """
    Computes, for every class, the fraction of its examples that should be deleted.

    y_train: labels handed to get_class_prob() (one-hot or integer).
    nclasses: only used for the target probability 1 / nclasses.

    Returns a NumPy array with one entry per class equal to
    1 - min((target_prob / class_prob) ** undersampling_coef, 1).
    """
    class_prob = get_class_prob(y_train)
    class_target_prob = 1 / nclasses
    prob_ratio = tf.cast(class_target_prob/class_prob, dtype=tf.float32)
    prob_ratio = prob_ratio ** undersampling_coef
    # classes rarer than the target keep a ratio of 1, i.e. a deletion fraction of 0
    prob_ratio = tf.minimum(prob_ratio, 1.0)

    # NOTE(review): `acceptance` is computed but never used or returned; it only consumes
    # one random draw. Confirm before removing, since that would shift the random stream.
    acceptance = tf.less_equal(tf.random.uniform([], dtype=tf.float32), prob_ratio)
    # the per-class deletion fraction is returned instead of a boolean predicate
    num_deletions = np.array([(1 - ratio) for ratio in prob_ratio])
    return num_deletions

In [None]:
#@title Undersampling function

def undersample(X, y, classes_to_undersample):
  """Randomly drop samples of the selected classes.

  A sample whose class is in `classes_to_undersample` is kept when a uniform draw exceeds
  half of that class' deletion fraction (from `undersampling_filter`); samples of any
  other class are always kept.

  Args:
    X: samples, indexed along the first axis.
    y: labels of `X`, one-hot (2-D) or integer (1-D).
    classes_to_undersample: collection of class indices that may lose samples.

  Returns:
    `(X_new, y_new)`: arrays with the kept samples and their labels, in the format of `y`.
  """
  deletion_fraction = undersampling_filter(y)

  # Integer labels are needed for the class lookups; 1-D labels are used as they are
  # (previously this case raised a NameError because y_numerical was never assigned).
  y_numerical = to_numerical(y) if y.ndim == 2 else y

  X_new = []
  y_new = []

  for i in range(X.shape[0]):
    label = y_numerical[i]
    # `or` short-circuits: the random draw only happens for classes being undersampled.
    if label not in classes_to_undersample or np.random.random() > deletion_fraction[label]/2:
      X_new.append(X[i])
      y_new.append(y[i])

  return np.array(X_new), np.array(y_new) 


In [None]:
#@title Oversampling function
def oversample(X, y_categorical):
  """Replicate every sample as many times as `oversample_classes` asks for its class.

  Args:
    X: samples, indexed along the first axis.
    y_categorical: one-hot labels of `X`.

  Returns:
    `(X_new, y_new)`: arrays holding the replicated samples and their one-hot labels.
  """
  labels = to_numerical(y_categorical)

  # Copies per class, computed against a target probability of 1/4.
  copies_per_class = oversample_classes(labels, 4)

  replicated_X, replicated_y = [], []
  for idx in range(X.shape[0]):
    copies = copies_per_class[labels[idx]]
    for _ in range(copies):
      replicated_X.append(X[idx])
      replicated_y.append(y_categorical[idx])

  return np.array(replicated_X), np.array(replicated_y)

In [None]:
#@title Plot of effect of Over/Undersampling on samples distributions 

# Load the raw training arrays (paths are relative to the current working directory).
X = np.load("training_dataset_homework2/x_train.npy")
y = np.load("training_dataset_homework2/y_train.npy")

# Convert the labels to one-hot rows
label_as_binary = LabelBinarizer()
y = label_as_binary.fit_transform(y)

# Resample the whole dataset both ways; every class is a candidate for undersampling.
X_os, y_os = oversample(X, y)
X_us, y_us = undersample(X, y, [i for i in range(nclasses)])

# Per-class sample counts before and after resampling.
elements_original = get_num_elements(y=to_numerical(y))
elements_os = get_num_elements(y=to_numerical(y_os))
elements_us = get_num_elements(y=to_numerical(y_us))

# Bar labels, one per class (presumably listed in class-index order - confirm).
x_axis = ["Wish",
    "Another",
    "Comfortably",
    "Money",
    "Breathe",
    "Time",
    "Brain",
    "Echoes",
    "Wearing",
    "Sorrow",
    "Hey",
    "Shine"]

fig, axs = plt.subplots(1, 3, figsize=(20, 6))

# One bar chart per distribution, all with the same y-range so they can be compared.
# NOTE(review): only the first chart passes align="edge"; the other two use the default
# bar alignment - confirm this difference is intended.
axs[0].set_ylim([0, 800])
axs[0].tick_params(labelrotation=90)
axs[0].set_title("Original distribution")
axs[0].bar(x_axis, elements_original, align="edge")
axs[1].set_ylim([0, 800])
axs[1].tick_params(labelrotation=90)
axs[1].set_title("Distribution after Oversampling")
axs[1].bar(x_axis, elements_os)
axs[2].set_ylim([0, 800])
axs[2].tick_params(labelrotation=90)
axs[2].set_title("Distribution after Undersampling")
axs[2].bar(x_axis, elements_us)

plt.show()


In [None]:
#@title Augmentation function

import tsaug
from tsaug import AddNoise, Convolve, Crop, Drift, Pool, Quantize, Resize, Reverse, TimeWarp

# Two tsaug pipelines. Per tsaug's operator overloads, `+` chains augmenters and
# `@ p` applies an augmenter with probability p.
# heavy_augmenter: strong noise (rarely applied) plus strong time warping and pooling.
heavy_augmenter = (
                tsaug.AddNoise(scale=(0.5, 0.8)) @ 0.1
                # + tsaug.Convolve(window="hamming", size=12) @ 0.2
                # + tsaug.Drift(max_drift=(0, 0.02)) @ 0.2
                + tsaug.TimeWarp(50) @ 0.3
                + tsaug.Pool(size=4) @ 0.3
              )
# light_augmenter: weaker noise (applied more often) and a much milder time warp.
light_augmenter = (
                tsaug.AddNoise(scale=(0.05, 0.2)) @ 0.5
                # + tsaug.Convolve(window="hamming", size=12) @ 0.2
                # + tsaug.Drift(max_drift=(0, 0.02)) @ 0.2
                + tsaug.TimeWarp(2) @ 0.3
                + tsaug.Pool(size=4) @ 0.3
              )

def augment_dataset(X, y):
  """Augment every sample with the pipeline assigned to its class.

  Classes 4, 5, 7 and 11 go through `light_augmenter`, all the others through
  `heavy_augmenter`. The augmented samples are concatenated along their first axis,
  so the result is one long array, not one entry per input sample.

  Args:
    X: samples, indexed along the first axis.
    y: one-hot labels of `X`.

  Returns:
    Array with all augmented samples concatenated along axis 0 (an empty array when
    `X` holds no samples).
  """
  y_numerical = to_numerical(y)
  augmented_chunks = []
  for i in range(X.shape[0]):
    print("Augmenting element " + str(i))
    if y_numerical[i] in [4, 5, 7, 11]:
      augmenter = light_augmenter
    else:
      augmenter = heavy_augmenter
    # NOTE(review): X[i] is handed to tsaug without a leading batch axis - confirm that
    # tsaug interprets its axes as intended.
    augmented_chunks.append(augmenter.augment(X[i]))

  if not augmented_chunks:
    return np.array([])

  # A single concatenation replaces growing the array one sample at a time, and removes
  # the dependency on append_sequence(), which is only defined in a later cell.
  return np.concatenate(augmented_chunks, axis=0)


In [None]:
#@title Sliding Windows with augmentation and oversampling

def permute(X, y):
    """Shuffle samples and labels with one shared random permutation.

    Args:
        X: array of samples, indexed along the first axis.
        y: array of labels with the same first dimension as `X`.

    Returns:
        `(X_permuted, y_permuted)`, reordered identically so pairs stay aligned.
    """
    order = np.random.permutation(X.shape[0])
    return X[order], y[order]


def append_sequence(X, seq):
  """Append `seq` to `X` along the first axis.

  Args:
    X: accumulated array, or an empty container when nothing was appended yet.
    seq: array to append.

  Returns:
    `seq` itself when `X` is empty, otherwise the concatenation of `X` and `seq`.
  """
  return seq if len(X) == 0 else np.r_[X, seq]

import tsaug
from tsaug import AddNoise, Convolve, Crop, Drift, Pool, Quantize, Resize, Reverse, TimeWarp

# NOTE(review): these two pipelines repeat, with identical parameters, the definitions in
# the "Augmentation function" cell; re-running this cell simply rebinds the same names.
# Per tsaug's operator overloads, `+` chains augmenters and `@ p` applies one with probability p.
heavy_augmenter = (
                tsaug.AddNoise(scale=(0.5, 0.8)) @ 0.1
                # + tsaug.Convolve(window="hamming", size=12) @ 0.2
                # + tsaug.Drift(max_drift=(0, 0.02)) @ 0.2
                + tsaug.TimeWarp(50) @ 0.3
                + tsaug.Pool(size=4) @ 0.3
              )
light_augmenter = (
                tsaug.AddNoise(scale=(0.05, 0.2)) @ 0.5
                # + tsaug.Convolve(window="hamming", size=12) @ 0.2
                # + tsaug.Drift(max_drift=(0, 0.02)) @ 0.2
                + tsaug.TimeWarp(2) @ 0.3
                + tsaug.Pool(size=4) @ 0.3
              )

def perform_sliding_window(X, y, kfold=False, stride=1, oversample=False, augment=False):
    """Split the dataset per class and turn the training part into sliding windows.

    For every class a test split is taken (and, unless `kfold` is set, a validation
    split from what remains). The remaining training sequences are put back in their
    original order, joined into one long sequence and cut into windows of 36 time
    steps, without starting windows across the gaps reported by `sort_back`.

    Args:
        X: sequences, indexed along the first axis.
        y: one-hot labels of `X`.
        kfold: when True no validation split is taken and none is returned.
        stride: step multiplier between consecutive window starts.
        oversample: when True every window is emitted as many times as
            `oversample_classes` asks for its class.
        augment: when True windows are passed through the class' tsaug pipeline; with
            `oversample` also set, only the extra copies are augmented.

    Returns:
        `(X_train, y_train, X_val, y_val, X_test, y_test)`, or
        `(X_train, y_train, X_test, y_test)` when `kfold` is True. Every pair is
        shuffled; the training labels are re-encoded with LabelBinarizer.
    """
    seq_len = 36  # number of time steps of each original sequence and of each window

    y_numerical = to_numerical(y)

    num_of_copies = oversample_classes(y_numerical)

    sliding_windows = []
    y_of_windows = []
    X_test_global = []
    y_test_global = []

    if not kfold:
        X_val_global = []
        y_val_global = []

    for i in range(nclasses):
        print("windowing and oversampling class %d ..." %i)

        class_mask = y_numerical == i
        X_class, y_class = X[class_mask], y[class_mask]

        X_train, X_test, y_train, y_test = train_test_split(X_class, y_class, test_size=test_percentage, shuffle=True, stratify=to_numerical(y_class), random_state=seed)

        # for each class, we keep 10% for val, while the rest is used to create sliding windows
        if not kfold:
            train_mask = to_numerical(y_train) == i
            X_train, X_val, y_train, y_val = train_test_split(X_train[train_mask], y_train[train_mask], test_size=test_percentage, shuffle=True, stratify=to_numerical(y_train[train_mask]), random_state=seed)

            X_val_global = append_sequence(X_val_global, X_val)
            y_val_global = append_sequence(y_val_global, y_val)

        # holes contains the indices of all the "holes" introduced to the sequence by splitting
        # (needed to avoid making sliding windows that jump over separated zones of the sequences).
        # This runs for both modes: previously it was skipped when kfold=True, which left
        # `holes` undefined and the training sequences unsorted.
        X_train, y_train, holes = sort_back(X_train, y_train, X, y)

        X_test_global = append_sequence(X_test_global, X_test)
        y_test_global = append_sequence(y_test_global, y_test)

        # unwrapping the class' sequences to a single long sequence (one concatenation)
        class_unwrapped = np.concatenate(list(X_train), axis=0)

        if i in [4, 5, 7, 11]:
          augmenter = light_augmenter
        else:
          augmenter = heavy_augmenter

        # perform sliding windows (skipping holes)
        # NOTE(review): the range start is not divided by `stride` while the window start is
        # `stride*j`; for stride != 1 confirm that segments after the first start where intended.
        for h in range(holes.shape[0]-1):
          for j in range(holes[h]*seq_len, int((holes[h+1]*seq_len-seq_len) / stride)):
            raw_window = class_unwrapped[stride*j:stride*j+seq_len, :]

            if augment and not oversample:
              window = augmenter.augment(raw_window)
            else:
              window = raw_window
            sliding_windows.append(window)
            y_of_windows.append(i)

            if oversample:
              for _ in range(num_of_copies[i]-1):
                  if augment:
                    window = augmenter.augment(raw_window)
                  else:
                    window = raw_window
                  sliding_windows.append(window)
                  y_of_windows.append(i)

    # re-formatting labels to OneHot to make them ready-to-use
    label_as_binary = LabelBinarizer()
    y_of_windows = label_as_binary.fit_transform(y_of_windows)

    X_test_global, y_test_global = permute(np.array(X_test_global), np.array(y_test_global))
    X_train_global, y_train_global = permute(np.array(sliding_windows), np.array(y_of_windows))

    if not kfold:
        X_val_global, y_val_global = permute(np.array(X_val_global), np.array(y_val_global))
        return X_train_global, y_train_global, X_val_global, y_val_global, X_test_global, y_test_global

    return X_train_global, y_train_global, X_test_global, y_test_global


    

In [None]:
#@title Scaling function (manual RobustScaler)

def scale(x):
  """Robust-scale every feature: subtract its median and divide by its inter-quartile range.

  The statistics of feature m are computed over all values of `x[:, :, m]`.

  Args:
    x: 3-D array indexed as `x[sample, time, feature]`; it is not modified.

  Returns:
    `(x_norm, median_list, iq_list)`: the scaled copy of `x`, and the per-feature medians
    and inter-quartile ranges that were applied. A feature whose inter-quartile range is 0
    is divided by 1.0 instead (and 1.0 is reported), which avoids inf/nan outputs.
  """
  median_list = []
  iq_list = []

  x_norm = x.copy()

  for m in range(x.shape[2]):
      feature = x[:, :, m]

      median = np.median(feature)
      iq = np.percentile(feature, 75) - np.percentile(feature, 25)
      if iq == 0:
          # Constant (or nearly constant) feature: keep it centred but unscaled.
          iq = 1.0

      median_list.append(median)
      iq_list.append(iq)

      x_norm[:, :, m] = (feature - median) / iq

  return x_norm, median_list, iq_list

In [None]:
#@title Train/val/test split

# Load the raw training arrays (paths are relative to the current working directory).
X = np.load("training_dataset_homework2/x_train.npy")
y = np.load("training_dataset_homework2/y_train.npy")

# Optional undersampling of classes 3 and 9 (currently disabled):
# X, y = undersample(X, y, [3,9])

# Robust scaling; the medians and inter-quartile ranges are saved to 'median.npy' / 'iq.npy'.
# NOTE(review): the statistics are computed on the whole dataset, before the
# train/val/test split performed below.
X, median, iq = scale(X)
np.save('median', np.array(median))
np.save('iq', np.array(iq))

# Copy of the integer labels; overwritten further down with the labels of the training windows.
y_numerical = y.copy()

# putting labels from Categorical to One-Hot
label_as_binary = LabelBinarizer()
y = label_as_binary.fit_transform(y)

print("Before windowing and oversampling: ")
get_class_weights(to_numerical(y))

# Windowing with augmentation enabled and oversampling disabled.
X_train, y_train, X_val, y_val, X_test, y_test = perform_sliding_window(X, y, stride=1, oversample=False, augment=True)

print("After windowing and oversampling: ")
get_class_weights(to_numerical(y_train))

y_numerical = to_numerical(y_train)

# Last expression of the cell: the notebook displays these shapes.
X_train.shape, y_train.shape, X_val.shape, y_val.shape

# **Models:**
*  Vanilla LSTM
*  Bidirectional LSTM
*  Custom 1DConv model using LayerNormalization
*  Transformers
*  ResNet

In [34]:
#@title **Transformer Model**

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one encoder block: self-attention, then a position-wise feed-forward part.

    Each half is followed by dropout/normalization and a residual connection.

    Args:
        inputs: input tensor of the block; its last dimension is preserved.
        head_size: `key_dim` of the multi-head attention.
        num_heads: number of attention heads.
        ff_dim: number of filters of the first feed-forward convolution.
        dropout: dropout rate used inside the attention and after each half.

    Returns:
        Output tensor of the block.
    """
    # Self-attention half (query and value are both `inputs`), dropout, normalization.
    attention = tfkl.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )
    attended = attention(inputs, inputs)
    attended = tfkl.Dropout(dropout)(attended)
    attended = tfkl.LayerNormalization(epsilon=1e-6)(attended)
    residual = attended + inputs

    # Feed-forward half: two kernel-size-1 convolutions, the second one projecting
    # back to the channel count of the input.
    hidden = tfkl.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(residual)
    hidden = tfkl.Dropout(dropout)(hidden)
    projected = tfkl.Conv1D(filters=inputs.shape[-1], kernel_size=1)(hidden)
    projected = tfkl.LayerNormalization(epsilon=1e-6)(projected)
    return projected + residual

def build_model(
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0.0,
    mlp_dropout=0.0,
):
    """Build the (uncompiled) Transformer classifier.

    Args:
        input_shape: shape of one input sequence.
        head_size, num_heads, ff_dim, dropout: forwarded to `transformer_encoder`.
        num_transformer_blocks: number of stacked encoder blocks.
        mlp_units: sizes of the dense layers of the classification head.
        mlp_dropout: dropout rate after each dense layer of the head.

    Returns:
        Keras model ending in a softmax over `nclasses` outputs.
    """
    model_input = tfk.Input(shape=input_shape)

    # Encoder stack.
    encoded = model_input
    for _block in range(num_transformer_blocks):
        encoded = transformer_encoder(encoded, head_size, num_heads, ff_dim, dropout)

    # Pooling, then the MLP head.
    head = tfkl.GlobalAveragePooling1D(data_format="channels_first")(encoded)
    for units in mlp_units:
        head = tfkl.Dense(units, activation="relu")(head)
        head = tfkl.Dropout(mlp_dropout)(head)

    class_probs = tfkl.Dense(nclasses, activation="softmax")(head)
    return tfk.Model(model_input, class_probs)


In [None]:
#@title BiLSTM Model
def build_BiLSTM(input_shape, classes):
    """Build and compile the bidirectional LSTM classifier.

    Args:
        input_shape: shape of one input sequence.
        classes: number of softmax outputs.

    Returns:
        Compiled Keras model named 'BiLSTM' (categorical cross-entropy, Adam, accuracy).
    """
    net_input = tfkl.Input(shape=input_shape, name='Input')

    # Recurrent feature extractor: two stacked bidirectional LSTMs, the first one
    # returning the full sequence, followed by a seeded dropout.
    features = tfkl.Bidirectional(tfkl.LSTM(92, return_sequences=True))(net_input)
    features = tfkl.Bidirectional(tfkl.LSTM(92))(features)
    features = tfkl.Dropout(0.5, seed=seed)(features)

    # Dense classification head.
    hidden = tfkl.Dense(128, activation='relu')(features)
    hidden = tfkl.Dropout(0.5)(hidden)
    predictions = tfkl.Dense(classes, activation='softmax')(hidden)

    network = tfk.Model(inputs=net_input, outputs=predictions, name='BiLSTM')
    network.compile(
        loss=tfk.losses.CategoricalCrossentropy(),
        optimizer=tfk.optimizers.Adam(),
        metrics='accuracy',
    )
    return network

In [None]:
#@title Vanilla LSTM Model

def build_LSTM(input_shape, classes):
    """Build and compile the plain two-layer LSTM classifier.

    Args:
        input_shape: shape of one input sequence.
        classes: number of softmax outputs.

    Returns:
        Compiled Keras model named 'VanillaLSTM' (categorical cross-entropy, Adam, accuracy).
    """
    net_input = tfkl.Input(shape=input_shape, name='Input')

    # Recurrent feature extractor: two stacked LSTMs, the first one returning the
    # full sequence, followed by a seeded dropout.
    features = tfkl.LSTM(256, return_sequences=True)(net_input)
    features = tfkl.LSTM(256)(features)
    features = tfkl.Dropout(.5, seed=seed)(features)

    # Dense classification head.
    hidden = tfkl.Dense(128, activation='relu')(features)
    predictions = tfkl.Dense(classes, activation='softmax')(hidden)

    network = tfk.Model(inputs=net_input, outputs=predictions, name='VanillaLSTM')
    network.compile(
        loss=tfk.losses.CategoricalCrossentropy(),
        optimizer=tfk.optimizers.Adam(),
        metrics='accuracy',
    )
    return network

In [None]:
#@title ResNet Model

def build_ResNet(input_shape, nb_classes):
    """Build and compile a 1-D ResNet made of three residual blocks.

    Every block stacks three convolutions (kernel sizes 8, 5 and 3) with batch
    normalization and adds a shortcut from the block input. Blocks 1 and 2 project the
    shortcut with a kernel-size-1 convolution; block 3 only batch-normalizes it.

    Args:
        input_shape: shape of one input sequence.
        nb_classes: number of softmax outputs.

    Returns:
        Compiled Keras model (categorical cross-entropy, Adam with learning rate 1e-4,
        accuracy).
    """
    n_feature_maps = 16
    layers = tfk.layers

    def conv_bn(tensor, filters, kernel_size):
        # 'same'-padded convolution followed by batch normalization.
        convolved = layers.Conv1D(filters=filters, kernel_size=kernel_size, padding='same')(tensor)
        return layers.BatchNormalization()(convolved)

    def residual_block(block_input, filters, project_shortcut):
        # Main path: conv(8)+BN+ReLU, conv(5)+BN+ReLU, conv(3)+BN.
        main = block_input
        for kernel_size in (8, 5):
            main = layers.Activation('relu')(conv_bn(main, filters, kernel_size))
        main = conv_bn(main, filters, 3)

        # Shortcut path: expand channels for the sum only when they differ.
        if project_shortcut:
            shortcut = conv_bn(block_input, filters, 1)
        else:
            shortcut = layers.BatchNormalization()(block_input)

        merged = layers.add([shortcut, main])
        return layers.Activation('relu')(merged)

    net_input = layers.Input(input_shape)

    block_1 = residual_block(net_input, n_feature_maps, project_shortcut=True)
    block_2 = residual_block(block_1, n_feature_maps * 2, project_shortcut=True)
    # Block 3 keeps the channel count of block 2, so no projection is needed.
    block_3 = residual_block(block_2, n_feature_maps * 2, project_shortcut=False)

    # Classification head.
    pooled = layers.GlobalAveragePooling1D()(block_3)
    predictions = layers.Dense(nb_classes, activation='softmax')(pooled)

    network = tfk.models.Model(inputs=net_input, outputs=predictions)
    network.compile(
        loss=tfk.losses.CategoricalCrossentropy(),
        optimizer=tfk.optimizers.Adam(1e-4),
        metrics=['accuracy'],
    )
    return network

In [None]:
#@title 1D Convolution Model

def build_1DCNN(input_shape, classes):
    """Build and compile the 1-D convolutional classifier with layer normalization.

    Args:
        input_shape: shape of one input sequence.
        classes: number of softmax outputs.

    Returns:
        Compiled Keras model named '1DCNN' (categorical cross-entropy, Adam, accuracy).
    """
    net_input = tfkl.Input(shape=input_shape, name='Input')

    # Convolutional feature extractor: conv, layer norm, max pooling, conv,
    # global average pooling, then a seeded dropout.
    features = tfkl.Conv1D(256, 3, padding='same', activation='relu')(net_input)
    features = tfkl.LayerNormalization()(features)
    features = tfkl.MaxPooling1D()(features)
    features = tfkl.Conv1D(256, 3, padding='same', activation='relu')(features)
    pooled = tfkl.GlobalAveragePooling1D()(features)
    pooled = tfkl.Dropout(.6, seed=seed)(pooled)

    # Dense classification head.
    hidden = tfkl.Dense(128, activation='relu')(pooled)
    hidden = tfkl.Dropout(.5)(hidden)
    predictions = tfkl.Dense(classes, activation='softmax')(hidden)

    network = tfk.Model(inputs=net_input, outputs=predictions, name='1DCNN')
    network.compile(
        loss=tfk.losses.CategoricalCrossentropy(),
        optimizer=tfk.optimizers.Adam(),
        metrics='accuracy',
    )
    return network

# Building and compiling the models

In [40]:
#@title Build and compile of the Transformer model

# Shape of one training window (everything but the batch axis of X_train).
input_shape = X_train.shape[1:] # shape of the sequence

# Transformer hyper-parameters: 4 encoder blocks, 16 heads with key_dim 256,
# and a single 128-unit dense layer in the classification head.
model = build_model(
    input_shape,
    head_size=256,
    num_heads=16,
    ff_dim=16,
    num_transformer_blocks=4,
    mlp_units=[128],
    mlp_dropout=0.25,
    dropout=0.4,
)

# build_model() returns an uncompiled model, so it is compiled here.
model.compile(
    loss=tfk.losses.CategoricalCrossentropy(),
    optimizer=tfk.optimizers.Adam(learning_rate=1e-4),
    metrics=["accuracy"],
)
 
# model.summary()

Model: "model_4"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_5 (InputLayer)           [(None, 36, 6)]      0           []                               
                                                                                                  
 multi_head_attention_8 (MultiH  (None, 36, 6)       110598      ['input_5[0][0]',                
 eadAttention)                                                    'input_5[0][0]']                
                                                                                                  
 dropout_28 (Dropout)           (None, 36, 6)        0           ['multi_head_attention_8[0][0]'] 
                                                                                                  
 layer_normalization_18 (LayerN  (None, 36, 6)       12          ['dropout_28[0][0]']       

In [36]:
#@title Build and compile of 1D Conv model

# Shape of one training window (everything but the batch axis of X_train).
input_shape = X_train.shape[1:]

# build_1DCNN() already compiles the model it returns.
conv_model = build_1DCNN(input_shape, nclasses)
conv_model.summary()

Model: "1DCNN"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 Input (InputLayer)          [(None, 36, 6)]           0         
                                                                 
 conv1d_29 (Conv1D)          (None, 36, 256)           4864      
                                                                 
 layer_normalization_17 (Lay  (None, 36, 256)          512       
 erNormalization)                                                
                                                                 
 max_pooling1d_1 (MaxPooling  (None, 18, 256)          0         
 1D)                                                             
                                                                 
 conv1d_30 (Conv1D)          (None, 18, 256)           196864    
                                                                 
 global_average_pooling1d_4   (None, 256)              0     

In [37]:
#@title Build and compile of ResNet model

# Shape of one training window (everything but the batch axis of X_train).
input_shape = X_train.shape[1:] # shape of the sequence

# build_ResNet() already compiles the model it returns.
resnet_model = build_ResNet(input_shape, nclasses)
resnet_model.summary()

Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_4 (InputLayer)           [(None, 36, 6)]      0           []                               
                                                                                                  
 conv1d_31 (Conv1D)             (None, 36, 16)       784         ['input_4[0][0]']                
                                                                                                  
 batch_normalization_12 (BatchN  (None, 36, 16)      64          ['conv1d_31[0][0]']              
 ormalization)                                                                                    
                                                                                                  
 activation_9 (Activation)      (None, 36, 16)       0           ['batch_normalization_12[0]

In [38]:
#@title Build and compile of LSTM model

# Shape of one training window (everything but the batch axis of X_train).
input_shape = X_train.shape[1:] # shape of the sequence

# build_LSTM() already compiles the model it returns.
lstm_model = build_LSTM(input_shape, nclasses)
lstm_model.summary()

Model: "VanillaLSTM"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 Input (InputLayer)          [(None, 36, 6)]           0         
                                                                 
 lstm_4 (LSTM)               (None, 36, 256)           269312    
                                                                 
 lstm_5 (LSTM)               (None, 256)               525312    
                                                                 
 dropout_25 (Dropout)        (None, 256)               0         
                                                                 
 dense_14 (Dense)            (None, 128)               32896     
                                                                 
 dense_15 (Dense)            (None, 12)                1548      
                                                                 
Total params: 829,068
Trainable params: 829,068
Non-tra

In [39]:
#@title Build and compile of BiLSTM model

# Shape of one training window (everything but the batch axis of X_train).
input_shape = X_train.shape[1:]

# NOTE(review): this model gets 4 outputs while the other builders are called with
# nclasses - presumably it is meant for the 4-class classifier of the ensemble; confirm.
bilstm_model = build_BiLSTM(input_shape, 4)
bilstm_model.summary()

Model: "BiLSTM"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 Input (InputLayer)          [(None, 36, 6)]           0         
                                                                 
 bidirectional_2 (Bidirectio  (None, 36, 184)          72864     
 nal)                                                            
                                                                 
 bidirectional_3 (Bidirectio  (None, 184)              203872    
 nal)                                                            
                                                                 
 dropout_26 (Dropout)        (None, 184)               0         
                                                                 
 dense_16 (Dense)            (None, 128)               23680     
                                                                 
 dropout_27 (Dropout)        (None, 128)               0    

# Training of models

In [None]:
#@title Training of the model
# Fresh experiment folder plus checkpoint / TensorBoard / early-stopping callbacks.
callbacks, model_folder_dir = create_folders_and_callbacks(model_name='1DConv')

# Train the 1D convolutional model on the windows, validating on the held-out split.
# The callbacks are passed as one flat list (the helper's callbacks plus the
# learning-rate scheduler) instead of a list nested inside another list.
history = conv_model.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_val, y_val),
    epochs=200,
    batch_size=128,
    callbacks=[
        *callbacks,
        # Multiply the learning rate by 0.4 after 5 epochs without val_accuracy improvement.
        tfk.callbacks.ReduceLROnPlateau(monitor='val_accuracy', mode='max', patience=5, factor=0.4, min_lr=1e-5)
    ]
)


In [None]:
#@title Training model and validating using Stratified KFold

from sklearn.model_selection import StratifiedKFold

callbacks, model_folder_dir = create_folders_and_callbacks(model_name='1DConv')

# K-Fold cross validation
k = 10
skf = StratifiedKFold(n_splits=k)
validation_scores = []

# train-test split (stratified on the numerical class labels)
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=test_percentage,
    shuffle=True,
    random_state=seed,
    stratify=to_numerical(y),
)

# Folds are numbered from 1 for the progress print-out.
for i, (train_index, test_index) in enumerate(skf.split(X_train, to_numerical(y_train)), start=1):
    training_data = X_train[train_index]
    training_label = y_train[train_index]
    validation_data = X_train[test_index]
    validation_label = y_train[test_index]

    print("K = " + str(i))

    # Build a brand-new model for every fold so no weights leak between folds
    conv_model = build_1DCNN(input_shape, nclasses)

    # fit
    conv_model.fit(
        training_data,
        training_label,
        validation_data=(validation_data, validation_label),
        epochs=100,
        batch_size=64,
        callbacks=[
            callbacks,
            tfk.callbacks.ReduceLROnPlateau(monitor='val_accuracy', mode='max', patience=5, factor=0.5, min_lr=1e-5)
        ]
    )

    # Score the model trained on THIS fold (conv_model).
    # Index 1 of the evaluate() result is taken as the fold score —
    # presumably accuracy; confirm against the metrics compiled in build_1DCNN.
    validation_score = conv_model.evaluate(validation_data, validation_label)[1]
    validation_scores.append(validation_score)


# Cross-validated score: mean of the per-fold scores
validation_score = np.average(validation_scores)
print(validation_score)

# Ensemble

In [None]:
#@title Training of the 4 classes classifier
# Augmentation pipeline applied to the (oversampled) training sequences
base_augmenter = (
    tsaug.AddNoise(scale=(0.5, 0.8)) @ 0.1
    + tsaug.TimeWarp(50) @ 0.3
    + tsaug.Pool(size=10) @ 0.3
)

# NOTE(review): the run is labelled '1DConv' although this cell trains
# bilstm_model — confirm the folder name is intended.
callbacks, model_folder_dir = create_folders_and_callbacks(model_name='1DConv')

# Selecting only samples from classes 4,5,7,9
target_classes = [4, 5, 7, 9]
train_mask = np.in1d(to_numerical(y_train), target_classes)
val_mask = np.in1d(to_numerical(y_val), target_classes)
test_mask = np.in1d(to_numerical(y_test), target_classes)

X_train_4579 = X_train[train_mask]
X_val_4579 = X_val[val_mask]
X_test_4579 = X_test[test_mask]

# Fit the binarizer ONCE on the training labels and reuse it for validation
# and test, so every split shares the same column order and width even when
# one of them happens to miss a class.
label_as_binary = LabelBinarizer()
y_train_4579 = label_as_binary.fit_transform(to_numerical(y_train[train_mask]))
y_val_4579 = label_as_binary.transform(to_numerical(y_val[val_mask]))
y_test_4579 = label_as_binary.transform(to_numerical(y_test[test_mask]))

# Oversampling and augmentation (training split only)
X_train_4579, y_train_4579 = oversample(X_train_4579, y_train_4579)
X_train_4579 = base_augmenter.augment(X_train_4579)

history = bilstm_model.fit(
    x=X_train_4579,
    y=y_train_4579,
    validation_data=(X_val_4579, y_val_4579),
    epochs=200,
    batch_size=128,
    callbacks=[
        callbacks,
        tfk.callbacks.ReduceLROnPlateau(monitor='val_accuracy', mode='max', patience=5, factor=0.4, min_lr=1e-5)
    ]
)


In [None]:
#@title Prediction using Ensemble
# Raw outputs of both models on the same test inputs
predictions = conv_model.predict(X_test)
bilstm_predictions = bilstm_model.predict(X_test)

# Lay the first four BiLSTM outputs out over the full-width layout:
# they land in columns 4, 5, 7 and 9, every other column stays 0.
complete_bilstm_predictions = np.zeros(shape=predictions.shape, dtype=float)
complete_bilstm_predictions[:, [4, 5, 7, 9]] = bilstm_predictions[:, :4]

# Rows whose top-scoring class (per conv_model) is 9 take the BiLSTM row;
# all remaining rows keep conv_model's own output.
defer_to_bilstm = np.argmax(predictions, axis=1) == 9
ensembled_predictions = np.where(
    defer_to_bilstm[:, np.newaxis],
    complete_bilstm_predictions,
    predictions.astype(float),
)

predictions = ensembled_predictions

# Confusion matrix and training trend (with TensorBoard)

In [None]:
# Evaluate conv_model on the test split; as the cell's last expression,
# the returned value is displayed by the notebook.
conv_model.evaluate(X_test, y_test, verbose=1)

In [None]:
# Lookup table from integer class index to class name;
# indices 0..11 follow the order of the list below.
label_mapping = dict(enumerate([
    "Wish",
    "Another",
    "Comfortably",
    "Money",
    "Breathe",
    "Time",
    "Brain",
    "Echoes",
    "Wearing",
    "Sorrow",
    "Hey",
    "Shine",
]))

In [None]:
# Predict the test
# NOTE(review): this rebinds `predictions` to conv_model's output alone, so a
# value assigned to `predictions` by an earlier cell (e.g. the ensembled
# predictions) is replaced — confirm which one the metrics below should use.
predictions = conv_model.predict(X_test)

In [None]:
# Reduce the last axis of the targets and of the model outputs to class
# indices once, and reuse them for every metric below.
true_classes = np.argmax(y_test, axis=-1)
predicted_classes = np.argmax(predictions, axis=-1)

# Compute the confusion matrix (each true-class row normalised to sum to 1)
cm = confusion_matrix(true_classes, predicted_classes, normalize="true")

# Compute the classification metrics (macro average across classes)
accuracy = accuracy_score(true_classes, predicted_classes)
precision = precision_score(true_classes, predicted_classes, average='macro')
recall = recall_score(true_classes, predicted_classes, average='macro')
f1 = f1_score(true_classes, predicted_classes, average='macro')

# Builtin round() works whether the metric comes back as a plain Python
# float or as a NumPy scalar; `.round()` exists only on the latter.
print('Accuracy:', round(accuracy, 4))
print('Precision:', round(precision, 4))
print('Recall:', round(recall, 4))
print('F1:', round(f1, 4))

# Plot the confusion matrix
# cm is transposed so that the x axis carries the true labels and the
# y axis the predicted labels, matching the axis titles below.
plt.figure(figsize=(10,8))
sns.heatmap(cm.T, cmap='Blues', xticklabels=list(label_mapping.keys()), yticklabels=list(label_mapping.keys()), annot=True)
plt.xlabel('True labels')
plt.ylabel('Predicted labels')
plt.show()

In [None]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

import tensorflow as tf
import datetime

%tensorboard --logdir trained_models/1DConv_12-17_14-15-11/tb_logs

import IPython

# Save models

In [None]:
# Evaluate conv_model on the test split and keep the results as a dict
# (return_dict=True); the 'accuracy' entry is read when naming the saved model.
evaluation_dict = conv_model.evaluate(X_test, y_test, verbose=1, return_dict=True)

In [None]:
# Save the model under saved_models/<model name>_<accuracy>, where <accuracy>
# is evaluation_dict['accuracy'] formatted with four decimals.
conv_model.save(f"saved_models/{conv_model.name}_{evaluation_dict['accuracy']:.4f}")