In [None]:
import json
import numpy as np
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from sentence_transformers import SentenceTransformer, util
import shap
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
from dotenv import load_dotenv
import os
warnings.filterwarnings('ignore')

True

In [None]:
# 1. 모델 및 토크나이저 로드 (양자화 없음)
print("모델 및 토크나이저 로딩 중...")
model_name = "google/gemma-3-12b-it"  # Gemma 3 12B 모델 (instruction tuned)

load_dotenv()

# 토크나이저 로드
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token  # 패딩 토큰 설정

# 모델 로드 (양자화 없음)
try:
    model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
    torch_dtype=torch.float16,
    token=os.getenv("HUGGINGFACE_TOKEN")
)
    device = next(model.parameters()).device
    print(f"모델 로드 완료 (device: {device})")
except Exception as e:
    print(f"GPU 로드 실패: {e}")
    print("CPU로 대체 모델 로드 중...")
    
    # GPU 로드 실패 시 더 작은 대체 모델 CPU로 로드
    fallback_model_name = "google/gemma-3-3b-it"  # 가장 작은 Gemma 3 모델
    model = AutoModelForCausalLM.from_pretrained(
        fallback_model_name,
        device_map="cpu",
        torch_dtype=torch.float32,
    )
    tokenizer = AutoTokenizer.from_pretrained(fallback_model_name)
    tokenizer.pad_token = tokenizer.eos_token
    device = torch.device("cpu")
    print(f"대체 모델 로드 완료 (device: {device}, model: {fallback_model_name})")

# sentence embedding 모델 로드 (다국어 지원 모델)
print("임베딩 모델 로딩 중...")
embedder = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2").to(device)
print("임베딩 모델 로드 완료")

모델 및 토크나이저 로딩 중...




Loading checkpoint shards:   0%|          | 0/5 [00:00<?, ?it/s]

모델 로드 완료 (device: cuda:0)
임베딩 모델 로딩 중...
임베딩 모델 로드 완료


In [13]:
# 2. 프롬프트 기반 text generation 함수
def summarize_text(text):
    """텍스트를 요약하는 함수"""
    if not text or len(text.strip()) == 0:
        return ""
    
    # Gemma 모델용 instruction 형식 프롬프트
    prompt = f"<start_of_turn>user\n경제금융 뉴스 기사를 요약해주세요. 핵심 정보만 간결하게 요약해주세요.\n\n기사: {text}<end_of_turn>\n<start_of_turn>model\n"
    
    # 토크나이징 및 입력 준비
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=2048)
    inputs = {k: v.to(device) for k, v in inputs.items()}
    
    # 생성 설정
    gen_config = {
        "max_new_tokens": 128,
        "do_sample": True,
        "temperature": 0.7,
        "top_p": 0.95,
        "repetition_penalty": 1.1,
        "eos_token_id": tokenizer.eos_token_id,
        "pad_token_id": tokenizer.pad_token_id
    }
    
    # 요약 생성
    with torch.no_grad():
        output = model.generate(**inputs, **gen_config)
    
    # 디코딩 및 프롬프트 제거
    full_response = tokenizer.decode(output[0], skip_special_tokens=False)
    
    # 응답 파싱 (모델 응답 부분만 추출)
    try:
        # Gemma 응답 형식에서 모델 응답 부분만 추출
        if "<start_of_turn>model" in full_response:
            response_parts = full_response.split("<start_of_turn>model\n")[1]
            if "<end_of_turn>" in response_parts:
                summary = response_parts.split("<end_of_turn>")[0].strip()
            else:
                summary = response_parts.strip()
        else:
            # 프롬프트 제거
            summary = full_response.replace(prompt, "").strip()
            
        return summary
    except Exception as e:
        print(f"요약 파싱 오류: {e}")
        # 오류 발생 시 전체 응답에서 프롬프트 부분 제거 시도
        return full_response.replace(prompt, "").strip()


