# Homework 3

In this homework, you will test various time series classification methods. You must choose **three** datasets from [the UCR/UEA time series repository](http://timeseriesclassification.com) and perform the tasks by evaluating the models on three selected datasets.

Note that the questions are not 100% the same as the lab tasks. Please carefully read all descriptions. Compared to previous homework, there is no fixed answer, and we will evaluate your assignment based on your trials rather than the results. When the description does not specify or restrict things during your implementation process, you can choose your way freely. We will always grade based on the written description on each task only.


### Task 0: Preparation

You need to choose **three** datasets from the UCR/UEA time series repository. Please be careful since some UCR datasets can take a long time to be processed - You do not need to choose heavy datasets since it would slow your training/testing process. Use sktime's `load_UCR_UEA_dataset` function to perform. Please note that **you should use each dataset's original train/test splits** to train and report the test scores.

In [None]:
!pip install sktime -q
!pip install sktime[all_extras] -q
!pip install sktime-dl -q 

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.9/16.9 MB[0m [31m66.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m115.2/115.2 kB[0m [31m13.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m100.0/100.0 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m160.4/160.4 kB[0m [31m12.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.0/44.0 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m228.9/228.9 kB[0m [31m18.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m178.0/178.0 kB[0m [31m16.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py

In [None]:
from sktime.datasets import load_UCR_UEA_dataset
from sktime.datasets import load_italy_power_demand
from sktime.utils.slope_and_trend import _slope

In [None]:
ECG200_Train = load_UCR_UEA_dataset('ECG200', split='train', return_type='numpy2D')
ECG200_Test = load_UCR_UEA_dataset('ECG200', split='test', return_type='numpy2D')

Earthquake_Train = load_UCR_UEA_dataset('Earthquakes', split='train', return_type='numpy2D')
Earthquake_Test = load_UCR_UEA_dataset('Earthquakes', split='test', return_type='numpy2D')


GunPoint_Train = load_UCR_UEA_dataset('GunPoint', split='train', return_type='numpy2D')
GunPoint_Test = load_UCR_UEA_dataset('GunPoint', split='test', return_type='numpy2D')


In [None]:
GunPoint_Train = GunPoint_Train[0], GunPoint_Train[1].astype(int) - 1
GunPoint_Test = GunPoint_Test[0], GunPoint_Test[1].astype(int) - 1

### Task 1: Time series classification using deep learning 1



Time series classification problems can be solved using networks like CNN, RNN, or FCN. You can even try to merge different networks. In this task, you must test your three chosen datasets on four other models.

Try to implement four different models: 1) Fully connected network, 2) **[Bi-directional RNN](https://keras.io/api/layers/recurrent_layers/bidirectional/)**, 3) 1D-CNN only, and 4) 1D-CNN + Any RNN variant. Note that you can always link the network to a fully connected layer to match the output size. You can freely construct any structure you want. Report the average test scores of four models on three datasets you chose. It would be four scores in total. Mark the best model in terms of the average test score. Briefly explain the structure you constructed. There is no definitive answer, and it is up to your own model. Here it would be best if you keep the following rules:

- When initially loading the dataset, use sktime's `load_UCR_UEA_dataset` function. This is for our grading purpose.
- You should use at least **two** Tensorflow callbacks when you fit your model. These can be built-in ones or your personalized callback.
- You should use Tensorflow's data API (`tf.data`) to manage your dataset and use `shuffle`, `batch,` and `prefetch` functions. This means that you need to convert the data format using the `from_tensor_slices` function. This also means that you need to create your own validation set. You are not limited to using any methods to do this, but you may also need to shuffle the dataset before (for that, you can use `np.random.permutation`).
- You need to clearly report the **test accuracy** of the four models. Training and validation accuracy scores are not enough.
- You may need to deal with datasets of different sizes. In this case, it might be helpful to make a function to create a model that can receive different input sizes as a parameter.


In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Dropout, Conv1D, MaxPooling1D, LSTM, Bidirectional, Flatten, GRU
from tensorflow.keras.regularizers import l2
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Accuracy
from sklearn.model_selection import train_test_split

import numpy as np
from tensorflow import keras
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.metrics import accuracy_score
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
RANDOM_STATE = 12345

In [None]:
# Set random seeds for reproducibility
import random
seed_value = 42
tf.random.set_seed(seed_value)
np.random.seed(seed_value)
random.seed(seed_value)

In [None]:
Ecg_train, Ecg_val, Ecg_ytest, Ecg_valtest = train_test_split(ECG200_Train[0],ECG200_Train[1], test_size=0.1,random_state=seed_value)

In [None]:
Eq_train, Eq_val, Eq_ytest, Eq_valtest = train_test_split(Earthquake_Train[0],Earthquake_Train[1], test_size=0.1,random_state=seed_value)

In [None]:
Gp_train, Gp_val, Gp_ytest, Gp_valtest = train_test_split(GunPoint_Train[0],GunPoint_Train[1], test_size=0.1,random_state=seed_value)

In [None]:
def preprocess_data(X, y):
    # Convert to TensorFlow dataset
    dataset = tf.data.Dataset.from_tensor_slices((X, y))
    # Shuffle and batch the data
    dataset = dataset.shuffle(buffer_size=len(X))
    dataset = dataset.batch(batch_size=16)
    dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
    return dataset


def load_data(X_train, y_train, X_test=None, y_test=None, val=True):
  y_train_binary = to_categorical(y_train)
  train_dataset = preprocess_data(X_train, y_train_binary)
  if val:
    y_test_binary = to_categorical(y_test)
    val_dataset = preprocess_data(X_test, y_test_binary)
    return train_dataset, val_dataset, X_train
  return train_dataset





In [None]:
def load_data_rnn(X_train, y_train, X_test=None, y_test=None, timestep=1, val=True):
  X_train = X_train.reshape(X_train.shape[0],X_train.shape[1],timestep)
  y_train_binary = to_categorical(y_train)
  train_dataset = preprocess_data(X_train, y_train_binary)
  if val:
    X_test = X_test.reshape(X_test.shape[0],X_test.shape[1],timestep)
    y_test_binary = to_categorical(y_test)
    val_dataset = preprocess_data(X_test, y_test_binary)
    return train_dataset, val_dataset, X_train
  return train_dataset

#Fully Connected Network

In [None]:
def create_fc_model(input_shape, num_classes):
    inputs = Input(shape=input_shape)
    x = Flatten()(inputs)
    x = Dense(64, activation='relu')(x)
    x = Dense(32, activation='relu')(x)
    x = Dense(16, activation='relu')(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=inputs, outputs=outputs)
    return model

# Define the callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=5, mode='min', restore_best_weights=True)
model_checkpoint = ModelCheckpoint('FCN.h5', monitor='val_accuracy', mode='max', save_best_only=True, save_weights_only=False)

# Create the model
def train_model(train_dataset, val_dataset, test_dataset, X_train, num_classes, epochs, lr=0.01):
  model = create_fc_model(input_shape=X_train.shape[1:], num_classes=num_classes)
  optimizer = Adam(learning_rate=lr)
  model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
  history = model.fit(train_dataset, epochs=epochs, validation_data=val_dataset, callbacks=[early_stopping, model_checkpoint])
  test_loss, test_acc = model.evaluate(test_dataset)
  print('Test accuracy:', test_acc)
  return test_loss, test_acc


In [None]:
train_dataset, val_dataset, inputs = load_data(Ecg_train, Ecg_ytest, Ecg_val, Ecg_valtest, val=True)
test_dataset = load_data(ECG200_Test[0],ECG200_Test[1],val=False)

ECG_loss, ECG_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Test accuracy: 1.0


In [None]:
train_dataset, val_dataset, inputs = load_data(Eq_train, Eq_ytest, Eq_val, Eq_valtest, val=True)
test_dataset = load_data(Earthquake_Test[0],Earthquake_Test[1],val=False)

Eq_loss, Eq_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Test accuracy: 0.7338129281997681


In [None]:
train_dataset, val_dataset, inputs = load_data(Gp_train, Gp_ytest, Gp_val, Gp_valtest, val=True)
test_dataset = load_data(GunPoint_Test[0],GunPoint_Test[1],val=False)

Gp_loss, Gp_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Test accuracy: 0.753333330154419


In [None]:
Avg_test_acc = np.mean([ECG_acc, Eq_acc, Gp_acc])
Avg_test_loss = np.mean([ECG_loss, Eq_loss, Gp_loss])
print(f"Average Test Accuracy of the Fully Connected Network:{round(Avg_test_acc,4)}")
print(f"Average Test Loss of the Fully Connected Network: {round(Avg_test_loss,4)}")

Average Test Accuracy of the Fully Connected Network:0.829
Average Test Loss of the Fully Connected Network: 0.3394


#Bi-Directional RNN

In [None]:
def create_bidirectional_rnn_model(input_shape, num_classes, num_units=64, dropout_rate=0.2):
    model = Sequential([
        Input(shape=input_shape),
        Bidirectional(GRU(num_units, dropout=dropout_rate, recurrent_dropout=dropout_rate, return_sequences=True)),
        Bidirectional(GRU(num_units, dropout=dropout_rate, recurrent_dropout=dropout_rate)),
        Dense(num_classes, activation='softmax')
    ])
    return model

# Define the callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=5, mode='min', restore_best_weights=True)
model_checkpoint = ModelCheckpoint('bidirectional_rnn_model.h5', monitor='val_accuracy', mode='max', save_best_only=True, save_weights_only=False)

# Create the model
def train_model(train_dataset, val_dataset, test_dataset, X_train, num_classes, epochs, lr=0.01):
  model = create_bidirectional_rnn_model(input_shape=X_train.shape[1:], num_classes=num_classes)
  optimizer = Adam(learning_rate=lr)
  model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
  history = model.fit(train_dataset, epochs=epochs, validation_data=val_dataset, callbacks=[early_stopping, model_checkpoint])
  test_loss, test_acc = model.evaluate(test_dataset)
  print('Test accuracy:', test_acc)
  return test_loss, test_acc

In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Ecg_train, Ecg_ytest, Ecg_val, Ecg_valtest, val=True)
test_dataset = load_data_rnn(ECG200_Test[0],ECG200_Test[1],val=False)

ECG_loss, ECG_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Test accuracy: 1.0


In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Eq_train, Eq_ytest, Eq_val, Eq_valtest, val=True)
test_dataset = load_data_rnn(Earthquake_Test[0],Earthquake_Test[1],val=False)

Eq_loss, Eq_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Test accuracy: 0.7769784331321716


In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Gp_train, Gp_ytest, Gp_val, Gp_valtest, val=True)
test_dataset = load_data_rnn(GunPoint_Test[0],GunPoint_Test[1],val=False)

Gp_loss, Gp_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Test accuracy: 0.4866666793823242


In [None]:
Avg_test_acc = np.mean([ECG_acc, Eq_acc, Gp_acc])
Avg_test_loss = np.mean([ECG_loss, Eq_loss, Gp_loss])
print(f"Average Test Accuracy of the Bi-Directional RNN:{round(Avg_test_acc,4)}")
print(f"Average Test Loss of the Bi-Directional RNN: {round(Avg_test_loss,4)}")

Average Test Accuracy of the Bi-Directional RNN:0.7545
Average Test Loss of the Bi-Directional RNN: 0.4062


#1D-CNN Only

In [None]:
def create_1d_cnn_model(input_shape, num_classes, num_filters=64, kernel_size=5, pool_size=2, dropout_rate=0.2):
    model = Sequential([
        Input(shape=input_shape),
        Conv1D(num_filters, kernel_size, padding='same', kernel_regularizer=l2(0.01)),
        keras.layers.BatchNormalization(),
        keras.layers.Activation('relu'),
        keras.layers.MaxPooling1D(pool_size=pool_size),
        Conv1D(num_filters, kernel_size, padding='same', kernel_regularizer=l2(0.01)),
        keras.layers.BatchNormalization(),
        keras.layers.Activation('relu'),
        keras.layers.MaxPooling1D(pool_size=pool_size),
        Flatten(),
        Dropout(dropout_rate),
        Dense(num_classes, activation='softmax')
    ])
    return model

# Define the callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=5, mode='min', restore_best_weights=True)
model_checkpoint = ModelCheckpoint('1d_cnn_model.h5', monitor='val_accuracy', mode='max', save_best_only=True, save_weights_only=False)

# Create the model
def train_model(train_dataset, val_dataset, test_dataset, X_train, num_classes, epochs, lr=0.01):
  model = create_1d_cnn_model(input_shape=X_train.shape[1:], num_classes=num_classes)
  optimizer = Adam(learning_rate=lr)
  model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
  history = model.fit(train_dataset, epochs=epochs, validation_data=val_dataset, callbacks=[early_stopping, model_checkpoint])
  test_loss, test_acc = model.evaluate(test_dataset)
  print('Test accuracy:', test_acc)
  return test_loss, test_acc

In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Ecg_train, Ecg_ytest, Ecg_val, Ecg_valtest, val=True)
test_dataset = load_data_rnn(ECG200_Test[0],ECG200_Test[1],val=False)

ECG_loss, ECG_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Eq_train, Eq_ytest, Eq_val, Eq_valtest, val=True)
test_dataset = load_data_rnn(Earthquake_Test[0],Earthquake_Test[1],val=False)

Eq_loss, Eq_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Test accuracy: 0.6402877569198608


In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Gp_train, Gp_ytest, Gp_val, Gp_valtest, val=True)
test_dataset = load_data_rnn(GunPoint_Test[0],GunPoint_Test[1],val=False)

Gp_loss, Gp_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Test accuracy: 0.7266666889190674


In [None]:
Avg_test_acc = np.mean([ECG_acc, Eq_acc, Gp_acc])
Avg_test_loss = np.mean([ECG_loss, Eq_loss, Gp_loss])
print(f"Average Test Accuracy of the 1D-CNN Model:{round(Avg_test_acc,4)}")
print(f"Average Test Loss of the 1D-CNN Model: {round(Avg_test_loss,4)}")

Average Test Accuracy of the 1D-CNN Model:0.789
Average Test Loss of the 1D-CNN Model: 0.9704


#1D-CNN + Any RNN variant.

In [None]:
def create_1d_cnn_lstm_model(input_shape, num_classes, num_filters=64, kernel_size=5, pool_size=2, dropout_rate=0.2):
    model = Sequential([
        Input(shape=input_shape),
        Conv1D(num_filters, kernel_size, padding='same', kernel_regularizer=l2(0.01)),
        keras.layers.BatchNormalization(),
        keras.layers.Activation('relu'),
        keras.layers.MaxPooling1D(pool_size=pool_size),
        LSTM(num_filters, dropout=dropout_rate, recurrent_dropout=dropout_rate),
        Dropout(dropout_rate),
        Dense(num_classes, activation='softmax')
    ])
    return model

# Define the callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=5, mode='min', restore_best_weights=True)
model_checkpoint = ModelCheckpoint('1d_cnn_lstm_model.h5', monitor='val_accuracy', mode='max', save_best_only=True, save_weights_only=False)

# Create the model
def train_model(train_dataset, val_dataset, test_dataset, X_train, num_classes, epochs, lr=0.01):
  model = create_1d_cnn_lstm_model(input_shape=X_train.shape[1:], num_classes=num_classes)
  optimizer = Adam(learning_rate=lr)
  model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
  history = model.fit(train_dataset, epochs=epochs, validation_data=val_dataset, callbacks=[early_stopping, model_checkpoint])
  test_loss, test_acc = model.evaluate(test_dataset)
  print('Test accuracy:', test_acc)
  return test_loss, test_acc

In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Ecg_train, Ecg_ytest, Ecg_val, Ecg_valtest, val=True)
test_dataset = load_data_rnn(ECG200_Test[0],ECG200_Test[1],val=False)

ECG_loss, ECG_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Eq_train, Eq_ytest, Eq_val, Eq_valtest, val=True)
test_dataset = load_data_rnn(Earthquake_Test[0],Earthquake_Test[1],val=False)

Eq_loss, Eq_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Test accuracy: 0.7482014298439026


In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Gp_train, Gp_ytest, Gp_val, Gp_valtest, val=True)
test_dataset = load_data_rnn(GunPoint_Test[0],GunPoint_Test[1],val=False)

Gp_loss, Gp_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Test accuracy: 0.4933333396911621


In [None]:
Avg_test_acc = np.mean([ECG_acc, Eq_acc, Gp_acc])
Avg_test_loss = np.mean([ECG_loss, Eq_loss, Gp_loss])
print(f"Average Test Accuracy of the 1D-CNN+RNN Variant Model:{round(Avg_test_acc,4)}")
print(f"Average Test Loss of the 1D-CNN+RNN Variant Model: {round(Avg_test_loss,4)}")

Average Test Accuracy of the 1D-CNN+RNN Variant Model:0.7472
Average Test Loss of the 1D-CNN+RNN Variant Model: 0.4317


### Task 2: Time series classification using deep learning 2

There has been several neural network models dedicated to time series classification. Besides your own models that you developed in Lab 4, now you will develop such dedicated models by referring to some papers, and test if they indeed perform better than your rough models. There are two famous papers as follows:
 - [Convolutional neural networks for time series classification (2017)](https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=7870510)
 - [Time Series Classification from Scratch with Deep Neural Networks: A Strong Baseline (2017)](https://arxiv.org/abs/1611.06455)

First paper's idea is already implemented in sktime, with the name `CNNClassifier`. Second paper has three models and those are easy to develop using tensorflow. Now the task is to develop two models (MLP and FCN) in the second paper and test it together with `CNNClassifier`.

Use the same four datasets, and test sktime's `CNNClassifier` and MLP and FCN models you develop. Report test scores of three models (`CNNClassifier`, MLP, and FCN) on three datasets you chose. It would be nine scores in total. For MLP and FCN, you may need to satisfy the following requirement:

- You should use at least **two** Tensorflow callbacks when you fit your model. These can be built-in ones or your personalized callback.
- You should run the model at least 10 epochs.
- You can use the same processed datasets in Task 1. For `CNNClassifier`, as you cannot use `tf.Data`, you may put the training set directly.
- For `CNNClassifier`, you can run it with the default parameters or reduce the number of epoch (default is 2000).
- Please use the predefined test dataset to report the test scores.

Note that the main purpose of this task is to check if you can develop a similar network structure with description. If the detail of the specific part (e.g., size of one layer or some custom parameters like epoch) is missing in the paper, you can set it on your own.
 

#Using SKtime's CNNClassifier

In [None]:
from sktime.classification.deep_learning.cnn import CNNClassifier
def classify(epochs, batch_size, verbose, Train, Test):
  cnn = CNNClassifier(n_epochs=epochs,batch_size=batch_size,verbose=verbose) 
  cnn.fit(Train[0], Train[1]) 
  preds = cnn.predict(Test[0])
  print(f'Test Accuracy Score:{accuracy_score(Test[1],preds)}')
  return accuracy_score(Test[1],preds)

In [None]:
ECG_ACC = classify(20,4,False,ECG200_Train, ECG200_Test)



Test Accuracy Score:0.8


In [None]:
EQ_ACC = classify(20,4,False,Earthquake_Train,Earthquake_Test)



Test Accuracy Score:0.7482014388489209


In [None]:
GP_ACC = classify(20,4,False,GunPoint_Train,GunPoint_Test)



Test Accuracy Score:0.74


In [None]:
Avg_test_acc = np.mean([ECG_ACC, EQ_ACC, GP_ACC])
print(f"Average Test Accuracy of the Sktime CNN Classifier:{round(Avg_test_acc,4)}")

Average Test Accuracy of the Sktime CNN Classifier:0.7627


#FCN Classifier

In [None]:
def FCN_Classifier(input_shape, num_classes):
  input_layer = Input(input_shape)
  conv1 = Conv1D(filters=128, kernel_size=8, padding='same')(input_layer)
  conv1 = keras.layers.BatchNormalization()(conv1)
  conv1 = keras.layers.Activation(activation='relu')(conv1)

  conv2 = Conv1D(filters=256, kernel_size=5, padding='same')(conv1)
  conv2 = keras.layers.BatchNormalization()(conv2)
  conv2 = keras.layers.Activation('relu')(conv2)

  conv3 = Conv1D(128, kernel_size=3,padding='same')(conv2)
  conv3 = keras.layers.BatchNormalization()(conv3)
  conv3 = keras.layers.Activation('relu')(conv3)
  
  gap_layer = keras.layers.GlobalAveragePooling1D()(conv3)
  output_layer = keras.layers.Dense(num_classes, activation='softmax')(gap_layer)

  model = Model(inputs=input_layer, outputs=output_layer)
  return model
 


In [None]:
# Define the callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=5, mode='min', restore_best_weights=True)
model_checkpoint = ModelCheckpoint('FCN_Classifier.h5', monitor='val_accuracy', mode='max', save_best_only=True, save_weights_only=False)

# Create the model
def train_model(train_dataset, val_dataset, test_dataset, X_train, num_classes, epochs, lr=0.01):
  model = FCN_Classifier(input_shape=X_train.shape[1:], num_classes=num_classes)
  optimizer = Adam(learning_rate=lr)
  model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
  history = model.fit(train_dataset, epochs=epochs, validation_data=val_dataset, callbacks=[early_stopping, model_checkpoint])
  test_loss, test_acc = model.evaluate(test_dataset)
  print('Test accuracy:', test_acc)
  return test_loss, test_acc

In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Ecg_train, Ecg_ytest, Ecg_val, Ecg_valtest, val=True)
test_dataset = load_data_rnn(ECG200_Test[0],ECG200_Test[1],val=False)

ECG_loss, ECG_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Test accuracy: 1.0


In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Eq_train, Eq_ytest, Eq_val, Eq_valtest, val=True)
test_dataset = load_data_rnn(Earthquake_Test[0],Earthquake_Test[1],val=False)

