In [4]:
# 모듈 임포트
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM
from sklearn.metrics import classification_report
import joblib
import time
import matplotlib.pyplot as plt

In [None]:
# 데이터 불러오기
# train_df = pd.read_excel('../dataset/preprocessed/50ohm_정상_train.xlsx')
column_names = ['timestamp', 'v_raw', 'c_raw', 'voltage', 'current', 'label']
train_df = pd.read_csv('../realtime/raw_dataset_normal.csv', names=column_names, header=None)
test_df = pd.read_excel('../dataset/preprocessed/50ohm_통합_test.xlsx')

# test_df = pd.concat([test_df] * 20, ignore_index=True)

# 컬럼명 정리
rename_columns = {
    'power_voltage': 'voltage',
    'power_current': 'current',
}
train_df = train_df.rename(columns=rename_columns)
test_df = test_df.rename(columns=rename_columns)

FileNotFoundError: [Errno 2] No such file or directory: '../dataset/preprocessed/50ohm_통합_test.xlsx'

In [None]:
# 파생 피처 추가 함수
def add_features(df):
    df['voltage_diff'] = df['voltage'].diff().fillna(0).abs()
    df['current_diff'] = df['current'].diff().fillna(0).abs()
    df['voltage_ma'] = df['voltage'].rolling(5).mean().bfill()
    df['current_ma'] = df['current'].rolling(5).mean().bfill()
    df['power'] = df['voltage'] * df['current']
    df['power_diff'] = df['power'].diff().fillna(0).abs()
    return df

train_df = add_features(train_df).dropna()
test_df = add_features(test_df).dropna()


In [None]:
features = ['voltage', 'current', 'voltage_diff', 'current_diff',
            'voltage_ma', 'current_ma', 'power', 'power_diff']
X_train = train_df[features].values
X_test = test_df[features].values
y_true = test_df['label'].values

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

In [None]:
model = OneClassSVM(nu=0.005, kernel='rbf', gamma='scale')
model.fit(X_train_scaled)

joblib.dump(model, './model/one_class_svm_model.joblib')
joblib.dump(scaler, './model/scaler.joblib')

In [None]:
# 예측 및 평가
y_pred = model.predict(X_test_scaled)
y_pred_bin = (y_pred == -1).astype(int)

print(y_pred_bin[:10])

In [None]:
def postprocess_anomalies(y_pred_bin, min_consecutive=8):
    y_post = np.zeros_like(y_pred_bin)
    count = 0
    for i, val in enumerate(y_pred_bin):
        if val == 1:
            count += 1
        else:
            if count >= min_consecutive:
                y_post[i - count:i] = 1
            count = 0
    if count >= min_consecutive:
        y_post[len(y_pred_bin)-count:] = 1
    return y_post

y_pred_post = postprocess_anomalies(y_pred_bin, min_consecutive=8)
y_true_post = postprocess_anomalies(y_true, min_consecutive=8)

In [None]:
print("📋 (후처리 적용) 분류 리포트:\n")
print(classification_report(y_true_post, y_pred_post, target_names=['정상', '아크']))

In [None]:
def calculate_detection_delay(y_pred_bin, label, sampling_interval=0.0001):
    arc_start = np.where(label == 1)[0][0] # 아크의 시작 인덱스 찾기
    detected = np.where(y_pred_bin[arc_start:] == 1)[0] # 예측 값에서 아크를 처음 발견한 인덱스 찾기
    if len(detected) == 0:
        print("⚠️ 아크를 탐지하지 못했습니다.")
        return None
    detect_index = detected[0] + arc_start
    delay = detect_index - arc_start
    time_detected = detect_index * sampling_interval
    print(f"✅ 아크 시작 인덱스: {arc_start}")
    print(f"✅ 모델 이상 감지 인덱스: {detect_index}")
    print(f"⏱️ 감지 지연 시간: {delay} 샘플")
    print(f"⏱️ 감지된 시점 (초): {time_detected:.6f} 초")
    return arc_start, detect_index, delay

arc_start, detect_index, delay = calculate_detection_delay(y_pred_post, y_true_post)

In [None]:
import time

# 전체 예측 시간
start_total = time.perf_counter()
y_pred = model.predict(X_test_scaled)
elapsed_total = time.perf_counter() - start_total
print(f"⏱️ 전체 예측 소요 시간: {elapsed_total:.10f}초")