In [14]:
# 3. 데이터셋 가져오기
def load_dataset(file_path, max_samples=50):
    """JSON 파일에서 데이터셋 로드"""
    print(f"데이터셋 로딩 중: {file_path}")
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        # JSON 구조에 따라 적절히 변환
        if isinstance(data, list):
            df = pd.DataFrame(data)
        else:
            # JSON 구조가 다르면 적절히 변환 필요
            df = pd.DataFrame([data])
        
        # 컬럼 이름 확인 및 변환
        if 'article' in df.columns and 'text' not in df.columns:
            df['text'] = df['article']
        if 'summary' not in df.columns and 'abstractive' in df.columns:
            df['summary'] = df['abstractive']
        
        # 최대 샘플 수 제한
        if len(df) > max_samples:
            df = df.sample(max_samples, random_state=42)
        
        print(f"데이터셋 로딩 완료: {len(df)} 샘플")
        return df
    
    except Exception as e:
        print(f"데이터셋 로딩 실패: {e}")
        # 예시 데이터 생성
        print("예시 데이터 사용")
        example_data = {
            'text': [
                "한국은행이 통화정책 결정 회의에서 기준금리를 동결했다. 한국은행은 지난해 4분기 이후 계속된 경기침체의 영향으로 고용 시장이 위축되고 있으며, 물가상승률은 목표 수준으로 안정되고 있다고 판단했다. 시장 전문가들은 한국 경제의 회복세가 예상보다 더디게 진행되고 있어 금리 인하 가능성도 있다고 전망했다.",
                "금융위원회는 오늘 가계부채 관리 방안을 발표했다. 주요 내용은 총부채원리금상환비율(DSR) 규제를 40%로 강화하고, 다주택자에 대한 주택담보대출 제한을 확대하는 것이다. 또한 실수요자에 대한 대출 지원은 확대하되, 투기 목적의 대출에는 제재를 강화하는 투트랙 전략을 펼치기로 했다."
            ],
            'summary': [
                "한국은행이 통화정책 결정 회의에서 기준금리 동결, 경기침체로 인한 고용시장 위축과 물가 안정 판단",
                "금융위, 가계부채 관리방안 발표 - DSR 40% 강화, 다주택자 대출제한 확대, 실수요자 지원 확대"
            ]
        }
        return pd.DataFrame(example_data)

