#1. 라이브러리

In [2]:
import psycopg2
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, RepeatVector, LSTM, TimeDistributed
from tensorflow.keras.optimizers import Adam

from scipy.signal import savgol_filter
import ruptures as rpt

#2. PostgreSQL DB Conneciton

In [4]:
conn = psycopg2.connect(
    dbname='chat_memory',
    user='n8n_user',
    password='securepassword123',
    host='localhost',
    port='5432'
)

In [5]:
#셀 데이터 확보하기

#1. 실험정책 parsing하여 2stage 형식으로 확보하기
import re

def parse_policy(policy_str):
    """
    "5.2C(58%)-4C" → [5.2, 58.0, 4.0]
    """
    try:
        pattern = r'([0-9.]+)C\((\d+)%\)-([0-9.]+)C'
        match = re.match(pattern, policy_str.strip())
        if not match:
            raise ValueError(f"정책 파싱 실패: {policy_str}")
        
        c1 = float(match.group(1))
        q1 = float(match.group(2))
        c2 = float(match.group(3))
        return [c1, q1, c2]
    
    except Exception as e:
        print(f"[파싱 오류] {policy_str}: {e}")
        return [0.0, 0.0, 0.0]  # fallback


#2. Cell summary, cell Policy
cells_df = pd.read_sql("SELECT * FROM cells", conn)
summary_df = pd.read_sql("SELECT * FROM cycle_summaries", conn)

  cells_df = pd.read_sql("SELECT * FROM cells", conn)
  summary_df = pd.read_sql("SELECT * FROM cycle_summaries", conn)


In [6]:
#1. cell data into dic form

cell_dict_list = []

for _, row in cells_df.iterrows():
    cell_id = row['cell_id']
    policy_str = row['charge_policy']
    policy = parse_policy(row['charge_policy'])
    cycle_life = row['cycle_life']

    df_cell = summary_df[summary_df['cell_id'] == cell_id].sort_values(by='cycle_index')
    
    qd = df_cell['q_discharge'].values[:]
    ir = df_cell['ir'].values[:]
    tavg = df_cell['tavg'].values[:]
    chargetime = df_cell['chargetime'].values[:]

    y = np.stack([qd, ir, tavg, chargetime], axis=1)  # shape: (seq_len, 4)

    cell_dict = {
        "cell_id": cell_id,
        "policy_str": policy_str,
        "policy": policy,
        "cycle_life": cycle_life,
        "test": y  # numpy array, shape: (seq_len diffrent by test, 4) + dqdv needs to be added
    }
    cell_dict_list.append(cell_dict)


## 데이터 가공 스무딩 및 변화 segment 분할

In [7]:
# 3-1. smoothing + diff
from sklearn.linear_model import LinearRegression


def smooth_and_diff(series, window=11, polyorder=2):
    if len(series) < window:
        return series.values, np.gradient(series)
    smooth = savgol_filter(series, window_length=window, polyorder=polyorder)
    diff = np.diff(smooth, prepend=smooth[0])
    return smooth, diff

# 3-2. segment 탐지 (ruptures 기반)
def detect_segments(diff_series, method="bayesian", threshold=0.005, min_len=5, penalty=10):
    segments = []
    start = 0

    if method == "threshold":
        for i in range(1, len(diff_series)):
            if abs(diff_series[i]) > threshold and (i - start) >= min_len:
                segments.append((start, i))
                start = i
        if (len(diff_series) - start) >= min_len:
            segments.append((start, len(diff_series) - 1))

    elif method == "bayesian":
        series_2d = diff_series.values.reshape(-1, 1)
        algo = rpt.Pelt(model="rbf").fit(series_2d)
        change_points = algo.predict(pen=penalty)

        last = 0
        for cp in change_points:
            if (cp - last) >= min_len:
                segments.append((last, cp - 1))
            last = cp
    return segments

# 3-3. 추세 분석 (rule-based)
def analyze_trend(segment_df, col_diff, label, adaptive=True):
    mean_change = segment_df[col_diff].mean()
    std_change = segment_df[col_diff].std()

    if adaptive:
        t_sharp = 2.5 * std_change
        t_gradual = 1.0 * std_change
    else:
        t_sharp = 0.003
        t_gradual = 0.001

    if abs(mean_change) < t_gradual:
        return f"{label}: stable"
    elif mean_change >= t_sharp:
        return f"{label}: sharp_increase"
    elif mean_change >= t_gradual:
        return f"{label}: gradual_increase"
    elif mean_change <= -t_sharp:
        return f"{label}: sharp_decrease"
    elif mean_change <= -t_gradual:
        return f"{label}: gradual_decrease"
    else:
        return f"{label}: stable"

