In [None]:
# Install the third-party packages used in this notebook.
# Only scipy is pinned to an exact version (1.14.0); the remaining packages
# are installed unpinned, and detach_rocket is installed straight from GitHub.
!pip install scipy==1.14.0
!pip install sktime --quiet
!pip install pyts --quiet
!pip install tsfresh --quiet
!pip install git+https://github.com/gon-uri/detach_rocket --quiet

## 1) Download and Prepare Data

### Download a dataset from the UCR archive

You can find a description of each dataset on the following webpage: https://www.timeseriesclassification.com/dataset.php

In [None]:
# We will use the UCR dataset

from detach_rocket.utils_datasets import fetch_ucr_dataset
from pyts.datasets import ucr_dataset_list

# Download the dataset. `selected_dataset` is a one-element list; its first
# entry is the dataset name handed to `fetch_ucr_dataset`.
selected_dataset = ['WormsTwoClass']
print("Selected dataset:", selected_dataset)
# The returned object is indexed later in the notebook with the keys
# 'data_train', 'target_train', 'data_test' and 'target_test'.
current_dataset = fetch_ucr_dataset(selected_dataset[0])

print(" ")
# List the dataset names returned by pyts' `ucr_dataset_list`, so that another
# dataset can be tried later (start with the WormsTwoClass dataset).
dataset_list = ucr_dataset_list()
print("All UCR Datasets:", dataset_list)

In [None]:
# Function to convert all labels to 0,1,2,3,4,etc..

def convert_labels_to_integers(labels):
    """Map arbitrary labels onto consecutive integers starting at zero.

    The distinct labels are sorted, and each one is assigned its position in
    that sorted order.

    Parameters
    ----------
    labels : iterable
        Labels to encode; they must be hashable and mutually sortable.

    Returns
    -------
    converted_labels : numpy.ndarray
        The encoded labels, in the same order as ``labels``.
    label_to_int : dict
        Mapping from each original label to its integer code.
    """
    label_to_int = {}
    for code, original in enumerate(sorted(set(labels))):
        label_to_int[original] = code

    # Encode every label through the lookup table built above
    converted_labels = np.asarray([label_to_int[item] for item in labels])
    return converted_labels, label_to_int

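Note: the description below refers to ECG200, another dataset from the UCR archive that you can select in the download cell above; the dataset selected by default in this notebook is WormsTwoClass. The ECG200 dataset was formatted by R. Olszewski as part of his thesis "Generalized feature extraction for structural pattern recognition in time-series data" at Carnegie Mellon University, 2001. Each series traces the electrical activity recorded during one heartbeat. The two classes are a normal heartbeat and a Myocardial Infarction.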

### Create the corresponding Train and Test matrices

We unpack the dictionary downloaded from the UCR archive into the train and test matrices that we will use for training our models.

In [None]:
import numpy as np

# Build the train/test matrices: drop rows containing NaN or inf values,
# shuffle the training split and add a singleton channel axis.
print("Dataset Matrix Shape: ( # of instances , # of channels , time series length )")
print(" ")

# ---------------- Train split ----------------
X_train = current_dataset['data_train'].copy()

# Keep a row only when every value in it is finite (neither NaN nor +/-inf)
mask_train = np.isfinite(X_train).all(axis=1)
X_train = X_train[mask_train]

# Shuffle the instances to avoid any ordering bias; the permutation is kept
# in `indexes` so the labels can be reordered identically further down
np.random.seed(42)
indexes = np.arange(len(X_train))
np.random.shuffle(indexes)
X_train = X_train[indexes]

# Insert the channel axis: (num instances, num timesteps) becomes
# (num instances, 1, num timesteps), the layout used by the feature
# extractors and the deep learning model
X_train = X_train[:, np.newaxis, :]
print(f"Train Matrix Shape: {X_train.shape}")

# Encode the raw labels as integers starting from zero, then apply the same
# row filter and the same permutation that were applied to the data
y_train, label_to_int = convert_labels_to_integers(current_dataset['target_train'].copy())
y_train = y_train[mask_train][indexes]

print(" ")

# ---------------- Test split ----------------
X_test = current_dataset['data_test'].copy()

# Same finite-values filter as for the train split (no shuffling here)
mask_test = np.isfinite(X_test).all(axis=1)
X_test = X_test[mask_test]

# Insert the channel axis, as done for the train split
X_test = X_test[:, np.newaxis, :]
print(f"Test Matrix Shape: {X_test.shape}")

# Filter the raw test labels, then encode them with the mapping learned on
# the training labels
y_test = current_dataset['target_test'].copy()[mask_test]
y_test = np.asarray([label_to_int[raw_label] for raw_label in y_test])

# Report the classes found in the training labels
classes_present = np.unique(y_train)
print(" ")
print(f"Unique Classes: {classes_present}")
number_of_classes = len(classes_present)

# Report the share of training instances that belongs to each class
print(" ")
for i in classes_present:
    class_share = np.sum(y_train == i) / len(y_train) * 100
    print(f"Class {i} has {class_share:.2f}% of the data")

### Plotting the time series data

In [None]:
# Plot 10 randomly chosen training instances, coloured by their class
import matplotlib.pyplot as plt

# Map each class index to a colour; labels missing from this table fall back
# to grey below, so datasets with more than ten classes do not raise KeyError
color_dict = {0:'red',1:'blue',2:'green',3:'yellow',4:'black',5:'orange',6:'purple',7:'pink',8:'brown',9:'cyan', -1:'cyan'}

# Fixed 2 x 5 grid: exactly ten panels, independent of the number of classes
# (tying the row count to the number of classes breaks the layout whenever
# number_of_classes * 5 is smaller than the ten panels drawn)
n_rows, n_cols = 2, 5

fig, axes = plt.subplots(n_rows, n_cols, figsize=(18, 10))
for ax in axes.ravel():
    # Pick a random training instance
    idx = np.random.randint(0, X_train.shape[0])
    label = y_train[idx]
    # Channel 0 is plotted (the data has a single channel)
    ax.plot(X_train[idx, 0], color=color_dict.get(label, 'gray'))
    ax.set_title(f"Class: {label}")
fig.tight_layout()
plt.show()

## 2) Feature Based Methods

### Create a Function to train and evaluate a Logistic Regression Classifier

We will create a simple function to train and evaluate a logistic regression classifier using the scikit-learn library.

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Define a function to train and evaluate a logistic regression classifier
def train_and_evaluate(X_train, y_train, X_test, y_test, verbose=True):
    """Fit a logistic regression on the train features and score both splits.

    Parameters
    ----------
    X_train, X_test : array-like of shape (n_instances, n_features)
        Feature matrices for the train and test splits.
    y_train, y_test : array-like of shape (n_instances,)
        Labels aligned row by row with the corresponding feature matrix.
    verbose : bool, default=True
        When True, print the accuracies and plot both confusion matrices.

    Returns
    -------
    accuracy_train : float
        Accuracy on the train split.
    accuracy_test : float
        Accuracy on the test split.
    conf_matrix : numpy.ndarray
        Confusion matrix on the test split, with rows/columns ordered by the
        sorted union of the labels found in ``y_train`` and ``y_test``.
    clf : LogisticRegression
        The fitted classifier.
    """
    # Create the classifier
    clf = LogisticRegression(max_iter=1000, penalty='l2',class_weight = 'balanced')
    # You can try a different classifier (e.g. SVM, random forest, etc.)
    # clf = SVC(kernel='rbf',C=1.0)
    # clf = RandomForestClassifier(n_estimators=50,max_depth=10)  # requires: from sklearn.ensemble import RandomForestClassifier

    # Fit the classifier
    clf.fit(X_train, y_train)

    # Use one shared label set for both confusion matrices. Without it, the
    # matrix size depends on which classes happen to occur in y_true/y_pred
    # and can disagree with the display labels, which makes the plot fail.
    class_labels = np.unique(np.concatenate([np.asarray(y_train), np.asarray(y_test)]))

    # Evaluate on the train split
    y_pred_train = clf.predict(X_train)
    accuracy_train = accuracy_score(y_train, y_pred_train)
    conf_matrix_train = confusion_matrix(y_train, y_pred_train, labels=class_labels)

    # Evaluate on the test split
    y_pred_test = clf.predict(X_test)
    accuracy_test = accuracy_score(y_test, y_pred_test)
    conf_matrix = confusion_matrix(y_test, y_pred_test, labels=class_labels)

    if verbose:
        print(f"Train Accuracy: {accuracy_train}")
        print(f" ")
        print(f"Confusion Matrix on Train Set: ")
        # plot confusion matrix
        disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix_train, display_labels=class_labels)
        disp.plot()
        plt.show()
        print(f" ")
        print(f"Test Accuracy: {accuracy_test}")
        print(f" ")
        print(f"Confusion Matrix on Test Set: ")
        # plot confusion matrix
        disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=class_labels)
        disp.plot()
        plt.show()

    return accuracy_train, accuracy_test, conf_matrix, clf

### A) Classify using TSFresh features
We first create the large set of features for each time series using TSFresh.

In [None]:
from sktime.transformations.panel.tsfresh import TSFreshFeatureExtractor

# Create TSFresh transformation
ts_fresh_transform = TSFreshFeatureExtractor(default_fc_parameters="efficient", show_warnings=False, disable_progressbar=False)

# Fit on the train series, then transform both splits into feature matrices
X_train_ts = ts_fresh_transform.fit_transform(X_train).values
X_test_ts = ts_fresh_transform.transform(X_test).values

# Drop the instances whose feature row contains a NaN or an inf.
# The labels MUST come from `y_train` / `y_test`: those arrays were filtered
# and (for train) shuffled together with `X_train` / `X_test`, so they are
# aligned row by row with the features computed above. Re-reading
# current_dataset['target_train'] would pair every feature row with the label
# of a different instance, because the raw labels are still in original order.
# Dedicated mask names are used so the masks of the data-preparation cell are
# not overwritten.
mask_train_ts = np.isfinite(X_train_ts).all(axis=1)
X_train_ts = X_train_ts[mask_train_ts]
y_train_ts = y_train[mask_train_ts]

mask_test_ts = np.isfinite(X_test_ts).all(axis=1)
X_test_ts = X_test_ts[mask_test_ts]
y_test_ts = y_test[mask_test_ts]

# Sanity check: one label per feature row
assert len(X_train_ts) == len(y_train_ts)
assert len(X_test_ts) == len(y_test_ts)

print(f" ")
print(f"TSFresh Features Matrix Shape: ( # of instances , # of features )")
print(f" ")
print(f"Train: {X_train_ts.shape}")
print(f" ")
print(f"Test: {X_test_ts.shape}")

We evaluate how well these features can classify the data.

In [None]:
# Fit the classifier on the TSFresh features and report its performance
print("Results for TSFresh:")
print(" ")
acc_train_ts, acc_test_ts, conf_matrix_ts, classifier_ts = train_and_evaluate(
    X_train_ts,
    y_train_ts,
    X_test_ts,
    y_test_ts,
)

### B) Classify using Catch22
We now compute the reduced set of features proposed in Catch22.

In [None]:
from sktime.transformations.panel.catch22 import Catch22

# Create Catch22 transformation
catch22_transform = Catch22()

# Fit on the train series, then transform both splits into feature matrices
X_train_catch22 = catch22_transform.fit_transform(X_train).values
X_test_catch22 = catch22_transform.transform(X_test).values

# Drop the instances whose feature row contains a NaN or an inf.
# The labels MUST come from `y_train` / `y_test`: those arrays were filtered
# and (for train) shuffled together with `X_train` / `X_test`, so they are
# aligned row by row with the features computed above. Re-reading
# current_dataset['target_train'] would pair every feature row with the label
# of a different instance, because the raw labels are still in original order.
# Dedicated mask names are used so the masks of the data-preparation cell are
# not overwritten.
mask_train_catch22 = np.isfinite(X_train_catch22).all(axis=1)
X_train_catch22 = X_train_catch22[mask_train_catch22]
y_train_catch22 = y_train[mask_train_catch22]

mask_test_catch22 = np.isfinite(X_test_catch22).all(axis=1)
X_test_catch22 = X_test_catch22[mask_test_catch22]
y_test_catch22 = y_test[mask_test_catch22]

# Sanity check: one label per feature row
assert len(X_train_catch22) == len(y_train_catch22)
assert len(X_test_catch22) == len(y_test_catch22)

print(f" ")
print(f"Catch22 Features Matrix Shape: ( # of instances , # of features )")
print(f" ")
print(f"Train: {X_train_catch22.shape}")
print(f" ")
print(f"Test: {X_test_catch22.shape}")

We evaluate how well these features can classify the data.

In [None]:
# Train and evaluate the classifier for the features created by Catch22
print(f"Results for Catch22:")
print(f" ")
acc_train_catch22, acc_test_catch22, conf_matrix_catch22, classifier_catch22 = train_and_evaluate(X_train_catch22, y_train_catch22, X_test_catch22, y_test_catch22)

## 3) Deep Learning Method: CNN Model
We will train a vanilla CNN architecture to solve the classification task.

In [None]:
# The definition of these callbacks is just to store the accuracy values during training process.
# There is no need to modify or understand this part of the code.

import tensorflow as tf
from sklearn.metrics import accuracy_score

# Module-level accumulators: each callback defined below appends one accuracy
# value per epoch to the matching list (train / test).
acc_list_train = []
acc_list_test = []

class AccuracyCallbackTrain(tf.keras.callbacks.Callback):
    """Keras callback that records the train-set accuracy after every epoch.

    After each epoch the current model predicts on the stored data, and the
    resulting accuracy is appended to the module-level list ``acc_list_train``.

    Parameters
    ----------
    X : numpy.ndarray
        Data to evaluate on; axes 1 and 2 are swapped before it is stored.
    y : array-like
        Integer class labels; they are compared against the argmax of the
        model output, so they must match the output column indices.
    threshold : float, default=0.5
        Stored but not used (predictions are taken with argmax); kept so
        existing calls keep working.
    """

    def __init__(self, X, y, threshold=0.5):
        super().__init__()
        # Swap axes 1 and 2 of X (presumably the layout the Keras model is
        # fed with -- confirm against the sktime network if this is changed)
        self.X = np.swapaxes(X, 1, 2)
        self.y = y
        self.threshold = threshold  # unused, see class docstring

    def on_epoch_end(self, epoch, logs=None):
        """Compute the accuracy on the stored data and record it."""
        # verbose=0 silences the per-call progress bar, which would otherwise
        # be printed once per epoch and flood the cell output
        predictions = self.model.predict(self.X, verbose=0)
        y_pred = predictions.argmax(axis=1)
        accuracy = accuracy_score(self.y, y_pred)

        # Store accuracy value
        acc_list_train.append(accuracy)

class AccuracyCallbackTest(tf.keras.callbacks.Callback):
    """Keras callback that records the test-set accuracy after every epoch.

    After each epoch the current model predicts on the stored data, and the
    resulting accuracy is appended to the module-level list ``acc_list_test``.

    Parameters
    ----------
    X : numpy.ndarray
        Data to evaluate on; axes 1 and 2 are swapped before it is stored.
    y : array-like
        Integer class labels; they are compared against the argmax of the
        model output, so they must match the output column indices.
    threshold : float, default=0.5
        Stored but not used (predictions are taken with argmax); kept so
        existing calls keep working.
    """

    def __init__(self, X, y, threshold=0.5):
        super().__init__()
        # Swap axes 1 and 2 of X (presumably the layout the Keras model is
        # fed with -- confirm against the sktime network if this is changed)
        self.X = np.swapaxes(X, 1, 2)
        self.y = y
        self.threshold = threshold  # unused, see class docstring

    def on_epoch_end(self, epoch, logs=None):
        """Compute the accuracy on the stored (test) data and record it."""
        # verbose=0 silences the per-call progress bar, which would otherwise
        # be printed once per epoch and flood the cell output
        predictions = self.model.predict(self.X, verbose=0)
        y_pred = predictions.argmax(axis=1)
        accuracy = accuracy_score(self.y, y_pred)

        # Store accuracy value
        acc_list_test.append(accuracy)

Let's train the simple CNN model on the raw time series data.

By default, this model has the following hyperparameters:

*   kernel_size = 7
*   avg_pool_size = 3
*   n_conv_layers = 2
*   filter_sizes = [6, 12] (the number of kernels per layer; the list length must equal the number of layers)

You can read more in depth about it in the corresponding documentation:
https://www.sktime.net/en/latest/api_reference/auto_generated/sktime.classification.deep_learning.CNNClassifier.html

In [None]:
from sktime.classification.deep_learning import CNNClassifier

# Initialize the callbacks (this is to keep track of the learning process while training)
train_accuracy_callback = AccuracyCallbackTrain(X_train, y_train)
test_accuracy_callback = AccuracyCallbackTest(X_test, y_test)

# Start from empty accuracy histories so that re-running this cell does not
# append to the values recorded by a previous run
acc_list_train = []
acc_list_test = []

# Create the classifier
cnn_classifier = CNNClassifier(

    # PARAMETERS
    n_epochs=40, # Number of epochs
    #kernel_size = 7,
    #avg_pool_size = 3,
    #n_conv_layers = 2,
    #filter_sizes = [6, 12],

    random_state=42, # Fixed seed so that repeated runs give comparable results
    verbose=True, # Print information about the training
    callbacks=[train_accuracy_callback,test_accuracy_callback] # Call the custom callbacks we have created
    )

# Train the model
cnn_classifier.fit(X_train, y_train)

print("Training Set Accuracies:", acc_list_train)
print("Test Set Accuracies:", acc_list_test)

# Plot both the loss and the accuracies (train and test) in the same graph using two different y-axis.
import matplotlib.pyplot as plt

fig, ax1 = plt.subplots()

# Left axis: training loss per epoch
color = 'tab:red'
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss', color=color)
ax1.plot(cnn_classifier.history.history['loss'], color=color, label='Train Loss')
ax1.tick_params(axis='y', labelcolor=color)

# Right axis: accuracies recorded by the callbacks after each epoch
ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Accuracy', color=color)
ax2.plot(acc_list_train, color=color, linestyle='dashed', label='Train Accuracy')
ax2.plot(acc_list_test, color=color, linestyle='solid', label='Test Accuracy')
ax2.tick_params(axis='y', labelcolor=color)

# The figure-level legend collects the labelled lines of both axes
fig.legend(loc="upper right")
fig.tight_layout()
plt.show()

In [None]:
# Compute and print the accuracy on the train and test set using the final model

from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix

# Predict each split exactly once with the final model
# (the train split used to be predicted twice, doubling the inference cost)
y_train_pred = cnn_classifier.predict(X_train)
y_test_pred = cnn_classifier.predict(X_test)

# Calculate the accuracy on each split
accuracy_train = accuracy_score(y_train, y_train_pred)
accuracy_test = accuracy_score(y_test, y_test_pred)

print(f"Accuracy Train = {accuracy_train:.2f}")
print(f"Accuracy Test = {accuracy_test:.2f}")

## Exercises

1) Compare both feature based strategies, which one works better for this dataset? Try a different classifier on the Catch22 features. Are you able to reach a better accuracy?

2) What happens when you train the CNN model for a large number of epochs? Do more epochs help? Explore the architecture by modifying some of the model's hyperparameters (the default architecture is far from the best!)

3) [EXTRA] Implement a recurrent neural network (a GRU classifier). Compare its performance / training time / number of parameters with the CNN. Use the documentation of the sktime library for help.