# 보드게임 추천 시스템 통합 파이프 라인
이 노트북은 보드게임 리뷰와 메타데이터를 통합하여 벡터화하고 ChromaDB에 저장하는 전체 과정을 담고 있습니다. 한국어 쿼리로 영어 리뷰와 게임 데이터를 검색할 수 있는 크로스 언어 RAG 시스템 구축을 위한 파이프라인입니다.

## 1. 필요한 라이브러리 설치 및 임포트

In [None]:
!pip install ipython==8.16.1
# 노트북 셀에서 패키지 설치
!pip install -U langchain-huggingface langchain-chroma langchain-core langchain-text-splitters

In [None]:
# 필요한 라이브러리 설치 (최초 1회만 실행)
!pip install sentence-transformers langchain langchain-community chromadb pandas tqdm torch numpy

In [None]:
!pip install ipywidgets

In [None]:
!jupyter nbextension enable --py widgetsnbextension
!jupyter nbextension install --py widgetsnbextension


In [None]:
# 필요한 라이브러리 임포트
import pandas as pd
import numpy as np
import os
import time
import torch
from tqdm.notebook import tqdm

from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter


---

# 버그 개선

### 1) gpu 미사용 개선

In [None]:
print("1. 기존 패키지 제거 시작")
!pip uninstall -y torch torchvision torchaudio
print("기존 패키지 제거 완료")
print("pip 캐시 초기화")
!pip cache purge
print("2. CUDA 버전 설치 시작 (상세 로그 활성화)")
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118 --verbose
print("설치 완료")

In [None]:
print("확인")

In [None]:
import torch
print(f"CUDA 사용 가능: {torch.cuda.is_available()}")
print(f"PyTorch 버전: {torch.__version__}")

In [None]:
def check_gpu_status_safe():
    """GPU 상태를 안전하게 진단합니다(타임아웃 및 오류 처리 포함)"""
    print("=== GPU 진단 시작 ===")
    
    import torch
    import platform
    import subprocess
    import threading
    import time
    
    # 기본 정보 수집
    print(f"OS: {platform.system()} {platform.release()}")
    print(f"Python: {platform.python_version()}")
    print(f"PyTorch: {torch.__version__}")
    
    # CUDA 가용성 확인
    cuda_available = torch.cuda.is_available()
    print(f"CUDA 사용 가능: {cuda_available}")
    
    if cuda_available:
        try:
            print(f"CUDA 버전: {torch.version.cuda}")
            print(f"GPU 개수: {torch.cuda.device_count()}")
            for i in range(torch.cuda.device_count()):
                print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
        except Exception as e:
            print(f"CUDA 정보 수집 중 오류: {e}")
    else:
        print("CUDA를 사용할 수 없습니다. 가능한 원인:")
        
        # 타임아웃 기능이 있는 subprocess 실행 함수
        def run_with_timeout(cmd, timeout=10):
            result = {"completed": False, "output": ""}
            
            def target():
                try:
                    process = subprocess.Popen(
                        cmd, 
                        shell=True, 
                        stdout=subprocess.PIPE, 
                        stderr=subprocess.PIPE,
                        text=True
                    )
                    stdout, stderr = process.communicate(timeout=timeout)
                    result["output"] = stdout
                    result["completed"] = True
                except subprocess.TimeoutExpired:
                    result["output"] = f"명령 실행 시간 초과 ({timeout}초)"
                except Exception as e:
                    result["output"] = f"오류: {e}"
            
            thread = threading.Thread(target=target)
            thread.start()
            thread.join(timeout=timeout+1)
            
            if thread.is_alive():
                print(f"경고: 명령 실행이 응답하지 않습니다. 실행을 중단합니다.")
                return f"시간 초과 ({timeout}초)"
            
            return result["output"] if result["completed"] else result["output"]
        
        # NVIDIA GPU 존재 여부 확인 (안전한 실행)
        if platform.system() == 'Windows':
            print("Windows에서 GPU 정보 확인 중... (최대 10초 대기)")
            gpu_info = run_with_timeout('nvidia-smi')
            if "NVIDIA" in gpu_info:
                print("NVIDIA GPU 발견:")
                print(gpu_info[:500]) # 출력 제한
                print("→ PyTorch가 CUDA와 함께 설치되지 않았을 수 있습니다.")
                print("→ pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118")
            else:
                print("→ NVIDIA GPU가 감지되지 않거나 드라이버가 설치되지 않았습니다.")
        elif platform.system() == 'Linux':
            print("Linux에서 GPU 정보 확인 중... (최대 10초 대기)")
            gpu_info = run_with_timeout('nvidia-smi')
            if "NVIDIA" in gpu_info:
                print("NVIDIA GPU 발견:")
                print(gpu_info[:500]) # 출력 제한
                print("→ PyTorch가 CUDA와 함께 설치되지 않았을 수 있습니다.")
            else:
                print("→ NVIDIA GPU가 감지되지 않거나 드라이버가 설치되지 않았습니다.")
                
    # PyTorch 빌드 정보 확인 (안전하게)
    try:
        if hasattr(torch, '__config__'):
            if hasattr(torch.__config__, 'show'):
                print("\nPyTorch 빌드 정보:")
                build_info = torch.__config__.show()
                print(build_info)
    except Exception as e:
        print(f"PyTorch 빌드 정보 가져오기 실패: {e}")
    
    print("=== GPU 진단 완료 ===")
    
    # GPU 사용 가능하면 True 반환
    return cuda_available

In [None]:
# 필요한 라이브러리 임포트
import pandas as pd
import numpy as np
import os
import time
import torch
import sys
import platform
import subprocess
import threading

# tqdm 조건부 임포트 (ipywidgets 문제 방지)
try:
    from tqdm.notebook import tqdm
    print("✅ tqdm.notebook 성공적으로 임포트됨")
except ImportError:
    from tqdm import tqdm
    print("⚠️ tqdm.notebook 임포트 실패 - 일반 tqdm 사용")

# GPU 진단 실행
print("\n🖥️ GPU 진단 시작...")
gpu_available = check_gpu_status_safe()
print(f"🖥️ GPU 사용 가능: {'✅ 예' if gpu_available else '❌ 아니오'}")

# 나머지 라이브러리 조건부 임포트
try:
    from langchain_huggingface import HuggingFaceEmbeddings
    from langchain_chroma import Chroma
    from langchain_core.documents import Document
    from langchain_text_splitters import RecursiveCharacterTextSplitter
    print("✅ LangChain 라이브러리 임포트 성공")
except ImportError:
    print("⚠️ 최신 LangChain 라이브러리 임포트 실패")
    try:
        from langchain.embeddings import HuggingFaceEmbeddings
        from langchain.vectorstores import Chroma
        from langchain.schema import Document
        from langchain.text_splitters import RecursiveCharacterTextSplitter
        print("✅ 레거시 LangChain 라이브러리 임포트 성공")
    except ImportError as e:
        print(f"❌ LangChain 라이브러리 임포트 실패: {e}")
        print("필요한 라이브러리를 설치하세요: pip install langchain langchain-chroma langchain-core langchain-huggingface langchain-text-splitters")

print("\n🔍 시스템 준비 완료")

In [None]:
print("안됨")

## 2) 프로그래스바

In [None]:
# Cell 1: 패키지 설치 (별도로 실행)
# 먼저 필요한 패키지들을 재설치합니다
# !pip install --upgrade jupyter notebook ipywidgets

# Cell 2: 설치 확인
import ipywidgets
print(f"ipywidgets 버전: {ipywidgets.__version__}")

# Cell 3: 진행 표시줄 테스트
from tqdm.notebook import tqdm
for i in tqdm(range(10), desc="테스트"):
    import time
    time.sleep(0.1)

In [None]:
def get_progress_bar():
    """환경에 상관없이 작동하는 프로그레스 바를 제공합니다."""
    try:
        # 가장 기본적인 tqdm 임포트 시도
        from tqdm import tqdm
        return tqdm
    except ImportError:
        # tqdm 자체가 없는 경우 가짜 프로그레스 바 사용
        class DummyTqdm:
            def __init__(self, iterable=None, **kwargs):
                self.iterable = iterable
                self.total = len(iterable) if iterable is not None else 0
                self.n = 0
                self.desc = kwargs.get('desc', '')
            
            def __iter__(self):
                for obj in self.iterable:
                    yield obj
                    self.n += 1
                    # 10%마다 진행 상황 출력
                    if self.total > 0 and self.n % max(1, self.total // 10) == 0:
                        print(f"{self.desc}: {self.n}/{self.total} ({self.n/self.total*100:.1f}%)")
            
            def update(self, n=1):
                self.n += n
            
            def close(self):
                pass
            
            def __enter__(self):
                return self
            
            def __exit__(self, *args, **kwargs):
                self.close()
        
        print("tqdm을 로드할 수 없어 기본 진행 표시기를 사용합니다.")
        return DummyTqdm

In [None]:
print("됨?")

## 2. 설정 및 경로 지정

In [None]:
# 환경 설정
CHROMA_PERSIST_DIR = "chroma_db"  # ChromaDB 저장 경로
REVIEW_CSV_PATH = "bgg-26m-reviews.csv"   # 리뷰 CSV 파일 경로
METADATA_CSV_PATH = "boardgames_combined.csv"  # 메타데이터 CSV 파일 경로 (없으면 None)

MIN_REVIEW_LENGTH = 20  # 최소 리뷰 길이 (너무 짧은 리뷰 필터링)
BATCH_SIZE = 1000  # 배치 처리 크기
MODEL_NAME = "intfloat/e5-base-v2"  # 크로스 언어 성능이 좋은 모델
ENRICH_TEXT = True  # 리뷰와 메타데이터를 결합하여 벡터화
MAX_CHUNK_SIZE = 512  # 최대 청크 크기
SAMPLE_SIZE = None  # 샘플링 크기 (None이면 전체 데이터 사용)

# ID 컬럼 설정 (명시적 지정) - 수정 버전
REVIEW_ID_COLUMN = "ID"  # 리뷰 데이터의 ID 컬럼 명시적 지정
METADATA_ID_COLUMN = "Game_Id"  # 메타데이터의 ID 컬럼 명시적 지정 (id가 아닌 Game_Id로 수정)

# 중요 메타데이터 필드 정의 수정
IMPORTANT_META_FIELDS = [
    'Game_Id', 'Title', 'Description', 'description_detail', 'AvgRating',
    'minplayers', 'maxplayers', 'playingtime', 'minage', 
    'category_bert', 'CategoryType', 'boardgamemechanic', 
]

# GPU 사용 가능 여부 확인
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"사용 장치: {device}")


In [None]:
import os
import torch
import sys
from pathlib import Path
import time

