# 정책 성능 비교 노트북

이 노트북에서는 DRL 모델과 베이스라인 정책들(Primary-Only, NPCA-Only, Random)의 성능을 비교분석합니다.

## Step 1: 라이브러리 및 모듈 Import

In [1]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import os
from pathlib import Path
import time
from collections import defaultdict

# 프로젝트 모듈들 import
import sys
sys.path.append('.')
from drl_framework.network import DQN
from drl_framework.configs import PPDU_DURATION_VARIANTS, PPDU_DURATION, RADIO_TRANSITION_TIME, OBSS_GENERATION_RATE
from drl_framework.random_access import STA, Simulator, Channel

# 스타일 설정
plt.style.use('default')
sns.set_palette("husl")

print("✅ 모든 라이브러리 import 완료!")
print(f"PyTorch version: {torch.__version__}")

✅ 모든 라이브러리 import 완료!
PyTorch version: 2.2.1+cu121


## Step 2: 베이스라인 정책들 정의

In [2]:
class BaselinePolicies:
    """베이스라인 정책들 모음"""
    
    @staticmethod
    def primary_only(obs_dict):
        """항상 Primary 채널에서 대기"""
        return 0
    
    @staticmethod 
    def npca_only(obs_dict):
        """항상 NPCA로 스위치"""
        return 1
    
    @staticmethod
    def random_policy(obs_dict):
        """랜덤 액션 선택"""
        return np.random.randint(0, 2)
    
    @staticmethod
    def smart_threshold(obs_dict, threshold=30):
        """임계값 기반 정책 (OBSS > threshold이면 스위치)"""
        obss_remain = obs_dict.get('primary_channel_obss_occupied_remained', 0)
        return 1 if obss_remain > threshold else 0

print("🎯 베이스라인 정책들 준비 완료!")

🎯 베이스라인 정책들 준비 완료!


## Step 3: DRL 모델 로더 클래스

In [3]:
class DRLModelLoader:
    """DRL 모델 로드 및 관리 클래스"""
    
    def __init__(self):
        self.device = torch.device('cpu')
        self.loaded_models = {}
    
    def find_models(self, model_dir='./obss_comparison_results'):
        """사용 가능한 모델들 찾기"""
        model_dir = Path(model_dir)
        model_files = list(model_dir.glob('*/model.pth'))
        
        models_info = []
        for model_file in model_files:
            try:
                checkpoint = torch.load(model_file, map_location=self.device, weights_only=False)
                info = {
                    'path': model_file,
                    'name': model_file.parent.name,
                    'obss_duration': checkpoint.get('obss_duration', 'unknown'),
                    'ppdu_variant': checkpoint.get('ppdu_variant', 'unknown'),
                    'ppdu_duration': checkpoint.get('ppdu_duration', 'unknown'),
                    'steps_done': checkpoint.get('steps_done', 'unknown')
                }
                models_info.append(info)
            except Exception as e:
                print(f"⚠️ 모델 로드 실패: {model_file} - {e}")
        
        return models_info
    
    def load_model(self, model_path):
        """특정 모델 로드"""
        model_path = str(model_path)
        
        if model_path in self.loaded_models:
            return self.loaded_models[model_path]
        
        try:
            checkpoint = torch.load(model_path, map_location=self.device, weights_only=False)
            
            # DQN 모델 초기화 및 가중치 로드
            model = DQN(n_observations=4, n_actions=2).to(self.device)
            model.load_state_dict(checkpoint['policy_net_state_dict'])
            model.eval()
            
            # 모델 래퍼 생성
            def drl_policy(obs_dict):
                # 관측을 벡터로 변환
                obs_vector = [
                    obs_dict.get('primary_channel_obss_occupied_remained', 0),
                    obs_dict.get('radio_transition_time', 1),
                    obs_dict.get('tx_duration', 33),
                    obs_dict.get('cw_index', 0)
                ]
                
                # 예측
                input_tensor = torch.tensor(obs_vector, dtype=torch.float32, device=self.device).unsqueeze(0)
                with torch.no_grad():
                    q_values = model(input_tensor)
                    action = q_values.argmax(dim=1).item()
                return action
            
            self.loaded_models[model_path] = {
                'model': model,
                'policy': drl_policy,
                'info': checkpoint
            }
            
            return self.loaded_models[model_path]
            
        except Exception as e:
            print(f"❌ 모델 로드 실패: {e}")
            return None

