In [2]:
# =========================================================================
# 1) 라이브러리 준비
# -------------------------------------------------------------------------
import os, pickle, glob, warnings
from pathlib import Path
import numpy as np
import pandas as pd
import scipy.signal as signal
import matplotlib.pyplot as plt
from darts import TimeSeries       # 설치 버전에 따라 ImportError가 날 수도 있음

# =========================================================================
# 2) 설정값
# -------------------------------------------------------------------------
ROOT_DIR = Path(r"matrix-nilm-test/matrix-nilm-test/ts_pickle")
SAMPLE_RATE = 30            # [Hz]  ← 데이터 가이드라인에 명시 :contentReference[oaicite:1]{index=1}
OUT_DIR = ROOT_DIR / "fft_result"
OUT_DIR.mkdir(exist_ok=True)

# =========================================================================
# 3) 유틸리티 함수
# -------------------------------------------------------------------------
def load_ts_pkl(pkl_path: Path):
    """
    Darts 버전에 따라 TimeSeries.load()가 제거됐을 수 있음.
    - 0.28+ : TimeSeries.from_pickle
    - 그 외  : pickle.load 후 TimeSeries.from_dataframe 등
    """
    try:
        ts = TimeSeries.from_pickle(str(pkl_path))
    except AttributeError:
        # 백업 플랜 : pickle 로드 → TimeSeries 생성
        with open(pkl_path, "rb") as f:
            obj = pickle.load(f)
        if isinstance(obj, TimeSeries):
            ts = obj
        else:                       # 이미 pd.Series / pd.DataFrame 인 경우
            ts = TimeSeries.from_series(obj) if obj.ndim == 1 else TimeSeries.from_dataframe(obj)
    return ts


def fft_features(y: np.ndarray, fs: float):
    """
    FFT 스펙트럼으로부터 주요 특성 추출
    Returns
    -------
    dict : {
        'freqs'        : 주파수 벡터(양수 성분),
        'amp'          : 진폭 스펙트럼,
        'harmonics'    : [(k, f_k, amp_k), ...]  # 60 Hz 배수 기준 상위 N개
        'peak_freq'    : 공진점 주파수(최대 진폭),
        'bandwidth'    : (f_low, f_high),        # –3dB 대역폭
        'attenuation'  : dB/dec  (피크 이후 기울기, 20*log10),
    }
    """
    n = len(y)
    freqs = np.fft.rfftfreq(n, d=1 / fs)          # 0 ~ fs/2
    fft_vals = np.fft.rfft(y - y.mean())
    amp = 2 * np.abs(fft_vals) / n                # 진폭 스케일링

    # ---------- 공진점 ----------
    peak_idx = np.argmax(amp)
    peak_freq, peak_amp = freqs[peak_idx], amp[peak_idx]

    # ---------- –3 dB 대역폭 ----------
    half_power = peak_amp / np.sqrt(2)            # –3 dB
    # 좌·우 교차점 탐색
    left = np.where(amp[:peak_idx] < half_power)[0]
    f_low = freqs[left[-1]] if len(left) else freqs[0]
    right = np.where(amp[peak_idx:] < half_power)[0]
    f_high = freqs[peak_idx + right[0]] if len(right) else freqs[-1]

    # ---------- 감쇠 특성 (피크 이후 평균 기울기) ----------
    if f_high < freqs[-1]:
        slope, _ = np.polyfit(np.log10(freqs[peak_idx + 1:]), 20 * np.log10(amp[peak_idx + 1:]), 1)
        attenuation = slope       # dB/dec (음수)
    else:
        attenuation = np.nan

    # ---------- 고조파 (60 Hz 기준) ----------
    FUND = 60                     # [Hz] 전형적인 전력 주파수
    idxs = [np.argmin(np.abs(freqs - k * FUND)) for k in range(1, 11) if k * FUND < fs / 2]
    harmonics = [(k + 1, freqs[i], amp[i]) for k, i in enumerate(idxs)]

    return dict(freqs=freqs,
                amp=amp,
                harmonics=harmonics,
                peak_freq=peak_freq,
                bandwidth=(f_low, f_high),
                attenuation=attenuation)


def plot_spectrum(meta, result):
    """진폭 스펙트럼 + 주요 특징 주석"""
    freqs, amp = result['freqs'], result['amp']
    hms = result['harmonics']
    peak_f = result['peak_freq']
    bw_low, bw_high = result['bandwidth']

    plt.figure(figsize=(10, 5))
    plt.semilogy(freqs, amp, label='Amplitude spectrum')
    plt.title(f"FFT Spectrum – {meta['file_name']}")
    plt.xlabel("Frequency [Hz]"); plt.ylabel("Amplitude")

    # 공진점
    plt.scatter([peak_f], [amp[np.argmax(freqs == peak_f)]], c='r', s=50, label=f'Peak {peak_f:.1f} Hz')
    # 대역폭
    plt.axvline(bw_low, color='g', linestyle='--', label=f'BW low {bw_low:.1f} Hz')
    plt.axvline(bw_high, color='g', linestyle='--', label=f'BW high {bw_high:.1f} Hz')
    # 고조파
    for k, f_k, a_k in hms[:5]:                 # 상위 5개만 표시
        plt.annotate(f'{k}H', (f_k, a_k), textcoords="offset points", xytext=(0, 6), ha='center')

    plt.grid(True, which='both', ls=':')
    plt.legend()
    plt.tight_layout()
    # 저장
    out_png = OUT_DIR / f"{meta['file_stem']}_fft.png"
    plt.savefig(out_png, dpi=200)
    plt.close()
    print(f"[✓] {out_png} 저장")