def validate_environment_settings():
    """환경 설정을 검증하고 상세 정보를 출력하는 함수"""
    print("\n" + "="*70)
    print("🔍 환경 설정 검증 시작")
    print("="*70)
    
    validation_results = {
        "success": 0,
        "warning": 0,
        "error": 0
    }
    
    # 시스템 정보 출력
    print(f"🖥️  시스템 정보:")
    print(f"   - Python 버전: {sys.version.split()[0]}")
    print(f"   - 운영체제: {sys.platform}")
    print(f"   - 현재 작업 디렉토리: {os.getcwd()}")
    
    # CHROMA_PERSIST_DIR 검증
    print(f"\n📁 ChromaDB 저장 경로: {CHROMA_PERSIST_DIR}")
    if os.path.exists(CHROMA_PERSIST_DIR):
        print(f"   ✅ 경로가 존재합니다.")
        print(f"   - 절대 경로: {os.path.abspath(CHROMA_PERSIST_DIR)}")
        print(f"   - 기존 데이터가 있을 수 있습니다.")
        validation_results["success"] += 1
    else:
        print(f"   ⚠️ 경로가 존재하지 않습니다. 실행 시 자동으로 생성됩니다.")
        # 쓰기 권한 확인
        try:
            parent_dir = os.path.dirname(CHROMA_PERSIST_DIR) or "."
            if os.access(parent_dir, os.W_OK):
                print(f"   ✅ 디렉토리 생성 권한이 있습니다.")
                validation_results["success"] += 1
            else:
                print(f"   ❌ 디렉토리 생성 권한이 없습니다. 경로를 변경하세요.")
                validation_results["error"] += 1
        except Exception as e:
            print(f"   ❌ 권한 확인 중 오류: {e}")
            validation_results["error"] += 1
    
    # 리뷰 CSV 파일 검증
    print(f"\n📄 리뷰 CSV 파일: {REVIEW_CSV_PATH}")
    if REVIEW_CSV_PATH and os.path.exists(REVIEW_CSV_PATH):
        file_size = os.path.getsize(REVIEW_CSV_PATH) / (1024 * 1024)  # MB 단위로 변환
        print(f"   ✅ 파일이 존재합니다.")
        print(f"   - 파일 크기: {file_size:.2f} MB")
        print(f"   - 절대 경로: {os.path.abspath(REVIEW_CSV_PATH)}")
        # 파일 미리보기 (첫 줄)
        try:
            with open(REVIEW_CSV_PATH, 'r', encoding='utf-8') as f:
                first_line = f.readline().strip()
                print(f"   - 첫 줄 미리보기: {first_line[:100]}{'...' if len(first_line) > 100 else ''}")
            validation_results["success"] += 1
        except Exception as e:
            print(f"   ⚠️ 파일 읽기 오류: {e}")
            print(f"   - UTF-8 인코딩이 아닐 수 있습니다. 코드에서 다양한 인코딩을 시도합니다.")
            validation_results["warning"] += 1
    else:
        print(f"   ❌ 파일이 존재하지 않습니다. 경로를 확인하세요.")
        validation_results["error"] += 1
    
    # 메타데이터 CSV 파일 검증
    print(f"\n📄 메타데이터 CSV 파일: {METADATA_CSV_PATH}")
    if METADATA_CSV_PATH and os.path.exists(METADATA_CSV_PATH):
        file_size = os.path.getsize(METADATA_CSV_PATH) / (1024 * 1024)  # MB 단위로 변환
        print(f"   ✅ 파일이 존재합니다.")
        print(f"   - 파일 크기: {file_size:.2f} MB")
        print(f"   - 절대 경로: {os.path.abspath(METADATA_CSV_PATH)}")
        # 파일 미리보기 (첫 줄)
        try:
            with open(METADATA_CSV_PATH, 'r', encoding='utf-8') as f:
                first_line = f.readline().strip()
                print(f"   - 첫 줄 미리보기: {first_line[:100]}{'...' if len(first_line) > 100 else ''}")
            validation_results["success"] += 1
        except Exception as e:
            print(f"   ⚠️ 파일 읽기 오류: {e}")
            print(f"   - UTF-8 인코딩이 아닐 수 있습니다. 코드에서 다양한 인코딩을 시도합니다.")
            validation_results["warning"] += 1
    else:
        print(f"   ⚠️ 파일이 존재하지 않거나 지정되지 않았습니다.")
        print(f"   - 메타데이터 없이 리뷰 데이터만 처리됩니다.")
        validation_results["warning"] += 1
    
    # 기타 설정 검증
    print(f"\n⚙️ 기타 설정 검증:")
    print(f"   - 최소 리뷰 길이: {MIN_REVIEW_LENGTH} 글자")
    if MIN_REVIEW_LENGTH < 10:
        print(f"   ⚠️ 최소 리뷰 길이가 짧습니다. 너무 짧은 리뷰가 포함될 수 있습니다.")
        validation_results["warning"] += 1
    else:
        print(f"   ✅ 최소 리뷰 길이가 적절합니다.")
        validation_results["success"] += 1
    
    print(f"   - 배치 처리 크기: {BATCH_SIZE}")
    if BATCH_SIZE < 100:
        print(f"   ⚠️ 배치 크기가 작습니다. 처리 속도가 느릴 수 있습니다.")
        validation_results["warning"] += 1
    elif BATCH_SIZE > 5000:
        print(f"   ⚠️ 배치 크기가 큽니다. 메모리 문제가 발생할 수 있습니다.")
        validation_results["warning"] += 1
    else:
        print(f"   ✅ 배치 크기가 적절합니다.")
        validation_results["success"] += 1
    
    print(f"   - 임베딩 모델: {MODEL_NAME}")
    # 온라인 연결 확인 (HF 모델에 접근 가능한지)
    print(f"   ⚠️ 모델 다운로드를 위해 인터넷 연결이 필요합니다.")
    validation_results["warning"] += 1
    
    print(f"   - 최대 청크 크기: {MAX_CHUNK_SIZE}")
    if MAX_CHUNK_SIZE < 256:
        print(f"   ⚠️ 청크 크기가 작습니다. 컨텍스트가 불충분할 수 있습니다.")
        validation_results["warning"] += 1
    elif MAX_CHUNK_SIZE > 1024:
        print(f"   ⚠️ 청크 크기가 큽니다. 처리 속도가 느릴 수 있습니다.")
        validation_results["warning"] += 1
    else:
        print(f"   ✅ 청크 크기가 적절합니다.")
        validation_results["success"] += 1
    
    print(f"   - 리뷰와 메타데이터 결합: {'예' if ENRICH_TEXT else '아니오'}")
    if ENRICH_TEXT:
        print(f"   ✅ 리뷰와 메타데이터를 결합하여 더 풍부한 벡터 정보를 생성합니다.")
        validation_results["success"] += 1
    else:
        print(f"   ⚠️ 리뷰 데이터만 벡터화합니다. 메타데이터 정보가 포함되지 않습니다.")
        validation_results["warning"] += 1
    
    # ID 컬럼 설정 검증
    print(f"\n🔑 ID 컬럼 설정:")
    print(f"   - 리뷰 데이터 ID 컬럼: {REVIEW_ID_COLUMN}")
    print(f"   - 메타데이터 ID 컬럼: {METADATA_ID_COLUMN}")
    print(f"   ⚠️ ID 컬럼은 실제 파일에서 다시 확인됩니다.")
    validation_results["warning"] += 1
    
    # CUDA 가용성 검사
    print(f"\n🖥️ 하드웨어 가속 상태:")
    print(f"   - 선택된 장치: {device}")
    if device == "cuda":
        print(f"   ✅ CUDA 사용 가능! GPU 가속으로 처리됩니다.")
        # CUDA 상세 정보
        cuda_available = torch.cuda.is_available()
        device_count = torch.cuda.device_count() if cuda_available else 0
        current_device = torch.cuda.current_device() if cuda_available else -1
        device_name = torch.cuda.get_device_name(current_device) if cuda_available and device_count > 0 else "N/A"
        
        print(f"   - CUDA 버전: {torch.version.cuda if hasattr(torch.version, 'cuda') else 'N/A'}")
        print(f"   - GPU 개수: {device_count}")
        print(f"   - 현재 GPU: {device_name}")
        
        # 메모리 정보 출력 시도
        try:
            if cuda_available and device_count > 0:
                free_mem, total_mem = torch.cuda.mem_get_info(current_device)
                free_mem_gb = free_mem / (1024**3)
                total_mem_gb = total_mem / (1024**3)
                print(f"   - GPU 메모리: {free_mem_gb:.2f}GB 가용 / {total_mem_gb:.2f}GB 전체")
        except:
            print(f"   - GPU 메모리 정보: 확인 불가")
        
        validation_results["success"] += 1
    else:
        print(f"   ⚠️ CPU만 사용 가능합니다. 처리 속도가 느릴 수 있습니다.")
        validation_results["warning"] += 1
    
    # 총평
    print("\n" + "="*70)
    print(f"📊 환경 설정 검증 결과:")
    print(f"   ✅ 성공: {validation_results['success']}개")
    print(f"   ⚠️ 경고: {validation_results['warning']}개")
    print(f"   ❌ 오류: {validation_results['error']}개")
    
    if validation_results["error"] > 0:
        print("\n⛔ 심각한 문제가 발견되었습니다. 위 오류를 해결한 후 다시 시도하세요.")
    elif validation_results["warning"] > 0:
        print("\n⚠️ 일부 경고가 있지만 실행은 가능합니다. 위 경고를 검토하세요.")
    else:
        print("\n✅ 모든 설정이 유효합니다. 실행 준비가 완료되었습니다.")
    
    print("="*70 + "\n")
    
    return validation_results["error"] == 0  # 오류가 없으면 True 반환

# 검증 함수 실행
is_valid = validate_environment_settings()
if is_valid:
    print("환경 설정이 유효합니다. 다음 단계로 진행할 수 있습니다.")
else:
    print("환경 설정에 문제가 있습니다. 위의 오류를 수정한 후 다시 시도하세요.")

## 3. 메타데이터 로드 및 처리 함수

## 4. CSV 리뷰 데이터 로드 및 전처리

In [None]:
import os
import time
import torch
import numpy as np
import pandas as pd
from pathlib import Path
import json
import sys # sys import 추가
import subprocess
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed # as_completed 추가
import ast # 문자열 리스트 파싱을 위해 추가

# tqdm 설정 (콘솔 또는 노트북 환경 자동 감지 시도)
try:
    # 'get_ipython'에 대한 정의 오류 방지
    ipython_shell = get_ipython()
    shell = ipython_shell.__class__.__name__
    if shell == 'ZMQInteractiveShell':
        from tqdm.notebook import tqdm as tqdm_notebook # Jupyter 환경
    else:
        from tqdm import tqdm as tqdm_notebook # 다른 IPython 환경 (예: 터미널)
except NameError:
    from tqdm import tqdm as tqdm_notebook # IPython이 아닌 환경

# 기존 임포트 유지
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter

# --- 설정 변수 ---
CHROMA_PERSIST_DIR = "./chroma_db_integrated4" # 디렉토리 이름 변경 가능
REVIEW_CSV_PATH = "bgg-26m-reviews.csv"
METADATA_CSV_PATH = "boardgames_combined.csv"
MIN_REVIEW_LENGTH = 20
BATCH_SIZE = 500 # add_documents_to_vectorstore 에서 사용할 배치 크기
MODEL_NAME = "intfloat/multilingual-e5-small"
ENRICH_TEXT = True
MAX_CHUNK_SIZE = 512
SAMPLE_SIZE = None # None이면 전체 데이터 처리
CHECKPOINT_DIR = "./checkpoints4" # 체크포인트 저장 디렉토리

REVIEW_ID_COLUMN = "ID" # 리뷰 파일의 게임 ID 컬럼명
METADATA_ID_COLUMN = "Game_Id" # 메타데이터 파일의 게임 ID 컬럼명

IMPORTANT_META_FIELDS = [
    'Game_Id', 'Title', 'Description', 'description_detail', 'AvgRating',
    'minplayers', 'maxplayers', 'playingtime', 'minage',
    'category_bert', 'CategoryType', 'boardgamemechanic',
    'averageweight'
]

# --- E5 모델용 포맷 함수 ---
def format_query(user_query):
    """E5 모델용 쿼리 포맷팅"""
    return f"query: {user_query.strip()}"

def format_passage(doc_text):
    """E5 모델용 문서(Passage) 포맷팅"""
    return f"passage: {doc_text.strip()}"


