## TSAD_JY[Final_X]

### Hyperparameter setting

In [None]:
from argparse import ArgumentParser

parser = ArgumentParser(description="Univariate with Exogenous TSAD")

parser.add_argument('--train_path', default='data/raw_data/train/Training_Data.csv', type=str, help='path to the data')
parser.add_argument('--test1_path', default='data/raw_data/test/WeldingTest_01_OK.csv', type=str, help='path to the data')
parser.add_argument('--test2_path', default='data/raw_data/test/WeldingTest_02_OK.csv', type=str, help='path to the data')
parser.add_argument('--test3_path', default='data/raw_data/test/WeldingTest_03_NG.csv', type=str, help='path to the data')
parser.add_argument('--test4_path', default='data/raw_data/test/WeldingTest_04_NG.csv', type=str, help='path to the data')
parser.add_argument('--label3_path', default='data/preprocessed/test/WeldingTest_03_NG_Label.csv', type=str, help='path to the data')

parser.add_argument('--horizon', default=10, type=int, help='forecast horizon')
parser.add_argument('--input_size', default=20, type=int, help='input size')
parser.add_argument('--threshold', default=4.0, type=float, help='threshold')

args = parser.parse_args('')

CFG = {
    "train"  : args.train_path,
    "test1"  : args.test1_path,
    "test2"  : args.test2_path,
    "test3"  : args.test3_path,
    "test4"  : args.test4_path,
    "label3" : args.label3_path,
    
    "horizon"     : args.horizon,
    "input_size"  : args.input_size,
    "threshold"   : args.threshold
    }

### preprocess.py

In [None]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'  ## MULTI GPU 사용시 고정

import warnings
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

In [None]:
root = os.getcwd()
path = os.path.join(root, CFG['train']) # path 수정
data = pd.read_csv(path)
data = data.rename(columns=lambda col_name: col_name.strip())

data.drop(columns=['SetFrequency', 'SetDuty'], inplace=True)

cols = [data.columns[3]]
df_rp = data[cols]
for i in range(len(data)):
    if data[data.columns[3]][i] < 1300.0:
        df_rp[data.columns[3]][i] = df_rp[data.columns[3]][i] +930.5

data.loc[:, 'RealPower'] = df_rp

In [None]:
use_columns = data.columns.tolist()
# use_columns

In [None]:
len(data) * 0.8

In [None]:
data = data[:10800]
len(data)

In [None]:
path  = os.path.join(root, CFG['test1']) # path 수정
test1 = pd.read_csv(path)
test1 = test1.rename(columns=lambda col_name: col_name.strip())
test1 = test1[use_columns]

cols = [test1.columns[3]]

df_test1 = test1[cols]
for i in range(len(test1)):
    if test1[test1.columns[3]][i] <1300.0:
        df_test1[test1.columns[3]][i] = df_test1[test1.columns[3]][i] +930.5

test1.loc[:, 'RealPower'] = df_test1

In [None]:
path  = os.path.join(root, CFG['test2']) # path 수정
test2 = pd.read_csv(path)
test2 = test2.rename(columns=lambda col_name: col_name.strip())
test2 = test2[use_columns]

cols = [test2.columns[3]]

df_test2 = test2[cols]
for i in range(len(test2)):
    if test2[test2.columns[3]][i] <1300.0:
        df_test2[test2.columns[3]][i] = df_test2[test2.columns[3]][i] +930.5

test2.loc[:, 'RealPower'] = df_test2

In [None]:
path  = os.path.join(root, CFG['test3']) # path 수정
test3 = pd.read_csv(path)
test3 = test3.rename(columns=lambda col_name: col_name.strip())
test3 = test3[use_columns]

cols = [test3.columns[3]]

df_test3 = test3[cols]
for i in range(len(test3)):
    if test3[test3.columns[3]][i] <1300.0:
        df_test3[test3.columns[3]][i] = df_test3[test3.columns[3]][i] +930.5

test3.loc[:, 'RealPower'] = df_test3

In [None]:
path  = os.path.join(root, CFG['test4']) # path 수정
test4 = pd.read_csv(path)
test4 = test4.rename(columns=lambda col_name: col_name.strip())
test4 = test4[use_columns]

cols = [test4.columns[3]]

df_test4 = test4[cols]
for i in range(len(test4)):
    if test4[test4.columns[3]][i] <1300.0:
        df_test4[test4.columns[3]][i] = df_test4[test4.columns[3]][i] +930.5