In [15]:
# 4. SHAP용 파이프라인 및 Explainer 구성
class SummarizationPipeline:
    def __init__(self, model, tokenizer, embedder):
        self.model = model
        self.tokenizer = tokenizer
        self.embedder = embedder
        self.original_text = None
        self.reference_summary = None
        self.perturbation_count = 0  # 처리된 perturbation 수 추적
    
    def set_reference(self, text, summary=None):
        """원본 텍스트와 참조 요약 설정"""
        self.original_text = text
        self.perturbation_count = 0
        
        if summary is None:
            # 참조 요약이 없으면 모델로 생성
            print("참조 요약 생성 중...")
            self.reference_summary = summarize_text(text)
            print(f"생성된 참조 요약: {self.reference_summary}")
        else:
            self.reference_summary = summary
            print(f"제공된 참조 요약: {self.reference_summary}")
        
        # 참조 요약 임베딩 미리 계산
        self.reference_embedding = self.embedder.encode(
            self.reference_summary, convert_to_tensor=True
        )
    
    def __call__(self, texts):
        """
        SHAP용 호출 함수. Perturbation된 텍스트 목록을 받아 각각의 요약 품질 점수 반환
        """
        if not isinstance(texts, list):
            texts = [texts]
        
        batch_size = min(4, len(texts))  # 배치 처리 크기 (메모리 고려)
        all_scores = []
        
        # 진행 상황 출력
        self.perturbation_count += len(texts)
        if self.perturbation_count % 10 == 0:
            print(f"SHAP 분석 진행 중: {self.perturbation_count}개 perturbation 처리됨")
            
        # 배치 단위로 처리
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i+batch_size]
            batch_scores = []
            
            for text in batch_texts:
                if not text or len(text.strip()) == 0:
                    batch_scores.append(0.0)
                    continue
                    
                try:
                    # 요약 생성
                    summary = summarize_text(text)
                    
                    # 생성된 요약의 임베딩 계산
                    summary_embedding = self.embedder.encode(
                        summary, convert_to_tensor=True
                    )
                    
                    # 참조 요약과의 유사도 계산 (코사인 유사도)
                    similarity = util.cos_sim(
                        summary_embedding, self.reference_embedding
                    ).item()
                    
                    # ROUGE 점수 고려 (옵션)
                    # rouge_score = calculate_rouge(summary, self.reference_summary)
                    # combined_score = 0.7 * similarity + 0.3 * rouge_score
                    
                    batch_scores.append(float(similarity))
                except Exception as e:
                    print(f"[!] 점수 계산 오류: {e}")
                    batch_scores.append(0.0)  # 오류 시 0점 처리
            
            all_scores.extend(batch_scores)
        
        return np.array(all_scores)
    
    def explain_specific_tokens(self, tokens, top_n=5):
        """
        특정 토큰들의 중요도를 개별적으로 분석
        """
        results = []
        original_summary = summarize_text(self.original_text)
        
        for token in tokens:
            # 해당 토큰을 제거한 텍스트 생성
            removed_text = self.original_text.replace(token, "")
            
            # 제거 후 요약 생성
            removed_summary = summarize_text(removed_text)
            
            # 원본 요약과 제거 후 요약의 유사도 계산
            original_emb = self.embedder.encode(original_summary, convert_to_tensor=True)
            removed_emb = self.embedder.encode(removed_summary, convert_to_tensor=True)
            
            # 유사도 차이가 클수록 해당 토큰이 중요함
            similarity = util.cos_sim(original_emb, removed_emb).item()
            importance = 1.0 - similarity
            
            results.append({
                "token": token,
                "importance": importance,
                "original_summary": original_summary,
                "removed_summary": removed_summary
            })
            
        # 중요도 순으로 정렬
        results.sort(key=lambda x: x["importance"], reverse=True)
        return results[:top_n]


