In [1]:
"""
다국어 임베딩 기반 문장 매핑 시스템

이 모듈은 한국어와 영어 스크립트를 의미적 유사성을 기반으로 매핑하는 기능을 제공합니다.
문장 임베딩 벡터를 사용해 언어 간 경계를 넘어 의미적으로 유사한 문장을 찾습니다.
"""

import os
import numpy as np
from typing import List, Tuple, Dict, Optional, Set, Any
from dataclasses import dataclass
import re
import logging
from textgrid import TextGrid, IntervalTier

# 로깅 설정
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("SentenceMapper")

#####################################################
# 1. 문장 관리 및 전처리 모듈
#####################################################

@dataclass
class Sentence:
    """문장 정보를 저장하는 데이터 클래스"""
    text: str  # 문장 텍스트
    words: List[str]  # 단어 리스트
    start_time: float = -1  # 시작 시간 (초)
    end_time: float = -1  # 종료 시간 (초)
    index: int = -1  # 원본 스크립트에서의 인덱스

    def __post_init__(self):
        if not self.words and self.text:
            # 텍스트만 제공된 경우 단어 리스트 생성
            self.words = self.text.split()

    def __str__(self) -> str:
        return f"Sentence({self.index}): {self.text} ({self.start_time:.2f}s ~ {self.end_time:.2f}s)"


class SentencePreprocessor:
    """문장 전처리 및 정규화를 담당하는 클래스"""
    
    @staticmethod
    def normalize_text(text: str) -> str:
        """텍스트 정규화: 공백 정리, 특수문자 처리 등"""
        # 여러 공백을 하나로 줄이기
        text = re.sub(r'\s+', ' ', text)
        # 양쪽 공백 제거
        text = text.strip()
        # 소문자로 변환 (선택사항)
        # text = text.lower()
        return text

    @staticmethod
    def split_into_sentences(text: str, lang: str = 'ko') -> List[str]:
        """
        텍스트를 문장 단위로 분리
        
        Args:
            text: 분리할 텍스트
            lang: 언어 코드 ('ko': 한국어, 'en': 영어)
            
        Returns:
            문장 리스트
        """
        # 간단한 문장 분리 규칙 (언어별로 다른 규칙 적용 가능)
        if lang == 'ko':
            # 한국어 문장 분리 규칙
            sentence_pattern = r'([^.!?…]+[.!?…]+)'
        else:
            # 영어 문장 분리 규칙
            sentence_pattern = r'([^.!?]+[.!?]+)'
        
        sentences = re.findall(sentence_pattern, text)
        
        # 정규식으로 찾지 못한 마지막 부분이 있다면 추가
        leftover = re.sub(sentence_pattern, '', text).strip()
        if leftover:
            sentences.append(leftover)
        
        # 정규화 적용
        return [SentencePreprocessor.normalize_text(s) for s in sentences if s.strip()]

    @staticmethod
    def extract_sentences_from_textgrid(
        textgrid_path: str, 
        tier_name: str = "words", 
        min_pause: float = 0.5
    ) -> List[Sentence]:
        """
        TextGrid 파일에서 문장 단위로 단어들을 추출
        
        Args:
            textgrid_path: TextGrid 파일 경로
            tier_name: 추출할 계층 이름
            min_pause: 문장 구분을 위한 최소 휴지 시간(초)
            
        Returns:
            Sentence 객체 리스트
        """
        try:
            tg = TextGrid.fromFile(textgrid_path)
            tier = tg.getFirst(tier_name)
        except Exception as e:
            logger.error(f"TextGrid 파일 로드 실패: {e}")
            return []
            
        sentences = []
        current_words = []
        word_times = []
        last_end_time = 0
        sentence_idx = 0
        
        for interval in tier:
            # 빈 mark는 건너뜀
            mark = interval.mark.strip()
            if not mark:
                continue
                
            # 긴 휴지(pause) 감지
            if interval.minTime - last_end_time > min_pause and current_words:
                # 현재까지의 단어로 문장 생성
                sentence_text = ' '.join(current_words)
                start_time = word_times[0][0] if word_times else 0
                end_time = word_times[-1][1] if word_times else 0
                
                sentences.append(Sentence(
                    text=sentence_text,
                    words=current_words.copy(),
                    start_time=start_time,
                    end_time=end_time,
                    index=sentence_idx
                ))
                sentence_idx += 1
                
                # 초기화
                current_words = []
                word_times = []
            
            # 현재 단어 추가
            current_words.append(mark)
            word_times.append((interval.minTime, interval.maxTime))
            last_end_time = interval.maxTime
        
        # 마지막 문장 처리
        if current_words:
            sentence_text = ' '.join(current_words)
            start_time = word_times[0][0] if word_times else 0
            end_time = word_times[-1][1] if word_times else 0
            
            sentences.append(Sentence(
                text=sentence_text,
                words=current_words,
                start_time=start_time,
                end_time=end_time,
                index=sentence_idx
            ))
        
        logger.info(f"TextGrid에서 {len(sentences)}개 문장 추출 완료")
        return sentences


