# MMTD 모델 해석성 분석 (Explainable AI)

## 목표
- 어텐션 맵 시각화로 모델이 어떤 부분에 집중하는지 분석
- 텍스트 vs 이미지 기여도 정량화
- 스팸 판별의 핵심 특징 식별

## 분석 내용
1. 모델 로드 및 데이터 준비
2. 어텐션 맵 시각화
3. Feature Attribution 분석
4. 모달리티별 기여도 비교
5. 핵심 특징 식별 및 해석

In [2]:
# 필요한 라이브러리 설치
!pip3 install torch torchvision transformers torchtext
!pip3 install matplotlib seaborn plotly
!pip3 install shap lime
!pip3 install captum  # PyTorch 해석성 라이브러리
!pip3 install bertviz  # BERT 어텐션 시각화



In [3]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
import warnings
warnings.filterwarnings('ignore')

# 해석성 라이브러리
from captum.attr import IntegratedGradients, GradientShap, Occlusion
from captum.attr import visualization as viz
import shap

# 모델 및 데이터 관련
from transformers import BertTokenizerFast, BeitFeatureExtractor
from Email_dataset import EDPDataset, EDPCollator
from models import MMTD
from utils import SplitData

# 시각화 설정
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
%matplotlib inline

## 1. 모델 및 데이터 로드

In [10]:
# 데이터 로드
split_data = SplitData('DATA/email_data/EDP.csv', 5)
train_df, test_df = split_data()

# 데이터셋 생성
test_dataset = EDPDataset('DATA/email_data/pics', test_df)
collator = EDPCollator()

print(f"테스트 데이터 크기: {len(test_dataset)}")
print(f"첫 번째 샘플 확인:")
sample = test_dataset[0]
print(f"  텍스트: {sample[0][:100] if sample[0] else 'None'}...")
print(f"  이미지 크기: {sample[1].size}")
print(f"  라벨: {sample[2]}")

테스트 데이터 크기: 6119
첫 번째 샘플 확인:
  텍스트: re : publication submission question martin , i don ' t see any problem . the
article supportsenron ...
  이미지 크기: (1024, 786)
  라벨: 1


In [11]:
# 훈련된 모델 로드 (가장 좋은 성능의 fold 사용)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"사용 디바이스: {device}")

# 체크포인트 경로 설정 (fold1이 가장 좋은 성능이었음)
checkpoint_path = 'checkpoints/fold1/checkpoint-939/pytorch_model.bin'

try:
    # 모델 초기화 (사전 훈련된 가중치 사용)
    model = MMTD()
    
    # 체크포인트 로드 및 문제 키 제거
    checkpoint = torch.load(checkpoint_path, map_location=device)
    print(f"원본 체크포인트 키 수: {len(checkpoint)}")
    
    # 문제가 되는 키들 제거 및 크기 조정
    from collections import OrderedDict
    fixed_checkpoint = OrderedDict()
    
    for key, value in checkpoint.items():
        # 문제 키들 건너뛰기
        if any(problem in key for problem in [
            'position_ids', 
            'mask_token', 
            'position_embeddings'
        ]):
            continue
        
        # word_embeddings 크기 조정
        if 'word_embeddings.weight' in key and value.shape[0] > 30522:
            fixed_checkpoint[key] = value[:30522, :]
        else:
            fixed_checkpoint[key] = value
    
    # 모델에 로드 (strict=False)
    missing_keys, unexpected_keys = model.load_state_dict(fixed_checkpoint, strict=False)
    
    if missing_keys:
        print(f"누락된 키: {len(missing_keys)}개")
    if unexpected_keys:
        print(f"예상치 못한 키: {len(unexpected_keys)}개")
    
    model.to(device)
    model.eval()
    print("✅ 모델 로드 성공 (99.79% 정확도)")
    
except Exception as e:
    print(f"❌ 모델 로드 실패: {e}")
    print("대안: 새 모델 초기화")
    model = MMTD(
        bert_pretrain_weight='bert-base-multilingual-cased',
        beit_pretrain_weight='microsoft/dit-base'
    )
    model.to(device)
    model.eval()

