In [None]:
#@title RAG 데이터 확인용_검색 순위 평가 도구

# ------------------------------------------------
# 1. Install dependencies and import libraries
# ------------------------------------------------
!pip install -q openai numpy scipy ipywidgets

import os
import csv
import json
import numpy as np
import openai
from scipy.spatial.distance import cosine
import ipywidgets as widgets
from google.colab import files
from google.colab import userdata
from IPython.display import display, clear_output
import re

# ------------------------------------------------
# 2. API key setup and client initialization
# ------------------------------------------------
# Any failure here (secret lookup, empty key, client construction) is reported
# as a message instead of aborting the cell; `openai_client` is left undefined
# in that case.
try:
    OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
    if OPENAI_API_KEY:
        openai_client = openai.OpenAI(api_key=OPENAI_API_KEY)
        print("✅ OpenAI API 키 로드 및 클라이언트 초기화가 완료되었습니다.")
    else:
        raise ValueError("API 키가 설정되어 있지 않습니다. Colab의 비밀 기능으로 'OPENAI_API_KEY'를 설정하십시오.")
except Exception as err:
    print(f"❌ 오류: {err}")

# ------------------------------------------------
# 3. Global variables (application state)
# ------------------------------------------------
# Each document has the form {"id": int, "text": str, "metadata": dict}.
documents = []
# Embedding cache, and the model / split settings it was last built with.
# All three start unset.
doc_embeddings = last_embedding_model = last_division_settings = None
# Name of the uploaded file; used to derive download file names.
file_name = "uploaded_data"
# Query text of the most recent search.
current_query = ""

# ------------------------------------------------
# 4. UI component definitions
# ------------------------------------------------
file_uploader = widgets.FileUpload(
    accept='.txt,.json',
    multiple=False,
    description='파일 업로드',
)

# How the uploaded file is split into documents.
division_method_selector = widgets.Dropdown(
    options=[
        ('1행을 1문서로 분할', 'line'),
        ('고정 길이의 청크로 분할', 'chunk'),
        ('JSON 객체별로 분할', 'json_object'),
    ],
    value='line',
    description='분할 방법:',
    style={'description_width': 'initial'},
)

# Method-specific settings; all three are created hidden (display='none').
json_content_key_input = widgets.Text(
    value='content',
    placeholder='콘텐츠의 키 이름',
    description='JSON 콘텐츠 키:',
    layout=widgets.Layout(display='none'),
)
chunk_size_input = widgets.IntText(
    value=500,
    description='청크 크기:',
    layout=widgets.Layout(display='none', width='200px'),
)
chunk_overlap_input = widgets.IntText(
    value=50,
    description='오버랩:',
    layout=widgets.Layout(display='none', width='200px'),
)
division_settings_box = widgets.HBox(
    [chunk_size_input, chunk_overlap_input, json_content_key_input]
)

# 'none' selects keyword search instead of an embedding model.
embedding_model_selector = widgets.Dropdown(
    options=[
        ('사용 안 함(키워드 검색)', 'none'),
        ('text-embedding-3-small', 'text-embedding-3-small'),
        ('text-embedding-3-large', 'text-embedding-3-large'),
        ('text-embedding-ada-002', 'text-embedding-ada-002'),
    ],
    value='text-embedding-3-small',
    description='내장 모델:',
    style={'description_width': 'initial'},
)

query_input = widgets.Text(
    value='',
    placeholder='검색 쿼리 입력...',
    description='쿼리:',
    layout=widgets.Layout(width='80%'),
)
search_button = widgets.Button(
    description='검색실행',
    button_style='success',
    icon='search',
)

# Separate output panes: one for log/result text, one for download buttons.
output_area = widgets.Output()
download_area = widgets.Output()