#####################################################
# 2. 다국어 임베딩 모듈
#####################################################

class MultilingualEmbedder:
    """다국어 임베딩 모델을 관리하는 클래스"""
    
    def __init__(self, model_name: str = 'paraphrase-multilingual-MiniLM-L12-v2'):
        """
        다국어 임베딩 모델 초기화
        
        Args:
            model_name: 사용할 Sentence-Transformers 모델 이름
        """
        try:
            from sentence_transformers import SentenceTransformer
            self.model = SentenceTransformer(model_name)
            logger.info(f"다국어 임베딩 모델 '{model_name}' 로드 완료")
        except ImportError:
            logger.error("sentence-transformers 라이브러리가 설치되지 않았습니다.")
            logger.error("pip install sentence-transformers 명령어로 설치하세요.")
            raise

    def encode_sentences(self, sentences: List[Sentence]) -> np.ndarray:
        """
        문장 리스트를 임베딩 벡터로 변환
        
        Args:
            sentences: Sentence 객체 리스트
            
        Returns:
            문장 임베딩 벡터 배열 (shape: n_sentences x embedding_dim)
        """
        texts = [s.text for s in sentences]
        embeddings = self.model.encode(texts, show_progress_bar=True)
        logger.info(f"{len(texts)}개 문장의 임베딩 계산 완료")
        return embeddings


#####################################################
# 3. 유사도 계산 및 매핑 모듈
#####################################################