test4.loc[:, 'RealPower'] = df_test4

In [None]:
unique_id_cols = ['PageNo', 'Speed', 'Length', 'RealPower', 'SetPower', 'GateOnTime']

data['ds'] = pd.to_datetime(data.index, unit='s')

train_data = data.rename(columns={'RealPower': 'y'}).assign(
    unique_id='RealPower'
)

train_data  = train_data[['ds', 'unique_id', 'y'] + [col for col in unique_id_cols if col != 'RealPower']]

test1['ds'] = pd.to_datetime(test1.index, unit='s')
test_data1  = test1.rename(columns={'RealPower': 'y'}).assign(
    unique_id='RealPower'
)
test_data1  = test_data1[['ds', 'unique_id', 'y'] + [col for col in unique_id_cols if col != 'RealPower']]

test2['ds'] = pd.to_datetime(test2.index, unit='s')
test_data2  = test2.rename(columns={'RealPower': 'y'}).assign(
    unique_id='RealPower'
)
test_data2  = test_data2[['ds', 'unique_id', 'y'] + [col for col in unique_id_cols if col != 'RealPower']]

test3['ds'] = pd.to_datetime(test3.index, unit='s')
test_data3  = test3.rename(columns={'RealPower': 'y'}).assign(
    unique_id='RealPower'
)
test_data3  = test_data3[['ds', 'unique_id', 'y'] + [col for col in unique_id_cols if col != 'RealPower']]

test4['ds'] = pd.to_datetime(test4.index, unit='s')
test_data4  = test4.rename(columns={'RealPower': 'y'}).assign(
    unique_id='RealPower'
)
test_data4  = test_data4[['ds', 'unique_id', 'y'] + [col for col in unique_id_cols if col != 'RealPower']]

### model.py
* 사용 모델: TSMixerx, NHITS, NBEATSx, LSTM, GRU

In [None]:
from neuralforecast import NeuralForecast
from neuralforecast.models import TSMixerx, NHITS, NBEATSx, LSTM, GRU
from neuralforecast.losses.pytorch import MAE, MQLoss, MSE

import math
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import accuracy_score, mean_absolute_error, mean_squared_error

In [None]:
horizon    = CFG['horizon']
input_size = CFG['input_size']

models = [
            TSMixerx(
                h           = horizon,
                input_size  = input_size,
                n_series    = 1,            ## exogenous 변수가 없기 때문 (외부 변수)
                scaler_type = 'identity',
                max_steps   = 500,
                loss        = MSE(),
                valid_loss  = MSE(),
                batch_size  = 128,
                random_seed = 42,
                val_check_steps = 100,
                early_stop_patience_steps = 5,
                hist_exog_list = ['PageNo', 'Speed', 'Length', 'SetPower', 'GateOnTime'],
                ),

            NHITS(
                h           = horizon,
                input_size  = input_size,
                scaler_type = 'identity',
                loss        = MSE(),
                valid_loss  = MSE(),
                batch_size  = 128,
                random_seed = 42,
                val_check_steps = 5,
                hist_exog_list = ['PageNo', 'Speed', 'Length', 'SetPower', 'GateOnTime'],
                early_stop_patience_steps = -1,
                ),

            NBEATSx(
                h           = horizon,
                input_size  = input_size,
                scaler_type = 'identity',
                loss        = MSE(),
                valid_loss  = MSE(),
                batch_size  = 128,
                random_seed = 42,
                val_check_steps = 5,
                hist_exog_list = ['PageNo', 'Speed', 'Length', 'SetPower', 'GateOnTime'],
                early_stop_patience_steps = -1,
                ),
            
            LSTM(
                h           = horizon,
                input_size  = input_size,
                scaler_type = 'identity',
                loss        = MSE(),
                valid_loss  = MSE(),
                batch_size  = 128,
                random_seed = 42,
                val_check_steps = 5,
                hist_exog_list = ['PageNo', 'Speed', 'Length', 'SetPower', 'GateOnTime'],
                early_stop_patience_steps = -1,
                ),
            
            GRU(
                h           = horizon,
                input_size  = input_size,
                scaler_type = 'identity',
                loss        = MSE(),
                valid_loss  = MSE(),
                batch_size  = 128,
                random_seed = 42,
                val_check_steps = 5,
                hist_exog_list = ['PageNo', 'Speed', 'Length', 'SetPower', 'GateOnTime'],
                early_stop_patience_steps = -1,
                ),
            
            ]