# ------------------------------------------------
# 5. Helper functions (core logic)
# ------------------------------------------------
def get_embedding(text, model="text-embedding-3-small"):
    """Request the embedding of ``text`` from the OpenAI embeddings endpoint.

    Newlines in ``text`` are replaced with spaces before the request is made.

    Args:
        text: String to embed.
        model: Name of the embedding model to request.

    Returns:
        The embedding of the single input, or ``None`` when ``text`` is falsy
        or not a string, or when the request raises (the error is printed
        into ``output_area``).
    """
    if not text:
        return None
    if not isinstance(text, str):
        return None

    single_line = text.replace("\n", " ")
    try:
        response = openai_client.embeddings.create(input=[single_line], model=model)
        return response.data[0].embedding
    except Exception as err:
        with output_area:
            print(f"내장 검색 오류: {err}")
        return None

def split_text_into_chunks(text, chunk_size, chunk_overlap):
    """Split ``text`` into fixed-size character chunks that overlap.

    Consecutive chunks start ``chunk_size - chunk_overlap`` characters apart.
    Splitting stops as soon as a chunk reaches the end of the text, so no
    trailing chunk is produced that lies entirely inside the previous one.

    Args:
        text: String to split.
        chunk_size: Maximum number of characters per chunk.
        chunk_overlap: Number of characters shared by consecutive chunks;
            must be non-negative and smaller than ``chunk_size``.

    Returns:
        The chunks in order of appearance; an empty list for empty text.

    Raises:
        ValueError: If ``chunk_size`` is not larger than ``chunk_overlap``,
            or if ``chunk_overlap`` is negative.
    """
    if chunk_size <= chunk_overlap:
        raise ValueError("청크 크기는 오버랩보다 커야 합니다.")
    if chunk_overlap < 0:
        # A negative overlap would make the step larger than the chunk and
        # silently drop the characters in between.
        raise ValueError("오버랩은 0 이상이어야 합니다.")

    text_length = len(text)
    step = chunk_size - chunk_overlap  # > 0 thanks to the checks above
    chunks = []
    for start_index in range(0, text_length, step):
        end_index = start_index + chunk_size
        chunks.append(text[start_index:end_index])
        if end_index >= text_length:
            # This chunk already covers the tail; any further chunk would
            # only repeat part of the overlap.
            break
    return chunks

def update_documents(new_docs):
    """Install ``new_docs`` as the active document set.

    Clears the cached embeddings and the remembered split settings, prints the
    number of documents into ``output_area`` and refreshes the split-document
    download button.

    Args:
        new_docs: List of document dicts that replaces the global ``documents``.
    """
    global documents, doc_embeddings, last_division_settings
    documents = new_docs
    # Cached state belongs to the previous document set and is discarded.
    doc_embeddings = None
    last_division_settings = None
    doc_count = len(documents)
    with output_area:
        print(f"✅ {doc_count} 문서를 생성했습니다.")
    display_source_document_downloader()

def display_source_document_downloader():
    """Render the "download all split documents" button in ``download_area``.

    The area is cleared first; the button is shown only when the global
    ``documents`` list is non-empty. Clicking it writes every document, each
    preceded by an ID header line, to ``divided_<file stem>.txt`` and triggers
    a Colab download of that file.
    """
    with download_area:
        clear_output()
        if not documents:
            return

        def download_source(b):
            stem, _extension = os.path.splitext(file_name)
            dl_filename = f"divided_{stem}.txt"
            sections = []
            for d in documents:
                sections.append(f"--- 분할 문서 ID: {d['id']} ---\n{d.get('text', '')}\n")
            with open(dl_filename, "w", encoding="utf-8") as f:
                f.write("\n".join(sections))
            files.download(dl_filename)

        label = f"분할된 전체{len(documents)}개의 문서 다운로드"
        button = widgets.Button(description=label, button_style='info', icon='download')
        button.on_click(download_source)
        separator = widgets.HTML("<hr>")
        display(widgets.VBox([separator, button]))

