In [65]:
import pdfplumber, re, redis, numpy as np, json, requests
from sentence_transformers import SentenceTransformer
from redis.commands.search.field import TextField, VectorField
from redis.commands.search.index_definition import IndexDefinition, IndexType
from redis.commands.search.query import Query
from collections import deque

In [66]:
# ============================================================
# 1️⃣ Redis & 모델 초기화
# ============================================================
r = redis.Redis(host="localhost", port=6379, decode_responses=False)
model = SentenceTransformer("jhgan/ko-sroberta-multitask")

CACHE_INDEX = "cache_index"   # 캐시 인덱스
RAG_INDEX = "qa2_index"        # RAG 인덱스
user_cache = deque(maxlen=5)  # Dynamic Cache (최근 5개)

In [67]:
# ------------------------------------------------------------
# 2️⃣ 임베딩 함수 (정규화 비활성화: Redis COSINE과 호환)
# ------------------------------------------------------------
def embed(text: str):
    # 🚀 중요: normalize_embeddings=False → Redis COSINE 계산 정확도 향상
    emb = model.encode(text, normalize_embeddings=False)
    return np.array(emb, dtype=np.float32).tobytes()

In [68]:
# ------------------------------------------------------------
# 3️⃣ 캐시 인덱스 초기화 (기존 인덱스 삭제 후 재생성)
# ------------------------------------------------------------
def init_cache_index(force_recreate=True):
    if force_recreate:
        try:
            r.ft(CACHE_INDEX).dropindex(delete_documents=True)
            print("🗑️ 기존 cache_index 삭제 완료")
        except Exception:
            pass

    try:
        r.ft(CACHE_INDEX).info()
        print("ℹ️ cache_index 이미 존재 (재사용)")
    except:
        dim = len(model.encode("차원 확인", normalize_embeddings=False))
        r.ft(CACHE_INDEX).create_index(
            fields=[
                VectorField("embedding", "FLAT", {
                    "TYPE": "FLOAT32",
                    "DIM": dim,
                    "DISTANCE_METRIC": "COSINE"   # ✅ COSINE + 비정규화 벡터 조합
                }),
                TextField("text"),
                TextField("source")
            ],
            definition=IndexDefinition(prefix=["cache:"], index_type=IndexType.HASH)
        )
        print("✅ cache_index 인덱스 생성 완료")

init_cache_index()

🗑️ 기존 cache_index 삭제 완료
✅ cache_index 인덱스 생성 완료


In [None]:
# ------------------------------------------------------------
# 4️⃣ PDF Q–A 파싱 (본문 + 표 포함)
# ------------------------------------------------------------
PDF_PATH = "/Users/yoodongseok/Desktop/rag_project/data/2024 관세행정 민원상담 사례집.pdf"

def extract_qa_pairs(pdf_path):
    qa_pairs = []
    with pdfplumber.open(pdf_path) as pdf:
        for page_idx, page in enumerate(pdf.pages):
            text = page.extract_text() or ""
            lines = text.split("\n") if text else []

            # 표 추출
            tables = page.extract_tables()
            table_texts = []
            for table in tables:
                rows = [" | ".join([cell if cell else "" for cell in row]) for row in table]
                table_texts.append("\n".join(rows))
            table_text_block = "\n\n[표 데이터]\n" + "\n\n".join(table_texts) if table_texts else ""

            merged_text = text + table_text_block

            current_q, current_a = None, []
            for line in merged_text.split("\n"):
                line = line.strip()
                if re.match(r"^(\?|관세법|.*\?)", line):
                    if current_q and current_a:
                        qa_pairs.append({
                            "question": current_q,
                            "answer": "\n".join(current_a).strip()
                        })
                    current_q = line
                    current_a = []
                elif current_q:
                    current_a.append(line)

            if current_q and current_a:
                qa_pairs.append({
                    "question": current_q,
                    "answer": "\n".join(current_a).strip()
                })

    return qa_pairs

In [70]:
# ------------------------------------------------------------
# 5️⃣ Pre-Cache (PDF → Redis 저장)
# ------------------------------------------------------------
def pre_cache_pdf(pdf_path):
    qa_list = extract_qa_pairs(pdf_path)
    print(f"📘 PDF에서 {len(qa_list)}개의 QA 추출 완료")

    for i, qa in enumerate(qa_list):
        key = f"cache:pdf:{i}"
        r.hset(key, mapping={
            "embedding": embed(qa["question"]),
            "text": qa["answer"],
            "source": "pdf_pre_cache"
        })
    print(f"💾 Redis에 {len(qa_list)}개 Pre-Cache 저장 완료")

