In [1]:
import torch
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
from omegaconf import OmegaConf
from torch.utils.data import Dataset, DataLoader
from models.dit import DiT
from models import *
from ddad_utils.dataset import Dataset_maker
from ddad_utils.ddad import DDAD
from tqdm import tqdm
from sklearn.metrics import (
    classification_report,
    precision_recall_curve,
    roc_curve,
    auc
)
import pickle

In [None]:
# Experiment configuration drives the device, seed, model size and checkpoint location.
config = OmegaConf.load("./ddad_utils/config.yaml")

# Seed before the model is built so weight initialisation is repeatable.
torch.manual_seed(config.model.seed)
device = torch.device(config.model.device)

# Build the DiT on the default device first, then move it to the configured one.
model = DiT(
    input_size=config.data.seq_len,
    patch_size=5,
    in_channels=config.data.input_channel,
    num_classes=2,
)
model = model.to(device)

# Best checkpoint lives at <checkpoint_dir>/<exp_name>/<checkpoint_name>_best.pt
best_model_path = os.path.join(
    config.model.checkpoint_dir,
    config.model.exp_name,
    f"{config.model.checkpoint_name}_best.pt",
)
model.load_state_dict(torch.load(best_model_path, map_location=device))
model.eval()

# Run the DDAD pipeline; calling the object returns five collections.
ddad = DDAD(model, config)
predictions, labels_list, gt_list, forward_list, reconstructed_list = ddad()

In [9]:
from ddad_utils.metrics import Metric

# Reload the experiment configuration (the metric flags below are read from it).
config = OmegaConf.load("./ddad_utils/config.yaml")

# Performance evaluation
# NOTE(review): labels_array / prediction_array are not created by any cell above;
# in this notebook they only come from the np.load cell further down — run that first.
metric = Metric(labels_array, prediction_array, config)
metric.optimal_threshold()

# Each metric is printed only when its flag is enabled in config.metrics.
print("\n[Evaluation Metrics]")
if config.metrics.auroc:
    print('AUROC: ({:.1f}, -)'.format(metric.image_auroc() * 100))
if config.metrics.pro:
    print('PRO: {:.1f}'.format(metric.pixel_pro() * 100))
if config.metrics.misclassifications:
    # NOTE(review): spelled "miscalssified" — presumably matches the Metric API; verify.
    metric.miscalssified()
metric.precision_recall_f1()


