In [5]:
# ============================================================
# 딥러닝(PyTorch TabTransformer-style)로 "시간대별 오늘 매출" 예측
# 입력: 오늘 요일 + 기온(T1H) + 1시간강수(RN1)
# 출력: HOUR별 오늘_예측매출 (XGBoost 출력 형태와 동일)
# ============================================================

import pandas as pd
import numpy as np
import requests
from datetime import datetime, timedelta, timezone

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import os
from dotenv import load_dotenv
load_dotenv('./../01_python/.env')

# =========================
# 0) Settings
# =========================
# Sales table loaded with pd.read_csv by predict_today_sales_by_hour_deep.
CSV_PATH = "SUWON_S_DATA_TABLE_GENDER_SUM.csv"

# KMA ultra-short-term nowcast (Korean public data portal) - used when needed.
# The service key is read from the RAIN_ID environment variable.
SERVICE_KEY = os.getenv('RAIN_ID')
NX, NY = 60, 121  # Suwon grid coordinates (adjust if needed)
ULTRA_URL = "https://apis.data.go.kr/1360000/VilageFcstInfoService_2.0/getUltraSrtNcst"

# Fixed UTC+9 offset used for every "now" computed in this file.
KST = timezone(timedelta(hours=9))

# =========================
# 1) 오늘 요일 코드 (월=1 ... 일=7)
# =========================
def today_day_code():
    """Return today's weekday in KST as a code from 1 (Monday) to 7 (Sunday)."""
    # isoweekday() already numbers Monday as 1 and Sunday as 7.
    return datetime.now(KST).isoweekday()

# =========================
# 2) 초단기실황(T1H, RN1) 가져오기
#    - API 갱신 지연 고려해서 직전 정시로 base_time 선택
# =========================
def latest_base_date_time(now_kst: datetime):
    """Pick the base date/time to request from the nowcast API.

    The current hour is used once the clock has reached minute 40; before
    that the previous hour is used, to allow for the API's publishing delay.

    Args:
        now_kst: The current time.

    Returns:
        A ``(base_date, base_time)`` tuple of strings formatted as
        ``YYYYMMDD`` and ``HHMM`` (minutes are always ``00``).
    """
    hours_back = 0 if now_kst.minute >= 40 else 1
    top_of_hour = now_kst.replace(minute=0, second=0, microsecond=0)
    stamp = top_of_hour - timedelta(hours=hours_back)
    return stamp.strftime("%Y%m%d"), stamp.strftime("%H%M")

def fetch_weather_ultra_ncst(service_key: str, nx: int, ny: int):
    """Fetch the latest observed temperature and 1-hour rainfall for a grid cell.

    Calls the ultra-short-term nowcast endpoint (``ULTRA_URL``) for the base
    date/time chosen by ``latest_base_date_time`` and reads the ``T1H`` and
    ``RN1`` categories out of the returned items.

    Args:
        service_key: API service key sent as ``serviceKey``.
        nx: Grid X coordinate.
        ny: Grid Y coordinate.

    Returns:
        ``(temp, rain, base_date, base_time)`` where ``temp`` and ``rain`` are
        floats and the last two are the strings that were requested.

    Raises:
        requests.HTTPError: If the HTTP status indicates failure.
        RuntimeError: If the reply is not JSON, does not contain
            ``response.body.items.item``, or has no ``T1H`` entry.
    """
    now = datetime.now(KST)
    base_date, base_time = latest_base_date_time(now)

    params = {
        "serviceKey": service_key,
        "dataType": "JSON",
        "numOfRows": 1000,
        "pageNo": 1,
        "base_date": base_date,
        "base_time": base_time,
        "nx": nx,
        "ny": ny,
    }

    r = requests.get(ULTRA_URL, params=params, timeout=20)
    r.raise_for_status()

    # Surface a readable error (with a snippet of the reply) instead of an
    # opaque JSONDecodeError / KeyError when the reply is not the expected shape.
    try:
        payload = r.json()
    except ValueError as err:
        raise RuntimeError(
            f"기상청 API 응답을 JSON으로 해석할 수 없습니다: {r.text[:200]}"
        ) from err

    try:
        items = payload["response"]["body"]["items"]["item"]
    except (KeyError, TypeError) as err:
        raise RuntimeError(
            f"기상청 API 응답에 관측값(items)이 없습니다: {str(payload)[:300]}"
        ) from err

    temp, rain = None, 0.0
    for it in items:
        category = it["category"]
        if category == "T1H":
            temp = float(it["obsrValue"])
        elif category == "RN1":
            # Best effort: a missing or non-numeric rainfall value counts as 0.0.
            try:
                rain = float(it["obsrValue"])
            except (TypeError, ValueError):
                rain = 0.0

    if temp is None:
        raise RuntimeError("기온(T1H) 값을 못 가져왔습니다. NX/NY 또는 base_time을 확인하세요.")

    return temp, rain, base_date, base_time

