In [14]:
import re
def instruct_structure(prompt):
    input_text, output_text = prompt.split('### target')
    input_text = input_text.replace('### glossaries', '### glossary').replace('\n* ', '\n• ')
    input_text = re.sub(r"\[[^\]]+\] ", "[UNK] ", input_text)
    return input_text

project_id = "prod-ai-project"

from google.cloud import bigquery
client = bigquery.Client(project=project_id)
sql = """select series_id, episode_id, org_input_text, org_output_text, prompt 
        from webtoon_translation.structured_240820_ep_line
        where data_split = 'romance_valid'"""
df = client.query(sql).result().to_dataframe()
from tqdm import tqdm
tqdm.pandas()
df['prompt'] = df['prompt'].progress_apply(lambda x: instruct_structure(x))

100%|████████████████████████████████████████████████████████████████████████████████████████████████| 74/74 [00:00<00:00, 18400.43it/s]


In [81]:
import Levenshtein

def word_overlap_boost(s1, s2):
    """공통 단어 개수 비율 계산"""
    words_s1, words_s2 = set(s1.split()), set(s2.split())
    common_words = len(words_s1 & words_s2)
    total_words = len(words_s1 | words_s2)
    return common_words / total_words if total_words > 0 else 0

# def substring_containment(s1, s2):
#     """짧은 문장이 긴 문장에 포함되는 정도 계산"""
#     if len(s1) > len(s2):
#         long_s, short_s = s1, s2
#     else:
#         long_s, short_s = s2, s1
    
#     common_length = len(short_s) if short_s in long_s else sum(c in long_s for c in short_s)
#     return common_length / len(short_s) if len(short_s) > 0 else 0

# def substring_containment(s1, s2):
#     """bi-gram 단위로 짧은 문장이 긴 문장에 포함되는 정도 계산"""
#     if len(s1) > len(s2):
#         long_s, short_s = s1, s2
#     else:
#         long_s, short_s = s2, s1
    
#     # bi-gram(2-gram) 집합 생성
#     def get_bigrams(s):
#         return {s[i:i+2] for i in range(len(s) - 1)}

#     long_bigrams = get_bigrams(long_s)
#     short_bigrams = get_bigrams(short_s)

#     # 짧은 문장의 bi-gram 중 긴 문장에 포함된 bi-gram의 개수
#     common_bigrams = short_bigrams & long_bigrams

#     return len(common_bigrams) / len(short_bigrams) if short_bigrams else 0

def substring_containment(s1, s2, tri_weight=4, bi_weight=2, char_weight=1):
    """bi-gram과 tri-gram 단위로 짧은 문장이 긴 문장에 포함되는 정도 계산 (tri-gram에 높은 가중치)"""
    if len(s1) > len(s2):
        long_s, short_s = s1, s2
    else:
        long_s, short_s = s2, s1
    
    # n-gram(2-gram, 3-gram) 집합 생성
    def get_ngrams(s, n):
        return {s[i:i+n] for i in range(len(s) - (n-1))}

    long_char = get_ngrams(long_s, 1)
    short_char = get_ngrams(short_s, 1)
    
    long_bigrams = get_ngrams(long_s, 2)
    short_bigrams = get_ngrams(short_s, 2)
    
    long_trigrams = get_ngrams(long_s, 3)
    short_trigrams = get_ngrams(short_s, 3)

    #char 계산
    common_char = long_char & short_char
    char_score = len(common_char) / len(short_char) if short_char else 0
    
    # bi-gram 포함도 계산
    common_bigrams = short_bigrams & long_bigrams
    bigram_score = len(common_bigrams) / len(short_bigrams) if short_bigrams else 0

    # tri-gram 포함도 계산
    common_trigrams = short_trigrams & long_trigrams
    trigram_score = len(common_trigrams) / len(short_trigrams) if short_trigrams else 0

    # 최종 점수 계산 (tri-gram의 가중치를 더 높게 설정)
    total_weight = char_weight + bi_weight + tri_weight
    final_score = (char_weight * char_score + bi_weight * bigram_score + tri_weight * trigram_score) / total_weight

    return final_score

