# 高壓二段/三段時間電價

#### 參數與套件

In [None]:
import pandas as pd
from datetime import time

# ====== Parameters you are expected to edit ======
HOURLY_CSV = "宿舍.csv"                 # hourly consumption input (wide or long layout both accepted)
METER_TO_FEEDER_CSV = "meter_to_feeder.csv"  # option 2: meter -> feeder mapping (multiple feeders allowed)
OFFPEAK_DAYS_CSV = "offpeak_days.csv"        # option 3: list of off-peak days (recommended)
DEFAULT_FEEDER_ID = "DEFAULT_FEEDER"         # fallback feeder id used when the mapping has no entry

# Output files written by the final cell.
OUT_MONTHLY = "bill_monthly.csv"
OUT_DETAIL_2TOU = "bill_detail_2tou.csv"
OUT_DETAIL_3TOU = "bill_detail_3tou.csv"


#### 定義：夏月、離峰日、日別

In [None]:
def is_summer(d):
    """Return True when *d* falls inside the summer tariff season.

    The season runs from 16 May to 15 October of the same year, both ends
    inclusive.

    Only ``d.month`` and ``d.day`` are inspected, so ``datetime.date``,
    ``datetime.datetime`` and ``pandas.Timestamp`` values are all accepted
    (comparing a ``date`` bound against a datetime would raise ``TypeError``).
    """
    # Lexicographic (month, day) comparison is equivalent to comparing against
    # the 05-16 / 10-15 bounds of d's own year, without building Timestamps.
    return (5, 16) <= (d.month, d.day) <= (10, 15)

def load_offpeak_days(path):
    """Read the off-peak day list and return it as a set of ``datetime.date``.

    Expected CSV columns: ``date``, ``is_offpeak_day`` and an optional
    ``note``.  ``date`` is written as YYYY-MM-DD; a value of 1 in
    ``is_offpeak_day`` marks a day that is billed like a Sunday (off-peak all
    day).  Rows with any other flag value are ignored.

    A falsy *path* (``None`` or ``""``) yields an empty set without touching
    the filesystem.
    """
    if not path:
        return set()

    table = pd.read_csv(path)
    # Parse the whole column first so a malformed date fails loudly even on
    # rows that are not flagged.
    all_dates = pd.to_datetime(table["date"]).dt.date
    flagged = table["is_offpeak_day"] == 1
    return set(all_dates[flagged])

def day_type(d, offpeak_days):
    """Classify a calendar day as ``"offpeak_day"``, ``"sat"`` or ``"wkday"``.

    A day is an ``"offpeak_day"`` when it is listed in *offpeak_days* or when
    it is a Sunday.  Saturdays map to ``"sat"``; every other day is
    ``"wkday"``.
    """
    if d in offpeak_days:
        return "offpeak_day"

    # pandas weekday numbering: Monday=0 ... Sunday=6
    weekday = pd.Timestamp(d).weekday()
    special = {6: "offpeak_day", 5: "sat"}
    return special.get(weekday, "wkday")


#### 定義：二段式/三段式 時段分類

In [None]:
# This cell defines the time-of-use classifiers (classify_2tou /
# classify_3tou) that the billing cell calls.  is_summer, load_offpeak_days
# and day_type are defined here as well so the cell can be run on its own.

def is_summer(d):
    """Return True when *d* lies between 16 May and 15 October, inclusive."""
    return (5, 16) <= (d.month, d.day) <= (10, 15)


def load_offpeak_days(path):
    """Read the off-peak day list and return it as a set of ``datetime.date``.

    Expected CSV columns: ``date`` (YYYY-MM-DD), ``is_offpeak_day`` and an
    optional ``note``.  Rows whose ``is_offpeak_day`` equals 1 are billed like
    a Sunday (off-peak all day).  A falsy *path* returns an empty set.
    """
    if not path:
        return set()
    df = pd.read_csv(path)
    df["date"] = pd.to_datetime(df["date"]).dt.date
    df = df[df["is_offpeak_day"] == 1]
    return set(df["date"].tolist())


