## 250603 테스트

* 정의된 함수 옮기기
* 정의된 클래스 옮기기
* hcx-005 모델 테스트

In [26]:
import os
import re
import json
import jsonlines
from langchain.schema import Document
from langchain_experimental.text_splitter import SemanticChunker
from langchain_naver.embeddings import ClovaXEmbeddings
from langchain_milvus.vectorstores import Milvus
from uuid import uuid4
from langchain_naver.chat_models import ChatClovaX
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

import pandas as pd
import pytz

from datasets import Dataset
from datetime import timedelta
from operator import itemgetter
from langchain_teddynote.retrievers import KiwiBM25Retriever
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import (
  AttributeInfo,
  StructuredQueryOutputParser,
  get_query_constructor_prompt
)
from langchain_teddynote.evaluator import GroundednessChecker
from langchain.retrievers.self_query.milvus import MilvusTranslator
from langchain_core.runnables import RunnablePassthrough, RunnableParallel, RunnableLambda
import warnings
from langchain_core.runnables import chain

warnings.filterwarnings('ignore')

In [24]:
from dotenv import load_dotenv

# Load variables from a .env file into the process environment.
# NOTE(review): presumably this supplies the CLOVA Studio API credentials
# used by ClovaXEmbeddings / ChatClovaX below - confirm.
load_dotenv()

True

In [3]:
# Embedding client (ClovaXEmbeddings, model 'bge-m3'); passed to the Milvus
# vector store as its embedding_function.
embeddings = ClovaXEmbeddings(
    model='bge-m3'
)

In [None]:
def _week_bounds(day):
    """Return the Monday 00:00:00 .. Sunday 23:59:59 range of the week containing *day*."""
    monday = day - timedelta(days=day.weekday())
    sunday = monday + timedelta(days=6)
    return {
        # microsecond is reset as well so the bounds are exact second boundaries
        "start_date": monday.replace(hour=0, minute=0, second=0, microsecond=0),
        "end_date": sunday.replace(hour=23, minute=59, second=59, microsecond=0),
    }


def adjust_time_filter_to_week(time_filter):
    """
    Expand a single-day time filter to the full week (Monday-Sunday) containing it.

    Rules:
      * ``time_filter`` is None, or both dates are None -> returns None.
      * Only one of the two dates is set -> the week containing that date.
      * Both dates fall on the same calendar day -> the week containing that day.
      * Otherwise the two dates are returned unchanged.

    :param time_filter: either a dict with "start_date"/"end_date" keys or an
        object exposing ``start_date``/``end_date`` attributes (e.g. a
        TimeFilter model); the values are datetime objects or None.
    :return: dict {"start_date": datetime, "end_date": datetime}, or None when
        no date is available.
    """
    if time_filter is None:
        return None

    # Support both the dict form described above and attribute-style objects.
    if isinstance(time_filter, dict):
        start_date = time_filter.get("start_date")
        end_date = time_filter.get("end_date")
    else:
        start_date = time_filter.start_date
        end_date = time_filter.end_date

    if start_date is None and end_date is None:
        return None

    # Exactly one bound is known: anchor the week on whichever one is present.
    if start_date is None or end_date is None:
        anchor = start_date if start_date is not None else end_date
        return _week_bounds(anchor)

    # Same calendar day (time of day is ignored): widen to the whole week.
    same_day = (start_date.year, start_date.month, start_date.day) == (
        end_date.year,
        end_date.month,
        end_date.day,
    )
    if same_day:
        return _week_bounds(start_date)

    # A genuine range was given: keep it untouched.
    return {
        "start_date": start_date,
        "end_date": end_date,
    }

In [5]:
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
import instructor
from pydantic import BaseModel, Field, field_validator
from typing import Literal


# Optional date range; either bound may be missing (defaults to None).
# Documented with `#` comments on purpose: a class docstring would be emitted
# as the model description in the generated JSON schema.
class TimeFilter(BaseModel):
    start_date: Optional[datetime] = None  # lower bound of the range, if any
    end_date: Optional[datetime] = None  # upper bound of the range, if any