In [None]:
nf = NeuralForecast(models = models, freq = 's')
train_hat_data = nf.cross_validation(
                                    df = train_data,
                                    val_size  = CFG['horizon'],
                                    test_size = CFG['horizon'],
                                    n_windows = None,
                                    verbose   = False,)

In [None]:
train_hat_data = train_hat_data.reset_index()

### Inference NG3

In [None]:
## 예측을 위해 windowed dataset 생성 (NIXTLA 맞춤)

def create_windowed_dataset(data, window_length=input_size, stride=horizon):
    
    total_windowed_data = []
    total_y_true_data   = []

    unique_ids = data['unique_id'].unique()
    sample_data = data[data['unique_id'] == unique_ids[0]]
    
    for start in range(0, len(sample_data) - window_length - horizon + 1, stride):
        windowed_data = []
        y_true_data   = []
    
        end = start + window_length

        for unique_id in unique_ids:
            data_id = data[data['unique_id'] == unique_id]
            window = data_id.iloc[start:end]
            windowed_data.append(window)
            
            y_true = data_id.iloc[end:end+horizon]
            y_true_data.append(y_true)

        total_windowed_data.append(pd.concat(windowed_data))
        total_y_true_data.append(pd.concat(y_true_data))
        
    return total_windowed_data, total_y_true_data

In [None]:
## windowed dataset 예측

def time_series_anomaly_detection(nf, total_windowed_data):
    
    forecasts = []
    
    for window in total_windowed_data:

        forecast = nf.predict(window, verbose=False)
        forecasts.append(forecast)

    return forecasts

In [None]:
def anomaly_score(merged_df, gt, error_name_list, thres=CFG['threshold']):

    total_length = merged_df.shape[0]
    gt   = gt[20 : 20 + total_length]
    gt   = np.array(gt)
    threshold = thres

    for model in model_name_list:

        final_scores = merged_df[f'{model}_error'].to_numpy()
        avg   = np.mean(final_scores)
        sigma = np.std(final_scores)

        Z_score = (final_scores - avg) / sigma
        pred = (np.abs(Z_score) > threshold).astype(int)
        pred = np.array(pred)
        
        accuracy = accuracy_score(gt, pred)
        precision, recall, f_score, support = precision_recall_fscore_support(gt, pred,
            average='binary')
        
        print(
            "Accuracy : {:0.4f}, Precision : {:0.4f}, Recall : {:0.4f}, F-score : {:0.4f}".format(
            accuracy, precision,
            recall, f_score), f" --- {model}")

In [None]:
total_windowed_data, total_y_true_data = create_windowed_dataset(test_data3)

In [None]:
## (주의) 출력이 아주 김


forecasts = time_series_anomaly_detection(nf, total_windowed_data)

In [None]:
forecasts_df = pd.concat(forecasts)
forecasts_df = forecasts_df.sort_values(by=["unique_id", "ds"])
forecasts_df['unique_id'] = forecasts_df.index
forecasts_df.reset_index(drop=True, inplace=True)

# forecasts_df.head()
# forecasts_df.shape

In [None]:
total_y_true_df = pd.concat(total_y_true_data)
total_y_true_df = total_y_true_df.sort_values(by=["unique_id", "ds"])

# total_y_true_df.head()
# total_y_true_df.shape

In [None]:
merged_df = pd.merge(total_y_true_df, forecasts_df, on=["ds", "unique_id"], how="inner")

# merged_df.head()
# merged_df.shape

In [None]:
for model in models:
    merged_df[f'{model}_error'] = merged_df[f'{model}'] - merged_df['y']
    
    # mae_model = mae(merged_df['y'], merged_df[f'{model}'])
    # mse_model = mse(merged_df['y'], merged_df[f'{model}'])
    
    # print(f'{model} horizon {horizon} - MAE: {mae_model:.3f}')
    # print(f'{model} horizon {horizon} - MSE: {mse_model:.3f}')

In [None]:
## 전체 실험 결과 출력

model_name_list = ['TSMixerx', 'NHITS', 'NBEATSx', 'LSTM', 'GRU']
realpower_df  = merged_df[merged_df["unique_id"] == "RealPower"]
label_df      = pd.read_csv(os.path.join(root, CFG['label3']))
true_label    = label_df['label']

anomaly_score(realpower_df, true_label, model_name_list)

### visualization.py

In [None]:
## visual model 결정하면 됨

# model_name_list = ['TSMixerx', 'NHITS', 'NBEATSx', 'LSTM', 'GRU']

