In [33]:
# -*- coding: utf-8 -*-
# ==============================================================
#  알레르기 분석 서비스 (OCR → Regex → RAG → Zero-shot Fallback)
#  + 원재료명 섹션 성분별 알레르기 판정표 생성
# ==============================================================

import os, io, json, re, logging, unicodedata
from typing import List, Set, TypedDict
try:
    from typing import NotRequired
except ImportError:
    NotRequired = None

import numpy as np
from collections import deque
from sklearn.metrics.pairwise import cosine_similarity
import torch

from sentence_transformers import SentenceTransformer
from transformers import pipeline
from google.cloud import vision
from google.oauth2 import service_account
from langgraph.graph import StateGraph, END



In [34]:
logging.basicConfig(level=logging.INFO, format="%(levelname)s | %(message)s")
print("--- 🚀 알레르기 분석 서비스 (OCR + RAG + ZS-Fallback + per-ingredient) 시작 ---")

# --- Standard allergens / ignore filter / synonyms ---
# Canonical allergen names; classification results are accepted only if they are in this set.
ALLERGENS_STD_SET = {
    "알류", "우유", "메밀", "땅콩", "대두", "밀", "잣", "호두",
    "게", "새우", "오징어", "고등어", "조개류", "복숭아", "토마토",
    "닭고기", "돼지고기", "쇠고기", "아황산류"
}
# Ingredient candidates that start with one of these words are skipped by the parser.
IGNORE_KEYWORDS = {
    '열량','탄수화물','단백질','지방','당류','나트륨','콜레스테롤','포화지방','트랜스지방','내용량','I','II'
}
# Alias (matched as a substring of the ingredient) -> canonical allergen name.
ALIAS_MAP = {
    # eggs
    "계란":"알류","달걀":"알류","난백":"알류","난황":"알류",
    # milk
    "유청":"우유","유청분말":"우유","유청단백":"우유","카제인나트륨":"우유","치즈":"우유","버터":"우유","크림":"우유","분유":"우유","탈지분유":"우유",
    # soybean / wheat / buckwheat
    "대두레시틴":"대두","대두단백":"대두","소이프로틴":"대두",
    "밀가루":"밀","메밀가루":"메밀",
    # meat
    # NOTE(review): "소 육" contains a space but is compared against a space-stripped
    # ingredient, so it can never match. Left as-is: matching "소육" would also hit
    # words such as "채소육수".
    "소고기":"쇠고기","소 육":"쇠고기","우육":"쇠고기","돼지":"돼지고기","돈육":"돼지고기","닭":"닭고기","계육":"닭고기",
    # nut / seafood variants
    "호두분말":"호두","잣가루":"잣","새우가루":"새우","오징어분말":"오징어",
    # fruit / vegetables
    "토마토페이스트":"토마토"
}
def alias_to_std(name: str) -> str:
    """Map an ingredient name to its canonical allergen name via ``ALIAS_MAP``.

    Spaces are removed from ``name`` and every alias is tested as a substring.
    When several aliases match, the longest one wins, so "메밀가루" resolves to
    "메밀" rather than to "밀" through the shorter alias "밀가루". Aliases of
    equal length keep their ``ALIAS_MAP`` order.

    Args:
        name: Ingredient text as extracted from the label.

    Returns:
        The canonical allergen name of the best matching alias, or ``name``
        unchanged when no alias matches.
    """
    key = name.replace(" ","")
    best_len = 0
    best_std = None
    for alias, std in ALIAS_MAP.items():
        # Strict ">" keeps the earliest alias on ties, matching the old precedence.
        if alias in key and len(alias) > best_len:
            best_len, best_std = len(alias), std
    return name if best_std is None else best_std



--- 🚀 알레르기 분석 서비스 (OCR + RAG + ZS-Fallback + per-ingredient) 시작 ---