def adjusted_edit_distance(s1, s2):
    """Levenshtein 거리 기반 + 공통 단어 + 포함도 반영"""
    #edit_distance = Levenshtein.distance(s1, s2)
    #word_overlap = word_overlap_boost(s1, s2)
    containment_ratio = substring_containment(s1, s2)

    #print(word_overlap, containment_ratio)
    
    # 가중치 설정
    #WEIGHT_WORD = 0.4
    #WEIGHT_CONTAINMENT = 1.0
    #WEIGHT_DECREASE = 0.5  # 포함 비율이 높을 때 거리 감소 가중치
    #WEIGHT_INCREASE = 2.5  # 포함 비율이 낮을 때 거리 증가 가중치

    #combined_similarity = (
    #    WEIGHT_WORD * word_overlap +
    #    WEIGHT_CONTAINMENT * containment_ratio
    #)
    combined_similarity = containment_ratio

    # 겹치는 문자가 없으면 후보 제외
    if combined_similarity == 0:
        return -float('inf')

    # 거리 조정 (가중치를 반영한 새로운 유사도 적용)
    #WEIGHT_DISTANCE = 0.5
    #adjusted_distance = edit_distance * (1 - WEIGHT_DISTANCE * combined_similarity)
    #adjusted_distance = edit_distance
    adjusted_distance = combined_similarity
    # 포함 비율에 따른 거리 추가 조정
    # if containment_ratio > 0.7:  # 짧은 문장이 충분히 포함되면 거리 감소
    #     adjusted_distance *= (1 - WEIGHT_DECREASE * containment_ratio)
    # elif containment_ratio < 0.3:  # 짧은 문장이 거의 포함되지 않으면 거리 증가
    #     adjusted_distance *= (1 + WEIGHT_INCREASE * (1 - containment_ratio))

    # 길이 정규화 적용
    len1, len2 = len(s1), len(s2)
    length_factor = max(len1, len2) / min(len1, len2)
    adjusted_distance /= length_factor ** 0.5

    return adjusted_distance

def find_most_similar_sentence(target, sentences):
    """주어진 문장 목록에서 target 문장과 가장 유사한 문장을 찾음"""
    similarities = [(sentence, adjusted_edit_distance(target, sentence)) for sentence in sentences]
    similarities.sort(key=lambda x: x[1],reverse=True)  # 거리 기준 정렬 (높을수록 유사)
    return similarities[:5]

In [82]:
from langchain_community.vectorstores import FAISS
from langchain.docstore.document import Document
import bs4
import ssl
import urllib3
import pandas as pd
import faiss
import os
from langchain.embeddings import HuggingFaceEmbeddings
import torch
import gc

