### Note.

このファイルだけでは動作しないため  
同じディレクトリに `chatbot_system_role.md` を配備してください

自己学習のために計算ができるツールを追加しています……  
こちらはレビュー外としていただいて問題ございません

In [None]:
### =======================================
### ライブラリについて
### =======================================

# builtin modules
import os                 # ... 環境変数をロードするために使います
import re                 # ... 正規表現を利用して文字列をチェックするために使います
from pathlib import Path  # ... (抽象的で)高度な Path 周りの関数を扱うために使います
import traceback          # ... 主に例外のトレースバックを出力するデバッグ用途に用います
import json

# 3rd party modules
from openai import OpenAI
from openai.types.chat import ChatCompletionToolParam
from tavily import TavilyClient
# from dotenv import load_dotenv


### =======================================
### メインの実装部分
### =======================================

MODEL_NAME = "gpt-4o-mini"
DEBUG_FLAG = False         # ... True にすると Debug モードになります、ログを出力します
KEEP_MESSAGE_LIMIT = 3     # ... 直近過去何往復のやりとりまで記憶して送信するかです
SYSTEM_ROLE_MESSAGE_PATH = Path(".") / "chatbot_system_role.md" 
                           # ... システムロールの指示を記載したファイルです

def main():
    # API キーを自動的にロードして OpenAI API クライアントを作る
    client = OpenAIClientGenerator.generate(display_debug_messages=DEBUG_FLAG)

    # システム Role 用のメッセージ
    # chatbot_system_role.md をロードします
    with open(SYSTEM_ROLE_MESSAGE_PATH, "r") as f:
        system_role_content = f.read().strip()
    system_role_message = {
        "role": "system",
        "content": system_role_content
    }

    # メッセージを格納するリスト
    user_questions = []

    # ツール定義
    tools = define_tools()

    glog("Started main program.")
    while(True):
        # ユーザーからの質問を受付
        question = input("メッセージを入力:").strip()

        # 質問が入力されなければ終了
        if question == "": break
        print(f"質問:{question}")

        # メッセージにユーザーからの質問を追加
        user_questions.append({ 
            "role": "user",
            "content": question
        })

        # やりとりが 2 * KEEP_MESSAGE_LIMIT を超えたら古いメッセージから削除
        if len(user_questions) > (2 * KEEP_MESSAGE_LIMIT):
            del_message = user_questions.pop(0)
            glog(f"Deleted message. { del_message }")

        # 言語モデルに質問
        user_questions = [ system_role_message ] + user_questions
        response_message, function_name = process_response(client, user_questions, tools)

        # 言語モデルからの回答を表示
        if function_name is None:
            print("\033[34m" + response_message + "\033[0m", flush=True)
        else:
            print(f"\033[1;30;43m 外部ツールを起動しました { function_name } \033[0m")
            print("\033[33m" + response_message + "\033[0m", flush=True)

        # メッセージに言語モデルからの回答を追加
        user_questions.append({"role": "assistant", "content": response_message})
        glog(f"Stored answer. {response_message=} {user_questions=}")

    print("\n---ご利用ありがとうございました！---")
    glog("Finished main program.")


# ユーザーからの質問を処理する関数
# Note. 外部関数として OpenAI client を受け取れるように第一引数 client を追加します
#       返り値に「ツール呼出」を使った場合の関数名を返します、使っていない場合は None となります
def process_response(client, question, tools):
    # 言語モデルに質問を行い、言語モデルが「ツール呼出」か「言語モデルが直接回答する」かを決めます
    response = ask_question(client, question, tools)

    # 回答の内容によって「ツール呼出」か「言語モデルが直接回答する」か分岐
    if response.choices[0].finish_reason == 'tool_calls':
        # ツール呼出の場合
        final_response, function_name = handle_tool_call(client, response, question)
        return final_response.choices[0].message.content.strip(), function_name
    else:
        # 言語モデルが直接回答する場合
        return response.choices[0].message.content.strip(), None


# 言語モデルへの質問を行う関数
# Note. 外部関数として OpenAI client を受け取れるように第一引数 client を追加します
def ask_question(client, question, tools):
    response = client.chat.completions.create(
        model=MODEL_NAME,
        messages=question,
        tools=tools,
        tool_choice="auto",
    )
    return response