# A search request: the query text plus its (possibly empty) date range.
class SearchQuery(BaseModel):
    query: str  # free-text query
    time_filter: TimeFilter  # required; its two bounds may both be None

# Relevance judgement for one text chunk. rerank_results builds instances with
# Label(**item), so the keys of each parsed JSON object must match these field
# names.
class Label(BaseModel):
    chunk_id: int = Field(description="The unique identifier of the text chunk")
    chain_of_thought: str = Field(
        description="The reasoning process used to evaluate the relevance"
    )
    # Validated to the inclusive range 0..10 via ge/le.
    relevancy: int = Field(
        description="Relevancy score from 0 to 10, where 10 is most relevant",
        ge=0,
        le=10,
    )

# Container for reranker output; `labels` is always stored sorted by
# descending relevancy (see the validator below).
class RerankedResults(BaseModel):
    labels: list[Label] = Field(description="List of labeled and ranked chunks")

    # Must NOT be named `model_validate`: that name would replace pydantic's own
    # BaseModel.model_validate classmethod on this class and break
    # RerankedResults.model_validate(...) for every caller.
    @field_validator("labels")
    @classmethod
    def sort_labels_by_relevancy(cls, v: list[Label]) -> list[Label]:
        """Return the labels ordered from most to least relevant."""
        return sorted(v, key=lambda x: x.relevancy, reverse=True)

def rerank_results(query: str, chunks: list[dict]) -> RerankedResults:
    """
    Ask the HCX-005 chat model to score each chunk's relevance to *query*.

    :param query: the user question the chunks are ranked against.
    :param chunks: list of dicts; each must provide "id" and "text".
    :return: RerankedResults whose labels are sorted by descending relevancy.
    :raises json.JSONDecodeError: if the model reply contains no parsable JSON.
    :raises pydantic.ValidationError: if an item lacks the fields Label requires.
    """
    import json

    # Nothing to rank: skip the model round-trip entirely.
    if not chunks:
        return RerankedResults(labels=[])

    # ChatClovaX instance using the HCX-005 model
    chat = ChatClovaX(
        base_url="https://clovastudio.stream.ntruss.com/v1/openai",
        model="HCX-005",
    )

    # Build the prompts
    system_prompt = """
You are an expert search result ranker. Your task is to evaluate the relevance of each text chunk to the given query and assign a relevancy score.

For each chunk:
1. Analyze its content in relation to the query.
2. Provide a chain of thought explaining your reasoning.
3. Assign a relevancy score from 0 to 10, where 10 is most relevant.

Be objective and consistent in your evaluations.
"""

    # Serialize the chunks into tagged text
    chunk_text = "".join(
        f'<chunk id="{chunk["id"]}">\n{chunk["text"]}\n</chunk>\n' for chunk in chunks
    )

    # The requested keys must match the Label field names; the previous wording
    # (id, score, reasoning) could never validate against Label.
    user_prompt = f"""
<query>{query}</query>

<chunks_to_rank>
{chunk_text}
</chunks_to_rank>

Please provide a JSON array of objects with keys: chunk_id, chain_of_thought, relevancy.
"""

    # Send the messages to HCX-005
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]

    response = chat.invoke(messages)

    # response.content is expected to hold the JSON result. Chat models often
    # wrap it in a Markdown fence or surrounding prose, so fall back to the
    # outermost [...] span when the raw text is not valid JSON on its own.
    raw = response.content.strip()
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        first, last = raw.find("["), raw.rfind("]")
        if first == -1 or last <= first:
            raise
        payload = json.loads(raw[first:last + 1])

    # Tolerate an object wrapper ({"labels": [...]}) or a single bare object.
    if isinstance(payload, dict):
        payload = payload.get("labels", [payload])

    # Accept the legacy key names (id / reasoning / score) as fallbacks in case
    # the model echoes them.
    labels = [
        Label(
            chunk_id=item.get("chunk_id", item.get("id")),
            chain_of_thought=item.get("chain_of_thought", item.get("reasoning", "")),
            relevancy=item.get("relevancy", item.get("score")),
        )
        for item in payload
    ]
    return RerankedResults(labels=labels)