[Evaluation Metrics]
AUROC: (57.6, -)
[Sample 0] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 1] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 2] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 3] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 4] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 5] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 6] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 7] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 8] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 19] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 20] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 21] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 22] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 23] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 24] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 25] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 26] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 27] Predicted: 1 / True: 0.0 --> INCORRECT
[Sample 28] Predicted: 1 / True: 0.0 --> INCORRECT
[

In [None]:
# Cache the evaluation arrays on disk so later cells can run without re-running inference.
# NOTE(review): gt_array, reconstructed_array, prediction_array and labels_array are not
# defined by any visible cell before this one (ddad() returns *_list names) — the
# conversion step appears to be missing from the notebook; confirm before relying on this.
np.save('checkpoints/DDAD_TEST/gt_array.npy',gt_array)
np.save('checkpoints/DDAD_TEST/reconstructed_array.npy',reconstructed_array)
np.save('checkpoints/DDAD_TEST/prediction_array.npy',prediction_array)
np.save('checkpoints/DDAD_TEST/labels_array.npy',labels_array)

In [2]:
# Restore the cached evaluation arrays written by the np.save cell.
gt_array = np.load('checkpoints/DDAD_TEST/gt_array.npy')
reconstructed_array = np.load('checkpoints/DDAD_TEST/reconstructed_array.npy')
prediction_array = np.load('checkpoints/DDAD_TEST/prediction_array.npy')
labels_array = np.load('checkpoints/DDAD_TEST/labels_array.npy')

In [3]:
# Inspect the array dimensions; the plotting cell indexes this array as [window][channel].
gt_array.shape

(486, 2, 750)

In [12]:
# Output folder for the per-window figures; exist_ok=True makes re-running the cell safe.
os.makedirs('checkpoints/DDAD_TEST/results_figures_norm', exist_ok=True)

def signorm(x):
    """Min-max scale a signal into the range [-1, 1].

    Args:
        x: Array-like object exposing ``.max()`` / ``.min()`` and elementwise
            arithmetic (the notebook passes 1-D NumPy arrays).

    Returns:
        ``2 * (x - min) / (max - min) - 1``. A constant signal has no range to
        scale, so it is returned as all zeros instead of NaN (0 / 0).
    """
    x_max = x.max()
    x_min = x.min()
    span = x_max - x_min

    if span == 0:
        # Flat signal: avoid 0/0; multiplying by 0.0 keeps the container type and yields floats.
        return (x - x_min) * 0.0

    return 2 * (x - x_min) / span - 1

# Render one 2x3 diagnostic figure per window and save it as a PNG.
# Window count and length come from gt_array instead of being hard-coded (was 486 / 750).
n_windows = gt_array.shape[0]
seq_len = gt_array.shape[-1]

for idx in tqdm(range(n_windows)):

    # prediction_array / labels_array are sliced as flat, window-after-window arrays.
    # The end index is exclusive, so start + seq_len covers every point of the window
    # (the previous "+749" silently dropped the last sample).
    start = idx * seq_len
    stop = start + seq_len

    fig = plt.figure(figsize=(10,6))

    # Top row: channel 0
    plt.subplot(2,3,1)
    plt.plot(signorm(gt_array[idx][0]))
    plt.title('Input signal / Thorax 1')

    plt.subplot(2,3,2)
    plt.plot(reconstructed_array[idx][0])
    plt.title('Reconstructed / Thorax 1')

    plt.subplot(2,3,3)
    plt.plot(prediction_array[start:stop])
    plt.title('Input - Reconstructed')

    # Bottom row: channel 1
    plt.subplot(2,3,4)
    plt.plot(signorm(gt_array[idx][1]), color='darkorange')
    plt.title('Input signal / Thorax 2')

    plt.subplot(2,3,5)
    plt.plot(reconstructed_array[idx][1], color='darkorange')
    plt.title('Reconstructed / Thorax 2')

    plt.subplot(2,3,6)
    plt.plot(labels_array[start:stop], color='darkorange')
    plt.title('Anomaly mask')

    plt.tight_layout()

    # File names are 1-based.
    plt.savefig('checkpoints/DDAD_TEST/results_figures_norm/data_norm_%d.png'%(idx+1))

    # Close explicitly so hundreds of figures do not accumulate in memory.
    plt.close(fig)


  0%|          | 0/486 [00:00<?, ?it/s]

100%|██████████| 486/486 [02:48<00:00,  2.89it/s]


In [None]:

# Visualisation: merge the per-batch outputs returned by ddad() into single tensors.
# Shape comments are carried over from the original code and are unverified here.
# NOTE(review): the *_list names are rebound from lists to tensors, so this cell can
# only be run once per ddad() call.
reconstructed_list = torch.cat(reconstructed_list, dim=0)  # (N, C, T)
forward_list = torch.cat(forward_list, dim=0)              # (N, C, T)
gt_list = torch.cat(gt_list, dim=0)                        # (N, 1, T)
pred_scores = torch.tensor(predictions).unsqueeze(1)      # (N, 1)
# Binarise with the threshold held by the Metric object from the evaluation cell.
pred_mask = (pred_scores > metric.threshold).float()      # (N, 1)

# Disabled visualisation call, copied from a class method (hence `self.config`):
# os.makedirs('results', exist_ok=True)

# if self.config.metrics.visualisation:
#     visualize(
#         forward_list,
#         reconstructed_list,
#         gt_list,
#         pred_mask,
#         pred_scores,
#         self.config.data.category
#     )

# The trailing `return ...` was removed: `return` outside a function is a SyntaxError
# in a notebook cell. The five results stay available as notebook variables.

In [None]:
# Load the pickled training history and plot its 'train_loss' entry.
# NOTE(review): pickle.load can execute arbitrary code — only open trusted checkpoint files.
with open('checkpoints/DDAD_TEST/train_losses.pickle', 'rb') as f:
    loss = pickle.load(f)
plt.plot(loss['train_loss'])

#### One sampling process

In [None]:
# Re-create the DDAD wrapper with one_step=True; in this mode the call result is kept
# as a single sequence (x_list) that the next cell indexes by position.
ddad = DDAD(model, config, one_step=True)
x_list = ddad()

In [None]:
# Positions in x_list whose sample is drawn, one column each.
index_to_plot = [0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]

# Column count follows the list instead of a hard-coded 11; squeeze=False keeps
# `axes` two-dimensional even if only one index is requested.
fig, axes = plt.subplots(2, len(index_to_plot), figsize=(24, 3), squeeze=False)

for i, idx in enumerate(index_to_plot):

    # Drop singleton dimensions and move to CPU; rows 0 and 1 are the two channels.
    data = x_list[idx].squeeze().cpu()
    left = data[0]
    right = data[1]

    # Top row: first channel
    axes[0, i].plot(left.numpy())
    axes[0, i].set_title(f"T = {idx} / Thorax 1")
    axes[0, i].set_xticks([])
    axes[0, i].set_yticks([])

    # Bottom row: second channel
    axes[1, i].plot(right.numpy(), color='darkorange')
    axes[1, i].set_title(f"T = {idx} / Thorax 2")
    axes[1, i].set_xticks([])
    axes[1, i].set_yticks([])

# tight_layout only accounts for artists that exist when it is called, so it must
# run after the titles are set (it previously ran before anything was drawn).
fig.tight_layout()
plt.savefig('../sampling_example.png')

#### All sampling process

In [None]:
# Length of the first element of `predictions` (requires the DDAD inference cell to have run).
len(predictions[0])

#### 모델 불러오기

In [None]:
# Build a single-channel, 42-class DiT and load its weights from dit_ts_model.pt.
# This rebinds `model`, replacing the network loaded from the config checkpoint earlier.
# Relies on `device` from the configuration cell.
model = DiT(input_size=750, patch_size=5, in_channels=1, num_classes=42).to(device)
model.load_state_dict(torch.load("dit_ts_model.pt", map_location=device))
model.eval()

#### 예측값 생성

In [None]:
# Score every test batch without tracking gradients and keep output channel 0.
# NOTE(review): test_loader is not defined in any visible cell.
score_chunks = []

with torch.no_grad():
    for x, y in tqdm(test_loader, desc="Evaluating", unit="batch"):
        x, y = x.to(device), y.to(device)
        # All-zero long tensor, one entry per sample, passed as the model's second argument.
        t = torch.zeros(x.size(0), dtype=torch.long).to(device)
        out = model(x, t, y)  # (B, 2, 750)
        # Use only the first (mean) channel.
        score_chunks.append(out[:, 0, :].cpu())

# Stack the batches and flatten to one score per time point.
pred_scores = torch.cat(score_chunks, dim=0).numpy().flatten()

#### Precision-Recall 기반 threshold 탐색

In [None]:
# Precision/recall at every candidate threshold.
# NOTE(review): point_labels is not defined in any visible cell.
precision, recall, thresholds_pr = precision_recall_curve(point_labels, pred_scores)
# F1 per point; the 1e-8 term avoids 0/0 when precision and recall are both zero.
f1_scores = 2 * (precision * recall) / (precision + recall + 1e-8)
# Per the scikit-learn docs, precision/recall have one more entry than thresholds_pr
# (a final point with recall 0); its F1 is 0, and argmax returns the first maximum,
# so best_idx is always a valid index into thresholds_pr.
best_idx = np.argmax(f1_scores)
best_threshold = thresholds_pr[best_idx]

#### ROC AUC 계산

In [None]:
# ROC curve over the same labels/scores, and the area under it.
fpr, tpr, thresholds_roc = roc_curve(point_labels, pred_scores)
roc_auc = auc(fpr, tpr)

#### 최종 이진 예측

In [None]:
# Final binary prediction. precision_recall_curve evaluates each threshold as
# "score >= threshold", so the comparison must be inclusive; with ">" the sample
# sitting exactly on the chosen threshold was labelled normal and the realised
# precision/recall differed from the reported best point.
pred_binary = (pred_scores >= best_threshold).astype(int)

In [None]:
# Summary of the operating point picked by the PR search, plus the ROC AUC.
print(f"▶ Best Threshold (by F1): {best_threshold:.4f}")
print(f"▶ Best F1 Score: {f1_scores[best_idx]:.4f}")
print(f"▶ Precision at best: {precision[best_idx]:.4f}")
print(f"▶ Recall at best: {recall[best_idx]:.4f}")
print(f"▶ ROC AUC Score: {roc_auc:.4f}")

#### F1-score vs Threshold

In [None]:
# F1 as a function of the decision threshold.
plt.figure(figsize=(8, 4))
# f1_scores has one more entry than thresholds_pr: entry i belongs to thresholds_pr[i]
# and the extra entry is the last one. Drop the last element ([:-1]); the previous
# [1:] shifted the whole curve by one threshold, away from the marked best threshold.
plt.plot(thresholds_pr, f1_scores[:-1], label="F1-score")
plt.axvline(best_threshold, color='red', linestyle='--', label=f"Best Threshold = {best_threshold:.4f}")
plt.title("F1-score vs Threshold")
plt.xlabel("Threshold")
plt.ylabel("F1-score")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

#### Precision-Recall Curve

In [None]:
# Precision-recall curve with the best-F1 operating point highlighted.
fig, ax = plt.subplots(figsize=(6, 6))
ax.plot(recall, precision, label='PR Curve')
ax.scatter(recall[best_idx], precision[best_idx], color='red', label='Best F1 Point')
ax.set_xlabel("Recall")
ax.set_ylabel("Precision")
ax.set_title("Precision-Recall Curve")
ax.legend()
ax.grid(True)
fig.tight_layout()
plt.show()

#### ROC Curve

In [None]:
# ROC curve; the grey dashed diagonal is the chance-level reference.
fig, ax = plt.subplots(figsize=(6, 6))
ax.plot(fpr, tpr, label=f"ROC Curve (AUC = {roc_auc:.4f})")
ax.plot([0, 1], [0, 1], linestyle='--', color='gray')
ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.set_title("ROC Curve")
ax.legend()
ax.grid(True)
fig.tight_layout()
plt.show()

#### 성능 출력

In [None]:
# Print the chosen threshold and the per-class report for the binarised predictions.
# Requires pred_binary from the binarisation cell; class 0 / 1 are labelled via target_names.
print(f"🔍 F1-score 기준 최적 threshold: {best_threshold:.4f}")
print(classification_report(point_labels, pred_binary, target_names=["정상", "이상"]))

#### 예측 점수 시각화

파란색 선: 모델이 출력한 각 시점의 예측 score (pred_scores)

주황색 영역: 실제 이상이 존재하는 시점 (point_labels)

빨간 점선: F1-score 기준으로 선택된 threshold (0.5436)

In [None]:
# Scores and ground-truth labels over the flattened time axis, with the threshold line.
fig, ax = plt.subplots(figsize=(14, 5))
ax.plot(pred_scores, label="Predicted Scores", linewidth=1)
ax.plot(point_labels * 1.0, label="True Anomaly", linestyle='--')  # scale adjustment
ax.axhline(best_threshold, color='red', linestyle=':', label=f"Threshold = {best_threshold:.4f}")
ax.set_title("Predicted Scores vs True Anomaly Labels")
ax.set_xlabel("Time Point Index")
ax.set_ylabel("Score")
ax.legend()
ax.grid(True)
fig.tight_layout()
plt.show()

In [None]:
# Histogram of all predicted scores with the selected threshold marked.
fig, ax = plt.subplots(figsize=(8, 4))
ax.hist(pred_scores, bins=100, color='steelblue')
ax.axvline(best_threshold, color='red', linestyle='--', label=f"Threshold = {best_threshold:.4f}")
ax.set_title("Distribution of Predicted Scores")
ax.set_xlabel("Score")
ax.set_ylabel("Frequency")
ax.legend()
ax.grid(True)
fig.tight_layout()
plt.show()

In [None]:
# Extract the time-series data
# NOTE(review): test_dataset is not defined in any visible cell.
X = test_dataset.X.numpy()         # shape: (N, 1, T)
test_series = X[:, 0, :]           # shape: (N, T)

# Reshape point_labels and pred_scores to one row per series.
# After this cell both are 2-D, so the flat-array plots above must not be re-run
# without recomputing them.
point_labels = point_labels.reshape(test_series.shape)
pred_scores = pred_scores.reshape(test_series.shape)

# Per-series class labels, used to pick one example per class below.
test_labels = test_dataset.y.numpy()

In [None]:
# Overlay true and predicted anomaly masks on one test series.
i = 0
signal = test_series[i]
true_anom = point_labels[i]
pred_score = pred_scores[i]
# Inclusive comparison: precision_recall_curve defines a threshold as "score >= t",
# so ">" would drop the point lying exactly on the threshold.
pred_anom = (pred_score >= best_threshold).astype(int)

plt.figure(figsize=(12, 5))
plt.plot(signal, label="Signal")
# The 0/1 masks are scaled to the signal's maximum so they are visible on the same axis.
plt.plot(true_anom * np.max(signal), label="True Anomaly", linestyle='--')
plt.plot(pred_anom * np.max(signal), label="Predicted Anomaly", linestyle=':')
plt.title(f"Sample {i} - True vs Predicted Anomalies")
plt.xlabel("Time")
plt.ylabel("Value")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
# Unique class labels present in the test set
unique_classes = sorted(np.unique(test_labels))

# Subplot grid, one panel per class
n_rows, n_cols = 6, 7  # sized for 42 classes
plt.figure(figsize=(22, 18))

for idx, cls in enumerate(unique_classes):
    # Index of the first series that belongs to this class
    sample_idx = np.where(test_labels == cls)[0][0]
    
    signal = test_series[sample_idx]
    true_anom = point_labels[sample_idx]
    pred_score = pred_scores[sample_idx]
    # Inclusive comparison: precision_recall_curve defines a threshold as "score >= t",
    # so ">" would drop the point lying exactly on the threshold.
    pred_anom = (pred_score >= best_threshold).astype(int)

    plt.subplot(n_rows, n_cols, idx + 1)
    plt.plot(signal, label='Signal', linewidth=1)
    # 0/1 masks are scaled to the signal's maximum so they share the axis.
    plt.plot(true_anom * np.max(signal), '--', label='True Anomaly', linewidth=1)
    plt.plot(pred_anom * np.max(signal), ':', label='Predicted Anomaly', linewidth=1)
    # Class labels are shown 1-based in the panel title.
    plt.title(f"Class {cls+1} (idx {sample_idx})")
    plt.xticks([])
    plt.yticks([])
    plt.grid(True)

# Overall title and legend (the legend attaches to the last panel drawn)
plt.suptitle("Class-wise Anomaly Detection: True vs Predicted", fontsize=18, y=0.92)
plt.tight_layout(rect=[0, 0, 1, 0.94])
plt.legend(loc='lower right', bbox_to_anchor=(1.15, 0.1))
plt.show()

## 이전

In [None]:
# Read the tab-separated, header-less ECG train/test files.
# NOTE(review): both paths are absolute to one Windows machine — they will not resolve elsewhere.
# NOTE(review): train uses index_col=False while test uses index_col=None; confirm the
# difference is intentional, since both frames are treated identically in the next cell.
train = pd.read_csv('C:/Users/Pro/Desktop/AnomalyDiT-main/AnomalyDiT-main/Dataset/ECG_Train_with_anomaly.csv', sep='\t', index_col=False, header=None)
test = pd.read_csv('C:/Users/Pro/Desktop/AnomalyDiT-main/AnomalyDiT-main/Dataset/ECG_Test_with_anomaly.csv', sep='\t', index_col=None, header=None)

In [None]:
# Drop column 0, convert to tensors and insert a singleton axis at position 1.
# NOTE(review): train/test are rebound from DataFrames to tensors, so this cell fails
# if run twice without re-running the read_csv cell (tensors have no .drop).
train = torch.Tensor(train.drop(columns=0).to_numpy()).unsqueeze(1)
test = torch.Tensor(test.drop(columns=0).to_numpy()).unsqueeze(1)

In [None]:
# Check the tensor dimensions after unsqueeze(1).
test.shape

In [None]:
# Instantiate a DiT with explicit hyper-parameters (no checkpoint is loaded here).
# The model is left on the default device; hidden_size=300 is divisible by num_heads=10.
net = DiT(input_size=750,
        patch_size=5,
        in_channels=1,
        hidden_size=300,
        depth=28,
        num_heads=10,
        mlp_ratio=4.0,
        class_dropout_prob=0.1,
        learn_sigma=False,)

In [None]:
# Dummy batch of four items.
x = torch.randn(4,1,750)  # Gaussian noise matching in_channels=1, input_size=750
y = test[:4]  # first four test series — NOTE(review): presumably the conditioning input; confirm
t = torch.randint(low=0, high=10000, size=(4,))  # random integers in [0, 10000), presumably timesteps