In [None]:
def fit_cluster_sarima(self):
    # 기본값으로 명확한 계절 주기(s=4)를 보장
    domain_params = DOMAIN_SARIMA_PARAMS.get(self.domain, {'order': (2,1,1), 'seasonal_order': (1,1,1,4)})
    order = domain_params['order']
    seasonal_order = domain_params['seasonal_order']

    # 만약 seasonal_order의 s값이 1 이하이면 강제로 4로 변경 (안전장치)
    if seasonal_order[-1] <= 1:
        seasonal_order = seasonal_order[:-1] + (4,)

    try:
        model = SARIMAX(
            self.train_data,
            order=order,
            seasonal_order=seasonal_order,
            enforce_stationarity=False,
            enforce_invertibility=False
        )
        fitted_model = model.fit(disp=False, maxiter=50, method='lbfgs')
        # ...
    except Exception as e:
        print(f"클러스터 SARIMA 모델 오류: {e}")

In [None]:
import numpy as np
import pandas as pd
from collections import defaultdict
import warnings
warnings.filterwarnings("ignore")

# 필요한 라이브러리 임포트
from tslearn.clustering import TimeSeriesKMeans
from tslearn.preprocessing import TimeSeriesResampler
from pmdarima.arima import auto_arima
from aeon.datasets import load_from_tsf_file

def find_optimal_sarima_parameters(time_series_dict, n_clusters=3, seed=42):
    """
    분기별 시계열 데이터를 DTW 기반 KMeans로 클러스터링하고,
    각 클러스터별로 최적의 SARIMA 파라미터를 찾는 함수.
    
    Parameters:
      time_series_dict (dict): 시계열 데이터 딕셔너리 {series_name: (start_time, values)}
      n_clusters (int): 클러스터 수
      seed (int): 랜덤 시드
      
    Returns:
      dict: 각 클러스터별 최적 SARIMA 파라미터와 군집별 시계열 ID 맵.
    """
    print("분기별 시계열 데이터 준비 중...")
    # 최소 길이 8 이상인 시계열만 사용 (분기 데이터는 Horizon=8인 경우가 많음)
    series_data = []
    ts_ids = []
    for ts_id, (start_time, values) in time_series_dict.items():
        if len(values) >= 8:
            series_data.append(values)
            ts_ids.append(ts_id)

    # 모든 시계열을 공통 길이로 리샘플링 (클러스터링 초기 목적)
    initial_length = max(len(s) for s in series_data)
    print(f"초기 리샘플링: {len(series_data)}개의 시계열을 길이 {initial_length}로 변환")
    resampler = TimeSeriesResampler(sz=initial_length)
    X_train = resampler.fit_transform(series_data)

    # DTW 기반 KMeans 클러스터링 수행
    print(f"DTW KMeans 클러스터링 ({n_clusters}개 클러스터)...")
    model = TimeSeriesKMeans(n_clusters=n_clusters,
                             metric="dtw",
                             max_iter=10,
                             random_state=seed,
                             verbose=True)
    y_pred = model.fit_predict(X_train)

    # 클러스터별로 시계열 데이터 및 ID 그룹화
    cluster_series = defaultdict(list)
    cluster_ids = defaultdict(list)
    for i, (ts_id, cluster) in enumerate(zip(ts_ids, y_pred)):
        cluster_series[cluster].append(series_data[i])
        cluster_ids[cluster].append(ts_id)

    # 각 클러스터별 최적 SARIMA 파라미터 찾기
    cluster_params = {}
    for cluster in range(n_clusters):
        print(f"\n클러스터 {cluster} 처리 중 (시계열 수: {len(cluster_series[cluster])})...")
        lengths = [len(s) for s in cluster_series[cluster]]
        target_length = int(np.mean(lengths))
        print(f"  클러스터 {cluster} 평균 길이: {target_length}")

        # 클러스터 내 모든 시계열을 평균 길이(target_length)로 리샘플링
        resampler_cluster = TimeSeriesResampler(sz=target_length)
        resampled_series = resampler_cluster.fit_transform(cluster_series[cluster])

        # 클러스터 내 리샘플링된 시계열들의 평균 계산
        mean_series = np.mean(resampled_series, axis=0)

        # Auto-ARIMA를 사용하여 최적 SARIMA 파라미터 찾기 (계절 주기: 4, 분기 데이터)
        print(f"  클러스터 {cluster}에 대한 Auto-ARIMA 수행 중...")
        try:
            sarima_model = auto_arima(
                mean_series,
                start_p=0, max_p=3,
                start_q=0, max_q=3,
                d=None, max_d=2,
                start_P=0, max_P=3,
                start_Q=0, max_Q=3,
                D=None, max_D=2,
                m=4,  # 분기별 데이터: 1년 = 4분기
                test='none',
                seasonal_test='none',
                seasonal=True,
                stepwise=True,
                trace=True,
                error_action='ignore',
                suppress_warnings=True,
                information_criterion='aic'
            )

            order = sarima_model.order
            seasonal_order = sarima_model.seasonal_order
            aic = sarima_model.aic()

            print(f"  클러스터 {cluster} 최적 파라미터:")
            print(f"    ARIMA{order}x{seasonal_order} (AIC: {aic:.2f})")

            cluster_params[cluster] = {
                'order': order,
                'seasonal_order': seasonal_order,
                'aic': aic,
                'num_series': len(cluster_series[cluster]),
                'sample_ids': cluster_ids[cluster][:5]
            }

        except Exception as e:
            print(f"  클러스터 {cluster} Auto-ARIMA 오류: {e}")
            cluster_params[cluster] = {
                'order': (1, 1, 1),
                'seasonal_order': (1, 1, 1, 4),
                'error': str(e),
                'num_series': len(cluster_series[cluster])
            }

    # 클러스터별 시계열 ID 맵 생성
    ts_cluster_map = {}
    for cluster, ids in cluster_ids.items():
        for ts_id in ids:
            ts_cluster_map[ts_id] = cluster

    # 결과 요약 출력
    print("\n=== 클러스터별 SARIMA 파라미터 요약 ===")
    for cluster, params in cluster_params.items():
        print(f"클러스터 {cluster} ({params['num_series']}개 시계열):")
        print(f"  ARIMA{params['order']}x{params['seasonal_order']}")
        if 'aic' in params:
            print(f"  AIC: {params['aic']:.2f}")
        print(f"  샘플 시계열: {params.get('sample_ids', [])}")
        print()

    return {
        'cluster_params': cluster_params,
        'ts_cluster_map': ts_cluster_map
    }