# 모델 로더 초기화
model_loader = DRLModelLoader()
print("🤖 DRL 모델 로더 준비 완료!")

🤖 DRL 모델 로더 준비 완료!


## Step 4: 사용 가능한 모델들 확인

In [4]:
# 사용 가능한 모델들 찾기
available_models = model_loader.find_models()

print(f"🔍 발견된 모델 수: {len(available_models)}")
print("\n📋 사용 가능한 모델들:")

models_df = pd.DataFrame(available_models)
if not models_df.empty:
    display_df = models_df[['name', 'obss_duration', 'ppdu_variant', 'ppdu_duration', 'steps_done']].copy()
    display_df.index = range(1, len(display_df) + 1)
    print(display_df.to_string())
else:
    print("❌ 사용 가능한 모델이 없습니다. 먼저 모델을 학습시켜 주세요.")

🔍 발견된 모델 수: 14

📋 사용 가능한 모델들:
                        name  obss_duration ppdu_variant ppdu_duration  steps_done
1          ppdu_long_obss_50             50         long            50         630
2         ppdu_short_obss_50             50        short            20        1658
3         ppdu_long_obss_200            200         long            50        3084
4      trained_model_obss_50             50      unknown       unknown        5014
5   ppdu_extra_long_obss_100            100   extra_long            80        1149
6     trained_model_obss_100            100      unknown       unknown        5170
7         ppdu_long_obss_100            100         long            50        1488
8        ppdu_short_obss_100            100        short            20        3246
9   ppdu_extra_long_obss_200            200   extra_long            80        2220
10      ppdu_medium_obss_100            100       medium            33        1803
11       ppdu_medium_obss_50             50       medium 

## Step 5: 성능 테스트 함수

In [None]:
def test_policy_performance(policy_func, policy_name, test_episodes=30, 
                          obss_duration=100, ppdu_duration=33, verbose=False):
    """
    주어진 정책의 성능을 테스트
    
    Args:
        policy_func: 테스트할 정책 함수
        policy_name: 정책 이름
        test_episodes: 테스트 에피소드 수
        obss_duration: OBSS 지속 시간
        ppdu_duration: PPDU 지속 시간
        verbose: 상세 출력 여부
    
    Returns:
        dict: 성능 결과
    """
    
    device = torch.device("cpu")
    
    # 채널 설정
    channels = [
        Channel(channel_id=0, obss_generation_rate=OBSS_GENERATION_RATE['secondary']),  # NPCA channel
        Channel(channel_id=1, obss_generation_rate=0.01, obss_duration_range=(obss_duration, obss_duration))  # Primary channel
    ]
    
    # 성능 지표
    episode_rewards = []
    action_counts = [0, 0]  # [Stay, Switch]
    total_throughput = 0
    decision_count = 0
    
    start_time = time.time()
    
    for episode in range(test_episodes):
        # STA 설정
        sta = STA(
            sta_id=0,
            channel_id=1,
            primary_channel=channels[1],
            npca_channel=channels[0],
            npca_enabled=True,
            ppdu_duration=ppdu_duration,
            radio_transition_time=RADIO_TRANSITION_TIME
        )
        
        # 정책 설정
        if hasattr(policy_func, '__call__'):
            # 베이스라인 정책
            def create_fixed_policy(policy_func):
                def fixed_action():
                    obs_dict = sta.get_obs()
                    action = policy_func(obs_dict)
                    
                    # 액션 카운트
                    action_counts[action] += 1
                    
                    return action
                return fixed_action
            sta._fixed_action = create_fixed_policy(policy_func)
        
        # 시뮬레이션 실행
        simulator = Simulator(num_slots:=300, channels=channels, stas=[sta])
        simulator.device = device
        simulator.run()
        
        # 결과 수집
        episode_reward = sta.episode_reward
        episode_rewards.append(episode_reward)
        total_throughput += sta.channel_occupancy_time
        
        if verbose and episode < 3:
            print(f"  Episode {episode}: Reward={episode_reward:.1f}, Throughput={sta.channel_occupancy_time}")
    
    test_time = time.time() - start_time
    
    # 결과 계산
    avg_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)
    total_actions = sum(action_counts)
    
    if total_actions > 0:
        action_probs = [count/total_actions for count in action_counts]
    else:
        action_probs = [0.5, 0.5]
    
    efficiency = total_throughput / (test_episodes * num_slots)  # 총 슬롯 대비 처리량
    
    result = {
        'policy_name': policy_name,
        'avg_reward': avg_reward,
        'std_reward': std_reward,
        'total_throughput': total_throughput,
        'efficiency': efficiency,
        'action_distribution': action_probs,
        'episode_rewards': episode_rewards,
        'test_time': test_time,
        'obss_duration': obss_duration,
        'ppdu_duration': ppdu_duration
    }
    
    if not verbose:
        print(f"  {policy_name:12s}: Avg Reward = {avg_reward:6.1f} ± {std_reward:5.1f}, "
              f"Efficiency = {efficiency:.4f}, Stay/Switch = {action_probs[0]:.2f}/{action_probs[1]:.2f}")
    
    return result

