In [16]:
# 셀 1: 의존성 설치 및 환경 확인
%pip install numpy sentence-transformers torch scikit-learn pandas

# VEC 문장 분할 및 의미 기반 원문 정렬 (최신 요구사항 반영, \p{Han} 사용)
import unicodedata
import torch
import numpy as np
import pandas as pd
import regex as re
from sentence_transformers import SentenceTransformer, util
from sklearn.metrics.pairwise import cosine_similarity
from concurrent.futures import ThreadPoolExecutor, as_completed


Note: you may need to restart the kernel to use updated packages.


In [17]:
# 0. Lazily loaded SBERT model and tokenizer (module-level cache)
_model = None
_tokenizer = None


def get_model_and_tokenizer():
    """Return the cached SBERT model and its tokenizer, loading them on first use.

    On the first call this constructs ``SentenceTransformer`` for
    'snunlp/KR-SBERT-V40K-klueNLI-augSTS', moves it to 'cuda' when
    ``torch.cuda.is_available()`` is true (otherwise 'cpu'), and stores the
    model together with ``model.tokenizer`` in the module globals ``_model``
    and ``_tokenizer``. Later calls return the cached pair unchanged.

    Returns:
        tuple: ``(model, tokenizer)``.
    """
    global _model, _tokenizer
    if _model is None:
        model_name = 'snunlp/KR-SBERT-V40K-klueNLI-augSTS'
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        # Build into locals and publish both globals together: if loading or
        # the device move raises, the cache stays empty instead of holding a
        # model with a ``None`` tokenizer that later calls would return.
        model = SentenceTransformer(model_name).to(device)
        tokenizer = model.tokenizer
        _model, _tokenizer = model, tokenizer
    return _model, _tokenizer

In [18]:
# 임베딩 기반 원문-번역문 의미 단위 분할 함수 정의

def split_original_sentences(text):
    """
    Split the source text into sentence-like parts and drop the short ones.

    ``text`` is split with the pattern ``(?<=[。？！.!?])\\s*`` (a look-behind
    for a Chinese/Korean/English sentence terminator followed by optional
    whitespace). A part is returned, stripped of surrounding whitespace, only
    when it contains more than three Han characters.
    """
    kept = []
    for part in re.split(r'(?<=[。？！.!?])\s*', text):
        han_count = len(re.findall(r'\p{Han}', part))
        # Parts with three or fewer Han characters are considered too short.
        if han_count > 3:
            kept.append(part.strip())
    return kept

def align_and_group(orig_segments, trans_sentences, model):
    """
    Group source segments according to the translated sentences they match.

    Every translated sentence is assigned the source segment with the highest
    cosine similarity between their embeddings. The assigned indices are then
    walked in translation order: an index equal to the previous one, or one
    greater, extends the current group; any other index closes the group and
    starts a new one.

    Args:
        orig_segments: list of source-text segments (strings).
        trans_sentences: list of translated sentences (strings).
        model: object whose ``encode(texts, convert_to_tensor=True)`` returns
            embeddings accepted by ``util.cos_sim``.

    Returns:
        list[str]: the concatenated text of each group. Segments that no
        translated sentence selected do not appear. When either input is
        empty, a single-element list holding all source segments joined
        together is returned.
    """
    if not orig_segments or not trans_sentences:
        return [''.join(orig_segments)]

    emb_orig = model.encode(orig_segments, convert_to_tensor=True)
    emb_trans = model.encode(trans_sentences, convert_to_tensor=True)
    # Rows are translated sentences, columns are source segments.
    sim_matrix = util.cos_sim(emb_trans, emb_orig)
    # trans_sentences is non-empty here, so there is at least one assignment.
    assignments = [int(row.argmax()) for row in sim_matrix]

    grouped = []
    current = [orig_segments[assignments[0]]]
    last_idx = assignments[0]
    for o_idx in assignments[1:]:
        if o_idx == last_idx:
            # The segment is already part of the current group; appending it
            # again would duplicate its text in the output.
            continue
        if o_idx == last_idx + 1:
            current.append(orig_segments[o_idx])
        else:
            grouped.append(''.join(current))
            current = [orig_segments[o_idx]]
        last_idx = o_idx
    grouped.append(''.join(current))

    return grouped

