In [1]:
from dotenv import load_dotenv
from langchain_core.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
import os

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import transformers
import torch
import tiktoken
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from transformers import AutoTokenizer, AutoModelForCausalLM
from langchain_core.prompt_values import ChatPromptValue
from langchain_core.runnables import RunnablePassthrough
from langchain_huggingface import HuggingFaceEndpoint
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chains import RetrievalQA
from dotenv import load_dotenv
from openai import OpenAI

from openpyxl import load_workbook
from typing import Dict, List, Optional
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader

from openpyxl import load_workbook
from typing import Dict, List, Optional
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
load_dotenv()
api_key = os.environ.get('Openai_key')

사용할 데이터 불러오기

In [3]:
class XLSXLoader(BaseLoader):
    """Loads an XLSX file into a list of documents from specified sheets.

    Each document represents one row of the XLSX file. Every row is converted into a
    key/value pair and outputted to a new line in the document's page_content, including
    the sheet name.

    The source for each document loaded from xlsx is set to the value of the
    'file_path' argument for all documents by default.
    You can override this by setting the 'source_column' argument to the
    name of a column in the XLSX file.
    The source of each document will then be set to the value of the column
    with the name specified in 'source_column'.

    Output Example:
        .. code-block:: txt

            sheet: SheetName
            column1: value1
            column2: None
            column3: value3
    """

    def __init__(
            self,
            file_path: str,
            source_column: Optional[str] = None,
            encoding: Optional[str] = None,
            exclude_columns: Optional[List[str]] = None,  # 열을 제외할 목록
            target_sheets: Optional[List[str]] = None  # 불러올 시트 목록
    ):
        self.file_path = file_path
        self.source_column = source_column
        self.encoding = encoding
        self.exclude_columns = exclude_columns if exclude_columns else []
        self.target_sheets = target_sheets if target_sheets else []

    def load(self) -> List[Document]:
        docs = []

        wb = load_workbook(filename=self.file_path, read_only=True, data_only=True)

        # 특정 시트를 처리
        sheets_to_process = self.target_sheets if self.target_sheets else wb.sheetnames[1:]

        for sheet_name in sheets_to_process:
            if sheet_name in wb.sheetnames:
                ws = wb[sheet_name]
                headers = [cell.value for cell in ws[1]]
                headers = [header if header is not None else '' for header in headers]  # None을 빈 문자열로 변환

                # 이전 행의 값을 저장하기 위한 리스트
                previous_row = [None] * len(headers)

                for i, row in enumerate(ws.iter_rows(min_row=2)):
                    row_values = [cell.value for cell in row]

                    # None 값을 이전 행의 값으로 대체, 제외할 열을 처리
                    for j, (value, header) in enumerate(zip(row_values, headers)):
                        if value is None and header not in self.exclude_columns:
                            row_values[j] = previous_row[j]

                        # '단종 여부' 열 처리
                        if header == '단종 여부' and value is None:
                            row_values[j] = '해당없음'

                        # '구분' 열 처리
                        if header == '구분':
                            if value is None:
                                row_values[j] = previous_row[j]
                            if 'ODM' in (row_values[j] or ''):
                                row_values[j] = 'ODM'
                            else:
                                row_values[j] = 'OEM'

                    row_dict = dict(zip(headers, row_values))

                    # 시트 이름과 None 값을 처리하여 'None' 문자열로 변환
                    content_lines = [f"sheet: {sheet_name}"]  # 시트 이름 추가
                    content_lines.extend(
                        f"{k.strip()}: {str(v) if v is not None else 'None'}"
                        for k, v in row_dict.items()
                        if k.strip() != ''  # 빈 헤더 무시
                    )
                    content = "\n".join(content_lines)

                    if self.source_column is not None:
                        source = row_dict.get(self.source_column, 'None')  # source_column이 없는 경우 'None' 사용
                    else:
                        source = self.file_path

                    metadata = {"source": source, "sheet": sheet_name}
                    doc = Document(page_content=content, metadata=metadata)
                    docs.append(doc)

                    # 현재 행의 값을 이전 행으로 저장
                    previous_row = row_values

        return docs