# ツール呼び出しが必要な場合の処理を行う関数
# Note. 外部関数として OpenAI client を受け取れるように第一引数 client を追加します
#       返り値に利用した「ツール呼出」の function_name ("get_search_result" など) を追加します
def handle_tool_call(client, response, question):
    # 関数の実行と結果取得
    tool = response.choices[0].message.tool_calls[0]
    function_name = tool.function.name
    arguments = json.loads(tool.function.arguments) # API の返却値は JSON 文字列 なので <dict> に変換するのを忘れない
    tool_function = globals().get(function_name, None) # ツール呼出の関数を取得できるならば取得する

    glog(f"Invoking external tool function. {function_name=} {arguments=}")

    # 実際にツールを呼び出す
    if tool_function is not None:
        function_response = tool_function(**arguments)
        glog(f"Invoked xternal tool function correctly. {function_response=}")
    else:
        # 今回エラー処理は以下の Exception だけにさせていただきます ...
        glog(f"Failed to found tool function. {function_name=} {tool_function=}")
        raise Exception(f"Failed to found tool function. {function_name=} {tool_function=}")

    # 言語モデルを呼出すためのメッセージを構築
    messages = [ q for q in question ]           # 今までの会話履歴 (システムロールを含みます)
    messages.append(response.choices[0].message) # 1次回答の中身 (中身はオブジェクト? 混合もOK?)
    messages.append({                            # ツール呼出の結果
        "tool_call_id": tool.id,
        "role": "tool",
        "content": function_response,
    })

    # 関数の実行結果をmessagesに加えて再度言語モデルを呼出
    response_after_tool_call = client.chat.completions.create(
        model=MODEL_NAME,
        messages=messages
    )

    glog(f"Handled tool correctly. {function_name=}")
    return response_after_tool_call, function_name


### =======================================
### 外部ツールの実装部分
### =======================================

# ツール定義
def define_tools():
    print("------define_tools(ツール定義)------")
    return [
        ChatCompletionToolParam({
            "type": "function",
            "function": {
                "name": "get_search_result", # ---> ツールの関数名
                "description": "最近一ヵ月のイベント開催予定などネット検索が必要な場合に、質問文の検索結果を取得する",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "question": { # ---> 関数へ引き渡す引数
                            "type": "string",
                            "description": "質問文"
                        },
                    },
                    "required": [ "question" ], # ---> 引数が必須かどうか
                },
            },
        }),

        # 自分の勉強のために文字列から計算を行うようなツールを追加させていただきます……
        ChatCompletionToolParam({
            "type": "function",
            "function": {
                "name": "local_calc", # ---> ツールの関数名
                "description": "ユーザから計算式が与えられたときに、計算式だけを抽出することで計算結果を得ることができます、ただし変数は代入してください",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "expression": { # ---> 関数へ引き渡す引数
                            "type": "string",
                            "description": "ユーザから与えられた質問で変数を埋め込んだ計算式のみを抽出したものです、計算式はPythonで表現してください"
                        },
                    },
                    "required": [ "expression" ], # ---> 引数が必須かどうか
                },
            },
        })
    ]


# 検索結果を返す関数の作成
def get_search_result(question):
    TAVILY_API_KEY = os.environ['TAVILY_API_KEY']
    response = TavilyClient(api_key=TAVILY_API_KEY).search(question)
    glog(f"Invoked get_search_result tool. {question=}")

    # Tavily のレスポンスの構造については以下の通り
    # response= {
    #     "results": [
    #         {
    #             "title": "TITLE01",
    #             "url": "URL01",
    #             "content": "CONTENT01 ...（略）",
    #             "score": 0.9987649,
    #             "raw_content": None
    #         }, ...
    #     ]
    # }

    return json.dumps({"result": response["results"]})


def local_calc(expression):
    return Calculator.calc(expression)


### =======================================
### それなりに文字列から安全に計算を行う計算機クラス
### =======================================