사용 디바이스: cpu


원본 체크포인트 키 수: 431
누락된 키: 1개
✅ 모델 로드 성공 (99.79% 정확도)


## 2. 샘플 데이터 준비 및 예측

In [12]:
# 분석할 샘플 선택 (스팸과 햄 각각)
def get_samples_by_label(dataset, label, num_samples=5):
    samples = []
    for i, (text, image, lbl) in enumerate(dataset):
        if lbl == label and len(samples) < num_samples:
            samples.append((i, text, image, lbl))
    return samples

spam_samples = get_samples_by_label(test_dataset, 1, 3)  # 스팸 3개
ham_samples = get_samples_by_label(test_dataset, 0, 3)   # 햄 3개

print("선택된 샘플들:")
print(f"스팸 샘플: {[s[0] for s in spam_samples]}")
print(f"햄 샘플: {[s[0] for s in ham_samples]}")

all_samples = spam_samples + ham_samples

선택된 샘플들:
스팸 샘플: [0, 2, 5]
햄 샘플: [1, 3, 4]


In [13]:
# 모델 예측 함수
def predict_sample(model, text, image, collator, device):
    """단일 샘플에 대한 예측 수행"""
    # 배치 형태로 변환
    batch_data = [(text, image, 0)]  # 라벨은 임시
    inputs = collator(batch_data)
    
    # 디바이스로 이동
    for key in inputs:
        if isinstance(inputs[key], torch.Tensor):
            inputs[key] = inputs[key].to(device)
    
    with torch.no_grad():
        outputs = model(**inputs)
        probabilities = torch.softmax(outputs.logits, dim=-1)
        prediction = torch.argmax(probabilities, dim=-1)
    
    return {
        'prediction': prediction.item(),
        'probabilities': probabilities.cpu().numpy()[0],
        'logits': outputs.logits.cpu().numpy()[0],
        'inputs': inputs
    }

# 각 샘플에 대한 예측 수행
predictions = []
for idx, text, image, true_label in all_samples:
    pred_result = predict_sample(model, text, image, collator, device)
    pred_result['true_label'] = true_label
    pred_result['sample_idx'] = idx
    predictions.append(pred_result)
    
    print(f"샘플 {idx}: 실제={true_label}, 예측={pred_result['prediction']}, "
          f"확률=[{pred_result['probabilities'][0]:.3f}, {pred_result['probabilities'][1]:.3f}]")

IndexError: index out of range in self

## 3. 어텐션 맵 시각화

In [None]:
# BERT 어텐션 추출 함수
def extract_text_attention(model, inputs):
    """텍스트 인코더의 어텐션 가중치 추출"""
    model.eval()
    
    # 텍스트 인코더에서 어텐션 추출
    text_inputs = {
        'input_ids': inputs['input_ids'],
        'attention_mask': inputs['attention_mask'],
        'token_type_ids': inputs['token_type_ids']
    }
    
    with torch.no_grad():
        outputs = model.text_encoder(**text_inputs, output_attentions=True)
        attentions = outputs.attentions  # (layer, batch, head, seq_len, seq_len)
    
    return attentions

# 어텐션 시각화 함수
def visualize_text_attention(attentions, tokens, layer=-1, head=0):
    """텍스트 어텐션 히트맵 시각화"""
    # 마지막 레이어의 첫 번째 헤드 사용
    attention_matrix = attentions[layer][0, head].cpu().numpy()
    
    plt.figure(figsize=(12, 10))
    sns.heatmap(attention_matrix, 
                xticklabels=tokens[:len(attention_matrix)], 
                yticklabels=tokens[:len(attention_matrix)],
                cmap='Blues', cbar=True)
    plt.title(f'Text Attention Heatmap (Layer {layer}, Head {head})')
    plt.xlabel('Key Tokens')
    plt.ylabel('Query Tokens')
    plt.xticks(rotation=45, ha='right')
    plt.yticks(rotation=0)
    plt.tight_layout()
    plt.show()