In [35]:
# --- Load global resources ---
# Loads, in order: the sentence-embedding model, a zero-shot classifier (optional),
# the Google Cloud Vision client (optional) and the cached knowledge base.
# Any failure outside the two optional steps is logged and re-raised.
try:
    EMBEDDING_MODEL_NAME = 'distiluse-base-multilingual-cased-v1'
    logging.info(f"임베딩 모델 로드: {EMBEDDING_MODEL_NAME}")
    embedding_model = SentenceTransformer(EMBEDDING_MODEL_NAME)

    # Zero-shot candidates are tried in order; the first one that loads is used.
    ZSL_MODEL_CANDIDATES = [
        "MoritzLaurer/deberta-v3-large-zeroshot-v2.0",
        "joeddav/xlm-roberta-large-xnli"
    ]
    device = 0 if torch.cuda.is_available() else -1
    nli_pipeline = None
    for name in ZSL_MODEL_CANDIDATES:
        try:
            nli_pipeline = pipeline("zero-shot-classification", model=name, device=device)
            logging.info(f"Zero-shot 모델 사용: {name} (device={device})")
            break
        except Exception as e:
            logging.warning(f"{name} 로드 실패: {e}")
    if nli_pipeline is None:
        # The service keeps running; the fallback node then reports "no allergen".
        logging.warning("Zero-shot 모델 로드 실패 → Fallback 비활성")

    # Credentials path: GOOGLE_APPLICATION_CREDENTIALS wins over the hard-coded default.
    KEY_JSON_PATH_DEFAULT = r"D:\key folder\ocr-project-470906-7ffeebabeb09.json"  # replace if needed
    KEY_JSON_PATH = os.getenv("GOOGLE_APPLICATION_CREDENTIALS", KEY_JSON_PATH_DEFAULT)
    vision_client = None
    try:
        credentials = service_account.Credentials.from_service_account_file(KEY_JSON_PATH)
        vision_client = vision.ImageAnnotatorClient(credentials=credentials)
        logging.info("GCP Vision 클라이언트 준비 완료")
    except Exception as e:
        # vision_client stays None; the OCR node then relies on the local fallback.
        logging.warning(f"GCP Vision 초기화 실패: {e} → 로컬 OCR Fallback만 사용")

    KB_EMB_PATH = "kb_embeddings.npy"
    KB_CAT_PATH = "kb_categories.json"
    kb_embeddings = np.load(KB_EMB_PATH)
    with open(KB_CAT_PATH,"r",encoding="utf-8") as f:
        kb_categories = json.load(f)
    # Explicit check instead of `assert`: asserts are removed under `python -O`, and a
    # mismatched KB would make the RAG lookup index the wrong category.
    if kb_embeddings.shape[0] != len(kb_categories):
        raise ValueError(
            f"KB 불일치: emb={kb_embeddings.shape[0]} vs cat={len(kb_categories)}"
        )
    logging.info(f"KB 캐시 로드 완료: {len(kb_categories)}개 항목")
except Exception as e:
    logging.error(f"글로벌 설정 실패: {e}")
    raise



INFO | 임베딩 모델 로드: distiluse-base-multilingual-cased-v1
INFO | Use pytorch device_name: cpu
INFO | Load pretrained SentenceTransformer: distiluse-base-multilingual-cased-v1
Device set to use cpu
INFO | Zero-shot 모델 사용: MoritzLaurer/deberta-v3-large-zeroshot-v2.0 (device=-1)
INFO | GCP Vision 클라이언트 준비 완료
INFO | KB 캐시 로드 완료: 702개 항목


In [36]:
# --- Thresholds / hypothesis template ---
# Both thresholds can be overridden through environment variables.
# RAG similarity at or above this value (with a standard allergen) skips the NLI fallback.
RAG_CONFIDENCE_THRESHOLD = float(os.environ.get("RAG_THRESHOLD", "0.85"))
# Minimum zero-shot score for the NLI fallback to accept an allergen label.
NLI_FALLBACK_THRESHOLD = float(os.environ.get("NLI_THRESHOLD", "0.5"))
# Hypothesis template passed to the zero-shot pipeline; "{}" receives each label.
HYPOTHESIS = "{} 알레르기(유발) 성분이다."

