In [None]:
from tsai.all import *
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, balanced_accuracy_score
import pandas as pd
import numpy as np
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import json
import time

In [None]:
import subprocess

def run_bash_command(command):
    try:
        output = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT)
        print("Output:", output.decode('utf-8'))
    except subprocess.CalledProcessError as e:
        print("Error:", e.output.decode('utf-8'))
        return None

In [None]:
class EvaluateTsai(object):
  N_EPOCHS=100
  LR=1e-3
  SOURCEPATH = '..'

  def __init__(self, method, train_dataset, test_dataset):
    self.X_train = None
    self.X_test = None
    self.y_train = None
    self.y_test = None
    self.learn = None
    self.dls = None
    self.method = method
    self.train_dataset = train_dataset
    self.test_dataset = test_dataset

  def split_data(self, dataset):
    data = np.load(f'{self.SOURCEPATH}/data/{dataset}')
    y = data[:,-3:]
    labels = np.argmax(y, axis=1) + 1
    X = data[:,:-3]
    features = X.reshape(X.shape[0], 1, X.shape[1])

    return features, labels

  def load_data(self):
    self.X_train, self.y_train = self.split_data(self.train_dataset)
    self.X_test, self.y_test = self.split_data(self.test_dataset)

    print(f"X_train shape: {self.X_train.shape}")
    print(f"y_train shape: {self.y_train.shape}")

    print(f"X_test shape: {self.X_test.shape}")
    print(f"y_test shape: {self.y_test.shape}")


  def train_model(self, architecture, alias, parameters):
    # Parameters
    bs=[64, 128]

    # Create time series
    splits=get_splits(
        self.y_train,
        valid_size=.3,
        stratify=True,
        random_state=42,
        shuffle=True
        )

    tfms=[None,[Categorize()]]
    dsets=TSDatasets(
        self.X_train,
        self.y_train,
        tfms=tfms,
        splits=splits,
        inplace=True
        )

    dls=TSDataLoaders.from_dsets(
        dsets.train,
        dsets.valid,
        bs=bs,
        batch_tfms=[TSStandardize()]
        )

    model = create_model(architecture, dls=dls, **parameters)

    learn = Learner(dls,
                    model,
                    metrics=accuracy)
    start = time.time()
    learn.fit_one_cycle(self.N_EPOCHS,
                        lr_max=self.LR)
    print(f'Training model: {alias}')
    print('\nElapsed time:', time.time() - start)

    self.learn=learn
    self.dls=dls

  def load_model(self, architecture, parameters, alias):
    model_path = f"""{self.SOURCEPATH}/models/cross_dataset/{self.train_dataset.replace('.npy','')}/{alias}_{parameters}"""


    try:
        self.model = load_learner_all(path=model_path,
                                      dls_fname='dls',
                                      model_fname='model',
                                      learner_fname='learner')
        print(f"Model found: {model_path}")
    except FileNotFoundError:
        print("Model not found!")
        print("Training a new model!")

        splits = get_splits(self.y_train,
                            valid_size=.3,
                            stratify=True,
                            random_state=42,
                            shuffle=True)

        tfms = [None, TSClassification()]
        batch_tfms = TSStandardize(by_sample=True)
        self.model = TSClassifier(self.X_train, self.y_train, splits=splits,
                                  arch=architecture, tfms=tfms, arch_config=parameters,
                                  batch_tfms=batch_tfms, metrics=accuracy, cbs=ShowGraph())
        self.model.fit_one_cycle(self.N_EPOCHS, lr_max=self.LR)

        try:
            run_bash_command(f"mkdir -p {model_path}")
        except:
            print("Folder already exists!")

        self.model.save_all(path=model_path,
                            dls_fname='dls',
                            model_fname='model',
                            learner_fname='learner')

        print(f"Model saved as: {model_path}")

  def get_metrics(self, architecture, alias, parameters, write_flag=False):

    test_probas, test_targets, test_preds = self.model.get_X_preds(
        self.X_test, self.y_test)

    accuracy = accuracy_score(self.y_test.astype(str), test_preds.astype(str))
    precision = precision_score(self.y_test.astype(str), test_preds.astype(str), average=self.method)
    recall = recall_score(self.y_test.astype(str), test_preds.astype(str), average=self.method)
    f1 = f1_score(self.y_test.astype(str), test_preds.astype(str), average=self.method)
    balanced_accuracy = balanced_accuracy_score(self.y_test.astype(str), test_preds.astype(str))

    iter_dict = {"Accuracy":accuracy,
                 "Precision":precision,
                 "Recall":recall,
                 "F1-Score":f1,
                 "Balanced Accuracy":balanced_accuracy,
                 'Model':alias,
                 'Parameters':parameters}

    # Generating the confusion matrix
    cm = confusion_matrix(self.y_test.astype(str), test_preds.astype(str))

    self.model.plot_confusion_matrix()

    # Optional: Display the confusion matrix
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()

    metrics_path = f"""{self.SOURCEPATH}/metrics/cross_dataset/{self.train_dataset.replace('.npy','')}"""

    print(iter_dict)

    if write_flag:
      try:
        run_bash_command(f"mkdir -p {metrics_path}")
      except:
        print("Folder already exist!")

      with open(f"{metrics_path}/{alias}_{parameters}.json", "w") as outfile:
        json.dump(iter_dict, outfile, indent=4)

        np.save(f"{metrics_path}/{alias}_{parameters}_cm.npy", cm)


      print(f"Metrics saved as: {metrics_path}/{alias}_{parameters}.json")

  def evaluate_model(self, archs, write_flag):
    self.load_data()

    for (architecture, parameters, alias) in archs:
      self.load_model(architecture, parameters, alias)
      self.get_metrics(architecture, parameters, alias, write_flag)

In [None]:
method = 'weighted'
train_dataset = 'window_4000_overlap_1000_hzdr_norm.npy'
test_dataset = 'window_4000_overlap_1000_tud_norm.npy'

model_definitions = [(ResNet, {}, 'ResNet'),
                     (LSTM_FCN, {}, 'LSTM-FCN'),
                     (TSTPlus, {},'TSTPlus')]

EVAL = EvaluateTsai(method, train_dataset, test_dataset)
EVAL.evaluate_model(model_definitions, True)

In [None]:
train_dataset = 'window_4000_overlap_1000_tud_norm.npy'
test_dataset = 'window_4000_overlap_1000_hzdr_norm.npy'

model_definitions = [(ResNet, {}, 'ResNet'),
                     (LSTM_FCN, {}, 'LSTM-FCN'),
                     (TSTPlus, {},'TSTPlus')]

EVAL = EvaluateTsai(method, train_dataset, test_dataset)
EVAL.evaluate_model(model_definitions, True)