# -- 사용 예시 --
# model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')
# for pid, orig_text in paragraphs_dict.items():
#     segments = split_original_sentences(orig_text)
#     trans_sentences = translations_dict[pid]
#     result = align_and_group(segments, trans_sentences, model)
#     final_split = ' | '.join(result)
#     print(pid, final_split)


In [19]:
# DP-based monotonic alignment: each unit is either paired with one unit on
# the other side or reported as unmatched.

def align_by_similarity(src_units, tgt_units, model, sim_threshold=0.3):
    """Align source units to target units in order, using embedding similarity.

    A dynamic program over the similarity matrix chooses, at each step, one
    of three moves: pair the next source unit with the next target unit
    (scoring their cosine similarity, or -1.0 when it is below
    ``sim_threshold``), leave the next source unit unmatched (score 0), or
    leave the next target unit unmatched (score 0). The highest-scoring path
    is returned.

    Args:
        src_units: sequence of source strings.
        tgt_units: sequence of target strings.
        model: object whose ``encode`` accepts ``batch_size``,
            ``convert_to_numpy`` and ``normalize_embeddings`` and returns a
            2-D array of embeddings.
        sim_threshold: minimum cosine similarity for a pair to score
            positively.

    Returns:
        list[tuple]: ``(src_index, tgt_index)`` pairs in path order.
        ``(i, None)`` marks an unmatched source unit and ``(None, j)`` an
        unmatched target unit. If either input is empty, every unit of the
        other input is returned as unmatched and ``model`` is not used.
    """
    n, m = len(src_units), len(tgt_units)
    if n == 0 or m == 0:
        # cosine_similarity rejects empty input; the only possible alignment
        # here is "everything unmatched", so return it directly.
        return [(i, None) for i in range(n)] + [(None, j) for j in range(m)]

    src_vecs = model.encode(src_units, batch_size=32, convert_to_numpy=True,
                            normalize_embeddings=True)
    tgt_vecs = model.encode(tgt_units, batch_size=32, convert_to_numpy=True,
                            normalize_embeddings=True)
    sim = cosine_similarity(src_vecs, tgt_vecs)
    n, m = sim.shape

    # Pair scores: the similarity itself, or a -1.0 penalty below threshold.
    scores = np.where(sim >= sim_threshold, sim, -1.0)

    # dp[i, j]: best score after consuming i source and j target units.
    # bt[i, j]: move that produced it (1 = pair, 2 = skip src, 3 = skip tgt).
    dp = np.full((n + 1, m + 1), -np.inf)
    bt = np.zeros((n + 1, m + 1), dtype=int)
    dp[0, 0] = 0

    # Forward pass: push each cell's score to its three successors. Strict
    # ">" keeps the first move that reached a cell when scores tie.
    for i in range(n + 1):
        for j in range(m + 1):
            if i < n and j < m:
                s = scores[i, j]
                if dp[i, j] + s > dp[i + 1, j + 1]:
                    dp[i + 1, j + 1] = dp[i, j] + s
                    bt[i + 1, j + 1] = 1  # diagonal: pair src i with tgt j

            if i < n and dp[i, j] > dp[i + 1, j]:
                dp[i + 1, j] = dp[i, j]
                bt[i + 1, j] = 2  # src i has no counterpart

            if j < m and dp[i, j] > dp[i, j + 1]:
                dp[i, j + 1] = dp[i, j]
                bt[i, j + 1] = 3  # tgt j has no counterpart

    # Backtrack from the bottom-right corner to the origin.
    i, j = n, m
    pairs = []
    while i > 0 or j > 0:
        move = bt[i, j]
        if move == 1:
            i -= 1
            j -= 1
            pairs.append((i, j))
        elif move == 2:
            i -= 1
            pairs.append((i, None))
        elif move == 3:
            j -= 1
            pairs.append((None, j))
        else:
            # No recorded move: stop rather than loop forever.
            break
    pairs.reverse()
    return pairs


In [20]:
# split_src_for_tgt: build (source, target) rows from the DP alignment,
# including source units that were left unmatched.