In [83]:
def extract_top(question):
    file_path = "./data/idiom_dict.txt"
    with open(file_path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
    origin_lines = [line.split('>')[0].strip() for line in lines]

    result = find_most_similar_sentence(question, origin_lines)
    #문법적 유사성 판단 결과 
    #4점 이하는 문법적 유사성이 낮다고 판단
    result = [r[0] for r in result if r[1] < 3]
    #문법적 유사성으로 1차 필터링
    if len(result) == 0:
        return None
    print(result)
    
    docs = [Document(page_content=f"{line}") for line in result]
    embeddings = HuggingFaceEmbeddings(model_name="nlpai-lab/KURE-v1")
    #print(docs)
    vectorstore = FAISS.from_documents(documents=docs, embedding=embeddings)
    retriever = vectorstore.as_retriever(search_type='similarity', search_kwargs={'k': 5})
    #retriever = vectorstore.as_retriever(search_type='similarity_score_threshold', 
    #                                 search_kwargs={'k':5,'score_threshold' : 0.3})

    docs_with_scores = retriever.vectorstore.similarity_search_with_score(question, k=5)
    doc = docs_with_scores[0][0]
    score = 1 / (1+ docs_with_scores[0][1])
    #print("@@@")
    #for doc, score in docs_with_scores:
        #cosine_sim = 1 / (1 + score)  # FAISS L2 거리 → 코사인 유사도 변환
        #print(f"문서 내용: {doc.page_content}")
        #print(f"유사도 점수 (코사인 유사도 변환): {cosine_sim}")
    #idiom = retriever.invoke(question) #retriever로 문서 가져오고

    if hasattr(vectorstore, "index") and isinstance(vectorstore.index, faiss.Index):
        # FAISS 인덱스를 CPU로 변환 (GPU 해제)
        vectorstore.index = faiss.index_gpu_to_cpu(vectorstore.index)
    
    # FAISS 객체 삭제
    del vectorstore
    
    # Python 가비지 컬렉션 실행
    gc.collect()
    
    # PyTorch GPU 캐시 정리
    torch.cuda.empty_cache()
    
    #print(idiom)
    if score > 0.5:
        return doc.page_content
    else:
        return None

In [84]:
data_idx = 3
data = df['prompt'][data_idx]
example = data.split("### source")[1].strip()
#print(example)

In [85]:
def extract_top_debug(question):
    file_path = "./data/idiom_dict.txt"
    with open(file_path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
    origin_lines = [line.split('>')[0].strip() for line in lines]

    result = find_most_similar_sentence(question, origin_lines)
    #문법적 유사성 판단 결과 
    #3점 이하는 문법적 유사성이 높다고 판단
    #result = [(r[0], r[1]) for r in result if r[1] < 3]
    #문법적 유사성으로 1차 필터링
    if len(result) == 0:
        return None
    else:
        return result
    #print(result)

In [86]:
tmp = extract_top_debug("왜 남주가 벌써 내 코앞에서 굴러다니고 있는 거지?")

In [87]:
print(tmp)
print(find_most_similar_sentence("왜 남주가 벌써 내 코앞에서 굴러다니고 있는 거지?",["코 앞이다"]))

[('코앞에서 놓치다', 0.29725995273932915), ('뼈가 있는 말', 0.2571428571428572), ('화성에 가 있는 듯', 0.21461942290627503), ('접시는 깨라고 있는 것이다', 0.21427478217774165), ('코 앞에서 놓치다', 0.21405130869400113)]
[('코 앞이다', 0.048294528841629526)]


In [88]:
sens = example.split('\n')
cnt = 0
for sen in sens: 
    q = sen.split('[UNK]')[1].strip()
    idiom = extract_top_debug(q)
    if idiom and cnt < 30:
        print(sen,'/',idiom)
        print()
        cnt +=1
    else:
        break

000	[UNK] 이 녀석, 남자 주인공인 시그렌 아냐?! / [('곰 같은 남자', 0.1584785162583248), ('여우 같은 여자, 곰 같은 남자', 0.15241143878485672), ('차가운 도시 남자', 0.1435841559129621), ('울지 않는 녀석이 더 무섭다', 0.1261701018119691), ('개도 주인을 알아본다', 0.1257078722109418)]

001	[UNK] 원작 시작까지는 아직 6년이나 남았는데 / [('지는 해', 0.25458753860865774), ('아직 이르다', 0.18835554191923243), ('아직 멀었다', 0.17562881611387887), ('낯 가리는 아이', 0.15955176322610862), ('지는 해가 아름답다', 0.1588246128645101)]

002	[UNK] 왜 남주가 벌써 내 코앞에서 굴러다니고 있는 거지? / [('코앞에서 놓치다', 0.29725995273932915), ('뼈가 있는 말', 0.2571428571428572), ('화성에 가 있는 듯', 0.21461942290627503), ('접시는 깨라고 있는 것이다', 0.21427478217774165), ('코 앞에서 놓치다', 0.21405130869400113)]

003	[UNK] 얘가 이 시기에 아벨을 만나는 거던가? / [('곡식을 만나다', 0.2615750199185652), ('열에 아홉', 0.20447430099874028), ('홍시 먹다가 이 빠진다', 0.1932909224162628), ('하늘을 나는 기분', 0.18203380129379976), ('뛰는 놈 위에 나는 놈', 0.18169720697949243)]

004	[UNK] 아니, 지금 이런 생각 할 때가 아니지! / [('서둘러야 할 때가 아니다', 0.4276135746752218), ('찬 밥 더운 밥 가릴 때가 아니다', 0.2827158321885004), ('찬밥 더운밥 가릴

In [46]:
print(extract_top("그를 눈엣가시처럼 여겨 죽이고 싶어 하는 자들은 많았다."))

None