# =============================================================================
# 실행 코드 블럭: 분기별 데이터 로드 및 클러스터링, 최적 SARIMA 파라미터 도출
# =============================================================================

# 1. 분기별 데이터 로드 (TSF 파일)
tsf_file_path = "./dataset/m4_quarterly_dataset.tsf"  # 실제 파일 경로
df_q, meta_q = load_from_tsf_file(tsf_file_path)

# 2. DataFrame을 시계열 딕셔너리로 변환: {series_name: (start_timestamp, series_value)}
time_series_dict = {}
for idx, row in df_q.iterrows():
    time_series_dict[row['series_name']] = (row['start_timestamp'], row['series_value'])

# 3. 최적 SARIMA 파라미터 도출 함수 실행
optimal_params = find_optimal_sarima_parameters(time_series_dict, n_clusters=3, seed=42)

# 4. 최종 결과 출력
print("\n=== 최적 SARIMA 파라미터 최종 결과 ===")
for cluster, params in optimal_params['cluster_params'].items():
    print(f"클러스터 {cluster}: {params}")

In [None]:
import numpy as np
import pandas as pd
from collections import defaultdict
import warnings
import ast
warnings.filterwarnings("ignore")

# 필요한 라이브러리 임포트
from tslearn.clustering import TimeSeriesKMeans
from tslearn.preprocessing import TimeSeriesResampler
from pmdarima.arima import auto_arima
from aeon.datasets import load_from_tsf_file

def safe_literal_eval(val):
    """
    주어진 값이 문자열로 되어 있고 '['로 시작하면 ast.literal_eval을 시도하고,
    그렇지 않으면 그대로 반환하는 함수.
    """
    if isinstance(val, str) and val.strip().startswith('['):
        try:
            return ast.literal_eval(val)
        except Exception as e:
            print("literal_eval 오류:", e, "값:", val)
            return None
    return val