# =========================
# 3) 시간대별(HOUR) 일매출 데이터셋 생성
# =========================
def build_hourly_daily(df: pd.DataFrame, hour: int) -> pd.DataFrame:
    """Collapse the rows of one HOUR into a single row per calendar day.

    Args:
        df: Table with ``HOUR``, ``TA_YMD`` (``YYYYMMDD``), ``AMT``, ``DAY``,
            ``TEMP`` and ``RAIN`` columns.
        hour: The ``HOUR`` value to keep.

    Returns:
        A frame sorted by ``TA_YMD`` with columns ``TA_YMD``, ``AMT_sum``,
        ``DAY_mode``, ``TEMP_mean``, ``RAIN_sum`` and ``month``.
    """
    hour_rows = df.loc[df["HOUR"] == hour].copy()
    hour_rows["TA_YMD"] = pd.to_datetime(hour_rows["TA_YMD"], format="%Y%m%d")

    def _most_common_day(values):
        # First mode of the day codes seen on that date, as a plain int.
        return int(pd.Series(values).mode()[0])

    # With the hour fixed, aggregate sales / weather / weekday per day.
    per_day = hour_rows.groupby("TA_YMD", as_index=False).agg(
        AMT_sum=("AMT", "sum"),
        DAY_mode=("DAY", _most_common_day),
        TEMP_mean=("TEMP", "mean"),
        RAIN_sum=("RAIN", "sum"),
    )

    # Calendar-derived feature: month of the date.
    per_day = per_day.assign(month=per_day["TA_YMD"].dt.month)

    ordered = per_day.sort_values("TA_YMD")
    return ordered.reset_index(drop=True)

# =========================
# 4) Dataset
# =========================
class TabDataset(Dataset):
    """In-memory dataset yielding (categorical, numeric, target) triples.

    Inputs are copied into tensors once at construction: categorical features
    as ``torch.long``, numeric features as ``torch.float32``, and the target
    as a ``float32`` column of shape ``(N, 1)``.
    """

    def __init__(self, x_cat, x_num, y):
        targets = torch.tensor(y, dtype=torch.float32)
        self.x_cat = torch.tensor(x_cat, dtype=torch.long)
        self.x_num = torch.tensor(x_num, dtype=torch.float32)
        self.y = targets.view(-1, 1)

    def __len__(self):
        """Return the number of samples (rows of the target column)."""
        return self.y.size(0)

    def __getitem__(self, idx):
        """Return the ``(x_cat, x_num, y)`` entries stored at ``idx``."""
        cat_row = self.x_cat[idx]
        num_row = self.x_num[idx]
        target = self.y[idx]
        return cat_row, num_row, target