print("🧪 성능 테스트 함수 준비 완료!")

🧪 성능 테스트 함수 준비 완료!


## Step 6: 종합 비교 실험 실행

In [11]:
def run_comprehensive_comparison(selected_models_idx=None, test_scenarios=None):
    """
    종합적인 정책 비교 실험
    
    Args:
        selected_models_idx: 테스트할 모델 인덱스 리스트 (None이면 모든 모델)
        test_scenarios: 테스트 시나리오 리스트
    """
    
    if test_scenarios is None:
        test_scenarios = [
            {'obss_duration': 50, 'ppdu_duration': 33, 'name': 'Short OBSS'},
            {'obss_duration': 150, 'ppdu_duration': 33, 'name': 'Long OBSS'},
            {'obss_duration': 200, 'ppdu_duration': 20, 'name': 'Extreme OBSS, Short PPDU'},
            {'obss_duration': 200, 'ppdu_duration': 80, 'name': 'Extreme OBSS, Long PPDU'},
        ]
    
    print("🚀 종합 비교 실험 시작...\n")
    
    all_results = []
    
    for scenario_idx, scenario in enumerate(test_scenarios):
        print(f"{'='*60}")
        print(f"시나리오 {scenario_idx + 1}: {scenario['name']}")
        print(f"OBSS Duration: {scenario['obss_duration']}, PPDU Duration: {scenario['ppdu_duration']}")
        print(f"{'='*60}")
        
        scenario_results = []
        
        # 베이스라인 정책들 테스트
        baseline_policies = [
            (BaselinePolicies.primary_only, 'Primary-Only'),
            (BaselinePolicies.npca_only, 'NPCA-Only'),
            (BaselinePolicies.random_policy, 'Random'),
            (lambda obs: BaselinePolicies.smart_threshold(obs, 30), 'Smart-30'),
            (lambda obs: BaselinePolicies.smart_threshold(obs, 100), 'Smart-100')
        ]
        
        print("\n📊 베이스라인 정책들:")
        for policy_func, policy_name in baseline_policies:
            result = test_policy_performance(
                policy_func, policy_name,
                test_episodes=200,
                obss_duration=scenario['obss_duration'],
                ppdu_duration=scenario['ppdu_duration']
            )
            result['scenario'] = scenario['name']
            result['policy_type'] = 'Baseline'
            scenario_results.append(result)
        
        # DRL 모델들 테스트
        print("\n🤖 DRL 모델들:")
        
        models_to_test = available_models
        if selected_models_idx is not None:
            models_to_test = [available_models[i] for i in selected_models_idx if i < len(available_models)]
        
        for model_info in models_to_test[:5]:  # 최대 5개 모델만 테스트
            loaded_model = model_loader.load_model(model_info['path'])
            if loaded_model:
                model_name = f"DRL-{model_info['ppdu_variant']}"
                result = test_policy_performance(
                    loaded_model['policy'], model_name,
                    test_episodes=200,
                    obss_duration=scenario['obss_duration'],
                    ppdu_duration=scenario['ppdu_duration']
                )
                result['scenario'] = scenario['name']
                result['policy_type'] = 'DRL'
                result['model_info'] = model_info
                scenario_results.append(result)
        
        all_results.extend(scenario_results)
        
        # 시나리오별 최고 성능 출력
        best_result = max(scenario_results, key=lambda x: x['avg_reward'])
        print(f"\n🏆 최고 성능: {best_result['policy_name']} (평균 보상: {best_result['avg_reward']:.1f})\n")
    
    return all_results