Eq_loss, Eq_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Test accuracy: 0.7482014298439026


In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Gp_train, Gp_ytest, Gp_val, Gp_valtest, val=True)
test_dataset = load_data_rnn(GunPoint_Test[0],GunPoint_Test[1],val=False)

Gp_loss, Gp_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Test accuracy: 0.4933333396911621


In [None]:
Avg_test_acc = np.mean([ECG_acc, Eq_acc, Gp_acc])
Avg_test_loss = np.mean([ECG_loss, Eq_loss, Gp_loss])
print(f"Average Test Accuracy of the FCN Network:{round(Avg_test_acc,4)}")
print(f"Average Test Loss of the FCN Network: {round(Avg_test_loss,4)}")

Average Test Accuracy of the FCN Network:0.7472
Average Test Loss of the FCN Network: 1.3313


#MLP Classifier

In [None]:
def MLP_Classifier(input_shape, num_classes):
  input_layer = Input(input_shape)
  # flatten/reshape because when multivariate all should be on the same axis 
  input_layer_flattened = keras.layers.Flatten()(input_layer)
  layer_1 = Dropout(0.1)(input_layer_flattened)
  layer_1 = Dense(500, activation='relu')(layer_1)
  layer_2 = Dropout(0.2)(layer_1)
  layer_2 = Dense(500, activation='relu')(layer_2)

  layer_3 = Dropout(0.2)(layer_2)
  layer_3 = Dense(500, activation='relu')(layer_3)

  output_layer = Dropout(0.3)(layer_3)
  output_layer = Dense(num_classes, activation='softmax')(output_layer)

  model = Model(inputs=input_layer, outputs=output_layer)
  return model


