# 공행성 탐지 (Comovement Detection)

이 노트북에서는 다음을 수행합니다:
1. CCF (Cross-Correlation Function) 분석
2. Granger Causality Test
3. DTW (Dynamic Time Warping) 분석
4. FDR (False Discovery Rate) 다중 검정 보정
5. 결과 통합 및 시각화

In [1]:
import sys
from pathlib import Path
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# 프로젝트 루트를 경로에 추가
sys.path.insert(0, str(Path.cwd().parent))

from config import Config
from src.preprocess import preprocess_pipeline
from src.comovement import (
    calculate_ccf,
    calculate_ccf_matrix,
    calculate_granger_causality,
    calculate_granger_matrix,
    calculate_dtw_distance,
    calculate_dtw_matrix,
    apply_fdr_to_results,
    detect_comovement_comprehensive
)

# 스타일 설정
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette('husl')
%matplotlib inline

# 한글 폰트 설정
plt.rcParams['font.family'] = 'Malgun Gothic'
plt.rcParams['axes.unicode_minus'] = False

Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.



## 1. 데이터 로드

In [2]:
# 더미 데이터 로드
data_path = Config.DATA_RAW / "dummy_trade_data.csv"

if not data_path.exists():
    print(f"⚠️ 데이터 파일이 없습니다: {data_path}")
    print("먼저 00_dummy_data_generator.ipynb를 실행하세요.")
else:
    # 전처리 파이프라인 실행
    df_wide = preprocess_pipeline(data_path, check_quality=False)
    print(f"\n데이터 shape: {df_wide.shape}")
    print(f"날짜 범위: {df_wide.index.min()} ~ {df_wide.index.max()}")
    print(f"품목 수: {len(df_wide.columns)}")

⚠️ 데이터 파일이 없습니다: c:\Users\SMART\git\daconai\data\raw\dummy_trade_data.csv
먼저 00_dummy_data_generator.ipynb를 실행하세요.


## 2. 샘플 품목 선택

분석 속도를 위해 일부 품목만 사용합니다. (전체 분석은 시간이 오래 걸립니다)

In [3]:
# 처음 10개 품목만 사용 (테스트용)
# 전체 분석을 원하면 df_sample = df_wide.copy()로 변경
df_sample = df_wide.iloc[:, :10].copy()

print(f"샘플 데이터 shape: {df_sample.shape}")
print(f"분석할 품목: {list(df_sample.columns)}")
print(f"\n예상 분석 쌍 수:")
n_items = len(df_sample.columns)
print(f"  CCF: {n_items * (n_items - 1) // 2}개 쌍")
print(f"  Granger: {n_items * (n_items - 1)}개 쌍 (양방향)")
print(f"  DTW: {n_items * (n_items - 1) // 2}개 쌍")

NameError: name 'df_wide' is not defined

## 3. CCF (Cross-Correlation Function) 분석

두 시계열 간의 교차 상관을 계산하여 선행-후행 관계를 찾습니다.

In [None]:
# 개별 품목 쌍에 대한 CCF 예제
item_x = df_sample.columns[0]
item_y = df_sample.columns[1]

ccf_values, optimal_lag, max_ccf = calculate_ccf(
    df_sample[item_x], 
    df_sample[item_y], 
    max_lag=Config.CCF_LAG_MAX
)

print(f"\n{item_x} vs {item_y}")
print(f"최적 시차: {optimal_lag}개월")
print(f"최대 CCF: {max_ccf:.3f}")