# --- Local OCR fallback (optional) ---
def local_ocr_fallback(img_path: str) -> str:
    """Run OCR locally with EasyOCR (Korean + English) as a best-effort fallback.

    ``easyocr`` is imported lazily so that it remains an optional dependency.
    Every failure is logged and turned into an empty string; this function
    never raises.

    Args:
        img_path: Path of the image file to read.

    Returns:
        The recognised text lines joined with newlines, or ``""`` when EasyOCR
        is unavailable or the recognition fails.
    """
    try:
        import easyocr
        reader = easyocr.Reader(['ko','en'])
        lines = reader.readtext(img_path, detail=0)
        return "\n".join(lines)
    except ImportError as e:
        # Optional dependency missing: say so instead of failing silently.
        logging.warning(f"로컬 OCR 사용 불가(easyocr 미설치): {e}")
        return ""
    except Exception as e:
        # Still best-effort, but keep the reason visible for debugging.
        logging.warning(f"로컬 OCR 실패: {e}")
        return ""


In [37]:
# --- State definition (per-ingredient fields added) ---
class AllergyGraphState(TypedDict):
    """State dictionary passed between the nodes of the allergy-analysis graph.

    The last five fields are declared ``NotRequired`` when ``typing.NotRequired``
    could be imported, and fall back to plain ``list``/``str`` annotations otherwise.
    """
    image_path: str                 # path of the label image handed to OCR
    raw_ocr_text: str               # text produced by the OCR node
    ingredients_to_check: deque     # queue of ingredients still to be classified
    current_ingredient: str         # alias-normalised ingredient being classified
    rag_result: dict                # latest result: confidence / found_allergen / method
    final_allergens: Set[str]       # allergens accumulated so far
    final_output_json: str          # JSON string written by the final node
    # [ADDED] the five fields below
    ingredients_from_section: NotRequired[List[str]] if NotRequired else list  # ingredients parsed from the ingredient section
    declared_allergens: NotRequired[List[str]] if NotRequired else list        # allergens listed in a "... 함유" statement (standardised)
    coi_phrases: NotRequired[List[str]] if NotRequired else list               # cross-contamination phrases
    per_ingredient_results: NotRequired[List[dict]] if NotRequired else list   # per-ingredient verdict table
    current_raw_ingredient: NotRequired[str] if NotRequired else str           # ingredient text as written on the label




In [38]:
# --- Nodes ---

def call_gcp_vision_api(state: AllergyGraphState) -> AllergyGraphState:
    """Node 1: OCR the image at ``state['image_path']``.

    Google Cloud Vision is used when ``vision_client`` is available. If it
    fails or yields only blank text, ``local_ocr_fallback`` is tried. A warning
    is logged when no text could be obtained at all.

    Returns:
        A copy of ``state`` with ``raw_ocr_text`` set to the recognised text
        (possibly empty).
    """
    source_image = state['image_path']
    logging.info(f"[Node1] OCR 호출: {source_image}")

    recognized = ""
    if vision_client is not None:
        try:
            with io.open(source_image, 'rb') as handle:
                payload = vision.Image(content=handle.read())
            response = vision_client.text_detection(image=payload)
            api_error = response.error.message
            if api_error:
                # Routed through the except below so API errors are logged the same way.
                raise RuntimeError(api_error)
            recognized = response.full_text_annotation.text or ""
        except Exception as exc:
            logging.warning(f"GCP OCR 실패: {exc}")

    if not recognized.strip():
        local_text = local_ocr_fallback(source_image)
        if local_text.strip():
            logging.info("로컬 OCR Fallback 사용")
            recognized = local_text
        else:
            logging.warning("OCR 결과가 비어있음")

    return dict(state, raw_ocr_text=recognized)

