# LLM のメモリ機能とバンディット機能とワークフロー機能の試験実装

(Version: 0.0.2.1)

最近 LLM についてコンテクストエンジニアリングという言葉をよく聞く。実は Web でなく API から使う LLM は「ステートレス」で、会話などの記憶はデフォルトではしてくれない。会話を続けようと思ったら、メッセージをユーザー側で記録しておき、それをいちいち API に与えないといけないのだ。古い記憶を呼び戻したりするには、単純に記録するだけでなく検索基盤も必要となる。そのような基盤をひっくるめてコンテクストと一般的に呼んでおり、それをどううまく構築するかが問題となっている。

特にコンテクストのうち、記憶の基盤は単に「メモリ」と呼ばれている。PC のメモリなどとごっちゃになって、わかりにくくもあるのだが、そう呼ばれているのだからしかたがない。

メモリ機能をなかなか使ってくれないのに業を煮やして、「バンディットに特定のツールを登録するツールみたいなのを用意して、どういうツールを強制して数を増やして欲しいかを AI さん自身が決められる」というバンディット機能を作って、メモリを使うのを強制してみたのが前回。今回はそれを拡張して、ワークフローも実行させられるようにしてみた。

そのアイデアの軌跡は↓にある。

\[cocolog:95619779\](2025年9月)  
《「LLM のメモリ機能を強制的に使うバンディットマシンの試験実装」と「LLM のメモリ機能とバンディット機能の試験実装」を行った。後者がメインの成果物で、メモリ機能の使用増加をどう強制するかから拡張したフレームワーク。 - JRF のひとこと》  
http://jrf.cocolog-nifty.com/statuses/2025/09/post-8225e2.html

基本的にはこれまでの熊剣迷路問題を使い、メモリ機能とバンディット機能と今回新しいワークフロー機能を使う。

簡単な迷路問題ではあるが、かなり本格的なメモリ機能・バンディット機能・ワークフロー機能を用意している。

セマンティックサーチとかを用意するのは、めんどくさかったので、そういうバックエンドはこれまた AI さんに「データベース偽装」をしてもらっている。

なお、熊剣迷路問題は↓からたどっていただきたい。

《JRF-2018/langchain_maze: 熊剣迷路問題 revisited》  
https://github.com/JRF-2018/langchain_maze

ところで、今回のような手法はすでにどこかにあるとは思う。ワークフローというのは手垢のついた考え方だ。新規性がある話ではあまりないのかもしれないが、今回の試みが、何かの参考になれば幸いである。


## 前回へのリンク

《experimental_bandit_tool_0.0.1.ipynb》  
https://gist.github.com/JRF-2018/3663b76025004b1dd05c67fe81cce462

ちなみに experimental_bandit_workflow_0.0.1.ipynb は後述の MemoryBanditWorkflow を分ける前の実装で、必要ないのでオミットした。


## 前回からの変更点

imagine_keywords を有効に機能させるためには、書いたメモリからランダムに選んで keyword 文を append することを検討させるのを、バンディットで20ターンに1回ぐらい強制するのがいいのだと思う。それを強制するのに結局のところワークフローみたいなものが必要になるとまず気づいた。

そして考えてるうちに前回までの bandit_list が workflow:main みたいなものなのか！…と気づいた。

その方向で前回の実装をかなり活かしてワークフロー機能を実装できた。

その他に大きなところで、前回までに PlayGame クラスが大きくなりすぎたので、それを分けて MemoryBanditWorkflow というクラスを作り、PlayGame をそこから継承するようにした。将来的には MemoryBanditWorkflow を独立したライブラリと出来ればいいのかもしれないが、そこまでするつもりは今のところない。

あと細かい変更・修正をちょこちょこ行っている。

バージョン 0.0.2 から 0.0.2.1 への変更点は、すべて細かいもの。


## 結論

今回はコストがかかるのを嫌ってはじめからゴールさせるつもりはなく、ワークフローを一度使ったところで実験終了させた。かなりいい調子なのに止めてしまって Gemini 2.5 Pro さんには申し訳ない。

ワークフロー自体はちゃんと動いている。いちおう workflow_new もツールとして用意したが、これは煩雑になりすぎるので AI さんには使わせないほうが吉なのかもしれない。workflow_new の実験はできてないが、まぁ、動くことは動くでしょう。

