In [1]:
import pandas as pd
import numpy as np
import os
from pathlib import Path
from typing import Dict, List, Optional, Any

from statsmodels.tsa.stattools import adfuller, kpss

import datetime
from datetime import timedelta
# !pip install holidays
import holidays

import warnings
from statsmodels.tools.sm_exceptions import InterpolationWarning
from statsmodels.tsa.stattools import kpss

import matplotlib.pyplot as plt

In [2]:
os.chdir("/content/drive/MyDrive/3. Grad School/LG Aimers")

# 데이터 불러오기
data = pd.read_csv("DATA/train/train.csv")

In [None]:
cols =  ["year_enc", "month_sin", "month_cos", "day_sin", "day_cos", "weekday_sin", "weekday_cos", "season", "is_holiday", "is_holiday_sandwich", "month_weight", "week_weight", "prev_avg_7", "prev_avg_14"]
enc_cols = ["season", "is_holiday", "is_holiday_sandwich"]
# lstm에서는 minmaxscaling 해주는 게 안전
num_cols = ["month_weight", "week_weight", "prev_avg_7", "prev_avg_14"]

In [4]:
class TimeSeries():
    def __init__(self, data, seasonal_period = 12, max_d = 3, max_D = 1, try_seasonal : bool = False):
        self.data = data # 전체 데이터
        self.seasonal_period = seasonal_period # 계절성 주기
        self.max_d = max_d # 일반 차분 최대 횟수
        self.max_D = max_D # 계절 차분 최대 횟수
        self.try_seasonal = try_seasonal # 계절 차분도 진행할지

    def check_stationarity(self, series : pd.DataFrame, alpha = 0.05):
        """메뉴 하나의 series 받아와서 ADF, KPSS 검정 실시"""
        series = series.dropna()
        # ADF Test
        adf_p = adfuller(series)[1]
        # KPSS Test
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", InterpolationWarning)
            kpss_p = kpss(series, regression='c', nlags='auto')[1]
        # 판정
        adf_result = "정상" if adf_p < alpha else "비정상"
        kpss_result = "정상" if kpss_p >= alpha else "비정상"

        return adf_p, adf_result, kpss_p, kpss_result

    def is_stationary(self, adf_result, kpss_result):
        """메뉴 하나의 series 받아와서 검정 결과 출력"""
        return (adf_result == "정상") and (kpss_result == "정상")

    def diff_once(self, series : pd.DataFrame, lag : int = 1):
        """메뉴 하나의 series 받아와서 일반 차분 1회 실시"""
        return series.diff(lag)

    def stationarize_one_menu(self, menu, series : pd.DataFrame):
        """메뉴 하나의 series 받아와서 -> 검정 실시 -> 비정상이면 차분 1회씩 실시 (필요 시 계절 차분도 실시) -> 정상이 될 때까지!"""
        # 기존 행 순서 보존을 위해 index 기억
        original_index = series.index
        df = series.sort_values("영업일자")
        y = pd.to_numeric(df['매출수량'], errors = 'coerce')

        log = {
            "key" : df['영업장명_메뉴명'].iloc[0] if len(df) else None,
            "initial_len": len(df),
            "steps": [],
            "final_d": 0,
            "final_D": 0,
            "final_stationary": False
        }

        # 1차 검정
        _, adf_result, _, kpss_result = self.check_stationarity(y)
        log["steps"].append({"step": "1차 검정", "d" : 0, "D" : 0, "adf": adf_result, "kpss": kpss_result})
        d_used, D_used = 0, 0
        y_diff = y.copy()

        # 둘 중 하나라도 비정상이라면
        while not self.is_stationary(adf_result, kpss_result):
            # 일반 차분
            if d_used < self.max_d:
                d_used += 1
                y_diff = self.diff_once(y_diff, lag = 1)
            elif self.try_seasonal and D_used < self.max_D:
                # 계절 차분
                D_used += 1
                y_diff = self.diff_once(y_diff, lag = self.seasonal_period)
            else:
                print(f"[{menu}] 정상화 실패...")
                break

            _, adf_result, _, kpss_result = self.check_stationarity(y_diff)
            log["steps"].append({"d": d_used, "D": D_used, "adf": adf_result, "kpss": kpss_result})

            # 데이터 너무 줄어들면 stop
            if y_diff.dropna().shape[0] < 10:
                break

        log["final_d"] = d_used
        log["final_D"] = D_used
        log["final_stationary"] = self.is_stationary(adf_result, kpss_result)

        df['매출수량'] = y_diff
        df = df.loc[original_index]

        return df, log

    def stationarize_all_menu(self, data):
        """전체 데이터에 대해 메뉴별로 정상화 진행 -> (정상화된 데이터, 메뉴별 로그)"""
        diffed = []
        logs = []
        for menu, group_df in data.groupby("영업장명_메뉴명"):
            df, log = self.stationarize_one_menu(menu, group_df)
            diffed.append(df)
            logs.append(log)

        output = pd.concat(diffed, axis = 0).loc[self.data.index]
        log_df = pd.DataFrame(logs)

        return output, log_df