def day_type(d, offpeak_days):
    """Return ``"offpeak_day"``, ``"sat"`` or ``"wkday"`` for the date *d*.

    ``"offpeak_day"`` covers Sundays and every date listed in *offpeak_days*.
    """
    if d in offpeak_days:
        return "offpeak_day"
    wd = pd.Timestamp(d).weekday()  # Mon=0 ... Sun=6
    if wd == 6:
        return "offpeak_day"
    if wd == 5:
        return "sat"
    return "wkday"


def _is_charged_hour(hour, season):
    """Return True when *hour* is outside the off-peak window of a Mon-Sat day.

    NOTE(review): the windows below were transcribed from the Taipower
    high-voltage time-of-use schedule -- confirm them against the tariff
    sheet that the rate tables were taken from.
      summer     : 09:00-24:00
      non-summer : 06:00-11:00 and 14:00-24:00
    """
    if season == "summer":
        return hour >= 9
    return 6 <= hour < 11 or hour >= 14


def classify_2tou(ts, offpeak_days):
    """Classify an hourly timestamp for the two-stage time-of-use tariff.

    Parameters:
        ts: timestamp of the hourly reading (anything ``pd.Timestamp``
            accepts).  Assumed to mark the START of the metered hour --
            TODO confirm against the meter export.
        offpeak_days: set of ``datetime.date`` billed as off-peak all day.

    Returns:
        ``(season, bucket)`` where season is ``"summer"`` / ``"nonsummer"``
        and bucket is ``"peak"``, ``"sat_semipeak"`` or ``"offpeak"``.
    """
    ts = pd.Timestamp(ts)
    day = ts.date()
    season = "summer" if is_summer(day) else "nonsummer"
    kind = day_type(day, offpeak_days)

    # Sundays / listed off-peak days are off-peak around the clock.
    if kind == "offpeak_day" or not _is_charged_hour(ts.hour, season):
        return season, "offpeak"
    if kind == "sat":
        return season, "sat_semipeak"
    return season, "peak"


def classify_3tou(ts, offpeak_days):
    """Classify an hourly timestamp for the three-stage (fixed peak) tariff.

    Same inputs as :func:`classify_2tou`.

    Returns:
        ``(season, bucket)`` where bucket is ``"peak"`` (summer weekdays
        16:00-22:00 only), ``"semipeak"``, ``"sat_semipeak"`` or
        ``"offpeak"``.  Non-summer days never yield ``"peak"``.
    """
    ts = pd.Timestamp(ts)
    day = ts.date()
    season = "summer" if is_summer(day) else "nonsummer"
    kind = day_type(day, offpeak_days)

    if kind == "offpeak_day" or not _is_charged_hour(ts.hour, season):
        return season, "offpeak"
    if kind == "sat":
        return season, "sat_semipeak"
    # NOTE(review): fixed peak window 16:00-22:00 on summer weekdays --
    # confirm against the tariff sheet.
    if season == "summer" and 16 <= ts.hour < 22:
        return season, "peak"
    return season, "semipeak"


#### 電價

In [None]:
# High-voltage supply (energy charge) -- two-stage time-of-use.
# Keys are (season, bucket).  The billing cell multiplies these values by kWh
# and stores the result in *_cost_ntd columns, so they read as NTD per kWh.
RATES_2TOU = {
    ("summer", "peak"): 6.75,
    ("nonsummer", "peak"): 6.37,
    ("summer", "offpeak"): 2.71,
    ("nonsummer", "offpeak"): 2.46,
    ("summer", "sat_semipeak"): 2.77,
    ("nonsummer", "sat_semipeak"): 2.54,
}

# High-voltage supply (energy charge) -- three-stage time-of-use (fixed peak hours).
# Same key layout as RATES_2TOU.  There is no ("nonsummer", "peak") entry, so
# a classifier must never emit that pair for this table.
RATES_3TOU = {
    ("summer", "peak"): 9.39,
    ("summer", "semipeak"): 5.85,
    ("nonsummer", "semipeak"): 5.47,
    ("summer", "offpeak"): 2.53,
    ("nonsummer", "offpeak"): 2.32,
    ("summer", "sat_semipeak"): 2.60,
    ("nonsummer", "sat_semipeak"): 2.41,
}