pre_cache_pdf(PDF_PATH)


📘 PDF에서 1000개의 QA 추출 완료
💾 Redis에 1000개 Pre-Cache 저장 완료


In [71]:
# ------------------------------------------------------------
# 6️⃣ 캐시 검색 (CAG)
# ------------------------------------------------------------
def check_cache(user_query, k=3, threshold=0.60):
    q_emb = embed(user_query)
    q = Query(f"*=>[KNN {k} @embedding $vec AS score]") \
        .return_fields("text", "source", "score") \
        .sort_by("score") \
        .dialect(2)

    try:
        res = r.ft(CACHE_INDEX).search(q, query_params={"vec": q_emb})
    except Exception as e:
        print("❌ 캐시 검색 오류:", e)
        return None

    if not res.docs:
        print("❌ 캐시에서 유사 문서 없음")
        return None

    # 유사도 계산
    sim = 1 - float(res.docs[0].score)
    print(f"📊 유사도 점수: {sim:.2f}")
    if sim >= threshold:
        print(f"⚡ 캐시 HIT (유사도 {sim:.2f}) [source={res.docs[0].source}]")
        return res.docs[0].text
    else:
        print(f"❌ 캐시 MISS (유사도 {sim:.2f} < {threshold})")
    return None

In [72]:
# ------------------------------------------------------------
# 7️⃣ Dynamic Cache 저장 (최근 5개 유지)
# ------------------------------------------------------------
def save_dynamic_cache(query, answer):
    key = f"cache:dyn:{abs(hash(query)) % (10**8)}"
    r.hset(key, mapping={
        "embedding": embed(query),
        "text": answer,
        "source": "dynamic_cache"
    })
    user_cache.append(key)
    if len(user_cache) > user_cache.maxlen:
        oldest = user_cache.popleft()
        r.delete(oldest)
        print(f"🗑️ 오래된 캐시 삭제: {oldest}")
    print(f"💾 Dynamic Cache 저장: {query[:30]}...")

In [73]:
# ------------------------------------------------------------
# 8️⃣ RAG 검색 (qa_index에서 검색)
# ------------------------------------------------------------
def search_rag_context(query, k=3):
    q_emb = embed(query)
    q = Query(f"*=>[KNN {k} @embedding $vec AS score]") \
        .return_fields("answer", "score") \
        .sort_by("score") \
        .dialect(2)
    try:
        res = r.ft(RAG_INDEX).search(q, query_params={"vec": q_emb})
        return [doc.answer for doc in res.docs]
    except Exception as e:
        print("❌ RAG 검색 오류:", e)
        return []

In [74]:
# ------------------------------------------------------------
# 9️⃣ Llama3.2 (Ollama) 호출
# ------------------------------------------------------------
def call_llm_with_context(query, contexts):
    context_text = "\n\n".join(contexts)
    prompt = f"""
너는 관세행정 민원상담 전문 챗봇이야.
아래 문서를 참고해 질문에 정확히 답변해줘.

[문서 내용]
{context_text}

[질문]
{query}
"""
    url = "http://localhost:11434/api/generate"
    payload = {"model": "llama3.2:3b", "prompt": prompt}

    response_text = ""
    with requests.post(url, json=payload, stream=True) as r_:
        for line in r_.iter_lines():
            if line:
                data = json.loads(line.decode("utf-8"))
                if "response" in data:
                    response_text += data["response"]
                if data.get("done", False):
                    break
    return response_text.strip()

In [75]:
# ------------------------------------------------------------
# 🔟 전체 파이프라인 (CAG → RAG → LLM → Dynamic Cache)
# ------------------------------------------------------------
def ask(query):
    print(f"\n👤 사용자 질문: {query}")

    cached = check_cache(query)
    if cached:
        return f"(캐시 응답)\n{cached}"

    print("💡 캐시 MISS → RAG 검색 중...")
    contexts = search_rag_context(query)
    if not contexts:
        return "❌ 관련 문서를 찾지 못했습니다."

    print("🤖 Llama3.2 응답 생성 중...")
    answer = call_llm_with_context(query, contexts)
    save_dynamic_cache(query, answer)

    return f"(새로 생성된 응답)\n{answer}"

In [76]:
# ------------------------------------------------------------
# 🔍 테스트 (유사도 점검용)
# ------------------------------------------------------------
print("\n🔍 유사도 테스트")
for q in [
    "관세법 제97조 재수출면세 대상 및 재수출 시기",
    "재수출면세 대상 알려줘",
    "면세 한도 금액은 얼마야?"
]:
    print(f"\n질문: {q}")
    cached = check_cache(q)
    print("응답:", cached[:100] if cached else "MISS")