def parse_text_from_raw(state: AllergyGraphState) -> AllergyGraphState:
    """Node 2: regex-parse the OCR text into ingredients, declared allergens and COI phrases.

    The text is NFKC-normalised and flattened to one line, then three things
    are extracted:

    1. The ingredient block ("원재료명: ..."), split on commas. Any "(...)" suffix
       is dropped, empty items are skipped, and items starting with an
       ``IGNORE_KEYWORDS`` entry are discarded.
    2. The first "... 함유" statement. Its items are standardised with
       ``alias_to_std`` and kept only when they are in ``ALLERGENS_STD_SET``.
    3. Cross-contamination (COI) sentences around a fixed set of trigger
       phrases, without duplicates.

    Returns:
        A copy of ``state`` with ``ingredients_to_check`` (queue of the parsed
        ingredients), ``final_allergens`` (seeded with the declared allergens),
        ``ingredients_from_section``, ``declared_allergens``, ``coi_phrases``
        and an empty ``per_ingredient_results``. All of them are empty when
        the OCR text is blank.
    """
    raw_text = state.get('raw_ocr_text','') or ''
    if not raw_text.strip():
        return {**state, "ingredients_to_check": deque(), "final_allergens": set(),
                "ingredients_from_section": [], "declared_allergens": [], "coi_phrases": [],
                "per_ingredient_results": []}

    text = unicodedata.normalize("NFKC", raw_text).replace("\n"," ")

    # (1) Ingredient block
    pat_ing = re.compile(
        r"원재료(?:명| 및[^:]{0,10}|/[^:]{0,10})?\s*[:：]\s*(.+?)(?:알레르기|영양정보|영양성분|함유|품목보고|고객상담|소비기한|$)",
        re.S
    )
    ingredients_from_section: List[str] = []
    m = pat_ing.search(text)
    if m:
        cleaned = []
        for piece in m.group(1).split(','):
            item = piece.split('(')[0].strip()
            # An item such as "(우유)" becomes empty once the parenthesis is cut off;
            # queueing it would produce a meaningless blank row in the verdict table.
            if not item:
                continue
            if any(item.startswith(k) for k in IGNORE_KEYWORDS):
                continue
            cleaned.append(item)
        ingredients_from_section = cleaned
        logging.info(f"[Node2] 원재료 {len(cleaned)}개 추출: {cleaned[:12]}{'...' if len(cleaned)>12 else ''}")
    else:
        logging.info("[Node2] 원재료명 블럭을 찾지 못함")

    # (2) "... 함유" statement (also covers "알레르기 유발물질: ... 함유")
    pat_contains = re.compile(r"(?:알레르기\s*(?:유발)?\s*물질[:：]?\s*)?([가-힣,\s]+?)\s*함유")
    declared_allergens = []
    m2 = pat_contains.search(text)
    if m2:
        contains_list = [s.strip() for s in m2.group(1).split(',') if s.strip()]
        logging.info(f"[Node2] '함유' 섹션 {len(contains_list)}개: {contains_list}")
        # Standardise (e.g. 달걀 -> 알류) and keep only standard allergens.
        for item in contains_list:
            std = alias_to_std(item)
            if std in ALLERGENS_STD_SET:
                declared_allergens.append(std)

    # (3) Cross-contamination (COI) sentences
    coi_phrases = []
    for pat in [r"같은\s*제조(?:시설|라인|설비)", r"교차오염", r"혼입\s*가능", r"함유\s*가능"]:
        for mm in re.finditer(pat, text):
            # rfind() returns -1 when no '.' precedes the match, so "+ 1" starts the
            # sentence at index 0 instead of cutting off its first character.
            start = text.rfind('.', 0, mm.start()) + 1
            dot = text.find('.', mm.end())
            # Without a closing '.', keep up to 30 characters after the match.
            end = dot if dot != -1 else mm.end() + 30
            phrase = text[start:end].strip()
            # One sentence can trigger several patterns; report it only once.
            if phrase and phrase not in coi_phrases:
                coi_phrases.append(phrase)

    # Only ingredients from the ingredient block go through the classification loop.
    q = deque(ingredients_from_section)

    # Declared allergens are accumulated right away (unioned again at the end).
    found_set = set(declared_allergens)

    return {**state,
            "ingredients_to_check": q,
            "final_allergens": found_set,
            "ingredients_from_section": ingredients_from_section,
            "declared_allergens": declared_allergens,
            "coi_phrases": coi_phrases,
            "per_ingredient_results": []}


