# EF Regression Model with Pruning Experiments

EchoNet-Dynamic 데이터셋을 활용한 좌심실 박출률(EF) 회귀 모델 및 Pruning 비교 실험

## 프로젝트 개요
- **모델**: ResNet-18 기반 EF 회귀 모델
- **Pruning 방법**: Unstructured Pruning, Structured Pruning
- **평가 지표**: MAE, Parameters, Sparsity, Latency



## 1. 환경 설정 및 패키지 설치


In [None]:
# 패키지 설치
%pip install torch torchvision opencv-python pandas numpy Pillow matplotlib

# GPU 확인
import torch
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")
    print(f"CUDA version: {torch.version.cuda}")



## 2. 데이터 준비

### 옵션 A: Google Drive 마운트 (권장)


In [None]:
# Mount Google Drive at /content/drive
from google.colab import drive
drive.mount('/content/drive')

# Path to the full dataset (for when the complete dataset is stored on Drive).
# To work with sample data instead, use the "Option B" cell below
# (the "Option C" cell generates a sample set from the full data).
FULL_DATA_ROOT = "/content/drive/MyDrive/echonet_dynamic"  # edit this to your actual path

# To use the full dataset:
# DATA_ROOT = FULL_DATA_ROOT


### 옵션 B: 샘플 데이터 사용 (전체 데이터가 너무 큰 경우)

이미 Drive에 준비된 샘플 데이터를 사용하거나, 아래 "옵션 C"의 코드로 전체 데이터에서 샘플 데이터를 생성할 수 있습니다.


In [None]:
# Dataset selection: True -> sample subset, False -> full dataset
USE_SAMPLE_DATA = True

if not USE_SAMPLE_DATA:
    # Full dataset on Drive
    DATA_ROOT = "/content/drive/MyDrive/echonet_dynamic"  # edit this to your actual path
else:
    from pathlib import Path

    # Sample dataset already prepared on Drive.
    # Expected layout: sample_echonet_dynamic/Videos/*.avi, sample_echonet_dynamic/FileList.csv
    DATA_ROOT = "/content/drive/MyDrive/sample_echonet_dynamic"  # edit this to your actual path
    sample_path = Path(DATA_ROOT)

    if not sample_path.exists():
        # Nothing at the configured location: clear DATA_ROOT so later cells can detect it
        print(f"❌ Sample dataset not found at {DATA_ROOT}")
        print("Please check the path or upload sample data to Drive")
        DATA_ROOT = None
    else:
        filelist_path = sample_path / "FileList.csv"
        video_dir = sample_path / "Videos"

        if not (filelist_path.exists() and video_dir.exists()):
            # Folder exists but lacks the CSV or the Videos directory
            print(f"❌ Sample dataset structure incorrect!")
            print(f"  Expected: {filelist_path}")
            print(f"  Expected: {video_dir}")
            DATA_ROOT = None
        else:
            import pandas as pd

            # Summarize what was found
            df = pd.read_csv(filelist_path)
            video_count = sum(1 for _avi in video_dir.glob("*.avi"))

            print(f"✅ Sample dataset found:")
            print(f"  Location: {DATA_ROOT}")
            print(f"  Videos in FileList.csv: {len(df)}")
            print(f"  Video files in Videos/: {video_count}")

            # Per-split row counts, when the CSV has a Split column
            if 'Split' in df.columns:
                print(f"\nSplit distribution:")
                for split in ('TRAIN', 'VAL', 'TEST'):
                    count = int((df['Split'] == split).sum())
                    print(f"  {split}: {count}")


### 옵션 C: 샘플 데이터 생성 (필요한 경우)

샘플 데이터가 아직 없다면, 전체 데이터에서 샘플을 생성할 수 있습니다.


In [None]:
# Sample-dataset creation (run only when needed).
# Skip this cell if a sample dataset already exists.

CREATE_SAMPLE = False  # set to True to build the sample dataset