# =========================
# 5) TabTransformer-style 회귀 모델
#    - 범주: DAY(7), month(12) -> Embedding -> TransformerEncoder
#    - 수치: TEMP, RAIN -> concat -> MLP
# =========================
class TabTransformerRegressor(nn.Module):
    """Regressor that runs categorical embeddings through a Transformer encoder.

    Each categorical column gets its own embedding table. The embedded columns
    form a short token sequence that is passed through a TransformerEncoder and
    averaged over the token axis; the result is concatenated with the numeric
    features and fed to an MLP that outputs one value per sample.

    Args:
        cat_cardinalities: Number of distinct values for each categorical column.
        num_dim: Number of numeric features.
        emb_dim: Embedding width (also the encoder's ``d_model``).
        nhead: Attention heads per encoder layer.
        nlayers: Number of encoder layers.
        ff_dim: Feed-forward width inside each encoder layer.
        dropout: Dropout rate used in the encoder and the MLP.
    """

    def __init__(self, cat_cardinalities, num_dim, emb_dim=16, nhead=4, nlayers=2, ff_dim=128, dropout=0.1):
        super().__init__()

        # One embedding table per categorical column, all of the same width.
        embedding_tables = []
        for n_levels in cat_cardinalities:
            embedding_tables.append(nn.Embedding(n_levels, emb_dim))
        self.embs = nn.ModuleList(embedding_tables)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=emb_dim,
            nhead=nhead,
            dim_feedforward=ff_dim,
            dropout=dropout,
            batch_first=True,
        )
        self.tr = nn.TransformerEncoder(encoder_layer, num_layers=nlayers)

        # Regression head over [pooled embedding | numeric features].
        head_layers = [
            nn.Linear(emb_dim + num_dim, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        ]
        self.mlp = nn.Sequential(*head_layers)

    def forward(self, x_cat, x_num):
        """Return predictions of shape (B, 1) for x_cat (B, n_cat) and x_num."""
        embedded = [table(x_cat[:, col]) for col, table in enumerate(self.embs)]
        tokens = torch.stack(embedded, dim=1)  # (B, n_cat, emb)
        pooled = self.tr(tokens).mean(dim=1)  # (B, emb)
        features = torch.cat([pooled, x_num], dim=1)
        return self.mlp(features)

# =========================
# 6) 시간대별 모델 학습 함수
# =========================
def train_hour_model(daily: pd.DataFrame, epochs=40, batch_size=64, lr=1e-3):
    """Train a TabTransformerRegressor on one hour's daily table.

    The first 80% of the rows are used for training and the remaining 20% for
    a hold-out MAE. The split is positional, so it is chronological only if
    ``daily`` is already sorted by date.

    Args:
        daily: Frame with ``DAY_mode``, ``month``, ``TEMP_mean``, ``RAIN_sum``
            and ``AMT_sum`` columns.
        epochs: Number of passes over the training split.
        batch_size: Mini-batch size for both loaders.
        lr: AdamW learning rate.

    Returns:
        ``(model, test_mae)`` where ``test_mae`` is the mean absolute error
        over every hold-out sample, or ``nan`` if the hold-out split is empty.
    """
    # Categorical indices: DAY 1..7 -> 0..6, month 1..12 -> 0..11.
    day_idx = (daily["DAY_mode"].astype(int).clip(1, 7) - 1).values
    mon_idx = (daily["month"].astype(int).clip(1, 12) - 1).values
    x_cat = np.column_stack([day_idx, mon_idx])  # (N, 2)

    # Numeric features and target.
    x_num = daily[["TEMP_mean", "RAIN_sum"]].values.astype(np.float32)  # (N, 2)
    y = daily["AMT_sum"].values.astype(np.float32)

    # Positional 80/20 split (no shuffling across the boundary).
    n = len(daily)
    split = int(n * 0.8)
    tr_ds = TabDataset(x_cat[:split], x_num[:split], y[:split])
    te_ds = TabDataset(x_cat[split:], x_num[split:], y[split:])

    tr_loader = DataLoader(tr_ds, batch_size=batch_size, shuffle=True)
    te_loader = DataLoader(te_ds, batch_size=batch_size, shuffle=False)

    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = TabTransformerRegressor(cat_cardinalities=[7, 12], num_dim=2).to(device)

    opt = torch.optim.AdamW(model.parameters(), lr=lr)
    loss_fn = nn.SmoothL1Loss()  # Huber-style loss

    for _ in range(epochs):
        model.train()
        for xc, xn, yy in tr_loader:
            xc, xn, yy = xc.to(device), xn.to(device), yy.to(device)
            pred = model(xc, xn)
            loss = loss_fn(pred, yy)
            opt.zero_grad()
            loss.backward()
            opt.step()

    # Hold-out MAE. Accumulate the absolute-error sum and the sample count so
    # a short final batch is not over-weighted (a mean of per-batch means is).
    model.eval()
    abs_err_total = 0.0
    n_eval = 0
    with torch.no_grad():
        for xc, xn, yy in te_loader:
            xc, xn, yy = xc.to(device), xn.to(device), yy.to(device)
            pred = model(xc, xn)
            abs_err_total += torch.sum(torch.abs(pred - yy)).item()
            n_eval += yy.numel()
    test_mae = abs_err_total / n_eval if n_eval else float("nan")

    return model, test_mae

def predict_hour(model, day_code: int, month: int, temp: float, rain_sum: float) -> float:
    """Run a single-row prediction and return it as a Python float.

    Args:
        model: Module called as ``model(x_cat, x_num)``; it is switched to
            eval mode and its first parameter's device is used for the inputs.
        day_code: Weekday code, clamped to 1..7 before becoming index 0..6.
        month: Month number, clamped to 1..12 before becoming index 0..11.
        temp: Temperature feature.
        rain_sum: Rainfall feature.

    Returns:
        The first element of the model output.
    """
    target_device = next(model.parameters()).device

    # Clamp into the valid range, then shift to zero-based embedding indices.
    day_slot = min(max(int(day_code), 1), 7) - 1
    month_slot = min(max(int(month), 1), 12) - 1

    cat_row = torch.tensor([[day_slot, month_slot]], dtype=torch.long, device=target_device)
    num_row = torch.tensor(
        [[float(temp), float(rain_sum)]], dtype=torch.float32, device=target_device
    )

    model.eval()
    with torch.no_grad():
        output = model(cat_row, num_row)
    return float(output.cpu().numpy().ravel()[0])

# =========================
# 7) 메인: 시간대별 오늘 매출 예측
# =========================
def predict_today_sales_by_hour_deep(use_kma_api=True, manual_temp=None, manual_rain=None):
    """Predict today's sales for every HOUR found in the sales CSV.

    For each distinct ``HOUR`` in ``CSV_PATH`` a separate model is trained on
    that hour's daily table and then queried with today's weekday, month,
    temperature and rainfall. Hours with fewer than 80 daily rows are skipped.

    Args:
        use_kma_api: If true, temperature and rainfall come from
            ``fetch_weather_ultra_ncst``; otherwise the manual values are used.
        manual_temp: Temperature to use when ``use_kma_api`` is false.
        manual_rain: Rainfall to use when ``use_kma_api`` is false.

    Returns:
        A frame sorted by ``HOUR`` with columns ``HOUR``, ``오늘_예측매출`` and
        ``test_MAE(원)``. It is empty (but keeps those columns) when every hour
        was skipped.

    Raises:
        ValueError: If ``use_kma_api`` is false and a manual value is missing.
    """
    # Validate the manual inputs before doing any file or network work.
    if not use_kma_api and (manual_temp is None or manual_rain is None):
        raise ValueError("use_kma_api=False이면 manual_temp, manual_rain을 넣어주세요.")

    df = pd.read_csv(CSV_PATH)

    # Today's inputs.
    day = today_day_code()
    month = datetime.now(KST).month

    if use_kma_api:
        temp, rain, base_date, base_time = fetch_weather_ultra_ncst(SERVICE_KEY, NX, NY)
        print(f"딥러닝\n오늘 요일={day}, 기온={temp}°C, 강수={rain}mm (기상기준 {base_date} {base_time})\n")
    else:
        temp, rain = float(manual_temp), float(manual_rain)
        print(f"딥러닝\n오늘 요일={day}, 기온={temp}°C, 강수={rain}mm (수동입력)\n")

    results = []

    for hour in sorted(df["HOUR"].unique()):
        daily_h = build_hourly_daily(df, int(hour))

        # Skip hours with too little history to train on.
        if len(daily_h) < 80:
            continue

        model_h, test_mae = train_hour_model(daily_h, epochs=40, batch_size=64, lr=1e-3)
        pred = predict_hour(model_h, day_code=day, month=month, temp=temp, rain_sum=rain)

        results.append({
            "HOUR": int(hour),
            "오늘_예측매출": float(pred),
            "test_MAE(원)": float(test_mae),
        })

    # Explicit columns keep sort_values("HOUR") from raising KeyError when
    # every hour was skipped and `results` is empty.
    out = pd.DataFrame(results, columns=["HOUR", "오늘_예측매출", "test_MAE(원)"])
    return out.sort_values("HOUR").reset_index(drop=True)

# Run:
# 1) Use the KMA API for today's weather
df_pred = predict_today_sales_by_hour_deep(use_kma_api=True)
print(df_pred[["HOUR","오늘_예측매출"]])

# 2) If the API is unavailable, test with manually supplied weather instead
# df_pred = predict_today_sales_by_hour_deep(use_kma_api=False, manual_temp=5.3, manual_rain=0.0)
# print(df_pred[["HOUR","오늘_예측매출"]])


딥러닝
오늘 요일=3, 기온=4.5°C, 강수=0.0mm (기상기준 20251224 1600)

   HOUR        오늘_예측매출
0     1  280397.031250
1     2  320313.531250
2     3  304442.250000
3     4  263437.750000
4     5  247314.359375
5     6  304269.156250
6     7  238343.187500
7     8  238982.078125
8     9  323476.125000
9    10  333195.968750