🔍 유사도 테스트

질문: 관세법 제97조 재수출면세 대상 및 재수출 시기
📊 유사도 점수: 1.00
⚡ 캐시 HIT (유사도 1.00) [source=pdf_pre_cache]
응답: [재수출면세 대상]

질문: 재수출면세 대상 알려줘
📊 유사도 점수: 0.65
⚡ 캐시 HIT (유사도 0.65) [source=pdf_pre_cache]
응답: [재수출면세 대상]

질문: 면세 한도 금액은 얼마야?
📊 유사도 점수: 0.82
⚡ 캐시 HIT (유사도 0.82) [source=pdf_pre_cache]
응답: 여행자가 해외(국내·외 면세점 포함)에서 취득(구입·선물 포함)한 물품을
국내로 휴대하여 반입할 때 해외에서 취득한 물품 가격의 총합이 미화
800달러 이하인 경우 관세가 면제됩니


In [83]:
print("🔍 유사도 테스트")
for q in [
    "관세법 제97조 재수출면세 대상 및 재수출 시기",
    "재수출면세 대상 알려줘",
    "면세 한도 금액은 얼마야?"
]:
    print(f"\n질문: {q}")
    cached = check_cache(q)
    print("응답:", cached[:100] if cached else "MISS")


🔍 유사도 테스트

질문: 관세법 제97조 재수출면세 대상 및 재수출 시기
📊 유사도 점수: 1.00
⚡ 캐시 HIT (유사도 1.00) [source=pdf_pre_cache]
응답: [재수출면세 대상]

질문: 재수출면세 대상 알려줘
📊 유사도 점수: 0.65
⚡ 캐시 HIT (유사도 0.65) [source=pdf_pre_cache]
응답: [재수출면세 대상]

질문: 면세 한도 금액은 얼마야?
📊 유사도 점수: 0.82
⚡ 캐시 HIT (유사도 0.82) [source=pdf_pre_cache]
응답: 여행자가 해외(국내·외 면세점 포함)에서 취득(구입·선물 포함)한 물품을
국내로 휴대하여 반입할 때 해외에서 취득한 물품 가격의 총합이 미화
800달러 이하인 경우 관세가 면제됩니


In [84]:
def debug_cache_search(user_query, k=3):
    q_emb = embed(user_query)
    q = Query(f"*=>[KNN {k} @embedding $vec AS score]") \
        .return_fields("text", "source", "score") \
        .sort_by("score") \
        .dialect(2)

    res = r.ft(CACHE_INDEX).search(q, query_params={"vec": q_emb})
    print(f"\n🔍 '{user_query}' 유사도 검사 결과:")
    for i, doc in enumerate(res.docs):
        sim = 1 - float(doc.score)
        preview = doc.text[:80].replace("\n", " ")
        print(f" {i+1}. 유사도={sim:.2f} | source={doc.source} | 내용: {preview}...")

# 실행 예시
debug_cache_search("재수출면세 대상 알려줘")



🔍 '재수출면세 대상 알려줘' 유사도 검사 결과:
 1. 유사도=0.65 | source=pdf_pre_cache | 내용: [재수출면세 대상]...
 2. 유사도=0.64 | source=pdf_pre_cache | 내용: 수입통관에 곤란한 사유가 없는 물품으로서 아래의 어느 하나에 해당하는 경우에는 납부하여야 할 관세 등에 상당하는 담보를 제공하고 세관장의 승인을...
 3. 유사도=0.62 | source=pdf_pre_cache | 내용: 사...


In [85]:
query = "면세 물품을 재수출할 때 세관 신고 절차는?"
print(ask(query))



