In [None]:
!pip install chromadb

In [None]:
!pip install -U langchain-community langchain langchain-openai langgraph

In [None]:
import os
from getpass import getpass

# Keep the secret out of the notebook source: reuse a key that is already set in
# the environment, and only prompt (without echoing the input) when none is set.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("OPENAI_API_KEY: ")

In [None]:
import sqlite3
from contextlib import closing

# Initial knowledge entries as (title, content) pairs.
SEED_KNOWLEDGE = (
    ("LangChainとは", "LangChainはLLMのチェーン構築フレームワークです。"),
    ("RAGとは", "RAGはRetrieval-Augmented Generationの略で、検索と生成を組み合わせます。"),
)


def init_knowledge_db(db_path="knowledge.db", seed_rows=SEED_KNOWLEDGE) -> int:
    """Create the ``knowledge`` table if needed and insert missing seed rows.

    The function is idempotent: a seed row is inserted only when no row with
    the same title exists yet, so re-running the cell does not pile up copies.

    Args:
        db_path: Path of the SQLite database file.
        seed_rows: Iterable of ``(title, content)`` pairs to ensure are present.

    Returns:
        The number of rows actually inserted by this call.
    """
    # closing() guarantees the connection is released even if a statement fails;
    # the inner ``with conn`` commits on success and rolls back on error.
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:
            conn.execute('''
CREATE TABLE IF NOT EXISTS knowledge (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    title TEXT,
    content TEXT
)
''')
            changes_before = conn.total_changes
            for title, content in seed_rows:
                conn.execute(
                    "INSERT INTO knowledge (title, content) "
                    "SELECT ?, ? WHERE NOT EXISTS "
                    "(SELECT 1 FROM knowledge WHERE title = ?)",
                    (title, content, title),
                )
            return conn.total_changes - changes_before


# Create and seed the database used by the rest of the notebook.
init_knowledge_db()

In [None]:
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.runnables import RunnableLambda
from langchain_community.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.schema import Document
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from typing import TypedDict, Literal, Optional
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

In [None]:
class State(TypedDict):
    """Shared state dictionary that flows between the graph nodes."""
    # User query text; read by classify_intent, rag_node and llm_node.
    input: str
    # Route chosen by classify_intent; selects the "rag" or "llm" node.
    intent: Optional[Literal["rag", "llm"]]
    # Answer text produced by rag_node or llm_node; read by evaluate_answer.
    response: Optional[str]
    # Verdict written by evaluate_answer; read by loop_or_end to retry or stop.
    quality_ok: Optional[bool]

In [None]:
# Chat model shared by the notebook; llm_node invokes it with the user's message.
llm = ChatOpenAI(model="gpt-4o-mini")

In [None]:
import hashlib
from contextlib import closing

# Load every knowledge row from SQLite; closing() releases the connection even
# if the query fails.
with closing(sqlite3.connect("knowledge.db")) as conn:
    rows = conn.execute("SELECT title, content FROM knowledge").fetchall()

# Convert the rows into LangChain documents ("title\ncontent").
documents = [Document(page_content=f"{title}\n{content}") for title, content in rows]

# Split into chunks and prepare the embedding model.
text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50)
embedding = OpenAIEmbeddings()

# Key each chunk by a hash of its text. This drops identical chunks (duplicate
# ids in one batch are rejected by Chroma) and gives every chunk a stable id.
chunks_by_id = {
    hashlib.sha256(chunk.page_content.encode("utf-8")).hexdigest(): chunk
    for chunk in text_splitter.split_documents(documents)
}
docs = list(chunks_by_id.values())

# Build the vector store. With stable ids, re-running this cell against the
# persisted directory overwrites existing entries (the LangChain Chroma wrapper
# writes via upsert) instead of appending fresh copies under random ids.
vectorstore = Chroma.from_documents(
    docs,
    embedding,
    ids=list(chunks_by_id),
    persist_directory="./chroma_db",
)

In [None]:
def is_duplicate_knowledge(new_content: str, threshold: float = 0.85) -> bool:
    """Report whether ``new_content`` is semantically close to stored knowledge.

    Every row of the ``knowledge`` table is rendered as "title\\ncontent", split
    into chunks and embedded with the module-level ``embedding`` model; the new
    text is embedded as a query and compared by cosine similarity.

    Note: all stored knowledge is re-embedded on every call.

    Args:
        new_content: Candidate text to check.
        threshold: Minimum cosine similarity that counts as a duplicate.

    Returns:
        ``True`` if the best similarity is at least ``threshold``; ``False``
        otherwise, including when there is no stored knowledge to compare with.
    """
    # Fetch the existing knowledge; always release the connection.
    conn = sqlite3.connect("knowledge.db")
    try:
        rows = conn.execute("SELECT title, content FROM knowledge").fetchall()
    finally:
        conn.close()

    if not rows:
        return False

    # Turn the rows into documents and split them the same way the index does.
    documents = [Document(page_content=f"{title}\n{content}") for title, content in rows]
    splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    chunk_texts = [chunk.page_content for chunk in splitter.split_documents(documents)]
    if not chunk_texts:
        # Nothing to compare against; cosine_similarity would fail on an empty matrix.
        return False

    # Embed the stored chunks and the candidate text.
    existing_vecs = embedding.embed_documents(chunk_texts)
    new_vec = embedding.embed_query(new_content)

    # Best similarity across all stored chunks.
    sims = cosine_similarity([new_vec], existing_vecs)[0]

    # Convert numpy.bool_ to a plain bool so the result matches the annotation.
    return bool(np.max(sims) >= threshold)