In [None]:
# Define the callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=5, mode='min', restore_best_weights=True)
model_checkpoint = ModelCheckpoint('MLP_Classifier.h5', monitor='val_accuracy', mode='max', save_best_only=True, save_weights_only=False)

# Create the model
def train_model(train_dataset, val_dataset, test_dataset, X_train, num_classes, epochs, lr=0.01):
  model = MLP_Classifier(input_shape=X_train.shape[1:], num_classes=num_classes)
  optimizer = Adam(learning_rate=lr)
  model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
  history = model.fit(train_dataset, epochs=epochs, validation_data=val_dataset, callbacks=[early_stopping, model_checkpoint])
  test_loss, test_acc = model.evaluate(test_dataset)
  print('Test accuracy:', test_acc)
  return test_loss, test_acc

In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Ecg_train, Ecg_ytest, Ecg_val, Ecg_valtest, val=True)
test_dataset = load_data_rnn(ECG200_Test[0],ECG200_Test[1],val=False)

ECG_loss, ECG_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Test accuracy: 1.0


In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Eq_train, Eq_ytest, Eq_val, Eq_valtest, val=True)
test_dataset = load_data_rnn(Earthquake_Test[0],Earthquake_Test[1],val=False)

Eq_loss, Eq_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Test accuracy: 0.7482014298439026


