~~~~# 🔒 3-4단계: 마스킹 모듈 구현

## 개요
- **3단계**: 문맥적 위험 고려 OR Threshold 기반 위험도 판단
- **4단계**: 실제 마스킹 실행

## 입력
- 원문 텍스트
- NER 결과 (1단계)
- 가중치 정보 (2단계 Copula 결과)

## 출력
- 마스킹된 텍스트
- 마스킹 로그

In [4]:
# 필요한 라이브러리 설치 및 임포트
import re
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple, Any
from dataclasses import dataclass
import warnings
warnings.filterwarnings("ignore")

In [5]:
# 데이터 구조 정의
@dataclass
class NERResult:
    """1단계 NER 결과 데이터 구조"""
    token: str
    entity: str
    start_pos: int = 0
    end_pos: int = 0

@dataclass
class RiskWeight:
    """2단계 위험도 가중치 데이터 구조"""
    token: str
    entity: str
    category: str  # '직접', '간접', '기타'
    risk_weight: int  # 0-100
    copula_feature: str = None

@dataclass
class MaskingResult:
    """마스킹 결과 데이터 구조"""
    original_text: str
    masked_text: str
    masking_log: List[Dict]
    total_entities: int
    masked_entities: int

In [6]:
class ContextualRiskAnalyzer:
    """3단계: 문맥적 위험 분석기"""
    
    def __init__(self):
        # 위험 조합 패턴 정의 (실제로는 더 복잡한 모델 사용 가능)
        self.high_risk_combinations = {
            ('PER', 'ORG', 'DATE'): 1.5,      # 사람 + 기관 + 날짜
            ('PER', 'DISEASE', 'ORG'): 1.8,   # 사람 + 질병 + 기관  
            ('PER', 'CONTACT'): 2.0,          # 사람 + 연락처
            ('ORG', 'DATE', 'DISEASE'): 1.3, # 기관 + 날짜 + 질병
        }
        
        # 의료 도메인 특화 위험 키워드
        self.medical_risk_keywords = {
            '진단': 1.2, '수술': 1.2, '입원': 1.2, '치료': 1.2, '처방': 1.2, '검사': 1.2,
            '암': 1.3, '종양': 1.3, '질환': 1.3, '증상': 1.3,
            '응급': 1.5, '중환자': 1.5, '수술실': 1.5
        }


    def analyze_contextual_risk(self, text: str, risk_weights: List[RiskWeight]) -> List[RiskWeight]:
        """문맥을 고려한 위험도 재조정"""
        adjusted_weights = []
        
        # 현재 텍스트에 등장하는 엔티티 타입들 추출
        entity_types = [self._normalize_entity_type(rw.entity) for rw in risk_weights if rw.entity != 'O']
        
        for risk_weight in risk_weights:
            adjusted_weight = risk_weight.risk_weight
            
            # 1. 조합 패턴 기반 위험도 증가
            combination_multiplier = self._get_combination_multiplier(entity_types)
            
            # 2. 의료 키워드 기반 위험도 증가
            keyword_multiplier = self._get_keyword_multiplier(text)
            
            # 3. 최종 조정된 위험도 계산 (상한 100)
            if adjusted_weight > 0:  # 기존에 위험도가 있는 경우만
                adjusted_weight = min(100, int(adjusted_weight * combination_multiplier * keyword_multiplier))
            
            # 새로운 RiskWeight 객체 생성
            adjusted_weights.append(RiskWeight(
                token=risk_weight.token,
                entity=risk_weight.entity,
                category=risk_weight.category,
                risk_weight=adjusted_weight,
                copula_feature=risk_weight.copula_feature
            ))
        
        return adjusted_weights
    
    def _normalize_entity_type(self, entity: str) -> str:
        """엔티티 타입 정규화 (B-PER -> PER)"""
        if entity.startswith('B-') or entity.startswith('I-'):
            return entity[2:]
        return entity
    
    def _get_combination_multiplier(self, entity_types: List[str]) -> float:
        """엔티티 조합에 따른 위험도 배수 계산"""
        entity_set = set(entity_types)
        max_multiplier = 1.0
        
        for pattern, multiplier in self.high_risk_combinations.items():
            if set(pattern).issubset(entity_set):
                max_multiplier = max(max_multiplier, multiplier)
        
        return max_multiplier
    
    def _get_keyword_multiplier(self, text: str) -> float:
        """의료 키워드에 따른 위험도 배수 계산"""
        max_multiplier = 1.0
        
        for keyword, multiplier in self.medical_risk_keywords.items():
            if keyword in text:
                max_multiplier = max(max_multiplier, multiplier)
        
        return max_multiplier