👤 사용자 질문: 면세 물품을 재수출할 때 세관 신고 절차는?
📊 유사도 점수: 0.77
⚡ 캐시 HIT (유사도 0.77) [source=pdf_pre_cache]
(캐시 응답)
다음에 해당하는 물품을 소지한 여행자와 승무원은 세관에 자진 신고하여야
합니다.
1. 해외에서 취득한 물품(선물 등 무상물품 및 국내 면세점에서 취득 후
재반입하는 물품을 포함)으로서 전체 취득가격 합계액이 미화 800달러를
초과하는 물품 및 1명당 면세기준을 초과하는 주류, 담배, 향수
2. 상용물품과 수리용품, 견본품 등 회사용품
3. 총포·도검·화약류·분사기·전자충격기·석궁(부분품, 모의 또는 장식용을 포
함한다), 유독성 또는 방사성 물질류 및 감청설비
4. 앵속·아편·코카잎 등 마약류, 향정신성 의약품류, 대마류 및 이들의 제품,
오·남용 우려가 있는 의약품류
5. 국헌·공안·풍속을 저해하는 서적·사진·비디오테이프·필름·LD·CD·CD-ROM
등의 물품
6. 정부의 기밀을 누설하거나 첩보에 사용되는 물품
7. 위조·변조·모조의 화폐·지폐·은행권·채권 및 그 밖의 유가증권
8. 동물(고기·가죽·털을 포함한다), 식물, 과일, 채소류, 살아있는 수산생물,
농림축수산물(가공품을 포함한다), 그 밖의 식품류
9. 「멸종위기에 처한 야생 동·식물종의 국제 거래에 관한 협약(CITES)」에서
보호하는 살아있는 야생 동·식물 및 이들을 사용하여 만든 제품·가공품
10. 상표권 등 지식재산권 침해물품
11. 일시 출국하는 여행자 및 승무원이 출국 시 휴대반출신고하여 반출했다가
재반입하는 물품
관련법령
● 관세법 제241조, 「여행자및승무원휴대품통관에관한고시」 제5조
PART 1 통 관|195

[표 데이터]

통
관


In [80]:
query = "면세 담배 구입 기준이 어떻게 돼?"
print(ask(query))



👤 사용자 질문: 면세 담배 구입 기준이 어떻게 돼?
📊 유사도 점수: 0.79
⚡ 캐시 HIT (유사도 0.79) [source=pdf_pre_cache]
(캐시 응답)
우리나라 거주자가 해외에서 발송되는 술, 담배를 국내로 반입하는 경우
세관장이 자가사용 물품으로 인정하여 면세를 적용하는 기준은 아래와
같습니다
품목 물품가격 면세수량 비고
주류 1병 (1L 이하) 주세, 교육세는 과세
궐련 200개비
엽궐련 50개비
미화 150불 개별소비세,
니코틴용액 20ml
담배 이하 담배소비세,
전자담배 궐련형 200개비
지방교육세는 과세
기타유형 110g
기타담배 250g
- 상기의 면세통관범위를 충족하지 못하는 경우에는 물품가격 전체에 대하여
세금이 부과*됩니다.
* 주류 : 관세, 주세, 교육세, 부가가치세 등
담배 : 관세, 개별소비세, 담배소비세, 지방교육세, 부가가치세 등
자가사용이 아닌 판매용인 경우 등 개별 법령에 따른 허가·승인 등의
요건을 구비*하여야 하는 경우에는 해당 요건을 구비하였음을 증명하는
서류를 제출해야 수입이 가능합니다.
* 주류 : 수입식품안전관리특별법에 따른 지방식품의약품안전청장의 ‘수입식품등의
수입신고 확인증’ \
전자담배용 니코틴 용액 : 니코틴을 1% 이상 함유한 혼합물인 경우 지방(유역)환
경청장의 ‘유독물질 수입신고증’
관련법령
● 관세법 제94조제4호 및 관세법 시행규칙 제2항제1호
● 수입통관 사무처리에 관한 고시 제67조 및 별표 11
PART 1 통 관|023

[표 데이터]

통
관

품목 |  | 물품가격 | 면세수량 | 비고
|  | 미화 150불
이하 | 1병 (1L 이하) |
| 궐련 |  | 200개비 |
| 엽궐련 |  | 50개비 |
| 전자담배 |  | 니코틴용액 20ml
궐련형 200개비
기타유형 110g |
| 기타담배 |  | 250g |


In [81]:
query = "강아지 사료 영양성분은 어떻게 구성되나요?"
print(ask(query))