# =========================================================================
# 4) 메인 루프
# -------------------------------------------------------------------------
summary = []
warnings.filterwarnings("ignore", category=UserWarning)

for pkl_path in sorted(ROOT_DIR.glob("*ch09*.ts.pkl")):
    ts = load_ts_pkl(pkl_path)
    df = ts.to_dataframe()        # DatetimeIndex + 다중 컬럼
    if 'active_power' not in df.columns:
        print(f"[!] {pkl_path.name}: 'active_power' 컬럼 없음 – 건너뜀")
        continue

    y = df['active_power'].to_numpy(dtype=float)
    res = fft_features(y, fs=SAMPLE_RATE)
    plot_spectrum({'file_name': pkl_path.name,
                   'file_stem': pkl_path.stem}, res)

    # 집계 테이블용
    summary.append({
        'file': pkl_path.name,
        'peak_freq_Hz': res['peak_freq'],
        'bw_low_Hz': res['bandwidth'][0],
        'bw_high_Hz': res['bandwidth'][1],
        'attenuation_dB/dec': res['attenuation'],
        **{f'h{k+1}_amp': amp for k, _, amp in res['harmonics']}
    })

# =========================================================================
# 5) 결과 요약 CSV 저장
# -------------------------------------------------------------------------
pd.DataFrame(summary).to_csv(OUT_DIR / "fft_summary_ch09.csv", index=False)
print(f"\n[✓] 요약 파일 저장: {OUT_DIR/'fft_summary_ch09.csv'}")