def display_results_downloader(results, query):
    """Render the CSV download button for search results in ``download_area``.

    The split-document download button is redrawn first. When ``results`` is
    non-empty, a second button is added that writes the ranking to
    ``search_results_<query>.csv`` and triggers a Colab download.

    Args:
        results: Ranked list of ``{'doc_info': ..., 'score': ...}`` dicts.
        query: Query text; used (sanitized, first 20 characters) in the
            CSV file name.
    """
    with download_area:
        clear_output(wait=True)
        display_source_document_downloader()
        if not results:
            return

        def format_score(score):
            # Float scores get six decimals; anything else is stringified as is.
            return f"{score:.6f}" if isinstance(score, float) else str(score)

        def download_csv(b):
            # Drop characters that are not allowed in file names.
            safe_query = re.sub(r'[\\/*?:"<>|]', "", query)[:20]
            dl_filename = f"search_results_{safe_query}.csv"
            # utf-8-sig writes a BOM at the start of the file.
            with open(dl_filename, 'w', newline='', encoding='utf-8-sig') as f:
                writer = csv.writer(f)
                writer.writerow(['Rank', 'Split_ID', 'Score', 'Text'])
                for rank, r in enumerate(results, start=1):
                    doc = r['doc_info']
                    writer.writerow([rank, doc['id'], format_score(r['score']), doc['text']])
            files.download(dl_filename)

        label = f"모든 {len(results)}개의 검색 결과를 CSV에서 다운로드"
        button = widgets.Button(description=label, button_style='success', icon='download')
        button.on_click(download_csv)
        display(button)

# ------------------------------------------------
# 6. Event handlers (UI behavior)
# ------------------------------------------------
def on_division_method_change(change):
    """Show only the setting inputs that belong to the selected split method.

    Args:
        change: Change dict whose ``'new'`` entry is the selected method value
            (``'line'``, ``'chunk'`` or ``'json_object'``).
    """
    selected = change['new']
    chunk_display = 'flex' if selected == 'chunk' else 'none'
    json_display = 'flex' if selected == 'json_object' else 'none'

    chunk_size_input.layout.display = chunk_display
    chunk_overlap_input.layout.display = chunk_display
    json_content_key_input.layout.display = json_display

def _read_uploaded_file(upload_value):
    """Return ``(name, content_bytes)`` for the first file in a FileUpload value."""
    if isinstance(upload_value, dict):
        # ipywidgets 7.x shape: {filename: {'metadata': {'name': ...}, 'content': bytes}}
        file_info = next(iter(upload_value.values()))
        name = file_info['metadata']['name']
    else:
        # ipywidgets 8.x shape: tuple of {'name': ..., 'content': memoryview, ...}
        file_info = upload_value[0]
        name = file_info['name']
    # bytes() is a no-op copy for bytes and materializes a memoryview.
    return name, bytes(file_info['content'])


def _json_items_to_documents(json_data, content_key):
    """Build one document per JSON object, prefixing its metadata to the content."""
    docs = []
    for i, item in enumerate(json_data):
        if not isinstance(item, dict):
            # Non-object entries are skipped; ids keep the list position.
            continue
        metadata = item.get('metadata', {})
        if not isinstance(metadata, dict):
            # A scalar/list/null "metadata" must not abort the whole upload.
            metadata = {}
        content = item.get(content_key, "")
        meta_str = ", ".join(f"{k}: {v}" for k, v in metadata.items())
        combined_text = f"메타데이터: [ {meta_str} ]\n내용: {content}"
        docs.append({"id": i, "text": combined_text, "metadata": metadata})
    return docs