👤 사용자 질문: 강아지 사료 영양성분은 어떻게 구성되나요?
📊 유사도 점수: 0.67
⚡ 캐시 HIT (유사도 0.67) [source=pdf_pre_cache]
(캐시 응답)
관세율표 제1214호에는 스위드(swede)ㆍ맹골드(mangold)ㆍ사료용 뿌리
채소류(根菜類)ㆍ건초ㆍ루우산(lucerne)(알팔파)ㆍ클로버(clover)ㆍ샌포인
(sainfoin)ㆍ사료용 케일(kale)ㆍ루핀(lupine)ㆍ베치(vetch)와 이와 유사한
사료용 식물이 분류되며,
- 같은 호 해설서에 스위드(rutabaga : Brassica napobrassica)ㆍ맹골드
․
(mangold)사료용 순무(turnips)ㆍ사료용 당근(백색이나 담황색)과 그 밖의
사료용 뿌리채소류 이들 뿌리채소류는 식용에 적합한 것도 이 호에 분류
된다고 규정하고 있습니다.
- 이 호의 ‘유사한 사료용 식물’에는 동물사료용으로 재배된 식물에 한정하여
적용하며, 또한 동물의 사료로 사용되는 것이라도 사료용으로 재배되지
아니한 식물성 생산품(예: 비트(beet)ㆍ당근의 상부와 옥수수의 잎은
제2308호에 분류)은 제외됩니다.
관세율표 제2308호에는 사료용 식물성 물질ㆍ식물성 웨이스트ㆍ식물성 박
(residue)류와 부산물이 분류되며, 동물의 사료용으로 사용되는 것이라도
이 호에 분류됩니다.
1) 도토리, 마로니에 열매, 면실피
2) 낱알을 제거한 후의 옥수수 속대 ; 옥수수 줄기와 잎
3) 사탕무나 당근의 꼭지
4) 채소의 껍질(완두와 콩의 꼬투리 등)
5) 과실 웨이스트(waste)(사과ㆍ배 등의 껍질과 속 등)와 과실의 찌꺼기
(pomace or marc)
관련법령
● 관세법 제86조 제1항(특정물품에 적용될 품목분류의 사전심사)
494|관세행정 민원상담 사례집


In [86]:
query="재수출 면제 대상에 대해 알려줄 수 있어?"
print(ask(query))


👤 사용자 질문: 재수출 면제 대상에 대해 알려줄 수 있어?
📊 유사도 점수: 0.70
⚡ 캐시 HIT (유사도 0.70) [source=pdf_pre_cache]
(캐시 응답)
재수출 조건부 일시반입 제도는 우리나라를 일시 입국하는 여행자가
사용할 물품에 대해 재수출하는 조건으로 세금을 면제해주는 ‘조건부
면세제도’입니다.
대상자 및 허용범위는 외국거주자 중 우리나라를 일시방문하는 자이거나
우리나라 국적 소유자 중 교포, 유학생, 해외근무자 등 1년이상 외국에
거주하는 자의 신변용품, 신변장식용품, 직업용품으로 한정(회사용물품은
불가)하며, 반드시 재수출할 물품에 한해 엄격히 적용하고 있습니다.
재수출면세기간은 1년 이내 최초 출국일까지이며, 1년의 범위 내에서
연장이 가능하며, 반출기한 연장은 횟수에 제한은 없으나 전체 연장기간은
1년을 초과할 수 없어 입국시부터 최초 출국시까지 최장 2년간 면세가
가능합니다.
여행자는 재수출 조건부 일시반입 허용일 이후 최초 출국하는 날에 일시반입한
물품과 재수출조건 일시반입물품 확인서를 가지고 출국장에서 세관 직원에게
확인을 받고 반출하여야 합니다.
타인이 아닌 면세받은 여행자가 출국 시 반출확인을 직접 받아야 하며,
기한 내 재반출을 이행하지 않은 경우 최초 조건부 면세받은 세액 전액과
미반출에 대한 가산세 20%를 부과하여 추징합니다.
관련법령
● 관세법 제97조제1항제1호
● 관세법 시행령 제115조제1항
● 여행자 및 승무원 휴대품통관에 관한 고시 제60조~제63조
216|관세행정 민원상담 사례집


In [87]:
query = "면세 주류 구입 기준이 어떻게 돼?"
print(ask(query))



👤 사용자 질문: 면세 주류 구입 기준이 어떻게 돼?
📊 유사도 점수: 0.87
⚡ 캐시 HIT (유사도 0.87) [source=pdf_pre_cache]
(캐시 응답)
여행자가 반입하는 술의 면세범위는 여행자 1명당 합계 용량이 2ℓ이하
로서 총 가격이 미화 400달러 이하의 것 2병만 면세가 가능합니다.
여행자가 입국 시 반입하는 술이 면세범위를 초과하는 경우 세관에
신고하고 세금을 납부하면 통관이 가능합니다.
- 단, 만 19세 미만인 사람(만 19세가 되는 해의 1월 1일을 맞이한 사람은
제외)이 반입하는 술은 면세대상에 포함되지 않습니다.
\
관련법령
● 관세법 시행규칙 제48조
204|관세행정 민원상담 사례집