[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result\H001_ch09_20231002.ts_fft.png 저장
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result\H002_ch09_20231002.ts_fft.png 저장
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result\H003_ch09_20231002.ts_fft.png 저장
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result\H004_ch09_20231002.ts_fft.png 저장
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result\H005_ch09_20231002.ts_fft.png 저장
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result\H006_ch09_20231002.ts_fft.png 저장
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result\H007_ch09_20231002.ts_fft.png 저장
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result\H008_ch09_20231002.ts_fft.png 저장
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result\H009_ch09_20231002.ts_fft.png 저장
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result\H010_ch09_20231002.ts_fft.png 저장

[✓] 요약 파일 저장: matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result\fft_summar

# active_inactive==1

In [4]:
# ================================================================
# 1) 라이브러리 및 기본 설정
# ------------------------------------------------
import os, pickle, warnings
from pathlib import Path

import numpy as np
import pandas as pd
import scipy.signal as signal
import matplotlib.pyplot as plt
from darts import TimeSeries

ROOT_DIR   = Path(r"matrix-nilm-test/matrix-nilm-test/ts_pickle")
SAMPLE_RATE = 30          # Hz  (데이터 사양)
OUT_DIR    = ROOT_DIR / "fft_result_active"
OUT_DIR.mkdir(exist_ok=True, parents=True)

# ================================================================
# 2) TimeSeries 로더 (버전별 예외 처리)
# ------------------------------------------------
def load_ts(pkl_path: Path) -> TimeSeries:
    try:
        return TimeSeries.from_pickle(str(pkl_path))
    except AttributeError:
        with open(pkl_path, "rb") as f:
            obj = pickle.load(f)
        if isinstance(obj, TimeSeries):
            return obj
        # DataFrame / Series → TimeSeries 로 변환
        return (TimeSeries.from_series(obj) if obj.ndim == 1
                else TimeSeries.from_dataframe(obj))

# ================================================================
# 3) FFT 특성 계산 함수 (이전 답변 그대로)
# ------------------------------------------------
def fft_features(y: np.ndarray, fs: float):
    n = len(y)
    freqs = np.fft.rfftfreq(n, d=1 / fs)
    fft_vals = np.fft.rfft(y - y.mean())
    amp = 2 * np.abs(fft_vals) / n

    # 공진점
    peak_idx = np.argmax(amp)
    peak_freq, peak_amp = freqs[peak_idx], amp[peak_idx]

    # –3 dB 대역폭
    half_power = peak_amp / np.sqrt(2)
    left  = np.where(amp[:peak_idx] < half_power)[0]
    right = np.where(amp[peak_idx:] < half_power)[0]
    f_low  = freqs[left[-1]]                if len(left)  else freqs[0]
    f_high = freqs[peak_idx + right[0]]     if len(right) else freqs[-1]

    # 감쇠 기울기(dB/dec)
    attenuation = np.nan
    if peak_idx + 5 < len(freqs):   # 피크 이후 구간이 충분해야 계산
        slope, _ = np.polyfit(np.log10(freqs[peak_idx + 1:]),
                              20 * np.log10(amp[peak_idx + 1:]), 1)
        attenuation = slope

    # 60 Hz 고조파 (샘플링 상 실제 값은 안 보이지만 형식 유지)
    FUND = 60
    idxs = [np.argmin(np.abs(freqs - k * FUND))
            for k in range(1, 11) if k * FUND < fs / 2]
    harmonics = [(k + 1, freqs[i], amp[i]) for k, i in enumerate(idxs)]

    return dict(freqs=freqs, amp=amp, harmonics=harmonics,
                peak_freq=peak_freq, bandwidth=(f_low, f_high),
                attenuation=attenuation)

# ================================================================
# 4) 스펙트럼 플롯
# ------------------------------------------------
def plot_spectrum(meta, res, suffix="_active"):
    freqs, amp = res['freqs'], res['amp']
    peak_f   = res['peak_freq']
    bw_low, bw_high = res['bandwidth']
    hms      = res['harmonics']

    plt.figure(figsize=(10, 5))
    plt.semilogy(freqs, amp, label="Amplitude spectrum")
    plt.title(f"FFT Spectrum – {meta['file_name']} ({suffix})")
    plt.xlabel("Frequency [Hz]"); plt.ylabel("Amplitude")

    plt.scatter([peak_f], [amp[np.argmax(freqs == peak_f)]],
                c='r', s=60, label=f'Peak {peak_f:.2f} Hz')
    plt.axvline(bw_low,  color='g', ls='--', label=f'BW low {bw_low:.2f} Hz')
    plt.axvline(bw_high, color='g', ls='--', label=f'BW high {bw_high:.2f} Hz')

    # 상위 5개 고조파 라벨
    for k, f_k, a_k in hms[:5]:
        plt.annotate(f'{k}H', (f_k, a_k),
                     xytext=(0, 6), textcoords="offset points",
                     ha='center', fontsize=8)

    plt.grid(True, which='both', ls=':')
    plt.legend()
    plt.tight_layout()
    out_png = OUT_DIR / f"{meta['file_stem']}{suffix}_fft.png"
    plt.savefig(out_png, dpi=200)
    plt.close()
    print(f"[✓] {out_png} 저장")

# ================================================================
# 5) 메인 루프 – active_inactive==1 구간만 분석
# ------------------------------------------------
summary = []
warnings.filterwarnings("ignore", category=UserWarning)

for pkl_path in sorted(ROOT_DIR.glob("*ch09*.ts.pkl")):
    ts = load_ts(pkl_path)
    df = ts.to_dataframe()

    # 필수 컬럼 체크 -------------------------------------------------
    if {'active_power', 'active_inactive'}.issubset(df.columns) is False:
        print(f"[!] {pkl_path.name}: active_power / active_inactive 없음 → 건너뜀")
        continue

    # === ① active 구간 추출 ===
    active_df = df[df["active_inactive"] == 1]
    if len(active_df) < 2:
        print(f"[!] {pkl_path.name}: active 구간 샘플 부족 → 건너뜀")
        continue

    y_active = active_df["active_power"].to_numpy(dtype=float)

    # === ② FFT 특성 계산 ===
    res = fft_features(y_active, fs=SAMPLE_RATE)

    # === ③ 시각화 ===
    plot_spectrum({'file_name': pkl_path.name,
                   'file_stem': pkl_path.stem}, res, suffix="_active")

    # === ④ 요약 테이블 ===
    summary.append({
        'file': pkl_path.name,
        'n_active_samples': len(active_df),
        'peak_freq_Hz': res['peak_freq'],
        'bw_low_Hz': res['bandwidth'][0],
        'bw_high_Hz': res['bandwidth'][1],
        'attenuation_dB/dec': res['attenuation'],
        **{f'h{k+1}_amp': amp for k, _, amp in res['harmonics']}
    })

# ⑤ CSV 저장
pd.DataFrame(summary).to_csv(OUT_DIR / "fft_summary_ch09_active.csv", index=False)
print(f"\n[✓] 요약 파일 저장: {OUT_DIR/'fft_summary_ch09_active.csv'}")


[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result_active\H001_ch09_20231002.ts_active_fft.png 저장
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result_active\H002_ch09_20231002.ts_active_fft.png 저장
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result_active\H003_ch09_20231002.ts_active_fft.png 저장
[!] H004_ch09_20231002.ts.pkl: active 구간 샘플 부족 → 건너뜀
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result_active\H005_ch09_20231002.ts_active_fft.png 저장
[!] H006_ch09_20231002.ts.pkl: active 구간 샘플 부족 → 건너뜀
[!] H007_ch09_20231002.ts.pkl: active 구간 샘플 부족 → 건너뜀
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result_active\H008_ch09_20231002.ts_active_fft.png 저장
[✓] matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result_active\H009_ch09_20231002.ts_active_fft.png 저장
[!] H010_ch09_20231002.ts.pkl: active 구간 샘플 부족 → 건너뜀

[✓] 요약 파일 저장: matrix-nilm-test\matrix-nilm-test\ts_pickle\fft_result_active\fft_summary_ch09_active.csv