class SentenceMapper:
    """문장 간 유사도 계산 및 최적 매핑을 찾는 클래스"""
    
    @staticmethod
    def compute_similarity_matrix(
        source_embeddings: np.ndarray, 
        target_embeddings: np.ndarray
    ) -> np.ndarray:
        """
        두 임베딩 집합 간의 코사인 유사도 행렬 계산
        
        Args:
            source_embeddings: 소스 문장 임베딩 (shape: n_source x embedding_dim)
            target_embeddings: 타겟 문장 임베딩 (shape: n_target x embedding_dim)
            
        Returns:
            유사도 행렬 (shape: n_source x n_target)
        """
        # 임베딩 정규화
        source_norm = np.linalg.norm(source_embeddings, axis=1, keepdims=True)
        target_norm = np.linalg.norm(target_embeddings, axis=1, keepdims=True)
        
        # 0으로 나누기 방지
        source_norm = np.where(source_norm == 0, 1e-10, source_norm)
        target_norm = np.where(target_norm == 0, 1e-10, target_norm)
        
        # 정규화된 임베딩
        source_normalized = source_embeddings / source_norm
        target_normalized = target_embeddings / target_norm
        
        # 코사인 유사도 행렬 계산 (내적)
        similarity_matrix = np.dot(source_normalized, target_normalized.T)
        
        return similarity_matrix

    @staticmethod
    def find_optimal_mapping(
        similarity_matrix: np.ndarray, 
        threshold: float = 0.5
    ) -> List[Tuple[int, int, float]]:
        """
        헝가리안 알고리즘을 사용해 최적의 매핑 찾기
        
        Args:
            similarity_matrix: 유사도 행렬
            threshold: 최소 유사도 임계값
            
        Returns:
            매핑 리스트 [(source_idx, target_idx, similarity), ...]
        """
        try:
            from scipy.optimize import linear_sum_assignment
        except ImportError:
            logger.error("scipy 라이브러리가 설치되지 않았습니다.")
            logger.error("pip install scipy 명령어로 설치하세요.")
            raise
            
        # 유사도가 높을수록 좋으므로 음수로 변환하여 비용 행렬로 만듦
        cost_matrix = -similarity_matrix
        
        # 헝가리안 알고리즘으로 최적 매핑 찾기
        source_indices, target_indices = linear_sum_assignment(cost_matrix)
        
        # 결과 형식화 및 임계값 필터링
        mappings = []
        for source_idx, target_idx in zip(source_indices, target_indices):
            similarity = similarity_matrix[source_idx, target_idx]
            if similarity >= threshold:
                mappings.append((source_idx, target_idx, similarity))
        
        # 유사도 기준 내림차순 정렬
        mappings.sort(key=lambda x: x[2], reverse=True)
        
        logger.info(f"임계값 {threshold} 이상의 매핑 {len(mappings)}개 찾음")
        return mappings

    @staticmethod
    def refine_mappings(
        mappings: List[Tuple[int, int, float]], 
        source_sentences: List[Sentence], 
        target_sentences: List[Sentence],
        sequential: bool = True
    ) -> List[Tuple[int, int, float]]:
        """
        매핑 결과 정제
        
        Args:
            mappings: 원본 매핑 리스트
            source_sentences: 소스 문장 리스트
            target_sentences: 타겟 문장 리스트
            sequential: 순차적 매핑 강제 여부
            
        Returns:
            정제된 매핑 리스트
        """
        if not mappings:
            return []
            
        if sequential:
            # 원본 순서를 유지하는 방향으로 정제
            refined = []
            used_source = set()
            used_target = set()
            
            # 유사도 순으로 정렬된 매핑에서 시작
            for source_idx, target_idx, similarity in mappings:
                # 이미 사용된 인덱스는 건너뜀
                if source_idx in used_source or target_idx in used_target:
                    continue
                    
                # 이전에 추가된 매핑이 있는지 확인
                if refined:
                    last_source, last_target, _ = refined[-1]
                    
                    # 순서가 뒤집히는 경우 건너뜀
                    if (source_idx < last_source and target_idx > last_target) or \
                       (source_idx > last_source and target_idx < last_target):
                        continue
                
                refined.append((source_idx, target_idx, similarity))
                used_source.add(source_idx)
                used_target.add(target_idx)
            
            # 원본 문장 순서대로 정렬
            refined.sort(key=lambda x: x[0])
            return refined
        
        return mappings


#####################################################
# 4. 통합 문장 매핑 시스템
#####################################################