# 후처리 + 첫 이상 감지까지 시간
start_first = time.perf_counter()
y_pred_bin = (y_pred == -1).astype(int)
y_pred_post = postprocess_anomalies(y_pred_bin, min_consecutive=8)
first_index = np.where(y_pred_post == 1)[0]
elapsed_first = time.perf_counter() - start_first

if len(first_index) > 0:
    print(f"🟡 첫 번째 이상 탐지 인덱스: {first_index[0]}")
    print(f"⏱️ 첫 이상 탐지까지 걸린 시간: {elapsed_first:.10f}초")
else:
    print("⚠️ 후처리된 이상 탐지 결과가 없습니다.")


In [None]:
# 모듈 임포트
import psutil
import os

# 리소스 사용량 출력 함수
def print_resource_usage():
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    cpu_percent = process.cpu_percent(interval=1.0)
    print(f"🧠 메모리 사용량: {mem_info.rss / 1024 ** 2:.2f} MB")
    print(f"🧮 CPU 사용률: {cpu_percent:.2f}%")


# 전체 예측 시간 측정 + 시스템 리소스 출력
print("🔍 예측 전 시스템 상태:")
print_resource_usage()

start_total = time.perf_counter()
y_pred = model.predict(X_test_scaled)
elapsed_total = time.perf_counter() - start_total

print("🔍 예측 후 시스템 상태:")
print_resource_usage()

print(f"⏱️ 전체 예측 소요 시간: {elapsed_total:.10f}초")



---
# 시각화 자료

In [None]:
import matplotlib.pyplot as plt
import matplotlib as mpl

# 한글 폰트 설정 (예: 맑은 고딕)
# plt.rcParams['font.family'] = 'Malgun Gothic'  # Windows 사용자
# plt.rcParams['font.family'] = 'AppleGothic'   # macOS 사용자
plt.rcParams['font.family'] = 'NanumGothic'   # Linux 사용자 (Nanum 폰트 설치 필요)

# 마이너스 부호 깨짐 방지
mpl.rcParams['axes.unicode_minus'] = False

In [None]:
def plot_predictions(y_pred_bin, arc_start=None, detect_index=None):
    plt.figure(figsize=(15, 4))
    plt.plot(y_pred_bin, label='예측 이상 (1=이상)', color='red', linewidth=1)

    if arc_start is not None:
        plt.axvline(x=arc_start, color='blue', linestyle='--', label='실제 아크 시작')
    if detect_index is not None:
        plt.axvline(x=detect_index, color='green', linestyle='--', label='감지된 시점')

    plt.title("One-Class SVM 이상 탐지 결과")
    plt.xlabel("샘플 인덱스")
    plt.ylabel("예측값")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

plot_predictions(y_pred_post, arc_start, detect_index)

In [None]:
def plot_current_waveform(test_df):
    plt.figure(figsize=(15, 4))
    plt.plot(test_df['current'], label='전류 파형', alpha=0.7)
    plt.title("전체 전류 파형")
    plt.xlabel("샘플 인덱스")
    plt.ylabel("전류 (A)")
    plt.grid(True)
    plt.tight_layout()
    plt.legend()
    plt.show()

plot_current_waveform(test_df)

In [None]:
def compare_postprocessing(y_pred_bin, y_pred_post, title="후처리 전후 비교"):
    plt.figure(figsize=(15, 4))
    plt.plot(y_pred_bin, label='후처리 전', color='orange', alpha=0.6)
    plt.plot(y_pred_post, label='후처리 후', color='green', alpha=0.6)
    plt.title(title)
    plt.xlabel("샘플 인덱스")
    plt.ylabel("이상 탐지 결과")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

compare_postprocessing(y_pred_bin, y_pred_post)


In [None]:
def compare_prediction_vs_label(y_true, y_pred_post):
    plt.figure(figsize=(15, 4))
    plt.plot(y_true, label='실제 라벨', alpha=0.5)
    plt.plot(y_pred_post, label='모델 예측', alpha=0.7)
    plt.title("실제 라벨 vs 모델 예측 비교")
    plt.xlabel("샘플 인덱스")
    plt.ylabel("값 (0=정상, 1=아크)")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

compare_prediction_vs_label(y_true_post, y_pred_post)
