In [1]:
import numpy as np
import wfdb
import _pickle as pickle

In [2]:
import glob
import numpy as np
from scipy.signal import resample_poly
import wfdb
import math
import _pickle as pickle

def Data_Preparation(samples):
    print('Getting the Data ready ...')

    # Set random seed for reproducibility
    seed = 1234
    np.random.seed(seed=seed)

    # Load QT Database
    with open('data/QTDatabase.pkl', 'rb') as input:
        qtdb = pickle.load(input)

    print(f"[INFO] Loaded QTDatabase with {len(qtdb.keys())} signals")

    # Load combined noise
    with open('data/CombinedNoise.pkl', 'rb') as input:
        combined_noise = pickle.load(input)
    print(f"[INFO] Loaded CombinedNoise with {len(combined_noise)} channels")

    #####################################
    # Data split
    #####################################
    test_set = ['sel123', 'sel233', 'sel302', 'sel307', 'sel820', 'sel853', 
                'sel16420', 'sel16795', 'sele0106', 'sele0121', 'sel32', 'sel49', 
                'sel14046', 'sel15814']

    beats_train = []
    beats_test = []
    valid_train_indices = []  # To keep track of valid indices in training data
    valid_test_indices = []   # To keep track of valid indices in test data
    sn_train = []
    sn_test = []
    noise_indices_train = []
    noise_indices_test = []    
    
    skip_beats = 0
    # samples = 512
    qtdb_keys = list(qtdb.keys())

    print(f"[INFO] Processing QTDatabase, {len(qtdb_keys)} signals to process.")
# b_np.shape는 (512,)로, 패딩을 포함한 전체 샘플 크기가 512임을 알 수 있습니다.
    for signal_name in qtdb_keys:
        for b_idx, b in enumerate(qtdb[signal_name]):
            b_np = np.zeros(samples)
            b_sq = np.array(b)

            init_padding = 16
            if b_sq.shape[0] > (samples - init_padding):
                skip_beats += 1
                continue