class SentenceMappingSystem:
    """문장 매핑 시스템 통합 클래스"""
    
    def __init__(
        self, 
        model_name: str = 'paraphrase-multilingual-MiniLM-L12-v2', 
        similarity_threshold: float = 0.5,
        enforce_sequential: bool = True
    ):
        """
        문장 매핑 시스템 초기화
        
        Args:
            model_name: 사용할 임베딩 모델 이름
            similarity_threshold: 매핑 임계값
            enforce_sequential: 순차적 매핑 강제 여부
        """
        self.preprocessor = SentencePreprocessor()
        self.embedder = MultilingualEmbedder(model_name)
        self.similarity_threshold = similarity_threshold
        self.enforce_sequential = enforce_sequential
        logger.info("문장 매핑 시스템 초기화 완료")
        
    def map_scripts(
        self, 
        source_text: str, 
        target_text: str, 
        source_lang: str = 'ko',
        target_lang: str = 'en'
    ) -> List[Tuple[Sentence, Sentence, float]]:
        """
        텍스트 스크립트를 문장 단위로 매핑
        
        Args:
            source_text: 소스 텍스트
            target_text: 타겟 텍스트
            source_lang: 소스 언어 코드
            target_lang: 타겟 언어 코드
            
        Returns:
            매핑된 문장 쌍 리스트 [(source_sentence, target_sentence, similarity), ...]
        """
        # 1. 문장 분리
        source_sentences_text = self.preprocessor.split_into_sentences(source_text, source_lang)
        target_sentences_text = self.preprocessor.split_into_sentences(target_text, target_lang)
        
        # 2. Sentence 객체 생성
        source_sentences = [Sentence(text=s, words=s.split(), index=i) 
                           for i, s in enumerate(source_sentences_text)]
        target_sentences = [Sentence(text=s, words=s.split(), index=i) 
                           for i, s in enumerate(target_sentences_text)]
        
        return self._map_sentence_objects(source_sentences, target_sentences)
        
    def map_textgrids(
        self, 
        source_textgrid_path: str, 
        target_textgrid_path: str,
        source_tier: str = "words",
        target_tier: str = "words",
        min_pause: float = 0.5
    ) -> List[Tuple[Sentence, Sentence, float]]:
        """
        TextGrid 파일에서 문장을 추출하고 매핑
        
        Args:
            source_textgrid_path: 소스 TextGrid 파일 경로
            target_textgrid_path: 타겟 TextGrid 파일 경로
            source_tier: 소스 계층 이름
            target_tier: 타겟 계층 이름
            min_pause: 문장 구분을 위한 최소 휴지 시간(초)
            
        Returns:
            매핑된 문장 쌍 리스트 [(source_sentence, target_sentence, similarity), ...]
        """
        # TextGrid에서 문장 추출
        source_sentences = self.preprocessor.extract_sentences_from_textgrid(
            source_textgrid_path, source_tier, min_pause)
        target_sentences = self.preprocessor.extract_sentences_from_textgrid(
            target_textgrid_path, target_tier, min_pause)
        
        return self._map_sentence_objects(source_sentences, target_sentences)
        
    def _map_sentence_objects(
        self, 
        source_sentences: List[Sentence], 
        target_sentences: List[Sentence]
    ) -> List[Tuple[Sentence, Sentence, float]]:
        """
        Sentence 객체 리스트 간 매핑 수행
        
        Args:
            source_sentences: 소스 Sentence 객체 리스트
            target_sentences: 타겟 Sentence 객체 리스트
            
        Returns:
            매핑된 문장 쌍 리스트 [(source_sentence, target_sentence, similarity), ...]
        """
        if not source_sentences or not target_sentences:
            logger.warning("빈 문장 리스트가 입력되었습니다.")
            return []
            
        # 1. 임베딩 계산
        source_embeddings = self.embedder.encode_sentences(source_sentences)
        target_embeddings = self.embedder.encode_sentences(target_sentences)
        
        # 2. 유사도 행렬 계산
        similarity_matrix = SentenceMapper.compute_similarity_matrix(
            source_embeddings, target_embeddings)
        
        # 3. 최적 매핑 찾기
        raw_mappings = SentenceMapper.find_optimal_mapping(
            similarity_matrix, self.similarity_threshold)
        
        # 4. 매핑 정제
        refined_mappings = SentenceMapper.refine_mappings(
            raw_mappings, source_sentences, target_sentences, self.enforce_sequential)
        
        # 5. 결과 형식화
        results = []
        for source_idx, target_idx, similarity in refined_mappings:
            source_sentence = source_sentences[source_idx]
            target_sentence = target_sentences[target_idx]
            results.append((source_sentence, target_sentence, similarity))
            
            # 로그 출력
            logger.info(f"매핑: (유사도 {similarity:.4f})")
            logger.info(f"  소스: {source_sentence}")
            logger.info(f"  타겟: {target_sentence}")
        
        return results

    def export_mapping_to_csv(
        self, 
        mappings: List[Tuple[Sentence, Sentence, float]], 
        output_path: str
    ) -> None:
        """
        매핑 결과를 CSV 파일로 내보내기
        
        Args:
            mappings: 매핑 결과 리스트
            output_path: 저장할 CSV 파일 경로
        """
        import csv
        
        with open(output_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['Source Index', 'Source Text', 'Source Start', 'Source End', 
                           'Target Index', 'Target Text', 'Target Start', 'Target End', 
                           'Similarity'])
            
            for source, target, similarity in mappings:
                writer.writerow([
                    source.index, source.text, source.start_time, source.end_time,
                    target.index, target.text, target.start_time, target.end_time,
                    similarity
                ])
                
        logger.info(f"매핑 결과를 {output_path}에 저장했습니다.")