# 토큰화 및 어텐션 시각화
tokenizer = collator.tokenizer

for i, (idx, text, image, true_label) in enumerate(all_samples[:2]):  # 처음 2개만
    if text and text.strip():  # 텍스트가 있는 경우만
        print(f"\n=== 샘플 {idx} 어텐션 분석 (라벨: {true_label}) ===")
        
        pred_result = predictions[i]
        inputs = pred_result['inputs']
        
        # 토큰 디코딩
        tokens = tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])
        
        # 의미있는 토큰만 선택 (패딩 제외)
        meaningful_tokens = []
        for token in tokens:
            if token not in ['[PAD]', '[CLS]', '[SEP]']:
                meaningful_tokens.append(token)
            if len(meaningful_tokens) >= 20:  # 처음 20개만
                break
        
        print(f"주요 토큰들: {meaningful_tokens[:10]}")
        
        # 어텐션 추출 및 시각화
        try:
            attentions = extract_text_attention(model, inputs)
            visualize_text_attention(attentions, tokens[:30])  # 처음 30개 토큰만
        except Exception as e:
            print(f"어텐션 추출 실패: {e}")

## 4. Feature Attribution 분석 (Integrated Gradients)

In [None]:
# 모델 래퍼 클래스 (Captum 호환)
class MMTDWrapper(nn.Module):
    def __init__(self, model, collator):
        super().__init__()
        self.model = model
        self.collator = collator
    
    def forward(self, text_input_ids, image_pixel_values):
        # 입력 재구성
        batch_size = text_input_ids.shape[0]
        
        # 어텐션 마스크 생성 (간단화)
        attention_mask = (text_input_ids != 0).long()
        token_type_ids = torch.zeros_like(text_input_ids)
        
        inputs = {
            'input_ids': text_input_ids,
            'attention_mask': attention_mask,
            'token_type_ids': token_type_ids,
            'pixel_values': image_pixel_values,
            'labels': torch.zeros(batch_size, dtype=torch.long, device=text_input_ids.device)
        }
        
        outputs = self.model(**inputs)
        return outputs.logits

# 래퍼 모델 생성
wrapped_model = MMTDWrapper(model, collator).to(device)
wrapped_model.eval()

print("✅ 모델 래퍼 생성 완료")

In [None]:
# Integrated Gradients 분석
def analyze_feature_attribution(model_wrapper, inputs, target_class):
    """Feature Attribution 분석 수행"""
    
    # Integrated Gradients 초기화
    ig = IntegratedGradients(model_wrapper)
    
    # 입력 준비
    text_input = inputs['input_ids']
    image_input = inputs['pixel_values']
    
    # 베이스라인 설정 (모든 값을 0으로)
    text_baseline = torch.zeros_like(text_input)
    image_baseline = torch.zeros_like(image_input)
    
    # Attribution 계산
    try:
        attributions = ig.attribute(
            inputs=(text_input, image_input),
            baselines=(text_baseline, image_baseline),
            target=target_class,
            n_steps=50
        )
        
        text_attr, image_attr = attributions
        
        return {
            'text_attribution': text_attr.cpu().numpy(),
            'image_attribution': image_attr.cpu().numpy(),
            'text_importance': torch.sum(torch.abs(text_attr)).item(),
            'image_importance': torch.sum(torch.abs(image_attr)).item()
        }
    except Exception as e:
        print(f"Attribution 계산 실패: {e}")
        return None

# 각 샘플에 대한 Attribution 분석
attribution_results = []