In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Gp_train, Gp_ytest, Gp_val, Gp_valtest, val=True)
test_dataset = load_data_rnn(GunPoint_Test[0],GunPoint_Test[1],val=False)

Gp_loss, Gp_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Test accuracy: 0.7133333086967468


In [None]:
Avg_test_acc = np.mean([ECG_acc, Eq_acc, Gp_acc])
Avg_test_loss = np.mean([ECG_loss, Eq_loss, Gp_loss])
print(f"Average Test Accuracy of the MLP Classifier:{round(Avg_test_acc,4)}")
print(f"Average Test Loss of the MLP Classifier: {round(Avg_test_loss,4)}")

Average Test Accuracy of the MLP Classifier:0.8205
Average Test Loss of the MLP Classifier: 0.3741


### Task 3: Time series classification using deep learning 3

Next, you can try to further improve your model by selecting one of the following ideas (or you can suggest your own ideas that are not trivial):
- Use residual connection to your model (= implement **ResNet** in [this](https://arxiv.org/abs/1611.06455) paper). 
- Add transformer/attention layer to your model (you can choose any model from Tasks 1 and 2).
- Use RNN and CNN networks separately, create two to three layers individually, and concatenate them. This means that until the third (or second) layer, you have two different networks handling the same dataset, and after that, you concatenate the output and finish with any FCN layer. Check [this post](https://stackoverflow.com/questions/59168306/how-to-combine-lstm-and-cnn-in-timeseries-classification) to get inspired.
- Apply any sktime's transformer (not attention transformer) first to the dataset and run any deep learning model you already developed in Tasks 2 and 3. In this case, you need to choose at least two transformers and apply them together.
- Train the model on multiple similar datasets and test it on one specific test set. Check if the model can be improved if it is trained on multiple datasets. However, for this, you may need to choose the similar datasets based on their classification (UCR repository has a specific dataset type such as **AUDIO** or **MOTION**).

Choose one model you want from the models you have developed in Tasks 1 and 2. Select one idea, try implementing it, and check if you can improve the performance. Note that you do not need to prove that the accuracy scores increase but must explain your trials. Report test scores on three datasets you chose.

#ResNet_Classifier

In [None]:
def ResNet_Classifier(input_shape, num_classes):
  n_feature_maps = 64
  input_layer = Input(input_shape)
  # BLOCK 1
  conv_x = Conv1D(filters=n_feature_maps, kernel_size=8, padding='same')(input_layer)
  conv_x = keras.layers.BatchNormalization()(conv_x)
  conv_x = keras.layers.Activation('relu')(conv_x)

  conv_y = Conv1D(filters=n_feature_maps, kernel_size=5, padding='same')(conv_x)
  conv_y = keras.layers.BatchNormalization()(conv_y)
  conv_y = keras.layers.Activation('relu')(conv_y)

  conv_z = Conv1D(filters=n_feature_maps, kernel_size=3, padding='same')(conv_y)
  conv_z = keras.layers.BatchNormalization()(conv_z)

  # expand channels for the sum
  shortcut_y = Conv1D(filters=n_feature_maps, kernel_size=1, padding='same')(input_layer)
  shortcut_y = keras.layers.BatchNormalization()(shortcut_y)
  output_block_1 = keras.layers.add([shortcut_y, conv_z])
  output_block_1 = keras.layers.Activation('relu')(output_block_1)

  # BLOCK 2
  conv_x = Conv1D(filters=n_feature_maps * 2, kernel_size=8, padding='same')(output_block_1)
  conv_x = keras.layers.BatchNormalization()(conv_x)
  conv_x = keras.layers.Activation('relu')(conv_x)

  conv_y = Conv1D(filters=n_feature_maps * 2, kernel_size=5, padding='same')(conv_x)
  conv_y = keras.layers.BatchNormalization()(conv_y)
  conv_y = keras.layers.Activation('relu')(conv_y)

  conv_z = Conv1D(filters=n_feature_maps * 2, kernel_size=3, padding='same')(conv_y)
  conv_z = keras.layers.BatchNormalization()(conv_z)

  # expand channels for the sum
  shortcut_y = Conv1D(filters=n_feature_maps * 2, kernel_size=1, padding='same')(output_block_1)
  shortcut_y = keras.layers.BatchNormalization()(shortcut_y)

  output_block_2 = keras.layers.add([shortcut_y, conv_z])
  output_block_2 = keras.layers.Activation('relu')(output_block_2)

  # BLOCK 3
  conv_x = Conv1D(filters=n_feature_maps * 2, kernel_size=8, padding='same')(output_block_2)
  conv_x = keras.layers.BatchNormalization()(conv_x)
  conv_x = keras.layers.Activation('relu')(conv_x)

  conv_y = Conv1D(filters=n_feature_maps * 2, kernel_size=5, padding='same')(conv_x)
  conv_y = keras.layers.BatchNormalization()(conv_y)
  conv_y = keras.layers.Activation('relu')(conv_y)

  conv_z = Conv1D(filters=n_feature_maps * 2, kernel_size=3, padding='same')(conv_y)
  conv_z = keras.layers.BatchNormalization()(conv_z)

  # no need to expand channels because they are equal
  shortcut_y = keras.layers.BatchNormalization()(output_block_2)
  output_block_3 = keras.layers.add([shortcut_y, conv_z])
  output_block_3 = keras.layers.Activation('relu')(output_block_3)

  # FINAL
  gap_layer = keras.layers.GlobalAveragePooling1D()(output_block_3)
  output_layer = Dense(num_classes, activation='softmax')(gap_layer)

  model = Model(inputs=input_layer, outputs=output_layer)
  return model

In [None]:
# Define the callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=5, mode='min', restore_best_weights=True)
model_checkpoint = ModelCheckpoint('ResNet_Classifier.h5', monitor='val_accuracy', mode='max', save_best_only=True, save_weights_only=False)

# Create the model
def train_model(train_dataset, val_dataset, test_dataset, X_train, num_classes, epochs, lr=0.01):
  model = ResNet_Classifier(input_shape=X_train.shape[1:], num_classes=num_classes)
  optimizer = Adam(learning_rate=lr)
  model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
  history = model.fit(train_dataset, epochs=epochs, validation_data=val_dataset, callbacks=[early_stopping, model_checkpoint])
  test_loss, test_acc = model.evaluate(test_dataset)
  print('Test accuracy:', test_acc)
  return test_loss, test_acc

In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Ecg_train, Ecg_ytest, Ecg_val, Ecg_valtest, val=True)
test_dataset = load_data_rnn(ECG200_Test[0],ECG200_Test[1],val=False)

ECG_loss, ECG_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Test accuracy: 1.0


In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Eq_train, Eq_ytest, Eq_val, Eq_valtest, val=True)
test_dataset = load_data_rnn(Earthquake_Test[0],Earthquake_Test[1],val=False)

Eq_loss, Eq_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Test accuracy: 0.7482014298439026


In [None]:
train_dataset, val_dataset, inputs = load_data_rnn(Gp_train, Gp_ytest, Gp_val, Gp_valtest, val=True)
test_dataset = load_data_rnn(GunPoint_Test[0],GunPoint_Test[1],val=False)

Gp_loss, Gp_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 2, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Test accuracy: 0.4933333396911621


### Task 4: Time series classification using sktime

As we saw above, we can use RandomizedSearch to find optimal parameter options. However, it does not support scikit-learn's classifiers such as DecisionTree or RandomForest. However, as we tried, it is also possible to use the output of the sktime transformer (e.g., catch22) to train scikit-learn models such as RandomForest. Sktime supports this with `SklearnClassifierPipeline` to put a  scikit-learn classifier and sktime's transformer together and you need to implement it.

Pick one classifier from scikit-learn (that can be anything! e.g., Decision Tree or Logistic Regressor) and two transformers from sktime and create `SklearnClassifierPipeline.` As we tried in this lab, that can be **Rocket with RandomForest** or **Catch22 with DecisionTree**. Pick one parameter from each module (in total, three, one from the classifier and two from two transformers) and run a randomized search on the pipeline you define and report the test score of the best model found by the randomized Search. Compare your best score to the score from the same model with the default setting.

Task 4 involves a time-consuming process, so you can only choose **one dataset** to perform the task above. Also, note that you do not need to perform better by conducting a randomized search for this task (but still good to try!).

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sktime.transformations.panel.rocket import Rocket
from sktime.transformations.panel.dictionary_based import _paa
from sktime.transformations.series.exponent import ExponentTransformer
from sktime.classification.compose import SklearnClassifierPipeline
from sktime.classification.base import BaseClassifier

#Baseline SklearnClassifierPipeline

In [None]:
X_train = Ecg_train
X_test = Ecg_val
y_train = Ecg_ytest
y_test = Ecg_valtest

In [None]:
clf = RandomForestClassifier()
rocket = Rocket(num_kernels=10000)
exp = ExponentTransformer()
pipeline = SklearnClassifierPipeline(clf, [rocket, exp])
pipeline = pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
print(f'Test Accuracy Score: {accuracy_score(y_test,y_pred)}')

Test Accuracy Score: 0.9


#RandomizedSearchCV for SKlearnClassifierPipeline

In [None]:
# define the classifier and transformers
clf = RandomForestClassifier()
rocket = Rocket(num_kernels=10000)
exp = ExponentTransformer()

# define the pipeline
pipeline = SklearnClassifierPipeline(clf, [rocket, exp])

# define the parameter grid for randomized search
param_grid = {
    'classifier__n_estimators': [100, 500, 1000],
    'Rocket__num_kernels': [500, 1000, 5000, 10000],
    'ExponentTransformer__power': [0.1, 0.3, 0.5]
}

# define the randomized search
rs = RandomizedSearchCV(
    pipeline, param_grid, cv=5, n_iter=10, random_state=42, n_jobs=-1
)

# fit the randomized search
rs.fit(X_train, y_train)

# get the best estimator from randomized search
best_estimator = rs.best_estimator_

# evaluate the best estimator on test data
y_pred = best_estimator.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(best_estimator)
print("Test accuracy:", accuracy)


SklearnClassifierPipeline(classifier=RandomForestClassifier(n_estimators=1000),
                          transformers=[Rocket(), ExponentTransformer()])
Test accuracy: 0.9


### Task 5: Multivariate time series classification

Time series can be **multivariate**, which means there can be many values (= data points) describing one time point. In this task, you will use one **multivariate** dataset (**BasicMotions**) and try to run one deep learning model and one sktime model to see if those models work well on multivariate time series.

- Use [BasicMotions](https://timeseriesclassification.com/description.php?Dataset=BasicMotions) dataset in the UCR/UEA repository.
- Run **ROCKET** and **the best tensorflow deep learning model from the previous tasks** on BasicMotions. You need to adjust the deep learning model's input layer to handle this multivariate dataset.
- Use sktime's `load_UCR_UEA_dataset` function to perform. You should use each dataset's original train/test splits.
- For the deep learning model, you should transform it using TensorFlow data API (`tf.data`) to manage your dataset and use `shuffle`, `batch`, and `prefetch` functions. This means that you need to create the validation set first. 
- For training, you need to run at least 10 epochs for your deep learning model. For ROCKET, you can keep the default parameter options. 
- Report the test scores of both models on the predefined test set.

In [None]:
BasicMotions_Train = load_UCR_UEA_dataset('BasicMotions', split='train', return_type='numpy3d')
BasicMotions_Test = load_UCR_UEA_dataset('BasicMotions', split='test', return_type='numpy3d')

In [None]:
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()


In [None]:
BasicMotions_Train = BasicMotions_Train[0], le.fit_transform(BasicMotions_Train[1])
BasicMotions_Test = BasicMotions_Test[0], le.fit_transform(BasicMotions_Test[1])

In [None]:
BM_train, BM_val, BM_ytest, BM_valtest = train_test_split(BasicMotions_Train[0],BasicMotions_Train[1], test_size = 0.1, random_state=seed_value)

#ROCKET Classifier

In [None]:
from sktime.classification.kernel_based import RocketClassifier
clf = RocketClassifier(num_kernels=5000) 
clf.fit(BM_train, BM_ytest)
preds = clf.predict(BM_val)
test_preds = clf.predict(BasicMotions_Test[0])
print(f'val_accuracy: {accuracy_score(BM_valtest,preds)}')
print(f'test_accuracy: {accuracy_score(BasicMotions_Test[1],test_preds)}')

val_accuracy: 1.0
test_accuracy: 1.0


#Deep Learning Model

In [None]:
def create_1d_cnn_lstm_model(input_shape, num_classes, num_filters=64, kernel_size=5, pool_size=2, dropout_rate=0.2):
    model = Sequential([
        Input(shape=input_shape),
        Conv1D(num_filters, kernel_size, padding='same', kernel_regularizer=l2(0.01)),
        keras.layers.BatchNormalization(),
        keras.layers.Activation('relu'),
        keras.layers.MaxPooling1D(pool_size=pool_size),
        LSTM(num_filters, dropout=dropout_rate, recurrent_dropout=dropout_rate),
        Dropout(dropout_rate),
        Dense(num_classes, activation='softmax')
    ])
    return model

# Define the callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=5, mode='min', restore_best_weights=True)
model_checkpoint = ModelCheckpoint('1d_cnn_lstm_model.h5', monitor='val_accuracy', mode='max', save_best_only=True, save_weights_only=False)

# Create the model
def train_model(train_dataset, val_dataset, test_dataset, X_train, num_classes, epochs, lr=0.01):
  model = create_1d_cnn_lstm_model(input_shape=X_train.shape[1:], num_classes=num_classes)
  optimizer = Adam(learning_rate=lr)
  model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
  history = model.fit(train_dataset, epochs=epochs, validation_data=val_dataset, callbacks=[early_stopping, model_checkpoint])
  test_loss, test_acc = model.evaluate(test_dataset)
  print('Test accuracy:', test_acc)
  return test_loss, test_acc

In [None]:
train_dataset, val_dataset, inputs = load_data(BM_train, BM_ytest, BM_val, BM_valtest, val=True)
test_dataset = load_data(BasicMotions_Test[0],BasicMotions_Test[1],val=False)

BM_loss, BM_acc = train_model(train_dataset, val_dataset, test_dataset, inputs, 4, 100)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Test accuracy: 0.824999988079071


### Put everything together

You have tested various models on your four chosen datasets. Which model shows the best performance? You need to write a simple report which should be max two pages about your trials.
 - Task 0: State three univariate time series datasets you chose for the tasks.
 - Task 1: Report the average test scores of four models on three datasets you chose. It would be four scores in total. Mark the best model in terms of the average test score. Briefly explain the structure you constructed. 
 - Task 2: Report test scores of three models on three datasets you chose. It would be nine scores in total. Report the rank of average accuracy scores of three models. Briefly explain two deep learning model structures you constructed to confirm that you correctly developed the models in the paper.
 - Task 3: Briefly explain which model you chose, how you improved it. Report test scores of the model on three datasets you chose.
 - Task 4: Explain your choice of dataset and (classifier, transformer) pair and the parameters you tried to optimize. Report the best score and estimator from the randomized search instance and the test score of the best model.  Compare your best score to the score from the same model with the default setting.
 - Task 5: Report the best model (ROCKET vs your deep learning model) in terms of the test score. Briefly explain how you handle the multivariate dataset for your two models.