#####################################################
# 5. 응용 예시
#####################################################

def calculate_isochrony_score(
    mappings: List[Tuple[Sentence, Sentence, float]]
) -> float:
    """
    매핑된 문장 쌍 리스트에서 isochrony 점수 계산
    
    Args:
        mappings: 매핑된 문장 쌍 리스트
        
    Returns:
        isochrony 점수 (0.0 ~ 1.0)
    """
    if not mappings:
        return 0.0
        
    # 각 매핑된 문장 쌍의 시간 차이 계산
    duration_diffs = []
    for source, target, _ in mappings:
        if source.start_time >= 0 and source.end_time >= 0 and \
           target.start_time >= 0 and target.end_time >= 0:
            source_dur = source.end_time - source.start_time
            target_dur = target.end_time - target.start_time
            diff = abs(source_dur - target_dur)
            duration_diffs.append(diff)
    
    if not duration_diffs:
        return 0.0
        
    # 전체 오디오 길이 추정
    max_source_end = max(s.end_time for s, _, _ in mappings)
    max_target_end = max(t.end_time for _, t, _ in mappings)
    total_duration = max(max_source_end, max_target_end)
    
    # isochrony 점수 계산
    ichron_score = 1.0 - (np.mean(duration_diffs) / total_duration)
    
    return max(0.0, min(1.0, ichron_score))


def visualize_mappings(
    mappings: List[Tuple[Sentence, Sentence, float]], 
    output_path: str = "sentence_mappings.png"
) -> None:
    """
    매핑 결과 시각화
    
    Args:
        mappings: 매핑된 문장 쌍 리스트
        output_path: 저장할 이미지 파일 경로
    """
    try:
        import matplotlib.pyplot as plt
        
        fig, ax = plt.subplots(figsize=(12, len(mappings)*1.5))
        
        for i, (source, target, similarity) in enumerate(mappings):
            if source.start_time >= 0 and source.end_time >= 0 and \
               target.start_time >= 0 and target.end_time >= 0:
                source_dur = source.end_time - source.start_time
                target_dur = target.end_time - target.start_time
                
                # 막대 그래프 그리기
                ax.broken_barh([(source.start_time, source_dur)], (i*2.0 + 0.2, 0.6), 
                               facecolors='tab:blue', label='Source' if i == 0 else "")
                ax.broken_barh([(target.start_time, target_dur)], (i*2.0 + 1.0, 0.6), 
                               facecolors='tab:orange', label='Target' if i == 0 else "")
                
                # 문장 정보 표시
                ax.text(0, i*2.0 + 0.1, 
                       f"Pair {i+1} (sim={similarity:.2f})", 
                       fontsize=9, verticalalignment='bottom')
        
        # 스타일
        ax.set_yticks([])
        ax.set_xlabel("Time (s)")
        ax.set_title("Sentence Alignment Visualization")
        ax.legend()
        plt.tight_layout()
        plt.savefig(output_path)
        plt.close()
        
        logger.info(f"시각화 결과를 {output_path}에 저장했습니다.")
    except ImportError:
        logger.error("matplotlib 라이브러리가 설치되지 않았습니다.")
        logger.error("pip install matplotlib 명령어로 설치하세요.")