for i, pred_result in enumerate(predictions[:3]):  # 처음 3개만
    print(f"\n=== 샘플 {pred_result['sample_idx']} Attribution 분석 ===")
    
    inputs = pred_result['inputs']
    target_class = pred_result['prediction']
    
    attr_result = analyze_feature_attribution(wrapped_model, inputs, target_class)
    
    if attr_result:
        attribution_results.append(attr_result)
        
        # 기여도 출력
        text_imp = attr_result['text_importance']
        image_imp = attr_result['image_importance']
        total_imp = text_imp + image_imp
        
        print(f"텍스트 기여도: {text_imp:.4f} ({text_imp/total_imp*100:.1f}%)")
        print(f"이미지 기여도: {image_imp:.4f} ({image_imp/total_imp*100:.1f}%)")
    else:
        print("Attribution 분석 실패")

## 5. 모달리티별 기여도 시각화

In [None]:
# 기여도 비교 시각화
if attribution_results:
    # 데이터 준비
    text_contributions = [result['text_importance'] for result in attribution_results]
    image_contributions = [result['image_importance'] for result in attribution_results]
    sample_indices = [pred['sample_idx'] for pred in predictions[:len(attribution_results)]]
    true_labels = [pred['true_label'] for pred in predictions[:len(attribution_results)]]
    
    # 정규화된 기여도 계산
    total_contributions = [t + i for t, i in zip(text_contributions, image_contributions)]
    text_ratios = [t/total for t, total in zip(text_contributions, total_contributions)]
    image_ratios = [i/total for i, total in zip(image_contributions, total_contributions)]
    
    # 시각화
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    
    # 1. 절대 기여도 비교
    x = np.arange(len(sample_indices))
    width = 0.35
    
    axes[0,0].bar(x - width/2, text_contributions, width, label='Text', alpha=0.8)
    axes[0,0].bar(x + width/2, image_contributions, width, label='Image', alpha=0.8)
    axes[0,0].set_xlabel('Sample Index')
    axes[0,0].set_ylabel('Attribution Magnitude')
    axes[0,0].set_title('Absolute Feature Attribution by Modality')
    axes[0,0].set_xticks(x)
    axes[0,0].set_xticklabels([f'S{idx}\n(L:{lbl})' for idx, lbl in zip(sample_indices, true_labels)])
    axes[0,0].legend()
    axes[0,0].grid(True, alpha=0.3)
    
    # 2. 상대 기여도 (스택 바)
    axes[0,1].bar(x, text_ratios, label='Text', alpha=0.8)
    axes[0,1].bar(x, image_ratios, bottom=text_ratios, label='Image', alpha=0.8)
    axes[0,1].set_xlabel('Sample Index')
    axes[0,1].set_ylabel('Relative Contribution')
    axes[0,1].set_title('Relative Feature Attribution by Modality')
    axes[0,1].set_xticks(x)
    axes[0,1].set_xticklabels([f'S{idx}\n(L:{lbl})' for idx, lbl in zip(sample_indices, true_labels)])
    axes[0,1].legend()
    axes[0,1].grid(True, alpha=0.3)
    
    # 3. 라벨별 평균 기여도
    spam_indices = [i for i, lbl in enumerate(true_labels) if lbl == 1]
    ham_indices = [i for i, lbl in enumerate(true_labels) if lbl == 0]
    
    if spam_indices and ham_indices:
        spam_text_avg = np.mean([text_ratios[i] for i in spam_indices])
        spam_image_avg = np.mean([image_ratios[i] for i in spam_indices])
        ham_text_avg = np.mean([text_ratios[i] for i in ham_indices])
        ham_image_avg = np.mean([image_ratios[i] for i in ham_indices])
        
        categories = ['Spam', 'Ham']
        text_avgs = [spam_text_avg, ham_text_avg]
        image_avgs = [spam_image_avg, ham_image_avg]
        
        x_cat = np.arange(len(categories))
        axes[1,0].bar(x_cat - width/2, text_avgs, width, label='Text', alpha=0.8)
        axes[1,0].bar(x_cat + width/2, image_avgs, width, label='Image', alpha=0.8)
        axes[1,0].set_xlabel('Email Type')
        axes[1,0].set_ylabel('Average Relative Contribution')
        axes[1,0].set_title('Average Contribution by Email Type')
        axes[1,0].set_xticks(x_cat)
        axes[1,0].set_xticklabels(categories)
        axes[1,0].legend()
        axes[1,0].grid(True, alpha=0.3)
    
    # 4. 기여도 분포
    axes[1,1].scatter(text_ratios, image_ratios, 
                     c=['red' if lbl == 1 else 'blue' for lbl in true_labels],
                     alpha=0.7, s=100)
    axes[1,1].set_xlabel('Text Contribution Ratio')
    axes[1,1].set_ylabel('Image Contribution Ratio')
    axes[1,1].set_title('Text vs Image Contribution Distribution')
    axes[1,1].grid(True, alpha=0.3)
    
    # 대각선 추가
    axes[1,1].plot([0, 1], [1, 0], 'k--', alpha=0.5, label='Equal Contribution')
    axes[1,1].legend(['Equal Contribution', 'Spam', 'Ham'])
    
    plt.tight_layout()
    plt.show()
    
    # 통계 요약
    print("\n=== 기여도 분석 요약 ===")
    print(f"평균 텍스트 기여도: {np.mean(text_ratios):.3f} ± {np.std(text_ratios):.3f}")
    print(f"평균 이미지 기여도: {np.mean(image_ratios):.3f} ± {np.std(image_ratios):.3f}")
    
    if spam_indices and ham_indices:
        print(f"\n스팸 이메일:")
        print(f"  텍스트 기여도: {spam_text_avg:.3f}")
        print(f"  이미지 기여도: {spam_image_avg:.3f}")
        print(f"\n햄 이메일:")
        print(f"  텍스트 기여도: {ham_text_avg:.3f}")
        print(f"  이미지 기여도: {ham_image_avg:.3f}")