In [5]:
ts = TimeSeries(data = data, seasonal_period = 12, max_d = 3, max_D = 1, try_seasonal = True)
df_stationary, log_df = ts.stationarize_all_menu(data = data)

In [13]:
df_stationary

Unnamed: 0,영업일자,영업장명_메뉴명,매출수량
0,2023-01-01,느티나무 셀프BBQ_1인 수저세트,
1,2023-01-02,느티나무 셀프BBQ_1인 수저세트,0.0
2,2023-01-03,느티나무 셀프BBQ_1인 수저세트,0.0
3,2023-01-04,느티나무 셀프BBQ_1인 수저세트,0.0
4,2023-01-05,느티나무 셀프BBQ_1인 수저세트,0.0
...,...,...,...
102671,2024-06-11,화담숲카페_현미뻥스크림,12.0
102672,2024-06-12,화담숲카페_현미뻥스크림,10.0
102673,2024-06-13,화담숲카페_현미뻥스크림,14.0
102674,2024-06-14,화담숲카페_현미뻥스크림,12.0


In [18]:
log_df

Unnamed: 0,key,initial_len,steps,final_d,final_D,final_stationary
0,느티나무 셀프BBQ_1인 수저세트,532,"[{'step': '1차 검정', 'd': 0, 'D': 0, 'adf': '비정상...",1,0,True
1,느티나무 셀프BBQ_BBQ55(단체),532,"[{'step': '1차 검정', 'd': 0, 'D': 0, 'adf': '정상'...",0,0,True
2,"느티나무 셀프BBQ_대여료 30,000원",532,"[{'step': '1차 검정', 'd': 0, 'D': 0, 'adf': '정상'...",0,0,True
3,"느티나무 셀프BBQ_대여료 60,000원",532,"[{'step': '1차 검정', 'd': 0, 'D': 0, 'adf': '정상'...",0,0,True
4,"느티나무 셀프BBQ_대여료 90,000원",532,"[{'step': '1차 검정', 'd': 0, 'D': 0, 'adf': '정상'...",0,0,True
...,...,...,...,...,...,...
188,화담숲카페_메밀미숫가루,532,"[{'step': '1차 검정', 'd': 0, 'D': 0, 'adf': '정상'...",0,0,True
189,화담숲카페_아메리카노 HOT,532,"[{'step': '1차 검정', 'd': 0, 'D': 0, 'adf': '비정상...",1,0,True
190,화담숲카페_아메리카노 ICE,532,"[{'step': '1차 검정', 'd': 0, 'D': 0, 'adf': '비정상...",1,0,True
191,화담숲카페_카페라떼 ICE,532,"[{'step': '1차 검정', 'd': 0, 'D': 0, 'adf': '비정상...",1,0,True


In [8]:
# 일반 차분 횟수
log_df.loc[0, 'final_d']

np.int64(1)

In [19]:
log_df['final_d'].value_counts()
# 2번 차분 안에 모든 데이터가 정상화!

Unnamed: 0_level_0,count
final_d,Unnamed: 1_level_1
1,117
0,74
2,2


In [9]:
# Validation 분리해두기
stores = data['영업장명_메뉴명'].unique()

def make_validation(data):
    dates = sorted(data['영업일자'].unique())
    n = int(len(dates) * 0.2)
    val_dates = dates[-n : ]
    val_mask = data['영업일자'].isin(val_dates)
    train = data[~val_mask]
    validation = data[val_mask]
    return train, validation

train, validation = make_validation(data)
print(f"Train : {train['영업일자'].min()} ~ {train['영업일자'].max()}")
print(f"Validation : {validation['영업일자'].min()} ~ {validation['영업일자'].max()}")

Train : 2023-01-01 ~ 2024-03-01
Validation : 2024-03-02 ~ 2024-06-15


#### 외생변수 없이

##### SARIMA

In [22]:
from itertools import product
from statsmodels.tsa.statespace.sarimax import SARIMAX