#####################################################
# 6. 사용 예시
#####################################################

def example_usage_scripts():
    """스크립트 텍스트 매핑 예시"""
    # 샘플 스크립트
    ko_script = """
    안녕하세요. 제 이름은 조윤장입니다. 
    만나서 반갑습니다. 잘 부탁드립니다.
    """
    
    en_script = """
    Hello. My name is Jo Yoon Jang.
    Nice to meet you. Please take care of me.
    """
    
    # 시스템 초기화 및 매핑
    system = SentenceMappingSystem(similarity_threshold=0.5)
    mappings = system.map_scripts(ko_script, en_script, 'ko', 'en')
    
    # 결과 출력
    for source, target, similarity in mappings:
        print(f"유사도: {similarity:.4f}")
        print(f"  한국어: {source.text}")
        print(f"  영어: {target.text}")
        print()
    
    # CSV로 내보내기
    system.export_mapping_to_csv(mappings, "script_mappings.csv")


def example_usage_textgrids():
    """TextGrid 파일 매핑 예시"""
    # TextGrid 파일 경로
    ko_tg_path = "./sound_sample/korean_yunjang/윤장목소리1.TextGrid"
    en_tg_path = "./sound_sample/AI_eng_yunjang/mine_en.TextGrid"
    
    # 시스템 초기화 및 매핑
    system = SentenceMappingSystem(similarity_threshold=0.4, enforce_sequential=True)
    mappings = system.map_textgrids(ko_tg_path, en_tg_path, "words", "words", 0.4)
    
    # isochrony 점수 계산
    score = calculate_isochrony_score(mappings)
    print(f"\n✅ Final Isochrony Score: {score:.3f}")
    
    # 결과 시각화
    visualize_mappings(mappings, "textgrid_mappings.png")


if __name__ == "__main__":
    # 스크립트 매핑 예시 실행
    # example_usage_scripts()
    
    # TextGrid 매핑 예시 실행
    example_usage_textgrids()

  from .autonotebook import tqdm as notebook_tqdm
2025-04-23 12:11:32,712 - sentence_transformers.SentenceTransformer - INFO - Use pytorch device_name: cpu
2025-04-23 12:11:32,712 - sentence_transformers.SentenceTransformer - INFO - Load pretrained SentenceTransformer: paraphrase-multilingual-MiniLM-L12-v2
2025-04-23 12:11:37,085 - SentenceMapper - INFO - 다국어 임베딩 모델 'paraphrase-multilingual-MiniLM-L12-v2' 로드 완료
2025-04-23 12:11:37,085 - SentenceMapper - INFO - 문장 매핑 시스템 초기화 완료
2025-04-23 12:11:37,095 - SentenceMapper - INFO - TextGrid에서 5개 문장 추출 완료
2025-04-23 12:11:37,096 - SentenceMapper - INFO - TextGrid에서 6개 문장 추출 완료
Batches: 100%|██████████| 1/1 [00:00<00:00, 10.89it/s]
2025-04-23 12:11:37,196 - SentenceMapper - INFO - 5개 문장의 임베딩 계산 완료
Batches: 100%|██████████| 1/1 [00:00<00:00, 37.97it/s]
2025-04-23 12:11:37,231 - SentenceMapper - INFO - 6개 문장의 임베딩 계산 완료
2025-04-23 12:11:37,257 - SentenceMapper - INFO - 임계값 0.4 이상의 매핑 5개 찾음
2025-04-23 12:11:37,257 - SentenceMapper - INFO - 매핑: (유사


✅ Final Isochrony Score: 0.949


2025-04-23 12:11:37,921 - SentenceMapper - INFO - 시각화 결과를 textgrid_mappings.png에 저장했습니다.
