In [5]:
import os
import pandas as pd
import numpy as np

from sqlalchemy import create_engine, text

engine = create_engine(
    "mysql+pymysql://health_user:strong_password_here@localhost:3306/HEALTH",
    future=True
)

# 1) 테이블 생성 (없으면 생성)
ddl_health_state = """
CREATE TABLE IF NOT EXISTS health_state_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    asset_id VARCHAR(64),
    t_index INT,
    state_value DOUBLE,
    source VARCHAR(32),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""

ddl_prediction = """
CREATE TABLE IF NOT EXISTS prediction_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    asset_id VARCHAR(64),
    t_index INT,
    y_pred DOUBLE,
    error_std DOUBLE,
    model_tag VARCHAR(64),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""

ddl_action = """
CREATE TABLE IF NOT EXISTS insurance_action_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    asset_id VARCHAR(64),
    t_index INT,
    action VARCHAR(32),
    reason VARCHAR(128),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""

with engine.begin() as conn:
    conn.execute(text(ddl_health_state))
    conn.execute(text(ddl_prediction))
    conn.execute(text(ddl_action))

# 2) 중복 방지용 UNIQUE INDEX (이미 있으면 에러 나는데 무시)
unique_sqls = [
    "CREATE UNIQUE INDEX uq_health_state ON health_state_log(asset_id, t_index, source);",
    "CREATE UNIQUE INDEX uq_prediction ON prediction_log(asset_id, t_index, model_tag);",
    "CREATE UNIQUE INDEX uq_action ON insurance_action_log(asset_id, t_index, action);",
]

with engine.begin() as conn:
    for sql in unique_sqls:
        try:
            conn.execute(text(sql))
        except Exception:
            pass

print("Cell A done")

Cell A done


상태 로그 적재 (health + battery) — 중복 제거 포함

In [6]:
def load_state_csv(csv_path: str, source: str) -> pd.DataFrame:
    df = pd.read_csv(csv_path)

    if source == "health":
        # user_id/date/health_state_index -> asset_id/t_index/state_value
        df = df.rename(columns={
            "user_id": "asset_id",
            "health_state_index": "state_value"
        })
        df["date"] = pd.to_datetime(df["date"], errors="coerce")
        df = df.dropna(subset=["date"])
        df = df.sort_values(["asset_id", "date"])
        df["t_index"] = df.groupby("asset_id").cumcount()
    else:
        # nasa / libattery / synthetic 은 asset_id,t_index,state_value 그대로라고 가정
        pass

    df["state_value"] = pd.to_numeric(df["state_value"], errors="coerce")
    df = df.dropna(subset=["asset_id", "t_index", "state_value"])

    df["asset_id"] = df["asset_id"].astype(str)
    df["t_index"] = pd.to_numeric(df["t_index"], errors="coerce").astype("Int64")
    df = df.dropna(subset=["t_index"])
    df["t_index"] = df["t_index"].astype(int)

    df["source"] = source
    df = df[["asset_id", "t_index", "state_value", "source"]]
    df = df.drop_duplicates(subset=["asset_id", "t_index", "source"], keep="first")

    return df


def insert_health_state_log(df: pd.DataFrame):
    # 기존 중복(같은 asset_id,t_index,source) 정리: id 큰 쪽 삭제
    dedup_sql = """
    DELETE t1 FROM health_state_log t1
    JOIN health_state_log t2
      ON t1.asset_id = t2.asset_id
     AND t1.t_index  = t2.t_index
     AND t1.source   = t2.source
     AND t1.id > t2.id;
    """
    with engine.begin() as conn:
        conn.execute(text(dedup_sql))

    # INSERT IGNORE로 중복 insert 방지 (UNIQUE KEY 기반)
    insert_sql = text("""
    INSERT IGNORE INTO health_state_log (asset_id, t_index, state_value, source)
    VALUES (:asset_id, :t_index, :state_value, :source)
    """)

    rows = df.to_dict(orient="records")
    with engine.begin() as conn:
        conn.execute(insert_sql, rows)


# 실행 (상태 로그)
df_nasa = load_state_csv("../data_csv/nasa_core.csv", "nasa")
df_lib  = load_state_csv("../data_csv/libattery_core.csv", "libattery")
df_syn  = load_state_csv("../data_csv/synthetic_degradation_core.csv", "synthetic")
df_hlth = load_state_csv("../data_csv/health_timeseries_core_state.csv", "health")

insert_health_state_log(df_nasa)
insert_health_state_log(df_lib)
insert_health_state_log(df_syn)
insert_health_state_log(df_hlth)

print("Cell B done")

Cell B done


예측 로그 적재 — Core3 output CSV들 읽어서 append (중복 제거 포함)

In [7]:
def load_pred_csv(csv_path: str, model_tag: str) -> pd.DataFrame:
    df = pd.read_csv(csv_path)

    # 필수 컬럼 강제
    need = ["asset_id", "t_index", "y_pred", "error_std"]
    for c in need:
        if c not in df.columns:
            raise KeyError(f"[{csv_path}] missing column: {c}")

    df["asset_id"] = df["asset_id"].astype(str)
    df["t_index"] = pd.to_numeric(df["t_index"], errors="coerce").astype("Int64")
    df["y_pred"] = pd.to_numeric(df["y_pred"], errors="coerce")
    df["error_std"] = pd.to_numeric(df["error_std"], errors="coerce")

    df = df.dropna(subset=["asset_id", "t_index", "y_pred", "error_std"])
    df["t_index"] = df["t_index"].astype(int)

    df["model_tag"] = model_tag
    df = df[["asset_id", "t_index", "y_pred", "error_std", "model_tag"]]
    df = df.drop_duplicates(subset=["asset_id", "t_index", "model_tag"], keep="first")

    return df


def insert_prediction_log(df: pd.DataFrame):
    # 기존 중복 정리
    dedup_sql = """
    DELETE t1 FROM prediction_log t1
    JOIN prediction_log t2
      ON t1.asset_id = t2.asset_id
     AND t1.t_index  = t2.t_index
     AND t1.model_tag = t2.model_tag
     AND t1.id > t2.id;
    """
    with engine.begin() as conn:
        conn.execute(text(dedup_sql))

    insert_sql = text("""
    INSERT IGNORE INTO prediction_log (asset_id, t_index, y_pred, error_std, model_tag)
    VALUES (:asset_id, :t_index, :y_pred, :error_std, :model_tag)
    """)

    rows = df.to_dict(orient="records")
    with engine.begin() as conn:
        conn.execute(insert_sql, rows)


# core3_output 폴더의 *_pred.csv 전부 적재
pred_dir = "../data_csv/core3_output"
pred_files = [f for f in os.listdir(pred_dir) if f.endswith("_pred.csv")]

for f in pred_files:
    model_tag = f.replace("_pred.csv", "")  # 파일명 = model_tag 규칙
    dfp = load_pred_csv(os.path.join(pred_dir, f), model_tag)
    insert_prediction_log(dfp)

print("Cell C done:", len(pred_files), "files")

Cell C done: 7 files


Rule-based action 생성 — prediction_log 기반 df_action 생성

In [8]:
def decide_insurance_action(error_std: float):
    if error_std < 0.3:
        return "approve", "low uncertainty"
    elif error_std < 0.7:
        return "watch", "medium uncertainty"
    else:
        return "deny", "high uncertainty"


# prediction_log 전체에서 action 생성 (원하면 WHERE로 model_tag 제한 가능)
df_pred_all = pd.read_sql(
    text("SELECT asset_id, t_index, error_std FROM prediction_log"),
    con=engine
)

df_action = df_pred_all.copy()
actions = df_action["error_std"].apply(lambda x: decide_insurance_action(float(x)))
df_action["action"] = actions.apply(lambda x: x[0])
df_action["reason"] = actions.apply(lambda x: x[1])

df_action = df_action[["asset_id", "t_index", "action", "reason"]]
df_action = df_action.drop_duplicates(subset=["asset_id", "t_index", "action"], keep="first")

print("Cell D done:", len(df_action))

Cell D done: 2421


action_log 적재 + 샘플 JOIN 20행 출력 (중복 제거 포함)

In [9]:
def insert_action_log(df: pd.DataFrame):
    # 기존 중복 정리
    dedup_sql = """
    DELETE t1 FROM insurance_action_log t1
    JOIN insurance_action_log t2
      ON t1.asset_id = t2.asset_id
     AND t1.t_index  = t2.t_index
     AND t1.action   = t2.action
     AND t1.id > t2.id;
    """
    with engine.begin() as conn:
        conn.execute(text(dedup_sql))

    insert_sql = text("""
    INSERT IGNORE INTO insurance_action_log (asset_id, t_index, action, reason)
    VALUES (:asset_id, :t_index, :action, :reason)
    """)

    rows = df.to_dict(orient="records")
    with engine.begin() as conn:
        conn.execute(insert_sql, rows)


insert_action_log(df_action)

# 샘플 JOIN 20행
df_join = pd.read_sql(text("""
SELECT
  h.asset_id, h.t_index,
  h.source, h.state_value,
  p.model_tag, p.y_pred, p.error_std,
  a.action, a.reason