# --- 1. GPU 진단 및 설정 함수 ---
def check_gpu_status():
    """GPU 상태를 자세히 진단하고 문제점을 보고합니다."""
    print("\n=== GPU 진단 시작 ===")
    cuda_available = torch.cuda.is_available()
    print(f"CUDA 사용 가능: {cuda_available}")

    if cuda_available:
        print(f"CUDA 버전: {torch.version.cuda}")
        print(f"GPU 개수: {torch.cuda.device_count()}")
        for i in range(torch.cuda.device_count()):
            print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
            try:
                free_mem, total_mem = torch.cuda.mem_get_info(i)
                free_mem_gb = free_mem / (1024**3)
                total_mem_gb = total_mem / (1024**3)
                print(f"  GPU {i} 메모리: {free_mem_gb:.2f}GB 가용 / {total_mem_gb:.2f}GB 전체")
            except Exception as e:
                print(f"  GPU {i} 메모리 정보를 가져올 수 없습니다: {e}")
    else:
        print("CUDA를 사용할 수 없습니다.")
        # (추가적인 원인 분석 로직은 필요 시 원래 코드에서 가져올 수 있음)
    print("=== GPU 진단 완료 ===")
    return cuda_available

# --- 2. 프로그레스 바 설정 함수 ---
def setup_progress_bar():
    """환경에 맞는 tqdm 진행 표시줄을 반환합니다."""
    print("\n=== 프로그레스 바 설정 시작 ===")
    progress_bar = tqdm_notebook
    try:
        ipython_shell = get_ipython()
        shell = get_ipython().__class__.__name__
        if shell == 'ZMQInteractiveShell':
            print("Jupyter Notebook 환경 감지. tqdm.notebook 사용.")
            from tqdm.notebook import tqdm as progress_bar
            # 간단한 테스트
            try:
                for _ in progress_bar(range(2), desc="프로그레스 바 테스트", leave=False):
                    time.sleep(0.01)
                print("✅ 노트북 프로그레스 바 작동 확인.")
            except Exception as e:
                 print(f"⚠️ 노트북 프로그레스 바 테스트 중 오류: {e}. 표준 tqdm 사용.")
                 from tqdm import tqdm as progress_bar
            return progress_bar
        else: # 다른 IPython 환경 (터미널 등)
             print("IPython 터미널 환경 감지. 표준 tqdm 사용.")
             from tqdm import tqdm as progress_bar
    except NameError:
        print("표준 Python 환경 감지. 표준 tqdm 사용.")
        from tqdm import tqdm as progress_bar
    finally:
        print("=== 프로그레스 바 설정 완료 ===")
        return progress_bar # 최종 결정된 progress_bar 반환
        
# --- 3. 체크포인트 관리 시스템 ---
class CheckpointManager:
    def __init__(self, pipeline_id="default"):
        self.pipeline_id = pipeline_id
        os.makedirs(CHECKPOINT_DIR, exist_ok=True)
        self.checkpoint_file = os.path.join(CHECKPOINT_DIR, f"{pipeline_id}_checkpoint.json")
        self.state = self._load_state()
        # Load 시 set으로 변환되었는지 확인
        if isinstance(self.state.get("processed_ids", {}).get("review_ids"), list) or \
           isinstance(self.state.get("processed_ids", {}).get("document_ids"), list):
             self._convert_ids_to_set()

    def _convert_ids_to_set(self):
        """로드된 ID 리스트를 집합(set)으로 변환"""
        if "processed_ids" in self.state:
            for id_type in self.state["processed_ids"]:
                 if isinstance(self.state["processed_ids"][id_type], list):
                      # print(f"Converting processed_ids['{id_type}'] to set...") # 디버깅용
                      self.state["processed_ids"][id_type] = set(self.state["processed_ids"][id_type])

    def _load_state(self):
        """기존 체크포인트 상태 로드"""
        if os.path.exists(self.checkpoint_file):
            try:
                with open(self.checkpoint_file, 'r', encoding='utf-8') as f:
                    loaded_state = json.load(f)
                # 로드 후 바로 set으로 변환
                if "processed_ids" in loaded_state:
                    for id_type in loaded_state["processed_ids"]:
                        if isinstance(loaded_state["processed_ids"][id_type], list):
                             loaded_state["processed_ids"][id_type] = set(loaded_state["processed_ids"][id_type])
                return loaded_state
            except json.JSONDecodeError as e:
                print(f"⚠️ 체크포인트 파일 JSON 디코딩 오류 ({self.checkpoint_file}): {e}")
                print("   -> 파일을 백업하고 새로 시작합니다.")
                self._backup_corrupted_checkpoint()
                return self._init_state()
        
            except Exception as e:
                print(f"⚠️ 체크포인트 파일 로드 중 예상치 못한 오류 ({self.checkpoint_file}): {str(e)}")
                self._backup_corrupted_checkpoint()
                return self._init_state()
                
        else:
            return self._init_state()

    def _backup_corrupted_checkpoint(self):
        """손상된 체크포인트 파일을 백업"""
        if os.path.exists(self.checkpoint_file):
            backup_file = f"{self.checkpoint_file}.corrupted_{int(time.time())}.bak"
            try : 
                os.rename(self.checkpoint_file, backup_file)
                print(f"   손상된 파일 백업됨: {backup_file}")
            except Exception as backup_e:
                print(f"   ⚠️ 손상된 파일 백업 실패: {backup_e}")
            

    def _init_state(self):
        """새 체크포인트 상태 초기화"""
        print(f"새 체크포인트 상태 초기화: {self.pipeline_id}")
        return {
            "pipeline_id": self.pipeline_id,
            "start_time": time.time(),
            "last_update": time.time(),
            "stages": {
                # 파이프라인 단계에 맞춰 수정
                "metadata_loaded": False, "reviews_processed": False, "data_enriched": False,
                "documents_created": False, "documents_split": False, "vectorstore_setup": False,
                "passage_formatting_applied": False, # E5 포맷팅 단계 추가
                "documents_added": False, "vectorstore_tested": False
            },
            "counters": {
                "metadata_count": 0, "reviews_processed": 0, "enriched_count": 0,
                "documents_created": 0, "documents_split": 0, "documents_added": 0,
                "last_added_batch_index": -1
            },
            "processed_ids": {
                "review_ids": set(), # 리뷰 처리 중복 방지용
                "document_ids": set() # 벡터 저장소 추가 중복 방지용 (Chroma ID 기준)
            },
            # 추가 정보 저장 가능 (예: 로드된 파일 경로)
            "source_files": {
                "metadata_file": None,
                "review_file": None
            }
        }

    def save(self):
        """현재 상태를 파일에 저장 (set을 list로 변환)"""
        save_state = self.state.copy()
        # JSON 저장을 위해 set을 list로 변환
        save_state["processed_ids"] = {
            key: list(val) for key, val in self.state["processed_ids"].items()
        }
        save_state["last_update"] = time.time() # 저장 시점 업데이트

        try:
            # 임시 파일에 먼저 저장
            temp_file = self.checkpoint_file + ".tmp"
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(save_state, f, indent=4, ensure_ascii=False)
            # 저장 성공 시 원본 파일로 변경 (원자적 연산 시도)
            os.replace(temp_file, self.checkpoint_file)
        except Exception as e:
            print(f"❌ 체크포인트 저장 오류 ({self.checkpoint_file}): {e}")
            # 임시 파일 삭제 시도
            if os.path.exists(temp_file):
                 try: os.remove(temp_file)
                 except: pass

    def update_stage(self, stage, status=True):
        """단계 완료 상태 업데이트"""
        if stage in self.state["stages"]:
            # 상태가 변경되었을 때만 저장
            if self.state["stages"][stage] != status:
                 self.state["stages"][stage] = status
                 self.save()
        else:
             print(f"⚠️ 경고: 존재하지 않는 체크포인트 단계 업데이트 시도 - {stage}")

    def update_counter(self, counter, value):
        """카운터 값 업데이트"""
        if counter in self.state["counters"]:
            # 값이 변경되었을 때만 저장
            if self.state["counters"][counter] != value:
                 self.state["counters"][counter] = value
                 self.save()
        else:
             print(f"⚠️ 경고: 존재하지 않는 체크포인트 카운터 업데이트 시도 - {counter}")

    # def increment_counter(self, counter, increment=1):
    #     """카운터 증가"""
    #     if counter in self.state["counters"]:
    #         self.state["counters"][counter] += increment
    #         # 증가는 빈번할 수 있으므로 매번 save하지 않고,
    #         # 단계 완료 시 또는 주기적으로 save하는 것이 효율적일 수 있음
    #         # 여기서는 일단 저장
    #         self.save()

    def is_stage_completed(self, stage):
        """단계가 완료되었는지 확인"""
        return self.state["stages"].get(stage, False)
        
    def add_processed_ids(self, id_type, ids):
        """처리된 ID 추가 (set 업데이트 후 저장)"""
        if id_type in self.state["processed_ids"]:
            if not isinstance(ids, (list, set, tuple)): # tuple 추가
                ids = [ids]
            # None 값 제거 및 문자열 변환
            str_ids = {str(id_val) for id_val in ids if id_val is not None}
            if not str_ids: return # 추가할 유효한 ID가 없으면 반환

            initial_len = len(self.state["processed_ids"][id_type])
            self.state["processed_ids"][id_type].update(str_ids)
            # 실제 ID가 추가되었을 때만 저장
            if len(self.state["processed_ids"][id_type]) > initial_len:
                self.save()
        else:
            print(f"⚠️ 경고: 존재하지 않는 ID 타입에 ID 추가 시도 - {id_type}")

    def is_id_processed(self, id_type, id_value):
        """ID가 이미 처리되었는지 확인"""
        if id_type in self.state["processed_ids"] and id_value is not None:
            return str(id_value) in self.state["processed_ids"][id_type]
        return False

    def get_processed_ids_count(self, id_type):
        """처리된 ID 개수 반환"""
        return len(self.state.get("processed_ids", {}).get(id_type, set()))

    def get_resume_info(self):
        """재개 정보 얻기"""
        completed_stages = [s for s, status in self.state.get("stages", {}).items() if status]
        import datetime
        last_update_ts = self.state.get("last_update", 0)
        last_update_str = "N/A"
        if last_update_ts:
            try:
                last_update_str = datetime.datetime.fromtimestamp(last_update_ts).strftime('%Y-%m-%d %H:%M:%S')
            except ValueError: # 유효하지 않은 타임스탬프 처리
                pass

        counters = self.state.get("counters", {})
        last_added_batch_idx = counters.get("last_added_batch_index", -1)

        return {
            "can_resume": len(completed_stages) > 0 or last_added_batch_idx >= 0,
            "completed_stages": completed_stages,
            "counters": counters,
            "processed_counts": {
                "reviews": self.get_processed_ids_count("review_ids"),
                "documents": self.get_processed_ids_count("document_ids"),
            },
            "last_update": last_update_str,
            "last_added_batch_index": last_added_batch_idx
        }

    def reset(self):
        """체크포인트 상태 초기화"""
        print(f"🔄 체크포인트 초기화 중: {self.checkpoint_file}")
        self.state = self._init_state()
        self.save()
        print("✅ 체크포인트가 초기화되었습니다.")

    def update_source_file(self, file_type, file_path):
        """소스 파일 경로 저장"""
        if file_type in self.state.get("source_files", {}):
             if self.state["source_files"][file_type] != file_path:
                  self.state["source_files"][file_type] = file_path
                  self.save()