# 3-4. 기울기 계산
def compute_slope(x, y):
    model = LinearRegression()
    model.fit(x.reshape(-1, 1), y)
    return model.coef_[0]

In [25]:
# 해당 내용에 대한 라벨링 저장
#1. 세그먼트 별 데이터 저장
def generate_segment_statistics(x, y, segments, signal_name="Qd"):
    results = []
    for (start, end) in segments:
        segment_x = x[start:end]
        segment_y = y[start:end]
        slope = compute_slope(np.array(segment_x), np.array(segment_y))
        mean = np.mean(segment_y)
        std = np.std(segment_y)
        label = analyze_trend(pd.DataFrame({signal_name: segment_y}), signal_name, signal_name)

        results.append({
            "start": int(start),
            "end": int(end),
            "mean": float(mean),
            "slope": float(slope),
            "std": float(std),
            "trend_label": label
        })
    return results

#2. 세그먼트 별 전체 시계열 처리 루프 - 보고서용 요약 문장 예시
def analyze_signal(cycle_index, signal_series, signal_name="Qd"):
    smooth, diff = smooth_and_diff(signal_series)
    segments = detect_segments(pd.Series(diff), method="bayesian")
    stats = generate_segment_statistics(cycle_index, smooth, segments, signal_name)
    
    summary_text = f"{signal_name} shows " + ", then ".join(
        [s['trend_label'].replace(f"{signal_name}: ", "") for s in stats]
    ) + "."
    
    return {
        "signal": signal_name,
        "segments": stats,
        "summary_text": summary_text
    }

# 7. 자연어 설명 템플릿
def describe_segment(signal, segment):
    s, e = segment['start'], segment['end']
    slope = segment['slope']
    label = segment['trend_label'].split(": ")[-1]

    if label == "stable":
        return f"{signal} is stable from cycle {s} to {e}."
    elif "gradual" in label:
        return f"{signal} shows a gradual {label.split('_')[1]} from cycle {s} to {e} (slope: {slope:.4f})."
    elif "sharp" in label:
        return f"{signal} exhibits a sharp {label.split('_')[1]} after cycle {s} (slope: {slope:.4f})."

# 8. 셀 요약 전체 생성
def generate_cell_summary(cell_id, cycle_index, qd, ir, eff):
    qd_result = analyze_signal(cycle_index, np.array(qd), signal_name="Qd")
    ir_result = analyze_signal(cycle_index, np.array(ir), signal_name="IR")
    eff_result = analyze_signal(cycle_index, np.array(eff), signal_name="Efficiency")

    # 자연어 요약 생성
    qd_texts = [describe_segment("Qd", seg) for seg in qd_result["segments"]]
    ir_texts = [describe_segment("IR", seg) for seg in ir_result["segments"]]
    eff_texts = [describe_segment("Efficiency", seg) for seg in eff_result["segments"]]

    final_summary = (
        f"Cell {cell_id} analysis summary:\n"
        + " ".join(qd_texts) + " "
        + " ".join(ir_texts) + " "
        + " ".join(eff_texts)
    )

    # 최종 JSON 구조
    return {
        "cell_id": cell_id,
        "Qd_summary": qd_result,
        "IR_summary": ir_result,
        "Efficiency_summary": eff_result,
        "summary_text": final_summary
    }


# summarize trend table
from collections import Counter
import numpy as np

# 트렌드 라벨 요약 함수
def summarize_trend_labels(segments, signal_name="Qd"):
    label_counts = Counter()
    regions_by_label = {}

    for seg in segments:
        label = seg["trend_label"].split(": ")[-1]
        label_counts[label] += 1
        regions_by_label.setdefault(label, []).append((seg["start"], seg["end"]))

    phrases = []
    for label, count in label_counts.items():
        ranges = [f"{s}-{e}" for s, e in regions_by_label[label]]
        label_phrase = label.replace("_", " ")
        phrases.append(f"{signal_name} shows {label_phrase} in {count} segments (cycles {', '.join(ranges)}).")

    return " ".join(phrases)


In [26]:
cell_summaries = []