visual_model = 'TSMixerx'
errors = merged_df.loc[merged_df['unique_id'] == 'RealPower', f'{visual_model}_error'] ## error 바꿔보기

In [None]:
min_  = -10.0
max_  = 10.0
bins_ = 100

hist_counts, bin_edges = np.histogram(errors, bins=bins_, range=(min_, max_))

factor = (max_ - min_) / bins_
bins   = bins_

x = []
for i in range(bins):
    x.append(min_ + factor * float(i))

plt.bar(x, hist_counts, align='center', width=factor)
plt.xlabel('Prediction Errors')
plt.ylabel('Number of Error Values')
plt.title('Histogram of Prediction Errors')
plt.show()

In [None]:
## 실제 값 비교 시각화

realpower_df = merged_df[merged_df["unique_id"] == "RealPower"]

# Plot TSMixer, NHITS, and y over the ds (time) axis
plt.figure(figsize=(30, 12))
plt.plot(realpower_df["ds"], realpower_df[f"{visual_model}"], label=f"{visual_model}", linestyle='-')#, marker='o')
# plt.plot(realpower_df["ds"], realpower_df["NHITS"], label="NHITS", linestyle='-')#, marker='x')
plt.plot(realpower_df["ds"], realpower_df["y"], label="Actual (y)", linestyle='-')#, marker='s')

# Configure the plot
plt.xlabel("Time (ds)")
plt.ylabel("Values")
plt.title("RealPower - TSMixer, NHITS, and Actual Values Over Time")
plt.legend()
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
## 잔차 시각화

realpower_df = merged_df[merged_df["unique_id"] == "RealPower"]

# Plot TSMixer, NHITS, and y over the ds (time) axis
plt.figure(figsize=(30, 12))
plt.plot(realpower_df["ds"], realpower_df[f"{visual_model}_error"], label=f"{visual_model}_error", linestyle='-')#, marker='o')
# plt.plot(realpower_df["ds"], realpower_df["NHITS_error"], label="NHITS_error", linestyle='-')#, marker='x')

# Configure the plot
plt.xlabel("Time (ds)")
plt.ylabel("Values")
plt.title("RealPower - TSMixer, NHITS, and Actual Values Over Time")
plt.legend()
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
## Z 스코어 시각화

# error values
final_scores = realpower_df[f"{visual_model}_error"].to_numpy()  ## error 값 수정하여 비교
threshold    = CFG['threshold']

# Calculate Z-scores
avg     = np.mean(final_scores)
sigma   = np.std(final_scores)
Z_score = (final_scores - avg) / sigma

# Anomaly detection using numpy (vectorized approach)
pred_bin1 = (np.abs(Z_score) > threshold).astype(int)

# Threshold lines for visualization
pred_length = len(final_scores)
thre        = np.full(pred_length, threshold)
thre_minus  = np.full(pred_length, -threshold)

# Plot the Z-scores with thresholds
plt.figure(figsize=(14, 7))
plt.plot(Z_score, label="Z-score", marker='o')
plt.plot(thre, label="Threshold", linestyle="--", color="red")
plt.plot(thre_minus, label="-Threshold", linestyle="--", color="blue")
plt.title("Z-Score Anomaly Detection")
plt.xlabel("Index")
plt.ylabel("Z-Score")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
label_df   = pd.read_csv(os.path.join(root, CFG['label3']))
true_label = label_df['label']

pred_bin = pred_bin1

true = []
true = true_label[20:20+ pred_length]

pred = np.array(pred_bin)
gt   = np.array(true)
print(pred.shape, gt.shape, true_label.shape, pred_length)
accuracy = accuracy_score(gt, pred)
precision, recall, f_score, support = precision_recall_fscore_support(gt, pred,
    average='binary')

print(pred.shape, gt.shape)
print(
    "Accuracy : {:0.4f}, Precision : {:0.4f}, Recall : {:0.4f}, F-score : {:0.4f}".format(
    accuracy, precision,
    recall, f_score))

### Inference OK1

In [None]:
total_windowed_data, total_y_true_data = create_windowed_dataset(test_data1)

In [None]:
## (주의) 출력이 아주 김, 끝나면 출력 삭제


forecasts = time_series_anomaly_detection(nf, total_windowed_data)

In [None]:
forecasts_df = pd.concat(forecasts)
forecasts_df = forecasts_df.sort_values(by=["unique_id", "ds"])
forecasts_df['unique_id'] = forecasts_df.index
forecasts_df.reset_index(drop=True, inplace=True)