def on_file_upload(change):
    """Handle an upload: decode the file and split it into documents.

    The file is split according to the selected method:

    * ``'json_object'``: the file must be a ``.json`` file holding a list; each
      dict in it becomes one document. If the configured content key is
      missing from the first object, a commonly used key is auto-detected and
      written back to the key input.
    * ``'chunk'``: fixed-size overlapping chunks.
    * otherwise: one document per non-blank line (stripped).

    On success the global ``file_name`` is updated and the documents are
    installed via ``update_documents``. On failure the error is printed, the
    download area is cleared and the previously loaded state is left alone.

    Args:
        change: Change dict whose ``'new'`` entry is the FileUpload value.
    """
    global file_name
    uploaded_file = change['new']
    if not uploaded_file:
        return
    uploaded_name, content_bytes = _read_uploaded_file(uploaded_file)

    with output_area:
        clear_output()
        print(f"'{uploaded_name}' 로드 중...")
        try:
            method = division_method_selector.value
            if method == 'json_object':
                if not uploaded_name.lower().endswith('.json'):
                    raise ValueError("이 분할 방법은 JSON 파일에서만 사용할 수 있습니다.")
                # utf-8-sig also reads plain UTF-8 and strips a leading BOM,
                # which json.loads would otherwise reject.
                json_data = json.loads(content_bytes.decode('utf-8-sig'))
                if not isinstance(json_data, list):
                    raise ValueError("JSON 데이터는 객체 목록 형식이어야 합니다.")

                content_key = json_content_key_input.value
                first_item = json_data[0] if json_data else None
                # Only guess a key when the configured one is absent, so an
                # explicit user choice that exists in the data is respected.
                if isinstance(first_item, dict) and content_key not in first_item:
                    common_keys = ['content', 'text', 'body', 'document', 'description']
                    found_key = next((key for key in common_keys if key in first_item), None)
                    if found_key:
                        json_content_key_input.value = found_key
                        content_key = found_key
                        print(f"ℹ️ 콘텐츠 키로 '{found_key}'를 자동 감지했습니다.")
                new_docs = _json_items_to_documents(json_data, content_key)
            else:
                # BOM-tolerant decode keeps "\ufeff" out of the first line/chunk.
                full_text = content_bytes.decode('utf-8-sig')
                if method == 'chunk':
                    chunks = split_text_into_chunks(full_text, chunk_size_input.value, chunk_overlap_input.value)
                    new_docs = [{"id": i, "text": chunk, "metadata": {}} for i, chunk in enumerate(chunks)]
                else:
                    new_docs = [
                        {"id": i, "text": line.strip(), "metadata": {}}
                        for i, line in enumerate(full_text.splitlines())
                        if line.strip()
                    ]
            # Commit the new name only once parsing succeeded, so it always
            # matches the documents that are actually loaded.
            file_name = uploaded_name
            update_documents(new_docs)
        except Exception as e:
            print(f"❌ 파일처리 오류: {e}")
            with download_area:
                clear_output()