In [7]:
class MaskingExecutor:
    """4단계: 마스킹 실행기"""
    
    def __init__(self, threshold: int = 50):
        self.threshold = threshold
        
        # 마스킹 패턴 정의
        self.mask_patterns = {
            'PER': '[PERSON]',
            'ORG': '[HOSPITAL]', 
            'LOC': '[LOCATION]',
            'DATE': '[DATE]',
            'DISEASE': '[DISEASE]',
            'CONTACT': '[CONTACT]',
            'CVL': '[TITLE]',
            'NUM': '[NUMBER]',
            'default': '[MASKED]'
        }
    
    def execute_masking(self, text: str, risk_weights: List[RiskWeight]) -> MaskingResult:
        """임계값 기반 마스킹 실행"""
        masked_text = text
        masking_log = []
        masked_count = 0
        total_entities = len([rw for rw in risk_weights if rw.entity != 'O'])
        
        # 위험도가 높은 순으로 정렬 (먼저 처리하여 텍스트 위치 변경 최소화)
        sorted_weights = sorted(risk_weights, key=lambda x: x.risk_weight, reverse=True)
        
        for risk_weight in sorted_weights:
            if risk_weight.risk_weight >= self.threshold and risk_weight.entity != 'O':
                # 마스킹 패턴 결정
                entity_type = self._normalize_entity_type(risk_weight.entity)
                mask_pattern = self.mask_patterns.get(entity_type, self.mask_patterns['default'])
                
                # 토큰을 마스킹 패턴으로 대체
                if risk_weight.token in masked_text:
                    masked_text = masked_text.replace(risk_weight.token, mask_pattern, 1)
                    masked_count += 1
                    
                    # 마스킹 로그 기록
                    masking_log.append({
                        'token': risk_weight.token,
                        'entity': risk_weight.entity,
                        'risk_weight': risk_weight.risk_weight,
                        'masked_as': mask_pattern,
                        'reason': f'위험도 {risk_weight.risk_weight} >= 임계값 {self.threshold}'
                    })
        
        return MaskingResult(
            original_text=text,
            masked_text=masked_text,
            masking_log=masking_log,
            total_entities=total_entities,
            masked_entities=masked_count
        )
    
    def _normalize_entity_type(self, entity: str) -> str:
        """엔티티 타입 정규화"""
        if entity.startswith('B-') or entity.startswith('I-'):
            return entity[2:]
        return entity

In [8]:
class MedicalMaskingPipeline:
    """전체 파이프라인 통합 클래스"""
    
    def __init__(self, threshold: int = 50, use_contextual_analysis: bool = True):
        self.threshold = threshold
        self.use_contextual_analysis = use_contextual_analysis
        self.contextual_analyzer = ContextualRiskAnalyzer()
        self.masking_executor = MaskingExecutor(threshold)
    
    def process(self, text: str, risk_weights: List[RiskWeight]) -> MaskingResult:
        """3-4단계 통합 처리"""
        
        # 3단계: 문맥적 위험 분석 (옵션)
        if self.use_contextual_analysis:
            adjusted_weights = self.contextual_analyzer.analyze_contextual_risk(text, risk_weights)
        else:
            adjusted_weights = risk_weights
        
        # 4단계: 마스킹 실행
        result = self.masking_executor.execute_masking(text, adjusted_weights)
        
        return result
    
    def print_analysis_summary(self, result: MaskingResult):
        """분석 결과 요약 출력"""
        print("\n" + "="*60)
        print("🔒 마스킹 분석 결과")
        print("="*60)
        print(f"📝 원문: {result.original_text}")
        print(f"🎭 마스킹: {result.masked_text}")
        print(f"📊 마스킹 통계: {result.masked_entities}/{result.total_entities} 개체")
        
        if result.masking_log:
            print("\n📋 마스킹 상세 로그:")
            for i, log in enumerate(result.masking_log, 1):
                print(f"{i}. '{log['token']}' → {log['masked_as']} (위험도: {log['risk_weight']})")
        else:
            print("\n✅ 마스킹이 필요한 고위험 정보가 없습니다.")

