In [20]:
def create_manual_labels():
    """수동으로 향수 어코드 라벨을 생성하는 함수 (참고용)"""
    
    # 일반적인 향수 어코드들 (45개 클래스 예시)
    fragrance_accords = [
        "Citrus", "Fresh", "Aquatic", "Green", "Herbal",           # 0-4: 상쾌한 계열
        "Floral", "Rose", "Jasmine", "Lily", "Peony",             # 5-9: 플로럴 계열  
        "Fruity", "Apple", "Pear", "Peach", "Berry",              # 10-14: 과일 계열
        "Spicy", "Pepper", "Cinnamon", "Ginger", "Cardamom",      # 15-19: 스파이시 계열
        "Woody", "Sandalwood", "Cedar", "Pine", "Vetiver",        # 20-24: 우디 계열
        "Oriental", "Amber", "Incense", "Oud", "Patchouli",       # 25-29: 오리엔탈 계열
        "Gourmand", "Vanilla", "Caramel", "Chocolate", "Coffee",  # 30-34: 구르망 계열
        "Musk", "Animalic", "Leather", "Tobacco", "Smoky",        # 35-39: 깊은 계열
        "Powdery", "Clean", "Soap", "Marine", "Ozonic"            # 40-44: 파우더리/클린 계열
    ]
    
    return fragrance_accords[:45]  # 정확히 45개로 맞춤def inspect_model_bundle(pkl_path: str):
    """모델 번들 구조를 자세히 분석하는 함수"""
    print("\n" + "="*60)
    print("모델 번들 구조 분석")
    print("="*60)
    
    try:
        obj = joblib.load(pkl_path)
        print(f"메인 객체 타입: {type(obj)}")
        
        if isinstance(obj, dict):
            print(f"딕셔너리 키 개수: {len(obj)}")
            for key, value in obj.items():
                print(f"  '{key}': {type(value)}")
                
                # 라벨 관련 속성 확인
                if hasattr(value, 'classes_'):
                    print(f"    └─ classes_: {len(value.classes_)}개 ({type(value.classes_[0]) if len(value.classes_) > 0 else 'empty'})")
                    if len(value.classes_) > 0:
                        print(f"       샘플: {list(value.classes_[:3])}...")
                
                # 배열/리스트인 경우 내용 확인
                if isinstance(value, (list, tuple, np.ndarray)):
                    print(f"    └─ 길이: {len(value)}")
                    if len(value) > 0:
                        print(f"       타입: {type(value[0])}")
                        if isinstance(value[0], str):
                            print(f"       샘플: {list(value[:3])}...")
        
        else:
            print("단일 객체 (비딕셔너리)")
            if hasattr(obj, 'classes_'):
                print(f"  classes_: {len(obj.classes_)}개")
                print(f"  샘플: {list(obj.classes_[:3])}...")
        
    except Exception as e:
        print(f"분석 실패: {e}")
    
    print("="*60)# ============================================
# model.pkl 번들 로드 + MiniLM 임베딩 + 예측
# (joblib.load, 마스킹 평균풀링, 라벨/임계값 정렬 포함)
# ============================================

import os
import numpy as np
import joblib
import torch
from typing import Any, Dict, List, Tuple
from transformers import AutoTokenizer, AutoModel