def prepare_next_ingredient(state: AllergyGraphState) -> AllergyGraphState:
    """Node 3: take the next ingredient off the queue and standardise it.

    The queue in ``state['ingredients_to_check']`` is consumed in place. With
    an empty queue the state is returned untouched.

    Returns:
        ``state`` itself when nothing is queued; otherwise a copy with
        ``current_raw_ingredient`` (text as parsed) and ``current_ingredient``
        (result of ``alias_to_std``, used for classification).
    """
    pending = state['ingredients_to_check']
    if pending:
        label_text = pending.popleft()
        advanced = dict(state)
        advanced["current_raw_ingredient"] = label_text
        advanced["current_ingredient"] = alias_to_std(label_text)
        return advanced
    return state



In [39]:

def rag_search(state: AllergyGraphState) -> AllergyGraphState:
    """Node 4: classify the current ingredient by nearest-neighbour lookup in the KB.

    If the standardised ingredient is already a standard allergen it is
    confirmed immediately (method ``"alias"``, confidence 1.0). An empty
    ingredient yields a ``"none"`` result with confidence 0.0. Otherwise the
    ingredient is embedded and the KB category with the highest cosine
    similarity is reported (method ``"rag"``).

    Returns:
        A copy of ``state`` with ``rag_result`` holding ``confidence``,
        ``found_allergen`` and ``method``.
    """
    ing_std = state.get('current_ingredient', '')
    ing_raw = state.get('current_raw_ingredient', ing_std)

    if ing_std in ALLERGENS_STD_SET:
        # The alias step already produced a standard allergen: no search needed.
        outcome = {"confidence": 1.0, "found_allergen": ing_std, "method": "alias"}
    elif not ing_std:
        outcome = {"confidence": 0.0, "found_allergen": "없음", "method": "none"}
    else:
        query_vec = embedding_model.encode([ing_std])
        scores = cosine_similarity(query_vec, kb_embeddings)[0]
        best = int(np.argmax(scores))
        conf = float(scores[best])
        found = kb_categories[best]
        logging.info(f"[Node4] RAG: '{ing_raw}' → '{found}' (sim={conf:.4f})")
        outcome = {"confidence": conf, "found_allergen": found, "method": "rag"}

    return {**state, "rag_result": outcome}



In [40]:
def llm_fallback(state: AllergyGraphState) -> AllergyGraphState:
    """Node 5: zero-shot NLI classification used when the RAG result is not accepted.

    The current ingredient is scored against every standard allergen plus the
    label "관련 없음". The top label is accepted only if it is a standard
    allergen and its score reaches ``NLI_FALLBACK_THRESHOLD``.

    Returns:
        A copy of ``state`` whose ``rag_result`` is replaced by the NLI verdict
        (method ``"nli"``). When the pipeline is unavailable, the ingredient is
        empty, or the pipeline raises, the verdict is "없음" with method
        ``"none"``.
    """
    ingredient = state.get('current_ingredient', '')
    # NOTE(review): this "no verdict" result carries confidence 1.0, whereas the
    # equivalent result in rag_search uses 0.0 — confirm which is intended.
    no_verdict = {"confidence": 1.0, "found_allergen": "없음", "method": "none"}

    if not (nli_pipeline and ingredient):
        return {**state, "rag_result": no_verdict}

    candidate_labels = list(ALLERGENS_STD_SET) + ["관련 없음"]
    try:
        prediction = nli_pipeline(ingredient, candidate_labels, hypothesis_template=HYPOTHESIS)
        best_label = prediction['labels'][0]
        best_score = float(prediction['scores'][0])
        accepted = best_label in ALLERGENS_STD_SET and best_score >= NLI_FALLBACK_THRESHOLD
        verdict = {
            "confidence": best_score,
            "found_allergen": best_label if accepted else "없음",
            "method": "nli",
        }
        return {**state, "rag_result": verdict}
    except Exception as exc:
        logging.warning(f"[Node5] ZS 오류: {exc}")
        return {**state, "rag_result": no_verdict}