#### 載入用電

In [None]:
def load_hourly_any_format(path, default_feeder_id="DEFAULT_FEEDER", meter_to_feeder_path=None):
    """Load hourly consumption from a long- or wide-layout CSV.

    Long layout: the file has ``timestamp`` and ``kwh`` columns (matched
    case-insensitively).  Missing ``meter_id`` / ``feeder_id`` columns are
    filled with ``"METER_1"`` and *default_feeder_id*.  The mapping file is
    not consulted for this layout.

    Wide layout (anything else): one datetime column plus one column per
    meter.  The datetime column is the first match among a list of common
    names, falling back to the first column.  Meter columns are melted into
    rows, and each meter is assigned a feeder from *meter_to_feeder_path*
    (columns ``meter_id``, ``feeder_id``); unmapped meters get
    *default_feeder_id*.

    In both layouts, ``kwh`` values that cannot be parsed as numbers become 0.0.

    Parameters:
        path: CSV path or file-like object accepted by ``pd.read_csv``.
        default_feeder_id: feeder id used when none can be determined.
        meter_to_feeder_path: optional mapping CSV; falsy means "no mapping".

    Returns:
        DataFrame with columns ``feeder_id``, ``meter_id``, ``timestamp``
        (datetime64) and ``kwh`` (float).

    Raises:
        ValueError: the mapping file lacks ``meter_id`` or ``feeder_id``.
    """
    out_cols = ["feeder_id", "meter_id", "timestamp", "kwh"]
    df = pd.read_csv(path)

    # ---- Long layout: has timestamp & kwh (any letter case) ----
    by_lower = {c.lower(): c for c in df.columns}
    if "timestamp" in by_lower and "kwh" in by_lower:
        renames = {
            by_lower[key]: key
            for key in ("timestamp", "kwh")
            if key not in df.columns
        }
        if renames:
            df = df.rename(columns=renames)

        if "meter_id" not in df.columns:
            df["meter_id"] = "METER_1"
        if "feeder_id" not in df.columns:
            df["feeder_id"] = default_feeder_id

        out = df[out_cols].copy()
        out["timestamp"] = pd.to_datetime(out["timestamp"])
        out["kwh"] = pd.to_numeric(out["kwh"], errors="coerce").fillna(0.0)
        return out

    # ---- Otherwise wide layout: a datetime column, every other column is a meter ----
    candidates = ("Datetime", "datetime", "DateTime", "timestamp", "Timestamp", "time", "Time")
    dt_col = next((c for c in candidates if c in df.columns), None)
    if dt_col is None:
        dt_col = df.columns[0]  # fallback: assume the first column holds the time

    meters = [c for c in df.columns if c != dt_col]
    long_df = df.melt(id_vars=[dt_col], value_vars=meters, var_name="meter_id", value_name="kwh")
    long_df = long_df.rename(columns={dt_col: "timestamp"})
    long_df["timestamp"] = pd.to_datetime(long_df["timestamp"])
    long_df["kwh"] = pd.to_numeric(long_df["kwh"], errors="coerce").fillna(0.0)

    # ---- Option 2: meter -> feeder mapping ----
    if meter_to_feeder_path:
        mp = pd.read_csv(meter_to_feeder_path)
        if not {"meter_id", "feeder_id"}.issubset(mp.columns):
            raise ValueError("meter_to_feeder.csv 需要欄位：meter_id, feeder_id")

        mp = mp[["meter_id", "feeder_id"]].dropna(subset=["meter_id"])
        # Melted meter ids are column labels (strings); a purely numeric
        # meter_id column in the mapping parses as integers and would make the
        # merge fail on mismatched key dtypes, so compare both sides as text.
        mp["meter_id"] = mp["meter_id"].astype(str)
        long_df["meter_id"] = long_df["meter_id"].astype(str)
        # An exactly repeated mapping row would duplicate every reading of that
        # meter and double-count its kWh.
        mp = mp.drop_duplicates()

        long_df = long_df.merge(mp, on="meter_id", how="left")
        long_df["feeder_id"] = long_df["feeder_id"].fillna(default_feeder_id)
    else:
        long_df["feeder_id"] = default_feeder_id

    return long_df[out_cols]