def create_sample_dataset(source_root, target_root, sample_sizes, seed=42):
    """Copy a per-split random subset of a dataset into a new folder.

    Reads ``FileList.csv`` under ``source_root``, draws up to the requested
    number of rows for each split, copies the matching ``Videos/<name>.avi``
    files into ``target_root/Videos`` and writes a ``FileList.csv`` there.

    Only rows whose video file was actually copied are written to the new
    ``FileList.csv``, so the sample never references a missing video.

    Args:
        source_root: Folder holding ``FileList.csv`` and ``Videos/``.
        target_root: Destination folder (created if missing).
        sample_sizes: Mapping of split name (value of the ``Split`` column)
            to the number of rows to draw for that split.
        seed: ``random_state`` passed to ``DataFrame.sample``.

    Returns:
        The ``DataFrame`` that was written to ``target_root/FileList.csv``.

    Raises:
        FileNotFoundError: If ``source_root/FileList.csv`` does not exist.
    """
    import shutil
    from pathlib import Path

    import pandas as pd

    source_path = Path(source_root)
    target_path = Path(target_root)

    filelist_path = source_path / "FileList.csv"
    if not filelist_path.exists():
        raise FileNotFoundError(f"FileList.csv not found: {filelist_path}")

    df = pd.read_csv(filelist_path)
    print(f"Original dataset: {len(df)} videos")

    # The directories are the same for every split, so resolve them once
    source_video_dir = source_path / "Videos"
    target_video_dir = target_path / "Videos"
    target_video_dir.mkdir(exist_ok=True, parents=True)

    kept_frames = []
    for split, num_samples in sample_sizes.items():
        split_df = df[df['Split'] == split]
        if len(split_df) < num_samples:
            print(f"Warning: Only {len(split_df)} samples in {split}, using all")
            selected_df = split_df
        else:
            selected_df = split_df.sample(n=num_samples, random_state=seed)
        print(f"{split}: Selected {len(selected_df)} samples")

        # Remember which rows were really copied; the rest are dropped below
        copied_index = []
        for row_index, file_name in selected_df['FileName'].items():
            filename = str(file_name)
            if not filename.endswith('.avi'):
                filename = filename + '.avi'

            source_video = source_video_dir / filename
            if not source_video.exists():
                print(f"Warning: Video not found: {source_video}")
                continue

            shutil.copy2(source_video, target_video_dir / filename)
            copied_index.append(row_index)
            if len(copied_index) % 10 == 0:
                print(f"  Copied {len(copied_index)}/{len(selected_df)} videos...")

        if copied_index:
            kept_frames.append(selected_df.loc[copied_index])

    if kept_frames:
        sample_df = pd.concat(kept_frames, ignore_index=True)
    else:
        # Nothing copied: still write a header-only CSV with the source columns
        sample_df = df.iloc[0:0].copy()

    sample_df.to_csv(target_path / "FileList.csv", index=False)
    return sample_df


if CREATE_SAMPLE:
    from pathlib import Path

    # Source (full dataset) and destination (sample) locations
    SOURCE_DATA_ROOT = "/content/drive/MyDrive/echonet_dynamic"  # full dataset path
    TARGET_DATA_ROOT = "/content/drive/MyDrive/sample_echonet_dynamic"  # where the sample is stored

    SAMPLE_SIZES = {
        'TRAIN': 100,  # number of train samples
        'VAL': 20,     # number of val samples
        'TEST': 20     # number of test samples
    }

    if Path(SOURCE_DATA_ROOT).exists():
        print("Creating sample dataset...")
        sample_df = create_sample_dataset(SOURCE_DATA_ROOT, TARGET_DATA_ROOT, SAMPLE_SIZES)

        print(f"\n✅ Sample dataset created:")
        print(f"  Total videos: {len(sample_df)}")
        print(f"  Location: {Path(TARGET_DATA_ROOT)}")

        # Per-split row counts of the sample that was written
        print(f"\nSplit distribution:")
        for split in ['TRAIN', 'VAL', 'TEST']:
            count = len(sample_df[sample_df['Split'] == split])
            print(f"  {split}: {count}")
    else:
        print(f"❌ Source data not found at {SOURCE_DATA_ROOT}")
else:
    print("Skipping sample data creation (CREATE_SAMPLE = False)")


## 3. 필요한 모듈 로드

아래 셀들을 실행하여 프로젝트 파일들을 업로드하거나, GitHub에서 클론하세요.


In [None]:
# Method 1: clone the project from GitHub (recommended)
!git clone https://github.com/5seoyoung/2025_edge_computing_task1.git
import sys
# Add the repository folder to the import path
# (assumes the clone landed in /content — TODO confirm the working directory)
sys.path.append('/content/2025_edge_computing_task1')

# Method 2: upload the files directly
# from google.colab import files
# files.upload()  # upload config.py, dataset.py, model.py, etc.