print("🎯 종합 비교 실험 함수 준비 완료!")

🎯 종합 비교 실험 함수 준비 완료!


## Step 7: 실험 실행

In [12]:
# 실험 실행 (원하는 모델만 선택하거나 None으로 모든 모델 테스트)
# selected_models = [0, 1, 2]  # 특정 모델들만 테스트
selected_models = None  # 모든 모델 테스트

# 사용자 정의 시나리오 (필요시 수정)
custom_scenarios = [
    {'obss_duration': 200, 'ppdu_duration': 33, 'name': 'Extreme OBSS (200)'},
    {'obss_duration': 50, 'ppdu_duration': 33, 'name': 'Short OBSS (50)'},
]

# 실험 실행
print("⏱️  실험 시작 시간:", time.strftime('%H:%M:%S'))
results = run_comprehensive_comparison(selected_models, custom_scenarios)
print("\n✅ 모든 실험 완료!")
print("⏱️  실험 종료 시간:", time.strftime('%H:%M:%S'))

⏱️  실험 시작 시간: 02:12:58
🚀 종합 비교 실험 시작...

시나리오 1: Extreme OBSS (200)
OBSS Duration: 200, PPDU Duration: 33

📊 베이스라인 정책들:
  Primary-Only: Avg Reward =    1.2 ±  16.3, Efficiency = 0.0039, Stay/Switch = 0.50/0.50
  NPCA-Only   : Avg Reward =    2.3 ±  23.0, Efficiency = 0.0077, Stay/Switch = 0.00/1.00
  Random      : Avg Reward =    1.2 ±  16.3, Efficiency = 0.0039, Stay/Switch = 0.50/0.50
  Smart-30    : Avg Reward =  195.5 ±  23.8, Efficiency = 0.6518, Stay/Switch = 0.00/1.00
  Smart-100   : Avg Reward =  195.9 ±  23.8, Efficiency = 0.6529, Stay/Switch = 0.00/1.00

🤖 DRL 모델들:
  DRL-long    : Avg Reward =    1.2 ±  16.3, Efficiency = 0.0039, Stay/Switch = 0.00/1.00
  DRL-short   : Avg Reward =    1.2 ±  16.3, Efficiency = 0.0039, Stay/Switch = 0.50/0.50
  DRL-long    : Avg Reward =   55.8 ±  22.5, Efficiency = 0.1859, Stay/Switch = 1.00/0.00
  DRL-unknown : Avg Reward =    1.2 ±  16.3, Efficiency = 0.0039, Stay/Switch = 0.50/0.50
  DRL-extra_long: Avg Reward =    1.2 ±  16.3, Efficiency 

## Step 8: 결과 분석 및 시각화

In [None]:
# 결과를 DataFrame으로 변환
df_results = pd.DataFrame([{
    'Scenario': r['scenario'],
    'Policy': r['policy_name'],
    'Type': r['policy_type'],
    'Avg_Reward': r['avg_reward'],
    'Std_Reward': r['std_reward'],
    'Efficiency': r['efficiency'],
    'Stay_Prob': r['action_distribution'][0],
    'Switch_Prob': r['action_distribution'][1],
    'OBSS_Duration': r['obss_duration'],
    'PPDU_Duration': r['ppdu_duration']
} for r in results])