In [9]:
# 테스트 데이터 준비 (2단계 Copula 결과 시뮬레이션)
def create_test_data():
    """테스트용 위험도 데이터 생성"""
    
    test_cases = [
        {
            "name": "🟢 저위험 케이스",
            "text": "환자는 내일 검사를 받을 예정입니다.",
            "risk_weights": [
                RiskWeight("환자는", "O", "기타", 0),
                RiskWeight("내일", "B-DATE", "간접", 30),
                RiskWeight("검사를", "O", "기타", 0),
                RiskWeight("받을", "O", "기타", 0),
                RiskWeight("예정입니다.", "O", "기타", 0)
            ]
        },
        {
            "name": "🟡 중위험 케이스", 
            "text": "김철수씨가 2023년 10월에 서울대병원에서 진단받았습니다.",
            "risk_weights": [
                RiskWeight("김철수씨가", "B-PER", "직접", 100),
                RiskWeight("2023년", "B-DATE", "간접", 45),
                RiskWeight("10월에", "I-DATE", "간접", 40),
                RiskWeight("서울대병원에서", "B-ORG", "간접", 65),
                RiskWeight("진단받았습니다.", "O", "기타", 0)
            ]
        },
        {
            "name": "🔴 고위험 케이스",
            "text": "박영희(010-1234-5678)는 삼성서울병원에서 간암 수술을 받았다.",
            "risk_weights": [
                RiskWeight("박영희", "B-PER", "직접", 100),
                RiskWeight("010-1234-5678", "B-CONTACT", "직접", 100),
                RiskWeight("삼성서울병원에서", "B-ORG", "간접", 70),
                RiskWeight("간암", "B-DISEASE", "간접", 75),
                RiskWeight("수술을", "O", "기타", 0),
                RiskWeight("받았다.", "O", "기타", 0)
            ]
        }
    ]
    
    return test_cases

In [10]:
# 메인 테스트 실행
def main():
    print("🚀 3-4단계 마스킹 모듈 테스트 시작\n")
    
    # 파이프라인 초기화
    pipeline = MedicalMaskingPipeline(threshold=50, use_contextual_analysis=True)
    
    # 테스트 데이터 로드
    test_cases = create_test_data()
    
    # 각 테스트 케이스 실행
    for i, case in enumerate(test_cases, 1):
        print(f"\n{'='*20} 테스트 {i}: {case['name']} {'='*20}")
        
        # 파이프라인 실행
        result = pipeline.process(case['text'], case['risk_weights'])
        
        # 결과 출력
        pipeline.print_analysis_summary(result)
    
    print("\n" + "="*60)
    print("✅ 모든 테스트 완료!")
    print("="*60)

# 실행
if __name__ == "__main__":
    main()

🚀 3-4단계 마스킹 모듈 테스트 시작



🔒 마스킹 분석 결과
📝 원문: 환자는 내일 검사를 받을 예정입니다.
🎭 마스킹: 환자는 내일 검사를 받을 예정입니다.
📊 마스킹 통계: 0/1 개체

✅ 마스킹이 필요한 고위험 정보가 없습니다.


🔒 마스킹 분석 결과
📝 원문: 김철수씨가 2023년 10월에 서울대병원에서 진단받았습니다.
🎭 마스킹: [PERSON] [DATE] [DATE] [HOSPITAL] 진단받았습니다.
📊 마스킹 통계: 4/4 개체