# Load the hourly readings into one row per (feeder, meter, timestamp),
# using the paths and fallback feeder id configured in the parameters cell.
raw = load_hourly_any_format(
    HOURLY_CSV,
    default_feeder_id=DEFAULT_FEEDER_ID,
    meter_to_feeder_path=METER_TO_FEEDER_CSV
)

# Notebook preview of the first rows.
raw.head()


#### 載入離峰日清單

In [None]:
# Set of dates that are billed as off-peak for the whole day.
offpeak_days = load_offpeak_days(OFFPEAK_DAYS_CSV)
# Notebook preview: how many days were loaded and the five earliest ones.
len(offpeak_days), sorted(offpeak_days)[:5]


#### 計算

In [None]:
# Sum all meters that sit under the same feeder for each hourly timestamp.
df = raw.groupby(["feeder_id", "timestamp"], as_index=False)["kwh"].sum()
df["ym"] = df["timestamp"].dt.to_period("M").astype(str)  # billing month, e.g. "2024-07"

# --- 2-TOU ---
# Classify every timestamp once and build the columns from plain lists.
# Wrapping each result in a pd.Series inside .apply() allocates one Series per
# row (very slow on hourly data) and breaks on an empty frame.
s2 = df.copy()
labels2 = [tuple(classify_2tou(ts, offpeak_days)) for ts in s2["timestamp"]]
s2["season2"] = [season for season, _ in labels2]
s2["bucket2"] = [bucket for _, bucket in labels2]
s2["rate2"] = [RATES_2TOU[label] for label in labels2]
s2["cost2"] = s2["kwh"] * s2["rate2"]

# --- 3-TOU ---
s3 = df.copy()
labels3 = [tuple(classify_3tou(ts, offpeak_days)) for ts in s3["timestamp"]]
s3["season3"] = [season for season, _ in labels3]
s3["bucket3"] = [bucket for _, bucket in labels3]
s3["rate3"] = [RATES_3TOU[label] for label in labels3]
s3["cost3"] = s3["kwh"] * s3["rate3"]

# Monthly totals per feeder, one pair of columns per tariff.
monthly_2 = s2.groupby(["feeder_id", "ym"], as_index=False).agg(
    two_tou_kwh=("kwh", "sum"),
    two_tou_cost_ntd=("cost2", "sum"),
)
monthly_3 = s3.groupby(["feeder_id", "ym"], as_index=False).agg(
    three_tou_kwh=("kwh", "sum"),
    three_tou_cost_ntd=("cost3", "sum"),
)

monthly = monthly_2.merge(monthly_3, on=["feeder_id", "ym"], how="outer").sort_values(["feeder_id", "ym"])

# Breakdown per (season, bucket) within each feeder and month.
detail_2 = s2.groupby(["feeder_id", "ym", "season2", "bucket2"], as_index=False).agg(
    kwh=("kwh", "sum"),
    cost_ntd=("cost2", "sum"),
)
detail_3 = s3.groupby(["feeder_id", "ym", "season3", "bucket3"], as_index=False).agg(
    kwh=("kwh", "sum"),
    cost_ntd=("cost3", "sum"),
)

# Notebook preview of the monthly comparison.
monthly.head()


#### 輸出

In [None]:
# Write every result table first, then list the files that were produced.
# utf-8-sig adds a BOM to each output file.
outputs = [
    (monthly, OUT_MONTHLY),
    (detail_2, OUT_DETAIL_2TOU),
    (detail_3, OUT_DETAIL_3TOU),
]
for frame, target in outputs:
    frame.to_csv(target, index=False, encoding="utf-8-sig")

print("Saved:")
for _, target in outputs:
    print(" -", target)