In [None]:
class SARIMA():
    def __init__(self, train_df = None, test_df = None, horizon = 7, context = 28, seasonal = 7,
                 p_range = range(0, 4), q_range = range(0, 4), d = 0, P_range = range(0, 3), Q_range = range(0, 3), D =1,
                 max_iter = 500, ic = "aic", enforce_stationarity = False, enforce_invertibility = False):
        self.train_df = train_df # 학습 데이터
        self.test_df = test_df # 추론 데이터
        self.horizon = horizon # 며칠 예측할 건지
        self.context = context # 추가할 데이터 (test)
        self.seasonal = seasonal # 일주일 단위.. 일단은
        self.p_range = p_range # p 탐색 범위
        self.q_range = q_range # q 탐색 범위
        self.d = d # 일반 차분 횟수 (이미 차분해서 넣었으니 0)
        self.P_range = P_range # P 탐색 범위
        self.Q_range = Q_range # Q 탐색 범위
        self.D = D # 계절 차분 횟수
        self.max_iter = max_iter # 최대 탐색 횟수
        self.ic = ic # 평가지표
        self.enforce_stationarity = enforce_stationarity
        self.enforce_invertibility = enforce_invertibility

        self.models = {}

    def order_search(self, y):
        """AIC 기준 그리드서치 (p, d, q) * (P, D, Q, m) 선택"""
        best = {"aic": np.inf, "order": None, "seasonal_order": None, "result": None}
        m = self.seasonal

        for p, q, P, Q in product(self.p_range, self.q_range, self.P_range, self.Q_range):
                try:
                    model = SARIMAX(
                        y,
                        order = (p, self.d, q),
                        seasonal_order = (P, self.D if m > 1 else 0, Q, m),
                        enforce_stationarity = self.enforce_stationarity,
                        enforce_invertibility = self.enforce_invertibility,
                        trend=None
                    )
                    res = model.fit(disp = False, maxiter = self.max_iter)
                    if res.aic < best["aic"]:
                        best = {
                            "aic": res.aic,
                            "order": (p, self.d, q),
                            "seasonal_order": (P, self.D if m > 1 else 0, Q, m),
                            "result": res
                        }
                except Exception:
                    continue

        if best["result"] is None:
            raise RuntimeError("SARIMA auto-order search failed.")
        return best

    def fit(self, train_df):
        """train_df 전체 받아서 메뉴별로 train"""
        self.models.clear()
        for menu, group_df in train_df.groupby("영업장명_메뉴명"):
            group_df['매출수량'] = group_df['매출수량'].fillna(0.0)
            y = (group_df.assign(**{'영업일자' : pd.to_datetime(group_df['영업일자'])})
                                 .set_index("영업일자")
                                 .sort_index()["매출수량"]
                                 .asfreq("D")
                                 .astype(float))

            best = self.order_search(y)
            self.models[menu] = {
                "order": best["order"],
                "seasonal_order": best["seasonal_order"],
                "result": best["result"]
            }
        return self.models

    def forecast_with_context(self, test_df, test_prefix, horizon = 7):
        """28일동안의 context 받아서 다음 7일 예측"""
        results = []

        for menu, context in test_df.groupby("영업장명_메뉴명"):
            if menu not in self.models:
                print(f"{menu} 학습 결과가 없어요..")
                continue

            context_y = (context.assign(**{'영업일자' : pd.to_datetime(context['영업일자'])})
                                            .set_index("영업일자")
                                            .sort_index()["매출수량"]
                                            .astype(float))

            res = self.models[menu]['result']

            try:
                res_context = res.append(context_y, refit = False)
            except Exception:
                # 날짜가 이어지지 않는다면.. 강제로 잇기
                train_end = res.model.data.row_labels[-1]
                need_start = pd.to_datetime(train_end) + pd.Timedelta(days=1)
                if context.index.min() > need_start:
                    filler = pd.Series(np.nan, index=pd.date_range(need_start, context_y.index.min() - pd.Timedelta(days = 1), freq = "D"))
                    context_y = pd.concat([filler, context_y])
                res_context = res.append(context_y, refit = False)

            forecast = res_context.get_forecast(steps = horizon)
            mean = forecast.predicted_mean

            pred_dates = [f"{test_prefix}+{i+1}일" for i in range(horizon)]

            for d, val in zip(pred_dates, forecast):
                results.append({
                    '영업일자': d,
                    '영업장명_메뉴명': menu,
                    '매출수량': val
                })

        return pd.DataFrame(results)

In [30]:
sarima = SARIMA()

sarima_models = sarima.fit(train_df = data)



KeyboardInterrupt: 

In [31]:
from itertools import product
from statsmodels.tsa.statespace.sarimax import SARIMAX

In [None]:
import re
import glob
all_preds = []

# 모든 test_*.csv 순회
test_files = sorted(glob.glob('DATA/test/TEST_*.csv'))

for path in test_files:
    test_df = pd.read_csv(path)

    # 파일명에서 접두어 추출 (예: TEST_00)
    filename = os.path.basename(path)
    test_prefix = re.search(r'(TEST_\d+)', filename).group(1)

    pred = sarima.forecast_with_context(test_df = test_df)

##### 외생변수 포함

###### 변수 생성

In [45]:
# 월(month) -> 계절 매핑 딕셔너리
month_to_season = {
    1: "Winter", 2: "Winter", 12: "Winter",
    3: "Spring", 4: "Spring", 5: "Spring",
    6: "Summer", 7: "Summer", 8: "Summer",
    9: "Autumn", 10: "Autumn", 11: "Autumn"}

season_weights = {
    "Winter" : 11.4,
    "Spring": 6.5,
    "Summer" : 6.3,
    "Autumn" : 18.4
}

# 월별 가중치 매핑
monthly_weights = {
    1: 2.2, 2: 1.8, 3: 0.3,
    4: 1.01, 5: 0.7, 6: 0.8,
    7: 0.5, 8: 0.5, 9: 0.8,
    10: 1.55, 11: 1.03, 12: 1.4}