# --- 4. 병렬 처리 관리자 ---
class ParallelProcessor:
    def __init__(self, use_processes=False, max_workers=None):
        self.use_processes = use_processes
        cpu_count = multiprocessing.cpu_count()
        self.max_workers = max_workers if max_workers is not None else max(1, cpu_count - 1)
        print(f"병렬 처리 설정: {'프로세스' if use_processes else '스레드'} 모드, {self.max_workers}개 워커")
        self.executor_class = ProcessPoolExecutor if self.use_processes else ThreadPoolExecutor

    def process_batch(self, func, items, desc="병렬 처리 중", chunksize=1, **kwargs):
        """항목 배치를 병렬로 처리 (개선된 버전)"""
        if not items: return []

        if len(items) < self.max_workers * 2:
             print(f"ℹ️ 항목 수가 적어 ({len(items)}개) 직렬 처리합니다.")
             results = []
             for item in tqdm_notebook(items, desc=f"{desc} (직렬)"):
                  try:
                      results.append(func(item, **kwargs) if kwargs else func(item))
                  except Exception as e:
                      # 직렬 처리 오류 시 인덱스 정보 추가
                      item_info = f"항목 {items.index(item)}" if isinstance(items, list) else "항목"
                      print(f"⚠️ {item_info} 처리 중 오류 발생 (직렬): {e}")
                      results.append(None)
             return results

        results = [None] * len(items)
        futures_map = {} # Future -> original index 매핑

        with self.executor_class(max_workers=self.max_workers) as executor:
            # submit 사용 시 인덱스와 함께 전달
            for i, item in enumerate(items):
                 future = executor.submit(func, item, **kwargs) if kwargs else executor.submit(func, item)
                 futures_map[future] = i

            with tqdm_notebook(total=len(items), desc=desc) as pbar:
                for future in as_completed(futures_map):
                    original_index = futures_map[future]
                    try:
                        result = future.result()
                        results[original_index] = result
                    except Exception as e:
                        print(f"⚠️ 병렬 작업 오류 (원본 인덱스 {original_index}): {e}")
                        # 결과는 None으로 유지됨
                    finally:
                         pbar.update(1)

        error_count = sum(1 for r in results if r is None) # None 결과 개수 확인
        if error_count > 0:
            print(f"⚠️ 병렬 처리 중 {error_count}/{len(items)}개 항목에서 오류가 발생했을 수 있습니다.")

        return results


# --- 기존 함수 정의 (필요에 따라 내부 수정) ---

# load_game_metadata (체크포인트 로직 추가)
def load_game_metadata(metadata_file=METADATA_CSV_PATH, checkpoint_mgr=None, progress_bar=tqdm_notebook):
    """보드게임 메타데이터 로드 (체크포인트 지원 시도)"""
    print("\n" + "="*50)
    print("📚 1단계: 메타데이터 로드")
    print("="*50)
    stage_name = "metadata_loaded"
    reload_needed = True # 기본적으로 리로드 필요

    # --- 체크포인트 확인 ---
    # 메타데이터는 일반적으로 다시 로드하는 것이 안전하므로,
    # 여기서는 완료 여부만 확인하고 실제 로드는 항상 수행하도록 할 수 있음.
    # 또는, 로드된 파일 경로가 같은지 확인하는 로직 추가 가능.
    if checkpoint_mgr:
        if checkpoint_mgr.is_stage_completed(stage_name):
             loaded_file = checkpoint_mgr.state["source_files"].get("metadata_file")
             if loaded_file == metadata_file:
                 print(f"✅ 메타데이터 '{metadata_file}'는 이전 실행에서 이미 로드되었습니다.")
                 print(f"   저장된 카운트: {checkpoint_mgr.state['counters']['metadata_count']}")
                 # 실제 데이터를 로드하는 기능은 없으므로, 재로드하거나 여기서 None 반환
                 # 여기서는 재로드 진행
                 print("   (체크포인트 확인했지만, 데이터 재로드를 진행합니다)")
             else:
                  print(f"⚠️ 이전 로드 파일({loaded_file})과 현재 파일({metadata_file})이 다릅니다. 재로드합니다.")

    start_time = time.time()

    if not os.path.exists(metadata_file):
        print(f"❌ 파일 없음: {metadata_file}"); return None, None
    # ... (파일 정보 출력, 인코딩 시도, ID 컬럼 찾기, 변환, 중복 제거 로직은 이전과 동일) ...
    file_size = os.path.getsize(metadata_file) / (1024 * 1024)
    print(f"📁 파일: {metadata_file} ({file_size:.2f} MB)")
    metadata_df = None
    encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1']
    for encoding in progress_bar(encodings, desc="메타 인코딩 테스트"):
         try:
             metadata_df = pd.read_csv(metadata_file, encoding=encoding)
             print(f"  ✅ {encoding} 로드 성공!"); break
         except: continue
    if metadata_df is None: print("❌ 메타 로드 실패."); return None, None

    id_column = METADATA_ID_COLUMN if METADATA_ID_COLUMN in metadata_df.columns else None
    if not id_column:
        id_candidates = ['id', 'ID', 'game_id', 'gameid', 'item_id']
        for candidate in id_candidates:
             matches = [c for c in metadata_df.columns if c.lower() == candidate.lower()]
             if matches: id_column = matches[0]; break
    if not id_column: print("⚠️ ID 컬럼 못찾음. 인덱스 사용."); metadata_df['generated_id'] = metadata_df.index.astype(str); id_column = 'generated_id'
    else: print(f"✅ ID 컬럼: '{id_column}'")

    metadata_df[id_column] = metadata_df[id_column].astype(str).str.replace(r'\.0$', '', regex=True)
    dups = metadata_df.duplicated(subset=[id_column]).sum()
    if dups > 0: print(f"   ⚠️ 중복 ID {dups}개 발견. 첫 값 유지."); metadata_df = metadata_df.drop_duplicates(subset=[id_column], keep='first')

    print("🔄 사전 생성 중...")
    metadata_dict = metadata_df.set_index(id_column).to_dict('index')
    metadata_count = len(metadata_dict)

    if checkpoint_mgr:
        checkpoint_mgr.update_counter("metadata_count", metadata_count)
        checkpoint_mgr.update_stage(stage_name, True)
        print(f"✅ 체크포인트 업데이트: {stage_name}=True, count={metadata_count}")

    total_time = time.time() - start_time
    print(f"✅ 로드 완료: {metadata_count:,}개 게임 ({total_time:.2f}초)")
    print("="*50 + "\n")
    return metadata_dict, id_column

# preprocess_reviews (체크포인트 로직 추가)
def preprocess_reviews(csv_path=REVIEW_CSV_PATH, checkpoint_mgr=None, progress_bar=tqdm_notebook):
    """CSV 리뷰 데이터 로드 및 전처리 (체크포인트 지원 시도)"""
    print("\n" + "="*70)
    print("📝 2단계: 리뷰 데이터 처리")
    print("="*70)
    stage_name = "reviews_processed"

    # --- 체크포인트 확인 ---
    if checkpoint_mgr:
         if checkpoint_mgr.is_stage_completed(stage_name):
             loaded_file = checkpoint_mgr.state["source_files"].get("review_file")
             if loaded_file == csv_path:
                  processed_count = checkpoint_mgr.state["counters"]["reviews_processed"]
                  saved_id_count = checkpoint_mgr.get_processed_ids_count("review_ids")
                  print(f"✅ 리뷰 파일 '{csv_path}'는 이전 실행에서 이미 처리되었습니다.")
                  print(f"   저장된 처리 카운트: {processed_count:,}, 저장된 ID 수: {saved_id_count:,}")
                  # 여기서 처리를 건너뛰려면, 이전 결과를 로드하는 매커니즘 필요.
                  # 여기서는 ID 기반 중복 처리를 강화하는 방향으로 진행.
                  print("   (체크포인트 확인. ID 기반 중복 처리를 사용하여 진행합니다)")
             else:
                  print(f"⚠️ 이전 처리 파일({loaded_file})과 현재 파일({csv_path})이 다릅니다. 새로 처리합니다.")
                  # 이전 ID 정보가 유효하지 않으므로 초기화 필요 시 고려
                  # checkpoint_mgr.state["processed_ids"]["review_ids"] = set()
                  # checkpoint_mgr.save()

    start_time = time.time()

    if not os.path.exists(csv_path):
        print(f"❌ CSV 파일을 찾을 수 없습니다: {csv_path}")
        if checkpoint_mgr: checkpoint_mgr.update_stage(stage_name, False)
        return None, None, None

    print(f"📂 리뷰 데이터 파일: {csv_path}")
    # (파일 크기 확인 등 유지)

    df = None
    try:
        # (인코딩 시도 로직 유지, SAMPLE_SIZE 적용 유지)
        encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1']
        encoding_used = None
        for encoding in progress_bar(encodings, desc="리뷰 파일 인코딩 테스트"):
            try:
                 print(f"   🔍 {encoding} 인코딩으로 시도...")
                 # 샘플 로드로 인코딩 확인
                 pd.read_csv(csv_path, nrows=5, encoding=encoding)
                 # 로드
                 read_kwargs = {'encoding': encoding, 'on_bad_lines': 'warn'}
                 if SAMPLE_SIZE:
                      print(f"\n⚠️ 테스트 모드: 처음 {SAMPLE_SIZE:,}행만 읽습니다.")
                      read_kwargs['nrows'] = SAMPLE_SIZE
                 else:
                      print(f"\n📂 전체 데이터 로드 시도...")
                 try:
                     df = pd.read_csv(csv_path, **read_kwargs)
                 except pd.errors.ParserError as pe:
                     print(f"   ⚠️ 파싱 오류 ({encoding}): {pe}. 오류 라인 건너뛰고 재시도.")
                     read_kwargs['on_bad_lines'] = 'skip'
                     df = pd.read_csv(csv_path, **read_kwargs)

                 print(f"   ✅ {encoding} 로드 성공.")
                 encoding_used = encoding
                 break
            except UnicodeDecodeError:
                 print(f"   ❌ {encoding} 인코딩 실패")
                 continue
            except Exception as e:
                 print(f"   ❌ 로드 오류 ({encoding}): {e}")
                 continue

        if df is None:
            print("❌ CSV 파일 로드 실패.")
            if checkpoint_mgr: checkpoint_mgr.update_stage(stage_name, False)
            return None, None, None

        print(f"\n✅ CSV 로드 성공: {len(df):,}행")

        # (리뷰 컬럼 찾기, ID 컬럼 찾기 및 변환 로직 유지)
        review_column_candidates = ['comment', 'review', 'text', 'content', 'review_text']
        review_column = None
        for col in review_column_candidates:
            col_matches = [c for c in df.columns if c.lower() == col.lower()]
            if col_matches:
                review_column = col_matches[0]; break
        if not review_column: print("❌ 리뷰 컬럼 찾기 실패."); return None, None, None
        print(f"✅ 리뷰 컬럼: '{review_column}'")

        id_column = None
        id_candidates = [REVIEW_ID_COLUMN, 'id', 'ID', 'game_id', 'gameid', 'item_id', 'bgg_id', 'gid']
        id_candidates = [c for c in id_candidates if c] # None 제거
        for candidate in id_candidates:
             col_matches = [c for c in df.columns if c.lower() == candidate.lower()]
             if col_matches:
                 id_column = col_matches[0]; break
        if not id_column: print("⚠️ 리뷰 ID 컬럼 찾기 실패. 메타데이터 연결 불가.")
        else:
             print(f"✅ 리뷰 ID 컬럼: '{id_column}'")
             print(f"\n🔄 ID 컬럼 '{id_column}' 문자열 변환 및 .0 제거...")
             df[id_column] = df[id_column].astype(str).str.replace(r'\.0$', '', regex=True)

        # --- 데이터 전처리 (체크포인트 ID 확인 로직 추가) ---
        print("\n🔄 리뷰 데이터 전처리 중...")
        initial_count = len(df)
        processed_ids_in_this_run = set()
        processed_rows = []

        # CheckpointManager에서 이미 처리된 ID 로드
        previously_processed_ids = set()
        if checkpoint_mgr and id_column:
            previously_processed_ids = checkpoint_mgr.state["processed_ids"].get("review_ids", set())
            print(f"   ℹ️ 체크포인트에서 로드된 이전 처리 리뷰 ID: {len(previously_processed_ids):,}개")

        for index, row in progress_bar(df.iterrows(), total=initial_count, desc="리뷰 전처리 및 필터링"):
             # 1. ID 기반 건너뛰기 (ID 컬럼이 있고, 체크포인트 사용 시)
             current_id = None
             if id_column:
                 current_id = row[id_column]
                 if checkpoint_mgr and checkpoint_mgr.is_id_processed("review_ids", current_id):
                     continue # 이미 처리된 ID면 건너뛰기

             # 2. Null 또는 빈 리뷰 제거
             review_text = row[review_column]
             if pd.isna(review_text): continue
             review_text_str = str(review_text).strip()
             if not review_text_str: continue

             # 3. 짧은 리뷰 제거
             if len(review_text_str) < MIN_REVIEW_LENGTH: continue

             # 모든 필터 통과 시 결과 리스트에 추가
             processed_rows.append(row.to_dict()) # 딕셔너리로 변환하여 추가

             # 이번 실행에서 처리된 ID 기록 (나중에 중복 제거 및 체크포인트 업데이트용)
             if current_id is not None:
                 processed_ids_in_this_run.add(current_id)

        # 처리된 행들로 새 DataFrame 생성
        if not processed_rows:
             print("⚠️ 전처리 후 유효한 리뷰 데이터가 없습니다.")
             if checkpoint_mgr:
                  # 처리는 했지만 결과가 없으므로 완료 상태는 아님
                  checkpoint_mgr.update_stage(stage_name, False)
                  checkpoint_mgr.update_counter("reviews_processed", 0)
             return None, review_column, id_column

        processed_df = pd.DataFrame(processed_rows)
        print(f"✅ 기본 필터링 완료: {len(processed_df):,}행 남음")

        # 4. 내용 기반 중복 제거 (처리된 결과에 대해 수행)
        before_dedup = len(processed_df)
        processed_df = processed_df.drop_duplicates(subset=[review_column], keep='first')
        print(f"✅ 내용 기반 중복 리뷰 제거: {before_dedup - len(processed_df):,}개 제거됨")

        final_count = len(processed_df)
        print(f"\n📊 전처리 후 최종 데이터 크기: {final_count:,}행")

        # --- 체크포인트 업데이트 ---
        if checkpoint_mgr:
            # 새로 처리된 ID들을 체크포인트에 추가
            if id_column and processed_ids_in_this_run:
                 newly_added_ids = processed_ids_in_this_run - previously_processed_ids
                 if newly_added_ids:
                     checkpoint_mgr.add_processed_ids("review_ids", list(newly_added_ids))
                     print(f"   💾 체크포인트에 새로 처리된 리뷰 ID {len(newly_added_ids):,}개 추가됨.")

            # 카운터는 이번 실행에서 *최종적으로 남은* 행의 수로 업데이트
            # 또는 전체 누적 카운트로 관리할 수도 있음 (여기선 최종 카운트 사용)
            checkpoint_mgr.update_counter("reviews_processed", final_count)
            checkpoint_mgr.update_source_file("review_file", csv_path)
            # 최종 결과가 있을 때만 완료로 표시
            checkpoint_mgr.update_stage(stage_name, final_count > 0)
            print(f"✅ 체크포인트 업데이트: {stage_name}={'True' if final_count > 0 else 'False'}, count={final_count}")

        total_time = time.time() - start_time
        print("\n" + "="*70)
        print("📊 리뷰 데이터 전처리 요약:")
        print(f"✅ 최종 행 수: {final_count:,}")
        print(f"✅ 리뷰 컬럼: '{review_column}'")
        print(f"✅ ID 컬럼: '{id_column if id_column else '없음'}'")
        print(f"⏱️ 총 소요 시간: {total_time:.2f}초")
        print("="*70)

        return processed_df, review_column, id_column

    except Exception as e:
        print(f"\n❌ 리뷰 처리 중 오류: {e}")
        import traceback
        traceback.print_exc()
        if checkpoint_mgr: checkpoint_mgr.update_stage(stage_name, False)
        print("="*70)
        return None, None, None


