In [1]:
from dotenv import load_dotenv
load_dotenv(verbose=True)
"""
import os
os.environ['OPENAI_API_KEY'] = 'your-api-key'
"""

"\nimport os\nos.environ['OPENAI_API_KEY'] = 'your-api-key'\n"

In [2]:
import operator
from typing import TypedDict, Annotated, Sequence

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import BaseMessage,HumanMessage,AIMessage
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.output_parsers import StrOutputParser
from langchain_community.document_loaders import WikipediaLoader

from langgraph.graph import StateGraph, END

In [3]:
# モデルのロード
model = ChatOpenAI(model_name='gpt-4-turbo')
# model = ChatOpenAI(model_name='gpt-3.5-turbo')

In [4]:
class AgentState(TypedDict):
    user_question : str #ユーザーの質問
    messages: Annotated[Sequence[BaseMessage], operator.add] #会話履歴
    knowledge_base : Annotated[str, operator.add] #知識ベース(検索した情報を追記していく)
    next_task_search_keyword : str #次に検索するキーワード
    next_task_search_content : str #次に検索して調べる内容
    answer_counter : Annotated[int, operator.add] #最終回答を試みた回数

In [5]:
def call_init_agentstate(state):
    state["user_question"] = state['messages'][-1].content
    state["messages"] = []
    state["knowledge_base"] = ""
    state["answer_counter"] = 0
    return state

In [6]:
from langchain.output_parsers import PydanticOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field, validator

class SearchTask(BaseModel):
    search_reason: str = Field(description="検索する理由や目的。")
    search_keyword: str = Field(description="Wikipediaでの検索する単語。必ずひとつの単語である必要がある。")
    search_content : str = Field(description="検索したWikipediaのページで取得したい情報。")

gen_task_template ="""
以下の質問に回答するため、不足している情報をwikipediaから検索します。
検索する情報は最小の一つだけです。検索するための単語を記載してください。
検索する項目やその理由を記載してください。
検索する項目は一つだけです。
複数の質問がなされていても、一つの質問に対してのみ回答を生成してください。

{format_instructions}

-----------------
質問例:映画オッペンハイマーの監督が作成した映画の本数を調べてください。
search_reason="情報が不足しているため。とりあえずまず"映画オッペンハイマー"のwikipediaを検索して、監督の名前を調べる必要があります。"
search_keyword="オッペンハイマー"
search_content="映画オッペンハイマーの監督の名前"
-----------------
質問例:中性子と原子核が発見された年に起こった出来事を調べてください。
search_reason="情報が不足しているため、とりあえずまず"中性子"のwikipediaを検索して、とりあえず発見された年を調べる必要があります。"
search_keyword="中性子"
search_content="中性子の発見された年"
-----------------
質問例:中性子と原子核が発見された年に起こった出来事を調べてください。ただし、以下の情報はすでに検索済みです。中性子の発見された年は1932年です。
search_reason="原子核に関する情報が不足しているため、"原子核"のwikipediaを検索して、発見された年を調べる必要があります。"
search_keyword="原子核"
search_content="原子核の発見された年"
-----------------

質問:{user_question}。{knowledge_base}
"""
def call_generate_task(state):
    _parser = PydanticOutputParser(pydantic_object=SearchTask)
    _prompt = PromptTemplate(
        template=gen_task_template,
        input_variables=["user_question", "knowledge_base"],
        partial_variables={"format_instructions": _parser.get_format_instructions()},
    )
    chain = _prompt| model | _parser
    # chain = chain.with_retry(stop_after_attempt=5)
    res = chain.invoke({'user_question': state['user_question'], 'knowledge_base': state['knowledge_base']})

    return {
        "messages": [AIMessage(content=res.search_reason)],
        'next_task_search_keyword':res.search_keyword,
        'next_task_search_content':res.search_content
    }

In [7]:
def search_for_wikipedia(query: str) -> str:
    """
    Search for a wikipedia article and return the content of the first article found.
    """
    docs = WikipediaLoader(query=query, load_max_docs=5, lang='ja').load()
    if len(docs) > 0:
        return ''.join([d.page_content for d in docs])
    return ""


search_and_answer_template ="""
以下の参考文書を使用して、知りたい内容に回答してください。
知りたい内容には簡潔に数10文字のテキストで答えてください。
たとえば、「XXXはYYYです。」と短文で回答すること。

不要な情報は削除すること。

参考文書:{searched_content}

知りたい内容:{next_task_search_content}
"""