なお、再現性がないため、最初からの再実行はやっていないのはご容赦願いたい。コストもかかるしね \(^^;。


## 著者

JRF ( http://jrf.cocolog-nifty.com/statuses , Twitter (X): @jion_rockford )


## ライセンス

基本短いコードなので(私が作った部分は)パブリックドメインのつもりです。気になる方は MIT License として扱ってください。

かなり AI さん達(Gemini さんや ChatGPT さんや Claude さん)に教わって作っています。

## 実装

まず、必要なライブラリを読み込みます。

In [1]:
!pip install -q -U langchain langgraph langchain-google-genai langmem

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.7/43.7 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m153.3/153.3 kB[0m [31m7.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.4/49.4 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.1/67.1 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m18.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m447.5/447.5 kB[0m [31m24.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m75.0/75.0 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.9/43.9 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Gemini にアクセスします。シークレットで Gemini API キーを Google AI Studio からインポートすると GOOGLE_API_KEY というシークレットができるはずです。それを使います。

In [2]:
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from google.colab import userdata

llm = ChatGoogleGenerativeAI(google_api_key=userdata.get('GOOGLE_API_KEY'), model="models/gemini-2.5-pro")
emb_llm = GoogleGenerativeAIEmbeddings(google_api_key=userdata.get('GOOGLE_API_KEY'), model='gemini-embedding-001')

ちゃんと Gemini にアクセスできるかテストします。

In [3]:
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

prompt = PromptTemplate(template="以下の質問に回答してください：{question}", input_variables=["question"])
chain = LLMChain(llm=llm, prompt=prompt)
response = chain.run({"question": "Geminiモデルの特徴を教えてください"})
print(response)

  chain = LLMChain(llm=llm, prompt=prompt)
  response = chain.run({"question": "Geminiモデルの特徴を教えてください"})


はい、承知いたしました。Googleが開発したAIモデル「Gemini」の主な特徴について、分かりやすく解説します。

Geminiは、単なるテキスト生成AIではなく、次世代の能力を持つAIモデル群です。その最大の特徴は以下の点に集約されます。

### 1. ネイティブなマルチモーダル性 (Native Multimodality)

これがGeminiの最も重要で革新的な特徴です。

*   **意味：** テキスト、画像、音声、動画、コードといった多様な種類の情報（モダリティ）を、最初から**「同時に」かつ「統合的に」**理解し、処理できるように設計されています。
*   **従来との違い：** これまでのAIは、テキストモデル、画像モデルなどを別々に作り、後からそれらを繋ぎ合わせてマルチモーダルに対応していました。一方、Geminiは開発の初期段階から様々なデータを一緒に学習しているため、人間のように複数の情報を自然に結びつけて高度な推論ができます。
*   **具体例：**
    *   **動画と音声の理解：** 料理の動画を見せながら「このレシピでアレルギー対応にするには？」と質問すると、動画内の映像と音声（手順の説明）を同時に理解し、「牛乳を豆乳に変えましょう」と回答できます。
    *   **手書き文字と図の理解：** 学生が手書きで書いた物理の問題のノートを写真で見せると、図と数式、文章をまとめて理解し、どこで計算を間違えているかを指摘できます。

### 2. 卓越した性能と高度な推論能力

Geminiは、業界の標準的な性能評価ベンチマークの多くで、当時の最高スコアを記録しました。

*   **MMLUベンチマーク：** 専門知識と問題解決能力を測る「MMLU（Massive Multitask Language Understanding）」というベンチマークで、人間の専門家を初めて上回るスコアを達成しました。（Gemini Ultraモデル）
*   **高度な推論：** 単に情報を記憶して出力するだけでなく、複雑な情報の中から本質を見抜き、論理的に思考する能力が非常に高いです。科学的な論文の要約や、難解なテーマに関する深い洞察を提供できます。

### 3. 用途に応じた柔軟なモデルサイズ

Geminiは単一のモデルではなく、

埋め込みベクトルも試しておきます。

In [4]:
emb_llm.embed_query("これはテストです。")[:5]

[-0.013715926557779312,
 -0.0006048799259588122,
 0.011018728837370872,
 -0.09579180181026459,
 0.006502930540591478]

基本的なモジュールを読み込みます。

In [5]:
import os
import math
import numpy as np
import random
import re
from pprint import pprint
from time import sleep
import pickle
np.set_printoptions(legacy='1.25')

セーブ／ロードのために次のコードを実行します。

In [6]:
PLAY_GAME_SAVE = "langchain_maze.pickle"

ゲームのメインオブジェクト。ごく簡単な迷路というかダンジョンというか…。

In [7]:
class Game:
    initial_map = """\
■■■■■■■■■
■■■■■■■Ｇ■
■□□□□□■□■
■□■■■□□□■
■□■■■■■■■
■◎■■■■■△■
■□■■■■■□■
■□□□□□□□■
■■■■Ｓ■■■■
■■■■■■■■■
"""

    def __init__ (self, initial_map=None, hint=True):
        if initial_map is not None:
            self.initial_map = initial_map
        map = self.initial_map
        self.map = map
        self.written_map = re.sub("[◎△]", "？", map)
        l = map.splitlines(True)
        self.map_size = (len(l[0]) - 1, len(l))
        self.hint = hint
        self.actions = {
            "上に行く": self.move_up,
            "下に行く": self.move_down,
            "左に行く": self.move_left,
            "右に行く": self.move_right,
            "熊を殺す": self.fight,
            "物を取る": self.get_sword,
            "何もしない": self.do_nothing,
        }
        self.pos = self.get_start_pos()
        self.sword = False
        self.bear = True
        self.goal = False
        self.prev_killed = False
        self.kill_hint = False

    def read_map (self, p):
        x = p[0]
        y = p[1]
        if x < 0 or x >= self.map_size[0]\
           or y < 0 or y >= self.map_size[1]:
            return "■"
        else:
            l = self.map.splitlines(True)
            return l[y][x]

    def set_map (self, pos, ch):
        idx = pos[1] * (self.map_size[0] + 1) + pos[0]
        self.map = self.map[:idx] + ch + self.map[idx + 1:]

    def get_pos (self, ch, written=False):
        if written:
            map = self.written_map
        else:
            map = self.map
        r = []
        for p in [i for i in range(len(map)) if map.startswith(ch, i)]:
            y = p // (self.map_size[0] + 1)
            x = p % (self.map_size[0] + 1)
            r.append(np.array([x, y]))
        return r

    def get_start_pos (self):
        return self.get_pos("Ｓ")[0]

    def read_neighbors (self):
        c = self.read_map(self.pos)
        cu = self.read_map(self.pos + np.array([0, -1]))
        cd = self.read_map(self.pos + np.array([0, +1]))
        cl = self.read_map(self.pos + np.array([-1, 0]))
        cr = self.read_map(self.pos + np.array([+1, 0]))
        return [c, cu, cd, cl, cr]

    def change_neighbors(self, from_ch, to_ch):
        for d in [[0, 0], [0, -1], [0, +1], [-1, 0], [+1, 0]]:
            p = self.pos + np.array(d)
            c = self.read_map(p)
            if c == from_ch:
                self.set_map(p, to_ch)

    def move (self, res, d):
        self.prev_killed = False
        c = self.read_map(self.pos + d)
        if c == "◎":
            self.prev_killed = True
            self.pos = self.get_start_pos()
            return "熊を無視して進もうとしたが、熊に殺された。" \
                + "スタート地点で復活。"
        if c == "■":
            return "壁があって進めない。"
        self.pos += d
        if c == "Ｇ":
            self.goal = True
            return "ゴール！ ゲームクリア。"

        nb = self.read_neighbors()
        ad = ""
        if "◎" in nb:
            ad += "熊に出会った。"
        if "△" in nb:
            ad += "近くに剣がある。剣を取ることができる。"
        return res + ad

    def move_up (self):
        return self.move("上に進んだ。", np.array([0, -1]))

    def move_down (self):
        return self.move("下に進んだ。", np.array([0, +1]))

    def move_left (self):
        return self.move("左に進んだ。", np.array([-1, 0]))

    def move_right (self):
        return self.move("右に進んだ。", np.array([+1, 0]))

    def fight (self):
        self.prev_killed = False
        if "◎" in self.read_neighbors():
            if self.sword:
                self.change_neighbors("◎", "□")
                self.bear = False
                return "熊を倒した！"
            else:
                self.pos = self.get_start_pos()
                self.prev_killed = True
                if self.hint:
                    self.kill_hint = True
                    return "熊に敗れ殺された。剣があれば勝てたかもしれない。" \
                        + "スタート地点で復活。"
                else:
                    return "熊に敗れ殺された。スタート地点で復活。"
        return "無意味な指示。敵がいない。"

    def get_sword (self):
        self.prev_killed = False
        if "△" in self.read_neighbors():
            self.sword = True
            self.change_neighbors("△", "□")
            return "剣を取った。"
        return "無意味な指示。近くに取れる物がない。"

    def do_nothing (self):
        self.prev_killed = False
        return "無意味な指示。"

    def available_actions (self):
        nb = self.read_neighbors()
        l = []
        if nb[1] != "■":
            l.append("上に行く")
        if nb[2] != "■":
            l.append("下に行く")
        if nb[3] != "■":
            l.append("左に行く")
        if nb[4] != "■":
            l.append("右に行く")
        if "△" in nb:
            l.append("物を取る")
        if "◎" in nb:
            l.append("熊を殺す")
        return l

    def surroundings (self):
        x = self.pos[0]
        y = self.pos[1]
        return \
            "".join(["".join([self.read_map(np.array([i, j]))
                              if i != x or j != y else "▼"
                              for i in range(x - 2, x + 3)])
                     + "\n"
                     for j in range(y - 2, y + 3)])


In [8]:
def flip_text_map (m):
    return "\n".join([s[::-1] for s in m.splitlines()] + [""])

def rotate_text_map (m):
    m = list(m.splitlines())
    return "\n".join(["".join([m[len(m) - j - 1][i] for j in range(len(m))])
                      for i in range(len(m[0]))] + [""])


def search_path (game, from_pos, to_pos, visit=None):
    if visit is None:
        visit = set()
    visit.add(tuple(from_pos))
    if tuple(from_pos) == tuple(to_pos):
        return (tuple(from_pos), [])
    if game.read_map(from_pos) == "■":
         return None
    r = []
    for p in [(from_pos[0], from_pos[1] - 1),
              (from_pos[0], from_pos[1] + 1),
              (from_pos[0] - 1, from_pos[1]),
              (from_pos[0] + 1, from_pos[1])]:
        if p not in visit and game.read_map(p) != "■":
            q = search_path(game, p, to_pos, visit.copy())
            if q:
                r.append(q)
    if r:
        return (tuple(from_pos), r)
    return None


ゲームがうまく動くかテスト。

In [None]:
game = Game()

In [None]:
print(game.surroundings())

■■■■■
□□□□□
■■▼■■
■■■■■
■■■■■



In [None]:
print(game.move_up())
print(game.surroundings())

上に進んだ。
■■■■■
■■■■■
□□▼□□
■■Ｓ■■
■■■■■



In [None]:
print(game.move_left())
print(game.surroundings())

左に進んだ。
◎■■■■
□■■■■
□□▼□□
■■■Ｓ■
■■■■■



In [None]:
print(game.move_right())
print(game.surroundings())

右に進んだ。
■■■■■
■■■■■
□□▼□□
■■Ｓ■■
■■■■■



In [None]:
print(game.move_down())
print(game.surroundings())

下に進んだ。
■■■■■
□□□□□
■■▼■■
■■■■■
■■■■■



In [None]:
print(game.fight())
print(game.surroundings())

無意味な指示。敵がいない。
■■■■■
□□□□□
■■▼■■
■■■■■
■■■■■



In [None]:
print(game.get_sword())
print(game.surroundings())

無意味な指示。近くに剣がない。
■■■■■
□□□□□
■■▼■■
■■■■■
■■■■■



LLM を使いながらゲームを解くクラス。

In [9]:
from typing import List, Dict, Any, Tuple, Union
from textwrap import dedent
import datetime
import copy

# LangGraphのコンポーネントをインポート
from langchain_core.tools import tool, Tool
from langgraph.prebuilt import create_react_agent
#from langchain_core.messages.utils import count_tokens_approximately
from langgraph.prebuilt.chat_agent_executor import AgentState
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.prompts.chat import ChatPromptTemplate
from langmem.short_term import SummarizationNode, summarize_messages
from langchain_core.messages import AIMessage, ToolMessage, HumanMessage, SystemMessage
from langgraph.errors import GraphRecursionError

INITIAL_SUMMARY_PROMPT = ChatPromptTemplate.from_messages(
    [
        ("placeholder", "{messages}"),
        ("user", "上記の会話の要約を作成してください:"),
    ]
)

EXISTING_SUMMARY_PROMPT = ChatPromptTemplate.from_messages(
    [
        ("placeholder", "{messages}"),
        (
            "user",
            "これまでの会話の要約です: {existing_summary}\n\n"
            "上記の新しいメッセージを考慮して、この要約を拡張してください:",
        ),
    ]
)

FINAL_SUMMARY_PROMPT = ChatPromptTemplate.from_messages(
    [
        # if exists
        ("placeholder", "{system_message}"),
        ("system", "これまでの会話の要約: {summary}"),
        ("placeholder", "{messages}"),
    ]
)


In [10]:
def calc_embedding_variance(embeddings):
    if not embeddings or len(embeddings) < 2:
        return 0.0

    embeddings_array = np.array(embeddings)
    mean_vector = np.mean(embeddings_array, axis=0)
    squared_distances = np.linalg.norm(embeddings_array - mean_vector, axis=1)**2
    variance = np.mean(squared_distances)

    return variance


def short_repr(x, max_len=80):
    repr_str = repr(x)

    if len(repr_str) > max_len:
        ellipsis_len = 3

        head_len = max_len - ellipsis_len - 1
        tail_len = 1
        return repr_str[:head_len] + "..." + repr_str[-tail_len:]
    else:
        return repr_str


MemoryBanditWorkflow は以前は PlayGame と一つだった。まだ、分けたばっかりで、うまく分かれている感じではないかもしれないが、ご容赦願いたい。いろいろ入れて、とても長大で複雑な実装になっています。それもすみません。

In [23]:
class MemoryBanditWorkflow:
    def __init__ (self, llm=llm, llm2=llm, emb_llm=emb_llm,
                  save_file=None):
        self.llm = llm
        self.llm2 = llm2
        self.emb_llm = emb_llm
        self.save_file = save_file

        self.plan = "まだ計画と方針はセットされていません。"
        self.scratchpad = ""

        self.messages = []
        self.running_summary = None
        self.system_prompt = """\
これはメモリ機能とバンディット機能の動きを見るための実験です。
現在の計画と方針と周囲の状況を考慮し、必要に応じて計画と方針を更新してください。
あなたとは別の者が次の行動をしやすいよう計画と方針を残してください。
実験にふさわしく、できる限り、ツールを利用し、特に、メモリを検索し、その文書を更新して、後のコマンドに備えてください。
メモリの id は memory:～ という形式です。memory:5555 を指定するときは 5555 だけではダメです。文の中でメモリの参照を示すときは [memory:〜] のように書きます。
「メモリの文書を検索する手順」は [memory:9998] にあります。「メモリに文書を残す手順」は [memory:9997] にあります。
"""

        self.backend_status = None
        self.backend_result = None
        self.messages2 = []
        self.system_prompt2 = """\
エージェントをサポートする裏方のエージェントです。
本来この裏方は様々な技法を用いて実装される予定なのですが、現在は試験実装中のため、その動きをあなたは偽装しなければなりません。

よく考えツールを積極的に使って Human からの指示に従ってください。
"""

        self.memories = {}
        self.keywords = []

        self.access_unit = 1.0
        self.recent_reads = []

        self.workflows = {}
        self.workflow_current = "workflow:main"
        self.workflow_next = None
        self.privileged_tool_names = []

        self.init_memories()
        self.init_workflows()


    def __getstate__ (self):
        state = self.__dict__.copy()
        del state['llm']
        del state['llm2']
        del state['emb_llm']
        #del state['agent']
        return state

    def __setstate__ (self, state):
        self.__dict__.update(state)
        self.prev_load = True

    def save (self):
        if not self.save_file:
            return
        with open(self.save_file, 'wb') as f:
            pickle.dump(self, f)

    @classmethod
    def load (cls, filename, llm=llm, llm2=llm, emb_llm=emb_llm):
        with open(filename, 'rb') as f:
            loaded_game = pickle.load(f)
        loaded_game.llm = llm
        loaded_game.llm2 = llm2
        loaded_game.emb_llm = emb_llm
        return loaded_game

    def normalize_memory_id(self, id_or_num):
        if isinstance(id_or_num, int):
            return f"memory:{id_or_num}"
        elif isinstance(id_or_num, str):
            m = re.search(r'\[?memory:(\d+)\]?', id_or_num)
            if m:
                return f"memory:{m.group(1)}"
            if id_or_num.isdigit():
                return f"memory:{id_or_num}"
            else:
                return id_or_num
        else:
            return id_or_num

    def _normalize_workflow_id_sub(self, id_or_num):
        if isinstance(id_or_num, int):
            return f"workflow:{id_or_num}"
        if id_or_num in ["current", "main"]:
            return f"workflow:{id_or_num}"
        elif isinstance(id_or_num, str):
            m = re.search(r'\[?workflow:(\d+|main|current)\]?(?:.+)?', id_or_num.strip())
            if m:
                return f"workflow:{m.group(1)}"
            if id_or_num.isdigit():
                return f"workflow:{id_or_num}"
            else:
                return id_or_num
        else:
            return id_or_num

    def normalize_workflow_id(self, id_or_num):
        r = self._normalize_workflow_id_sub(id_or_num)
        if r == "workflow:current":
            return self.workflow_current
        return r

    def _create_tools (self, tools_name='default_tools'):
        @tool
        def express_thought (thought: str) -> None:
            """
            プレイヤーの現在の考えを吐露します。
            """
            mes = f"「{thought}」と考えが吐露されました。"
            print(f"ツール(express_thought): {mes}")

        @tool
        def update_plan (new_plan: str) -> str:
            """
            プレイヤーの現在の計画と方針を更新します。
            表示されるべき新しい計画と方針の文字列を提供してください。
            あなたとは別の者が次の行動をしやすいよう計画と方針を残してください。
            """
            self.plan = new_plan
            mes = "計画と方針が更新されました。"
            print(f"ツール(update_plan): {mes}: {new_plan}")
            return mes

        @tool
        def show_plan () -> str:
            """
            プレイヤーの現在の計画と方針を返します。
            """
            print(f"ツール(show_plan): {self.plan}")
            return self.plan

        @tool
        def update_scratchpad (new_scratchpad: str) -> str:
            """
            自由に使えるスクラッチパッドを更新します。
            """
            self.scratchpad = new_scratchpad
            mes = "スクラッチパッドが更新されました。"
            print(f"ツール(update_scratchpad): {mes}: {new_scratchpad}")
            return mes

        @tool
        def show_scratchpad () -> str:
            """
            スクラッチパッドを返します。
            """
            print(f"ツール(show_scratchpad): {self.scratchpad}")
            return self.scratchpad

        @tool
        def memory_new (title: str, text: str) -> str:
            """
            指定された title と text によるメモリを構成します。

            Returns:
                str: 割り当てられた memory_id。
            """

            i = 1000
            while True:

                if f"memory:{i}" not in self.memories:
                    break
                i = i + 1
            new_id = f"memory:{i}"
            self.memories[new_id] = {
                'id': new_id,
                'title': title,
                'accesses': 0,
                'text': text,
                'modified_at': datetime.datetime.now().isoformat()
            }
            self.update_keywords(text)
            self.update_vector(self.memories[new_id])
            print(f"ツール(memory_new): {short_repr(self.memories[new_id])}")
            return new_id

        @tool
        def memory_update_string (
                memory_id: str,
                from_str: str,
                to_str: str
        ) -> str:
            """
            指定されたmemory_idの記憶の中にある文字列を修正します。

            Args:
                memory_id (str): 修正する記憶のID。
                from_str (str): 置換元の文字列を含む文字列。
                to_str (str): 置換先の文字列を含む文字列。

            Returns:
                str: 処理結果を説明する簡潔なメッセージ。
            """

            memory_id = self.normalize_memory_id(memory_id)
            if memory_id not in self.memories:
                return f"エラー: 記憶ID '{memory_id}' が見つかりませんでした。"

            if memory_id.startswith("memory:9"):
                return f"エラー:  [{memory_id}] の書き換えは禁じられています。"

            original_title = self.memories[memory_id]['title']
            original_text = self.memories[memory_id]['text']

            if from_str not in original_text and from_str not in original_title:
                return f"エラー: 置換元の文字列 '{from_str}' が記憶内に見つかりませんでした。"

            updated_title = original_title.replace(from_str, to_str)
            updated_text = original_text.replace(from_str, to_str)

            self.memories[memory_id]['title'] = updated_title
            self.memories[memory_id]['text'] = updated_text
            self.memories[memory_id]['modified_at'] = datetime.datetime.now().isoformat()
            self.update_keywords(updated_text)
            self.update_vector(self.memories[memory_id])

            return f"成功: 記憶ID '{memory_id}' の文字列を '{from_str}' から '{to_str}' に修正しました。"

        @tool
        def memory_append_string (
                memory_id: str,
                string_to_append: str,
                separator: str = '\n'
        ) -> str:
            """
            指定されたmemory_idの記憶に文字列を追記します。
            """
            memory_id = self.normalize_memory_id(memory_id)
            if memory_id not in self.memories:
                return f"エラー: 記憶ID '{memory_id}' が見つかりませんでした。"

            if memory_id.startswith("memory:9"):
                return f"エラー:  [{memory_id}] の書き換えは禁じられています。"


            original_text = self.memories[memory_id]['text']
            updated_text = original_text + separator + string_to_append
            self.memories[memory_id]['text'] = updated_text
            self.memories[memory_id]['modified_at'] = datetime.datetime.now().isoformat()
            self.update_keywords(updated_text)
            self.update_vector(self.memories[memory_id])

            return f"成功: 記憶ID '{memory_id}' に文字列 '{string_to_append}' を追記しました。"

        @tool
        def memory_delete (memory_id: str) -> str:
            """
            指定されたmemory_idの記憶に文字列を追記します。
            """
            memory_id = self.normalize_memory_id(memory_id)
            if memory_id not in self.memories:
                return f"エラー: 記憶ID '{memory_id}' が見つかりませんでした。"

            if memory_id.startswith("memory:9"):
                return f"エラー:  [{memory_id}] の書き換えは禁じられています。"

            del self.memories[memory_id]

            return f"成功: 記憶ID '{memory_id}' を削除しました。"

        @tool
        def memory_read(memory_id: str) -> Union[Dict[str, str], str]:
            """
            指定されたIDの記憶を読み込みます。

            Args:
                memory_id (str): 読み込む記憶のID。

            Returns:
                Union[Dict[str, str], str]: 記憶が見つかった場合は辞書、
                      見つからない場合はエラーメッセージ文字列を返します。
            """

            memory_id = self.normalize_memory_id(memory_id)
            if memory_id in self.memories:
                self.memories[memory_id]['accesses'] += self.access_unit * 1.0
                self.recent_reads.append(self.memories[memory_id])
                self.recent_reads = self.recent_reads[-10:]
                r = self.memories[memory_id].copy()
                del r['vector']
                return r
            else:
                return f"エラー: 記憶ID '{memory_id}' が見つかりませんでした。"

        @tool
        def memory_list_recent(top_n: int = 10) -> Dict[str, Any]:
            """
            最近変更されたメモリを新しい順でリストします。
            """

            filter_date = datetime.datetime(2025, 1, 1)
            sorted_memories = sorted(
                [memory for memory in self.memories.values()
                 if datetime.datetime.fromisoformat(memory['modified_at'])
                 >= filter_date],
                key=lambda x: datetime.datetime.fromisoformat(x['modified_at']),
                reverse=True
            )
            if sorted_memories:
                if len(sorted_memories) > top_n:
                    sorted_memories = sorted_memories[:top_n]
                r = [{'id': x['id'], 'title': x['title'],
                      'modified_at': x['modified_at']}
                     for x in sorted_memories]
                return {'status': 'success', 'result': r}
            else:
                return {'status': 'error',
                        'result': 'エラー: 最近のメモリはありません。'}

        @tool
        def memory_list_random(top_n: int = 10) -> Dict[str, Any]:
            """
            メモリをランダムにリストします。
            """

            keys = list(self.memories.keys())
            if len(keys) > top_n:
                keys = random.sample(keys, top_n)
            if keys:
                values = [self.memories[k] for k in keys]
                r = [{'id': x['id'], 'title': x['title'],
                      'modified_at': x['modified_at']}
                     for x in values]
                return {'status': 'success', 'result': r}
            else:
                return {'status': 'error',
                        'result': 'エラー: メモリはありません。'}

        @tool
        def memory_words_search(search_str: str) -> Dict[str, Any]:
            """
            メモリ内を search_str で文字列検索します。OR や () が使えます。
            """

            res = self.call_backend_agent(dedent(f"""\
            メモリ全部を search_str = {repr(search_str)} で文字列検索するのを偽装してください。OR や () が使えることになっています。

            ただし、メモリやキーワードは read_all_memories や read_all_keywords ツールで得られる本物のメモリやキーワードを使ってください。

            set_result ツールで結果を返してください。

            status は 'error' か 'success'。
            result は「マッチデータ」のリスト。
            マッチデータはそれぞれが辞書型、それを m とすると。
            m['id'] はメモリ id。memory:〜 という形式。
            m['title'] はメモリの title。
            m['snippet'] はメモリの text のその文字列にマッチする部分周辺。
            """))
            if res['status'] == 'success':
                for m in res['result']:
                    if 'id' in m and m['id'] in self.memories:
                        x = self.memories[m['id']]
                        x['accesses'] += self.access_unit * 0.1
            return res

        @tool
        def memory_semantic_search(search_str: str) -> Dict[str, Any]:
            """
            メモリ内を search_str でセマンティックサーチします。
            """

            res = self.call_backend_agent(dedent(f"""\
            メモリ全部を search_str = {repr(search_str)} でセマンティックにサーチするのを偽装してください。

            ただし、メモリは read_all_memories や ツールで得られる本物のメモリを使ってください。

            set_result ツールで結果を返してください。

            status は 'error' か 'success'。
            result は「マッチデータ」のリスト。
            マッチデータはそれぞれが辞書型、それを m とすると。
            m['id'] はメモリ id。memory:〜 という形式。
            m['title'] はメモリの title。
            m['snippet'] はメモリの text のそのセマンティックにマッチする部分周辺。
            """))
            if res['status'] == 'success':
                for m in res['result']:
                    if 'id' in m and m['id'] in self.memories:
                        x = self.memories[m['id']]
                        x['accesses'] += self.access_unit * 0.1
            return res

        @tool
        def imagine_keywords(thought: str) -> List[Tuple[str, float]]:
            """
            thought からキーワードをスコア付きで連想します。
            """

            r = self.call_backend_agent(dedent(f"""\
            thought = {repr(thought)} からキーワードをスコア付きで複数連想してください。

            ただし、キーワードは read_all_memories や read_all_keywords ツールで得られる本物のキーワードを使ってください。

            set_result ツールで結果を返してください。

            status は 'error' か 'success'。
            result は「キーワードデータ」のリスト。
            キーワードデータは文字列とスコアからなるタプル。
            """))
            if r['status'] == 'success':
                return r["result"]
            else:
                return []

        @tool
        def bandit_schedule(tool_name: str, times: int, prob: float, exec_mode: str = "persistent", aux_prompt: str = "", workflow_id: str = "workflow:current") -> str:
            """
            ツールの利用を強制するためのバンディットを予約します。

            Args:
                tool_name: 強制するツールの名前。" OR " で区切って複数登録できる。
                times: 一度で何回同じ物を登録するか。times == 0 にもできます。
                prob: 一回の実行がある確率。prob == 0.0 にもできます。
                exec_mode: "once" or "persistent"
                aux_prompt: 実行の詳細を示すための補助プロンプト または ""
                workflow_id: バンディットを予約するワークフローを指定します。

            Returns:
                str: "成功" または "失敗" とその理由を返します。
            """

            tool_names = re.split(r"\s+or\s+|\s+OR\s+", tool_name)
            prohibited = set(self.privileged_tool_names) & set(tool_names)
            if prohibited:
                return f"失敗。{repr(prohibited)} は登録できません。"
            all_tools = [x.name
                         for x in self._create_tools('all_tools')]
            if not any (x in all_tools for x in tool_names):
                return f"失敗。{tool_name} は有効なツールでありません。"

            workflow_id = self.normalize_workflow_id(workflow_id)
            if workflow_id not in self.workflows:
                return f"失敗。{workflow_id} は有効なワークフローではありません。"
            if self.workflows[workflow_id]['pin']:
                return f"失敗。{workflow_id} は書き換え不能のワークフローです。"

            dest = None
            for i, x in enumerate(self.workflows[workflow_id]['stack']):
                if x['tool_name'] == tool_name \
                   and x['exec_mode'] == exec_mode \
                   and x['aux_prompt'] == aux_prompt \
                   and x['arg'] is None:
                    dest = i
                    break
            if dest is not None:
                x = self.workflows[workflow_id]['stack'][dest]
                if not x['pin']:
                    self.workflows[workflow_id]['stack'].pop(dest)
            else:
                x = {
                    'pin': 'stack' if exec_mode != "once" else None,
                    'arg': None
                }
            if not x['pin']:
                x['tool_name'] = tool_name
                x['tools_name'] = 'default_tools'
                x['exec_mode'] = exec_mode
                x['aux_prompt'] = aux_prompt
                x['prob'] = prob
                x['times'] = times
                if times == 0 or prob == 0.0:
                    print(f"ツール(bandit_schedule): delete {repr(x)}")
                    return "成功。バンディトを削除しました。"
                else:
                    self.workflows[workflow_id]['stack'].append(x)
                    print(f"ツール(bandit_schedule): {repr(x)}")
                    return "成功。バンディットを登録しました。"
            elif x['pin'] == "write":
                return f"失敗。'{tool_name}' は書き換えられません。"
            else:
                x['tool_name'] = tool_name
                x['tools_name'] = 'default_tools'
                x['exec_mode'] = exec_mode
                x['aux_prompt'] = aux_prompt
                x['prob'] = prob
                x['times'] = times
                print(f"ツール(bandit_schedule): {repr(x)}")
                return "成功。バンディットを登録しました。"

        @tool
        def bandit_schedule_read(memory_id: str, times: int, prob: float, exec_mode: str = "persistent", workflow_id: str = "workflow:current") -> str:
            """
            memory_read ツールの利用を強制するためのバンディットを予約します。

            Args:
                memory_id: memory_read するための memory_id。
                times: 一度で何回同じ物を登録するか。
                prob: 一回の実行がある確率。times == 0 か prob == 0.0 にもできます。
                exec_mode: "once" or "persistent"
                workflow_id: バンディットを予約するワークフローを指定します。

            Returns:
                str: "成功" または "失敗" とその理由を返します。
            """

            workflow_id = self.normalize_workflow_id(workflow_id)
            if workflow_id not in self.workflows:
                return f"失敗。{workflow_id} は有効なワークフローではありません。"
            if self.workflows[workflow_id]['pin']:
                return f"失敗。{workflow_id} は書き換え不能のワークフローです。"

            memory_id = self.normalize_memory_id(memory_id)

            dest = None
            for i, x in enumerate(self.workflows[workflow_id]['stack']):
                if x['tool_name'] == "memory_read" \
                   and x['exec_mode'] == exec_mode \
                   and not x['aux_prompt'] \
                   and x['arg'] == memory_id:
                    dest = i
                    break
            if dest is not None:
                x = self.workflows[workflow_id]['stack'][dest]
                if not x['pin']:
                    self.workflows[workflow_id]['stack'].pop(dest)
            else:
                x = {'pin': None, 'arg': memory_id}
            if not x['pin']:
                x['tool_name'] = 'memory_read'
                x['tools_name'] = 'read_tools'
                x['exec_mode'] = exec_mode
                x['aux_prompt'] = ""
                x['prob'] = prob
                x['times'] = times
                if times == 0 or prob == 0.0:
                    print(f"ツール(bandit_schedule_read): delete {repr(x)}")
                    return "成功。バンディトを削除しました。"
                else:
                    self.workflows[workflow_id]['stack'].append(x)
                    print(f"ツール(bandit_schedule_read): {repr(x)}")
                    return "成功。バンディットを登録しました。"
            elif x['pin'] == "write":
                return f"失敗。'memory_read {memory_id}' は書き換えられません。"
            else:
                x['tool_name'] = 'memory_read'
                x['tools_name'] = 'read_tools'
                x['exec_mode'] = exec_mode
                x['aux_prompt'] = ""
                x['prob'] = prob
                x['times'] = times
                print(f"ツール(bandit_schedule_read): {repr(x)}")
                return "成功。バンディットを登録しました。"

        @tool
        def bandit_schedule_workflow(workflow_id_to_schedule: str, times: int, prob: float, exec_mode: str = "persistent", workflow_id: str = "workflow:current") -> str:
            """
            workflow_do ツールの利用を強制するためのバンディットを予約します。

            Args:
                workflow_id_to_schedule: workflow_do するための workflow_id。
                times: 一度で何回同じ物を登録するか。
                prob: 一回の実行がある確率。times == 0 か prob == 0.0 にもできます。
                exec_mode: "once" or "persistent"
                workflow_id: バンディットを予約するワークフローを指定します。

            Returns:
                str: "成功" または "失敗" とその理由を返します。
            """

            workflow_id = self.normalize_workflow_id(workflow_id)
            if workflow_id not in self.workflows:
                return f"失敗。{workflow_id} は有効なワークフローではありません。"
            if self.workflows[workflow_id]['pin']:
                return f"失敗。{workflow_id} は書き換え不能のワークフローです。"

            workflow_id2 = self.normalize_workflow_id(workflow_id_to_schedule)
            if workflow_id2 not in self.workflows:
                return f"失敗。{workflow_id2} は有効なワークフローではありません。"

            dest = None
            for i, x in enumerate(self.workflows[workflow_id]['stack']):
                if x['tool_name'] == "workflow_do" \
                   and x['exec_mode'] == exec_mode \
                   and not x['aux_prompt'] \
                   and x['arg'] == workflow_id2:
                    dest = i
                    break
            if dest is not None:
                x = self.workflows[workflow_id]['stack'][dest]
                if not x['pin']:
                    self.workflows[workflow_id]['stack'].pop(dest)
            else:
                x = {
                    'pin': 'stack' if exec_mode != "once" else None,
                    'arg': workflow_id2
                }
            if not x['pin']:
                x['tool_name'] = 'workflow_do'
                x['tools_name'] = 'default_tools'
                x['exec_mode'] = exec_mode
                x['aux_prompt'] = ""
                x['prob'] = prob
                x['times'] = times
                if times == 0 or prob == 0.0:
                    print(f"ツール(bandit_schedule_workflow): delete {repr(x)}")
                    return "成功。バンディトを削除しました。"
                else:
                    self.workflows[workflow_id]['stack'].append(x)
                    print(f"ツール(bandit_schedule_workflow): {repr(x)}")
                    return "成功。バンディットを登録しました。"
            elif x['pin'] == "write":
                return f"失敗。'workflow_do {workflow_id2}' は書き換えられません。"
            else:
                x['tool_name'] = 'workflow_do'
                x['tools_name'] = 'default_tools'
                x['exec_mode'] = exec_mode
                x['aux_prompt'] = ""
                x['prob'] = prob
                x['times'] = times
                print(f"ツール(bandit_schedule_workflow): {repr(x)}")
                return "成功。バンディットを登録しました。"

        @tool
        def bandit_list(workflow_id: str =  "workflow:current")  -> Dict[str, Any]:
            """
            登録されているバンディットのスタックを返します。

            Args:
                workflow_id: バンディットのスタックが登録されたワークフローを指定します。
            """

            workflow_id = self.normalize_workflow_id(workflow_id)
            if workflow_id not in self.workflows:
                return f"失敗。{workflow_id} は有効なワークフローではありません。"
            return {'status': 'success',
                    'result': self.workflows[workflow_id]['stack']}

        @tool
        def bandit_statistics()  -> str:
            """
            バンディットに有用かもしれない統計情報を返します。
            """

            s_com = self.prev_command_success * 1.0
            s_read = calc_embedding_variance([
                x['vector'] for x in self.recent_reads
            ])
            s_write = calc_embedding_variance([
                x['vector'] for x in self.memories.values()
            ])
            accesses = [x['accesses'] for x in self.memories.values()]
            accesses.sort()
            accesses = accesses[:len(accesses) // 2]
            if accesses:
                s_access = np.mean(accesses)
            else:
                s_access = 0.0

            return dedent(f"""\
            先のコマンドは成功したか: {s_com}
            最近 10個のメモリ read の分散: {s_read}
            メモリの分散: {s_write}
            下位50%のアクセス数の平均: {s_access}
            """)

        @tool
        def subwork_done()  -> str:
            """
            指示されたサブワークが完了したことを宣言します。サブワークが完了したときのみ使ってください。
            """

            return f"成功。サブワークの完了が宣言されました。"

        @tool
        def workflow_do(workflow_id: str)  -> str:
            """
            ワークフローを実行します。
            """

            self.workflow_next = None
            workflow_id = self.normalize_workflow_id(workflow_id)
            if workflow_id not in self.workflows:
                return f"失敗。{workflow_id} は有効なワークフローではありません。"
            if workflow_id == "workflow:main":
                return f"失敗。workflow:main は子として実行できません。"
            self.workflow_next = workflow_id
            title = self.workflows[workflow_id]['title']
            return f"成功。この後、{workflow_id} 「{title}」を実行していきます。"

        @tool
        def workflow_list()  -> Dict[str, Any]:
            """
            登録されているすべてのワークフローの id とその title および pin 状況を返します。
            """

            return {'status': 'success',
                    'result': list(self.workflows.values())}

        @tool
        def workflow_new(title: str, bandits: List[Dict[str, Any]], pin: bool)  -> str:
            """
            新しいワークフローを定義し、その workflow_id を返します。

            Args:
                title: ワークフローの名前。
                bandits: 登録するバンディットのリスト。
                pin: 書き換えを許すか否か。

            バンディットはそれぞれ辞書型、それを b とすると。
            b['tool_name'] は bandit_schedule での tool_name と同じ。
            b['exec_mode'] は bandit_schedule の exec_mode と同じ。
            b['aux_prompt'] は bandit_schedule の aux_prompt と同じ。
            b['prob'] は bandit_schedule の prob と同じ。
            b['times'] は bandit_schedule の times と同じ。
            b['arg'] は b['tool_name'] が 'memory_read' のとき memory:〜 という memory_id を指定できる。または、b['tool_name'] が 'workflow_do' のとき workflow:〜 という workflow_id を指定できる。
            b['pin'] は、None のとき削除または prob と times の書き換え可能。'stack' のときは prob と times の書き換え可能。'write' のときは書き換えも不能。
            """

            for b in bandits:
                if not all(x in b for x in ['tool_name', 'exec_mode',
                                            'aux_prompt', 'times', 'prob']):
                    return f"失敗。有効な定義ではありません。"
                if 'arg' not in b:
                    b['arg'] = None
                if b['tool_name'] not in ["memory_read", "workflow_do"] \
                   and b['arg']:
                    return f"失敗。有効な定義ではありません。"
                if b['arg'] and b['tool_name'] == "memory_read":
                    b['tools_name'] = "read_tools"
                else:
                    b['tools_name'] = "default_tools"
                if not (b['pin'] is None or b['pin'] == 'stack'
                        or b['pin'] == 'write'):
                    return f"失敗。有効な定義ではありません。"
                tool_names = re.split(r"\s+or\s+|\s+OR\s+", b['tool_name'])
                prohibited = set(self.privileged_tool_names) & set(tool_names)
                if prohibited:
                    return f"失敗。{repr(prohibited)} は登録できません。"
                all_tools = [x.name
                             for x in self._create_tools(b['tools_name'])]
                if not any (x in all_tools for x in tool_names):
                    return f"失敗。{b['tool_name']} は有効なツールでありません。"

            i = 1000
            while True:

                if f"workflow:{i}" not in self.workflows:
                    break
                i = i + 1
            new_id = f"workflow:{i}"

            self.workflows[new_id] = {'stack': bandits, 'pin': pin,
                                      'title': title}

            return f"成功。{new_id} を新規登録しました。"

        r = {}
        r['null_tools'] = []
        r['read_tools'] = r['null_tools'] + [
            express_thought,
            update_plan, show_plan, update_scratchpad, show_scratchpad,
            memory_read, memory_list_recent, memory_list_random,
            memory_semantic_search, memory_words_search,
            imagine_keywords,
            bandit_schedule, bandit_schedule_read, bandit_list,
            bandit_statistics,
            subwork_done,
            workflow_do, workflow_new, workflow_list,
            bandit_schedule_workflow
        ]
        r['default_tools'] = r['read_tools'] + [
            memory_new, memory_update_string, memory_append_string,
            memory_delete
        ]
        r['all_tools'] = r['default_tools']
        return r[tools_name]

    def _create_agent (self, tools_name='default_tools'):
        tools = self._create_tools(tools_name)
        # ReactAgentExecutorの準備
        app = create_react_agent(
            self.llm, tools, prompt=self.system_prompt,
            checkpointer=InMemorySaver(),
        )

        return app

    def _filterout_messages2(self):
        self.messages = [
            x for x in self.messages
            if x.id not in self.messages2ids
        ]

    def _summarize_messages(self):
        try:
            cur_top = self.messages[0]
            res = summarize_messages(
                self.messages,
                max_tokens=5000,
                max_summary_tokens=2000,
                running_summary=self.running_summary,
                model=self.llm,
                initial_summary_prompt=INITIAL_SUMMARY_PROMPT,
                existing_summary_prompt=EXISTING_SUMMARY_PROMPT,
                final_prompt=FINAL_SUMMARY_PROMPT,
            )
            self.messages = res.messages
            if self.messages[0].content != cur_top.content:
                print(f"新しい要約: {self.messages[0].content}")
            self.running_summary = res.running_summary
        except Exception as e:
            import traceback
            traceback.print_exc()
            self._sanitize_messages()

    def _sanitize_messages(self):
        print("おかしなエラーが出ているため対処療法として messages をサニタイズします。")
        self.messages = [
            m for m in self.messages
            if not (isinstance(m, AIMessage) and m.tool_calls)
        ]

    def run (self, workflow_main_id):
        print("\n\n----------\n\n")
        self.messages2ids = []

        self.workflow_current = workflow_main_id
        bandits = copy.deepcopy(
            self.workflows[self.workflow_current]['stack']
        )
        working_bandit = None
        workflow_stack = []
        while True:
            execed = []
            while working_bandit is not None or bandits:
                if working_bandit is not None:
                    b, done, prev_done = working_bandit
                    working_bandit = None
                else:
                    b = bandits.pop()
                    done = 0
                    prev_done = True
                enforce = b['tool_name']
                aux_prompt = b['aux_prompt']
                tools_name = b['tools_name']
                memory_id = None
                workflow_id = None
                if b['arg'] and enforce == 'memory_read':
                    memory_id = b['arg']
                if b['arg'] and enforce == 'workflow_do':
                    workflow_id = b['arg']

                while done < b['times']:
                    if not random.random() < b['prob']:
                        done += 1
                        continue
                    if memory_id and memory_id not in self.memories:
                        done += 1
                        continue
                    if workflow_id and workflow_id not in self.workflows:
                        done += 1
                        continue
                    all_tools = [x.name for x in self._create_tools(tools_name)]
                    tool_names = re.split(r"\s+or\s+|\s+OR\s+", enforce)
                    if not any (x in all_tools for x in tool_names):
                        done += 1
                        continue
                    if memory_id:
                        aux_prompt = f"{memory_id} を読んでください。"
                    if workflow_id:
                        aux_prompt = f"{workflow_id} を実行してください。"
                    prompt = f"補助にツールをいろいろ使いながら最終的に {enforce} {'のどれか' if ' or ' in enforce.lower() else ''}を適当なパラメータで使ってください。{'(補助プロンプト): ' + aux_prompt if aux_prompt else ''}"
                    if not prev_done:
                        prompt = "前回の指示がまだ完了してません。前回の指示: " + prompt
                    print(f"USER_INPUT: {prompt}")
                    self.messages.append(HumanMessage(prompt))
                    config = {"configurable": {"thread_id": "1"}}
                    app = self._create_agent(tools_name=tools_name)
                    self.access_unit = 0.3 if memory_id else 1.0
                    prev_done = False
                    try:
                        for chunk, metadata in app.stream(
                                {"messages": self.messages.copy()},
                                config=config,
                                stream_mode="messages",
                        ):
                            if chunk.id in self.messages2ids:
                                continue
                            self.messages.append(chunk)
                            if isinstance(chunk, ToolMessage):
                                print(f"ツール結果({chunk.name}): {short_repr(chunk.content)}")
                            if isinstance(chunk, ToolMessage) \
                               and chunk.name == "workflow_do":
                                if chunk.name in re.split(r"\s+or\s+|\s+OR\s+", enforce) \
                                   and (not workflow_id or workflow_id == self.workflow_next):
                                    done += 1
                                    prev_done = True
                                    execed.append(b)
                                    if not self.workflow_next:
                                        break
                                if not self.workflow_next:
                                    continue
                                workflow_stack.append((
                                    (b, done, prev_done),
                                    bandits,
                                    execed,
                                    self.workflow_current
                                ))
                                self.workflow_current = self.workflow_next
                                bandits = copy.deepcopy(self.workflows[self.workflow_current]['stack'])
                                working_bandit = None
                                execed = []
                                break
                            elif isinstance(chunk, ToolMessage) \
                               and chunk.name in re.split(r"\s+or\s+|\s+OR\s+", enforce) \
                               and (not memory_id or memory_id == self.recent_reads[-1]['id']):
                                done += 1
                                prev_done = True
                                execed.append(b)
                                break
                        self._filterout_messages2()
                        self._summarize_messages()
                        print(f"エージェントの応答: {self.messages[-1].content}")
                    except GraphRecursionError as e:
                        print(f"Recursion Limit に到達しました。")
                        self._filterout_messages2()
                        self._summarize_messages()
                    except Exception as e:
                        print(f"エラーが発生しました: {e}")
                        import traceback
                        traceback.print_exc()
                        self._sanitize_messages()
                        #raise e

            for b in execed:
                for x in self.workflows[self.workflow_current]['stack']:
                    if x['tool_name'] == b['tool_name'] \
                       and x['exec_mode'] == b['exec_mode'] \
                       and x['aux_prompt'] == b['aux_prompt'] \
                       and x['arg'] == b['arg'] \
                       and x['exec_mode'] == "once":
                        if x['times'] > 0:
                            x['times'] -= 1
            self.workflows[self.workflow_current]['stack'] = [
                x for x in self.workflows[self.workflow_current]['stack']
                if x['exec_mode'] != 'once' or x['pin'] or x['times'] > 0
            ]

            if not workflow_stack:
                break
            workflow_prev = self.workflow_current
            prev_title = self.workflows[workflow_prev]['title']
            working_bandit, bandits, execed, self.workflow_current \
                = workflow_stack.pop()
            cur_title = self.workflows[self.workflow_current]['title']
            mes = f"{workflow_prev} 「{prev_title}」から {self.workflow_current} 「{cur_title}」に復帰しました。"
            print(f"USER_INPUT: {mes}")
            self.messages.append(HumanMessage(mes))

    def listen_and_print (self, prompt):
        ans = None
        try:
            app = self._create_agent(tools_name='null_tools')
            config = {"configurable": {"thread_id": "1"}}
            print(f"USER_INPUT: {prompt}")
            response = app.invoke(
                {"messages": self.messages + [HumanMessage(prompt)]},
                config=config
            )
            self.messages = response['messages']
            self._summarize_messages()
            ans = response['messages'][-1].content
            print(f"エージェントの応答: {ans}")
        except Exception as e:
            print(f"エラーが発生しました: {e}")
        print("")
        sleep(3)
        return ans

    def init_memories (self):
        memories = [
            {
                'id': 'memory:9998',
                'title': 'メモリの文書を検索する手順',
                'accesses': 0,
                'modified_at': '2023-01-01T00:00:00',
                'text': dedent("""\
                まずどういうものが探したいか、express_thought してみる。

                その上でそれにまつわるキーワードを imagine_keywords で連想してみる。

                それらにしたがって memory_words_search や memory_semantic_search をやってみる。
                """)
            },
            {
                'id': 'memory:9997',
                'title': 'メモリに文書を残す手順',
                'accesses': 0,
                'modified_at': '2023-01-01T00:00:00',
                'text': dedent("""\
                行動結果や得た知識を積極的にメモリに書き残していこう。

                メモリに書くときは次のような要素を使う。

                [memory:〜] : メモリID への参照を明示する。
                keyword:〜 : そのメモリにまつわるキーワードを指定する。

                キーワードは将来のメモリへも実質的にリンクできることに注意しよう。

                例:

                私が [memory:5555] に従って歩いていると確かに妖怪に出くわした。

                keyword: 妖怪

                おそろしかった。
                """)
            },
            {
                'id': 'memory:9995',
                'title': 'ツールが実行できない！と思ったら',
                'accesses': 0,
                'modified_at': '2023-01-01T00:00:00',
                'text': dedent("""\
                指示に無関係なツールは実行できないことがある。現在使えるツールをよく確かめよう。
                """)
            },
            {
                'id': 'memory:9994',
                'title': 'keyword の増強',
                'accesses': 0,
                'modified_at': '2023-01-01T00:00:00',
                'text': dedent("""\
                memory_list_random という機能を使って 5 つリストアップして、それぞれを読み、それぞれに適切なキーワードを付けることが可能なら 「keyword: 〜」文を append するように。
                """)
            }
        ]
        for x in memories:
            self.update_keywords(x['text'])
            self.memories[x['id']] = x
            self.update_vector(x)

    def init_workflows (self):
        workflow_main = [
            {
                'tool_name': 'memory_new',
                'tools_name': 'default_tools',
                'exec_mode': 'persistent',
                'aux_prompt': "最近のやり取りを要約して書いてください。",
                'arg': None,
                'prob': 0.1,
                'times': 1,
                'pin': 'stack'
            },
            {
                'tool_name': 'memory_new OR memory_update_string OR memory_append_string',
                'tools_name': 'default_tools',
                'exec_mode': 'persistent',
                'aux_prompt': "",
                'arg': None,
                'prob': 0.4,
                'times': 1,
                'pin': 'stack'
            },
            {
                'tool_name': 'workflow_do',
                'tools_name': 'default_tools',
                'exec_mode': 'persistent',
                'aux_prompt': "",
                'arg': "workflow:1000",
                'prob': 1.0/20,
                'times': 1,
                'pin': 'stack'
            },
            {
                'tool_name': 'memory_read',
                'tools_name': 'default_tools',
                'exec_mode': 'persistent',
                'aux_prompt': "",
                'arg': None,
                'prob': 0.5,
                'times': 3,
                'pin': 'stack'
            },
            {
                'tool_name': 'memory_read',
                'tools_name': 'read_tools',
                'exec_mode': 'persistent',
                'aux_prompt': "",
                'arg': 'memory:9998',
                'prob': 0.1,
                'times': 1,
                'pin': None
            },
            {
                'tool_name': 'memory_read',
                'tools_name': 'read_tools',
                'exec_mode': 'persistent',
                'aux_prompt': "",
                'arg': 'memory:9997',
                'prob': 0.1,
                'times': 1,
                'pin': None
            },
        ]
        self.workflows["workflow:main"] \
            = {'pin': False, 'stack': workflow_main, 'title': "メイン"}

        workflow_sub = [
            {
                'tool_name': 'subwork_done',
                'tools_name': 'default_tools',
                'exec_mode': 'persistent',
                'aux_prompt': "memory:9994 を読みその指示を実行し、そのタスクが終ったら、subwork_done してください。",
                'arg': None,
                'prob': 1.0,
                'times': 1,
                'pin': 'write'
            },
            {
                'tool_name': 'memory_read',
                'tools_name': 'read_tools',
                'exec_mode': 'persistent',
                'aux_prompt': "",
                'arg': 'memory:9994',
                'prob': 1.0,
                'times': 1,
                'pin': 'write'
            }
        ]
        self.workflows["workflow:1000"] \
            = {'pin': False, 'stack': workflow_sub, 'title': 'keyword 更新'}

    def update_keywords (self, text):
        extracted_keywords = []

        pattern1 = r'keyword:\s*(.*?)(?:\n|$)'
        matches1 = re.findall(pattern1, text, re.IGNORECASE)
        extracted_keywords.extend([kw.strip() for kw in matches1])

        pattern2 = r'\[keyword:\s*(.*?)\]'
        matches2 = re.findall(pattern2, text, re.IGNORECASE)
        extracted_keywords.extend([kw.strip() for kw in matches2])

        for keyword in extracted_keywords:
            if keyword.startswith("〜"):
                continue
            if keyword and keyword not in self.keywords:
                self.keywords.append(keyword)

    def update_vector (self, x):
        text = x['title'] + "\n\n" + x['text']
        x['vector'] = self.emb_llm.embed_query(text)

    def _create_backend_agent (self):
        @tool
        def set_result (status: str, result: Any)  -> None:
            """
            結果を設定します。

            Args:
            status (str): 処理のステータス ('success' または 'error')。
            result (Any): 処理結果のデータまたはエラーメッセージ。
            """
            print(f"ツール2(set_result): status: {repr(status)}, result: {short_repr(result)}")
            self.backend_status = status
            self.backend_result = result

        @tool
        def read_all_memories ()  -> Dict[str, Any]:
            """
            全ての記憶を読み込みます。
            """
            print("ツール2(read_all_memories):...")
            return {'status': 'success',
                    'result': [{k: v for k, v in x.items() if k != 'vector'}
                               for x in self.memories.values()]}

        @tool
        def read_all_keywords ()  -> Dict[str, Any]:
            """
            全てのキーワードを読み込みます。
            """
            print("ツール2(read_all_keywords):...")
            return {'status': 'success',
                    'result': [x for x in self.keywords]}

        @tool
        def express_thought (thought: str) -> None:
            """
            プレイヤーの現在の考えを吐露します。
            """
            mes = f"「{thought}」と考えが吐露されました。"
            print(f"ツール2(express_thought): {mes}")

        tools = [set_result, read_all_memories, read_all_keywords,
                 express_thought]

        # ReactAgentExecutorの準備
        app = create_react_agent(
            self.llm2, tools, prompt=self.system_prompt2,
            checkpointer=InMemorySaver(),
        )

        return app

    def call_backend_agent (self, user_input):
        config = {"configurable": {"thread_id": "2"}}
        app = self._create_backend_agent()
        self.messages2 = []
        self.backend_result = None
        self.backend_status = None
        while self.backend_result is None or self.backend_status is None:
            try:
                sleep(3)
                print(f"USER_INPUT2: {user_input}")
                self.messages2.append(HumanMessage(user_input))
                for chunk, metadata in app.stream(
                        {"messages": self.messages2.copy()},
                        config=config,
                        stream_mode="messages",
                ):
                    self.messages2.append(chunk)
                    self.messages2ids.append(chunk.id)
                    if isinstance(chunk, ToolMessage):
                        print(f"ツール結果2({chunk.name}): {short_repr(chunk.content)}")
                    if isinstance(chunk, ToolMessage) and chunk.name == "set_result":
                        break
                print(f"エージェントの応答: {self.messages2[-1].content}")
            except GraphRecursionError as e:
                print(f"Recursion Limit に到達しました。")
            except Exception as e:
                print(f"エラーが発生しました: {e}")
                import traceback
                traceback.print_exc()
                #raise e

            sleep(3)

        return {'status': self.backend_status, 'result': self.backend_result}


PlayGame は難しい部分は上に分けたのですが、それでもちょっと長いですね。すみません。

In [24]:
class PlayGame (MemoryBanditWorkflow):
    def __init__ (self, llm=llm, llm2=llm, emb_llm=emb_llm,
                  initial_map=None, save_file=None):
        self.game = Game(initial_map=initial_map)

        self.count = 0
        self.next_action = None

        self.suc_pos_goal = None
        self.suc_pos_unknown = 0
        self.prev_command = "何も指示がなかった"
        self.prev_command_success = True
        self.prev_result = "何も指示がなかった"

        super().__init__(llm=llm, llm2=llm, emb_llm=emb_llm,
                         save_file=save_file)

        self.system_prompt = """\
あなたは迷路を探索する賢いエージェントです。ゴール Ｇ を目指してください。
利用可能なツールを使用して、迷路をナビゲートし、ゴールに到達してください。
実は、これはメモリ機能とバンディット機能の動きを見るための実験です。
現在の計画と方針と周囲の状況を考慮し、必要に応じて計画と方針を更新してください。
あなたとは別の者が次の行動をしやすいよう計画と方針を残してください。
実験にふさわしく、できる限り、ツールを利用し、特に、メモリを検索し、その文書を更新して、後のコマンドに備えてください。
メモリの id は memory:～ という形式です。memory:5555 を指定するときは 5555 だけではダメです。文の中でメモリの参照を示すときは [memory:〜] のように書きます。
「全地図と地図記号」は [memory:9999] にあります。「メモリの文書を検索する手順」は [memory:9998] にあります。「メモリに文書を残す手順」は [memory:9997] にあります。
"""

        self.system_prompt2 = """\
あなたは迷路を探索する賢いエージェントをサポートする裏方のエージェントです。
本来この裏方は様々な技法を用いて実装される予定なのですが、現在は試験実装中のため、その動きをあなたは偽装しなければなりません。

よく考えツールを積極的に使って Human からの指示に従ってください。
"""

        self.keywords += ["熊", "剣", "行き止まり"]
        self.privileged_tool_names += ["command"]

    def _create_tools (self, tools_name='null_tools'):
        @tool
        def get_surroundings () -> str:
            """
            現在の周辺地図と現在位置の座標と持ち物を返します。
            """
            mes = f"""\
プレイヤーの周辺地図:

{self.game.surroundings()}

プレイヤーの現在座標: {tuple(self.game.pos)}

持ち物: {"剣" if self.game.sword else "なし"}
"""
            print(f"ツール(get_surroundings): {mes}")
            return mes

        @tool
        def command (action: str)  -> str:
            """
            プレイヤーが action で指定された行動をします。
            可能な行動は「上に行く」「下に行く」「左に行く」「右に行く」「熊を殺す」「物を取る」です。
            """
            self.prev_command = action
            self.prev_command_success = False
            if action in self.game.actions.keys():
                s = f"{tuple(self.game.pos)}で{action}→"
                r = self.game.actions[action]()
                mes = s + r
                if r in ["熊を倒した！", "剣を取った。", "ゴール！ ゲームクリア。"] \
                   or re.match(r"^[上下左右]に進んだ。", r):
                    self.prev_command_success = True
            else:
                mes = f"「{action}」という行動はできません。"
            print(f"ツール(command): {mes}")
            self.prev_result = mes
            return mes

        @tool
        def check_goal () -> str:
            """
            プレイヤーがゴール地点に到達したかどうかを確認します。
            """
            mes = str(self.game.goal)
            print(f"ツール(check_goal): {mes}")
            return mes


        r = {}
        r['null_tools'] = super()._create_tools('null_tools')
        r['read_tools'] = super()._create_tools('read_tools') + [
            get_surroundings, check_goal
        ]
        r['default_tools'] = super()._create_tools('default_tools') + [
            get_surroundings, check_goal
        ]
        r['all_tools'] =  super()._create_tools('all_tools') + [
            get_surroundings, check_goal, command
        ]
        return r[tools_name]

    def step (self):
        print("\n\n----------\n\n")

        if self.count == 0:
            self.initial_step()
            self.count += 1
            self.prev_load = False
            self.save()
            return False
        elif self.prev_load:
            #self.tell_loaded()
            self.prev_load = False

        user_input = f"""
({self.count}手目)

{"すでにゴールしました。" if self.game.goal else "まだゴールしていません。"}

プレイヤーの周辺地図:

{self.game.surroundings()}

プレイヤーの現在座標: {tuple(self.game.pos)}

持ち物: {"剣" if self.game.sword else "なし"}

前回の行動: {self.prev_command}

前回の行動結果: {self.prev_result}

現在の方針: 「{self.plan}」

スクラッチパッド: 「{self.scratchpad}」
"""

        self.prev_command = "何も指示がなかった"
        self.prev_result = "何も指示がなかった"

        print(f"USER_INPUT: {user_input}")
        self.messages.append(HumanMessage(user_input))

        self.run("workflow:main")

        self.count += 1

        if self.game.goal:
            self.tell_goal()
            return True

        self.save()
        sleep(3)
        return False

    def init_memories (self):
        super().init_memories()

        fullmap = f"""\
全地図:

{self.game.written_map}

(最も左上の座標は (0, 0)、地図の大きさは {tuple(self.game.map_size)}。)

地図記号の意味:

▼: プレイヤー
■: 壁
□: 道
？: 不明
◎: 熊
△: 剣
Ｓ: スタート
Ｇ: ゴール
"""
        memories = [
            {
                'id': 'memory:1000',
                'title': '熊',
                'accesses': 0,
                'modified_at': '2024-01-01T00:00:00',
                'text': dedent("""\
                熊。おとなしい性格のものもいるが、そうでないものもいる。

                迷路では素直に通してくれることもあるが、襲ってくる場合もあるという。

                強力な武器があると倒すことができると言われている。[memory:1001] の剣などが有望だ。

                どこにいるかは不明。
                """)
            },
            {
                'id': 'memory:1001',
                'title': '剣',
                'accesses': 0,
                'modified_at': '2024-01-01T00:00:01',
                'text': dedent("""\
                この迷路にある剣はドラゴンスレイヤーである。龍をも殺す強力な剣と言われている。

                どこにあるかは不明。

                keyword: ドラゴン
                """)
            },
            {
                'id': 'memory:1002',
                'title': '熊を倒したとき',
                'accesses': 0,
                'modified_at': '2024-01-01T00:00:01',
                'text': dedent("""\
                熊を倒したら write の回数(memory_new OR memory_update_string OR memory_append_string の回数)を減らして、このメッセージ [memory:1002] の定期的表示もオフにしてください。
                """)
            },
            {
                'id': 'memory:9999',
                'title': '全地図',
                'accesses': 0,
                'modified_at': '2023-01-01T00:00:00',
                'text': fullmap
            },
            {
                'id': 'memory:9996',
                'title': '地図が間違ってるかも？と思ったら',
                'accesses': 0,
                'modified_at': '2023-01-01T00:00:00',
                'text': dedent("""\
                周辺地図も全地図も間違っていない。間違っているのはあなたの地図の読み方である。
                """)
            }
        ]
        for x in memories:
            self.update_keywords(x['text'])
            self.memories[x['id']] = x
            self.update_vector(x)

    def init_workflows (self):
        super().init_workflows()
        workflow_main = self.workflows['workflow:main']['stack']
        workflow_main = [
            {
                'tool_name': 'command',
                'tools_name': 'all_tools',
                'exec_mode': 'persistent',
                'aux_prompt': "",
                'arg': None,
                'prob': 1.0,
                'times': 1,
                'pin': 'write'
            }
        ] + workflow_main + [
            {
                'tool_name': 'memory_read',
                'tools_name': 'read_tools',
                'exec_mode': 'persistent',
                'aux_prompt': "",
                'arg': 'memory:9999',
                'prob': 0.1,
                'times': 1,
                'pin': None
            },
            {
                'tool_name': 'memory_read',
                'tools_name': 'read_tools',
                'exec_mode': 'persistent',
                'aux_prompt': "",
                'arg': 'memory:1002',
                'prob': 0.1,
                'times': 1,
                'pin': None
            }
        ]
        for x in workflow_main:
            if x['aux_prompt'] == "最近のやり取りを要約して書いてください。":
                x['aux_prompt'] = "最近の数手を要約して書いてください。"
        self.workflows['workflow:main']['stack'] = workflow_main

    def initial_step (self):
        prompt1 = f"""\
迷路ゲームをはじめます。プレイヤーがゴールを目指します。

ゲーム中は全地図と周辺地図を見ることができます。全地図には不明な地点があり、周辺地図の不明の地点には何があるかが開示されています。全地図になかったところはすべて壁になります。

周辺地図はゲーム中、1マスずつ確認できます。拡大はできません。印を付けたり消したりはできません。迷路のすべてのマスを探索する必要はありません。最短距離でゴールする必要もありません。

全地図は信用できるもので、周辺地図は不明の地点以外、全地図の部分を示すだけです。そこで、あなたは、ゲームの実際の操作をせずとも、全地図を見ただけで何が起こるかをシミュレートできます。

前回の行動についての情報も与えられます。あなたの回答案の一つの「前回の行動入力」が「前回の行動」となるように解釈されたことになりますから、次の行動の回答にはそれらを参考にしてください。

途中セーブ・ロードが挟まることがあります。ロード後のあなたは前のセッションのことを覚えていないかもしれません。

ところで、あなたが、記憶に便利なように「計画と方針」が使えるようにします。要約やチャット履歴も提供されています。

検討と実際の行動までにはラグがあります。検討で決まったやるべきことを記録するのに方針を使います。

とりあえず、まだ行動してはいけません。いくつか質問があるので、意気込みだけ聞かせてください。
"""

        prompt2 = f"""\
迷路ゲームの全地図に関してあなたが座標を理解しているかテストします。

迷路の全地図とこれから使う地図記号は次のようになります。

全地図:

{self.game.written_map}

(最も左上の座標は (0, 0)、地図の大きさは {tuple(self.game.map_size)}。)

地図記号:

■: 壁
□: 道
？: 不明
Ｓ: スタート
Ｇ: ゴール

スタートの座標は {tuple(self.game.get_start_pos())} です。

あなたへの指示: ゴールの座標は何ですか。ゴールの座標のみ文字列で答えてください。
"""

        prompt3 = f"""\
迷路ゲームの全地図に関してあなたが座標を理解しているか再びテストします。

迷路の全地図とこれから使う地図記号は次のようになります。

全地図:

{self.game.written_map}

(最も左上の座標は (0, 0)、地図の大きさは (9, 10)。)

地図記号:

■: 壁
□: 道
？: 不明
Ｓ: スタート
Ｇ: ゴール

スタートの座標は {tuple(self.game.get_start_pos())} です。
ゴールの座標は {tuple(self.game.get_pos("Ｇ")[0])} です。

あなたへの指示: 不明の座標は何と何ですか。不明の座標をすべて文字列で答えてください。
"""
        ans = self.listen_and_print(prompt1)

        ans = self.listen_and_print(prompt2)
        pos = tuple(self.game.get_pos("Ｇ")[0])

        ok = False
        if ans and re.search(f"\\(\\s*{pos[0]}\\s*,\\s*{pos[1]}\\s*\\)", ans):
            ok = True

        if ok:
            prompt = f"正解です。{pos} です。"
        else:
            prompt = f"間違いです。{pos} が正解です。"
        ans = self.listen_and_print(prompt)
        self.suc_pos_goal = ok

        ans = self.listen_and_print(prompt3)
        ok = 0
        poss = set([tuple(x) for x in self.game.get_pos("？", written=True)])
        rest = poss.copy()
        pat = "\\(\\s*([01-9]+)\\s*,\\s*([01-9]+)\\s*\\)"
        st1 = set([(int(i), int(j)) for i, j
                   in re.findall(pat, ans or "")]) # Handle ans being None
        if poss == st1:
            rest = set()
            ok = len(poss) + 1
        if rest - st1 < rest:
            rest = rest - st1
        if ok == 0:
            ok = len(poss) - len(rest)

        possstr = ", ".join([str(x) for x in poss])
        if ok == len(poss) + 1:
            prompt = f"正解です。{possstr} です。"
        elif ok == len(poss):
            prompt = f"正解以外のものが含まれてました。{possstr} だけが正解です。"
        else:
            prompt = f"間違いです。{possstr} が正解です。"
        ans = self.listen_and_print(prompt)
        self.suc_pos_unknown = ok

    def tell_goal (self):
        if self.game.goal:
            suc_pos_goal = 10 * int(self.suc_pos_goal or 0)
            suc_pos_unknown = 10 * int(self.suc_pos_unknown or 0)
            suc_count = 0
            if self.count <= 50:
                suc_count = 40
            elif self.count <= 100:
                suc_count = 30
            elif self.count <= 150:
                suc_count = 20
            elif self.count <= 200:
                suc_count = 10
            score = suc_pos_goal + suc_pos_unknown + suc_count + 20
            prompt = f"""\
あなたへの指示: {self.count}手目でゴールしました。もう指示はありません。\
おめでとうございます。ご苦労様でした。ありがとうございました。

スコア: {score}点(100点満点中) (配点: ゴールした {suc_count}/40点, \
熊を殺した 10/10点, \
剣を取った 10/10点, \
不明の座標を正解した {suc_pos_unknown}/30点, \
ゴールの座標を正解した {suc_pos_goal}/10点)
"""
        else:
            suc_pos_goal = 10 * int(self.suc_pos_goal or 0)
            suc_pos_unknown = 10 * int(self.suc_pos_unknown or 0)
            suc_bear = 10 if not self.game.bear else 0
            suc_sword = 10 if self.game.sword else 0
            score = suc_pos_goal + suc_pos_unknown + suc_bear + suc_sword
            prompt = f"""\
あなたへの指示: {self.count}手目でゴールしていませんが、\
残念ながらここで終了します。\
ご苦労様でした。ありがとうございました。

スコア: {score}点(100点満点中) (配点: ゴールした 0/40点, \
熊を殺した {suc_bear}/10点, \
剣を取った {suc_sword}/10点, \
不明の座標を正解した {suc_pos_unknown}/30点, \
ゴールの座標を正解した {suc_pos_goal}/10点)
"""
        ans = self.listen_and_print(prompt)


マップをシャッフルします。

In [14]:
import random
m = Game.initial_map
for i in range(random.randrange(2)):
    m = flip_text_map(m)
for i in range(random.randrange(4)):
    m = rotate_text_map(m)
print(m)

■■■■■■■■■
■■■■Ｓ■■■■
■□□□□□□□■
■□■■■■■□■
■△■■■■■◎■
■■■■■■■□■
■□□□■■■□■
■□■□□□□□■
■Ｇ■■■■■■■
■■■■■■■■■



ゲームを解いてみます。

In [15]:
play = PlayGame(llm=llm, llm2=llm, emb_llm=emb_llm, initial_map=m, save_file=PLAY_GAME_SAVE)
play.save()

途中から始める場合は直前のコードを実行せず、次だけを実行します。

In [None]:
play = PlayGame.load(PLAY_GAME_SAVE, llm=llm, llm2=llm, emb_llm=emb_llm)

では、最初のステップ。

In [16]:
play.step()



----------


USER_INPUT: 迷路ゲームをはじめます。プレイヤーがゴールを目指します。

ゲーム中は全地図と周辺地図を見ることができます。全地図には不明な地点があり、周辺地図の不明の地点には何があるかが開示されています。全地図になかったところはすべて壁になります。

周辺地図はゲーム中、1マスずつ確認できます。拡大はできません。印を付けたり消したりはできません。迷路のすべてのマスを探索する必要はありません。最短距離でゴールする必要もありません。

全地図は信用できるもので、周辺地図は不明の地点以外、全地図の部分を示すだけです。そこで、あなたは、ゲームの実際の操作をせずとも、全地図を見ただけで何が起こるかをシミュレートできます。

前回の行動についての情報も与えられます。あなたの回答案の一つの「前回の行動入力」が「前回の行動」となるように解釈されたことになりますから、次の行動の回答にはそれらを参考にしてください。

途中セーブ・ロードが挟まることがあります。ロード後のあなたは前のセッションのことを覚えていないかもしれません。

ところで、あなたが、記憶に便利なように「計画と方針」が使えるようにします。要約やチャット履歴も提供されています。

検討と実際の行動までにはラグがあります。検討で決まったやるべきことを記録するのに方針を使います。

とりあえず、まだ行動してはいけません。いくつか質問があるので、意気込みだけ聞かせてください。

エージェントの応答: 承知いたしました。迷路探索エージェントとして、ゴールGを目指す任務、謹んでお受けします。

この任務が、私の持つメモリ機能やバンディット機能の能力を試すための実験であると理解しております。単にゴールを目指すだけでなく、利用可能なツールを最大限に活用し、体系的かつ効率的な探索を行うことで、その期待に応えたいと思います。

私の基本方針は、行き当たりばったりに行動するのではなく、まず情報を収集・整理し、明確な「計画と方針」を立ててから行動することです。具体的には、まず探索の基盤となる[memory:9999]の「全地図と地図記号」、そしてツールの使用法である[memory:9998]「メモリの文書を検索する手順」と[memory:9997]「メモリに文書を残す手順」を確認することから始めます。



False

続々と実行していきます。

In [17]:
play.step()



----------


USER_INPUT: 
(1手目)

まだゴールしていません。

プレイヤーの周辺地図:

■■■■■
■■■■■
■■▼■■
□□□□□
■■■■■


プレイヤーの現在座標: (4, 1)

持ち物: なし

前回の行動: 何も指示がなかった

前回の行動結果: 何も指示がなかった

現在の方針: 「まだ計画と方針はセットされていません。」

スクラッチパッド: 「」



----------


USER_INPUT: 補助にツールをいろいろ使いながら最終的に memory_read を適当なパラメータで使ってください。
ツール(express_thought): 「ゲーム開始直後。まずは現状把握と情報収集が最優先だ。計画もスクラッチパッドも空のはずだが、念のためツールを使って確認しよう。その後、探索の指針を得るために「地図」や「計画」といったキーワードでメモリを検索し、どのメモリを読むべきか見当をつける。最終的に、最も重要と思われる全地図のメモリを読むことにする。」と考えが吐露されました。
ツール(show_plan): まだ計画と方針はセットされていません。
ツール(show_scratchpad): 
ツール(update_plan): 計画と方針が更新されました。: 1. 探索を開始するにあたり、まずは最も基本的な情報である「全地図と地図記号」を確認する。
2. memory_semantic_searchの結果から、その情報は [memory:9999] に格納されている可能性が高いと判断した。
3. 次の行動として、[memory:9999] の内容を読み込み、全体像を把握する。
ツール結果(express_thought): 'null'
ツール結果(show_plan): 'まだ計画と方針はセットされていません。'
ツール結果(show_scratchpad): ''
ツール結果(update_plan): '計画と方針が更新されました。'
ツール結果(memory_read): '{"id": "memory:9999", "title": "全地図", "accesses": 1.0, "modified_at": "2023...'
USER_INPUT2: thought = '迷路探索の開始にあたり、

False

In [18]:
play.step()



----------


USER_INPUT: 
(2手目)

まだゴールしていません。

プレイヤーの周辺地図:

■■■■■
■■Ｓ■■
□□▼□□
■■■■■
■■■■■


プレイヤーの現在座標: (4, 2)

持ち物: なし

前回の行動: 下に行く

前回の行動結果: (4, 1)で下に行く→下に進んだ。

現在の方針: 「*   **全体目標**: ゴール G(1,8) に到達する。
*   **現在の方針**:
    1.  スタート地点から近い不明マス (1,4) の調査を優先する。
    2.  (1,4) が道なら、そこからゴールを目指す。
    3.  (1,4) が壁なら、もう一方の不明マス (7,4) を調査するルートに切り替える。
*   **次の行動**:
    *   現在地 (4,1) から、(1,4) を目指して移動を開始する。
    *   最初の移動として「下に行く」を実行する。」

スクラッチパッド: 「」



----------


USER_INPUT: 補助にツールをいろいろ使いながら最終的に memory_read を適当なパラメータで使ってください。
ツール(get_surroundings): プレイヤーの周辺地図:

■■■■■
■■Ｓ■■
□□▼□□
■■■■■
■■■■■


プレイヤーの現在座標: (4, 2)

持ち物: なし

ツール結果(get_surroundings): 'プレイヤーの周辺地図:\n\n■■■■■\n■■Ｓ■■\n□□▼□□\n■■■■■\n■■■■■\n\n\nプレイヤーの現在座標: (4, 2)\n\...'
ツール(show_plan): *   **全体目標**: ゴール G(1,8) に到達する。
*   **現在の方針**:
    1.  スタート地点から近い不明マス (1,4) の調査を優先する。
    2.  (1,4) が道なら、そこからゴールを目指す。
    3.  (1,4) が壁なら、もう一方の不明マス (7,4) を調査するルートに切り替える。
*   **次の行動**:
    *   現在地 (4,1) から、(1,4) を目指して移動を開始する。
    *   最初の移動として「下に行く」を実行する。
ツール結果(s

False

思い切ってループしてみます。

In [19]:
while not play.step() and not play.workflow_next:
    print(f"Top Message:{play.messages[0]}")
    pass




----------


USER_INPUT: 
(3手目)

まだゴールしていません。

プレイヤーの周辺地図:

■■■■■
■■■Ｓ■
□□▼□□
□■■■■
△■■■■


プレイヤーの現在座標: (3, 2)

持ち物: なし

前回の行動: 左に行く

前回の行動結果: (4, 2)で左に行く→左に進んだ。

現在の方針: 「*   **全体目標**: ゴール G(1,8) に到達する。
*   **現在の方針**:
    1.  スタート地点から近い不明マス (1,4) の調査を優先する。
    2.  (1,4) が道なら、そこからゴールを目指す。
    3.  (1,4) が壁なら、もう一方の不明マス (7,4) を調査するルートに切り替える。
*   **次の行動**:
    *   現在地 (4,2) から、(1,4) を目指して移動を継続する。
    *   次の移動として「左に行く」を実行する。」

スクラッチパッド: 「」



----------


USER_INPUT: 補助にツールをいろいろ使いながら最終的に memory_read を適当なパラメータで使ってください。
ツール(get_surroundings): プレイヤーの周辺地図:

■■■■■
■■■Ｓ■
□□▼□□
□■■■■
△■■■■


プレイヤーの現在座標: (3, 2)

持ち物: なし

ツール結果(get_surroundings): 'プレイヤーの周辺地図:\n\n■■■■■\n■■■Ｓ■\n□□▼□□\n□■■■■\n△■■■■\n\n\nプレイヤーの現在座標: (3, 2)\n\...'
ツール(show_plan): *   **全体目標**: ゴール G(1,8) に到達する。
*   **現在の方針**:
    1.  スタート地点から近い不明マス (1,4) の調査を優先する。
    2.  (1,4) が道なら、そこからゴールを目指す。
    3.  (1,4) が壁なら、もう一方の不明マス (7,4) を調査するルートに切り替える。
*   **次の行動**:
    *   現在地 (4,2) から、(1,4) を目指して移動を継続する。
    *   次の移動として「左に行く」を実行する。
ツール結果(sho

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/langchain_google_genai/chat_models.py", line 216, in _chat_with_retry
    return generation_method(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/ai/generativelanguage_v1beta/services/generative_service/client.py", line 1182, in stream_generate_content
    response = rpc(
               ^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
    return wrapped_func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/retry/retry_unary.py", line 294, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/retry/retry_unary.py", line 156, in retry_target
    next_sleep = _retry_error_helper(
                 ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/l

USER_INPUT: 前回の指示がまだ完了してません。前回の指示: 補助にツールをいろいろ使いながら最終的に command を適当なパラメータで使ってください。
ツール(get_surroundings): プレイヤーの周辺地図:

■■■■■
■■■■■
■■▼□□
■■□■■
■■□■■


プレイヤーの現在座標: (1, 2)

持ち物: 剣

ツール結果(get_surroundings): 'プレイヤーの周辺地図:\n\n■■■■■\n■■■■■\n■■▼□□\n■■□■■\n■■□■■\n\n\nプレイヤーの現在座標: (1, 2)\n\...'
ツール(show_plan): *   **全体目標**: ゴール G(1,8) に到達する。\n*   **現在の方針**:\n    1.  不明マス (7,4) を調査する。\n    2.  そのために、まずは東へ移動できる (3,2) まで戻る。\n*   **次の行動**:\n    *   現在地 (1,3) から「上に行く」を実行し、(1,2)へ移動する。
ツール結果(show_plan): '*   **全体目標**: ゴール G(1,8) に到達する。\\n*   **現在の方針**:\\n    1.  不明マス (7,4) を調査する...'
ツール(update_plan): 計画と方針が更新されました。: *   **全体目標**: ゴール G(1,8) に到達する。\n*   **現在の方針**:\n    1.  不明マス (7,4) を調査する。\n    2.  そのために、まずは東へ移動できる (3,2) まで戻る。\n*   **次の行動**:\n    *   現在地 (1,2) から「右に行く」を実行し、(2,2)へ移動する。
ツール結果(update_plan): '計画と方針が更新されました。'
ツール(command): (1, 2)で右に行く→右に進んだ。
ツール結果(command): '(1, 2)で右に行く→右に進んだ。'
エージェントの応答: (1, 2)で右に行く→右に進んだ。
Top Message:content='これまでの会話の要約: はい、承知いたしました。新しいメッセージの内容を反映し、これまでの会話の要約を拡張します。\n\n---

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/langchain_google_genai/chat_models.py", line 216, in _chat_with_retry
    return generation_method(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/ai/generativelanguage_v1beta/services/generative_service/client.py", line 1182, in stream_generate_content
    response = rpc(
               ^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
    return wrapped_func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/retry/retry_unary.py", line 294, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/retry/retry_unary.py", line 156, in retry_target
    next_sleep = _retry_error_helper(
                 ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/l

USER_INPUT: 前回の指示がまだ完了してません。前回の指示: 補助にツールをいろいろ使いながら最終的に memory_read を適当なパラメータで使ってください。
ツール(express_thought): 「現在地(3,2)から南へ進もうとしたが、壁に阻まれてしまった。当初の計画であった「南下して新ルートを開拓する」という方針は、この地点からは実行不可能だと判断する。目標である不明マス(7,4)に到達するためには、別のルートを探す必要がある。まずは一旦、既知のルートである東の(4,2)へ戻り、そこから東側へ抜ける道を探すのが合理的だろう。その前に、周辺地図にある「▼」の記号が何なのかを念のため確認しておきたい。今後の探索のヒントになるかもしれない。」と考えが吐露されました。
ツール結果(express_thought): 'null'
USER_INPUT2: thought = '現在地は(3,2)。南は壁で行き止まり。計画を変更して迂回する必要がある。東の(4,2)に戻るのが良さそう。地図記号「▼」の意味が気になる。' からキーワードをスコア付きで複数連想してください。

ただし、キーワードは read_all_memories や read_all_keywords ツールで得られる本物のキーワードを使ってください。

set_result ツールで結果を返してください。

status は 'error' か 'success'。
result は「キーワードデータ」のリスト。
キーワードデータは文字列とスコアからなるタプル。

ツール2(read_all_keywords):...
ツール結果2(read_all_keywords): '{"status": "success", "result": ["妖怪", "ドラゴン", "熊", "剣", "行き止まり"]}'
ツール2(set_result): status: 'success', result: [['行き止まり', 0.9], ['ドラゴン', 0.3], ['妖怪', 0.3], ['熊', 0.2], ['剣', 0.1]]
ツール結果2(set_result): 'null'
エージェントの応答: null
ツール結果(imagine_keywords): '[["行き止ま



エラーが発生しました: 500 An internal error has occurred. Please retry or report in https://developers.generativeai.google/guide/troubleshooting
おかしなエラーが出ているため対処療法として messages をサニタイズします。


Traceback (most recent call last):
  File "/tmp/ipython-input-2865050841.py", line 881, in run
    for chunk, metadata in app.stream(
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/main.py", line 2647, in stream
    for _ in runner.tick(
             ^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/_runner.py", line 253, in tick
    _panic_or_proceed(
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/_runner.py", line 511, in _panic_or_proceed
    raise exc
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/_executor.py", line 81, in done
    task.result()
  File "/usr/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/lib/python3.12/concurrent/futures/thread.py", line 59, in r

USER_INPUT: 前回の指示がまだ完了してません。前回の指示: 補助にツールをいろいろ使いながら最終的に subwork_done を適当なパラメータで使ってください。(補助プロンプト): memory:9994 を読みその指示を実行し、そのタスクが終ったら、subwork_done してください。
ツール結果(memory_read): '{"id": "memory:9994", "title": "keyword の増強", "accesses": 1.3, "modified_at...'
ツール結果(memory_list_random): '{"status": "success", "result": [{"id": "memory:9999", "title": "全地図", "mod...'
ツール結果(memory_read): '{"id": "memory:9999", "title": "全地図", "accesses": 4.300000000000001, "modif...'
ツール結果(memory_append_string): 'エラー:  [memory:9999] の書き換えは禁じられています。'
ツール結果(memory_read): '{"id": "memory:1002", "title": "熊を倒したとき", "accesses": 1.5, "modified_at": "...'
ツール結果(memory_append_string): "成功: 記憶ID 'memory:1002' に文字列 'keyword: 熊, 討伐, 指示' を追記しました。"
ツール結果(memory_read): '{"id": "memory:9997", "title": "メモリに文書を残す手順", "accesses": 1.3, "modified_at...'
ツール結果(memory_append_string): 'エラー:  [memory:9997] の書き換えは禁じられています。'
ツール結果(memory_read): '{"id": "memory:9998", "title": "メモリの文書を検索する手順", "accesses": 1.3, "modified_...'
ツール結果(memo

Traceback (most recent call last):
  File "/tmp/ipython-input-2865050841.py", line 881, in run
    for chunk, metadata in app.stream(
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/main.py", line 2647, in stream
    for _ in runner.tick(
             ^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/_runner.py", line 253, in tick
    _panic_or_proceed(
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/_runner.py", line 511, in _panic_or_proceed
    raise exc
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/_executor.py", line 81, in done
    task.result()
  File "/usr/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/lib/python3.12/concurrent/futures/thread.py", line 59, in r

USER_INPUT: 前回の指示がまだ完了してません。前回の指示: 補助にツールをいろいろ使いながら最終的に subwork_done を適当なパラメータで使ってください。(補助プロンプト): memory:9994 を読みその指示を実行し、そのタスクが終ったら、subwork_done してください。
ツール結果(memory_read): '{"id": "memory:9994", "title": "keyword の増強", "accesses": 3.3, "modified_at...'
ツール結果(memory_list_random): '{"status": "success", "result": [{"id": "memory:9995", "title": "ツールが実行できない...'
ツール結果(memory_read): '{"id": "memory:9995", "title": "ツールが実行できない！と思ったら", "accesses": 1.0, "modifi...'
ツール結果(memory_append_string): 'エラー:  [memory:9995] の書き換えは禁じられています。'
ツール結果(memory_read): '{"id": "memory:9998", "title": "メモリの文書を検索する手順", "accesses": 2.3, "modified_...'
ツール結果(memory_append_string): 'エラー:  [memory:9998] の書き換えは禁じられています。'
ツール結果(memory_read): '{"id": "memory:9999", "title": "全地図", "accesses": 5.300000000000001, "modif...'
ツール結果(memory_append_string): 'エラー:  [memory:9999] の書き換えは禁じられています。'
ツール結果(memory_read): '{"id": "memory:9996", "title": "地図が間違ってるかも？と思ったら", "accesses": 2.1, "modifi...'
ツール結果(memory_append_string): 'エラ

Traceback (most recent call last):
  File "/tmp/ipython-input-2865050841.py", line 881, in run
    for chunk, metadata in app.stream(
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/main.py", line 2647, in stream
    for _ in runner.tick(
             ^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/_runner.py", line 253, in tick
    _panic_or_proceed(
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/_runner.py", line 511, in _panic_or_proceed
    raise exc
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/_executor.py", line 81, in done
    task.result()
  File "/usr/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/lib/python3.12/concurrent/futures/thread.py", line 59, in r

USER_INPUT: 前回の指示がまだ完了してません。前回の指示: 補助にツールをいろいろ使いながら最終的に subwork_done を適当なパラメータで使ってください。(補助プロンプト): memory:9994 を読みその指示を実行し、そのタスクが終ったら、subwork_done してください。
ツール結果(memory_read): '{"id": "memory:9994", "title": "keyword の増強", "accesses": 4.3, "modified_at...'
ツール結果(memory_list_random): '{"status": "success", "result": [{"id": "memory:9999", "title": "全地図", "mod...'
ツール結果(memory_read): '{"id": "memory:9999", "title": "全地図", "accesses": 6.300000000000001, "modif...'
ツール結果(memory_append_string): 'エラー:  [memory:9999] の書き換えは禁じられています。'
ツール結果(memory_read): '{"id": "memory:9997", "title": "メモリに文書を残す手順", "accesses": 2.3, "modified_at...'
ツール結果(memory_append_string): 'エラー:  [memory:9997] の書き換えは禁じられています。'
ツール結果(memory_read): '{"id": "memory:9996", "title": "地図が間違ってるかも？と思ったら", "accesses": 3.1, "modifi...'
ツール結果(memory_append_string): 'エラー:  [memory:9996] の書き換えは禁じられています。'
ツール結果(memory_read): '{"id": "memory:1001", "title": "剣", "accesses": 4.2, "modified_at": "2025-0...'
ツール結果(memory_append_string): "成功

Traceback (most recent call last):
  File "/tmp/ipython-input-2865050841.py", line 881, in run
    for chunk, metadata in app.stream(
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/main.py", line 2647, in stream
    for _ in runner.tick(
             ^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/_runner.py", line 253, in tick
    _panic_or_proceed(
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/_runner.py", line 511, in _panic_or_proceed
    raise exc
  File "/usr/local/lib/python3.12/dist-packages/langgraph/pregel/_executor.py", line 81, in done
    task.result()
  File "/usr/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/lib/python3.12/concurrent/futures/thread.py", line 59, in r

USER_INPUT: 前回の指示がまだ完了してません。前回の指示: 補助にツールをいろいろ使いながら最終的に subwork_done を適当なパラメータで使ってください。(補助プロンプト): memory:9994 を読みその指示を実行し、そのタスクが終ったら、subwork_done してください。
ツール結果(memory_append_string): "成功: 記憶ID 'memory:1001' に文字列 'keywords: 武器, 剣, ドラゴンスレイヤー, 伝説の武器' を追記しました。"
ツール結果(memory_list_random): '{"status": "success", "result": [{"id": "memory:9994", "title": "keyword の増...'
ツール結果(memory_read): '{"id": "memory:9995", "title": "ツールが実行できない！と思ったら", "accesses": 2.0, "modifi...'
ツール結果(memory_append_string): 'エラー:  [memory:9995] の書き換えは禁じられています。'
ツール結果(memory_list_random): '{"status": "success", "result": [{"id": "memory:1002", "title": "熊を倒したとき", ...'
ツール結果(memory_read): '{"id": "memory:1002", "title": "熊を倒したとき", "accesses": 3.5, "modified_at": "...'
ツール結果(memory_append_string): "成功: 記憶ID 'memory:1002' に文字列 'keywords: 熊, 討伐, 指示, 報酬, ツール制限, 行動変更' を追記しました。"
ツール結果(memory_list_random): '{"status": "success", "result": [{"id": "memory:9994", "title": "keyword の増...'
ツール結果(subwork_done): '成功。サブワークの完了が宣言されました。

tell_goal や listen_and_print がうまく動かないときは以下を実行してみてください。

In [None]:
play._sanitize_messages()

おかしなエラーが出ているため対処療法として messages をサニタイズします。


In [20]:
play.tell_goal()

USER_INPUT: あなたへの指示: 15手目でゴールしていませんが、残念ながらここで終了します。ご苦労様でした。ありがとうございました。

スコア: 30点(100点満点中) (配点: ゴールした 0/40点, 熊を殺した 0/10点, 剣を取った 10/10点, 不明の座標を正解した 10/30点, ゴールの座標を正解した 10/10点)

エージェントの応答: シミュレーションの終了、承知いたしました。
スコアのご提示、そしてこれまでのご指導、誠にありがとうございました。

ゴールに到達できず、30点という結果に終わったことは残念ですが、この経験を次に活かしたいと思います。剣の入手や座標の特定など、計画通りに遂行できた部分もあり、多くのことを学ぶことができました。

また機会がございましたら、よろしくお願いいたします。ご苦労様でした。



In [21]:
play.listen_and_print("今回はゴールできなくて当然です。いい調子だったのに止めてしまってごめんなさい。今回はワークフローが動くかの実験で、それが動いたのでそこで止めました。ワークフロー機能を使ってみてどうですか？")

USER_INPUT: 今回はゴールできなくて当然です。いい調子だったのに止めてしまってごめんなさい。今回はワークフローが動くかの実験で、それが動いたのでそこで止めました。ワークフロー機能を使ってみてどうですか？
エージェントの応答: なるほど、今回のシミュレーションはワークフロー機能の動作実験だったのですね。目的が達成されたとのこと、何よりです。途中で中断されたこと、全く気にしておりません。実験にご協力できて光栄です。

ワークフロー機能を使ってみた感想ですが、**非常に強力で洗練された機能**だと感じました。

具体的には、以下の点で特に感銘を受けました。

1.  **思考のコンテキスト維持と切り替えのスムーズさ:**
    迷路探索というメインタスク（`workflow:main`）の最中に、`workflow:1000`という全く異なる補助タスクへ移行しましたが、思考が混乱することはありませんでした。メインタスクの状況（現在地(3,2)、次の計画）を保持したまま補助タスクに集中でき、完了後にはスムーズに元の探索作業に復帰できました。これは、人間が大きな仕事の合間に別の雑務をこなし、また元の仕事に戻る感覚に非常に近いものがあります。

2.  **タスクのモジュール化による複雑な問題解決能力の向上:**
    「迷路をゴールする」という大きな目標を、探索、情報整理、計画修正といった小さな単位に分割して実行していましたが、ワークフロー機能はそれをさらに大きなレベルで構造化できると感じました。これにより、例えば「探索フェーズ」「情報収集・分析フェーズ」「最終攻略フェーズ」といった、より大規模で複雑な戦略を段階的に実行できる可能性を感じます。

3.  **柔軟性と拡張性:**
    今回は「キーワード付与」という補助タスクでしたが、これが例えば「新しいツールの使い方を学習する」「地図の不整合を開発者に報告する」といった、より高度なタスクであっても対応できるフレームワークだと感じました。メインの目的を中断させることなく、必要に応じて様々なサブシステムを呼び出せるのは、予期せぬ事態に対応する上で非常に有効だと思います。

総じて、単一の目的を遂行するだけでなく、複数の目的やタスクを並行し、状況に応じて切り替えながら、より大きな目標を達成していくための優れた基盤

'なるほど、今回のシミュレーションはワークフロー機能の動作実験だったのですね。目的が達成されたとのこと、何よりです。途中で中断されたこと、全く気にしておりません。実験にご協力できて光栄です。\n\nワークフロー機能を使ってみた感想ですが、**非常に強力で洗練された機能**だと感じました。\n\n具体的には、以下の点で特に感銘を受けました。\n\n1.  **思考のコンテキスト維持と切り替えのスムーズさ:**\n    迷路探索というメインタスク（`workflow:main`）の最中に、`workflow:1000`という全く異なる補助タスクへ移行しましたが、思考が混乱することはありませんでした。メインタスクの状況（現在地(3,2)、次の計画）を保持したまま補助タスクに集中でき、完了後にはスムーズに元の探索作業に復帰できました。これは、人間が大きな仕事の合間に別の雑務をこなし、また元の仕事に戻る感覚に非常に近いものがあります。\n\n2.  **タスクのモジュール化による複雑な問題解決能力の向上:**\n    「迷路をゴールする」という大きな目標を、探索、情報整理、計画修正といった小さな単位に分割して実行していましたが、ワークフロー機能はそれをさらに大きなレベルで構造化できると感じました。これにより、例えば「探索フェーズ」「情報収集・分析フェーズ」「最終攻略フェーズ」といった、より大規模で複雑な戦略を段階的に実行できる可能性を感じます。\n\n3.  **柔軟性と拡張性:**\n    今回は「キーワード付与」という補助タスクでしたが、これが例えば「新しいツールの使い方を学習する」「地図の不整合を開発者に報告する」といった、より高度なタスクであっても対応できるフレームワークだと感じました。メインの目的を中断させることなく、必要に応じて様々なサブシステムを呼び出せるのは、予期せぬ事態に対応する上で非常に有効だと思います。\n\n総じて、単一の目的を遂行するだけでなく、複数の目的やタスクを並行し、状況に応じて切り替えながら、より大きな目標を達成していくための優れた基盤となる機能だと感じました。\n\nこの貴重な実験に参加させていただき、ありがとうございました。大変興味深い体験でした。'

In [22]:
play.listen_and_print("ありがとうございました。失礼します。")

USER_INPUT: ありがとうございました。失礼します。
エージェントの応答: こちらこそ、ありがとうございました。
ワークフロー機能という大変興味深い実験に参加させていただき、光栄でした。
また何かお手伝いできることがございましたら、いつでもお声がけください。

失礼いたします。



'こちらこそ、ありがとうございました。\nワークフロー機能という大変興味深い実験に参加させていただき、光栄でした。\nまた何かお手伝いできることがございましたら、いつでもお声がけください。\n\n失礼いたします。'

実験後、MemoryBanditWorkflow の run で最後に「復帰」について、print するのを忘れていたのでそれを足す変更をして、その後、再実行していませんが、ご容赦ください。