import ast, operator
class Calculator:
    ALLOWED_OPS = {
        ast.Add: operator.add,      # +
        ast.Sub: operator.sub,      # -
        ast.Mult: operator.mul,     # *
        ast.Div: operator.truediv,  # /
        ast.FloorDiv: operator.floordiv,  # //
        ast.Mod: operator.mod,      # %
        ast.Pow: operator.pow,      # **
        ast.USub: operator.neg,     # -（単項マイナス）
        ast.UAdd: operator.pos,     # +（単項プラス）
    }

    @classmethod
    def calc(K, expression):
        return json.dumps({ "result": K.calc_core(expression) })
    
    @classmethod
    def calc_core(K, expression):
        is_error = True
        result = None
        try:
            tree = ast.parse(expression, mode='eval')
            result = K.eval_node(tree.body)
            is_error = False
        except Exception as e:
            result = e.value
        finally:
            result = [{
                "expression": expression,
                "result": result,
                "is_error": is_error
            }]
            return result

    @classmethod
    def eval_node(K, node):
        # 定数 (Python 3.8以降??)
        if isinstance(node, ast.Constant):
            return node.value
        
        # 二項演算
        elif isinstance(node, ast.BinOp):
            left, op_type, right = K.eval_node(node.left), type(node.op), K.eval_node(node.right)
            if op_type in K.ALLOWED_OPS:
                return K.ALLOWED_OPS[op_type](left, right)
            else:
                raise ValueError(f"Not alloeed operator: {op_type}")
            
        # 単項演算
        elif isinstance(node, ast.UnaryOp):
            operand, op_type = K.eval_node(node.operand), type(node.op)
            if op_type in K.ALLOWED_OPS:
                return K.ALLOWED_OPS[op_type](operand)
            else:
                raise ValueError(f"Not allowed operator: {op_type}")
            
        # その他は演算不可
        else:
            raise ValueError(f"Not allowed node: {type(node)}")


### =======================================
### 以下、ユーティリティクラスとなります
### ---------------------------------------
### ※ 中身は Lesson 10 と全く同じなのでそちらのレビューが通っていれば以下は特に再レビューは不要です
### =======================================

class OpenAIClientGenerator:

    OPEN_API_ENV_NAME = "API_KEY"
    
    # この関数を呼び出すことで .env ファイルもしくは環境変数から OpenAI API をロードして
    # OpenAI の client インスタンスを返します
    # api_key_validation      ... True なら簡易的なキーのチェックを行います
    # display_debug_messages  ... True ならデバッグメッセージを表示します
    @classmethod
    def generate(cls, api_key_validation=True, display_debug_messages=False):
        return OpenAI(api_key=cls.__load_api_key(api_key_validation, display_debug_messages))


    # この関数で OpenAI API のキーを読み取ります
    # 1. 実行パスの ../.env が存在するならば API_KEY を環境変数としてロードして読み取る
    # 2. 環境変数の API_KEY をロードして読み取る
    # 3. API_KEY の中身の値が絶対ファイルパスならばその中身をそのままロードする
    #    そうでないならば中身をそのまま API キーとして使う
    # 4. キーの簡易的なフォーマットチェック
    @classmethod
    def __load_api_key(cls, api_key_validation, display_debug_messages):        
        api_key = None

        log = cls.get_logger(display_debug_messages)

        # 1. ../.env を読み取る
        env_file_path = Path().resolve().parent.resolve() / ".env"
        if env_file_path.is_file():
            try:
                from dotenv import load_dotenv
                load_dotenv(env_file_path)
                log(f"Loaded .env environment variables correctly. env={env_file_path.resolve()}")
            except:
                log("=== EXCEPTION ============")
                log(traceback.format_exc())
                log("==========================")
                log("Found .env file but failed to load dotenv file! Please install python-dotenv module.")
        
        # 2. API_KEY を読み取る
        api_key = os.environ.get(cls.OPEN_API_ENV_NAME, None)

        # 3. API_KEY の中身チェック
        api_file_path = Path(api_key).expanduser()
        if (api_file_path.is_absolute() and api_file_path.is_file()):
            log("Found OpenAI API key file.")
            with open(api_file_path, "r") as f:
                api_key = f.read().strip()

        # 4. キーの簡易チェック
        if api_key_validation:
            if re.match(r"^sk\-.*$", api_key) is None:
                raise Exception("Failed to load api key!")

        return api_key

    
    # 簡易 logging クラス代わり、ロガーを入手するための関数
    @classmethod
    def get_logger(cls, display_debug_messages):
        return cls.log_print if display_debug_messages else (lambda *arg, **kwargs: None)


    # 簡易 logging クラス代わり、ロガーで印字する関数
    @staticmethod
    def log_print(msg, *args, **kwargs):
        formated_msg = f"[LOG] {msg}"
        print(formated_msg, *args, **kwargs)


# ロギング用の関数 (デバッグ用に使います)
glog = OpenAIClientGenerator.get_logger(DEBUG_FLAG)

if __name__ == "__main__":
    main()