def analyze_with_shap(text, reference_summary=None, num_samples=50, verbose=True):
    """
    단일 텍스트에 대해 SHAP 분석 수행
    
    Args:
        text: 분석할 원문 텍스트
        reference_summary: 참조 요약 (없으면 모델로 생성)
        num_samples: SHAP 샘플링 수
        verbose: 자세한 출력 여부
    
    Returns:
        shap_values: SHAP 값
        summary: 생성된 요약
        pipeline: 분석에 사용된 파이프라인 객체
    """
    if verbose:
        print(f"\n{'='*80}\n원문 분석 시작\n{'='*80}")
        print(f"원문 (일부): {text[:200]}...")
    
    # 입력 텍스트가 너무 길면 잘라내기 (Gemma 모델 컨텍스트 길이 제한 고려)
    max_input_length = 4096  # Gemma 모델의 최대 입력 길이보다 작게 설정
    if len(text) > max_input_length:
        print(f"⚠️ 입력 텍스트가 너무 깁니다. {max_input_length}자로 잘라냅니다.")
        text = text[:max_input_length]
    
    # 파이프라인 초기화 및 참조 설정
    pipeline = SummarizationPipeline(model, tokenizer, embedder)
    pipeline.set_reference(text, reference_summary)
    
    # SHAP Explainer 생성 (Text Masker 사용)
    try:
        # 토크나이저에 mask_token이 있는지 확인
        mask_token = tokenizer.mask_token if hasattr(tokenizer, 'mask_token') and tokenizer.mask_token else "[MASK]"
        
        # Gemma 모델은 토크나이저 기반 마스킹 대신 단어 단위 마스킹 사용
        partition_masker = shap.maskers.Text(
            tokenizer=None,  # None으로 설정하면 단어 단위 분할 사용
            mask_token="",   # 빈 문자열로 마스킹
            collapse_mask_token=True
        )
        
        if verbose:
            print(f"SHAP Explainer 초기화 (마스크 토큰: {mask_token})")
        
        # Explainer 생성 (auto 알고리즘 사용)
        explainer = shap.Explainer(pipeline, partition_masker, algorithm="permutation")
        
        # SHAP 값 계산
        if verbose:
            print(f"SHAP 값 계산 중 (샘플 수: {num_samples})...")
        
        # 배치 크기와 최대 평가 수 조정 (Gemma 모델 메모리 요구사항 고려)
        shap_values = explainer(
            [text], 
            max_evals=num_samples, 
            batch_size=1,
            silent=not verbose
        )
        
        if verbose:
            print("SHAP 분석 완료")
        
        return shap_values, pipeline.reference_summary, pipeline
        
    except Exception as e:
        print(f"SHAP 분석 중 오류 발생: {e}")
        print("대체 분석 방법 사용...")
        
        # SHAP 실패 시 대체 분석 방법: 핵심 키워드 추출 및 중요도 직접 계산
        try:
            from sklearn.feature_extraction.text import TfidfVectorizer
            
            # TF-IDF로 핵심 키워드 추출
            vectorizer = TfidfVectorizer(max_features=100)
            tfidf_matrix = vectorizer.fit_transform([text])
            
            # 중요 키워드 및 점수 추출
            feature_names = vectorizer.get_feature_names_out()
            scores = tfidf_matrix.toarray()[0]
            
            # Dummy SHAP 값 생성 (TF-IDF 기반)
            dummy_values = np.zeros((1, len(text.split())))
            dummy_data = np.array([text])
            
            # 기본 SHAP 결과 형식 모방
            from collections import namedtuple
            DummyShapValues = namedtuple('DummyShapValues', ['values', 'data', 'feature_names'])
            
            dummy_shap = DummyShapValues(
                values=dummy_values,
                data=dummy_data,
                feature_names=['word_' + str(i) for i in range(len(text.split()))]
            )
            
            # 핵심 키워드 목록 출력
            print("\n핵심 키워드 (TF-IDF 기반):")
            for word, score in sorted(zip(feature_names, scores), key=lambda x: x[1], reverse=True)[:10]:
                print(f"  - {word}: {score:.4f}")
            
            return dummy_shap, pipeline.reference_summary, pipeline
            
        except Exception as e2:
            print(f"대체 분석도 실패: {e2}")
            return None, pipeline.reference_summary, pipeline


def print_important_features(shap_values, summary, top_n=10):
    """
    SHAP 값을 기반으로 중요 feature를 출력
    """
    # SHAP 값의 절대값을 기준으로 정렬
    token_importances = []
    
    for i, token in enumerate(shap_values.data[0].split()):
        importance = abs(shap_values.values[0][i])
        token_importances.append((token, importance, shap_values.values[0][i]))
    
    # 중요도 기준 정렬
    sorted_importances = sorted(token_importances, key=lambda x: x[1], reverse=True)
    
    print(f"\n{'='*80}")
    print(f"생성된 요약: {summary}")
    print(f"{'='*80}")
    print(f"상위 {top_n}개 중요 단어 (SHAP 기준):")
    print(f"{'단어':<15} | {'중요도':>10} | {'영향':>10} | {'해석'}")
    print(f"{'-'*60}")
    
    for token, importance, raw_value in sorted_importances[:top_n]:
        effect = "긍정적 영향" if raw_value > 0 else "부정적 영향"
        print(f"{token:<15} | {importance:>10.4f} | {raw_value:>10.4f} | {effect}")
    
    print(f"{'='*80}")