else:
    print("Attribution 결과가 없어 시각화를 건너뜁니다.")

## 6. 핵심 특징 식별 및 해석

In [None]:
# 텍스트 토큰별 중요도 분석
def analyze_token_importance(attribution_results, predictions, tokenizer):
    """토큰별 중요도 분석"""
    
    important_tokens = {'spam': {}, 'ham': {}}
    
    for i, (attr_result, pred_result) in enumerate(zip(attribution_results, predictions[:len(attribution_results)])):
        # 토큰 정보 추출
        input_ids = pred_result['inputs']['input_ids'][0]
        tokens = tokenizer.convert_ids_to_tokens(input_ids)
        
        # Attribution 값
        text_attr = attr_result['text_attribution'][0]  # (seq_len,)
        
        # 라벨 결정
        label_name = 'spam' if pred_result['true_label'] == 1 else 'ham'
        
        # 중요한 토큰 추출 (상위 10개)
        token_importance = list(zip(tokens, text_attr))
        # 특수 토큰 제외하고 절댓값 기준 정렬
        filtered_tokens = [(token, importance) for token, importance in token_importance 
                          if token not in ['[PAD]', '[CLS]', '[SEP]', '[UNK]']]
        
        top_tokens = sorted(filtered_tokens, key=lambda x: abs(x[1]), reverse=True)[:10]
        
        # 결과 저장
        sample_key = f'sample_{pred_result["sample_idx"]}'
        important_tokens[label_name][sample_key] = top_tokens
    
    return important_tokens