# CCF 플롯
fig, ax = plt.subplots(figsize=(10, 5))
lags = np.arange(len(ccf_values))
ax.bar(lags, ccf_values, color='steelblue', alpha=0.7)
ax.axhline(Config.CCF_THRESHOLD, color='red', linestyle='--', label=f'임계값 ({Config.CCF_THRESHOLD})')
ax.axhline(-Config.CCF_THRESHOLD, color='red', linestyle='--')
ax.axhline(0, color='black', linewidth=0.8)
ax.axvline(optimal_lag, color='green', linestyle='--', linewidth=2, label=f'최적 시차 ({optimal_lag})')
ax.set_xlabel('Lag (개월)', fontsize=11)
ax.set_ylabel('CCF', fontsize=11)
ax.set_title(f'Cross-Correlation: {item_x} → {item_y}', fontsize=12, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

In [None]:
# 모든 품목 쌍에 대한 CCF 계산
ccf_results = calculate_ccf_matrix(
    df_sample, 
    max_lag=Config.CCF_LAG_MAX, 
    threshold=Config.CCF_THRESHOLD
)

print("\nCCF 결과 (상위 10개):")
ccf_results.head(10)

In [None]:
# CCF 결과 시각화
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# CCF 분포
axes[0].hist(ccf_results['max_ccf'], bins=20, color='steelblue', alpha=0.7, edgecolor='black')
axes[0].axvline(Config.CCF_THRESHOLD, color='red', linestyle='--', linewidth=2, label=f'임계값 ({Config.CCF_THRESHOLD})')
axes[0].axvline(-Config.CCF_THRESHOLD, color='red', linestyle='--', linewidth=2)
axes[0].set_xlabel('Max CCF', fontsize=11)
axes[0].set_ylabel('빈도', fontsize=11)
axes[0].set_title('CCF 값 분포', fontsize=12, fontweight='bold')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# 최적 시차 분포
lag_counts = ccf_results['optimal_lag'].value_counts().sort_index()
axes[1].bar(lag_counts.index, lag_counts.values, color='coral', alpha=0.7, edgecolor='black')
axes[1].set_xlabel('최적 시차 (개월)', fontsize=11)
axes[1].set_ylabel('빈도', fontsize=11)
axes[1].set_title('최적 시차 분포', fontsize=12, fontweight='bold')
axes[1].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

print(f"\n유의미한 공행성 쌍: {ccf_results['is_significant'].sum()}개")

## 4. Granger Causality Test

한 시계열이 다른 시계열을 Granger-cause 하는지 검정합니다.

In [None]:
# 개별 품목 쌍에 대한 Granger 검정 예제
result = calculate_granger_causality(
    df_sample[item_x], 
    df_sample[item_y], 
    max_lag=Config.GRANGER_MAX_LAG
)

print(f"\nGranger Causality: {item_x} → {item_y}")
print(f"최적 시차: {result['best_lag']}개월")
print(f"P-value: {result['best_p_value']:.4f}")
print(f"인과관계 존재: {result['is_causal']}")

if 'all_p_values' in result:
    # 시차별 p-value 플롯
    fig, ax = plt.subplots(figsize=(10, 5))
    lags = list(result['all_p_values'].keys())
    p_vals = list(result['all_p_values'].values())
    
    ax.plot(lags, p_vals, marker='o', linewidth=2, markersize=8, color='steelblue')
    ax.axhline(0.05, color='red', linestyle='--', linewidth=2, label='α=0.05')
    ax.set_xlabel('Lag (개월)', fontsize=11)
    ax.set_ylabel('P-value', fontsize=11)
    ax.set_title(f'Granger Causality P-values: {item_x} → {item_y}', fontsize=12, fontweight='bold')
    ax.legend()
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

In [None]:
# 모든 품목 쌍에 대한 Granger 검정 (시간이 걸릴 수 있음)
print("Granger 인과관계 검정 수행 중...")
granger_results = calculate_granger_matrix(
    df_sample, 
    max_lag=Config.GRANGER_MAX_LAG, 
    p_threshold=Config.GRANGER_PVAL_THRESHOLD
)

print("\nGranger 결과 (상위 10개):")
granger_results.head(10)

In [None]:
# FDR 보정 적용
granger_fdr = apply_fdr_to_results(
    granger_results, 
    p_value_col='p_value', 
    alpha=Config.FDR_ALPHA
)

print("\nFDR 보정 후 결과:")
print(f"보정 전 유의한 쌍: {granger_results['is_causal'].sum()}개")
print(f"보정 후 유의한 쌍: {granger_fdr['fdr_rejected'].sum()}개")
print(f"FDR 임계값: {granger_fdr['fdr_threshold'].iloc[0]:.4f}")

In [None]:
# Granger 결과 시각화
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# P-value 분포
axes[0].hist(granger_fdr['p_value'].dropna(), bins=20, color='steelblue', alpha=0.7, edgecolor='black')
axes[0].axvline(Config.GRANGER_PVAL_THRESHOLD, color='red', linestyle='--', linewidth=2, label='α=0.05')
axes[0].axvline(granger_fdr['fdr_threshold'].iloc[0], color='green', linestyle='--', linewidth=2, label='FDR 임계값')
axes[0].set_xlabel('P-value', fontsize=11)
axes[0].set_ylabel('빈도', fontsize=11)
axes[0].set_title('Granger P-value 분포', fontsize=12, fontweight='bold')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# 인과관계 네트워크 (상위 인과관계만)
significant_pairs = granger_fdr[granger_fdr['fdr_rejected']].head(20)
if len(significant_pairs) > 0:
    # 간단한 네트워크 플롯
    from collections import Counter
    
    # 가장 많이 등장하는 원인 품목
    cause_counts = Counter(significant_pairs['item_cause'])
    effect_counts = Counter(significant_pairs['item_effect'])
    
    top_causes = pd.Series(cause_counts).head(10)
    axes[1].barh(range(len(top_causes)), top_causes.values, color='coral', alpha=0.7, edgecolor='black')
    axes[1].set_yticks(range(len(top_causes)))
    axes[1].set_yticklabels(top_causes.index)
    axes[1].set_xlabel('인과관계 수', fontsize=11)
    axes[1].set_title('주요 선행 품목 (원인)', fontsize=12, fontweight='bold')
    axes[1].grid(True, alpha=0.3, axis='x')
else:
    axes[1].text(0.5, 0.5, '유의미한 인과관계 없음', ha='center', va='center', fontsize=14)
    axes[1].axis('off')

plt.tight_layout()
plt.show()

## 5. DTW (Dynamic Time Warping) 분석

**주의**: DTW 분석을 실행하려면 `dtw-python` 패키지가 필요합니다.

```bash
pip install dtw-python
```

In [None]:
# DTW 분석 (dtw-python 설치 필요)
try:
    dtw_results = calculate_dtw_matrix(
        df_sample, 
        threshold=Config.DTW_THRESHOLD
    )
    
    print("\nDTW 결과 (상위 10개):")
    print(dtw_results.head(10))
    
    # DTW 거리 분포
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.hist(dtw_results['dtw_distance'], bins=20, color='steelblue', alpha=0.7, edgecolor='black')
    ax.axvline(Config.DTW_THRESHOLD, color='red', linestyle='--', linewidth=2, label=f'임계값 ({Config.DTW_THRESHOLD})')
    ax.set_xlabel('DTW 거리', fontsize=11)
    ax.set_ylabel('빈도', fontsize=11)
    ax.set_title('DTW 거리 분포', fontsize=12, fontweight='bold')
    ax.legend()
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    
    print(f"\n유사 패턴 쌍: {dtw_results['is_similar'].sum()}개")
    
except ModuleNotFoundError:
    print("⚠️ dtw-python 패키지가 설치되지 않았습니다.")
    print("설치 명령: pip install dtw-python")
    dtw_results = None

## 6. 종합 공행성 탐지

CCF, Granger, DTW를 모두 사용한 종합 분석

In [None]:
# 종합 분석 (시간이 오래 걸릴 수 있음)
print("종합 공행성 탐지 시작...")
print("(DTW는 dtw-python이 설치된 경우에만 실행됩니다)\n")

try:
    comprehensive_results = detect_comovement_comprehensive(
        df_sample,
        ccf_threshold=Config.CCF_THRESHOLD,
        granger_alpha=Config.GRANGER_PVAL_THRESHOLD,
        dtw_threshold=Config.DTW_THRESHOLD,
        max_lag=Config.CCF_LAG_MAX,
        apply_fdr=True,
        fdr_alpha=Config.FDR_ALPHA
    )
    
    ccf_comp = comprehensive_results['ccf']
    granger_comp = comprehensive_results['granger']
    dtw_comp = comprehensive_results['dtw'] if 'dtw' in comprehensive_results else None
    
except Exception as e:
    print(f"⚠️ 종합 분석 중 오류 발생: {e}")
    print("개별 분석 결과를 사용합니다.")
    ccf_comp = ccf_results
    granger_comp = granger_fdr
    dtw_comp = dtw_results

In [None]:
# 결과 요약
print("\n" + "=" * 60)
print("종합 공행성 탐지 결과 요약")
print("=" * 60)
print(f"\n분석 품목 수: {len(df_sample.columns)}개")
print(f"\n1. CCF 분석:")
print(f"   총 분석 쌍: {len(ccf_comp)}개")
print(f"   유의미한 쌍: {ccf_comp['is_significant'].sum()}개 ({ccf_comp['is_significant'].sum()/len(ccf_comp)*100:.1f}%)")

print(f"\n2. Granger Causality:")
print(f"   총 분석 쌍: {len(granger_comp)}개")
print(f"   원본 유의: {granger_comp['is_causal'].sum()}개")
if 'fdr_rejected' in granger_comp.columns:
    print(f"   FDR 보정 후: {granger_comp['fdr_rejected'].sum()}개")

if dtw_comp is not None:
    print(f"\n3. DTW 분석:")
    print(f"   총 분석 쌍: {len(dtw_comp)}개")
    print(f"   유사 패턴 쌍: {dtw_comp['is_similar'].sum()}개 ({dtw_comp['is_similar'].sum()/len(dtw_comp)*100:.1f}%)")

print("\n" + "=" * 60)

## 7. 결과 저장

In [None]:
# 결과 저장
output_dir = Config.DATA_PROCESSED

ccf_comp.to_csv(output_dir / "comovement_ccf.csv", index=False)
print(f"✓ CCF 결과 저장: {output_dir / 'comovement_ccf.csv'}")

granger_comp.to_csv(output_dir / "comovement_granger.csv", index=False)
print(f"✓ Granger 결과 저장: {output_dir / 'comovement_granger.csv'}")

if dtw_comp is not None:
    dtw_comp.to_csv(output_dir / "comovement_dtw.csv", index=False)
    print(f"✓ DTW 결과 저장: {output_dir / 'comovement_dtw.csv'}")

print("\n✓ 모든 공행성 분석 결과가 저장되었습니다!")

## 8. 발견된 공행성 쌍 검증

더미 데이터에서 의도적으로 생성된 공행성 쌍을 제대로 탐지했는지 확인합니다.

In [None]:
# 상위 공행성 쌍 시각화
top_ccf_pairs = ccf_comp[ccf_comp['is_significant']].head(5)

if len(top_ccf_pairs) > 0:
    fig, axes = plt.subplots(len(top_ccf_pairs), 1, figsize=(12, 4*len(top_ccf_pairs)))
    
    if len(top_ccf_pairs) == 1:
        axes = [axes]
    
    for i, (idx, row) in enumerate(top_ccf_pairs.iterrows()):
        item_x = row['item_x']
        item_y = row['item_y']
        lag = row['optimal_lag']
        ccf_val = row['max_ccf']
        
        # 시계열 플롯
        ax = axes[i]
        ax2 = ax.twinx()
        
        ax.plot(df_sample.index, df_sample[item_x], label=f'{item_x} (선행)', color='blue', linewidth=2)
        ax2.plot(df_sample.index, df_sample[item_y], label=f'{item_y} (후행)', color='red', linewidth=2, alpha=0.7)
        
        ax.set_xlabel('날짜')
        ax.set_ylabel(f'{item_x}', color='blue')
        ax2.set_ylabel(f'{item_y}', color='red')
        ax.set_title(f'공행성 쌍 #{i+1}: {item_x} → {item_y} (Lag={lag}, CCF={ccf_val:.3f})', 
                     fontsize=12, fontweight='bold')
        ax.grid(True, alpha=0.3)
        
        # 범례 통합
        lines1, labels1 = ax.get_legend_handles_labels()
        lines2, labels2 = ax2.get_legend_handles_labels()
        ax.legend(lines1 + lines2, labels1 + labels2, loc='upper left')
    
    plt.tight_layout()
    plt.show()
else:
    print("유의미한 공행성 쌍이 발견되지 않았습니다.")