def save_shap_visualization(shap_values, output_file="shap_summary_analysis.png"):
    """SHAP 시각화 저장"""
    plt.figure(figsize=(12, 10))
    shap.plots.text(shap_values, display=False)
    plt.tight_layout()
    plt.savefig(output_file, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"SHAP 시각화 저장 완료: {output_file}")

In [16]:
# 5. 메인 실행 코드
if __name__ == "__main__":
    # 데이터셋 로드
    dataset_path = "train_original.json"  # 실제 파일 경로로 변경
    df = load_dataset(dataset_path, max_samples=10)  # 테스트용 10개만
    
    # 텍스트와 참조 요약 추출
    texts = df["text"].tolist()
    references = df["summary"].tolist() if "summary" in df.columns else [None] * len(texts)
    
    print(f"\n총 {len(texts)}개 샘플 분석 시작")
    
    # 각 샘플에 대해 SHAP 분석 수행
    for i, (text, reference) in enumerate(zip(texts, references)):
        print(f"\n샘플 {i+1}/{len(texts)} 분석:")
        
        try:
            shap_values, summary, _ = analyze_with_shap(
                text, 
                reference_summary=reference,
                num_samples=100,  # 샘플링 수 조정 (높을수록 정확하지만 느림)
                verbose=True
            )
            
            # 중요 feature 출력
            print_important_features(shap_values, summary, top_n=15)
            
            # SHAP 시각화 저장 - 객체 타입 확인 후 실행
            # 'DummyShapValues'인 경우 시각화 건너뛰기
            if not hasattr(shap_values, '__class__') or shap_values.__class__.__name__ != 'DummyShapValues':
                save_shap_visualization(shap_values, f"shap_analysis_sample_{i+1}.png")
            else:
                print("SHAP 시각화를 생성할 수 없습니다 - 대체 분석 방법 사용됨")
        except Exception as e:
            print(f"샘플 {i+1} 분석 중 오류 발생: {e}")
            print("이 샘플은 건너뜁니다.")
            continue
    
    print("\n모든 분석 완료!")

데이터셋 로딩 중: train_original.json
데이터셋 로딩 실패: Expecting value: line 9032102 column 19 (char 226168964)
예시 데이터 사용

총 2개 샘플 분석 시작

샘플 1/2 분석:

원문 분석 시작
원문 (일부): 한국은행이 통화정책 결정 회의에서 기준금리를 동결했다. 한국은행은 지난해 4분기 이후 계속된 경기침체의 영향으로 고용 시장이 위축되고 있으며, 물가상승률은 목표 수준으로 안정되고 있다고 판단했다. 시장 전문가들은 한국 경제의 회복세가 예상보다 더디게 진행되고 있어 금리 인하 가능성도 있다고 전망했다....
제공된 참조 요약: 한국은행이 통화정책 결정 회의에서 기준금리 동결, 경기침체로 인한 고용시장 위축과 물가 안정 판단
SHAP 분석 중 오류 발생: list index out of range
대체 분석 방법 사용...

핵심 키워드 (TF-IDF 기반):
  - 있다고: 0.3203
  - 4분기: 0.1601
  - 가능성도: 0.1601
  - 결정: 0.1601
  - 경기침체의: 0.1601
  - 경제의: 0.1601
  - 계속된: 0.1601
  - 고용: 0.1601
  - 금리: 0.1601
  - 기준금리를: 0.1601

생성된 요약: 한국은행이 통화정책 결정 회의에서 기준금리 동결, 경기침체로 인한 고용시장 위축과 물가 안정 판단
상위 15개 중요 단어 (SHAP 기준):
단어              |        중요도 |         영향 | 해석
------------------------------------------------------------
한국은행이           |     0.0000 |     0.0000 | 부정적 영향
통화정책            |     0.0000 |     0.0000 | 부정적 영향
결정              |     0.0000 |     0.0000 | 부정적 영향
회의에서            | 