## 4. 설정 및 모듈 임포트


In [None]:
import torch
import torch.nn as nn
import pandas as pd
from pathlib import Path
import json
from datetime import datetime
import sys

# 프로젝트 경로 추가
sys.path.append('/content/2025_edge_computing_task1')

# 모듈 임포트
from config import Config
from dataset import create_data_loaders
from model import EFRegressionModel
from train import train_model, load_checkpoint
from prune_utils import apply_pruning_experiment
from metrics import count_params, calculate_sparsity, evaluate_model, measure_latency

# Colab override: point Config at the DATA_ROOT chosen in the data-preparation cells.
# Stop right away when it was never set (or was reset to None by a failed check).
if 'DATA_ROOT' not in locals() or DATA_ROOT is None:
    print("❌ DATA_ROOT not set! Please run data preparation cells above.")
    raise ValueError("DATA_ROOT must be set. Check data preparation section.")

Config.DATA_ROOT = Path(DATA_ROOT)
Config.VIDEO_DIR = Config.DATA_ROOT / "Videos"
Config.FILELIST_PATH = Config.DATA_ROOT / "FileList.csv"
print(f"✅ Using data from: {Config.DATA_ROOT}")

# Let Config create its directories
Config.setup_directories()

# Select the compute device and echo the effective settings
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
for setting_label, setting_value in (
    ("Using device", device),
    ("Data root", Config.DATA_ROOT),
    ("Video dir", Config.VIDEO_DIR),
    ("FileList", Config.FILELIST_PATH),
):
    print(f"{setting_label}: {setting_value}")


## 5. 데이터 로딩


In [None]:
# Verify the dataset paths before building any loader
for missing_label, required_path in (
    ("Video directory", Config.VIDEO_DIR),
    ("FileList.csv", Config.FILELIST_PATH),
):
    if not required_path.exists():
        raise FileNotFoundError(f"{missing_label} not found: {required_path}")

# Build the train / val / test loaders from the Config settings
print("Loading dataset...")
loader_kwargs = dict(
    video_dir=Config.VIDEO_DIR,
    filelist_path=Config.FILELIST_PATH,
    num_frames=Config.NUM_FRAMES,
    image_size=Config.IMAGE_SIZE,
    batch_size=Config.BATCH_SIZE,
    num_workers=Config.NUM_WORKERS,
)
train_loader, val_loader, test_loader = create_data_loaders(**loader_kwargs)

for split_label, split_loader in (
    ("Train", train_loader),
    ("Val", val_loader),
    ("Test", test_loader),
):
    print(f"{split_label} samples: {len(split_loader.dataset)}")

# Sample input for latency measurement: first item of the first validation batch
sample_videos, _ = next(iter(val_loader))
sample_input = sample_videos[:1].to(device)


## 6. Baseline 모델 학습


In [None]:
# Build the baseline model
baseline_model = EFRegressionModel(num_frames=Config.NUM_FRAMES, pretrained=True)
baseline_checkpoint_path = Config.CHECKPOINT_DIR / "baseline_best.pth"

# Load the saved checkpoint when one exists; otherwise train and save the model
if baseline_checkpoint_path.exists():
    print(f"Loading baseline model from {baseline_checkpoint_path}")
    baseline_state = torch.load(baseline_checkpoint_path, map_location=device)
    baseline_model.load_state_dict(baseline_state)
    baseline_model = baseline_model.to(device)
else:
    print("Training baseline model...")
    training_kwargs = dict(
        model=baseline_model,
        train_loader=train_loader,
        val_loader=val_loader,
        num_epochs=Config.NUM_EPOCHS,
        learning_rate=Config.LEARNING_RATE,
        weight_decay=Config.WEIGHT_DECAY,
        device=device,
        checkpoint_dir=Config.CHECKPOINT_DIR,
    )
    baseline_model = train_model(**training_kwargs)

    # Save the final model
    torch.save(baseline_model.state_dict(), baseline_checkpoint_path)
    print(f"Baseline model saved to {baseline_checkpoint_path}")


## 7. Baseline 모델 평가


In [None]:
# Evaluate the baseline: test metrics, parameter count, sparsity and latency
print("Evaluating baseline model...")
baseline_model.eval()