# 이 평균값을 b_sq의 각 값에서 빼는 과정은 신호의 중앙화 작업입니다. 즉, 신호의 값들이 배열의 양 끝 값의 평균을 기준으로 대칭적으로 배치되도록 변환됩니다.
# 이 계산을 통해 신호의 첫 값과 마지막 값에 대한 편향을 제거하고, 신호를 중앙으로 이동시키는 효과가 있습니다.
            b_np[init_padding:b_sq.shape[0] + init_padding] = b_sq - (b_sq[0] + b_sq[-1]) / 2

            if signal_name in test_set:
                beats_test.append(b_np)
                valid_test_indices.append(len(beats_test) - 1)  # Track valid test beat index
            else:
                beats_train.append(b_np)
                valid_train_indices.append(len(beats_train) - 1)  # Track valid train beat index

        print(f"[DEBUG] Processed signal {signal_name}, total beats in train: {len(beats_train)}, total beats in test: {len(beats_test)}")

    #####################################
    # Adding noise to train and test sets
    #####################################
    print(f"[INFO] Adding noise to train and test sets")
    # Random scaling factor for train and test
    # size=len(beats_train): beats_train의 길이만큼 난수를 생성합니다. 즉, beats_train에 있는 심박 데이터의 개수와 동일한 수의 난수를 생성합니다.
    rnd_train = np.random.randint(low=20, high=200, size=len(beats_train)) / 100
    noise_index = 0
    # Adding noise to train
    # https://chatgpt.com/g/g-cKXjWStaE-python/c/66e1471b-57b4-8006-b921-233e7803fcab
    for beat_idx, beat in enumerate(beats_train):
        if (beat_idx // 10) % 2 == 0:
            selected_channel = beat_idx % 2  # 0과 1을 번갈아 선택
        else:
            selected_channel = (beat_idx + 1) % 2  # 반대 순서로 선택

        # 노이즈 조합도 순차적으로 선택, 주기적으로 변화를 줌 (매 8회 주기)
        noise_combination_idx = (beat_idx % 7) + 1  # 1부터 7까지 순차적으로 선택
        noise = combined_noise[selected_channel][:, noise_combination_idx]
        noise_segment = noise[noise_index:noise_index + samples]
        beat_max_value = np.max(beat) - np.min(beat)
        noise_max_value = np.max(noise_segment) - np.min(noise_segment)
        Ase = noise_max_value / beat_max_value
        alpha = rnd_train[beat_idx] / Ase
        signal_noise = beat + alpha * noise_segment
        sn_train.append(signal_noise)
        noise_indices_train.append(noise_combination_idx)  # 노이즈 인덱스 저장
        noise_index += samples
        if noise_index > (len(noise) - samples):
            noise_index = 0

                
    # Adding noise to test
    noise_index = 0
    rnd_test = np.random.randint(low=20, high=200, size=len(beats_test)) / 100
    # Saving the random array so we can use it on the amplitude segmentation tables
    np.save('rnd_test.npy', rnd_test)
    print('rnd_test shape: ' + str(rnd_test.shape))
    
    for beat_idx, beat in enumerate(beats_test):
        # if np.random.rand() < channel_ratio:
        if (beat_idx // 10) % 2 == 0:
            selected_channel = beat_idx % 2  # 0과 1을 번갈아 선택
        else:
            selected_channel = (beat_idx + 1) % 2  # 반대 순서로 선택
        # 노이즈 조합도 순차적으로 선택, 주기적으로 변화를 줌 (매 8회 주기)
        noise_combination_idx = (beat_idx % 7) + 1  # 1부터 7까지 순차적으로 선택
        noise = combined_noise[selected_channel][:, noise_combination_idx]
        noise_segment = noise[noise_index:noise_index + samples]
        beat_max_value = np.max(beat) - np.min(beat)
        noise_max_value = np.max(noise_segment) - np.min(noise_segment)
        Ase = noise_max_value / beat_max_value
        alpha = rnd_test[beat_idx] / Ase
        signal_noise = beat + alpha * noise_segment
        sn_test.append(signal_noise)
        noise_indices_test.append(noise_combination_idx)  # 노이즈 인덱스 저장
        noise_index += samples
        if noise_index > (len(noise) - samples):
            noise_index = 0
    X_train = np.array(sn_train)[valid_train_indices]  # Match noisy and original beats
    X_test = np.array(sn_test)[valid_test_indices]

    y_train = np.array(beats_train)[valid_train_indices]  # Match noisy and original beats
    y_test = np.array(beats_test)[valid_test_indices]

    X_train = np.expand_dims(X_train, axis=2)
    y_train = np.expand_dims(y_train, axis=2)

    X_test = np.expand_dims(X_test, axis=2)
    y_test = np.expand_dims(y_test, axis=2)

    Dataset = [X_train, y_train, X_test, y_test]
    print(f"[INFO] Final shapes -> X_train: {X_train.shape}, y_train: {y_train.shape}, X_test: {X_test.shape}, y_test: {y_test.shape}")
    print('Dataset ready to use.')

    return Dataset, valid_train_indices, valid_test_indices, noise_indices_train, noise_indices_test


In [3]:
import numpy as np
from scipy.fft import fft
import glob
from scipy.signal import resample_poly
import wfdb
import math
import _pickle as pickle


def make_fourier(inputs, n, fs):
    """
    주파수 도메인 정보 추출 및 time-domain과 같은 shape으로 만듦.
    
    Parameters:
    inputs: 입력 신호 (원본 신호, 2D 배열 - (배치 크기, 샘플 수))
    n: FFT 샘플 수
    fs: 샘플링 주파수 (예: 360 Hz)
    
    Returns:
    주파수 도메인에서 얻은 신호 (FFT), time-domain과 동일한 크기
    """
    T = n / fs
    k = np.arange(n)
    freq = k / T
    freq = freq[range(int(n / 2))]

    signal_list = []
    for i in range(inputs.shape[0]):
        y = inputs[i, :]
        Y = fft(y) / n  # FFT 수행 후 정규화
        Y = np.abs(Y[range(int(n / 2))])
        # Magnitude 값을 두 배로 늘려 time-domain과 동일한 shape으로 맞춤 (512)
        Y_full = np.hstack([Y, Y])  # Duplicate to match time-domain size
        signal_list.append(Y_full)

    return np.asarray(signal_list)

def Data_Preparation_with_Fourier(samples, fs=360):
    print('Getting the Data ready ...')

    # Set random seed for reproducibility
    seed = 1234
    np.random.seed(seed=seed)

    # Load QT Database
    with open('data/QTDatabase.pkl', 'rb') as input:
        qtdb = pickle.load(input)

    print(f"[INFO] Loaded QTDatabase with {len(qtdb.keys())} signals")
    # Load combined noise
    with open('data/CombinedNoise.pkl', 'rb') as input:
        combined_noise = pickle.load(input)
    print(f"[INFO] Loaded CombinedNoise with {len(combined_noise)} channels")

    #####################################
    # Data split
    #####################################
    test_set = ['sel123', 'sel233', 'sel302', 'sel307', 'sel820', 'sel853', 
                'sel16420', 'sel16795', 'sele0106', 'sele0121', 'sel32', 'sel49', 
                'sel14046', 'sel15814']

    beats_train = []
    beats_test = []
    fourier_train_x = []
    fourier_test_x = []
    fourier_train_y = []
    fourier_test_y = []
    valid_train_indices = []  # To keep track of valid indices in training data
    valid_test_indices = []   # To keep track of valid indices in test data
    # 노이즈 인덱스 저장 리스트
    noise_indices_train = []
    noise_indices_test = []
    sn_train = []
    sn_test = []
    
    skip_beats = 0
    qtdb_keys = list(qtdb.keys())

    print(f"[INFO] Processing QTDatabase, {len(qtdb.keys())} signals to process.")

    for signal_name in qtdb_keys:
        for b_idx, b in enumerate(qtdb[signal_name]):
            b_np = np.zeros(samples)
            b_sq = np.array(b)

            init_padding = 16
            if b_sq.shape[0] > (samples - init_padding):
                skip_beats += 1
                continue

            b_np[init_padding:b_sq.shape[0] + init_padding] = b_sq - (b_sq[0] + b_sq[-1]) / 2

            # Fourier 변환 적용 (주파수 도메인 정보, time-domain과 동일한 shape으로)
            fourier_transformed_y = make_fourier(b_np.reshape(1, -1), samples, fs)

            if signal_name in test_set:
                beats_test.append(b_np)
                fourier_test_y.append(fourier_transformed_y[0])  # Append the single batch
                valid_test_indices.append(len(beats_test) - 1)  # Track valid test beat index
            else:
                beats_train.append(b_np)
                fourier_train_y.append(fourier_transformed_y[0])  # Append the single batch
                valid_train_indices.append(len(beats_train) - 1)  # Track valid train beat index

        print(f"[DEBUG] Processed signal {signal_name}, total beats in train: {len(beats_train)}, total beats in test: {len(beats_test)}")

    #####################################
    # Adding noise to train and test sets
    #####################################
    print(f"[INFO] Adding noise to train and test sets")
    # Random scaling factor for train and test
    rnd_train = np.random.randint(low=20, high=200, size=len(beats_train)) / 100
    noise_index = 0
    # To ensure equal selection of channels
    # Adding noise to train
    for beat_idx, beat in enumerate(beats_train):
        # if np.random.rand() < channel_ratio:
        if (beat_idx // 10) % 2 == 0:
            selected_channel = beat_idx % 2  # 0과 1을 번갈아 선택
        else:
            selected_channel = (beat_idx + 1) % 2  # 반대 순서로 선택

        # 노이즈 조합도 순차적으로 선택, 주기적으로 변화를 줌 (매 8회 주기)
        noise_combination_idx = (beat_idx % 7) + 1  # 1부터 7까지 순차적으로 선택
        noise = combined_noise[selected_channel][:, noise_combination_idx]
        noise_segment = noise[noise_index:noise_index + samples]
        beat_max_value = np.max(beat) - np.min(beat)
        noise_max_value = np.max(noise_segment) - np.min(noise_segment)
        Ase = noise_max_value / beat_max_value
        alpha = rnd_train[beat_idx] / Ase
        signal_noise = beat + alpha * noise_segment
        sn_train.append(signal_noise)
        fourier_transformed_x = make_fourier(signal_noise.reshape(1, -1), samples, fs)  # X에 대한 Fourier 변환
        fourier_train_x.append(fourier_transformed_x[0])  # Append the single batch
        noise_indices_train.append(noise_combination_idx)  # 노이즈 인덱스 저장
        noise_index += samples
        if noise_index > (len(noise) - samples):
            noise_index = 0

    # Adding noise to test
    noise_index = 0
    rnd_test = np.random.randint(low=20, high=200, size=len(beats_test)) / 100
    np.save('rnd_test.npy', rnd_test)
    print('rnd_test shape: ' + str(rnd_test.shape))
        
    for beat_idx, beat in enumerate(beats_test):
        # if np.random.rand() < channel_ratio:
        if (beat_idx // 10) % 2 == 0:
            selected_channel = beat_idx % 2  # 0과 1을 번갈아 선택
        else:
            selected_channel = (beat_idx + 1) % 2  # 반대 순서로 선택
        # 노이즈 조합도 순차적으로 선택, 주기적으로 변화를 줌 (매 8회 주기)
        noise_combination_idx = (beat_idx % 7) + 1  # 1부터 7까지 순차적으로 선택
        noise = combined_noise[selected_channel][:, noise_combination_idx]
        noise_segment = noise[noise_index:noise_index + samples]
        beat_max_value = np.max(beat) - np.min(beat)
        noise_max_value = np.max(noise_segment) - np.min(noise_segment)
        Ase = noise_max_value / beat_max_value
        alpha = rnd_test[beat_idx] / Ase
        signal_noise = beat + alpha * noise_segment
        sn_test.append(signal_noise)
        fourier_transformed_x = make_fourier(signal_noise.reshape(1, -1), samples, fs)  # X에 대한 Fourier 변환
        fourier_test_x.append(fourier_transformed_x[0])  # Append the single batch
        noise_indices_test.append(noise_combination_idx)  # 노이즈 인덱스 저장
        noise_index += samples
        if noise_index > (len(noise) - samples):
            noise_index = 0


    X_train = np.array(sn_train)[valid_train_indices]  # Match noisy and original beats
    X_test = np.array(sn_test)[valid_test_indices]

    y_train = np.array(beats_train)[valid_train_indices]  # Match noisy and original beats
    y_test = np.array(beats_test)[valid_test_indices]

    # Fourier 정보도 포함된 주파수 도메인 데이터셋 생성
    F_train_x = np.array(fourier_train_x)[valid_train_indices]
    F_test_x = np.array(fourier_test_x)[valid_test_indices]
    F_train_y = np.array(fourier_train_y)[valid_train_indices]
    F_test_y = np.array(fourier_test_y)[valid_test_indices]

    # Shape을 time-domain과 동일하게 확장
    X_train = np.expand_dims(X_train, axis=2)
    y_train = np.expand_dims(y_train, axis=2)

    X_test = np.expand_dims(X_test, axis=2)
    y_test = np.expand_dims(y_test, axis=2)

    F_train_x = np.expand_dims(F_train_x, axis=2)
    F_train_y = np.expand_dims(F_train_y, axis=2)
    
    F_test_x = np.expand_dims(F_test_x, axis=2)
    F_test_y = np.expand_dims(F_test_y, axis=2)

    Dataset = [X_train, y_train, X_test, y_test, F_train_x, F_train_y, F_test_x, F_test_y]
    
    print(f"[INFO] Final shapes -> X_train: {X_train.shape}, y_train: {y_train.shape}, X_test: {X_test.shape}, y_test: {y_test.shape}")
    print(f"[INFO] Fourier shapes -> F_train_x: {F_train_x.shape}, F_train_y: {F_train_y.shape}, F_test_x: {F_test_x.shape}, F_test_y: {F_test_y.shape}")
    print('Dataset ready to use.')

    return Dataset, valid_train_indices, valid_test_indices, noise_indices_train, noise_indices_test


In [4]:
# Unpack the dataset
Dataset, valid_train_indices, valid_test_indices, noise_indices_train, noise_indices_test = Data_Preparation_with_Fourier(samples=512, fs=360)
X_train, y_train, X_test, y_test, F_train_x, F_train_y, F_test_x, F_test_y = Dataset

# Print dataset shapes for confirmation
print(f"Time domain train shapes: X_train: {X_train.shape}, y_train: {y_train.shape}")
print(f"Frequency domain train shapes: F_train_x: {F_train_x.shape}, F_train_y: {F_train_y.shape}")
print(f"Time domain test shapes: X_test: {X_test.shape}, y_test: {y_test.shape}")
print(f"Frequency domain test shapes: F_test_x: {F_test_x.shape}, F_test_y: {F_test_y.shape}")

TypeError: Data_Preparation_with_Fourier() got an unexpected keyword argument 'channel_ratio'

In [1]:
import wfdb
import _pickle as pickle
from datetime import datetime
import numpy as np

dl_experiments = ['DRNN',
                  'FCN-DAE',
                  'Vanilla L',
                  'Vanilla NL',
                  'Multibranch LANL',
                  'Multibranch LANLD',
                  'Transformer_DAE',
                  # 'Transformer_FDAE',
                #   'Transformer_FREDAE',
                  'Transformer_COMBDAE',
                  # 'Transformer_COMBDAE_with_CrossDomainAttention',
                  # 'Transformer_COMBDAE_FreTS'
                  ]


In [None]:
# current_date = datetime.now().strftime('%m%d')
# train_time_list = []
# test_time_list = []

# # Classical Filters

# # FIR
# print('Running FIR fiter on the test set. This will take a while (2h)...')
# start_test = datetime.now()
# [X_test_f, y_test_f, y_filter] = FIR_test_Dataset(Dataset)
# end_test = datetime.now()
# train_time_list.append(0)
# test_time_list.append(end_test - start_test)

# test_results_FIR = [X_test_f, y_test_f, y_filter]

# # Save FIR filter results
# with open('test_results_FIR.pkl', 'wb') as output:  # Overwrites any existing file.
#     pickle.dump(test_results_FIR, output)
# print('Results from experiment FIR filter saved')

# # IIR
# print('Running IIR fiter on the test set. This will take a while (25 mins)...')
# start_test = datetime.now()
# [X_test_f, y_test_f, y_filter] = IIR_test_Dataset(Dataset)
# end_test = datetime.now()
# train_time_list.append(0)
# test_time_list.append(end_test - start_test)

# test_results_IIR = [X_test_f, y_test_f, y_filter]

# # Save IIR filter results
# with open('test_results_IIR.pkl', 'wb') as output:  # Overwrites any existing file.
#     pickle.dump(test_results_IIR, output)
# print('Results from experiment IIR filter saved')

In [2]:
# Define Metric functions
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def SSD(y, y_pred):
    return np.sum(np.square(y - y_pred), axis=1)  # axis 1 is the signal dimension


def MAD(y, y_pred):
    return np.max(np.abs(y - y_pred), axis=1) # axis 1 is the signal dimension


def PRD(y, y_pred):
    N = np.sum(np.square(y_pred - y), axis=1)
    D = np.sum(np.square(y_pred - np.mean(y)), axis=1)

    PRD = np.sqrt(N/D) * 100

    return PRD


def COS_SIM(y, y_pred):
    cos_sim = []

    y = np.squeeze(y, axis=-1)
    y_pred = np.squeeze(y_pred, axis=-1)

    for idx in range(len(y)):
        kl_temp = cosine_similarity(y[idx].reshape(1, -1), y_pred[idx].reshape(1, -1))
        cos_sim.append(kl_temp)

    cos_sim = np.array(cos_sim)
    return cos_sim

In [None]:
# # Saving timing list
# timing = [train_time_list, test_time_list]
# with open('timing.pkl', 'wb') as output:  # Overwrites any existing file.
#     pickle.dump(timing, output)
# print('Timing saved')

# # Load timing
# with open('timing.pkl', 'rb') as input:
#     timing = pickle.load(input)
#     [train_time_list, test_time_list] = timing

In [3]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from prettytable import PrettyTable

def generate_table(metrics, metric_values, Exp_names):
    # Print tabular results in the console, in a pretty way
    print('\n')

    tb = PrettyTable()
    ind = 0

    for exp_name in Exp_names:

        tb.field_names = ['Method/Model'] + metrics

        tb_row = []
        tb_row.append(exp_name)

        for metric in metric_values:   # metric_values[metric][model][beat]
            m_mean = np.mean(metric[ind])
            m_std = np.std(metric[ind])
            tb_row.append('{:.3f}'.format(m_mean) + ' (' + '{:.3f}'.format(m_std) + ')')

        tb.add_row(tb_row)
        ind += 1

    print(tb)

def generate_table_time(column_names, all_values, Exp_names, gpu=True):
    # Print tabular results in the console, in a pretty way

    # The FIR and IIR are the last on all_values
    # We need circular shift them to the right
    all_values[0] = all_values[0][-2::] + all_values[0][0:-2]
    all_values[1] = all_values[1][-2::] + all_values[1][0:-2]

    print('\n')

    tb = PrettyTable()
    ind = 0

    if gpu:
        device = 'GPU'
    else:
        device = 'CPU'

    for exp_name in Exp_names:
        tb.field_names = ['Method/Model'] + [column_names[0] + '(' + device + ') h:m:s:ms'] + [
            column_names[1] + '(' + device + ') h:m:s:ms']

        tb_row = []
        tb_row.append(exp_name)
        tb_row.append(all_values[0][ind])
        tb_row.append(all_values[1][ind])

        tb.add_row(tb_row)

        ind += 1

    print(tb)

    if gpu:
        print('* For FIR and IIR Filters is CPU since scipy filters are CPU based implementations')



In [31]:
####### LOAD EXPERIMENTS #######

# Load Results DRNN
with open('1006/test_results_' + dl_experiments[0] + '.pkl', 'rb') as input:
    test_DRNN = pickle.load(input)

# Load Results FCN_DAE
with open('1006/test_results_' + dl_experiments[1] + '.pkl', 'rb') as input:
    test_FCN_DAE = pickle.load(input)

# Load Results Vanilla L
with open('1006/test_results_' + dl_experiments[2] + '.pkl', 'rb') as input:
    test_Vanilla_L = pickle.load(input)

# Load Results Exp Vanilla NL
with open('1006/test_results_' + dl_experiments[3] + '.pkl', 'rb') as input:
    test_Vanilla_NL = pickle.load(input)

# Load Results Multibranch LANL
with open('1006/test_results_' + dl_experiments[4] + '.pkl', 'rb') as input:
    test_Multibranch_LANL = pickle.load(input)

# Load Results Multibranch LANLD
with open('1006/test_results_' + dl_experiments[5] + '.pkl', 'rb') as input:
    test_Multibranch_LANLD = pickle.load(input)
# Load Results Transformer_DAE

with open('1006/test_results_' + dl_experiments[6] + '.pkl', 'rb') as input:
    test_Transformer_DAE = pickle.load(input)

# # Load Results Transformer_FDAE
# with open('0920/test_results_' + dl_experiments[7] + '.pkl', 'rb') as input:
#     test_Transformer_FDAE = pickle.load(input)
    # Transformer_COMBDAE_with_CrossDomainAttention
# Load Results Transformer_FDAE
with open('1006/test_results_' + dl_experiments[7] + '.pkl', 'rb') as input:
    test_Transformer_COMBDAE = pickle.load(input)
        
with open('1004/test_results_' + dl_experiments[8] + '.pkl', 'rb') as input:
    test_Transformer_COMBDAE_FreTS = pickle.load(input)   
    
    
# Load Result FIR Filter
with open('1005/test_results_FIR.pkl', 'rb') as input:
    test_FIR = pickle.load(input)

# Load Result IIR Filter
with open('1005/test_results_IIR.pkl', 'rb') as input:
    test_IIR = pickle.load(input)

####### Calculate Metrics #######

print('Calculating metrics ...')

# DL Metrics

# Exp FCN-DAE

[X_test, y_test, y_pred] = test_DRNN

SSD_values_DL_DRNN = SSD(y_test, y_pred)

MAD_values_DL_DRNN = MAD(y_test, y_pred)

PRD_values_DL_DRNN = PRD(y_test, y_pred)

COS_SIM_values_DL_DRNN = COS_SIM(y_test, y_pred)

# Exp FCN-DAE

[X_test, y_test, y_pred] = test_FCN_DAE

SSD_values_DL_FCN_DAE = SSD(y_test, y_pred)

MAD_values_DL_FCN_DAE = MAD(y_test, y_pred)

PRD_values_DL_FCN_DAE = PRD(y_test, y_pred)

COS_SIM_values_DL_FCN_DAE = COS_SIM(y_test, y_pred)

# Vanilla L

[X_test, y_test, y_pred] = test_Vanilla_L

SSD_values_DL_exp_1 = SSD(y_test, y_pred)

MAD_values_DL_exp_1 = MAD(y_test, y_pred)

PRD_values_DL_exp_1 = PRD(y_test, y_pred)

COS_SIM_values_DL_exp_1 = COS_SIM(y_test, y_pred)

# Vanilla_NL

[X_test, y_test, y_pred] = test_Vanilla_NL

SSD_values_DL_exp_2 = SSD(y_test, y_pred)

MAD_values_DL_exp_2 = MAD(y_test, y_pred)

PRD_values_DL_exp_2 = PRD(y_test, y_pred)

COS_SIM_values_DL_exp_2 = COS_SIM(y_test, y_pred)

# Multibranch_LANL

[X_test, y_test, y_pred] = test_Multibranch_LANL

SSD_values_DL_exp_3 = SSD(y_test, y_pred)

MAD_values_DL_exp_3 = MAD(y_test, y_pred)

PRD_values_DL_exp_3 = PRD(y_test, y_pred)

COS_SIM_values_DL_exp_3 = COS_SIM(y_test, y_pred)

# Multibranch_LANLD

[X_test, y_test, y_pred] = test_Multibranch_LANLD

SSD_values_DL_exp_4 = SSD(y_test, y_pred)

MAD_values_DL_exp_4 = MAD(y_test, y_pred)

PRD_values_DL_exp_4 = PRD(y_test, y_pred)

COS_SIM_values_DL_exp_4 = COS_SIM(y_test, y_pred)


# Transformer_DAE

[X_test, y_test, y_pred] = test_Transformer_DAE

SSD_values_DL_exp_5 = SSD(y_test, y_pred)

MAD_values_DL_exp_5 = MAD(y_test, y_pred)

PRD_values_DL_exp_5 = PRD(y_test, y_pred)

COS_SIM_values_DL_exp_5 = COS_SIM(y_test, y_pred)

# Transformer_FDAE

[X_test, y_test, y_pred] = test_Transformer_COMBDAE

SSD_values_DL_exp_6 = SSD(y_test, y_pred)

MAD_values_DL_exp_6 = MAD(y_test, y_pred)

PRD_values_DL_exp_6 = PRD(y_test, y_pred)

COS_SIM_values_DL_exp_6 = COS_SIM(y_test, y_pred)


# # Transformer_FDAE

[X_test, y_test, y_pred] = test_Transformer_COMBDAE_FreTS

SSD_values_DL_exp_7 = SSD(y_test, y_pred)

MAD_values_DL_exp_7 = MAD(y_test, y_pred)

PRD_values_DL_exp_7 = PRD(y_test, y_pred)

COS_SIM_values_DL_exp_7 = COS_SIM(y_test, y_pred)




# Digital Filtering

# FIR Filtering Metrics
[X_test, y_test, y_filter] = test_FIR

SSD_values_FIR = SSD(y_test, y_filter)

MAD_values_FIR = MAD(y_test, y_filter)

PRD_values_FIR = PRD(y_test, y_filter)

COS_SIM_values_FIR = COS_SIM(y_test, y_filter)

# IIR Filtering Metrics (Best)
[X_test, y_test, y_filter] = test_IIR

SSD_values_IIR = SSD(y_test, y_filter)

MAD_values_IIR = MAD(y_test, y_filter)

PRD_values_IIR = PRD(y_test, y_filter)

COS_SIM_values_IIR = COS_SIM(y_test, y_filter)

####### Results Visualization #######

SSD_all = [SSD_values_FIR,
           SSD_values_IIR,
           SSD_values_DL_FCN_DAE,
           SSD_values_DL_DRNN,
           SSD_values_DL_exp_1,
           SSD_values_DL_exp_2,
           SSD_values_DL_exp_3,
           SSD_values_DL_exp_4,
            SSD_values_DL_exp_5,
           SSD_values_DL_exp_6,
           SSD_values_DL_exp_7
           ]

MAD_all = [MAD_values_FIR,
           MAD_values_IIR,
           MAD_values_DL_FCN_DAE,
           MAD_values_DL_DRNN,
           MAD_values_DL_exp_1,
           MAD_values_DL_exp_2,
           MAD_values_DL_exp_3,
           MAD_values_DL_exp_4,
           MAD_values_DL_exp_5,
           MAD_values_DL_exp_6,
           MAD_values_DL_exp_7
           ]

PRD_all = [PRD_values_FIR,
           PRD_values_IIR,
           PRD_values_DL_FCN_DAE,
           PRD_values_DL_DRNN,
           PRD_values_DL_exp_1,
           PRD_values_DL_exp_2,
           PRD_values_DL_exp_3,
           PRD_values_DL_exp_4,
           PRD_values_DL_exp_5,
           PRD_values_DL_exp_6,
           PRD_values_DL_exp_7
           ]

CORR_all = [COS_SIM_values_FIR,
            COS_SIM_values_IIR,
            COS_SIM_values_DL_FCN_DAE,
            COS_SIM_values_DL_DRNN,
            COS_SIM_values_DL_exp_1,
            COS_SIM_values_DL_exp_2,
            COS_SIM_values_DL_exp_3,
            COS_SIM_values_DL_exp_4,
            COS_SIM_values_DL_exp_5,
            COS_SIM_values_DL_exp_6,
            COS_SIM_values_DL_exp_7
            ]

Exp_names = ['FIR Filter', 'IIR Filter'] + dl_experiments

metrics = ['SSD', 'MAD', 'PRD', 'COS_SIM']
metric_values = [SSD_all, MAD_all, PRD_all, CORR_all]

# Metrics table
generate_table(metrics, metric_values, Exp_names)

# # Timing table
# timing_var = ['training', 'test']
# generate_table_time(timing_var, timing, Exp_names, gpu=True)


Calculating metrics ...


+---------------------------+------------------+---------------+------------------+---------------+
|        Method/Model       |       SSD        |      MAD      |       PRD        |    COS_SIM    |
+---------------------------+------------------+---------------+------------------+---------------+
|         FIR Filter        | 95.588 (159.388) | 1.067 (0.834) | 76.842 (19.781)  | 0.574 (0.235) |
|         IIR Filter        | 88.049 (143.279) | 1.051 (0.847) | 75.632 (20.763)  | 0.586 (0.237) |
|            DRNN           |  7.293 (9.894)   | 0.548 (0.380) | 63.972 (40.631)  | 0.861 (0.139) |
|          FCN-DAE          | 10.567 (15.440)  | 0.621 (0.418) | 83.941 (62.513)  | 0.781 (0.204) |
|         Vanilla L         | 22.038 (25.212)  | 0.753 (0.385) | 107.717 (33.303) | 0.594 (0.179) |
|         Vanilla NL        | 11.456 (14.291)  | 0.582 (0.367) | 96.566 (61.925)  | 0.759 (0.181) |
|      Multibranch LANL     |  9.836 (12.276)  | 0.515 (0.337) | 81.932 (4

In [32]:
np.array(test_DRNN).shape
# # np.array(test_Transformer_COMBDAE_FreTS).shape

(3, 13316, 512, 1)

In [59]:
import os
import seaborn as sns
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

def ensure_directory(directory):
    """ 디렉토리가 존재하지 않으면 생성 """
    if not os.path.exists(directory):
        os.makedirs(directory)

def generate_hboxplot(np_data, description, ylabel, log, save_dir, filename, set_x_axis_size=None):
    # Process the results and store in Pandas DataFrame
    ensure_directory(save_dir)  # 디렉토리 생성
    col = description
    loss_val_np = np.rot90(np_data)
    pd_df = pd.DataFrame.from_records(loss_val_np, columns=col)

    # Set up the matplotlib figure
    sns.set(style="whitegrid")
    f, ax = plt.subplots(figsize=(15, 6))

    ax = sns.boxplot(data=pd_df, orient="h", width=0.4)  # 가로로 누운 boxplot

    if log:
        ax.set_xscale("log")

    if set_x_axis_size is not None:
        ax.set_xlim(set_x_axis_size)

    ax.set(ylabel='Models/Methods', xlabel=ylabel)
    ax = sns.despine(left=True, bottom=True)

    # Save plot to file
    filepath = os.path.join(save_dir, filename)
    plt.savefig(filepath)
    plt.close()
    print(f"Saved: {filepath}")


def generate_violinplots(np_data, description, ylabel, log, save_dir, filename, set_x_axis_size=None):
    # Process the results and store in Pandas DataFrame
    ensure_directory(save_dir)  # 디렉토리 생성
    col = description
    loss_val_np = np.rot90(np_data)
    pd_df = pd.DataFrame.from_records(loss_val_np, columns=col)

    # Set up the matplotlib figure
    f, ax = plt.subplots(figsize=(15, 6))
    sns.set(style="whitegrid")
    ax = sns.violinplot(data=pd_df, palette="Set3", bw=.2, cut=1, linewidth=1, orient="h")  # 가로로 누운 violinplot

    if log:
        ax.set_xscale("log")

    if set_x_axis_size is not None:
        ax.set_xlim(set_x_axis_size)

    ax.set(xlabel='Models/Methods', ylabel=ylabel)
    ax = sns.despine(left=True, bottom=True)

    # Save plot to file
    filepath = os.path.join(save_dir, filename)
    plt.savefig(filepath)
    plt.close()
    print(f"Saved: {filepath}")


def generate_barplot(np_data, description, ylabel, log, save_dir, filename, set_x_axis_size=None):
    # Process the results and store in Pandas DataFrame
    ensure_directory(save_dir)  # 디렉토리 생성
    col = description
    loss_val_np = np.rot90(np_data)
    pd_df = pd.DataFrame.from_records(loss_val_np, columns=col)

    # Set up the matplotlib figure
    f, ax = plt.subplots(figsize=(15, 6))
    sns.set(style="whitegrid")
    ax = sns.barplot(data=pd_df, orient="h")  # 가로로 누운 barplot

    if log:
        ax.set_xscale("log")

    if set_x_axis_size is not None:
        ax.set_xlim(set_x_axis_size)

    ax.set(xlabel='Models/Methods', ylabel=ylabel)
    ax = sns.despine(left=True, bottom=True)

    # Save plot to file
    filepath = os.path.join(save_dir, filename)
    plt.savefig(filepath)
    plt.close()
    print(f"Saved: {filepath}")


def generate_boxplot(np_data, description, ylabel, log, save_dir, filename, set_x_axis_size=None):
    # Process the results and store in Pandas DataFrame
    ensure_directory(save_dir)  # 디렉토리 생성
    col = description
    loss_val_np = np.rot90(np_data)
    pd_df = pd.DataFrame.from_records(loss_val_np, columns=col)

    # Set up the matplotlib figure
    f, ax = plt.subplots(figsize=(15, 6))
    sns.set(style="whitegrid")
    ax = sns.boxplot(data=pd_df, orient="h")  # 가로로 누운 boxplot

    if log:
        ax.set_xscale("log")

    if set_x_axis_size is not None:
        ax.set_xlim(set_x_axis_size)

    ax.set(xlabel='Models/Methods', ylabel=ylabel)
    ax = sns.despine(left=True, bottom=True)

    # Save plot to file
    filepath = os.path.join(save_dir, filename)
    plt.savefig(filepath)
    plt.close()
    print(f"Saved: {filepath}")


# 저장 경로 설정
save_directory = 'plots'

# 파일 이름 설정
filename_ssd_hbox = 'ssd_hboxplot.png'
filename_ssd_violin = 'ssd_violinplot.png'
filename_ssd_bar = 'ssd_barplot.png'
filename_ssd_box = 'ssd_boxplot.png'

filename_mad_hbox = 'mad_hboxplot.png'
filename_mad_violin = 'mad_violinplot.png'
filename_mad_bar = 'mad_barplot.png'
filename_mad_box = 'mad_boxplot.png'

filename_prd_hbox = 'prd_hboxplot.png'
filename_prd_violin = 'prd_violinplot.png'
filename_prd_bar = 'prd_barplot.png'
filename_prd_box = 'prd_boxplot.png'

filename_cos_hbox = 'cos_hboxplot.png'
filename_cos_violin = 'cos_violinplot.png'
filename_cos_bar = 'cos_barplot.png'
filename_cos_box = 'cos_boxplot.png'

# SSD 그래프들 생성
print("Generating SSD plots...")
generate_hboxplot(SSD_all, Exp_names, 'SSD (au)', log=False, save_dir=save_directory, filename=filename_ssd_hbox, set_x_axis_size=(0, 100.1))
generate_violinplots(SSD_all, Exp_names, 'SSD (au)', log=False, save_dir=save_directory, filename=filename_ssd_violin, set_x_axis_size=(0, 100.1))
generate_barplot(SSD_all, Exp_names, 'SSD (au)', log=False, save_dir=save_directory, filename=filename_ssd_bar, set_x_axis_size=(0, 100.1))
generate_boxplot(SSD_all, Exp_names, 'SSD (au)', log=False, save_dir=save_directory, filename=filename_ssd_box, set_x_axis_size=(0, 100.1))

# MAD 그래프들 생성
print("Generating MAD plots...")
generate_hboxplot(MAD_all, Exp_names, 'MAD (au)', log=False, save_dir=save_directory, filename=filename_mad_hbox, set_x_axis_size=(0, 3.01))
generate_violinplots(MAD_all, Exp_names, 'MAD (au)', log=False, save_dir=save_directory, filename=filename_mad_violin, set_x_axis_size=(0, 3.01))
generate_barplot(MAD_all, Exp_names, 'MAD (au)', log=False, save_dir=save_directory, filename=filename_mad_bar, set_x_axis_size=(0, 3.01))
generate_boxplot(MAD_all, Exp_names, 'MAD (au)', log=False, save_dir=save_directory, filename=filename_mad_box, set_x_axis_size=(0, 3.01))

# PRD 그래프들 생성
print("Generating PRD plots...")
generate_hboxplot(PRD_all, Exp_names, 'PRD (au)', log=False, save_dir=save_directory, filename=filename_prd_hbox, set_x_axis_size=(0, 150.1))
generate_violinplots(PRD_all, Exp_names, 'PRD (au)', log=False, save_dir=save_directory, filename=filename_prd_violin, set_x_axis_size=(0, 150.1))
generate_barplot(PRD_all, Exp_names, 'PRD (au)', log=False, save_dir=save_directory, filename=filename_prd_bar, set_x_axis_size=(0, 150.1))
generate_boxplot(PRD_all, Exp_names, 'PRD (au)', log=False, save_dir=save_directory, filename=filename_prd_box, set_x_axis_size=(0, 150.1))

# Cosine Similarity 그래프들 생성
print("Generating Cosine Similarity plots...")
generate_hboxplot(CORR_all, Exp_names, 'Cosine Similarity (0-1)', log=False, save_dir=save_directory, filename=filename_cos_hbox, set_x_axis_size=(0, 1))
generate_violinplots(CORR_all, Exp_names, 'Cosine Similarity (0-1)', log=False, save_dir=save_directory, filename=filename_cos_violin, set_x_axis_size=(0, 1))
generate_barplot(CORR_all, Exp_names, 'Cosine Similarity (0-1)', log=False, save_dir=save_directory, filename=filename_cos_bar, set_x_axis_size=(0, 1))
generate_boxplot(CORR_all, Exp_names, 'Cosine Similarity (0-1)', log=False, save_dir=save_directory, filename=filename_cos_box, set_x_axis_size=(0, 1))


Generating SSD plots...
Saved: plots/ssd_hboxplot.png
Saved: plots/ssd_violinplot.png
Saved: plots/ssd_barplot.png
Saved: plots/ssd_boxplot.png
Generating MAD plots...
Saved: plots/mad_hboxplot.png
Saved: plots/mad_violinplot.png
Saved: plots/mad_barplot.png
Saved: plots/mad_boxplot.png
Generating PRD plots...
Saved: plots/prd_hboxplot.png
Saved: plots/prd_violinplot.png
Saved: plots/prd_barplot.png
Saved: plots/prd_boxplot.png
Generating Cosine Similarity plots...
Saved: plots/cos_hboxplot.png
Saved: plots/cos_violinplot.png
Saved: plots/cos_barplot.png
Saved: plots/cos_boxplot.png


In [60]:

# import seaborn as sns
# import pandas as pd
# import matplotlib.pyplot as plt
# import numpy as np

# def generate_hboxplot(np_data, description, ylabel, log, set_x_axis_size=None):
#     # Process the results and store in Panda objects
#     col = description
#     loss_val_np = np.rot90(np_data)

#     pd_df = pd.DataFrame.from_records(loss_val_np, columns=col)

#     # Set up the matplotlib figure
#     sns.set(style="whitegrid")

#     f, ax = plt.subplots(figsize=(15, 6))

#     ax = sns.boxplot(data=pd_df, orient="h", width=0.4)

#     if log:
#         ax.set_xscale("log")

#     if set_x_axis_size != None:
#         ax.set_xlim(set_x_axis_size)

#     ax.set(ylabel='Models/Methods', xlabel=ylabel)
#     ax = sns.despine(left=True, bottom=True)

#     plt.show()

# # Metrics graphs
# print('SSD Metric comparative graph')
# generate_hboxplot(SSD_all, Exp_names, 'SSD (au)', log=False, set_x_axis_size=(0, 100.1))
# print('MAD Metric comparative graph')
# generate_hboxplot(MAD_all, Exp_names, 'MAD (au)', log=False, set_x_axis_size=(0, 3.01))
# print('PRD Metric comparative graph')
# generate_hboxplot(PRD_all, Exp_names, 'PRD (au)', log=False, set_x_axis_size=(0, 150.1))
# print('Cosine Similarity Metric comparative graph')
# generate_hboxplot(CORR_all, Exp_names, 'Cosine Similarity (0-1)', log=False, set_x_axis_size=(0, 1))

In [62]:
rnd_test = np.load('rnd_test.npy')
# rnd_test = np.concatenate([rnd_test, rnd_test])
segm = [0.2, 0.6, 1.0, 1.5, 2.0]  # real number of segmentations is len(segmentations) - 1
SSD_seg_all = []
MAD_seg_all = []
PRD_seg_all = []
COS_SIM_seg_all = []
for idx_exp in range(len(Exp_names)):
    SSD_seg = [None] * (len(segm) - 1)
    MAD_seg = [None] * (len(segm) - 1)
    PRD_seg = [None] * (len(segm) - 1)
    COS_SIM_seg = [None] * (len(segm) - 1)
    for idx_seg in range(len(segm) - 1):
        SSD_seg[idx_seg] = []
        MAD_seg[idx_seg] = []
        PRD_seg[idx_seg] = []
        COS_SIM_seg[idx_seg] = []
        for idx in range(len(rnd_test)):
            # Object under analysis (oua)
            # SSD
            oua = SSD_all[idx_exp][idx]
            if rnd_test[idx] > segm[idx_seg] and rnd_test[idx] < segm[idx_seg + 1]:
                SSD_seg[idx_seg].append(oua)
            # MAD
            oua = MAD_all[idx_exp][idx]
            if rnd_test[idx] > segm[idx_seg] and rnd_test[idx] < segm[idx_seg + 1]:
                MAD_seg[idx_seg].append(oua)
            # PRD
            oua = PRD_all[idx_exp][idx]
            if rnd_test[idx] > segm[idx_seg] and rnd_test[idx] < segm[idx_seg + 1]:
                PRD_seg[idx_seg].append(oua)
            # COS SIM
            oua = CORR_all[idx_exp][idx]
            if rnd_test[idx] > segm[idx_seg] and rnd_test[idx] < segm[idx_seg + 1]:
                COS_SIM_seg[idx_seg].append(oua)
    # Processing the last index
    # SSD
    SSD_seg[-1] = []
    for idx in range(len(rnd_test)):
        # Object under analysis
        oua = SSD_all[idx_exp][idx]
        if rnd_test[idx] > segm[-2]:
            SSD_seg[-1].append(oua)
    SSD_seg_all.append(SSD_seg)  # [exp][seg][item]
    # MAD
    MAD_seg[-1] = []
    for idx in range(len(rnd_test)):
        # Object under analysis
        oua = MAD_all[idx_exp][idx]
        if rnd_test[idx] > segm[-2]:
            MAD_seg[-1].append(oua)
    MAD_seg_all.append(MAD_seg)  # [exp][seg][item]
    # PRD
    PRD_seg[-1] = []
    for idx in range(len(rnd_test)):
        # Object under analysis
        oua = PRD_all[idx_exp][idx]
        if rnd_test[idx] > segm[-2]:
            PRD_seg[-1].append(oua)
    PRD_seg_all.append(PRD_seg)  # [exp][seg][item]
    # COS SIM
    COS_SIM_seg[-1] = []
    for idx in range(len(rnd_test)):
        # Object under analysis
        oua = CORR_all[idx_exp][idx]
        if rnd_test[idx] > segm[-2]:
            COS_SIM_seg[-1].append(oua)
    COS_SIM_seg_all.append(COS_SIM_seg)  # [exp][seg][item]
# Printing Tables
seg_table_column_name = []
for idx_seg in range(len(segm) - 1):
    column_name = str(segm[idx_seg]) + ' < noise < ' + str(segm[idx_seg + 1])
    seg_table_column_name.append(column_name)
# SSD Table
SSD_seg_all = np.array(SSD_seg_all)
SSD_seg_all = np.swapaxes(SSD_seg_all, 0, 1)
print('\n')
print('Printing Table for different noise values on the SSD metric')
generate_table(seg_table_column_name, SSD_seg_all, Exp_names)
# MAD Table
MAD_seg_all = np.array(MAD_seg_all)
MAD_seg_all = np.swapaxes(MAD_seg_all, 0, 1)
print('\n')
print('Printing Table for different noise values on the MAD metric')
generate_table(seg_table_column_name, MAD_seg_all, Exp_names)
# PRD Table
PRD_seg_all = np.array(PRD_seg_all)
PRD_seg_all = np.swapaxes(PRD_seg_all, 0, 1)
print('\n')
print('Printing Table for different noise values on the PRD metric')
generate_table(seg_table_column_name, PRD_seg_all, Exp_names)
# COS SIM Table
COS_SIM_seg_all = np.array(COS_SIM_seg_all)
COS_SIM_seg_all = np.swapaxes(COS_SIM_seg_all, 0, 1)
print('\n')
print('Printing Table for different noise values on the COS SIM metric')
generate_table(seg_table_column_name, COS_SIM_seg_all, Exp_names)





Printing Table for different noise values on the SSD metric


+---------------------------+-------------------+-------------------+-------------------+-------------------+
|        Method/Model       | 0.2 < noise < 0.6 | 0.6 < noise < 1.0 | 1.0 < noise < 1.5 | 1.5 < noise < 2.0 |
+---------------------------+-------------------+-------------------+-------------------+-------------------+
|         FIR Filter        |  12.707 (16.084)  |  43.508 (52.420)  | 102.134 (120.641) | 199.366 (236.760) |
|         IIR Filter        |  11.738 (14.964)  |  40.651 (50.495)  |  95.015 (112.642) | 182.173 (209.662) |
|            DRNN           |   5.038 (6.624)   |   6.237 (9.049)   |   7.405 (8.984)   |   9.812 (12.342)  |
|          FCN-DAE          |   6.447 (8.270)   |   8.485 (10.975)  |  10.877 (14.359)  |  15.213 (20.816)  |
|         Vanilla L         |   10.072 (7.180)  |  15.072 (14.794)  |  23.063 (20.751)  |  36.440 (35.379)  |
|         Vanilla NL        |   6.226 (6.762)   |   9.43