# 요일별 가중치 매핑
weekly_weights = {
    "Monday": 0.78, "Tuesday": 0.85, "Wednesday": 0.81,
    "Thursday": 9.9, "Friday": 1.2, "Saturday": 1.53,
    "Sunday": 1.3}

In [46]:
class Make_Variables():
        def __init__(self, data = None, date = None, predict = 7, month_to_season = None, monthly_weights = None, weekly_weights = None):
            self.data = data
            self.date = date
            self.predict = predict
            self.month_to_season = month_to_season
            self.monthly_weights = monthly_weights
            self.weekly_weights = weekly_weights

        def update_kor_holidays(self):
            """국경일 추가"""
            kor_holidays = holidays.KR(years = [2023, 2024, 2025])
            kor_holidays.update({
                # datetime.date(2023,2,14) : "Valentine's Day",
                # datetime.date(2023,3,14) : "White Day",
                # datetime.date(2023,11,11) : "Pepero Day",
                # datetime.date(2024,2,14) : "Valentine's Day",
                # datetime.date(2024,3,14) : "White Day",
                # datetime.date(2024,11,11) : "Pepero Day",
                # datetime.date(2025,2,14) : "Valentine's Day",
                # datetime.date(2025,3,14) : "White Day",
                # datetime.date(2025,11,11) : "Pepero Day",

                datetime.date(2024,10,1) : "Temporary Holiday", # 국군의 날 임시공휴일
                datetime.date(2025,1,27) : "Temporary Holiday", # 설날 임시공휴일
                datetime.date(2025,3,3) : "Temporary Holiday", # 삼일절 대체공휴일
                datetime.date(2025, 5, 29) : "Election Period",
                datetime.date(2025, 5, 30) : "Election Period",
                datetime.date(2025, 6, 3) : "Presidential Election Day"})
            return kor_holidays

        def check_holidays(self, date, kor_holidays) -> int:
            """날짜 받아서 공휴일/주말 여부 출력"""
            # date = pd.Timestamp(date)
            if isinstance(date, pd.Series):
                check_holiday = date.dt.date.isin(kor_holidays)
                check_weekend = date.dt.weekday >= 5
            else:
                check_holiday = date.date() in kor_holidays
                check_weekend = date.weekday() >= 5
            is_holiday = (check_holiday | check_weekend)
            return is_holiday

        def get_sandwich_score(self, data, is_holiday_col) -> pd.DataFrame:
            """데이터프레임 기준으로 샌드위치 점수 계산"""
            data = data.reset_index(drop = True)
            data['is_sandwich'] = 0
            is_holiday = data[is_holiday_col].astype(int)
            for idx in range(len(data)):
                if idx == 0 or idx == len(data) - 1: # 첫날, 마지막 날
                    continue

                # 앞/뒤 하루씩 봤을 때 모두 휴일 -> 5점
                if (is_holiday.iloc[idx - 1] == 1) and (is_holiday.iloc[idx + 1] == 1): # 하루 전이랑 다음 날이 공휴일이면
                    data.iloc[idx, data.columns.get_loc('is_sandwich')] = 5

                # 앞/뒤 이틀씩 봤을 때 휴일 3일 -> 3점, 2일 -> 2점
                elif idx > 1 and idx < len(is_holiday) - 2: # 셋째날, 마지막에서 세 번째 날
                    start_idx = idx - 2
                    end_idx = idx + 2
                    nearby_holidays = (is_holiday.iloc[start_idx : end_idx + 1].sum() - is_holiday.iloc[idx])
                    if nearby_holidays == 3:
                        data.iloc[idx, data.columns.get_loc('is_sandwich')] = 3
                    elif nearby_holidays == 2:
                        data.iloc[idx, data.columns.get_loc('is_sandwich')] = 2
                    else:
                        data.iloc[idx, data.columns.get_loc('is_sandwich')] = 0
            return data

        def get_sandwich_score_for_dates(self, date, kor_holidays) -> int:
            """특정 날짜를 받아와서 앞뒤 날짜를 구하고, 샌드위치 점수 계산"""
            # 하루씩
            prev_date, next_date = date - timedelta(days = 1), date + timedelta(days = 1)
            prev_hol, next_hol = self.check_holidays(prev_date, kor_holidays), self.check_holidays(next_date, kor_holidays) # T/F Bool
            if prev_hol and next_hol: # 바로 다음 날들이 휴일이라면
                return 3
            days_offsets = [-2, -1, 1, 2] # 앞뒤로 이틀 살펴보기
            nearby_holidays = sum(self.check_holidays(date + timedelta(days = d), kor_holidays) for d in days_offsets)
            if nearby_holidays == 3: # 앞뒤 4일 중에 3일이 휴일이면
                return 2
            elif nearby_holidays == 2: # 앞뒤 4일 중에 2일이 휴일이면
                return 1
            else:
                return 0

        def get_season_weights(self, data = None, season_weights = season_weights):
            """계절별 가중치 부여"""
            # 데이터프레임 들어오면
            if data is not None:
                data['season_weight'] = data['season'].map(season_weights)
                return data

        def get_month_weights(self, data = None, monthly_weights = monthly_weights):
            """월별 가중치 부여"""
            # 데이터프레임 들어오면
            if data is not None:
                data['month_weight'] = data['month'].map(monthly_weights)
                return data

        def get_week_weights(self, data = None, weekly_weights = weekly_weights):
            """요일별 가중치 부여"""
            # 데이터프레임 들어오면
            if data is not None:
                data['week_weight'] = data['weekday'].map(weekly_weights)
                return data

        def get_prev_days(self, data, test_df = None, date = None, menu = None, howmany = 7):
            """
            일요일 날짜 받아와서 직전 주차의 일-토 매출수량 평균 계산
            주의 - test data에서 생성할 때는 참고할 데이터와 붙여넣을 데이터가 다름
            data : 참고할 데이터
            test_df : 참고할 데이터
            """
            if test_df is None:
                # 혹시 모르니까 검증
                if date.weekday() == 6:
                    # 이전 날짜들
                    prev_start = date - timedelta(days = howmany)
                    prev_end = date - timedelta(days = 1)
                    prev_data = data[(data['영업일자'] >= prev_start) & (data['영업일자'] <= prev_end) & (data['영업장명_메뉴명'] == menu)]
                    prev_avg = prev_data['매출수량'].mean()
                    prev_sd = prev_data['매출수량'].std()
                    # 첫 주 0으로 처리
                    if pd.isna(prev_avg):
                        prev_avg = 0
                    if pd.isna(prev_sd):
                        prev_sd = 0
                    week_end = date + timedelta(days = 6)
                    curr_mask = (data['영업일자'] >= date) & (data['영업일자'] <= week_end) & (data['영업장명_메뉴명'] == menu)
                    colname_mean = f"prev_avg_{howmany}"
                    colname_sd = f"prev_sd_{howmany}"
                    data.loc[curr_mask, colname_mean] = prev_avg
                    data.loc[curr_mask, colname_sd] = prev_sd
                    return data
                else:
                    return np.nan

            # test data라면
            else:
                # 혹시 모르니까 검증
                if date.weekday() == 6:
                    # 이전 날짜들
                    prev_start = date - timedelta(days = howmany)
                    prev_end = date - timedelta(days = 1)
                    prev_data = test_df[(test_df['영업일자'] >= prev_start) & (test_df['영업일자'] <= prev_end) & (test_df['영업장명_메뉴명'] == menu)]
                    prev_avg = prev_data['매출수량'].mean()
                    prev_sd = prev_data['매출수량'].std()
                    # 첫 주 0으로 처리
                    if pd.isna(prev_avg):
                        prev_avg = 0
                    if pd.isna(prev_sd):
                        prev_sd = 0
                    week_end = date + timedelta(days = 6)
                    curr_mask = (data['영업일자'] >= date) & (data['영업일자'] <= week_end) & (data['영업장명_메뉴명'] == menu)
                    colname_mean = f"prev_avg_{howmany}"
                    colname_sd = f"prev_sd_{howmany}"
                    data.loc[curr_mask, colname_mean] = prev_avg
                    data.loc[curr_mask, colname_sd] = prev_sd
                    return data
                else:
                    return np.nan


        # train, test 공통
        def make_fund_variables(self, data, month_to_season = month_to_season):
            # 영업일자 -> datetime
            data['영업일자'] = pd.to_datetime(data['영업일자'])

            # 연, 월, 일, 요일 분리
            data['year'] = data['영업일자'].dt.year
            data['month'] = data['영업일자'].dt.month
            data['day'] = data['영업일자'].dt.day
            data['weekday'] = data['영업일자'].dt.day_name()
            data['weekday_enc'] = data['영업일자'].dt.weekday

            # 계절 변수 생성
            data['season'] = data['month'].map(month_to_season)

            # 연도 차이 변수 생성
            data['year_enc'] = data['year'] - 2023

            # 월, 일, 요일 사이클릭 변환
            data['month_sin'] = np.sin(2 * np.pi * data['month'] / 12)
            data['month_cos'] = np.cos(2 * np.pi * data['month'] / 12)

            data['day_sin'] = np.sin(2 * np.pi * data['day'] / 31)
            data['day_cos'] = np.cos(2 * np.pi * data['day'] / 31)

            data['weekday_sin'] = np.sin(2 * np.pi * data['weekday_enc'] / 7)
            data['weekday_cos'] = np.cos(2 * np.pi * data['weekday_enc'] / 7)

            # 공휴일 확인
            kor_holidays = self.update_kor_holidays()
            check_holiday = data['영업일자'].dt.date.isin(kor_holidays)
            check_weekend = data['weekday'].isin(['Saturday', 'Sunday'])
            data['is_holiday'] = (check_holiday | check_weekend).astype(int) # 공휴일 + 주말
            data['holiday_name'] = data['영업일자'].dt.date.map(kor_holidays)

            return data

        # train의 입력 데이터
        def make_variables_train(self, data):
            data = self.make_fund_variables(data)
            kor_holidays = self.update_kor_holidays()

            ### 샌드위치 데이
            data = self.get_sandwich_score(data, 'is_holiday')

            # 샌드위치 - 첫날
            first = data['영업일자'].min()
            data.loc[data['영업일자'] == first, 'is_sandwich'] = self.get_sandwich_score_for_dates(first, kor_holidays)
            second = data['영업일자'].min() + timedelta(days = 1)
            data.loc[data['영업일자'] == second, 'is_sandwich'] = self.get_sandwich_score_for_dates(second, kor_holidays)

            # 샌드위치 - 마지막 날
            last = data['영업일자'].max()
            data.loc[data['영업일자'] == last, 'is_sandwich'] = self.get_sandwich_score_for_dates(last, kor_holidays)
            before = data['영업일자'].max() - timedelta(days = 1)
            data.loc[data['영업일자'] == before, 'is_sandwich'] = self.get_sandwich_score_for_dates(before, kor_holidays)

            # 샌드위치 포함한 공휴일
            data['is_holiday_sandwich'] = data['is_holiday'].astype(int) | (data['is_sandwich'] > 0).astype(int)

            ### 계절별 가중치
            data = self.get_season_weights(data, season_weights)

            ### 월별 가중치
            data = self.get_month_weights(data, monthly_weights)

            ### 요일별 가중치
            data = self.get_week_weights(data, weekly_weights)

            ### 직전 주차 평균
            sundays = data[data['weekday'] == "Sunday"][["영업일자", "영업장명_메뉴명"]].copy()
            for _, row in sundays.iterrows():
                date = row['영업일자']
                menu = row['영업장명_메뉴명']
                data = self.get_prev_days(data = data, date = date, menu = menu, howmany = 7)
                data = self.get_prev_days(data = data, date = date, menu = menu, howmany = 14)
                data = self.get_prev_days(data = data, date = date, menu = menu, howmany = 21)

            ### 영업장명, 메뉴명 분리
            if '영업장명_메뉴명' in data.columns:
                data[['영업장명', '메뉴명']] = data['영업장명_메뉴명'].str.split('_', expand = True)

            ### 음수 처리
            negative = data[data['매출수량'] < 0]

            for idx, row in negative.iterrows():
                num = row['매출수량']
                if num < -10:
                    date = row['영업일자']
                    menu = row['영업장명_메뉴명']
                    prev_date = pd.to_datetime(date) - pd.Timedelta(days = 1)
                    prev_row = data[(data['영업일자'] == prev_date) & (data['영업장명_메뉴명'] == menu)]

                    if prev_row.iloc[0]["매출수량"] >= abs(num):
                        data.loc[prev_row.index[0], '매출수량'] += num

            # 남은 건 전부 0으로
            data.loc[data['매출수량'] < 0, '매출수량'] = 0

            return data

        # 예측하고자 하는 날들
        def make_variables_test(self, date, test_df, predict):
            """
            date : 최종 날짜 (입력 7일 중 가장 마지막) - TimeStamp
            test_df : 예측할 때 참고해올 데이터 -> 이거로 직전 주차 평균 생성
            """
            date = pd.to_datetime(date)
            future_dates = [date + timedelta(days = i + 1) for i in range(predict)]
            future_df = pd.DataFrame({'영업일자' : future_dates})

            menus_df = (test_df[['영업장명_메뉴명']].drop_duplicates().reset_index(drop = True))
            future_df = future_df.merge(menus_df, how='cross')

            kor_holidays = self.update_kor_holidays()

            # 기본적인 변수들
            future_df = self.make_fund_variables(future_df)

            future_df['영업일자'] = pd.to_datetime(future_df['영업일자']).dt.normalize()

            # 샌드위치
            future_df['is_sandwich'] = future_df['영업일자'].apply(lambda d: self.get_sandwich_score_for_dates(d, kor_holidays))

             # 샌드위치 포함한 공휴일
            future_df['is_holiday_sandwich'] = future_df['is_holiday'].astype(int) | (future_df['is_sandwich'] > 0).astype(int)

            # 월별 가중치
            future_df = self.get_month_weights(future_df, monthly_weights)

            # 요일별 가중치
            future_df = self.get_week_weights(future_df, weekly_weights)

            # 직전 주차 평균 -> 이거는 test 까지 받아오고 생각해야 함..
            sundays =  future_df.loc[future_df['weekday'] == "Sunday", ['영업일자', '영업장명_메뉴명']].copy()
            for _, row in sundays.iterrows():
                date = row['영업일자']
                menu = row['영업장명_메뉴명']
                future_df = self.get_prev_days(data = future_df, test_df = test_df, date = date, menu = menu, howmany = 7)
                future_df = self.get_prev_days(data = future_df, test_df = test_df, date = date, menu = menu, howmany = 14)
                future_df = self.get_prev_days(data = future_df, test_df = test_df, date = date, menu = menu, howmany = 21)

            return future_df