# forecasts_df.head()
# forecasts_df.shape

In [None]:
total_y_true_df = pd.concat(total_y_true_data)
total_y_true_df = total_y_true_df.sort_values(by=["unique_id", "ds"])

# total_y_true_df.head()
# total_y_true_df.shape

In [None]:
merged_df = pd.merge(total_y_true_df, forecasts_df, on=["ds", "unique_id"], how="inner")

# merged_df.head()
# merged_df.shape

In [None]:
for model in models:
    merged_df[f'{model}_error'] = merged_df[f'{model}'] - merged_df['y']
    
    # mae_model = mae(merged_df['y'], merged_df[f'{model}'])
    # mse_model = mse(merged_df['y'], merged_df[f'{model}'])
    
    # print(f'{model} horizon {horizon} - MAE: {mae_model:.3f}')
    # print(f'{model} horizon {horizon} - MSE: {mse_model:.3f}')

In [None]:
## visual model 결정하면 됨

# model_name_list = ['TSMixerx', 'NHITS', 'NBEATSx', 'LSTM', 'GRU']

visual_model = 'TSMixerx'
errors = merged_df.loc[merged_df['unique_id'] == 'RealPower', f'{visual_model}_error'] ## error 바꿔보기

In [None]:
min_  = -10.0
max_  = 10.0
bins_ = 100

hist_counts, bin_edges = np.histogram(errors, bins=bins_, range=(min_, max_))

factor = (max_ - min_) / bins_
bins   = bins_

x = []
for i in range(bins):
    x.append(min_ + factor * float(i))

plt.bar(x, hist_counts, align='center', width=factor)
plt.xlabel('Prediction Errors')
plt.ylabel('Number of Error Values')
plt.title('Histogram of Prediction Errors')
plt.show()

In [None]:
## 실제 값 비교 시각화

realpower_df = merged_df[merged_df["unique_id"] == "RealPower"]

# Plot TSMixer, NHITS, and y over the ds (time) axis
plt.figure(figsize=(30, 12))
plt.plot(realpower_df["ds"], realpower_df[f"{visual_model}"], label=f"{visual_model}", linestyle='-')#, marker='o')
# plt.plot(realpower_df["ds"], realpower_df["NHITS"], label="NHITS", linestyle='-')#, marker='x')
plt.plot(realpower_df["ds"], realpower_df["y"], label="Actual (y)", linestyle='-')#, marker='s')

# Configure the plot
plt.xlabel("Time (ds)")
plt.ylabel("Values")
plt.title("RealPower - TSMixer, NHITS, and Actual Values Over Time")
plt.legend()
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
## 잔차 시각화

realpower_df = merged_df[merged_df["unique_id"] == "RealPower"]

# Plot TSMixer, NHITS, and y over the ds (time) axis
plt.figure(figsize=(30, 12))
plt.plot(realpower_df["ds"], realpower_df[f"{visual_model}_error"], label=f"{visual_model}_error", linestyle='-')#, marker='o')
# plt.plot(realpower_df["ds"], realpower_df["NHITS_error"], label="NHITS_error", linestyle='-')#, marker='x')

# Configure the plot
plt.xlabel("Time (ds)")
plt.ylabel("Values")
plt.title("RealPower - TSMixer, NHITS, and Actual Values Over Time")
plt.legend()
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
## Z 스코어 시각화

# error values
final_scores = realpower_df[f"{visual_model}_error"].to_numpy()  ## error 값 수정하여 비교
threshold    = CFG['threshold']

# Calculate Z-scores
avg     = np.mean(final_scores)
sigma   = np.std(final_scores)
Z_score = (final_scores - avg) / sigma

# Anomaly detection using numpy (vectorized approach)
pred_bin1 = (np.abs(Z_score) > threshold).astype(int)

# Threshold lines for visualization
pred_length = len(final_scores)
thre        = np.full(pred_length, threshold)
thre_minus  = np.full(pred_length, -threshold)

# Plot the Z-scores with thresholds
plt.figure(figsize=(14, 7))
plt.plot(Z_score, label="Z-score", marker='o')
plt.plot(thre, label="Threshold", linestyle="--", color="red")
plt.plot(thre_minus, label="-Threshold", linestyle="--", color="blue")
plt.title("Z-Score Anomaly Detection")
plt.xlabel("Index")
plt.ylabel("Z-Score")
plt.legend()
plt.grid(True)
plt.show()