baseline_results = evaluate_model(baseline_model, test_loader, device)
baseline_params = count_params(baseline_model)
baseline_sparsity = calculate_sparsity(baseline_model)
baseline_latency = measure_latency(
    baseline_model,
    sample_input,
    device,
    Config.LATENCY_WARMUP_ITERATIONS,
    Config.LATENCY_MEASURE_ITERATIONS,
)

# Record consumed by the result-saving and plotting cells
baseline_summary = dict(
    model_type='baseline',
    num_params=baseline_params,
    sparsity=baseline_sparsity,
    MAE=baseline_results['MAE'],
    latency_ms_per_video=baseline_latency,
)

print(f"\nBaseline Results:")
baseline_report = [
    f"  Parameters: {baseline_summary['num_params']:,}",
    f"  Sparsity: {baseline_summary['sparsity']:.4f}",
    f"  MAE: {baseline_summary['MAE']:.4f}",
    f"  Latency: {baseline_summary['latency_ms_per_video']:.4f} ms/video",
]
print("\n".join(baseline_report))


## 8. Unstructured Pruning 실험


In [None]:
# Section banner
section_rule = "=" * 70
print(section_rule)
print("Unstructured Pruning Experiments")
print(section_rule)

# Run the unstructured pruning sweep; fine-tuning uses a tenth of the base learning rate
unstructured_kwargs = dict(
    model=baseline_model,
    train_loader=train_loader,
    val_loader=val_loader,
    pruning_type="unstructured",
    pruning_ratios=Config.UNSTRUCTURED_PRUNING_RATIOS,
    fine_tune_epochs=Config.FINE_TUNE_EPOCHS,
    learning_rate=Config.LEARNING_RATE * 0.1,
    device=device,
    sample_input=sample_input,
)
unstructured_results = apply_pruning_experiment(**unstructured_kwargs)


## 9. Structured Pruning 실험


In [None]:
print("="*70)
print("Structured Pruning Experiments")
print("="*70)

# The unstructured experiment may have modified baseline_model in place.
# NOTE(review): if apply_pruning_experiment prunes via torch.nn.utils.prune
# without removing the reparametrization, the module's state_dict keys become
# `weight_orig` / `weight_mask`, and loading the plain baseline checkpoint into
# that same object would fail — confirm against prune_utils.
# Rebuilding the model first makes the reload independent of whatever the
# previous experiment did to the old instance. pretrained=False because the
# weights are overwritten by the checkpoint right away.
baseline_model = EFRegressionModel(
    num_frames=Config.NUM_FRAMES,
    pretrained=False
)
baseline_model.load_state_dict(torch.load(baseline_checkpoint_path, map_location=device))
baseline_model = baseline_model.to(device)

# Run the structured pruning sweep; fine-tuning uses a tenth of the base learning rate
structured_results = apply_pruning_experiment(
    model=baseline_model,
    train_loader=train_loader,
    val_loader=val_loader,
    pruning_type="structured",
    pruning_ratios=Config.STRUCTURED_PRUNING_RATIOS,
    fine_tune_epochs=Config.FINE_TUNE_EPOCHS,
    learning_rate=Config.LEARNING_RATE * 0.1,
    device=device,
    sample_input=sample_input
)


## 10. 결과 저장 및 요약


In [None]:
def _json_default(value):
    """Fallback for json.dump: turn array-like scalars/arrays into plain Python.

    Used only for objects json cannot encode itself. Anything exposing
    ``tolist()`` is converted through it; everything else is rejected with
    the usual TypeError.
    """
    if hasattr(value, "tolist"):
        return value.tolist()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def _format_latency(latency):
    """Return the latency with 4 decimals, or 'N/A' when it is None."""
    # Compare against None explicitly so a measured 0.0 is still reported
    return 'N/A' if latency is None else f"{latency:.4f}"


def _summary_row(model_name, result, pruning_ratio='-'):
    """Build one row of the summary table from a result dictionary."""
    return {
        'Model': model_name,
        'Pruning Ratio': pruning_ratio,
        'Parameters': result['num_params'],
        'Sparsity': f"{result['sparsity']:.4f}",
        'MAE': f"{result['MAE']:.4f}",
        'Latency (ms/video)': _format_latency(result.get('latency_ms_per_video')),
    }


# One timestamp for the whole run so the JSON and CSV files share a suffix
run_time = datetime.now()
run_stamp = run_time.strftime('%Y%m%d_%H%M%S')