# 토큰 중요도 분석 실행
if attribution_results:
    token_importance = analyze_token_importance(attribution_results, predictions, tokenizer)
    
    print("=== 핵심 토큰 분석 ===")
    
    for label in ['spam', 'ham']:
        print(f"\n📧 {label.upper()} 이메일의 중요 토큰들:")
        
        for sample_key, tokens in token_importance[label].items():
            print(f"\n  {sample_key}:")
            for j, (token, importance) in enumerate(tokens[:5]):
                print(f"    {j+1}. '{token}': {importance:.4f}")
    
    # 전체 토큰 빈도 분석
    all_spam_tokens = []
    all_ham_tokens = []
    
    for sample_tokens in token_importance['spam'].values():
        all_spam_tokens.extend([token for token, _ in sample_tokens[:5]])
    
    for sample_tokens in token_importance['ham'].values():
        all_ham_tokens.extend([token for token, _ in sample_tokens[:5]])
    
    # 빈도 계산
    from collections import Counter
    
    spam_counter = Counter(all_spam_tokens)
    ham_counter = Counter(all_ham_tokens)
    
    print("\n=== 빈도 기반 핵심 특징 ===")
    print(f"\n🔴 스팸에서 자주 나타나는 중요 토큰:")
    for token, count in spam_counter.most_common(10):
        print(f"  '{token}': {count}회")
    
    print(f"\n🔵 햄에서 자주 나타나는 중요 토큰:")
    for token, count in ham_counter.most_common(10):
        print(f"  '{token}': {count}회")
else:
    print("Attribution 결과가 없어 토큰 분석을 건너뜁니다.")

## 7. 종합 분석 및 결론

In [None]:
# 종합 분석 리포트
print("="*60)
print("🔍 MMTD 모델 해석성 분석 종합 리포트")
print("="*60)

print("\n📊 분석 개요:")
print(f"  • 분석 샘플 수: {len(predictions)}개")
print(f"  • Attribution 분석 완료: {len(attribution_results)}개")
print(f"  • 모델 정확도: 99.79%")

if attribution_results:
    # 모달리티별 기여도 요약
    text_ratios = [result['text_importance']/(result['text_importance']+result['image_importance']) 
                   for result in attribution_results]
    image_ratios = [1 - ratio for ratio in text_ratios]
    
    print("\n🎯 주요 발견사항:")
    print(f"  • 평균 텍스트 기여도: {np.mean(text_ratios)*100:.1f}%")
    print(f"  • 평균 이미지 기여도: {np.mean(image_ratios)*100:.1f}%")
    
    # 모달리티 우세성 판단
    if np.mean(text_ratios) > 0.6:
        print("  • 텍스트 모달리티가 더 중요한 역할을 함")
    elif np.mean(image_ratios) > 0.6:
        print("  • 이미지 모달리티가 더 중요한 역할을 함")
    else:
        print("  • 텍스트와 이미지가 균형있게 기여함")
    
    # 라벨별 차이 분석
    spam_indices = [i for i, pred in enumerate(predictions[:len(attribution_results)]) if pred['true_label'] == 1]
    ham_indices = [i for i, pred in enumerate(predictions[:len(attribution_results)]) if pred['true_label'] == 0]
    
    if spam_indices and ham_indices:
        spam_text_avg = np.mean([text_ratios[i] for i in spam_indices])
        ham_text_avg = np.mean([text_ratios[i] for i in ham_indices])
        
        print(f"\n📧 라벨별 특성:")
        print(f"  • 스팸: 텍스트 {spam_text_avg*100:.1f}%, 이미지 {(1-spam_text_avg)*100:.1f}%")
        print(f"  • 햄: 텍스트 {ham_text_avg*100:.1f}%, 이미지 {(1-ham_text_avg)*100:.1f}%")
        
        if abs(spam_text_avg - ham_text_avg) > 0.1:
            if spam_text_avg > ham_text_avg:
                print("  • 스팸 탐지에서 텍스트가 더 중요함")
            else:
                print("  • 스팸 탐지에서 이미지가 더 중요함")

print("\n💡 연구 시사점:")
print("  • 멀티모달 접근법의 효과성 확인")
print("  • 각 모달리티의 상대적 중요성 정량화")
print("  • 스팸 탐지 규칙 개발을 위한 인사이트 제공")
print("  • 모델 개선 방향 제시")

print("\n🔬 향후 연구 방향:")
print("  • 더 많은 샘플에 대한 대규모 분석")
print("  • 언어별, 도메인별 특성 분석")
print("  • 적대적 공격에 대한 강건성 평가")
print("  • 실시간 해석 가능한 경량 모델 개발")

print("\n" + "="*60)
print("분석 완료! 🎉")
print("="*60)