# enrich_review_data_fixed (병렬 처리 적용)
def enrich_review_data_fixed(review_df, metadata_dict, meta_id_column, review_id_column,
                             parallel_mgr=None, checkpoint_mgr=None, progress_bar=tqdm_notebook):
    """리뷰 데이터에 게임 메타데이터 추가 (병렬 처리 및 체크포인트 시도)"""
    print("\n" + "="*70)
    print("🔄 3단계: 데이터 통합 (Enrichment)")
    print("="*70)
    stage_name = "data_enriched"

    # --- 체크포인트 확인 ---
    if checkpoint_mgr and checkpoint_mgr.is_stage_completed(stage_name):
         enriched_count = checkpoint_mgr.state["counters"]["enriched_count"]
         print(f"✅ 데이터 통합 단계는 이전에 완료되었습니다 (처리된 행: {enriched_count:,}).")
         # 여기서 건너뛰려면 이전 결과 로드 필요. 일단 진행.
         print("   (체크포인트 확인. 데이터 통합을 다시 진행합니다)")

    start_time = time.time()

    if metadata_dict is None or review_df is None or review_df.empty:
        print("❌ 메타데이터 또는 리뷰 데이터가 없어 통합 불가.")
        if checkpoint_mgr: checkpoint_mgr.update_stage(stage_name, False)
        return review_df if review_df is not None else pd.DataFrame()

    if not review_id_column or review_id_column not in review_df.columns:
        print(f"⚠️ 리뷰 ID 컬럼 '{review_id_column}' 없어 메타데이터 통합 불가.")
        # ID 없으면 통합 불가, 원본 반환
        # 체크포인트는 완료되지 않음
        if checkpoint_mgr: checkpoint_mgr.update_stage(stage_name, False)
        return review_df

    # 사용할 필드
    fields_to_enrich = IMPORTANT_META_FIELDS
    print(f"   - 통합할 메타 필드: {fields_to_enrich}")

    # 데이터 통합을 위한 헬퍼 함수 (단일 행 처리)
    def enrich_single_row(row_tuple):
        index, row_series = row_tuple # 입력은 (인덱스, 시리즈) 형태
        enriched_row = row_series.to_dict() # 딕셔너리로 변환
        game_id = str(enriched_row.get(review_id_column, '')) # ID 추출 및 문자열 변환

        # game_ 필드 초기화
        for field in fields_to_enrich:
            enriched_row[f'game_{field}'] = None

        # 메타데이터 매칭 및 필드 복사
        if game_id in metadata_dict:
            game_meta = metadata_dict[game_id]
            for field in fields_to_enrich:
                if field in game_meta and pd.notna(game_meta[field]):
                    enriched_row[f'game_{field}'] = game_meta[field]
        # else: # 매칭 안되면 None 유지됨

        return enriched_row # 처리된 딕셔너리 반환


    # 처리할 아이템 리스트 준비 (인덱스 포함 튜플)
    items_to_process = list(review_df.iterrows())

    enriched_results = None
    if parallel_mgr:
        print(f"\n🔄 병렬 처리기로 메타데이터 통합 시작 ({len(items_to_process):,}개 행)...")
        # process_batch 호출
        enriched_results = parallel_mgr.process_batch(
            enrich_single_row,
            items_to_process,
            desc="메타데이터 통합 (병렬)"
        )
    else:
        print(f"\n🔄 직렬 처리로 메타데이터 통합 시작 ({len(items_to_process):,}개 행)...")
        enriched_results = []
        for item in progress_bar(items_to_process, desc="메타데이터 통합 (직렬)"):
            try:
                 enriched_results.append(enrich_single_row(item))
            except Exception as e:
                 print(f"⚠️ 행 처리 중 오류: {e}")
                 enriched_results.append(None) # 오류 시 None 추가


    # 결과 처리 (오류가 발생한 항목은 None일 수 있음)
    valid_results = [res for res in enriched_results if res is not None]

    if not valid_results:
        print("❌ 데이터 통합 후 유효한 결과가 없습니다.")
        if checkpoint_mgr: checkpoint_mgr.update_stage(stage_name, False)
        return pd.DataFrame() # 빈 데이터프레임 반환

    enriched_df = pd.DataFrame(valid_results)
    final_count = len(enriched_df)

    # 매칭률 계산 (간단히 game_Title 필드가 채워진 경우로 추정)
    matched_count = enriched_df[f'game_{fields_to_enrich[1]}'].notna().sum() # Title 기준
    match_pct = matched_count / final_count * 100 if final_count > 0 else 0
    print(f"\n✅ 메타데이터 통합 완료: {final_count:,}개 행 처리됨")
    print(f"   - 메타데이터 매칭 추정: {matched_count:,} ({match_pct:.1f}%)")


    # --- 체크포인트 업데이트 ---
    if checkpoint_mgr:
        checkpoint_mgr.update_counter("enriched_count", final_count)
        checkpoint_mgr.update_stage(stage_name, final_count > 0)
        print(f"✅ 체크포인트 업데이트: {stage_name}={'True' if final_count > 0 else 'False'}, count={final_count}")

    total_time = time.time() - start_time
    print(f"⏱️ 총 처리 시간: {total_time:.2f}초")
    print("="*70)
    return enriched_df