# --------- 설정 ----------
MODEL_PKL = os.getenv("MODEL_PKL", "./model.pkl")  # 번들 경로
HF_MODEL  = os.getenv("HF_MODEL", "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
MAX_LEN   = int(os.getenv("MAX_LEN", 256))
DEVICE    = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SZ  = int(os.getenv("BATCH_SZ", 16))
print(f"[Device] {DEVICE}")

# --------- 1) 번들 로드 (수정본) ----------
def load_bundle(pkl_path: str) -> Dict[str, Any]:
    print(f"[DEBUG] 번들 파일 로딩 중: {pkl_path}")
    obj = joblib.load(pkl_path)  # ✅ joblib.load로 읽어야 함
    
    # 번들 구조 디버깅
    print(f"[DEBUG] 로드된 객체 타입: {type(obj)}")
    if isinstance(obj, dict):
        print(f"[DEBUG] 딕셔너리 키들: {list(obj.keys())}")
        bundle = obj
    else:
        print(f"[DEBUG] 딕셔너리가 아님. 분류기로 가정")
        bundle = {"classifier": obj}

    # 분류기 찾기
    clf = bundle.get("classifier") or bundle.get("model") or bundle.get("clf")
    if clf is None:
        # 가능한 모든 키 확인
        for key, value in bundle.items():
            if hasattr(value, "predict_proba"):
                print(f"[DEBUG] '{key}'에서 predict_proba 발견")
                clf = value
                break
    
    if clf is None or not hasattr(clf, "predict_proba"):
        raise ValueError("bundle에 predict_proba 가능한 분류기가 필요합니다.")

    print(f"[DEBUG] 분류기 타입: {type(clf)}")
    
    # MultiLabelBinarizer 찾기
    mlb = bundle.get("mlb") or bundle.get("multilabel_binarizer") or bundle.get("label_binarizer")
    if mlb is None:
        for key, value in bundle.items():
            if hasattr(value, "classes_") and hasattr(value, "transform"):
                print(f"[DEBUG] '{key}'에서 MultiLabelBinarizer 발견")
                mlb = value
                break
    
    if mlb:
        print(f"[DEBUG] MLB 타입: {type(mlb)}, classes 개수: {len(mlb.classes_) if hasattr(mlb, 'classes_') else 'None'}")
    
    # 라벨 찾기 (여러 방법 시도)
    labels = None
    
    # 1. 직접 저장된 labels
    labels = bundle.get("labels") or bundle.get("label_names") or bundle.get("classes") or bundle.get("class_names")
    
    # 2. MLB에서 복원
    if labels is None and mlb is not None and hasattr(mlb, "classes_"):
        labels = list(mlb.classes_)
        print(f"[DEBUG] MLB에서 라벨 복원: {len(labels)}개")
        print(f"[DEBUG] 라벨 샘플: {labels[:5] if len(labels) > 5 else labels}")
    
    # 3. 분류기에서 복원
    if labels is None and hasattr(clf, "classes_"):
        labels = list(clf.classes_)
        print(f"[DEBUG] 분류기에서 라벨 복원: {len(labels)}개")
    
    # 4. 번들의 모든 키에서 라벨 찾기
    if labels is None:
        for key, value in bundle.items():
            if isinstance(value, (list, tuple, np.ndarray)) and len(value) > 0:
                if isinstance(value[0], str):  # 문자열 배열이면 라벨일 가능성
                    print(f"[DEBUG] '{key}'에서 문자열 배열 발견: {len(value)}개")
                    labels = list(value)
                    break
    
    thresholds = bundle.get("thresholds", 0.5)
    print(f"[DEBUG] 임계값 타입: {type(thresholds)}")

    # 라벨 정렬/검증
    if hasattr(clf, "classes_") and labels is not None:
        clf_labels = list(clf.classes_)
        if list(labels) != clf_labels:
            print(f"[DEBUG] 라벨 순서 불일치. 분류기 순서로 재정렬")
            idx_map = {lbl: i for i, lbl in enumerate(labels)}
            labels = [lbl for lbl in clf_labels if lbl in idx_map]

        # thresholds가 dict라면 clf 순서에 맞춰 배열로 변환
        if isinstance(thresholds, dict):
            thresholds = np.array([float(thresholds.get(lbl, 0.5)) for lbl in labels], dtype=float)

    print(f"[DEBUG] 최종 라벨 개수: {len(labels) if labels else 0}")
    return {"classifier": clf, "mlb": mlb, "labels": labels, "thresholds": thresholds}

# --------- 2) 임베더 로드 ----------
def load_embedder():
    tokenizer = AutoTokenizer.from_pretrained(HF_MODEL)
    base_model = AutoModel.from_pretrained(HF_MODEL).to(DEVICE).eval()
    return tokenizer, base_model

@torch.inference_mode()
def encode_texts(texts: List[str], tokenizer, base_model, max_len: int = MAX_LEN, batch_size: int = BATCH_SZ) -> np.ndarray:
    out = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        enc = tokenizer(
            batch, padding=True, truncation=True, max_length=max_len, return_tensors="pt"
        ).to(DEVICE)

        last = base_model(**enc).last_hidden_state        # (B, L, H)
        mask = enc["attention_mask"].unsqueeze(-1)        # (B, L, 1)
        summed = (last * mask).sum(dim=1)                 
        counts = mask.sum(dim=1).clamp(min=1)             
        emb = summed / counts                             
        out.append(emb.detach().cpu().numpy())
    return np.vstack(out)

# --------- 3) 확률 표준화 ----------
def _standardize_proba(proba) -> np.ndarray:
    if isinstance(proba, list):
        cols = []
        for p in proba:
            p = np.asarray(p)
            if p.ndim == 2 and p.shape[1] > 1:
                cols.append(p[:, 1])
            else:
                cols.append(p.ravel())
        proba = np.column_stack(cols)
    proba = np.asarray(proba)
    if proba.ndim == 1:
        proba = proba.reshape(1, -1)
    return proba

# --------- 4) 임계값 적용 (수정본) ----------
def apply_thresholds(p: np.ndarray, labels: List[str], thresholds) -> List[Tuple[str, float]]:
    n = len(p)
    
    # labels 검증 및 처리
    if labels is None or len(labels) == 0:
        print(f"[WARN] labels가 없음. 인덱스 번호를 사용합니다 (len(p)={n})")
        labels = [f"class_{i}" for i in range(n)]
    elif len(labels) != n:
        print(f"[WARN] len(p)={n}, len(labels)={len(labels)} → 길이 맞춤")
        if len(labels) < n:
            # labels가 부족하면 인덱스로 채움
            labels = list(labels) + [f"class_{i}" for i in range(len(labels), n)]
        else:
            # labels가 너무 많으면 자름
            labels = labels[:n]

    # thresholds 처리
    if isinstance(thresholds, dict) and labels is not None:
        thr_arr = np.array([float(thresholds.get(lbl, 0.5)) for lbl in labels], dtype=float)
    elif hasattr(thresholds, "__len__") and not isinstance(thresholds, (str, bytes)):
        thr_arr = np.array(thresholds, dtype=float)
        if len(thr_arr) != n:
            thr_arr = np.full(n, float(np.median(thresholds) if len(thresholds) else 0.5), dtype=float)
    else:
        thr_arr = np.full(n, float(thresholds), dtype=float)

    # 임계값 적용
    idx = np.where(p >= thr_arr)[0]
    
    # 안전한 인덱스 접근
    names = []
    vals = []
    for i in idx:
        if i < len(labels):
            names.append(labels[i])
            vals.append(float(p[i]))
    
    return sorted(list(zip(names, vals)), key=lambda x: x[1], reverse=True)

# --------- 5) 예측 클래스 ----------
class TextClassifier:
    def __init__(self, model_path: str):
        print(f"[Loading] Model bundle from: {model_path}")
        self.bundle = load_bundle(model_path)
        self.clf = self.bundle["classifier"]
        self.labels = self.bundle["labels"]
        self.thresholds = self.bundle["thresholds"]
        
        print(f"[Loading] Embedder: {HF_MODEL}")
        self.tokenizer, self.base_model = load_embedder()
        
        print(f"[Loaded] Labels: {len(self.labels) if self.labels else 0}")
        if self.labels and len(self.labels) > 0:
            print(f"[Loaded] Sample labels: {self.labels[:5]}{'...' if len(self.labels) > 5 else ''}")
        else:
            print(f"[WARN] labels가 없음 - 런타임에 class_0, class_1... 형태로 생성됨")
        print(f"[Loaded] Thresholds type: {type(self.thresholds).__name__}")

    def predict_labels(self, texts: List[str], topk: int = 3) -> List[Dict[str, Any]]:
        vec = encode_texts(texts, self.tokenizer, self.base_model)
        proba = _standardize_proba(self.clf.predict_proba(vec))
        results = []
        
        for i, t in enumerate(texts):
            p = proba[i]
            n_classes = len(p)
            
            # labels 안전성 검사
            if self.labels is None or len(self.labels) == 0:
                current_labels = [f"class_{j}" for j in range(n_classes)]
                print(f"[INFO] labels 없음 → class_0, class_1, ... 사용")
            elif len(self.labels) != n_classes:
                current_labels = list(self.labels) + [f"class_{j}" for j in range(len(self.labels), n_classes)]
                if len(current_labels) > n_classes:
                    current_labels = current_labels[:n_classes]
            else:
                current_labels = self.labels
            
            # TopK 계산
            idx_sorted = np.argsort(p)[::-1][:topk]
            top_pairs = [(current_labels[j], float(p[j])) for j in idx_sorted if j < len(current_labels)]
            
            # 임계값 적용
            passed = apply_thresholds(p, current_labels, self.thresholds)
            
            results.append({
                "text": t,
                "topk": top_pairs,
                "passed_threshold": passed,
                "all_probs": p,
            })
        return results

    def predict_one(self, text: str, topk: int = 5) -> Dict[str, Any]:
        out = self.predict_labels([text], topk=topk)[0]
        print(f"\n[Query] {out['text']}")
        print(f" - TopK : {', '.join(f'{n}({s:.3f})' for n, s in out['topk'])}")
        if out["passed_threshold"]:
            print(f" - Pass : {', '.join(f'{n}({s:.3f})' for n, s in out['passed_threshold'])}")
        else:
            print(" - Pass : (임계값 통과 없음 → TopK 참고)")
        return out

# ============================================
# 테스트 코드
# ============================================

def run_tests():
    """테스트 실행 함수"""
    
    # 먼저 모델 구조 분석
    inspect_model_bundle(MODEL_PKL)
    
    # 향수 관련 테스트 쿼리들 (메인 어코드 추출용)
    test_queries = [
        # 계절별 향수 추천
        "여름에 뿌릴만한 향수 추천해주세요",
        "겨울에 어울리는 따뜻한 향수가 뭐가 있을까요?",
        "봄에 어울리는 플로럴 향수 추천해주세요",
        "가을에 뿌리기 좋은 우디한 향수는?",
        
        # 상황별 향수 추천
        "데이트할 때 뿌리면 좋은 향수 추천",
        "직장에서 뿌려도 되는 은은한 향수는?",
        "파티나 클럽에 어울리는 섹시한 향수 추천",
        "운동할 때 뿌려도 좋은 상쾌한 향수는?",
        
        # 성별/연령별 추천
        "20대 여성에게 어울리는 향수 추천해주세요",
        "30대 남성이 쓰기 좋은 향수는?",
        "10대가 쓰기에 부담없는 향수 추천",
        "50대 여성에게 어울리는 우아한 향수는?",
        
        # 향조/어코드별 문의
        "시트러스 계열의 상큼한 향수 추천",
        "바닐라 향이 강한 달콤한 향수는?",
        "머스크 향이 들어간 향수 추천해주세요",
        "우디 계열의 중후한 향수가 뭐가 있나요?",
        "플로럴 부케 향수 중에 추천해주세요",
        
        # 브랜드/가격대별
        "샤넬 향수 중에 인기있는거 추천",
        "10만원 이하로 살 수 있는 좋은 향수는?",
        "디올 향수 중에 여름용으로 좋은건?",
        "학생도 살 수 있는 저렴한 향수 추천",
        
        # 특정 향 선호도
        "장미향이 나는 향수 추천해주세요",
        "오션 냄새가 나는 시원한 향수는?",
        "과일향이 강한 프루티한 향수 추천",
        "커피나 초콜릿 향이 나는 향수는?",
        "허브향이 들어간 자연스러운 향수 추천"
    ]
    
    try:
        # 분류기 초기화
        print(f"\n모델 로딩을 시도합니다...")
        classifier = TextClassifier(MODEL_PKL)
        
        # 라벨이 없으면 수동으로 생성 제안
        if not classifier.labels or len(classifier.labels) == 0:
            print(f"\n💡 라벨이 없으므로 향수 어코드를 수동으로 매핑할 수 있습니다.")
            manual_labels = create_manual_labels()
            
            response = input(f"수동 향수 어코드 라벨을 사용하시겠습니까? (y/n): ").strip().lower()
            if response in ['y', 'yes', '네', 'ㅇ']:
                classifier.labels = manual_labels
                print(f"✅ 향수 어코드 라벨 {len(manual_labels)}개를 적용했습니다.")
                print(f"샘플: {manual_labels[:5]}...")
        
        print("\n" + "="*60)
        print("향수 메인 어코드 추출 테스트 (predict_one)")
        print("="*60)
        
        # 개별 테스트 (상위 3개)
        for i, query in enumerate(test_queries[:5]):  # 처음 5개만
            print(f"\n[Test {i+1}]")
            result = classifier.predict_one(query, topk=3)
            
        print("\n" + "="*60)
        print("배치 향수 어코드 추출 테스트")
        print("="*60)
        
        # 배치 테스트
        batch_results = classifier.predict_labels(test_queries[:8], topk=3)
        
        for i, result in enumerate(batch_results):
            print(f"\n[Batch {i+1}] {result['text'][:50]}...")
            print(f"  Top3: {', '.join(f'{n}({s:.3f})' for n, s in result['topk'])}")
            if result['passed_threshold']:
                print(f"  Pass: {', '.join(f'{n}({s:.3f})' for n, s in result['passed_threshold'])}")
        
        print("\n" + "="*60)
        print("메인 어코드 추출 통계")
        print("="*60)
        
        # 임계값 통과 통계
        total_tests = len(batch_results)
        passed_tests = sum(1 for r in batch_results if r['passed_threshold'])
        
        print(f"전체 테스트: {total_tests}")
        print(f"임계값 통과: {passed_tests}")
        print(f"어코드 추출율: {passed_tests/total_tests*100:.1f}%")
        
        # 어코드별 통계
        accord_counts = {}
        for result in batch_results:
            for accord, score in result['passed_threshold']:
                accord_counts[accord] = accord_counts.get(accord, 0) + 1
        
        if accord_counts:
            print("\n메인 어코드별 등장 횟수:")
            for accord, count in sorted(accord_counts.items(), key=lambda x: x[1], reverse=True):
                print(f"  {accord}: {count}회")
        
        print("\n테스트 완료!")
        
    except FileNotFoundError:
        print(f"[ERROR] 모델 파일을 찾을 수 없습니다: {MODEL_PKL}")
        print("다음 중 하나를 확인해주세요:")
        print("1. model.pkl 파일이 현재 디렉토리에 있는지 확인")
        print("2. MODEL_PKL 환경변수로 올바른 경로 설정")
        print("3. 예시: export MODEL_PKL='/path/to/your/model.pkl'")
        
    except ValueError as e:
        print(f"[ERROR] 모델 로딩 오류: {str(e)}")
        print("model.pkl 파일의 구조를 확인해주세요.")
        
    except Exception as e:
        print(f"[ERROR] 예상치 못한 오류가 발생했습니다: {str(e)}")
        print(f"오류 타입: {type(e).__name__}")
        import traceback
        traceback.print_exc()

def interactive_test():
    """대화형 테스트 함수"""
    try:
        classifier = TextClassifier(MODEL_PKL)
        
        print("\n" + "="*60)
        print("향수 메인 어코드 추출 - 대화형 모드")
        print("="*60)
        print("향수 관련 질문을 입력하면 메인 어코드 3개를 추출합니다.")
        print("예시: '여름에 뿌릴만한 향수 추천', '데이트용 향수 추천' 등")
        print("종료하려면 'quit' 또는 'exit'를 입력하세요.")
        
        while True:
            query = input("\n향수 질문 입력: ").strip()
            if query.lower() in ['quit', 'exit', '종료']:
                break
            if not query:
                continue
                
            result = classifier.predict_one(query, topk=3)  # 메인 어코드 3개
            
            # 추가 정보 표시
            if result["passed_threshold"]:
                print(f"\n✅ 추출된 메인 어코드: {len(result['passed_threshold'])}개")
            else:
                print(f"\n⚠️  임계값을 넘는 어코드가 없어 상위 3개를 참고하세요")
            
    except KeyboardInterrupt:
        print("\n\n프로그램을 종료합니다.")
    except Exception as e:
        print(f"[ERROR] {str(e)}")

# ============================================
# 메인 실행부
# ============================================

if __name__ == "__main__":
    print("향수 메인 어코드 추출 시스템 테스트")
    print("="*60)
    
    # 환경 정보 출력
    print(f"Model Path: {MODEL_PKL}")
    print(f"HF Model: {HF_MODEL}")
    print(f"Device: {DEVICE}")
    print(f"Max Length: {MAX_LEN}")
    print(f"Batch Size: {BATCH_SZ}")
    
    # 테스트 선택
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == "interactive":
        interactive_test()
    else:
        run_tests()

[Device] cpu
향수 메인 어코드 추출 시스템 테스트
Model Path: ./model.pkl
HF Model: sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2
Device: cpu
Max Length: 256
Batch Size: 16


NameError: name 'inspect_model_bundle' is not defined

In [22]:
def create_manual_labels():
    """수동으로 향수 어코드 라벨을 생성하는 함수 (참고용)"""
    
    # 일반적인 향수 어코드들 (45개 클래스 예시)
    fragrance_accords = [
        "Citrus", "Fresh", "Aquatic", "Green", "Herbal",           # 0-4: 상쾌한 계열
        "Floral", "Rose", "Jasmine", "Lily", "Peony",             # 5-9: 플로럴 계열  
        "Fruity", "Apple", "Pear", "Peach", "Berry",              # 10-14: 과일 계열
        "Spicy", "Pepper", "Cinnamon", "Ginger", "Cardamom",      # 15-19: 스파이시 계열
        "Woody", "Sandalwood", "Cedar", "Pine", "Vetiver",        # 20-24: 우디 계열
        "Oriental", "Amber", "Incense", "Oud", "Patchouli",       # 25-29: 오리엔탈 계열
        "Gourmand", "Vanilla", "Caramel", "Chocolate", "Coffee",  # 30-34: 구르망 계열
        "Musk", "Animalic", "Leather", "Tobacco", "Smoky",        # 35-39: 깊은 계열
        "Powdery", "Clean", "Soap", "Marine", "Ozonic"            # 40-44: 파우더리/클린 계열
    ]
    
    return fragrance_accords[:45]  # 정확히 45개로 맞춤def inspect_model_bundle(pkl_path: str):
    """모델 번들 구조를 자세히 분석하는 함수"""
    print("\n" + "="*60)
    print("모델 번들 구조 분석")
    print("="*60)
    
    try:
        obj = joblib.load(pkl_path)
        print(f"메인 객체 타입: {type(obj)}")
        
        if isinstance(obj, dict):
            print(f"딕셔너리 키 개수: {len(obj)}")
            for key, value in obj.items():
                print(f"  '{key}': {type(value)}")
                
                # 라벨 관련 속성 확인
                if hasattr(value, 'classes_'):
                    print(f"    └─ classes_: {len(value.classes_)}개 ({type(value.classes_[0]) if len(value.classes_) > 0 else 'empty'})")
                    if len(value.classes_) > 0:
                        print(f"       샘플: {list(value.classes_[:3])}...")
                
                # 배열/리스트인 경우 내용 확인
                if isinstance(value, (list, tuple, np.ndarray)):
                    print(f"    └─ 길이: {len(value)}")
                    if len(value) > 0:
                        print(f"       타입: {type(value[0])}")
                        if isinstance(value[0], str):
                            print(f"       샘플: {list(value[:3])}...")
        
        else:
            print("단일 객체 (비딕셔너리)")
            if hasattr(obj, 'classes_'):
                print(f"  classes_: {len(obj.classes_)}개")
                print(f"  샘플: {list(obj.classes_[:3])}...")
        
    except Exception as e:
        print(f"분석 실패: {e}")
    
    print("="*60)# ============================================
# model.pkl 번들 로드 + MiniLM 임베딩 + 예측
# (joblib.load, 마스킹 평균풀링, 라벨/임계값 정렬 포함)
# ============================================

import os
import numpy as np
import joblib
import torch
from typing import Any, Dict, List, Tuple
from transformers import AutoTokenizer, AutoModel

# --------- 설정 ----------
MODEL_PKL = os.getenv("MODEL_PKL", "./model.pkl")  # 번들 경로
HF_MODEL  = os.getenv("HF_MODEL", "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
MAX_LEN   = int(os.getenv("MAX_LEN", 256))
DEVICE    = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SZ  = int(os.getenv("BATCH_SZ", 16))
print(f"[Device] {DEVICE}")

# --------- 1) 번들 로드 (수정본) ----------
def load_bundle(pkl_path: str) -> Dict[str, Any]:
    print(f"[DEBUG] 번들 파일 로딩 중: {pkl_path}")
    obj = joblib.load(pkl_path)  # ✅ joblib.load로 읽어야 함
    
    # 번들 구조 디버깅
    print(f"[DEBUG] 로드된 객체 타입: {type(obj)}")
    if isinstance(obj, dict):
        print(f"[DEBUG] 딕셔너리 키들: {list(obj.keys())}")
        bundle = obj
    else:
        print(f"[DEBUG] 딕셔너리가 아님. 분류기로 가정")
        bundle = {"classifier": obj}

    # 분류기 찾기
    clf = bundle.get("classifier") or bundle.get("model") or bundle.get("clf")
    if clf is None:
        # 가능한 모든 키 확인
        for key, value in bundle.items():
            if hasattr(value, "predict_proba"):
                print(f"[DEBUG] '{key}'에서 predict_proba 발견")
                clf = value
                break
    
    if clf is None or not hasattr(clf, "predict_proba"):
        raise ValueError("bundle에 predict_proba 가능한 분류기가 필요합니다.")

    print(f"[DEBUG] 분류기 타입: {type(clf)}")
    
    # MultiLabelBinarizer 찾기
    mlb = bundle.get("mlb") or bundle.get("multilabel_binarizer") or bundle.get("label_binarizer")
    if mlb is None:
        for key, value in bundle.items():
            if hasattr(value, "classes_") and hasattr(value, "transform"):
                print(f"[DEBUG] '{key}'에서 MultiLabelBinarizer 발견")
                mlb = value
                break
    
    if mlb:
        print(f"[DEBUG] MLB 타입: {type(mlb)}, classes 개수: {len(mlb.classes_) if hasattr(mlb, 'classes_') else 'None'}")
    
    # 라벨 찾기 (여러 방법 시도)
    labels = None
    
    # 1. 직접 저장된 labels
    labels = bundle.get("labels") or bundle.get("label_names") or bundle.get("classes") or bundle.get("class_names")
    
    # 2. MLB에서 복원
    if labels is None and mlb is not None and hasattr(mlb, "classes_"):
        labels = list(mlb.classes_)
        print(f"[DEBUG] MLB에서 라벨 복원: {len(labels)}개")
        print(f"[DEBUG] 라벨 샘플: {labels[:5] if len(labels) > 5 else labels}")
    
    # 3. 분류기에서 복원
    if labels is None and hasattr(clf, "classes_"):
        labels = list(clf.classes_)
        print(f"[DEBUG] 분류기에서 라벨 복원: {len(labels)}개")
    
    # 4. 번들의 모든 키에서 라벨 찾기
    if labels is None:
        for key, value in bundle.items():
            if isinstance(value, (list, tuple, np.ndarray)) and len(value) > 0:
                if isinstance(value[0], str):  # 문자열 배열이면 라벨일 가능성
                    print(f"[DEBUG] '{key}'에서 문자열 배열 발견: {len(value)}개")
                    labels = list(value)
                    break
    
    thresholds = bundle.get("thresholds", 0.5)
    print(f"[DEBUG] 임계값 타입: {type(thresholds)}")

    # 라벨 정렬/검증
    if hasattr(clf, "classes_") and labels is not None:
        clf_labels = list(clf.classes_)
        if list(labels) != clf_labels:
            print(f"[DEBUG] 라벨 순서 불일치. 분류기 순서로 재정렬")
            idx_map = {lbl: i for i, lbl in enumerate(labels)}
            labels = [lbl for lbl in clf_labels if lbl in idx_map]

        # thresholds가 dict라면 clf 순서에 맞춰 배열로 변환
        if isinstance(thresholds, dict):
            thresholds = np.array([float(thresholds.get(lbl, 0.5)) for lbl in labels], dtype=float)

    print(f"[DEBUG] 최종 라벨 개수: {len(labels) if labels else 0}")
    return {"classifier": clf, "mlb": mlb, "labels": labels, "thresholds": thresholds}

def inspect_model_bundle(pkl_path: str):
    """모델 번들 구조를 자세히 분석하는 함수"""
    print("\n" + "="*60)
    print("모델 번들 구조 분석")
    print("="*60)
    
    try:
        obj = joblib.load(pkl_path)
        print(f"메인 객체 타입: {type(obj)}")
        
        if isinstance(obj, dict):
            print(f"딕셔너리 키 개수: {len(obj)}")
            for key, value in obj.items():
                print(f"  '{key}': {type(value)}")
                
                # 라벨 관련 속성 확인
                if hasattr(value, 'classes_'):
                    print(f"    └─ classes_: {len(value.classes_)}개 ({type(value.classes_[0]) if len(value.classes_) > 0 else 'empty'})")
                    if len(value.classes_) > 0:
                        print(f"       샘플: {list(value.classes_[:3])}...")
                
                # 배열/리스트인 경우 내용 확인
                if isinstance(value, (list, tuple, np.ndarray)):
                    print(f"    └─ 길이: {len(value)}")
                    if len(value) > 0:
                        print(f"       타입: {type(value[0])}")
                        if isinstance(value[0], str):
                            print(f"       샘플: {list(value[:3])}...")
        
        else:
            print("단일 객체 (비딕셔너리)")
            if hasattr(obj, 'classes_'):
                print(f"  classes_: {len(obj.classes_)}개")
                print(f"  샘플: {list(obj.classes_[:3])}...")
        
    except Exception as e:
        print(f"분석 실패: {e}")
    
    print("="*60)

def create_manual_labels():
    """수동으로 향수 어코드 라벨을 생성하는 함수 (참고용)"""
    
    # 일반적인 향수 어코드들 (45개 클래스 예시)
    fragrance_accords = [
        "Citrus", "Fresh", "Aquatic", "Green", "Herbal",           # 0-4: 상쾌한 계열
        "Floral", "Rose", "Jasmine", "Lily", "Peony",             # 5-9: 플로럴 계열  
        "Fruity", "Apple", "Pear", "Peach", "Berry",              # 10-14: 과일 계열
        "Spicy", "Pepper", "Cinnamon", "Ginger", "Cardamom",      # 15-19: 스파이시 계열
        "Woody", "Sandalwood", "Cedar", "Pine", "Vetiver",        # 20-24: 우디 계열
        "Oriental", "Amber", "Incense", "Oud", "Patchouli",       # 25-29: 오리엔탈 계열
        "Gourmand", "Vanilla", "Caramel", "Chocolate", "Coffee",  # 30-34: 구르망 계열
        "Musk", "Animalic", "Leather", "Tobacco", "Smoky",        # 35-39: 깊은 계열
        "Powdery", "Clean", "Soap", "Marine", "Ozonic"            # 40-44: 파우더리/클린 계열
    ]
    
    return fragrance_accords[:45]  # 정확히 45개로 맞춤

# --------- 2) 임베더 로드 ----------
def load_embedder():
    tokenizer = AutoTokenizer.from_pretrained(HF_MODEL)
    base_model = AutoModel.from_pretrained(HF_MODEL).to(DEVICE).eval()
    return tokenizer, base_model

@torch.inference_mode()
def encode_texts(texts: List[str], tokenizer, base_model, max_len: int = MAX_LEN, batch_size: int = BATCH_SZ) -> np.ndarray:
    out = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        enc = tokenizer(
            batch, padding=True, truncation=True, max_length=max_len, return_tensors="pt"
        ).to(DEVICE)

        last = base_model(**enc).last_hidden_state        # (B, L, H)
        mask = enc["attention_mask"].unsqueeze(-1)        # (B, L, 1)
        summed = (last * mask).sum(dim=1)                 
        counts = mask.sum(dim=1).clamp(min=1)             
        emb = summed / counts                             
        out.append(emb.detach().cpu().numpy())
    return np.vstack(out)

# --------- 3) 확률 표준화 ----------
def _standardize_proba(proba) -> np.ndarray:
    if isinstance(proba, list):
        cols = []
        for p in proba:
            p = np.asarray(p)
            if p.ndim == 2 and p.shape[1] > 1:
                cols.append(p[:, 1])
            else:
                cols.append(p.ravel())
        proba = np.column_stack(cols)
    proba = np.asarray(proba)
    if proba.ndim == 1:
        proba = proba.reshape(1, -1)
    return proba

# --------- 4) 임계값 적용 (수정본) ----------
def apply_thresholds(p: np.ndarray, labels: List[str], thresholds) -> List[Tuple[str, float]]:
    n = len(p)
    
    # labels 검증 및 처리
    if labels is None or len(labels) == 0:
        print(f"[WARN] labels가 없음. 인덱스 번호를 사용합니다 (len(p)={n})")
        labels = [f"class_{i}" for i in range(n)]
    elif len(labels) != n:
        print(f"[WARN] len(p)={n}, len(labels)={len(labels)} → 길이 맞춤")
        if len(labels) < n:
            # labels가 부족하면 인덱스로 채움
            labels = list(labels) + [f"class_{i}" for i in range(len(labels), n)]
        else:
            # labels가 너무 많으면 자름
            labels = labels[:n]

    # thresholds 처리
    if isinstance(thresholds, dict) and labels is not None:
        thr_arr = np.array([float(thresholds.get(lbl, 0.5)) for lbl in labels], dtype=float)
    elif hasattr(thresholds, "__len__") and not isinstance(thresholds, (str, bytes)):
        thr_arr = np.array(thresholds, dtype=float)
        if len(thr_arr) != n:
            thr_arr = np.full(n, float(np.median(thresholds) if len(thresholds) else 0.5), dtype=float)
    else:
        thr_arr = np.full(n, float(thresholds), dtype=float)

    # 임계값 적용
    idx = np.where(p >= thr_arr)[0]
    
    # 안전한 인덱스 접근
    names = []
    vals = []
    for i in idx:
        if i < len(labels):
            names.append(labels[i])
            vals.append(float(p[i]))
    
    return sorted(list(zip(names, vals)), key=lambda x: x[1], reverse=True)

# --------- 5) 예측 클래스 ----------
class TextClassifier:
    def __init__(self, model_path: str):
        print(f"[Loading] Model bundle from: {model_path}")
        self.bundle = load_bundle(model_path)
        self.clf = self.bundle["classifier"]
        self.labels = self.bundle["labels"]
        self.thresholds = self.bundle["thresholds"]
        
        print(f"[Loading] Embedder: {HF_MODEL}")
        self.tokenizer, self.base_model = load_embedder()
        
        print(f"[Loaded] Labels: {len(self.labels) if self.labels else 0}")
        if self.labels and len(self.labels) > 0:
            print(f"[Loaded] Sample labels: {self.labels[:5]}{'...' if len(self.labels) > 5 else ''}")
        else:
            print(f"[WARN] labels가 없음 - 런타임에 class_0, class_1... 형태로 생성됨")
        print(f"[Loaded] Thresholds type: {type(self.thresholds).__name__}")

    def predict_labels(self, texts: List[str], topk: int = 3) -> List[Dict[str, Any]]:
        vec = encode_texts(texts, self.tokenizer, self.base_model)
        proba = _standardize_proba(self.clf.predict_proba(vec))
        results = []
        
        for i, t in enumerate(texts):
            p = proba[i]
            n_classes = len(p)
            
            # labels 안전성 검사
            if self.labels is None or len(self.labels) == 0:
                current_labels = [f"class_{j}" for j in range(n_classes)]
                print(f"[INFO] labels 없음 → class_0, class_1, ... 사용")
            elif len(self.labels) != n_classes:
                current_labels = list(self.labels) + [f"class_{j}" for j in range(len(self.labels), n_classes)]
                if len(current_labels) > n_classes:
                    current_labels = current_labels[:n_classes]
            else:
                current_labels = self.labels
            
            # TopK 계산
            idx_sorted = np.argsort(p)[::-1][:topk]
            top_pairs = [(current_labels[j], float(p[j])) for j in idx_sorted if j < len(current_labels)]
            
            # 임계값 적용
            passed = apply_thresholds(p, current_labels, self.thresholds)
            
            results.append({
                "text": t,
                "topk": top_pairs,
                "passed_threshold": passed,
                "all_probs": p,
            })
        return results

    def predict_one(self, text: str, topk: int = 5) -> Dict[str, Any]:
        out = self.predict_labels([text], topk=topk)[0]
        print(f"\n[Query] {out['text']}")
        print(f" - TopK : {', '.join(f'{n}({s:.3f})' for n, s in out['topk'])}")
        if out["passed_threshold"]:
            print(f" - Pass : {', '.join(f'{n}({s:.3f})' for n, s in out['passed_threshold'])}")
        else:
            print(" - Pass : (임계값 통과 없음 → TopK 참고)")
        return out

# ============================================
# 테스트 코드
# ============================================

def run_tests():
    """테스트 실행 함수"""
    
    # 먼저 모델 구조 분석
    inspect_model_bundle(MODEL_PKL)
    
    # 향수 관련 테스트 쿼리들 (메인 어코드 추출용)
    test_queries = [
        # 계절별 향수 추천
        "여름에 뿌릴만한 향수 추천해주세요",
        "겨울에 어울리는 따뜻한 향수가 뭐가 있을까요?",
        "봄에 어울리는 플로럴 향수 추천해주세요",
        "가을에 뿌리기 좋은 우디한 향수는?",
        
        # 상황별 향수 추천
        "데이트할 때 뿌리면 좋은 향수 추천",
        "직장에서 뿌려도 되는 은은한 향수는?",
        "파티나 클럽에 어울리는 섹시한 향수 추천",
        "운동할 때 뿌려도 좋은 상쾌한 향수는?",
        
        # 성별/연령별 추천
        "20대 여성에게 어울리는 향수 추천해주세요",
        "30대 남성이 쓰기 좋은 향수는?",
        "10대가 쓰기에 부담없는 향수 추천",
        "50대 여성에게 어울리는 우아한 향수는?",
        
        # 향조/어코드별 문의
        "시트러스 계열의 상큼한 향수 추천",
        "바닐라 향이 강한 달콤한 향수는?",
        "머스크 향이 들어간 향수 추천해주세요",
        "우디 계열의 중후한 향수가 뭐가 있나요?",
        "플로럴 부케 향수 중에 추천해주세요",
        
        # 브랜드/가격대별
        "샤넬 향수 중에 인기있는거 추천",
        "10만원 이하로 살 수 있는 좋은 향수는?",
        "디올 향수 중에 여름용으로 좋은건?",
        "학생도 살 수 있는 저렴한 향수 추천",
        
        # 특정 향 선호도
        "장미향이 나는 향수 추천해주세요",
        "오션 냄새가 나는 시원한 향수는?",
        "과일향이 강한 프루티한 향수 추천",
        "커피나 초콜릿 향이 나는 향수는?",
        "허브향이 들어간 자연스러운 향수 추천"
    ]
    
    try:
        # 분류기 초기화
        print(f"\n모델 로딩을 시도합니다...")
        classifier = TextClassifier(MODEL_PKL)
        
        # 라벨이 없으면 수동으로 생성 제안
        if not classifier.labels or len(classifier.labels) == 0:
            print(f"\n💡 라벨이 없으므로 향수 어코드를 수동으로 매핑할 수 있습니다.")
            manual_labels = create_manual_labels()
            
            response = input(f"수동 향수 어코드 라벨을 사용하시겠습니까? (y/n): ").strip().lower()
            if response in ['y', 'yes', '네', 'ㅇ']:
                classifier.labels = manual_labels
                print(f"✅ 향수 어코드 라벨 {len(manual_labels)}개를 적용했습니다.")
                print(f"샘플: {manual_labels[:5]}...")
        
        print("\n" + "="*60)
        print("향수 메인 어코드 추출 테스트 (predict_one)")
        print("="*60)
        
        # 개별 테스트 (상위 3개)
        for i, query in enumerate(test_queries[:5]):  # 처음 5개만
            print(f"\n[Test {i+1}]")
            result = classifier.predict_one(query, topk=3)
            
        print("\n" + "="*60)
        print("배치 향수 어코드 추출 테스트")
        print("="*60)
        
        # 배치 테스트
        batch_results = classifier.predict_labels(test_queries[:8], topk=3)
        
        for i, result in enumerate(batch_results):
            print(f"\n[Batch {i+1}] {result['text'][:50]}...")
            print(f"  Top3: {', '.join(f'{n}({s:.3f})' for n, s in result['topk'])}")
            if result['passed_threshold']:
                print(f"  Pass: {', '.join(f'{n}({s:.3f})' for n, s in result['passed_threshold'])}")
        
        print("\n" + "="*60)
        print("메인 어코드 추출 통계")
        print("="*60)
        
        # 임계값 통과 통계
        total_tests = len(batch_results)
        passed_tests = sum(1 for r in batch_results if r['passed_threshold'])
        
        print(f"전체 테스트: {total_tests}")
        print(f"임계값 통과: {passed_tests}")
        print(f"어코드 추출율: {passed_tests/total_tests*100:.1f}%")
        
        # 어코드별 통계
        accord_counts = {}
        for result in batch_results:
            for accord, score in result['passed_threshold']:
                accord_counts[accord] = accord_counts.get(accord, 0) + 1
        
        if accord_counts:
            print("\n메인 어코드별 등장 횟수:")
            for accord, count in sorted(accord_counts.items(), key=lambda x: x[1], reverse=True):
                print(f"  {accord}: {count}회")
        
        print("\n테스트 완료!")
        
    except FileNotFoundError:
        print(f"[ERROR] 모델 파일을 찾을 수 없습니다: {MODEL_PKL}")
        print("다음 중 하나를 확인해주세요:")
        print("1. model.pkl 파일이 현재 디렉토리에 있는지 확인")
        print("2. MODEL_PKL 환경변수로 올바른 경로 설정")
        print("3. 예시: export MODEL_PKL='/path/to/your/model.pkl'")
        
    except ValueError as e:
        print(f"[ERROR] 모델 로딩 오류: {str(e)}")
        print("model.pkl 파일의 구조를 확인해주세요.")
        
    except Exception as e:
        print(f"[ERROR] 예상치 못한 오류가 발생했습니다: {str(e)}")
        print(f"오류 타입: {type(e).__name__}")
        import traceback
        traceback.print_exc()

def interactive_test():
    """대화형 테스트 함수"""
    try:
        classifier = TextClassifier(MODEL_PKL)
        
        print("\n" + "="*60)
        print("향수 메인 어코드 추출 - 대화형 모드")
        print("="*60)
        print("향수 관련 질문을 입력하면 메인 어코드 3개를 추출합니다.")
        print("예시: '여름에 뿌릴만한 향수 추천', '데이트용 향수 추천' 등")
        print("종료하려면 'quit' 또는 'exit'를 입력하세요.")
        
        while True:
            query = input("\n향수 질문 입력: ").strip()
            if query.lower() in ['quit', 'exit', '종료']:
                break
            if not query:
                continue
                
            result = classifier.predict_one(query, topk=3)  # 메인 어코드 3개
            
            # 추가 정보 표시
            if result["passed_threshold"]:
                print(f"\n✅ 추출된 메인 어코드: {len(result['passed_threshold'])}개")
            else:
                print(f"\n⚠️  임계값을 넘는 어코드가 없어 상위 3개를 참고하세요")
            
    except KeyboardInterrupt:
        print("\n\n프로그램을 종료합니다.")
    except Exception as e:
        print(f"[ERROR] {str(e)}")

# ============================================
# 메인 실행부
# ============================================

if __name__ == "__main__":
    print("향수 메인 어코드 추출 시스템 테스트")
    print("="*60)
    
    # 환경 정보 출력
    print(f"Model Path: {MODEL_PKL}")
    print(f"HF Model: {HF_MODEL}")
    print(f"Device: {DEVICE}")
    print(f"Max Length: {MAX_LEN}")
    print(f"Batch Size: {BATCH_SZ}")
    
    # 테스트 선택
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == "interactive":
        interactive_test()
    else:
        run_tests()

[Device] cpu
향수 메인 어코드 추출 시스템 테스트
Model Path: ./model.pkl
HF Model: sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2
Device: cpu
Max Length: 256
Batch Size: 16

모델 번들 구조 분석


  Loading from a raw memory buffer (like pickle in Python, RDS in R) on a CPU-only
  machine. Consider using `save_model/load_model` instead. See:

    https://xgboost.readthedocs.io/en/latest/tutorials/saving_model.html

  for more details about differences between saving model and serializing.  Changing `tree_method` to `hist`.
  setstate(state)
  setstate(state)
  setstate(state)
  setstate(state)


메인 객체 타입: <class 'dict'>
딕셔너리 키 개수: 3
  'classifier': <class 'sklearn.multiclass.OneVsRestClassifier'>
    └─ classes_: 45개 (<class 'numpy.int64'>)
       샘플: [np.int64(0), np.int64(1), np.int64(2)]...
  'mlb': <class 'sklearn.preprocessing._label.MultiLabelBinarizer'>
    └─ classes_: 45개 (<class 'str'>)
       샘플: ['$$$', 'Amber', 'Aromatic']...
  'thresholds': <class 'dict'>

모델 로딩을 시도합니다...
[Loading] Model bundle from: ./model.pkl
[DEBUG] 번들 파일 로딩 중: ./model.pkl
[DEBUG] 로드된 객체 타입: <class 'dict'>
[DEBUG] 딕셔너리 키들: ['classifier', 'mlb', 'thresholds']
[DEBUG] 분류기 타입: <class 'sklearn.multiclass.OneVsRestClassifier'>
[DEBUG] MLB 타입: <class 'sklearn.preprocessing._label.MultiLabelBinarizer'>, classes 개수: 45
[DEBUG] MLB에서 라벨 복원: 45개
[DEBUG] 라벨 샘플: ['$$$', 'Amber', 'Aromatic', 'Blossom', 'Bouquet']
[DEBUG] 임계값 타입: <class 'dict'>
[DEBUG] 라벨 순서 불일치. 분류기 순서로 재정렬
[DEBUG] 최종 라벨 개수: 0
[Loading] Embedder: sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2
[Loaded] Labels: 0
[WARN] labels가