def get_initial_sarima_parameters(time_series_dict, sample_size=20, seed=42):
    """
    도메인 내 시계열 데이터 중 무작위로 sample_size 개의 시계열을 선택하고,
    각 시계열에 대해 Auto-ARIMA를 수행한 후, 가장 낮은 AIC 값을 가진 모델의
    order와 seasonal_order를 초기 파라미터로 선택하는 함수.
    
    Returns:
      best_order, best_seasonal_order
    """
    np.random.seed(seed)
    keys = list(time_series_dict.keys())
    if len(keys) >= sample_size:
        sampled_keys = np.random.choice(keys, size=sample_size, replace=False)
    else:
        sampled_keys = keys
    best_aic = np.inf
    best_order = None
    best_seasonal_order = None
    
    for k in sampled_keys:
        _, values = time_series_dict[k]
        try:
            model = auto_arima(
                values,
                start_p=0, max_p=3,
                start_q=0, max_q=3,
                d=None, max_d=2,
                start_P=0, max_P=3,
                start_Q=0, max_Q=3,
                D=None, max_D=2,
                m=4,
                test=None,
                seasonal_test=None,
                seasonal=True,
                stepwise=True,
                trace=False,
                error_action='ignore',
                suppress_warnings=True,
                information_criterion='aic'
            )
            aic = model.aic()
            if aic < best_aic:
                best_aic = aic
                best_order = model.order
                best_seasonal_order = model.seasonal_order
        except Exception as e:
            print(f"Auto-ARIMA 오류 (키: {k}): {e}")
    
    print(f"선택된 초기 파라미터: ARIMA{best_order}x{best_seasonal_order} (AIC: {best_aic:.2f})")
    return best_order, best_seasonal_order

def find_optimal_sarima_parameters(time_series_dict, n_clusters=3, seed=42, best_order=None, best_seasonal_order=None):
    """
    분기별 시계열 데이터를 DTW 기반 KMeans로 클러스터링하고,
    각 클러스터별로 최적의 SARIMA 파라미터를 찾는 함수.
    
    Parameters:
      time_series_dict (dict): 시계열 데이터 딕셔너리 {series_name: (start_time, values)}
      n_clusters (int): 클러스터 수
      seed (int): 랜덤 시드
      best_order (tuple): 도메인 전체에 대해 선정된 초기 ARIMA order
      best_seasonal_order (tuple): 도메인 전체에 대해 선정된 초기 seasonal_order
      
    Returns:
      dict: 각 클러스터별 최적 SARIMA 파라미터와 군집별 시계열 ID 맵.
    """
    print("분기별 시계열 데이터 준비 중...")
    # 최소 길이 8 이상인 시계열만 사용
    series_data = []
    ts_ids = []
    for ts_id, (start_time, values) in time_series_dict.items():
        if values is None:
            continue
        if len(values) >= 8:
            series_data.append(values)
            ts_ids.append(ts_id)

    # 모든 시계열을 공통 길이로 리샘플링: 중앙값을 사용
    initial_length = int(np.median([len(s) for s in series_data]))
    print(f"초기 리샘플링: {len(series_data)}개의 시계열을 길이 {initial_length}로 변환")
    resampler = TimeSeriesResampler(sz=initial_length)
    X_train = resampler.fit_transform(series_data)

    # DTW 기반 KMeans 클러스터링 수행
    print(f"DTW KMeans 클러스터링 ({n_clusters}개 클러스터)...")
    model = TimeSeriesKMeans(n_clusters=n_clusters,
                             metric="dtw",
                             max_iter=10,
                             random_state=seed,
                             verbose=True)
    y_pred = model.fit_predict(X_train)

    # 클러스터별 시계열 데이터 및 ID 그룹화
    cluster_series = defaultdict(list)
    cluster_ids = defaultdict(list)
    for i, (ts_id, cluster) in enumerate(zip(ts_ids, y_pred)):
        cluster_series[cluster].append(series_data[i])
        cluster_ids[cluster].append(ts_id)

    # 각 클러스터별 최적 SARIMA 파라미터 찾기
    cluster_params = {}
    for cluster in range(n_clusters):
        print(f"\n클러스터 {cluster} 처리 중 (시계열 수: {len(cluster_series[cluster])})...")
        lengths = [len(s) for s in cluster_series[cluster]]
        target_length = int(np.median(lengths))
        print(f"  클러스터 {cluster} 중앙값 길이: {target_length}")

        # 클러스터 내 모든 시계열을 중앙값 길이(target_length)로 리샘플링
        resampler_cluster = TimeSeriesResampler(sz=target_length)
        resampled_series = resampler_cluster.fit_transform(cluster_series[cluster])

        # 클러스터 내 리샘플링된 시계열들의 평균 계산
        mean_series = np.mean(resampled_series, axis=0)

        # Auto-ARIMA를 사용하여 최적 SARIMA 파라미터 찾기, 초기 파라미터 사용
        print(f"  클러스터 {cluster}에 대한 Auto-ARIMA 수행 중...")
        try:
            sarima_model = auto_arima(
                mean_series,
                order=best_order,
                seasonal_order=best_seasonal_order,
                seasonal=True,
                trace=True,
                error_action='ignore',
                suppress_warnings=True,
                information_criterion='aic'
            )

            order = sarima_model.order
            seasonal_order = sarima_model.seasonal_order
            aic = sarima_model.aic()

            print(f"  클러스터 {cluster} 최적 파라미터:")
            print(f"    ARIMA{order}x{seasonal_order} (AIC: {aic:.2f})")

            cluster_params[cluster] = {
                'order': order,
                'seasonal_order': seasonal_order,
                'aic': aic,
                'num_series': len(cluster_series[cluster]),
                'sample_ids': cluster_ids[cluster][:5]
            }

        except Exception as e:
            print(f"  클러스터 {cluster} Auto-ARIMA 오류: {e}")
            cluster_params[cluster] = {
                'order': (1, 1, 1),
                'seasonal_order': (1, 1, 1, 4),
                'error': str(e),
                'num_series': len(cluster_series[cluster])
            }

    # 클러스터별 시계열 ID 맵 생성
    ts_cluster_map = {}
    for cluster, ids in cluster_ids.items():
        for ts_id in ids:
            ts_cluster_map[ts_id] = cluster

    # 결과 요약 출력
    print("\n=== 클러스터별 SARIMA 파라미터 요약 ===")
    for cluster, params in cluster_params.items():
        print(f"클러스터 {cluster} ({params['num_series']}개 시계열):")
        print(f"  ARIMA{params['order']}x{params['seasonal_order']}")
        if 'aic' in params:
            print(f"  AIC: {params['aic']:.2f}")
        print(f"  샘플 시계열: {params.get('sample_ids', [])}")
        print()

    return {
        'cluster_params': cluster_params,
        'ts_cluster_map': ts_cluster_map
    }