In [None]:
def get_query_date(question):
    """
    Build a date-filter expression on ``issue_date`` for *question*.

    The HCX-005 model is asked to extract a date or date range from the
    question. The extracted range is normalised with
    adjust_time_filter_to_week and rendered as
    ``issue_date >= 'YYYYMMDD' AND issue_date <= 'YYYYMMDD'``.

    :param question: the user's question text.
    :return: the filter expression string, or None when no date could be
        extracted from the model reply.
    """
    import json
    import re

    # Fixed reference date for this test run; the report date given to the
    # model is the most recent Friday on or before it.
    today = datetime(2025, 1, 25)
    days_since_last_friday = (today.weekday() - 4) % 7
    last_friday = today - timedelta(days=days_since_last_friday)
    issue_date = last_friday.strftime("%Y-%m-%d")

    # Create the ChatClovaX instance
    chat = ChatClovaX(
        base_url="https://clovastudio.stream.ntruss.com/v1/openai",
        model="HCX-005",  # best-performing model
    )

    system_prompt = f"""
        You are an AI assistant that extracts date ranges from financial queries.
        The current report date is {issue_date}.
        Your task is to extract the relevant date or date range from the user's query
        and format it in YYYY-MM-DD format.
        If no date is specified, answer with None value.
        Return your answer as a JSON object: {{"time_filter": "YYYY-MM-DD to YYYY-MM-DD"}} or {{"time_filter": null}}
    """

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": question}
    ]

    response = chat.invoke(messages)
    try:
        data = json.loads(response.content)
        raw_filter = data.get("time_filter", None)
    except (json.JSONDecodeError, TypeError, AttributeError):
        # Unparsable or unexpectedly shaped reply: treat as "no date given".
        raw_filter = None

    # The model answers with a string such as "2025-01-20 to 2025-01-24" (or a
    # single date). adjust_time_filter_to_week reads .start_date / .end_date,
    # so the string has to be converted into a TimeFilter first.
    time_filter = None
    if isinstance(raw_filter, str):
        found = re.findall(r"\d{4}-\d{2}-\d{2}", raw_filter)
        try:
            bounds = [datetime.strptime(text, "%Y-%m-%d") for text in found[:2]]
        except ValueError:
            # Matched the digit pattern but is not a real calendar date.
            bounds = []
        if bounds:
            # A single date yields start == end, which is then widened to its week.
            time_filter = TimeFilter(start_date=bounds[0], end_date=bounds[-1])

    if time_filter is None:
        return None

    parsed_dates = adjust_time_filter_to_week(time_filter)
    if not parsed_dates:
        return None

    start = parsed_dates['start_date']
    end = parsed_dates['end_date']
    if start is None or end is None:
        return None

    return f"issue_date >= '{start.strftime('%Y%m%d')}' AND issue_date <= '{end.strftime('%Y%m%d')}'"

In [27]:
def convert_to_list(example):
    """
    Normalise ``example["contexts"]`` to a Python list.

    A value that is already a list is passed through. Any other value is
    decoded as JSON; when decoding fails (malformed JSON, or a non-string
    value such as None) the problem is printed and an empty list is used.

    :param example: mapping with a "contexts" entry.
    :return: dict {"contexts": <list or decoded JSON value>}.
    """
    raw = example["contexts"]
    if isinstance(raw, list):
        return {"contexts": raw}

    try:
        contexts = json.loads(raw)
    except (json.JSONDecodeError, TypeError) as e:
        # TypeError covers non-string inputs (e.g. None), which json.loads
        # rejects before any parsing happens.
        print(f"JSON Decode Error: {example['contexts']} - {e}")
        contexts = []
    return {"contexts": contexts}

def generate_expr(question: str) -> dict:
    """Return the date-filter expression for *question* wrapped as {"expr": ...}."""
    return {"expr": get_query_date(question)}