print("📋 실험 결과 요약:")
summary_df = df_results.groupby(['Scenario', 'Policy']).agg({
    'Avg_Reward': 'mean',
    'Efficiency': 'mean',
    'Stay_Prob': 'mean'
}).round(3)

print(summary_df)

In [None]:
# 종합 시각화
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
fig.suptitle('정책 성능 종합 비교', fontsize=16, fontweight='bold')

# 1. 시나리오별 평균 보상 비교
ax1 = axes[0, 0]
pivot_reward = df_results.pivot_table(values='Avg_Reward', index='Policy', columns='Scenario', aggfunc='mean')
sns.heatmap(pivot_reward, annot=True, fmt='.1f', cmap='RdYlBu_r', ax=ax1, cbar_kws={'label': 'Avg Reward'})
ax1.set_title('Average Reward by Policy & Scenario')
ax1.set_xlabel('')

# 2. 효율성 비교
ax2 = axes[0, 1]
pivot_eff = df_results.pivot_table(values='Efficiency', index='Policy', columns='Scenario', aggfunc='mean')
sns.heatmap(pivot_eff, annot=True, fmt='.3f', cmap='RdYlBu_r', ax=ax2, cbar_kws={'label': 'Efficiency'})
ax2.set_title('Efficiency by Policy & Scenario')
ax2.set_xlabel('')

# 3. 액션 분포
ax3 = axes[0, 2]
policy_action = df_results.groupby('Policy').agg({
    'Stay_Prob': 'mean',
    'Switch_Prob': 'mean'
})

policy_action.plot(kind='bar', stacked=True, ax=ax3, color=['lightblue', 'lightcoral'])
ax3.set_title('Action Distribution by Policy')
ax3.set_ylabel('Probability')
ax3.legend(['Stay Primary', 'Switch NPCA'])
ax3.tick_params(axis='x', rotation=45)

# 4. 베이스라인 vs DRL 성능 비교
ax4 = axes[1, 0]
type_performance = df_results.groupby(['Type', 'Scenario'])['Avg_Reward'].mean().unstack()
type_performance.plot(kind='bar', ax=ax4)
ax4.set_title('Baseline vs DRL Performance')
ax4.set_ylabel('Average Reward')
ax4.legend(title='Scenario')
ax4.tick_params(axis='x', rotation=0)

# 5. 정책별 성능 분포 (박스플롯)
ax5 = axes[1, 1]
top_policies = df_results.groupby('Policy')['Avg_Reward'].mean().nlargest(6).index
top_results = df_results[df_results['Policy'].isin(top_policies)]

sns.boxplot(data=top_results, x='Policy', y='Avg_Reward', ax=ax5)
ax5.set_title('Top 6 Policies Performance Distribution')
ax5.tick_params(axis='x', rotation=45)

# 6. OBSS Duration vs Performance
ax6 = axes[1, 2]
for policy_type in df_results['Type'].unique():
    subset = df_results[df_results['Type'] == policy_type]
    avg_by_obss = subset.groupby('OBSS_Duration')['Avg_Reward'].mean()
    ax6.plot(avg_by_obss.index, avg_by_obss.values, 'o-', label=policy_type, linewidth=2, markersize=6)

ax6.set_title('Performance vs OBSS Duration')
ax6.set_xlabel('OBSS Duration (slots)')
ax6.set_ylabel('Average Reward')
ax6.legend()
ax6.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## Step 9: 최고 성능 정책 분석

In [None]:
# 전체 최고 성능 정책 찾기
overall_best = df_results.loc[df_results['Avg_Reward'].idxmax()]

print("🏆 전체 최고 성능 정책:")
print(f"  정책: {overall_best['Policy']}")
print(f"  시나리오: {overall_best['Scenario']}")
print(f"  평균 보상: {overall_best['Avg_Reward']:.2f} ± {overall_best['Std_Reward']:.2f}")
print(f"  효율성: {overall_best['Efficiency']:.4f}")
print(f"  액션 분포: Stay {overall_best['Stay_Prob']:.2f}, Switch {overall_best['Switch_Prob']:.2f}")