FROM health_state_log h
JOIN prediction_log p
  ON h.asset_id = p.asset_id AND h.t_index = p.t_index
JOIN insurance_action_log a
  ON h.asset_id = a.asset_id AND h.t_index = a.t_index
ORDER BY h.asset_id, h.t_index
LIMIT 20;
"""), con=engine)

print(df_join)

print("Cell E done")

   asset_id  t_index     source  state_value           model_tag    y_pred  \
0        46      134  libattery     0.000000    B_libattery_lstm  0.226006   
1        46      134  libattery     0.000000  B_libattery_linear  0.137980   
2        46      135  libattery     0.000000  B_libattery_linear  0.142323   
3        46      135  libattery     0.000000    B_libattery_lstm  0.122719   
4        46      136  libattery     0.794855  B_libattery_linear -0.215880   
5        46      136  libattery     0.794855    B_libattery_lstm -0.125099   
6        46      137  libattery     0.794855    B_libattery_lstm -0.137751   
7        46      137  libattery     0.794855  B_libattery_linear -0.211997   
8        46      138  libattery     0.794855    B_libattery_lstm -0.103694   
9        46      138  libattery     0.794855  B_libattery_linear -0.067815   
10       46      139  libattery     0.794855  B_libattery_linear -0.071161   
11       46      139  libattery     0.794855    B_libattery_lstm

In [10]:
import os
os.makedirs("../core4_output", exist_ok=True)

# 1️⃣ action_log 적재 (이미 만든 df_action 사용)
insert_action_log(df_action)

# 2️⃣ Core4 전체 JOIN 결과 DataFrame 생성
df_core4 = pd.read_sql(
    text("""
    SELECT
      h.asset_id,
      h.t_index,
      h.source,
      h.state_value,
      p.model_tag,
      p.y_pred,
      p.error_std,
      a.action,
      a.reason
    FROM health_state_log h
    JOIN prediction_log p
      ON h.asset_id = p.asset_id
     AND h.t_index  = p.t_index
    JOIN insurance_action_log a
      ON h.asset_id = a.asset_id
     AND h.t_index  = a.t_index
    ORDER BY h.asset_id, h.t_index
    """),
    con=engine
)

# 3️⃣ Core4 최종 산출물 저장 (Core5 입력)
df_core4.to_csv(
    "../core4_output/core4_state_prediction_action_log.csv",
    index=False
)

# 4️⃣ 확인용 최소 출력 (20행만)
df_core4.head(20)

Unnamed: 0,asset_id,t_index,source,state_value,model_tag,y_pred,error_std,action,reason
0,46,134,libattery,0.0,B_libattery_lstm,0.226006,0.150855,approve,low uncertainty
1,46,134,libattery,0.0,B_libattery_linear,0.13798,0.153604,approve,low uncertainty
2,46,135,libattery,0.0,B_libattery_linear,0.142323,0.153604,approve,low uncertainty
3,46,135,libattery,0.0,B_libattery_lstm,0.122719,0.150855,approve,low uncertainty
4,46,136,libattery,0.794855,B_libattery_lstm,-0.125099,0.150855,approve,low uncertainty
5,46,136,libattery,0.794855,B_libattery_linear,-0.21588,0.153604,approve,low uncertainty
6,46,137,libattery,0.794855,B_libattery_linear,-0.211997,0.153604,approve,low uncertainty
7,46,137,libattery,0.794855,B_libattery_lstm,-0.137751,0.150855,approve,low uncertainty
8,46,138,libattery,0.794855,B_libattery_lstm,-0.103694,0.150855,approve,low uncertainty
9,46,138,libattery,0.794855,B_libattery_linear,-0.067815,0.153604,approve,low uncertainty