In [41]:


def update_final_list(state: AllergyGraphState) -> AllergyGraphState:
    """Node 6: fold the latest classification into the running results.

    Adds the found allergen to ``final_allergens`` when it is a standard
    allergen, and appends one row to the per-ingredient verdict table. Both
    containers taken from ``state`` are updated in place.

    Returns:
        A copy of ``state`` referencing the updated ``final_allergens`` and
        ``per_ingredient_results``.
    """
    outcome = state['rag_result']
    found = outcome.get("found_allergen", "없음")
    conf = float(outcome.get("confidence", 0.0))
    method = outcome.get("method", "none")
    is_standard = found in ALLERGENS_STD_SET

    # 6-1) Accumulate the final allergen set (declared + inferred).
    allergens = state['final_allergens']
    if is_standard:
        allergens.add(found)

    # 6-2) Append the verdict row: raw text / standardised text / verdict / method / score.
    std_name = state.get("current_ingredient", "")
    verdicts = state.get("per_ingredient_results", [])
    verdicts.append({
        "ingredient_raw": state.get("current_raw_ingredient", std_name),
        "ingredient_std": std_name,
        "is_allergen": is_standard,
        "allergen": found if is_standard else "없음",
        "method": method,
        "confidence": round(conf, 4),
    })

    return {**state, "final_allergens": allergens, "per_ingredient_results": verdicts}



In [42]:
def finalize_processing(state: AllergyGraphState) -> AllergyGraphState:
    """Node 7: build the final JSON report.

    The report contains the sorted union of inferred and declared allergens,
    the sorted declared allergens, the COI phrases and the per-ingredient
    verdict table. The JSON is logged and stored in ``final_output_json``.

    Returns:
        A copy of ``state`` with ``final_output_json`` set.
    """
    declared = set(state.get('declared_allergens', []))
    combined = set(state.get('final_allergens', set())) | declared

    report = {
        "allergens": sorted(combined),
        "declared_allergens": sorted(declared),
        "coi_phrases": state.get("coi_phrases", []),
        "ingredients": state.get("per_ingredient_results", []),
    }
    # ensure_ascii=False keeps the Korean text readable in the output.
    final_json = json.dumps(report, ensure_ascii=False)
    logging.info(f"[DONE] {final_json}")
    return {**state, "final_output_json": final_json}



In [43]:
# --- Conditional edges ---
def route_rag_result(state: AllergyGraphState) -> str:
    """Decide whether the RAG result is accepted or the NLI fallback must run.

    Returns:
        ``"rag_success"`` when the confidence reaches
        ``RAG_CONFIDENCE_THRESHOLD`` and the found category is a standard
        allergen; ``"needs_llm_fallback"`` otherwise.
    """
    outcome = state['rag_result']
    conf = float(outcome['confidence'])
    allergen = outcome['found_allergen']
    if not (conf >= RAG_CONFIDENCE_THRESHOLD and allergen in ALLERGENS_STD_SET):
        return "needs_llm_fallback"
    return "rag_success"

def check_remaining_ingredients(state: AllergyGraphState) -> str:
    """Report whether the ingredient queue still holds items.

    Returns:
        ``"has_more_ingredients"`` when ``ingredients_to_check`` is non-empty,
        ``"all_ingredients_done"`` when it is empty or missing.
    """
    pending = state.get("ingredients_to_check", deque())
    if not pending:
        return "all_ingredients_done"
    if len(pending) > 0:
        return "has_more_ingredients"
    return "all_ingredients_done"