📋 마스킹 상세 로그:
1. '김철수씨가' → [PERSON] (위험도: 100)
2. '서울대병원에서' → [HOSPITAL] (위험도: 100)
3. '2023년' → [DATE] (위험도: 81)
4. '10월에' → [DATE] (위험도: 72)


🔒 마스킹 분석 결과
📝 원문: 박영희(010-1234-5678)는 삼성서울병원에서 간암 수술을 받았다.
🎭 마스킹: [PERSON]([CONTACT])는 [HOSPITAL] [DISEASE] 수술을 받았다.
📊 마스킹 통계: 4/4 개체

📋 마스킹 상세 로그:
1. '박영희' → [PERSON] (위험도: 100)
2. '010-1234-5678' → [CONTACT] (위험도: 100)
3. '삼성서울병원에서' → [HOSPITAL] (위험도: 100)
4. '간암' → [DISEASE] (위험도: 100)

✅ 모든 테스트 완료!


In [11]:
# 추가: 임계값별 마스킹 비교 분석
def threshold_comparison_analysis():
    """다양한 임계값에서의 마스킹 결과 비교"""
    
    test_text = "김철수(010-1234-5678)는 2023년 10월 서울대병원에서 간암 진단을 받았습니다."
    test_weights = [
        RiskWeight("김철수", "B-PER", "직접", 100),
        RiskWeight("010-1234-5678", "B-CONTACT", "직접", 100), 
        RiskWeight("2023년", "B-DATE", "간접", 45),
        RiskWeight("10월", "I-DATE", "간접", 40),
        RiskWeight("서울대병원에서", "B-ORG", "간접", 65),
        RiskWeight("간암", "B-DISEASE", "간접", 75)
    ]
    
    thresholds = [30, 50, 70, 90]
    
    print("\n🔍 임계값별 마스킹 비교 분석")
    print("="*80)
    print(f"📝 원문: {test_text}")
    print("="*80)
    
    for threshold in thresholds:
        pipeline = MedicalMaskingPipeline(threshold=threshold, use_contextual_analysis=True)
        result = pipeline.process(test_text, test_weights)
        
        print(f"\n🎯 임계값 {threshold}: {result.masked_entities}/{result.total_entities} 개체 마스킹")
        print(f"   결과: {result.masked_text}")

# 임계값 비교 실행
threshold_comparison_analysis()


🔍 임계값별 마스킹 비교 분석
📝 원문: 김철수(010-1234-5678)는 2023년 10월 서울대병원에서 간암 진단을 받았습니다.

🎯 임계값 30: 6/6 개체 마스킹
   결과: [PERSON]([CONTACT])는 [DATE] [DATE] [HOSPITAL] [DISEASE] 진단을 받았습니다.

🎯 임계값 50: 6/6 개체 마스킹
   결과: [PERSON]([CONTACT])는 [DATE] [DATE] [HOSPITAL] [DISEASE] 진단을 받았습니다.

🎯 임계값 70: 6/6 개체 마스킹
   결과: [PERSON]([CONTACT])는 [DATE] [DATE] [HOSPITAL] [DISEASE] 진단을 받았습니다.

🎯 임계값 90: 6/6 개체 마스킹
   결과: [PERSON]([CONTACT])는 [DATE] [DATE] [HOSPITAL] [DISEASE] 진단을 받았습니다.


## 📋 마스킹 모듈 사용법

### 1. 기본 사용법
```python
# 파이프라인 초기화
pipeline = MedicalMaskingPipeline(threshold=50, use_contextual_analysis=True)

# 2단계에서 받은 위험도 데이터
risk_weights = [...] # RiskWeight 객체 리스트

# 마스킹 실행
result = pipeline.process(original_text, risk_weights)

# 결과 확인
print(result.masked_text)
```

### 2. 설정 옵션
- `threshold`: 마스킹 임계값 (기본값: 50)
- `use_contextual_analysis`: 문맥 분석 사용 여부 (기본값: True)

### 3. 출력 형태
- `masked_text`: 마스킹된 텍스트
- `masking_log`: 마스킹 상세 로그
- `total_entities`: 전체 개체 수
- `masked_entities`: 마스킹된 개체 수