In [35]:
# 데이터 불러오기
data = pd.read_pickle("/content/drive/MyDrive/3. Grad School/LG Aimers/DATA/train_data_new.pickle")

In [50]:
mv = Make_Variables()
data = mv.get_season_weights(data, season_weights)

###### 학습

In [32]:
class SARIMAX():
    def __init__(self, train_df = None, test_df = None, predict = 7):
        self.train_df = train_df
        self.test_df = test_df
        self.predict = predict

In [62]:
class SARIMAX_Model():
    def __init__(self, train_df = None, test_df = None, horizon = 7, context = 28, seasonal = 7,
                 p_range = range(0, 3), q_range = range(0, 3), d_range = range(0, 2), P_range = range(0, 2), Q_range = range(0, 2), D =1,
                 max_iter = 500, ic = "aic", enforce_stationarity = False, enforce_invertibility = False):
        self.train_df = train_df # 학습 데이터
        self.test_df = test_df # 추론 데이터
        self.horizon = horizon # 며칠 예측할 건지
        self.context = context # 추가할 데이터 (test)
        self.seasonal = seasonal # 일주일 단위.. 일단은
        self.p_range = p_range # p 탐색 범위
        self.q_range = q_range # q 탐색 범위
        self.d_range = d_range # 일반 차분 횟수
        self.P_range = P_range # P 탐색 범위
        self.Q_range = Q_range # Q 탐색 범위
        self.D = D # 계절 차분 횟수
        self.max_iter = max_iter # 최대 탐색 횟수
        self.ic = ic # 평가지표
        self.enforce_stationarity = enforce_stationarity
        self.enforce_invertibility = enforce_invertibility

        self.models = {}

    def get_xy(self, df, cols):
        """df 받아서 x(cols), y 분리"""
        df['영업일자'] = pd.to_datetime(df['영업일자'])
        df = df.sort_values('영업일자')
        y = (df.set_index("영업일자")["매출수량"]
                .asfreq("D")
                .fillna(0.0)
                .astype(float))

        x = (df.set_index("영업일자")[cols]
                .reindex(y.index)
                .fillna(0.0)
                .astype(float))
        return x, y

    def order_search(self, x : pd.DataFrame, y : pd.Series):
        """AIC 기준 그리드서치 (p, d, q) * (P, D, Q, m) 선택"""
        best = {"aic": np.inf, "order": None, "seasonal_order": None, "result": None}
        m = self.seasonal

        for p, q, d, P, Q in product(self.p_range, self.q_range, self.d_range, self.P_range, self.Q_range):
                print(f"Trying order: ({p}, {d}, {q}) x ({P}, {self.D if m > 1 else 0}, {Q}, {m})")
                try:
                    model = SARIMAX(
                        endog = y, exog = x,
                        order = (p, d, q),
                        seasonal_order = (P, self.D if m > 1 else 0, Q, m),
                        enforce_stationarity = self.enforce_stationarity,
                        enforce_invertibility = self.enforce_invertibility,
                        trend=None
                    )
                    res = model.fit(disp = False, maxiter = self.max_iter)
                    if res.aic < best["aic"]:
                        best = {
                            "aic": res.aic,
                            "order": (p, d, q),
                            "seasonal_order": (P, self.D if m > 1 else 0, Q, m),
                            "result": res
                        }
                except Exception:
                    continue

        if best["result"] is None:
            raise RuntimeError("SARIMA auto-order search failed.")
        return best

    def fit(self, train_df, cols):
        """train_df 전체 받아서 메뉴별로 train"""
        self.models.clear()
        for menu, group_df in train_df.groupby("영업장명_메뉴명"):
            group_df['매출수량'] = group_df['매출수량'].fillna(0.0)
            x, y = self.get_xy(group_df, cols)

            best = self.order_search(x, y)
            self.models[menu] = {
                "order": best["order"],
                "seasonal_order": best["seasonal_order"],
                "result": best["result"],
                "cols" : cols
            }
            print(f"{menu} 학습 완료!")
        return self.models

    def forecast_with_context(self, test_df, future_df, test_prefix, horizon = 7):
        """28일동안의 context 받아서 다음 7일 예측 / test_df, future_df에는 cols가 다 있는 상태여야 함!"""
        results = []

        for menu, context in test_df.groupby("영업장명_메뉴명"):
            if menu not in self.models:
                print(f"{menu} 학습 결과가 없어요..")
                continue

            cols = self.models[menu]["cols"]
            res = self.models[menu]['result']

            context["매출수량"] = context["매출수량"].fillna(0.0)
            context_x, context_y = self.get_xy(context, cols = cols)

            try:
                res_context = res.append(endog = context_y, exog = context_x, refit = False)
            except Exception:
                # 날짜가 이어지지 않는다면.. 강제로 잇기
                train_end = res.model.data.row_labels[-1]
                need_start = pd.to_datetime(train_end) + pd.Timedelta(days=1)
                if context.index.min() > need_start:
                    filler = pd.Series(0.0, index=pd.date_range(need_start, context_y.index.min() - pd.Timedelta(days = 1), freq = "D"))
                    context_y = pd.concat([filler, context_y])
                res_context = res.append(endog = context_y, exog = context_x, refit = False)

            # 미래 exog 준비
            fu = future_df[future_df["영업장명_메뉴명"] == menu].copy()
            fu["영업일자"] = pd.to_datetime(fu["영업일자"])
            fu = fu.sort_values("영업일자")
            future_x = (fu.set_index("영업일자")[cols]
                        .iloc[:horizon]
                        .astype(float))

            forecast = res_context.get_forecast(steps = horizon, exog = future_x.values)
            mean = forecast.predicted_mean

            pred_dates = [f"{test_prefix}+{i+1}일" for i in range(horizon)]

            for d, val in zip(pred_dates, mean.values):
                results.append({
                    '영업일자': d,
                    '영업장명_메뉴명': menu,
                    '매출수량': val
                })

        return pd.DataFrame(results)

        #     pred_df = pd.DataFrame({
        #         "영업장명_메뉴명": menu,
        #         "영업일자": mean.index,
        #         "예측값": mean.values
        #     })
        #     preds.append(pred_df)

        # if not preds:
        #     return pd.DataFrame(columns = ['영업일자', '영업장명_메뉴명', "예측값"])

        # return (pd.concat(preds, ignore_index = True).sort_values(['영업장명_메뉴명', '영업일자']).reset_index(drop = True))