In [None]:
# ノード定義
def classify_intent(state):
    """Choose the route for the query and store it in ``state["intent"]``.

    The single best match for ``state["input"]`` is fetched from the vector
    store. The route is "rag" when a match exists and its text is longer than
    30 characters, otherwise "llm". The decision depends only on the length of
    the retrieved text, not on how similar it is to the query.

    Returns the same ``state`` dict, updated in place.
    """
    query = state["input"]

    # Ask the knowledge base for its single closest document.
    top_hits = vectorstore.as_retriever(search_kwargs={"k": 1}).invoke(query)

    # Crude proxy for "we have reasonable knowledge": the hit has enough text.
    has_substantial_hit = bool(top_hits) and len(top_hits[0].page_content) > 30

    state["intent"] = "rag" if has_substantial_hit else "llm"
    return state

def rag_node(state):
    """Answer from the knowledge base.

    Fetches the single closest document for ``state["input"]`` and returns it,
    prefixed with "[RAG] ", as a ``{"response": ...}`` update. A fixed
    "not found" message is used when the retriever returns nothing.
    """
    query = state["input"]
    hits = vectorstore.as_retriever(search_kwargs={"k": 1}).invoke(query)

    # Use the most relevant text, or the fallback message when there is none.
    if hits:
        best_text = hits[0].page_content
    else:
        best_text = "該当する情報が見つかりませんでした。"

    return {"response": f"[RAG] {best_text}"}

def llm_node(state):
    """Answer with the chat model and store new answers as knowledge.

    The user's input is sent to ``llm``. If ``is_duplicate_knowledge`` reports
    that the answer is already covered, the answer is returned with a notice
    prepended and nothing is stored. Otherwise the (input, answer) pair is
    inserted into the SQLite ``knowledge`` table and added to the vector store.

    Returns:
        A ``{"response": ...}`` state update.

    Raises:
        sqlite3.Error: If the insert fails; the transaction is rolled back and
            the connection is closed before the error propagates.
    """
    user_input = state.get("input", "")
    answer = llm.invoke([HumanMessage(content=user_input)]).content

    # Duplicate check: do not store knowledge we already have.
    if is_duplicate_knowledge(answer):
        return {"response": f"（※この内容は既にナレッジとして登録されています）\n{answer}"}

    # Save to the database. ``with conn`` commits on success and rolls back on
    # error; ``finally`` makes sure the connection is never leaked.
    conn = sqlite3.connect("knowledge.db")
    try:
        with conn:
            conn.execute(
                "INSERT INTO knowledge (title, content) VALUES (?, ?)",
                (user_input, answer),
            )
    finally:
        conn.close()

    # Add to Chroma so the new knowledge is retrievable right away.
    vectorstore.add_documents([Document(page_content=f"{user_input}\n{answer}")])

    return {"response": answer}

def evaluate_answer(state):
    """Set ``state["quality_ok"]`` from a simple check of the response text.

    The response is treated as low quality when it contains a space followed
    by an ASCII opening parenthesis (" ("); otherwise it is accepted.

    Returns the same ``state`` dict, updated in place.
    """
    state["quality_ok"] = " (" not in state["response"]
    return state

def loop_or_end(state):
    """Return the routing label: "end" when quality is OK, else "repeat"."""
    if state["quality_ok"]:
        return "end"
    return "repeat"


In [None]:
# --- Build the graph ---
workflow = StateGraph(State)

# Register the nodes first, then mark the entry point.
workflow.add_node("classify", RunnableLambda(classify_intent))
workflow.add_node("rag", RunnableLambda(rag_node))
workflow.add_node("llm", RunnableLambda(llm_node))
workflow.add_node("evaluate", RunnableLambda(evaluate_answer))
workflow.set_entry_point("classify")

# Branching and transitions: classify -> (rag | llm) -> evaluate.
workflow.add_conditional_edges("classify", lambda s: s["intent"], {
    "rag": "rag",
    "llm": "llm",
})
workflow.add_edge("rag", "evaluate")
workflow.add_edge("llm", "evaluate")

# loop_or_end returns "repeat" for a rejected answer, so that label must be in
# the path map; "classify" is kept so a router returning the node name directly
# keeps working as well.
# NOTE(review): the retry edge re-enters "classify" with the same input and
# nothing in this cell bounds the number of retries - confirm a retry counter
# or recursion limit stops the loop.
workflow.add_conditional_edges("evaluate", loop_or_end, {
    "repeat": "classify",
    "classify": "classify",
    "end": END,
})

app = workflow.compile()

In [None]:
# Run the compiled graph on a sample question and print each item it streams.
input_state = {"input": "RAGについて説明して。"}
for s in app.stream(input_state):
  print(s)

In [None]:
# 必要なパッケージをインストール
!apt-get install -y graphviz libgraphviz-dev pkg-config
!pip install pygraphviz

In [None]:
from IPython.display import Image

# Draw the compiled graph as a PNG and show it as the cell output.
# NOTE(review): draw_png presumably relies on the pygraphviz package installed
# in the previous cell - confirm.
Image(app.get_graph().draw_png())