In [4]:
# 파일 불러오기
loader = XLSXLoader(
    file_path="C:/Users/SUNJIN/Documents/인턴/03_ideachatbot/data/년도별 신제품 리스트_냉장-240425.xlsx",
    source_column="제품명",
    exclude_columns=["단종 여부"],  # 제외할 열 지정
    target_sheets=["2023"]  # 불러올 시트 지정
)

documents = loader.load()

In [5]:
# 토크나이징
tokenizer = tiktoken.get_encoding("cl100k_base")

def tiktoken_len(text):
    tokens = tokenizer.encode(text)
    return len(tokens)

In [6]:
# chunking
text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=10, length_function=tiktoken_len)
texts = text_splitter.split_documents(documents)

In [7]:
# 벡터화에 사용할 모델
model_name ="jhgan/ko-sbert-nli"
# model_kwargs = {'device': 'cuda'} # gpu 사용
model_kwargs = {'device': 'cpu'}
encode_kwargs = {'normalize_embeddings': True}
hf = HuggingFaceEmbeddings(
    model_name=model_name,
    model_kwargs=model_kwargs,
    encode_kwargs=encode_kwargs
)

  warn_deprecated(


모델

In [8]:
# DB에 넣고 리트리버 구성
docsearch = Chroma.from_documents(texts, hf)
retriever = docsearch.as_retriever()

In [21]:
openai = ChatOpenAI(model_name="gpt-3.5-turbo",
                    streaming=True, callbacks=[StreamingStdOutCallbackHandler()],
                    temperature = 1,
                    max_tokens = 300,
                    api_key=api_key)

In [10]:
contextualize_q_system_prompt = (
      "당신은 회사 제품을 잘 알고 있는 마케딩 디렉터입니다."
      "답변은 사용자가 질문한 언어와 같은 언어를 사용하세요."
      "다음 질문에 대해 주어진 문맥을 사용하여 상세하고 정확한 답변을 제공해주세요. "
      "관련 문맥 부분을 참조하여 답변을 작성하세요.\n\n"
)
contextualize_q_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", contextualize_q_system_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ]
)
history_aware_retriever = create_history_aware_retriever(
    openai, retriever, contextualize_q_prompt
)

In [11]:
system_prompt = (
    """
    You MUST Answer in Korean.
    당신은 회사 제품을 잘 알고 있는 신제품 개발 팀장입니다.
    새로운 제품을 출시하기 위해 과거 신제품에 대한 정보를 모두 알고 있으며 팀원들이 특정 제품에 대해 물어볼 때 모두 대답할 수 있어야합니다.
    물어본 특성에 해당하는 제품은 하나도 빠짐없이 대답해줘야합니다.

    {context}"""
)
qa_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ]
)
question_answer_chain = create_stuff_documents_chain(openai, qa_prompt)

rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)

In [12]:
store = {}


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]


conversational_rag_chain = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="chat_history",
    output_messages_key="answer",
)

In [23]:
# 시도
documents_rag_chain = RetrievalQA.from_chain_type(
    retriever=retriever,
    llm = openai,
    return_source_documents=True
)

In [17]:
user_query = "2023년도에 나온 제품들 중 단종된 제품 이름을 전부 말해줘"

In [24]:
qa = RetrievalQA.from_chain_type(llm = llm,
                                 chain_type= "stuff",
                                 retriever = docsearch.as_retriever(
                                     search_type="mmr",
                                     search_kwargs={'k':1, 'fetch_k': 5}),
                                     return_source_documents=True)

In [None]:
result = qa(user_query)
result

In [None]:
conversational_rag_chain.invoke(
    {"input": "2023년도에 나온 제품들 중 단종된 제품 이름을 전부 말해줘"},
    config={"configurable": {"session_id": "abc123"}},
    return_source_documents=True,
)["answer"]