In [4]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

import os
from dotenv import load_dotenv

load_dotenv()

print(f"OpenAI API Key: {os.getenv('OPENAI_API_KEY')}")
print(f"Langsmith Tracing: {os.getenv('LANGSMITH_TRACING')}")
print(f"Langsmith API Key: {os.getenv('LANGSMITH_API_KEY')}")
print(f"Langsmith Project: {os.getenv('LANGSMITH_PROJECT')}")

OpenAI API Key: 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
Langsmith Tracing: true
Langsmith API Key: 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
Langsmith Project: Retriever


In [5]:
from langsmith import Client

client = Client()

url = next(client.list_runs(project_name="Retriever")).url # 정상 실행 되면 langsmith 준비 완료

In [11]:
from datetime import datetime
import threading
import os
from langchain.document_loaders import MongodbLoader
from server.db import DB, get_mongo_collection
from server.db import get_mongo_connection_string
from server.logger import logger
from vectorstore import __file__
import nest_asyncio
nest_asyncio.apply()  # Jupyter Notebook에서 asyncio 실행 문제 해결

class Watson:
    prompt_template = PromptTemplate.from_template(
        """
        You are an assistant responding to inquiries from an investigator trying to investigate a drug-selling channel. 
        Below is chat data collected from a Telegram channel where drugs are being sold.
        Based on this chat data, answer questions about the transaction details of the channel.
        If you don't know the answer, just say that you don't know.
        Answer in Korean.
        
        #Question:
        {question}
        
        #Context:
        {context}
        
        #Answer:
        """
    )
    _instances = {}
    _lock = threading.Lock()

    def __new__(cls, channel_id):
        """ 
            싱글톤 객체의 변형 구현.
            입력받은 channel id에 대응하는 watson 챗봇이 없을 경우에 한해,
            고유한 watson 객체를 새로 만들고 반환하는 동시에 _instances에 내부적으로 저장한다.
            
            만약 해당 channel id에 대응하는 챗봇이 이미 생성되었을 경우,
            그 챗봇을 반환한다. 
        """
        if not cls._instances.get(channel_id):
            with cls._lock:
                if not cls._instances.get(channel_id):
                    cls._instances[channel_id] = super(Watson, cls).__new__(cls)

        return cls._instances[channel_id]
    
    def __init__(self, channel_id):
        # 이미 객체가 초기화되어 있을 경우, 객체를 초기화하지 않음.
        with self._lock:
            if hasattr(self, "channel_id"):
                return
            
        self.channel_id = channel_id
        self.embedding = OpenAIEmbeddings() # 임베딩(Embedding) 생성
        self.llm = ChatOpenAI(model_name="gpt-4o", temperature=0)  # 언어모델(LLM) 생성
        
        self.vectorstore, self.chain = None, None
        self._vectorstore_path = os.path.join(os.path.dirname(__file__), str(channel_id)) # watson/vectorstore/<channel id> 위치에 벡터스토어 저장.
        self._update_db()
        
        chat_collection = get_mongo_collection(DB.NAME, DB.COLLECTION.CHANNEL.DATA)       
        chatbot_collection = get_mongo_collection(DB.NAME, DB.COLLECTION.CHATBOT) 
        # 채널 ID를 기준으로 모든 채팅을 찾아서 각 채팅의 채팅 ID를 리스트로 생성
        chat_ids = [doc["id"] for doc in chat_collection.find({"channelId": channel_id})]
        bot_references = chatbot_collection.find_one({"channelId": channel_id}).get("chats")
        """
            다음의 경우에 RAG vectorstore를 재생성.
            1. 로컬에 저장된 벡터스토어가 없을 때
            2. MongoDB로 확인한 바, 현재 챗봇의 근거 데이터가 없을 때.
            3. MongoDB로 확인한 바, 현재 챗봇의 근거 데이터가 되는 채팅 ID 목록이 채널의 채팅 ID 목록과 서로 다를 때
        """
        if not os.path.exists(self._vectorstore_path):
            logger.info("There is no local vectorstore. Create vectorstore.")
            self.build_vectorstore() # MongoDB에서 채팅 데이터를 불러와서 vectorstore 재생성
        elif not bot_references:
            logger.info("There is no references for chatbot. Create vectorstore.")
            self.build_vectorstore()
        elif sorted(chat_ids) != sorted(bot_references):
            logger.info("The reference chats for chatbot is different from chats of the channel. Rebuild Vectorstore.")
            self.build_vectorstore()
        else:
            self.load_vectorstore() # 챗봇이 참조 중인 채팅이 현재 채널의 채팅과 일치하고, 그 벡터스토어가 로컬에 저장되어 있을 때 불러옴.
        
        self.build_chain()
        
    
    def _update_db(self):
        # MongoDB client 생성 및 컬렉션 선택
        chatbot_collection = get_mongo_collection(DB.NAME, DB.COLLECTION.CHATBOT)
        if not chatbot_collection.find_one({"channelId": self.channel_id}):
            chatbot_collection.insert_one({
                "channelId": self.channel_id,
                "updatedAt": datetime.now(),
                "chats": []
            })
    
    def load_vectorstore(self):
        FAISS.load_local(self._vectorstore_path, self.embedding)
    
    def save_vectorstore(self):
        self.vectorstore.save_local(self._vectorstore_path)
    
    def build_vectorstore(self):
        # 단계 1: 문서 로드(Load Documents)
        loader = MongodbLoader(
            connection_string=get_mongo_connection_string(),
            db_name=DB.NAME,
            collection_name=DB.COLLECTION.CHANNEL.DATA,
            filter_criteria={"channelId": self.channel_id}, # 데이터베이스에서 조회할 기준 (쿼리)
            field_names=("text",),
            metadata_names=("id", "timestamp", "channelId", "views", "url"), # 메타데이터로 지정할 필드 목록
        )
        docs = loader.load()
        
        # 단계 2: 문서 분할(Split Documents)
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=50)
        splitted_documents = text_splitter.split_documents(docs)
        
        # 단계 4: DB 생성(Create DB) 및 저장
        # 벡터스토어를 생성하고, 저장한다.
        self.vectorstore = FAISS.from_documents(documents=splitted_documents, embedding=self.embedding)
        self.save_vectorstore()
    
    def build_chain(self):
        # 단계 5: 검색기(Retriever) 생성
        # 문서에 포함되어 있는 정보를 검색하고 생성한다.
        retriever = self.vectorstore.as_retriever()
        
        # 단계 6: 프롬프트 생성(Create Prompt)
        # 프롬프트를 생성한다.
        # prompt = self.prompt_template -> self.promopt_template로 바로 참조하기 때문에 생략됨.

        # 단계 7: 언어모델(LLM) 생성
        # 모델(LLM)을 생성한다.
        # llm = ChatOpenAI(model_name="gpt-4o", temperature=0) -> self.llm으로 바로 참조하기 때문에 생략됨.
        
        # 단계 8: 체인(Chain) 생성
        self.chain = (
            {"context": retriever, "question": RunnablePassthrough()} # 1. [prompt에 들어갈 값](딕셔너리 형태)
            | self.prompt_template # 2. context와 question이 들어갈 [프롬프트]
            | self.llm # 3. 프롬프트가 들어갈 [LLM]
            | StrOutputParser() # 4. LLM이 내놓은 결과를 정리해줄 [Parser]
        ) # 배경지식과 질문 -> 프롬프트 -> LLM -> 결과 전처리 Parser 의 4단계 chain이 생성됨.
    
    # 체인 실행(Run Chain)
    # 문서에 대한 질의를 입력하고, 답변을 출력한다.
    def ask(self, question:str):
        response = self.chain.invoke(question)
        logger.info("챗봇이 질문에 대한 응답을 생성했습니다. ")
        print(response)

In [12]:
test_channel_id = 1890652954
watson = Watson(test_channel_id)

In [13]:
watson.ask("이 채널에서 주로 마약이 판매되는 지역은 어디지?")

이 채널에서 주로 마약이 판매되는 지역은 부산, 창원, 울산, 그리고 광주입니다.


In [14]:
watson.ask("이 채널에서 마약이 어느 정도 가격으로 판매되지?")

이 채널에서 마약은 다양한 가격대로 판매되고 있습니다. 예를 들어, 서울 및 수도권에서는 한 통에 280만 원, 광주 및 천안에서는 일지에 45만 원으로 판매되고 있습니다. 또한, 5지는 55만 원, 10지는 95만 원에 판매되고 있으며, 신규 고객에게는 2~10만 원의 할인이 제공되기도 합니다.


In [9]:
watson.ask("너가 바로 전에 말했던 대답을 다시 말해봐.")

모르겠습니다.