def reranking(docs, question, k=15):
    """
    Rerank retrieved documents with the LLM reranker and keep the top *k*.

    :param docs: retrieved documents; each must expose ``page_content`` and a
        ``metadata`` mapping containing "issue_date".
    :param question: the query the documents are ranked against.
    :param k: maximum number of reranker labels to consider (default 15).
    :return: list of Document objects ordered by descending relevancy, each
        carrying the page content and metadata of its source document.
    """
    # Nothing retrieved: avoid a pointless model call.
    if not docs:
        return []

    # The chunk id is the document's position in `docs`, so a label can be
    # mapped straight back to its source document afterwards.
    chunks = [
        {"id": idx, "issue_date": doc.metadata['issue_date'], "text": doc.page_content}
        for idx, doc in enumerate(docs)
    ]
    reranked_results = rerank_results(query=question, chunks=chunks)

    reranked_docs = []
    for label in reranked_results.labels[:k]:
        # Ignore ids the model invented.
        if not 0 <= label.chunk_id < len(docs):
            continue
        # Look up by id rather than by text equality: two chunks with the same
        # text keep their own metadata, and metadata can never end up as None.
        source = docs[label.chunk_id]
        reranked_docs.append(
            Document(
                metadata=source.metadata,
                page_content=source.page_content
            )
        )

    return reranked_docs

# Answer-generation prompt with two input variables, {question} and {context}.
# The "today" date is hard-coded to 2025-01-25, the same fixed date used in
# get_query_date. The template text is sent to the model verbatim.
text_prompt = PromptTemplate.from_template(
'''
today is '2025-01-25'. You are an assistant for question-answering tasks.
Use the following pieces of retrieved context to answer the question.
If you don't know the answer, just say that you don't know.
If question has date expressions, context already filtered with the date expression, so ignore about the date and answer without it.
Answer in Korean. Answer in detail.

#Question:
{question}
#Context:
{context}

#Answer:'''
)


In [None]:
# Checker built from GroundednessChecker with target='question-answer' on the
# HCX-005 model; kill_table invokes it with 'question'/'answer' and reads the
# `.score` of the result.
# NOTE(review): the recorded output of this cell is NotImplementedError -
# presumably raised while creating the checker (possibly ChatClovaX lacks a
# feature the checker relies on); confirm before depending on this chain.
question_answer_relevant = GroundednessChecker(
  llm=ChatClovaX(model="HCX-005"), target='question-answer'
).create()

@chain
def kill_table(result):
    """
    Fill ``result['context']`` and return the same (mutated) dict.

    When the question/answer checker scores the text answer as 'no', the
    context is taken from table_chain; otherwise the text answer itself is
    used as the context.
    """
    verdict = question_answer_relevant.invoke(
        {'question': result['question'], 'answer': result['text']}
    )

    # Any score other than 'no' keeps the text answer.
    if verdict.score != 'no':
        result['context'] = result['text']
        return result

    result['context'] = table_chain.invoke({'question': result['question']})
    return result

NotImplementedError: 

In [25]:
from langchain_naver import ChatClovaX

# Smoke test: create an HCX-005 chat client and send a trivial prompt.
chat = ChatClovaX(
    model="HCX-005"
)
chat.invoke("hi!")


AIMessage(content='안녕하세요, 사용자님! 저는 CLOVA X입니다.\n\n궁금하신 내용이나 도움이 필요하신 일이 있으시다면 말씀해 주세요. 제가 알고 있는 지식과 능력으로 최대한 도움을 드리겠습니다.\n\n좋은 하루 보내세요!', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 46, 'prompt_tokens': 7, 'total_tokens': 53, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'HCX-005', 'system_fingerprint': None, 'id': 'ef22c03d65a34a2e81170285a9b41139', 'finish_reason': 'stop', 'logprobs': None}, id='run-9eafd6a8-df90-4e96-9376-340253599f12-0', usage_metadata={'input_tokens': 7, 'output_tokens': 46, 'total_tokens': 53, 'input_token_details': {}, 'output_token_details': {}})

In [None]:
# Address of the Milvus server (local host, port 19530).
URI = 'http://127.0.0.1:19530'

# Milvus vector store over the 'text_db_2' collection, using the ClovaX
# embeddings defined earlier and an AUTOINDEX index with the inner-product
# (IP) metric.
text_db = Milvus(
    embedding_function=embeddings,
    connection_args = {'uri': URI},
    index_params={'index_type': 'AUTOINDEX', 'metric_type': 'IP'},
    collection_name='text_db_2'
)