In [52]:
cols =  ["year_enc", "month_sin", "month_cos", "day_sin", "day_cos", "weekday_sin", "weekday_cos", "is_holiday", "is_sandwich", "is_holiday_sandwich", "season_weight", "month_weight", "week_weight",
         "prev_avg_7", "prev_avg_14", "prev_avg_21", "prev_sd_7", "prev_sd_14", "prev_sd_21"]

In [49]:
data.columns

Index(['영업일자', '영업장명_메뉴명', '매출수량', 'year', 'month', 'day', 'weekday',
       'weekday_enc', 'season', 'year_enc', 'month_sin', 'month_cos',
       'day_sin', 'day_cos', 'weekday_sin', 'weekday_cos', 'is_holiday',
       'holiday_name', 'is_sandwich', 'is_holiday_sandwich', 'month_weight',
       'week_weight', 'prev_avg_7', 'prev_sd_7', 'prev_avg_14', 'prev_sd_14',
       'prev_avg_21', 'prev_sd_21', '영업장명', '메뉴명'],
      dtype='object')

In [63]:
sarimax = SARIMAX_Model()

sarimax_models = sarimax.fit(train_df = data, cols = cols)

Trying order: (0, 0, 0) x (0, 1, 0, 7)
Trying order: (0, 0, 0) x (0, 1, 1, 7)
Trying order: (0, 0, 0) x (1, 1, 0, 7)
Trying order: (0, 0, 0) x (1, 1, 1, 7)
Trying order: (0, 1, 0) x (0, 1, 0, 7)
Trying order: (0, 1, 0) x (0, 1, 1, 7)
Trying order: (0, 1, 0) x (1, 1, 0, 7)
Trying order: (0, 1, 0) x (1, 1, 1, 7)
Trying order: (0, 0, 1) x (0, 1, 0, 7)
Trying order: (0, 0, 1) x (0, 1, 1, 7)
Trying order: (0, 0, 1) x (1, 1, 0, 7)
Trying order: (0, 0, 1) x (1, 1, 1, 7)
Trying order: (0, 1, 1) x (0, 1, 0, 7)
Trying order: (0, 1, 1) x (0, 1, 1, 7)
Trying order: (0, 1, 1) x (1, 1, 0, 7)
Trying order: (0, 1, 1) x (1, 1, 1, 7)
Trying order: (0, 0, 2) x (0, 1, 0, 7)
Trying order: (0, 0, 2) x (0, 1, 1, 7)
Trying order: (0, 0, 2) x (1, 1, 0, 7)
Trying order: (0, 0, 2) x (1, 1, 1, 7)
Trying order: (0, 1, 2) x (0, 1, 0, 7)
Trying order: (0, 1, 2) x (0, 1, 1, 7)
Trying order: (0, 1, 2) x (1, 1, 0, 7)
Trying order: (0, 1, 2) x (1, 1, 1, 7)
Trying order: (1, 0, 0) x (0, 1, 0, 7)
Trying order: (1, 0, 0) x

RuntimeError: SARIMA auto-order search failed.

In [None]:
import re
import glob
all_preds = []

# 모든 test_*.csv 순회
test_files = sorted(glob.glob('DATA/test/TEST_*.csv'))

for path in test_files:
    test_df = pd.read_csv(path)

    # 파일명에서 접두어 추출 (예: TEST_00)
    filename = os.path.basename(path)
    test_prefix = re.search(r'(TEST_\d+)', filename).group(1)

    pred = sarima.forecast_with_context(test_df = test_df)