# 시나리오별 최고 성능 정책
print("\n🎯 시나리오별 최고 성능:")
for scenario in df_results['Scenario'].unique():
    scenario_best = df_results[df_results['Scenario'] == scenario].loc[
        df_results[df_results['Scenario'] == scenario]['Avg_Reward'].idxmax()
    ]
    print(f"  {scenario:25s}: {scenario_best['Policy']:15s} (보상: {scenario_best['Avg_Reward']:6.1f})")

# 정책 타입별 평균 성능
print("\n📊 정책 타입별 평균 성능:")
type_avg = df_results.groupby('Type').agg({
    'Avg_Reward': ['mean', 'std'],
    'Efficiency': 'mean'
})
type_avg.columns = ['Avg_Reward_Mean', 'Avg_Reward_Std', 'Efficiency_Mean']

for policy_type in type_avg.index:
    row = type_avg.loc[policy_type]
    print(f"  {policy_type:10s}: 보상 {row['Avg_Reward_Mean']:6.1f} ± {row['Avg_Reward_Std']:5.1f}, "
          f"효율성 {row['Efficiency_Mean']:.4f}")

## Step 10: 결과 저장

In [None]:
# 결과를 CSV로 저장
output_dir = Path('./comparison_results')
output_dir.mkdir(exist_ok=True)

timestamp = time.strftime('%Y%m%d_%H%M%S')
csv_filename = output_dir / f'policy_comparison_{timestamp}.csv'

df_results.to_csv(csv_filename, index=False)
print(f"📄 결과 저장됨: {csv_filename}")

# 요약 통계 저장
summary_filename = output_dir / f'policy_summary_{timestamp}.txt'
with open(summary_filename, 'w') as f:
    f.write("정책 성능 비교 요약\n")
    f.write("=" * 50 + "\n\n")
    
    f.write(f"전체 최고 성능: {overall_best['Policy']} (보상: {overall_best['Avg_Reward']:.2f})\n\n")
    
    f.write("시나리오별 최고 성능:\n")
    for scenario in df_results['Scenario'].unique():
        scenario_best = df_results[df_results['Scenario'] == scenario].loc[
            df_results[df_results['Scenario'] == scenario]['Avg_Reward'].idxmax()
        ]
        f.write(f"  {scenario}: {scenario_best['Policy']} (보상: {scenario_best['Avg_Reward']:.1f})\n")
    
    f.write(f"\n정책 타입별 평균 성능:\n")
    for policy_type in type_avg.index:
        row = type_avg.loc[policy_type]
        f.write(f"  {policy_type}: 보상 {row['Avg_Reward_Mean']:.1f} ± {row['Avg_Reward_Std']:.1f}\n")

print(f"📄 요약 저장됨: {summary_filename}")
print("\n✅ 모든 분석 완료!")

## 🎉 완료!

이 노트북에서 다음을 수행했습니다:

### ✅ **수행한 작업:**
1. **베이스라인 정책** vs **DRL 모델** 종합 비교
2. **다양한 시나리오**에서 성능 테스트
3. **상세한 성능 지표** 분석 (보상, 효율성, 액션 분포)
4. **시각화 및 히트맵**으로 결과 비교
5. **결과 저장** (CSV, 요약 텍스트)

### 📊 **주요 지표:**
- **평균 보상**: 에피소드당 누적 보상
- **효율성**: 처리량/총시간 비율
- **액션 분포**: Stay vs Switch 선택 비율
- **정책 타입 비교**: Baseline vs DRL 성능

### 🔧 **커스터마이징:**
- **Step 7**: `selected_models`와 `custom_scenarios` 수정하여 원하는 모델/시나리오 테스트
- **Step 6**: `test_episodes` 수정하여 테스트 에피소드 수 조정
- **Step 5**: 새로운 베이스라인 정책 추가 가능

DRL 모델이 베이스라인 정책들과 비교해서 어떤 상황에서 우수한지 명확하게 확인할 수 있습니다! 🚀