# Collect every result together with the settings that produced it
all_results = {
    'baseline': baseline_summary,
    'unstructured_pruning': unstructured_results,
    'structured_pruning': structured_results,
    'timestamp': run_time.isoformat(),
    'config': {
        'num_frames': Config.NUM_FRAMES,
        'image_size': Config.IMAGE_SIZE,
        'batch_size': Config.BATCH_SIZE,
        'num_epochs': Config.NUM_EPOCHS,
        'learning_rate': Config.LEARNING_RATE,
        'fine_tune_epochs': Config.FINE_TUNE_EPOCHS
    }
}

# Save as JSON; the default hook keeps the dump from failing should a metric
# be a NumPy/torch scalar rather than a built-in number
results_path = Config.RESULTS_DIR / f"results_{run_stamp}.json"
with open(results_path, 'w', encoding='utf-8') as f:
    json.dump(all_results, f, indent=2, default=_json_default)

print(f"Results saved to: {results_path}")

# Summary table: baseline first, then each pruning family in run order
summary_data = [_summary_row('Baseline', baseline_summary)]
summary_data += [
    _summary_row('Unstructured', result, f"{result['pruning_ratio']:.2f}")
    for result in unstructured_results
]
summary_data += [
    _summary_row('Structured', result, f"{result['pruning_ratio']:.2f}")
    for result in structured_results
]

# Print the table
df_summary = pd.DataFrame(summary_data)
print("\n" + "="*70)
print("Summary Table")
print("="*70)
print("\n" + df_summary.to_string(index=False))

# Save as CSV
csv_path = Config.RESULTS_DIR / f"summary_{run_stamp}.csv"
df_summary.to_csv(csv_path, index=False)
print(f"\nSummary table saved to: {csv_path}")


## 11. 결과 시각화 (선택사항)


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# One 2x2 figure: MAE, parameter count, sparsity and latency, one bar per model
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Pruned runs in plotting order, tagged with the family name used in the label
pruned_runs = (
    [("Unstructured", run) for run in unstructured_results]
    + [("Structured", run) for run in structured_results]
)

# Shared x labels and bar colors: baseline, then unstructured, then structured
models = ['Baseline'] + [
    f"{family}\n({run['pruning_ratio']:.1f})" for family, run in pruned_runs
]
bar_colors = (
    ['blue']
    + ['orange'] * len(unstructured_results)
    + ['green'] * len(structured_results)
)

# (axis, result key, y label, title) for each panel
panels = [
    (axes[0, 0], 'MAE', 'MAE', 'MAE Comparison'),
    (axes[0, 1], 'num_params', 'Number of Parameters', 'Parameters Comparison'),
    (axes[1, 0], 'sparsity', 'Sparsity', 'Sparsity Comparison'),
    (axes[1, 1], 'latency_ms_per_video', 'Latency (ms/video)', 'Latency Comparison'),
]

for panel_ax, metric_key, y_label, panel_title in panels:
    panel_values = [baseline_summary[metric_key]]
    for family, run in pruned_runs:
        metric_value = run[metric_key]
        # A missing (falsy) latency of a pruned run is drawn as a zero-height bar
        if metric_key == 'latency_ms_per_video' and not metric_value:
            metric_value = 0
        panel_values.append(metric_value)

    panel_ax.bar(models, panel_values, color=bar_colors)
    panel_ax.set_ylabel(y_label)
    panel_ax.set_title(panel_title)
    panel_ax.tick_params(axis='x', rotation=45)
    if metric_key == 'num_params':
        # Parameter counts are large: use scientific notation on the y axis
        panel_ax.ticklabel_format(style='scientific', axis='y', scilimits=(0,0))

plt.tight_layout()
plt.show()

# How to download the results (in Colab)
print("\nTo download results:")
print(f"from google.colab import files")
print(f"files.download('{results_path}')")
print(f"files.download('{csv_path}')")


## 12. 결과 다운로드 (Colab에서)

실험 결과를 다운로드하려면 아래 셀을 실행하세요.


In [None]:
# Download the result files
from google.colab import files
import os
import glob

# For each kind of output, pick the file with the greatest ctime and download it
for results_pattern in ("results_*.json", "summary_*.csv"):
    matching_files = glob.glob(str(Config.RESULTS_DIR / results_pattern))
    if not matching_files:
        continue
    newest_file = max(matching_files, key=os.path.getctime)
    files.download(newest_file)
    print(f"Downloaded: {newest_file}")

# Checkpoint download (optional)
# files.download(str(baseline_checkpoint_path))