In [44]:
# --- Build and compile the graph ---
workflow = StateGraph(AllergyGraphState)

# Each node is registered under the name of the function that implements it.
for _node in (
    call_gcp_vision_api,
    parse_text_from_raw,
    prepare_next_ingredient,
    rag_search,
    llm_fallback,
    update_final_list,
    finalize_processing,
):
    workflow.add_node(_node.__name__, _node)

# Routing shared by the two places that decide between "next ingredient" and "finish".
_LOOP_OR_FINISH = {
    "has_more_ingredients": "prepare_next_ingredient",
    "all_ingredients_done": "finalize_processing",
}

workflow.set_entry_point("call_gcp_vision_api")
workflow.add_edge("call_gcp_vision_api", "parse_text_from_raw")
# Conditional right after parsing so that an empty queue goes straight to the final node.
workflow.add_conditional_edges(
    "parse_text_from_raw", check_remaining_ingredients, dict(_LOOP_OR_FINISH)
)
workflow.add_edge("prepare_next_ingredient", "rag_search")
workflow.add_conditional_edges(
    "rag_search",
    route_rag_result,
    {"rag_success": "update_final_list", "needs_llm_fallback": "llm_fallback"},
)
workflow.add_edge("llm_fallback", "update_final_list")
workflow.add_conditional_edges(
    "update_final_list", check_remaining_ingredients, dict(_LOOP_OR_FINISH)
)
workflow.add_edge("finalize_processing", END)

app = workflow.compile()
logging.info("✅ LangGraph 컴파일 완료")



INFO | ✅ LangGraph 컴파일 완료


In [45]:
# --- Test run ---
if __name__ == "__main__":
    my_test_image_file = "data/김광무_121.jpg"  # replace as needed
    # Initial state: every field starts empty; the nodes fill them in.
    test_input = {
        "image_path": my_test_image_file,
        "raw_ocr_text": "",
        "ingredients_to_check": deque(),
        "current_ingredient": "",
        "rag_result": {"confidence": 0.0, "found_allergen": "없음", "method":"none"},
        "final_allergens": set(),
        "final_output_json": "",
        # defaults for the per-ingredient fields
        "ingredients_from_section": [],
        "declared_allergens": [],
        "coi_phrases": [],
        "per_ingredient_results": [],
        "current_raw_ingredient": ""
    }
    # Each ingredient takes 3-4 graph steps (prepare -> rag -> [nli] -> update), so a
    # limit of 100 is exhausted at roughly 24-32 ingredients. 1000 leaves room for
    # long ingredient lists while still stopping a runaway loop.
    final_state = app.invoke(test_input, {"recursion_limit": 1000})
    print("\n=== 최종 반환 JSON ===")
    print(final_state.get("final_output_json",""))

INFO | [Node1] OCR 호출: data/김광무_121.jpg
INFO | [Node2] 원재료명 블럭을 찾지 못함
INFO | [Node2] '함유' 섹션 5개: ['돼지고기', '대두', '쇠고기', '밀', '우유']
INFO | [DONE] {"allergens": ["대두", "돼지고기", "밀", "쇠고기", "우유"], "declared_allergens": ["대두", "돼지고기", "밀", "쇠고기", "우유"], "coi_phrases": ["개 그대로 넣을 경우 터질 우려가 있으니 을 사용한 제품과 같은 제조시설에서 제조하고 있습니다"], "ingredients": []}



=== 최종 반환 JSON ===
{"allergens": ["대두", "돼지고기", "밀", "쇠고기", "우유"], "declared_allergens": ["대두", "돼지고기", "밀", "쇠고기", "우유"], "coi_phrases": ["개 그대로 넣을 경우 터질 우려가 있으니 을 사용한 제품과 같은 제조시설에서 제조하고 있습니다"], "ingredients": []}