def split_src_for_tgt(src_text, tgt_sents):
    """Cut the source text into pieces that line up with the target sentences.

    The source text is NFKC-normalized, its whitespace runs are collapsed to
    single spaces, and it is split into units at whitespace that follows one
    of ``.!?。！？``. The units are aligned to ``tgt_sents`` with
    ``align_by_similarity`` using the shared SBERT model.

    Args:
        src_text: source paragraph (string).
        tgt_sents: list of target sentences, used exactly as given.

    Returns:
        list[tuple[str, str]]: one ``(source_piece, target_sentence)`` row
        per target sentence, in target order, where ``source_piece`` is the
        span of normalized source text covering the units aligned to that
        sentence (``''`` if none). These are followed by one
        ``(source_unit, '')`` row for every unmatched source unit. If there
        are no source units or no target sentences, nothing can be aligned:
        the rows are built directly and the model is not loaded.
    """
    norm_src = unicodedata.normalize('NFKC', src_text)
    norm_src = re.sub(r'\s+', ' ', norm_src.strip())

    # NOTE(review): the split needs at least one whitespace character after
    # the terminator, so text written without spaces stays a single unit.
    src_units = [chunk for chunk in re.split(r'(?<=[.!?。！？])\s+', norm_src) if chunk.strip()]

    # Character span of every unit inside norm_src; searching from the end of
    # the previous unit keeps repeated sentences mapped to distinct spans.
    boundaries = []
    cursor = 0
    for unit in src_units:
        start = norm_src.find(unit, cursor)
        end = start + len(unit)
        boundaries.append((start, end))
        cursor = end

    if src_units and tgt_sents:
        model, _ = get_model_and_tokenizer()
        pairs = align_by_similarity(src_units, tgt_sents, model)
    else:
        # One side is empty: every source unit is unmatched, and every target
        # sentence gets an empty source piece below.
        pairs = [(i, None) for i in range(len(src_units))]

    src_by_tgt = {}
    unmatched_src = []
    for src_i, tgt_j in pairs:
        if src_i is None:
            # Unmatched target sentence: handled by the loop over tgt_sents.
            continue
        if tgt_j is None:
            unmatched_src.append(src_i)
        else:
            src_by_tgt.setdefault(tgt_j, []).append(src_i)

    aligned = []
    for j, tgt in enumerate(tgt_sents):
        idxs = sorted(src_by_tgt.get(j, []))
        if idxs:
            # Take the continuous span from the first to the last aligned unit.
            st = boundaries[idxs[0]][0]
            en = boundaries[idxs[-1]][1]
            merged_src = norm_src[st:en]
        else:
            merged_src = ''
        aligned.append((merged_src, tgt))

    for src_i in unmatched_src:
        st, en = boundaries[src_i]
        aligned.append((norm_src[st:en], ''))
    return aligned

In [21]:
# 메인 처리

def process_text(src_text, tgt_text, para_id):
    """Align one paragraph and return its rows as dictionaries.

    The translated text is split here, at whitespace following one of
    ``.!?。！？``, and the non-blank pieces are passed with the source text to
    ``split_src_for_tgt``. Each returned (source, target) pair becomes a dict
    with the keys '문단식별자', '원문' and '번역문'.
    """
    pieces = re.split(r'(?<=[.!?。！？])\s+', tgt_text)
    tgt_chunks = [piece for piece in pieces if piece.strip()]

    records = []
    for src_part, tgt_part in split_src_for_tgt(src_text, tgt_chunks):
        records.append({'문단식별자': para_id, '원문': src_part, '번역문': tgt_part})
    return records

def main(input_path, output_path):
    """Read paragraphs from an Excel file, align them, and write the result.

    The input sheet is read with ``pd.read_excel`` and must provide the
    columns '문단식별자', '원문' and '번역문'. Every row is passed to
    ``process_text``; the resulting rows are collected into one DataFrame,
    written to ``output_path`` without the index, and a completion message
    is printed.
    """
    frame = pd.read_excel(input_path)

    records = []
    for _, record in frame.iterrows():
        # Column names are fixed: '문단식별자', '원문', '번역문'.
        para_id, source, target = record['문단식별자'], record['원문'], record['번역문']
        records.extend(process_text(source, target, para_id))

    pd.DataFrame(records).to_excel(output_path, index=False)
    print(f"✅ Completed and saved to {output_path}")

In [22]:
# Example run: read the paragraphs from input_p.xlsx, align them, and write
# the aligned rows to output_p.xlsx (paths are specific to the author's machine).
input_path = "C:/Users/junto/Downloads/head-repo/private725/PC2024/split_root/input_p.xlsx"
output_path = "C:/Users/junto/Downloads/head-repo/private725/PC2024/split_root/output_p.xlsx"
main(input_path, output_path)

✅ Completed and saved to C:/Users/junto/Downloads/head-repo/private725/PC2024/split_root/output_p.xlsx