# create_documents (병렬 처리 적용)
def create_documents(df, review_column, enrich_text=True,
                     parallel_mgr=None, checkpoint_mgr=None, progress_bar=tqdm_notebook):
    """리뷰와 메타데이터를 결합하여 Document 객체 생성 (병렬 처리)"""
    print("\n" + "="*70)
    print(f"📄 4단계: Document 객체 생성 (enrich={enrich_text})")
    print("="*70)
    stage_name = "documents_created"

    # --- 체크포인트 확인 ---
    if checkpoint_mgr and checkpoint_mgr.is_stage_completed(stage_name):
        doc_count = checkpoint_mgr.state["counters"]["documents_created"]
        print(f"✅ Document 생성 단계는 이전에 완료되었습니다 (생성된 문서: {doc_count:,}).")
        # 여기서 건너뛰려면 이전 결과 로드 필요. 일단 진행.
        print("   (체크포인트 확인. Document 생성을 다시 진행합니다)")

    start_time = time.time()

    if df is None or df.empty:
        print("❌ 입력 데이터가 없어 Document 생성 불가.")
        if checkpoint_mgr: checkpoint_mgr.update_stage(stage_name, False)
        return []

    # 필드 이름 매핑 (이전 코드와 동일)
    field_display_names = {
        'Game_Id': '게임 ID', 'Title': '게임', 'Name': '게임',
        'Description': '설명', 'description_detail': '상세 설명',
        'AvgRating': '평점', 'yearpublished': '출시 연도',
        'minplayers': '최소 플레이어', 'maxplayers': '최대 플레이어',
        'playingtime': '플레이 시간', 'minage': '최소 연령',
        'category_bert': 'BERT 카테고리', 'CategoryType': '카테고리 타입',
        'boardgamecategory': '장르', 'boardgamemechanic': '메커니즘',
        'averageweight': '복잡도', 'complexity': '복잡도', 'weight': '복잡도'
    }

    # Document 생성을 위한 헬퍼 함수 (단일 행 처리)
    def create_single_document(row_tuple):
        index, row_series = row_tuple
        row_dict = row_series.to_dict()

        # 1. 메타데이터 구성
        metadata = {}
        for col, value in row_dict.items():
            if col != review_column and pd.notna(value):
                # 타입 변환 시도
                if isinstance(value, (np.int64, np.int32)): metadata[col] = int(value)
                elif isinstance(value, (np.float64, np.float32)): metadata[col] = float(value)
                elif isinstance(value, np.bool_): metadata[col] = bool(value)
                else: metadata[col] = str(value) # 나머지는 문자열

        # 2. page_content 구성
        page_content = ""
        review_text = str(row_dict.get(review_column, ''))

        if not review_text.strip():
            return None # 리뷰 없으면 None 반환

        if enrich_text:
            combined_parts = []
            # IMPORTANT_META_FIELDS 순회
            for field_name in IMPORTANT_META_FIELDS:
                col_name_in_df = f"game_{field_name}"
                if col_name_in_df in row_dict and pd.notna(row_dict[col_name_in_df]):
                    value = row_dict[col_name_in_df]
                    display_name = field_display_names.get(field_name, field_name)
                    formatted_value = ""
                    # (값 포맷팅 로직은 이전과 동일하게 적용)
                    try:
                        if field_name in ['minplayers', 'maxplayers', 'playingtime', 'minage']:
                            formatted_value = f"{int(float(value))}"
                            if field_name in ['minplayers', 'maxplayers']: formatted_value += "명"
                            if field_name == 'playingtime': formatted_value += "분"
                            if field_name == 'minage': formatted_value += "세 이상"
                        elif field_name in ['averageweight', 'complexity', 'weight']:
                            formatted_value = f"{float(value):.2f}/5"
                        elif field_name == 'AvgRating':
                            formatted_value = f"{float(value):.2f}점"
                        elif field_name in ['Description', 'description_detail'] and len(str(value)) > 200:
                             formatted_value = f"{str(value)[:200]}..." # 설명 길이는 조금 줄임
                        elif isinstance(value, str) and value.strip().startswith('[') and value.strip().endswith(']'):
                             try:
                                 parsed_list = ast.literal_eval(value); formatted_value = ", ".join(map(str, parsed_list)) if isinstance(parsed_list, list) else str(value)
                             except: formatted_value = value.strip("[]").replace("'", "").replace('"', '')
                        elif field_name != 'Game_Id': # Game_Id는 보통 내용에 포함 안 함
                             formatted_value = str(value)

                        # 내용 추가 (Title은 레이블 없이, 설명/ID 제외)
                        if formatted_value and field_name not in ['Game_Id', 'Description', 'description_detail']:
                            if field_name == 'Title': combined_parts.insert(0, f"{display_name}: {formatted_value}")
                            else: combined_parts.append(f"{display_name}: {formatted_value}")

                    except Exception: pass # 포맷팅 오류 시 해당 필드 건너뜀

            # 설명 필드 추가
            desc_text = ""
            desc_col = next((c for c in [f'game_{f}' for f in ['Description', 'description', 'description_detail']] if c in row_dict and pd.notna(row_dict[c])), None)
            if desc_col:
                desc_val = str(row_dict[desc_col])
                desc_text = f"설명: {desc_val[:200]}..." if len(desc_val) > 200 else f"설명: {desc_val}"

            # 최종 page_content 결합
            content_elements = [item for item in [ "\n".join(combined_parts), desc_text, f"리뷰: {review_text}"] if item.strip()]
            page_content = "\n\n".join(content_elements)

        else: # enrich_text=False 이면 리뷰만 사용
            page_content = review_text

        # Document 객체 생성
        if page_content:
            return Document(page_content=page_content, metadata=metadata)
        else:
            return None

    # 처리할 아이템 리스트
    items_to_process = list(df.iterrows())
    document_results = None

    if parallel_mgr:
         print(f"\n🔄 병렬 처리기로 Document 생성 시작 ({len(items_to_process):,}개 행)...")
         document_results = parallel_mgr.process_batch(
             create_single_document,
             items_to_process,
             desc="Document 생성 (병렬)"
         )
    else:
         print(f"\n🔄 직렬 처리로 Document 생성 시작 ({len(items_to_process):,}개 행)...")
         document_results = []
         for item in progress_bar(items_to_process, desc="Document 생성 (직렬)"):
             try:
                  doc = create_single_document(item)
                  if doc: document_results.append(doc)
                  # else: # 리뷰 없거나 내용 생성 실패 시 건너뜀
             except Exception as e:
                  print(f"⚠️ 행 처리 중 오류: {e}")
                  # 오류 발생 행은 건너뜀

    # 결과 필터링 (None 제거)
    final_documents = [doc for doc in document_results if doc is not None]
    final_count = len(final_documents)

    print(f"\n✅ Document 생성 완료: {final_count:,}개 생성됨")

    # --- 체크포인트 업데이트 ---
    if checkpoint_mgr:
        checkpoint_mgr.update_counter("documents_created", final_count)
        checkpoint_mgr.update_stage(stage_name, final_count > 0)
        print(f"✅ 체크포인트 업데이트: {stage_name}={'True' if final_count > 0 else 'False'}, count={final_count}")

    total_time = time.time() - start_time
    print(f"⏱️ 총 처리 시간: {total_time:.2f}초")
    print("="*70)
    return final_documents


# split_documents (변경 없음 - 내부 로직은 유지)
def split_documents(documents, max_chunk_size=MAX_CHUNK_SIZE, checkpoint_mgr=None, progress_bar=tqdm_notebook):
    """긴 문서를 적절한 크기로 분할"""
    print("\n" + "="*70)
    print("✂️ 5단계: 문서 분할")
    print("="*70)
    stage_name = "documents_split"

    # --- 체크포인트 확인 ---
    if checkpoint_mgr and checkpoint_mgr.is_stage_completed(stage_name):
         split_count = checkpoint_mgr.state["counters"]["documents_split"]
         print(f"✅ 문서 분할 단계는 이전에 완료되었습니다 (분할된 문서: {split_count:,}).")
         # 여기서 건너뛰려면 이전 결과 로드 필요. 일단 진행.
         print("   (체크포인트 확인. 문서 분할을 다시 진행합니다)")


    start_time = time.time()

    if not documents:
        print("❌ 분할할 문서가 없습니다.")
        if checkpoint_mgr: checkpoint_mgr.update_stage(stage_name, False)
        return []

    # (문서 길이 통계 계산 로직 유지)
    doc_lengths = [len(doc.page_content) for doc in documents if hasattr(doc, 'page_content')]
    if not doc_lengths: print("❌ 유효한 문서 내용이 없어 분할 불가."); return documents
    max_len = max(doc_lengths)
    print(f"   - 최대 문서 길이: {max_len:,}자")

    if max_len <= max_chunk_size:
        print(f"✅ 모든 문서가 청크 크기({max_chunk_size}) 이하이므로 분할 건너뜁니다.")
        if checkpoint_mgr:
             # 분할이 필요 없었으므로 완료로 간주, 카운트는 원본 문서 수
             checkpoint_mgr.update_counter("documents_split", len(documents))
             checkpoint_mgr.update_stage(stage_name, True)
        return documents

    print(f"\n🔪 문서를 {max_chunk_size}자 단위로 분할 중...")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=max_chunk_size,
        chunk_overlap=int(max_chunk_size * 0.1),
        length_function=len,
        separators=["\n\n", "\n", ". ", "? ", "! ", ", ", " ", ""],
        keep_separator=False
    )

    split_docs = []
    split_errors = 0
    # 분할은 병렬화 효과가 크지 않을 수 있음 (LangChain 내부 최적화 가능성)
    # 여기서는 직렬 처리 유지, 필요 시 병렬화 고려
    try:
        split_docs = text_splitter.split_documents(documents)
    except Exception as e:
         print(f"❌ 문서 분할 중 오류 발생: {e}")
         split_errors += 1
         # 오류 시 원본 반환 또는 빈 리스트 반환 결정 필요
         if checkpoint_mgr: checkpoint_mgr.update_stage(stage_name, False)
         return documents # 오류 시 원본 반환

    final_count = len(split_docs)
    print(f"\n📊 분할 결과: {len(documents):,}개 -> {final_count:,}개 문서")
    if split_errors > 0: print(f"   ⚠️ 분할 중 오류 발생 건수: {split_errors:,}")

    # --- 체크포인트 업데이트 ---
    if checkpoint_mgr:
        checkpoint_mgr.update_counter("documents_split", final_count)
        checkpoint_mgr.update_stage(stage_name, final_count > 0)
        print(f"✅ 체크포인트 업데이트: {stage_name}={'True' if final_count > 0 else 'False'}, count={final_count}")

    total_time = time.time() - start_time
    print(f"⏱️ 총 처리 시간: {total_time:.2f}초")
    print("="*70)

    return split_docs


# setup_vectorstore (변경 없음 - 내부 로직은 유지)
def setup_vectorstore(model_name=MODEL_NAME, persist_dir=CHROMA_PERSIST_DIR, device="cpu", checkpoint_mgr=None):
    """임베딩 모델 로드 및 벡터 저장소 설정 또는 로드"""
    print("\n" + "="*70)
    print("🏗️ 6단계: 벡터 저장소 설정/로드")
    print("="*70)
    stage_name = "vectorstore_setup" # setup 단계 자체는 상태 저장이 덜 중요할 수 있음

    start_time = time.time()

    print(f"   - 모델: {model_name}")
    print(f"   - 저장 경로: {persist_dir}")
    print(f"   - 사용 장치: {device}")

    # 1. 임베딩 모델 로드
    embeddings = None
    try:
        print("\n🧠 임베딩 모델 로드 중...")
        embeddings = HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs={'device': device},
            encode_kwargs={'normalize_embeddings': True, 'batch_size': 128} # 적절한 배치 크기
        )
        print(f"✅ 임베딩 모델 로드 완료 ({model_name})")
    except Exception as e:
        print(f"❌ 임베딩 모델 로드 오류: {e}")
        if checkpoint_mgr: checkpoint_mgr.update_stage(stage_name, False)
        return None # 모델 로드 실패 시 진행 불가

    # 2. ChromaDB 설정 또는 로드
    vectorstore = None
    try:
        print("\n💾 ChromaDB 설정/로드 중...")
        if os.path.exists(persist_dir) and os.listdir(persist_dir):
            print(f"   ℹ️ 기존 DB 발견: {persist_dir}. 로드 시도...")
            vectorstore = Chroma(
                persist_directory=persist_dir,
                embedding_function=embeddings
            )
            print(f"   ✅ 기존 ChromaDB 로드 완료.")
            try:
                 count = vectorstore._collection.count()
                 print(f"   📊 로드된 DB 문서 수: {count:,}")
            except Exception as e_count:
                 print(f"   ⚠️ 로드된 DB 문서 수 확인 오류: {e_count}")
        else:
            print(f"   ℹ️ 새 ChromaDB 생성: {persist_dir}")
            os.makedirs(persist_dir, exist_ok=True)
            # 문서는 add_documents 단계에서 추가하므로 여기서는 빈 DB 생성
            vectorstore = Chroma(
                persist_directory=persist_dir,
                embedding_function=embeddings
            )
            print(f"   ✅ 새 ChromaDB 인스턴스 준비 완료.")

    except Exception as e:
        print(f"❌ ChromaDB 설정/로드 오류: {e}")
        if checkpoint_mgr: checkpoint_mgr.update_stage(stage_name, False)
        return None

    # --- 체크포인트 업데이트 ---
    # setup 단계는 성공/실패 여부만 기록해도 충분할 수 있음
    if checkpoint_mgr:
         checkpoint_mgr.update_stage(stage_name, vectorstore is not None)
         print(f"✅ 체크포인트 업데이트: {stage_name}={'True' if vectorstore else 'False'}")

    total_time = time.time() - start_time
    print(f"\n⏱️ 총 처리 시간: {total_time:.2f}초")
    print("="*70)

    return vectorstore