for cell in cell_dict_list:
    cell_id = cell["cell_id"]
    y = cell["test"]  # shape: (seq_len, 4)
    seq_len = y.shape[0]
    cycle_idx = np.arange(seq_len)

    # 신호별 분석
    qd_result = analyze_signal(cycle_idx, y[:, 0], signal_name="Qd")
    ir_result = analyze_signal(cycle_idx, y[:, 1], signal_name="IR")
    tavg_result = analyze_signal(cycle_idx, y[:, 2], signal_name="Tavg")
    chargetime_result = analyze_signal(cycle_idx, y[:, 3], signal_name="Chargetime")

    # 요약 텍스트 생성
    qd_summary_text = summarize_trend_labels(qd_result["segments"], signal_name="Qd")
    ir_summary_text = summarize_trend_labels(ir_result["segments"], signal_name="IR")
    tavg_summary_text = summarize_trend_labels(tavg_result["segments"], signal_name="Tavg")
    chargetime_summary_text = summarize_trend_labels(chargetime_result["segments"], signal_name="Chargetime")

    # 전체 요약 통합
    final_summary = (
        f"Cell {cell_id} analysis summary:\n"
        + f"{qd_summary_text} {ir_summary_text} {tavg_summary_text} {chargetime_summary_text}"
    )

    # JSON 구조 저장
    cell_summary = {
        "cell_id": cell_id,
        "Qd_summary": {
            "segments": qd_result["segments"],
            "summary_text": qd_summary_text
        },
        "IR_summary": {
            "segments": ir_result["segments"],
            "summary_text": ir_summary_text
        },
        "Tavg_summary": {
            "segments": tavg_result["segments"],
            "summary_text": tavg_summary_text
        },
        "Chargetime_summary": {
            "segments": chargetime_result["segments"],
            "summary_text": chargetime_summary_text
        },
        "summary_text": final_summary
    }

    cell_summaries.append(cell_summary)



In [28]:
import json

# 첫 번째 딕셔너리 구조 확인
print(json.dumps(cell_summaries[1], indent=2, ensure_ascii=False))

{
  "cell_id": "b1c1",
  "Qd_summary": {
    "segments": [
      {
        "start": 0,
        "end": 274,
        "mean": 1.0981306830202693,
        "slope": -8.130216248481761e-05,
        "std": 0.00710094496483474,
        "trend_label": "Qd: sharp_increase"
      },
      {
        "start": 275,
        "end": 449,
        "mean": 1.029589920291514,
        "slope": -0.0005885510708345298,
        "std": 0.03016030761169064,
        "trend_label": "Qd: sharp_increase"
      },
      {
        "start": 450,
        "end": 512,
        "mean": 0.8970186743161178,
        "slope": -0.0020960219887134696,
        "std": 0.03754627418306569,
        "trend_label": "Qd: sharp_increase"
      }
    ],
    "summary_text": "Qd shows sharp increase in 3 segments (cycles 0-274, 275-449, 450-512)."
  },
  "IR_summary": {
    "segments": [
      {
        "start": 0,
        "end": 19,
        "mean": 0.01782950369512949,
        "slope": -3.5273355449224354e-05,
        "std": 0.000197657346

In [None]:
import matplotlib.pyplot as plt

# 5. 분석 및 시각화
segment_colors = ["yellow", "lightblue", "lightgreen", "orange", "pink", "lightgray", "violet"]

for cell in cell_dict_list:
    cell_id = cell["cell_id"]
    policy_str = cell["policy_str"]
    cycle_life = cell["cycle_life"]
    y = cell["test"]  # shape: (seq_len, 4)
    labels = ["q_discharge", "ir", "tavg", "chargetime"]

    for i, label in enumerate(labels):
        series = pd.Series(y[:, i])
        smooth, diff = smooth_and_diff(series)
        segments = detect_segments(pd.Series(diff), method="bayesian")
        
        # segment별 추세 분석 및 시각화
        plt.figure(figsize=(10, 4))
        plt.plot(series.index, series.values, label=f'{label} (raw)', alpha=0.4)
        plt.plot(series.index, smooth, label=f'{label} (smoothed)', linewidth=2)
        for seg_idx, (start, end) in enumerate(segments):
            color = segment_colors[seg_idx % len(segment_colors)]
            plt.axvspan(series.index[start], series.index[end], color=color, alpha=0.2)
            segment_df = pd.DataFrame({label: series[start:end+1], f"{label}_diff": diff[start:end+1]})
            trend_label = analyze_trend(segment_df, f"{label}_diff", label)
            slope = compute_slope(np.arange(start, end+1), segment_df[label].values)
            plt.text(
                (series.index[start] + series.index[end]) / 2,
                series.max(),
                f"{trend_label}",
                color='black', fontsize=10, ha='center', va='bottom', alpha=0.8
            )
            print(f"[{cell_id}] {policy_str} | {label} | segment {seg_idx}: {trend_label}, slope={slope:.4f}")
        plt.legend()
        plt.title(f"{label} 변화 추이 - {cell_id}")
        plt.xlabel("Cycle Index")
        plt.ylabel(label)
        plt.grid(True)
        plt.tight_layout()
        plt.show()