def on_search_button_clicked(b):
    """Run a search for the current query and print the full ranking.

    With the model set to ``'none'`` every document is scored by how many
    query words occur in its lower-cased text. Otherwise document embeddings
    are (re)built when no cache exists or the model / split settings changed,
    and documents are ranked by cosine similarity to the query embedding.

    Documents whose embedding could not be created are left out of the
    embedding ranking but stay in the global ``documents`` list, so keyword
    search and the split-document download still see the whole corpus.

    After the ranking is printed to ``output_area``, the CSV download button
    is refreshed. Validation or embedding failures print a message and return
    without touching the download area.

    Args:
        b: The clicked button (unused).
    """
    global doc_embeddings, last_embedding_model, last_division_settings, current_query

    with output_area:
        clear_output()
        # Strip so that a whitespace-only query is treated as empty.
        current_query = query_input.value.strip()
        embedding_model = embedding_model_selector.value
        current_division_settings = (division_method_selector.value, chunk_size_input.value, chunk_overlap_input.value, json_content_key_input.value)

        if not current_query:
            print("❌ 쿼리를 입력하십시오.")
            return
        if not documents:
            print("❌ 검색할 문서가 없습니다. 파일을 업로드하세요.")
            return

        print(f"🔍 검색 시작...\n쿼리: {current_query}\n포함된 모델: {embedding_model}\n" + "-" * 30)

        results = []

        if embedding_model == 'none':
            print("키워드 검색 중...")
            query_words = current_query.lower().split()
            for doc_info in documents:
                lowered_text = doc_info['text'].lower()  # lower-case once per document
                score = sum(1 for word in query_words if word in lowered_text)
                results.append({'doc_info': doc_info, 'score': score})
            results.sort(key=lambda x: x['score'], reverse=True)
        else:
            cache_is_stale = (
                doc_embeddings is None
                or len(doc_embeddings) != len(documents)
                or last_embedding_model != embedding_model
                or last_division_settings != current_division_settings
            )
            if cache_is_stale:
                print(f"문서 포함을 생성하는 중 (모델: {embedding_model})...")
                raw_embeddings = [get_embedding(d['text'], embedding_model) for d in documents]

                dimension = next((len(emb) for emb in raw_embeddings if emb is not None), None)
                if dimension is None:
                    print("❌ 모든 문서의 포함 생성에 실패했습니다.")
                    return

                # Keep row i aligned with documents[i]; a failed document gets
                # a NaN row instead of being removed from the corpus.
                nan_row = [np.nan] * dimension
                doc_embeddings = np.array(
                    [nan_row if emb is None else emb for emb in raw_embeddings],
                    dtype=float,
                )
                last_embedding_model = embedding_model
                last_division_settings = current_division_settings

                failed_count = sum(1 for emb in raw_embeddings if emb is None)
                if failed_count:
                    print(f"⚠️ {failed_count}개 문서는 임베딩 생성에 실패하여 순위에서 제외됩니다.")
                print("✅ 포함 완료.")

            print("검색어 퍼가기 생성 중...")
            query_embedding = get_embedding(current_query, embedding_model)
            if query_embedding is None:
                print("❌ 쿼리를 삽입하지 못했습니다.")
                return

            print("유사도 계산 중...")
            # Rank only the rows that hold a real embedding.
            usable_indices = [i for i, emb in enumerate(doc_embeddings) if not np.isnan(emb).any()]
            similarities = [1 - cosine(query_embedding, doc_embeddings[i]) for i in usable_indices]
            sorted_positions = np.argsort(similarities)[::-1]
            results = [
                {'doc_info': documents[usable_indices[pos]], 'score': similarities[pos]}
                for pos in sorted_positions
            ]

        print(f"\n🏆 검색 결과 (전체{len(results)}건) 🏆\n")
        if not results:
            print("일치하는 결과를 찾을 수 없습니다.")
        else:
            for i, result in enumerate(results):
                score_str = f"{result['score']:.4f}" if isinstance(result['score'], float) else str(result['score'])
                print(f"【Rank {i+1}】(분할ID: {result['doc_info']['id']}) Score: {score_str}")
                print(f"   📄 {result['doc_info']['text']}")
                print("-" * 20)

    # Drawn outside the output_area context because the buttons live in download_area.
    display_results_downloader(results, current_query)

# ------------------------------------------------
# 7. Final UI assembly and event-listener registration
# ------------------------------------------------
# Update which split-setting inputs are visible whenever the method changes.
division_method_selector.observe(on_division_method_change, names='value')
# Parse and split the file whenever the upload widget's value changes.
file_uploader.observe(on_file_upload, names='value')
search_button.on_click(on_search_button_clicked)
# Call the handler once by hand so the initial visibility matches the default method.
on_division_method_change({'new': division_method_selector.value})

# Top-to-bottom layout: title, usage notes, upload + split method, split
# settings, model selector, query row, separator, log output, download buttons.
ui = widgets.VBox([
    widgets.HTML("<h2>검색 순위 평가 도구</h2>"),
    widgets.HTML("""
    <p><b>사용법:</b></p>
    <ol>
      <li><b>분할 방법</b>을 선택합니다.(JSON의 경우 'JSON 객체별'을 선택하면 편리합니다.)</li>
      <li><b>파일 업로드</b>: JSON의 경우 콘텐츠 키가 자동으로 추측됩니다.</li>
      <li>(선택 사항) 분할된 모든 문서를 다운로드하고 의도한 대로 확인합니다.</li>
      <li><b>검색어 입력</b>을 통해 '검색 실행'을 수행하면 전체 순위가 표시되며 CSV에서 결과를 다운로드할 수 있습니다.</li>
    </ol>
    """),
    widgets.HBox([file_uploader, division_method_selector]),
    division_settings_box,
    embedding_model_selector,
    widgets.HBox([query_input, search_button]),
    widgets.HTML("<hr>"),
    output_area,
    download_area
])

display(ui)