# add_documents_to_vectorstore (체크포인트 ID 확인 및 배치 재개 로직 추가)
def add_documents_to_vectorstore(documents, vectorstore, batch_size=BATCH_SIZE, checkpoint_mgr=None, progress_bar=tqdm_notebook):
    """문서를 벡터 저장소에 추가 (배치 처리, ID 중복 방지, 체크포인트 재개)"""
    print("\n" + "="*70)
    print("📥 7단계: 벡터 저장소에 문서 추가")
    print("="*70)
    stage_name = "documents_added" # 이 단계의 완료 여부가 중요

    start_time = time.time()

    if not documents:
        print("✅ 추가할 새 문서가 없습니다.")
        # 문서가 없어도 단계는 '완료'로 볼 수 있음 (할 일이 없으므로)
        if checkpoint_mgr and not checkpoint_mgr.is_stage_completed(stage_name):
              checkpoint_mgr.update_stage(stage_name, True)
              print("   (체크포인트 업데이트: 추가할 문서 없음, 완료로 간주)")
        return vectorstore

    if not vectorstore:
        print("❌ 벡터 저장소가 설정되지 않아 문서 추가 불가.")
        if checkpoint_mgr: checkpoint_mgr.update_stage(stage_name, False)
        return None

    total_docs_to_process = len(documents)
    print(f"   - 추가 시도 문서 수: {total_docs_to_process:,}")
    print(f"   - 배치 크기: {batch_size}")

    # --- 체크포인트 기반 재개 로직 ---
    start_index = 0
    previously_added_ids = set()
    if checkpoint_mgr:
        # 이전에 마지막으로 성공한 배치 인덱스 확인
        last_added_batch_idx = checkpoint_mgr.state["counters"].get("last_added_batch_index", -1)
        if last_added_batch_idx >= 0:
             start_index = (last_added_batch_idx + 1) * batch_size
             print(f"🔄 체크포인트 발견: 마지막 성공 배치 인덱스 {last_added_batch_idx}.")
             if start_index >= total_docs_to_process:
                 print("   ✅ 모든 문서가 이미 이전 실행에서 추가된 것으로 보입니다.")
                 if not checkpoint_mgr.is_stage_completed(stage_name):
                      checkpoint_mgr.update_stage(stage_name, True) # 완료 상태 업데이트
                 return vectorstore
             else:
                  print(f"   -> 인덱스 {start_index}부터 문서 추가를 재개합니다.")

        # 이미 추가된 문서 ID 로드 (중복 추가 방지용)
        previously_added_ids = checkpoint_mgr.state["processed_ids"].get("document_ids", set())
        if previously_added_ids:
             print(f"   ℹ️ 체크포인트에서 로드된 이전 추가 문서 ID: {len(previously_added_ids):,}개 (중복 방지용)")


    # 문서 ID 생성 및 필터링 (재개 인덱스 및 중복 ID 고려)
    docs_to_add_this_run = []
    ids_to_add_this_run = []
    processed_for_id_gen = 0
    skipped_already_added = 0
    unique_generated_ids = set(previously_added_ids) # 기존 ID 포함하여 시작

    # ID 생성기 (충돌 방지 포함)
    import hashlib
    import uuid
    def generate_unique_doc_id(doc, idx, existing_ids):
        content_hash = hashlib.md5(doc.page_content.encode()).hexdigest()[:8]
        base_id = f"doc_{idx}_{content_hash}" # 인덱스와 내용 해시 기반
        # 메타데이터에서 식별자 사용 시도 (선택적)
        # meta_id = doc.metadata.get('game_Game_Id', doc.metadata.get('id'))
        # if meta_id: base_id = f"{meta_id}_{content_hash}"

        final_id = base_id
        counter = 0
        while final_id in existing_ids:
            counter += 1
            final_id = f"{base_id}_dup{counter}"
        return final_id

    print("   ID 생성 및 중복 확인 중...")
    # 재개 지점부터 ID 생성 및 필터링
    for idx, doc in enumerate(documents[start_index:], start=start_index):
        processed_for_id_gen += 1
        generated_id = generate_unique_doc_id(doc, idx, unique_generated_ids)

        # previously_added_ids 에 있으면 건너뛰기 (이미 DB에 있을 가능성 높음)
        if generated_id in previously_added_ids:
            skipped_already_added += 1
            continue

        docs_to_add_this_run.append(doc)
        ids_to_add_this_run.append(generated_id)
        unique_generated_ids.add(generated_id) # 새로 생성된 ID도 집합에 추가

    print(f"   ID 생성 완료: {processed_for_id_gen}개 확인, {skipped_already_added}개 건너뜀.")
    print(f"   이번 실행에서 DB에 추가할 문서 수: {len(docs_to_add_this_run):,}")

    if not docs_to_add_this_run:
        print("✅ 이번 실행에서 추가할 새 문서가 없습니다.")
        if checkpoint_mgr and not checkpoint_mgr.is_stage_completed(stage_name):
             checkpoint_mgr.update_stage(stage_name, True) # 추가할 것 없으면 완료
        return vectorstore

    # 배치 처리
    total_batches = (len(docs_to_add_this_run) + batch_size - 1) // batch_size
    added_count_this_run = 0
    current_batch_index_global = (start_index // batch_size) # 전역 배치 인덱스 (체크포인트용)

    print(f"\n🔄 {total_batches}개 배치로 나누어 문서 추가 시작 (전역 배치 인덱스 {current_batch_index_global}부터)...")

    # DB 추가 전 현재 문서 수 확인
    try: initial_db_count = vectorstore._collection.count()
    except: initial_db_count = -1 # 확인 불가 시

    # 배치 루프
    for i in range(0, len(docs_to_add_this_run), batch_size):
        batch_docs = docs_to_add_this_run[i : i + batch_size]
        batch_ids = ids_to_add_this_run[i : i + batch_size]
        current_batch_index_local = i // batch_size # 이번 실행 내 배치 인덱스
        current_batch_index_global = (start_index + i) // batch_size # 전역 배치 인덱스

        progress_desc = f"벡터 저장소 추가 (배치 {current_batch_index_local+1}/{total_batches}, 전역 {current_batch_index_global})"

        try:
             # tqdm 적용하여 add_documents 호출
             # 참고: add_documents 자체가 오래 걸릴 수 있으므로, tqdm이 즉각 반응 안 할 수 있음
             #       하지만 배치 단위로는 표시됨
             vectorstore.add_documents(documents=batch_docs, ids=batch_ids)
             added_count_this_run += len(batch_docs)

             # --- 체크포인트 업데이트 (배치 성공 시) ---
             if checkpoint_mgr:
                 # 성공한 전역 배치 인덱스 저장
                 checkpoint_mgr.update_counter("last_added_batch_index", current_batch_index_global)
                 # 성공적으로 추가된 ID 저장 (메모리 사용량 주의하며 주기적으로 저장)
                 checkpoint_mgr.add_processed_ids("document_ids", batch_ids)
                 # 누적 추가 카운터 업데이트 (선택적)
                 # checkpoint_mgr.update_counter("documents_added", checkpoint_mgr.state["counters"]["documents_added"] + len(batch_docs))

             # 진행률 표시 (tqdm과 별개로)
             if (current_batch_index_local + 1) % 10 == 0: # 10 배치마다 로그 출력
                  print(f"   진행: {current_batch_index_local + 1}/{total_batches} 배치 완료 ({added_count_this_run:,}개 문서 추가됨)")


        except Exception as e:
            print(f"\n❌ 배치 {current_batch_index_local + 1} (전역 {current_batch_index_global}) 처리 중 오류: {e}")
            # 오류 발생 시 해당 배치 건너뛰고 다음 배치 진행
            # 또는 파이프라인 중단 결정
            print("   ⚠️ 해당 배치 건너뛰고 계속 진행합니다.")
            # 오류 발생 시 체크포인트 업데이트는 하지 않음
            continue # 다음 배치로


    print(f"\n✅ 문서 추가 작업 완료 (이번 실행에서 {added_count_this_run:,}개 시도).")

    # 최종 저장 (persist)
    try:
        print("\n💾 변경사항 저장 중...")
        vectorstore.persist()
        print("✅ 저장 완료.")
    except Exception as e_persist:
        print(f"   ⚠️ 저장 중 오류: {e_persist}")

    # 최종 문서 수 확인
    try:
        final_db_count = vectorstore._collection.count()
        print(f"\n📊 최종 DB 문서 수 (확인): {final_db_count:,}")
        if initial_db_count != -1:
            net_added = final_db_count - initial_db_count
            print(f"   - 순 증가 문서 수: {net_added:,}")
    except Exception as e:
        print("\n⚠️ 최종 문서 수 확인 중 오류: {e}")


    # --- 최종 체크포인트 업데이트 ---
    if checkpoint_mgr:
        final_added_count = checkpoint_mgr.get_processed_ids_count("document_ids")
        checkpoint_mgr.update_counter("documents_added", final_added_count) # 전체 누적 ID 수로 업데이트

        # 모든 문서가 처리되었는지 확인 후 완료 상태 업데이트
        # (start_index + len(docs_to_add_this_run)) 이 원래 문서 수와 같으면 완료
        is_complete = (start_index + len(docs_to_add_this_run)) >= total_docs_to_process
        if is_complete:
             checkpoint_mgr.update_stage(stage_name, True)
             print(f"✅ 체크포인트 업데이트: {stage_name}=True (모든 문서 처리 완료)")
        else:
             # 아직 처리할 문서가 남음 (오류 등으로 중단된 경우)
             checkpoint_mgr.update_stage(stage_name, False)
             print(f"⚠️ 체크포인트 업데이트: {stage_name}=False (아직 처리할 문서 남음)")


    total_time = time.time() - start_time
    print(f"\n⏱️ 총 처리 시간: {total_time:.2f}초")
    print("="*70)

    return vectorstore

# test_vectorstore (변경 없음 - 내부 로직은 유지)
def test_vectorstore(vectorstore, checkpoint_mgr=None):
    """벡터 저장소 테스트 (E5 쿼리 포맷팅 적용)"""
    print("\n" + "="*70); print("🧪 8단계: 벡터 저장소 테스트"); print("="*70)
    stage_name="vectorstore_tested"

    if vectorstore is None: print("❌ 저장소 없음."); return None

    try: doc_count = vectorstore._collection.count(); print(f"📊 문서 수: {doc_count:,}")
    except: doc_count = 0; print("⚠️ 문서 수 확인 불가")
    if doc_count == 0: print("   ⚠️ 문서 없음."); return vectorstore

    test_queries = [ "아이들과 함께하는 게임", "전략적인 게임 추천", "짧은 플레이 타임의 게임", "Catan" ]
    print("\n🔍 기본 쿼리 테스트 (상위 3개 결과):")

    # --- <<< E5 모델 확인 및 쿼리 포맷팅 적용 >>> ---
    is_e5_model = MODEL_NAME.startswith("intfloat/e5") or MODEL_NAME.startswith("intfloat/multilingual-e5")
    if is_e5_model:
        print("   ℹ️ E5 모델 감지: 검색 쿼리에 'query: ' 접두사 적용")

    for query in test_queries:
        # --- <<< 쿼리 포맷팅 적용 >>> ---
        search_query = format_query(query) if is_e5_model else query
        print(f"\n📝 쿼리: '{query}' {'(포맷팅 적용됨)' if is_e5_model else ''}")

        try:
            start_time = time.time()
            results = vectorstore.similarity_search_with_score(search_query, k=3) # 포맷팅된 쿼리 사용
            search_time = time.time() - start_time
            print(f"⏱️ 검색 시간: {search_time*1000:.2f}ms")
            if results:
                print(f"📄 검색 결과 ({len(results)}개):")
                for i, (doc, score) in enumerate(results):
                    title = doc.metadata.get('game_Title', doc.metadata.get('Title', '?'))
                    game_id = doc.metadata.get('game_Game_Id', doc.metadata.get('Game_Id', '?'))
                    # --- <<< Passage 접두사 제거 후 출력 (선택적) >>> ---
                    content_preview = doc.page_content
                    if content_preview.startswith("passage: "):
                         content_preview = content_preview[len("passage: "):]
                    content_preview = content_preview.replace('\n', ' ').strip()
                    print(f"  Rank {i+1} (Score: {score:.4f}) ID:{game_id} Title:{title}")
                    print(f"    내용: {content_preview[:150]}...")
            else: print("  ❌ 결과 없음")
        except Exception as e: print(f"❌ 쿼리 오류: {e}")

    if checkpoint_mgr: checkpoint_mgr.update_stage(stage_name, True)
    print("\n" + "="*70)
    return vectorstore


# --- [수정된 run_pipeline 함수] ---
def run_pipeline(
    resume=False,
    reset_checkpoint=False,
    use_parallel=True, # 병렬 처리 사용 여부 플래그
    parallel_use_processes=False # 병렬 처리 방식 (False: 스레드, True: 프로세스)
):
    """전체 파이프라인 실행 (체크포인트, 병렬 처리 적용)"""
    print("\n" + "="*70)
    print("🚀 보드게임 추천 시스템 파이프라인 시작 (개선 버전)")
    print(f"⏱️ 시작 시간: {time.strftime('%H:%M:%S')}")
    print(f"🔄 옵션: resume={resume}, reset_checkpoint={reset_checkpoint}, use_parallel={use_parallel}")
    print("="*70)

    pipeline_start = time.time()
    vectorstore = None

    # 0. 초기 설정 및 진단
    print("\n📋 0단계: 초기 설정 및 진단")
    has_gpu = check_gpu_status()
    device = "cuda" if has_gpu else "cpu"
    print(f"🖥️ 사용 장치: {device}")
    progress_bar = setup_progress_bar() # 환경에 맞는 tqdm 가져오기

    # 체크포인트 관리자 초기화
    checkpoint_mgr = CheckpointManager("boardgame_pipeline_integrated_e5")

    # 리셋 옵션 처리
    if reset_checkpoint:
        print("🔄 체크포인트 초기화 요청됨...")
        checkpoint_mgr.reset()
        resume = False # 리셋하면 재개할 수 없음

    # 재개 모드 처리
    if resume:
        resume_info = checkpoint_mgr.get_resume_info()
        if resume_info["can_resume"]:
            print("\n🔄 이전 세션에서 파이프라인 재개:")
            print(f"   마지막 업데이트: {resume_info['last_update']}")
            print(f"   완료된 단계: {', '.join(resume_info['completed_stages'])}")
            # 카운터 정보 출력 등...
        else:
            print("⚠️ 재개할 수 있는 이전 세션 없음. 처음부터 시작.")
            resume = False # 재개 불가능하면 처음부터

    # 병렬 처리 관리자 초기화
    parallel_mgr = ParallelProcessor(use_processes=parallel_use_processes) if use_parallel else None


    # --- 파이프라인 단계 실행 ---
    try:
        # 1. 메타데이터 로드
        # 메타데이터는 보통 변경이 잦지 않으므로, 체크포인트 완료 시 건너뛰지 않고
        # 함수 내부에서 파일 경로 비교 등을 통해 재로드를 결정할 수 있음.
        # 여기서는 항상 호출하되, 내부 로직이 체크포인트 정보를 활용하도록 함.
        metadata_dict, meta_id_column = load_game_metadata(
            METADATA_CSV_PATH,
            checkpoint_mgr,
            progress_bar
        )
        if metadata_dict is None:
             print("❌ 1단계: 메타데이터 로드 실패. 파이프라인 중단.")
             return None

        # 2. 리뷰 데이터 처리
        review_df, review_column, review_id_column = preprocess_reviews(
            REVIEW_CSV_PATH,
            checkpoint_mgr,
            progress_bar
        )
        # preprocess_reviews 내부에서 체크포인트를 사용하여 이미 처리된 ID 건너뜀
        if review_df is None or review_column is None:
             print("❌ 2단계: 리뷰 데이터 처리 실패. 파이프라인 중단.")
             return None

        # 3. 데이터 통합 (Enrichment)
        enriched_df = enrich_review_data_fixed(
            review_df, metadata_dict, meta_id_column, review_id_column,
            parallel_mgr, # 병렬 관리자 전달
            checkpoint_mgr,
            progress_bar
        )
        if enriched_df is None or enriched_df.empty:
             print("❌ 3단계: 데이터 통합 후 유효 데이터 없음. 파이프라인 중단.")
             return None

        # 4. Document 객체 생성
        documents = create_documents(
            enriched_df, review_column, ENRICH_TEXT,
            parallel_mgr, # 병렬 관리자 전달
            checkpoint_mgr,
            progress_bar
        )
        if not documents:
             print("❌ 4단계: Document 객체 생성 실패. 파이프라인 중단.")
             return None

        # 5. 문서 분할
        split_docs = split_documents(
            documents, MAX_CHUNK_SIZE,
            checkpoint_mgr,
            progress_bar
        )
        # split_documents 내부에서 체크포인트 확인 및 상태 업데이트

        # 6. 벡터 저장소 설정/로드
        vectorstore = setup_vectorstore(
            MODEL_NAME, CHROMA_PERSIST_DIR, device,
            checkpoint_mgr
        )
        if vectorstore is None:
             print("❌ 6단계: 벡터 저장소 설정/로드 실패. 파이프라인 중단.")
             return None
         # --- <<< NEW: E5 모델 Passage 포맷팅 적용 단계 >>> ---
        formatted_docs_for_db = split_docs # 기본값은 원본 사용
        formatting_stage_name = "passage_formatting_applied"
        is_e5_model = MODEL_NAME.startswith("intfloat/e5") or MODEL_NAME.startswith("intfloat/multilingual-e5")

        if is_e5_model:
            print("\n" + "="*70); print("📜 6.5단계: E5 Passage 포맷팅 적용"); print("="*70)
            reload_formatting = True
            if checkpoint_mgr and checkpoint_mgr.is_stage_completed(formatting_stage_name):
                 print("✅ Passage 포맷팅은 이전 완료됨 (체크포인트)")
                 # reload_formatting = False # 건너뛰기 옵션

            if reload_formatting:
                st_format = time.time()
                formatted_docs_list = []
                for doc in progress_bar(split_docs, desc="Passage 포맷팅"):
                     formatted_content = format_passage(doc.page_content)
                     # 메타데이터 유지하며 새 Document 생성
                     formatted_docs_list.append(Document(page_content=formatted_content, metadata=doc.metadata))
                formatted_docs_for_db = formatted_docs_list
                print(f"✅ 포맷팅 완료: {len(formatted_docs_for_db):,}개 문서 ({time.time()-st_format:.2f}초)")
                if checkpoint_mgr:
                    checkpoint_mgr.update_stage(formatting_stage_name, True)
                    print(f"✅ 체크포인트 업데이트: {formatting_stage_name}=True")
            else:
                 # 포맷팅 건너뛸 경우, 이전에 포맷팅된 데이터를 로드해야 함.
                 # 로드 로직 없으므로 일단 원본 사용 (경고 표시)
                 print("   ⚠️ 포맷팅 건너뛰기 선택됨 (데이터 로드 로직 부재로 원본 사용)")
                 formatted_docs_for_db = split_docs
        else:
             print("\nℹ️ E5 모델 아님. Passage 포맷팅 건너뜀.")
        # --- <<< End of E5 Formatting Section >>> ---

        # 7. 벡터 저장소에 문서 추가
        # add_documents_to_vectorstore 내부에서 체크포인트 기반 재개 및 중복 방지 처리
        vectorstore = add_documents_to_vectorstore(
            split_docs, vectorstore, BATCH_SIZE,
            checkpoint_mgr,
            progress_bar
        )
        if vectorstore is None:
             print("❌ 7단계: 문서 추가 중 오류 발생. 파이프라인 중단.")
             return None

        # 8. 벡터 저장소 테스트
        vectorstore = test_vectorstore(vectorstore, checkpoint_mgr)


        # --- 파이프라인 완료 ---
        pipeline_time = time.time() - pipeline_start
        minutes, seconds = divmod(pipeline_time, 60)
        hours, minutes = divmod(minutes, 60)

        print("\n" + "="*70)
        print("✅ 보드게임 추천 시스템 파이프라인 완료 (개선 버전)")
        print(f"⏱️ 총 처리 시간: {int(hours)}시간 {int(minutes)}분 {seconds:.2f}초")
        print(f"⏱️ 완료 시간: {time.strftime('%H:%M:%S')}")
        print("="*70)

        # 최종 요약 정보 (체크포인트 정보 활용)
        print("\n📊 파이프라인 요약 (체크포인트 기준):")
        try:
             final_state = checkpoint_mgr.get_resume_info()
             counters = final_state["counters"]
             processed_counts = final_state["processed_counts"]
             db_count = vectorstore._collection.count() if vectorstore else 0

             print(f"   - 로드된 메타데이터 (카운터): {counters.get('metadata_count', 0):,}")
             print(f"   - 처리된 리뷰 (카운터): {counters.get('reviews_processed', 0):,}")
             print(f"   - 통합된 행 (카운터): {counters.get('enriched_count', 0):,}")
             print(f"   - 생성된 Document (카운터): {counters.get('documents_created', 0):,}")
             print(f"   - 분할된 문서 (카운터): {counters.get('documents_split', 0):,}")
             print(f"   - DB에 추가된 문서 ID (누적): {processed_counts.get('documents', 0):,}")
             print(f"   - 최종 벡터 저장소 문서 수 (실제): {db_count:,}")
             print(f"   - 최종 벡터 저장소 위치: {CHROMA_PERSIST_DIR}")
        except Exception as e_summary:
             print(f"   ⚠️ 요약 정보 생성 중 오류: {e_summary}")

        return vectorstore

    # --- 예외 처리 ---
    except FileNotFoundError as fnf_error:
         print(f"\n❌ 파이프라인 오류: 파일 없음 - {fnf_error}")
         return None
    except ValueError as val_error:
         print(f"\n❌ 파이프라인 오류: 데이터 문제 - {val_error}")
         return None
    except ImportError as imp_error:
         print(f"\n❌ 파이프라인 오류: 라이브러리 없음 - {imp_error}")
         return None
    except KeyboardInterrupt:
         print("\n🚫 사용자에 의해 파이프라인 중단됨.")
         # 중단 시 현재 상태 저장 시도 (선택적)
         if checkpoint_mgr:
              print("   현재까지의 진행 상황을 체크포인트에 저장합니다...")
              checkpoint_mgr.save()
         return None
    except Exception as e:
        print(f"\n❌ 파이프라인 실행 중 예기치 않은 오류: {e}")
        import traceback
        traceback.print_exc()
        # 오류 발생 시 현재 상태 저장 시도 (선택적)
        if checkpoint_mgr:
             print("   오류 발생 전까지의 상태를 체크포인트에 저장 시도...")
             checkpoint_mgr.save()

        print("\n" + "="*70)
        print("⚠️ 보드게임 추천 시스템 파이프라인 실패")
        print(f"⏱️ 중단 시간: {time.strftime('%H:%M:%S')}")
        print("="*70)
        return None


# --- 파이프라인 실행 ---
if __name__ == "__main__": # 스크립트로 실행될 때만 실행
    print("--- 파이프라인 실행 시작 ---")

    # 실행 옵션 설정
    # 예시: 처음 실행 시
    # final_vectorstore = run_pipeline(resume=False, reset_checkpoint=False, use_parallel=True)

    # 예시: 중단 후 재개 시
    final_vectorstore = run_pipeline(resume=True, reset_checkpoint=False, use_parallel=True, parallel_use_processes=False)

    # 예시: 처음부터 다시 실행 (체크포인트 무시)
    # final_vectorstore = run_pipeline(resume=False, reset_checkpoint=True, use_parallel=True)

    if final_vectorstore:
        print("\n--- 파이프라인 성공적으로 완료. final_vectorstore 사용 가능 ---")
        # 간단한 추가 테스트
        # try:
        #     test_query = "협력 게임"
        #     results = final_vectorstore.similarity_search(test_query, k=5)
        #     print(f"\n테스트 검색 ('{test_query}'):")`
        #     for doc in results:
        #         print(f" - {doc.metadata.get('game_Title', '제목 없음')}")
        # except Exception as e:
        #      print(f"최종 검색 테스트 중 오류: {e}")
    else:
        print("\n--- 파이프라인 실패 또는 중단 ---")

In [None]:
print("dd")

## 5. 메타데이터와 리뷰 데이터 통합 (최적화 버전)