# =============================================================================
# 실행 코드 블럭: 분기별 데이터 로드, 도메인별 그룹화, 초기 파라미터 선정, 클러스터링, 최적 SARIMA 파라미터 도출
# =============================================================================

# 1. 병합된 데이터 로드 (CSV 파일)
tsf_file_path = "./dataset/final_dataset/M4_Quarterly_Merged.csv"  # CSV 파일 경로
df_q = pd.read_csv(tsf_file_path)

# 2. 병합된 데이터에서 'category' 컬럼을 기준으로 도메인별 그룹화
domain_time_series = defaultdict(dict)
for idx, row in df_q.iterrows():
    category = row['category']  # 도메인 정보 (예: Macro, Finance 등)
    ts_id = row['series_name']
    # series_value 컬럼은 문자열 형태의 리스트이므로 안전하게 변환
    values = safe_literal_eval(row['series_value'])
    if values is None:
        continue
    domain_time_series[category][ts_id] = (row['start_timestamp'], values)

# 3. 각 도메인별로 초기 SARIMA 파라미터 선정 및 클러스터링/최적 파라미터 도출
domain_results = {}
for domain, ts_dict in domain_time_series.items():
    print(f"\n========== 도메인: {domain} ==========")
    # 도메인 내 시계열이 20개 이상이면 랜덤 20개, 아니면 전체를 사용하여 초기 파라미터 도출
    keys = list(ts_dict.keys())
    if len(keys) >= 20:
        sampled_keys = np.random.choice(keys, size=20, replace=False)
    else:
        sampled_keys = keys
    sample_dict = {k: ts_dict[k] for k in sampled_keys}
    best_order, best_seasonal_order = get_initial_sarima_parameters(sample_dict, sample_size=len(sampled_keys))
    
    print(f"도메인 {domain} 초기 파라미터: ARIMA{best_order}x{best_seasonal_order}")
    
    # 초기 파라미터를 사용하여 도메인 내 모든 데이터에 대해 클러스터링 및 SARIMA 파라미터 도출
    result = find_optimal_sarima_parameters(ts_dict, n_clusters=3, seed=42, 
                                              best_order=best_order, 
                                              best_seasonal_order=best_seasonal_order)
    domain_results[domain] = result

# 4. 최종 결과 출력: 도메인별 최적 SARIMA 파라미터 요약
print("\n=== 도메인별 최적 SARIMA 파라미터 최종 결과 ===")
for domain, res in domain_results.items():
    print(f"\n도메인 {domain}:")
    for cluster, params in res['cluster_params'].items():
        print(f"  클러스터 {cluster}: {params}")