def call_search_and_answer(state):
    searched_content = search_for_wikipedia(state['next_task_search_keyword'])

    _parser = PydanticOutputParser(pydantic_object=SearchTask)
    _prompt = PromptTemplate(
        template=search_and_answer_template,
        input_variables=["next_task_search_content", "searched_content"],
    )
    chain = _prompt| model | StrOutputParser()

    res = chain.invoke({'next_task_search_content': state['next_task_search_content'], 'searched_content': searched_content})
    new_knowledge = f"「{state['next_task_search_content']}」に関する情報は以下の通り。\n{res}\n\n\n"
    return {
        "messages": [AIMessage(content=new_knowledge)],
        'knowledge_base':new_knowledge,
    }

In [12]:
def router_fa(state):
    res = call_final_answer(state)
    print(state)
    if state['answer_counter'] >= 5:
        print('回答制限')
        return 'end'
    if '回答不可' in res['messages'][-1].content:
        return 'continue'
    else:
        return 'end'

In [13]:
final_answer ="""
以下の知識をもとに質問に回答してください。
もし回答に必要な情報が不足している場合は、回答できな理由を述べた後、「回答不可」と回答してください。


知識:{knowledge_base}

質問:{user_question}
"""
def call_final_answer(state):
    prompt = ChatPromptTemplate.from_template(template=final_answer)
    chain = prompt| model | StrOutputParser()
    res = chain.invoke({
        'user_question': state['user_question'],
        'knowledge_base': state['knowledge_base']
    })
    print(state)
    return {"messages": [AIMessage(content=res)], "answer_counter":1}

In [14]:
workflow = StateGraph(AgentState)
workflow.add_node("init_agent",call_init_agentstate)
workflow.add_node("generate_task", call_generate_task)
workflow.add_node("search_and_answer", call_search_and_answer)
workflow.add_node("final_answer", call_final_answer)

workflow.set_entry_point("init_agent")
workflow.add_edge("init_agent", "generate_task")
workflow.add_edge("generate_task", "search_and_answer")
workflow.add_edge("search_and_answer", "final_answer")
workflow.add_conditional_edges(
    "final_answer",
    router_fa,
    {
        "end": END,
        "continue": "generate_task"
    }
)
app = workflow.compile()

In [15]:
inputs = {"messages": [HumanMessage(content="ジョジョの奇妙な物語の作者の出身小学校の設立年は")]}
for output in app.stream(inputs):
    for key, value in output.items():
        print(f"Output from node '{key}':")
        print(value)
    print("\n---\n")

Output from node 'init_agent':
{'user_question': 'ジョジョの奇妙な物語の作者の出身小学校の設立年は', 'messages': [], 'knowledge_base': '', 'next_task_search_keyword': None, 'next_task_search_content': None, 'answer_counter': 0}

---

Output from node 'generate_task':
{'messages': [AIMessage(content='情報が不足しているため、まずは「ジョジョの奇妙な物語」の作者の名前を調べる必要があります。')], 'next_task_search_keyword': 'ジョジョの奇妙な物語', 'next_task_search_content': 'ジョジョの奇妙な物語の作者の名前'}

---

Output from node 'search_and_answer':
{'messages': [AIMessage(content='「ジョジョの奇妙な物語の作者の名前」に関する情報は以下の通り。\nジョジョの奇妙な冒険の作者は荒木飛呂彦です。\n\n\n')], 'knowledge_base': '「ジョジョの奇妙な物語の作者の名前」に関する情報は以下の通り。\nジョジョの奇妙な冒険の作者は荒木飛呂彦です。\n\n\n'}

---

{'user_question': 'ジョジョの奇妙な物語の作者の出身小学校の設立年は', 'messages': [HumanMessage(content='ジョジョの奇妙な物語の作者の出身小学校の設立年は'), AIMessage(content='情報が不足しているため、まずは「ジョジョの奇妙な物語」の作者の名前を調べる必要があります。'), AIMessage(content='「ジョジョの奇妙な物語の作者の名前」に関する情報は以下の通り。\nジョジョの奇妙な冒険の作者は荒木飛呂彦です。\n\n\n')], 'knowledge_base': '「ジョジョの奇妙な物語の作者の名前」に関する情報は以下の通り。\nジョジョの奇妙な冒険の作者は荒木飛呂彦です。\n\n\n', 'next_