# <center >谷歌开源Agent框架ADK开发实战</center>
## <center>Part 4. ADK 会话设计模式及权限管理开发实战</center>

&emsp;&emsp;本节课，我们将深入介绍`Google ADK`中用于权限控制和会话储存的`SessionService`工作流模块，其定位如下图所示；


<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506051536794.png" width=60%></div>

# 一、Google ADK 会话管理设计模式

&emsp;&emsp;大模型的应用产品都离不开会话管理。例如聊天机器人在接收用户一轮接一轮的新问题的时候，需要先回忆起之前的问答历史，再结合当前的问题给出本轮的回答。单智能体系统在自主处理复杂任务的时候，也需要将已经执行过的任务（如调用工具）和当前的任务进行关联，从而实现任务的闭环。而更复杂的多智能体系统则会涉及到更到的协同工作，更需要一个整体的会话管理来明确的知道每个子智能体都做了什么，当前处于什么状态，接下来需要做什么以及什么时候完成了最终的任务需求。这种管理机制通常被称为`Conversational Context`，即会话语境。

&emsp;&emsp;在`Google ADK`中，`Conversational Context`主要是由`Session`、`State`和`Memory`三个核心组件来实现。我们需要依次来了解这三个组件的实现原理和应用方法。

&emsp;&emsp;首先来看`Session`组件在`Google ADK`中的设计思路。

## 1.1 Session构造函数

&emsp;&emsp;`Session`组件是专门设计用来跟踪和管理单个对话线程的对象。在业务场景中，一个`Session`对象可以简单的理解为一个独立的会话，这里我们以[智谱清言](https://chatglm.cn/)为例，如下所示：

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506041151068.png" width=60%></div>

&emsp;&emsp;用户可以随意开启新的对话，也可以随时切换到之前的对话继续进行对话，每一轮对话信息都是独立的，互不干扰。在`Google ADK` 框架的设计下，为了能够在独立线程中存储和管理会话中涉及到的交互数据，定义了一个`Session`类，其源码位置如下：https://github.com/google/adk-python/blob/main/src/google/adk/sessions/session.py

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202505261547573.png" width=60%></div>

&emsp;&emsp;`Session`是一个标准的`Pydantic`数据模型类，该类会作为数据容器，接收自定义的值，从而提供独立的缓存对象。其可接收的核心参数如下所示：

<style>
.center 
{
  width: auto;
  display: table;
  margin-left: auto;
  margin-right: auto;
}
</style>

<p align="center"><font face="黑体" size=4>Session 对象核心参数</font></p>
<div class="center">

| 属性名              | 类型               | 描述                                           |
|-------------------|------------------|----------------------------------------------|
| `id`              | `str`            | 会话的唯一标识符。                                 |
| `app_name`        | `str`            | 应用的名称。                                     |
| `user_id`         | `str`            | 用户的ID。                                      |
| `state`           | `dict[str, Any]` | 会话的状态，默认为空字典。                           |
| `events`          | `list[Event]`    | 会话的事件列表，例如用户输入、模型响应、函数调用/响应等，默认为空列表。 |
| `last_update_time`| `float`          | 会话的最后更新时间，默认为0.0。                     |

&emsp;&emsp;在每个独立的会话中（`Session`），都会包含特定的用户与代理之间的对话历史记录，以及代理的上下文等信息。这些信息则是通过`Session`对象的指定参数来进行管理。如下所示：

- **id**：对每个会话生成唯一的标识符，用于区分不同的会话。可以手动指定，如未手动指定，则会使用`uuid.uuid4()`生成一个随机字符串作为`Session ID`;
- **app_name**：应用的名称，用于区分不同的应用。比如一个应用产品中不仅包含普通对话功能，还包含知识库问答功能等，那么就可以通过`app_name`来区分不同的功能模块，不同的功能模块彼此间的数据是隔离的，互不干扰；
- **user_id**：用户的ID，用于区分不同的用户；主要是用来做用户级的状态管理，每个用户之间的对话信息是隔离的，互不干扰；
- **events**：会话的事件列表，比如用户的输入、大模型的响应、函数调用/响应等，其实就是一个历史会话列表。首次运行时为空列表，每当用户与代理进行一轮交互，就会根据交互的类型和内容生成相应的事件，不断地将事件追加进来；
- **last_update_time**：会话的最后更新时间，默认为0.0。每发生一次事件更新，就会更新一次。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506041200321.png" width=60%></div>

&emsp;&emsp;在对不同应用层级的权限管理上（即区分`app_name`、`user_id`和`session_id`），`Session`对象的`state`参数可以接收一个字典，字典的键值对即为不同层级的状态管理，分别为：

<style>
.center 
{
  width: auto;
  display: table;
  margin-left: auto;
  margin-right: auto;
}
</style>

<p align="center"><font face="黑体" size=4>状态层级表</font></p>
<div class="center">

| 状态层级 | 前缀 | 作用域 | 生命周期 | 使用场景 |
|----------|------|--------|----------|----------|
| 应用级状态 | app: | 所有用户共享 | 应用生命周期 | 全局配置、功能开关、模型参数 |
| 用户级状态 | user: | 特定用户 | 用户生命周期 | 个人偏好、历史记录、权限设置 |
| 会话级状态 | 无前缀 | 单次会话 | 会话生命周期 | 对话上下文、临时变量、会话配置 |
| 临时状态 | temp: | 单次会话 | 不持久化 | UI状态、临时计算结果 |

## 1.2 SessionService抽象服务类

&emsp;&emsp;理解了`Session`对象的结构后，如果要实际应用，则需要创建出一个个`Session`对象。但是需要说明的是，`Session`对象不是直接实例化的（虽然可以直接调用构造函数，但并不建议这样做），而是通过`SessionService`方法创建，`ADK`的这种设计模式就是典型的工厂模式，它要求对同样数据结构的`Session`对象，如果选择不同的实例化类，就能达到不同的应用效果。而实例化`Session`的基类方法`SessionService`实现。其源码位置如下：https://github.com/google/adk-python/blob/main/src/google/adk/sessions/base_session_service.py

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202505261612533.png" width=60%></div>

&emsp;&emsp;在`BaseSessionService`类中核心实现的就是五个方法，其中四个核心抽象方法及一个事件管理方法，如下表所示：

<style>
.center 
{
  width: auto;
  display: table;
  margin-left: auto;
  margin-right: auto;
}
</style>

<p align="center"><font face="黑体" size=4>BaseSessionService核心方法</font></p>
<div class="center">

| 方法名 | 参数 | 返回值 | 功能描述 | 用途场景 |
|--------|------|--------|----------|----------|
| create_session | app_name, user_id, state?, session_id? | Session | 创建新的会话实例 | 用户开启新对话、初始化会话状态 |
| get_session | app_name, user_id, session_id, config? | Session? | 根据ID获取已存在的会话 | 恢复对话上下文、读取历史状态 |
| list_sessions | app_name, user_id | ListSessionsResponse | 列出用户在应用下的所有会话 | 显示对话历史、会话管理界面 |
| delete_session | app_name, user_id, session_id | None | 删除指定会话及其数据 | 清理过期会话、用户删除操作 |
| append_event | session, event | Event | 向会话追加事件并更新状态 | 需要特殊存储逻辑、事务处理 |

&emsp;&emsp;其他类通过继承`BaseSessionService`并重写这四个核心抽象方法，便可以实现不同的存储方式。其中`ADK`框架已经提供了`InMemorySessionService`、`DatabaseSessionService`、`VertexAiSessionService`等实现，如下所示：

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202505261613259.png" width=60%></div>

&emsp;&emsp;从代码中可以看到`ADK`提供了多种 `SessionService` 实现，如内存、数据库、Vertex AI 等，以适应不同的持久化需求，其对应的函数类包含：

<style>
.center 
{
  width: auto;
  display: table;
  margin-left: auto;
  margin-right: auto;
}
</style>

<p align="center"><font face="黑体" size=4>SessionService实现类</font></p>
<div class="center">

| 实现类 | 存储方式 | 适用场景 |
|--------|----------|----------|
| InMemorySessionService | 内存字典 | 开发测试、单机应用 |
| DatabaseSessionService | 关系型数据库 | 生产环境、持久化需求 |
| VertexAiSessionService | Google Cloud | 云端部署、AI集成 |

&emsp;&emsp;无论是哪种`SessionService`，其核心功能都是创建、检索、更新（附加 `Events` 、修改 `State` ）、删除等方法，以管理`Session` 对象处理生命周期。

- 创建会话：当有新用户交互时初始化一个 `Session`，会话具有唯一 ID（可由 `SessionService` 自动生成或提供）和关联的 `app_name`（应用标识）及 `user_id`（用户标识）。这些标识用于区分不同对话线程和归属用户。
- 恢复会话：根据会话ID检索已有 `Session`，让代理能够“接着上次聊”，保持上下文连续性。这对多轮对话非常关键。`SessionService` 可同时管理多个 `Session`，对每个用户/应用可以维护独立的对话线程。
- 追加事件：在会话过程中，每当用户发送消息或代理产生响应/动作时，`SessionService` 负责将此交互作为 `Event` 附加到 `Session` 的事件历史中。这一过程还负责更新会话的 `State`——由事件中包含的状态差异 (`state_delta`) 来指定哪些状态键值需要改变。`SessionService` 提供 `append_event`(`session`, `event`) 等方法来保证历史记录与状态更新的原子性，这样可以确保在持久化存储下会话状态不会遗漏更新。
- 列出现有会话：`SessionService` 支持查询指定用户在某应用下的活动会话列表。这有助于管理多对话线程，例如在应用界面显示用户的所有对话，或在程序中遍历清理过期会话等。
- 结束清理会话：当对话完成或不再需要时，`SessionService` 提供删除会话的方法，将其从存储中移除。良好的清理策略可以防止资源占用过多。官方建议在对话结束时显式调用 `delete_session` 清除会话数据。

&emsp;&emsp;每一种`SessionService`的实现类都实现不同的会话存储方式，各自有不同的应用场景。接下来我们就逐一展开介绍。首先来看`InMemorySessionService`的应用方法。

# 二、InMemorySessionService内存会话服务

&emsp;&emsp;`InMemorySessionService` 是`Google ADK` 框架中的内存会话服务实现，内部使用进程内数据结构（如字典）保存 `Session` 对象及其事件、状态，方法实现上相对简单直接。它继承自 `BaseSessionService`，实现了所有抽象方法。但由于仅存储于内存，它不提供持久化，所以会导致应用重启后所有会话数据都会丢失，因此更加适合开发调试和短期任务场景。其提供了完整的会话管理功能，包括创建、检索、更新（附加 Events 、修改 State ）、删除等方法，如下所示：

<style>
.center 
{
  width: auto;
  display: table;
  margin-left: auto;
  margin-right: auto;
}
</style>

<p align="center"><font face="黑体" size=4>InMemorySessionService核心方法</font></p>
<div class="center">

| 方法名 | 类型 | 参数 | 返回值 | 主要功能 | 应用场景 |
|--------|------|------|--------|----------|----------|
| create_session() | 异步 | app_name, user_id, state?, session_id? | Session | 创建新会话 | 新用户注册、主题分离、状态初始化 |
| get_session() | 异步 | app_name, user_id, session_id, config? | Optional[Session] | 获取指定会话 | 会话恢复、状态检查、历史过滤 |
| list_sessions() | 异步 | app_name, user_id | ListSessionsResponse | 列出用户所有会话 | 会话管理、会话选择、统计分析 |
| delete_session() | 异步 | app_name, user_id, session_id | None | 删除指定会话 | 隐私保护、存储清理、用户请求 |
| append_event() | 异步 | session, event | Event | 向会话追加事件 | 对话记录、状态更新、行为追踪 |


&emsp;&emsp;其核心实现的功能包括：
- 开始新的对话： 当用户开始交互时创建新的 `Session` 对象。
- 恢复现有对话： 检索特定 `Session` （使用其 ID），以便代理可以从中断的地方继续。
- 保存进度： 将新的交互（ `Event` 对象）添加到会话历史记录中。
- 列出对话： 查找特定用户和应用程序的活动会话线程。
- 清理： 当对话结束或不再需要时，删除 `Session` 对象及其相关数据。

&emsp;&emsp;`InMemorySessionService` 的存储机制是在内存中存储 `Session` 对象，`Runner` 需要在内存中找到已存在的 `Session`，再执行后续的调度处理。所以必须是先存储`Session`，后`Runner`查找并运行，这是必须遵循的顺序。对之前的课程，使用的方法是在内存中预先创建并存储 `Session`，如下所示：

```python

    # 步骤1: 预先在内存中创建 Session
    await session_service.create_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=SESSION_ID
    )

    # 步骤2: Runner 找到这个 Session 会话对象
    runner = Runner(app_name=APP_NAME, session_service=session_service, ...)

    # 步骤3: 运行时 Runner 通过 session_service.get_session() 找到已存在的 Session_ID
    await runner.run_async(user_id=USER_ID, session_id=SESSION_ID, ...)
```

&emsp;&emsp;这里大家可以运行`pre_created_session.py`文件实现基于`InMemorySessionService`的流式多轮对话，运行效果如下所示：

```python
    # 注意：依赖的第三方依赖包为：
    pip install google-adk litellm

    # 同时需要在.env 文件中正确填写api_key
    DEEPSEEK_API_BASE=https://api.deepseek.com
    DEEPSEEK_API_KEY=sk-db7ecc2d50ea
```

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506041325715.png" width=60%></div>

&emsp;&emsp;这种预构建`Session`对象中`app_name`、`user_id`、`session_id`的方法只适用于快速开发调试，在生产环境中并不适用。更常见的做法是让`Runner` 自动创建 `Session`，其开发逻辑就是先通过`get_session()`方法检查 `Session` 是否存在，如果存在则直接使用，否则通过`create_session()`方法创建一个新的 `Session`。这里需要说明的是:在源码中当传入的`session_id`为空时，会通过`uuid.uuid4()`生成一个随机字符串作为 `Session ID` :

```python
    def _create_session_impl(self, *, app_name: str, user_id: str, 
                            state: Optional[dict[str, Any]] = None,
                            session_id: Optional[str] = None) -> Session:
        # 如果没有提供 session_id，则自动生成
        session_id = (
            session_id.strip()
            if session_id and session_id.strip()
            else str(uuid.uuid4())  # 自动生成 UUID
        )
```

&emsp;&emsp;我们将优化后的逻辑写在 `auto_create_session.py` 文件中，运行效果如下所示：

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506041341518.png" width=60%></div>

&emsp;&emsp;更近一步地，当了解了`Session`的存储机制之后， 我们就可以利用 `InMemorySessionService` 来实现会话级的状态管理。这里我们通过`Gradio`构建了一个会话级的状态管理界面，通过这个界面我们可以更加直观的体会到在`ADK`中如何实现会话级的状态管理。核心代码在`session_manager_chatbot.py`文件中，启动方法如下所示：

```python
    # 注意: 运行前需要先安装gradio
    pip install gradio
```

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506041543062.png" width=60%></div>

&emsp;&emsp;当正常启动后，可以在浏览器中通过`localhost:7860`地址访问到会话管理界面，界面中包含会话列表、创建会话、切换会话、删除会话等功能，如下所示：

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506041543063.png" width=60%></div>

&emsp;&emsp;基于`Google ADK`框架中实现的`Session`构建逻辑，可以非常容易地实现会话管理，无需我们编写额外的基础功能逻辑，可以非常快速的构建应用产品的用户管理模块。但是，`InMemorySessionService` 的缺点也非常明显，即会话数据仅存储在内存中，应用重启后所有会话数据都会丢失。因此适合开发调试和短期任务场景。如果想要进一步的贴近业务开发场景，则是需要对用户数据进行持久化存储，这里就需要用到`DatabaseSessionService`。

# 三、DatabaseSessionService持久化存储

&emsp;&emsp;`DatabaseSessionService`是`Google ADK`框架中提供的另一种会话服务管理的实现，与`InMemorySessionService`实现的内存会话存储不同，`DatabaseSessionService`可以直接连接到关系型数据库，利用`ORM`或直接通过`SQL`语句将产生的`Session` 转换为数据库记录（如会话表、事件表、状态表）来存储会话数据，从而实现数据的持久性。在`database_session_service.py` 中定义的源码中，从`DynamicJSON`类型的实现可以看出目前明确支持的数据库类型为：

<style>
.center 
{
  width: auto;
  display: table;
  margin-left: auto;
  margin-right: auto;
}
</style>

<p align="center"><font face="黑体" size=4>DatabaseSessionService支持的数据库类型</font></p>
<div class="center">

| 数据库 | 数据类型 | 序列化 | 性能 | 源码注释 | 支持级别 |
|--------|----------|--------|------|----------|----------|
| PostgreSQL | JSONB | ❌ 无需 | 🚀 最高 | "uses JSONB" | 最佳 |
| MySQL | LONGTEXT | ✅ 需要 | ⚡ 中等 | "address data too long issue" | 完全 |
| 其他 | TEXT | ✅ 需要 | 🐌 较低 | "TEXT with JSON serialization" | 基础 |

&emsp;&emsp;`JSONB`或者`LONGTEXT`其实就是在关系型数据库里“怎么把一段 JSON（或其他半结构化内容）落到表里”的两种存储方式。在源码中有多处明显的逻辑表明对`PostgreSQL`的支持是最佳的，分别为：

```python
    # 1. 类注释明确说明:PostgreSQL使用 JSONB，其他数据库使用TEXT+序列化
    class DynamicJSON(TypeDecorator):
    """A JSON-like type that uses JSONB on PostgreSQL and TEXT with JSON serialization for other databases."""

    # 2. 数据类型优化
    def load_dialect_impl(self, dialect: Dialect):
        if dialect.name == "postgresql":
            return dialect.type_descriptor(postgresql.JSONB)  # 🚀 原生JSONB
        if dialect.name == "mysql":
            return dialect.type_descriptor(mysql.LONGTEXT)    # 📝 长文本
        return dialect.type_descriptor(Text)                 # 📄 普通文本

    # 3. 数据处理效率
   def process_bind_param(self, value, dialect: Dialect):
        if value is not None:
            if dialect.name == "postgresql":
                return value  # 🚀 JSONB直接处理dict，无需序列化
            return json.dumps(value)  # 🐌 其他数据库需要序列化为字符串

    def process_result_value(self, value, dialect: Dialect):
        if value is not None:
            if dialect.name == "postgresql":
                return value  # 🚀 JSONB直接返回dict
            else:
                return json.loads(value)  # 🐌 其他数据库需要反序列化 
```


&emsp;&emsp;因此，我们也建议大家在使用`DatabaseSessionService`时，优先使用`PostgreSQL`数据库以获得最佳性能。接下来，我们就以`PostgreSQL`为例，详细介绍如何构建`Session`的本地持久化存储。

&emsp;&emsp;首先，我们需要先在本地安装`PostgreSQL`应用程序，并创建一个数据库以用于存储`Session`数据。

## 3.1 本地部署 PostgreSQL

&emsp;&emsp;`PostgreSQL`是一个开源的关系型数据库管理系统，支持在`Windows`、`Linux`、`macOS`等操作系统上安装和部署，其下载的官网地址为：https://www.enterprisedb.com/downloads/postgres-postgresql-downloads 

- **Step 1. 下载安装包**

&emsp;&emsp;点击链接后跳转到官方 `PostgreSQL`下载页面, 选择`17.5`最新稳定版本，点击`Download`按钮，即可下载适用于`Windows`的`PostgreSQL`安装包。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609780.png" width=60%></div>

- **Step 2. 运行安装程序**

&emsp;&emsp;下载后，导航到下载目录，双击`postgresql-17.5-1-windows-x64`文件，双击可执行文件即可开始安装过程：

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609781.png" width=60%></div>

- **Step 3. 执行安装过程**

&emsp;&emsp;运行后将显示欢迎屏幕。点击 `Next` 继续。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609783.png" width=60%></div>

&emsp;&emsp;在安装路径选择页面，选择安装路径，默认路径为`C:\Program Files\PostgreSQL\17`，点击`Next`按钮。强烈不建议修改此路径，容易导致安装失败。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609784.png" width=60%></div>


&emsp;&emsp;安装程序将询问要安装哪些组件。默认情况下，它会选择`PostgreSQL Server`(数据库服务器) 、`pgAdmin`(数据库管理工具) 、`Stack Builder`(用于下载和安装附加工具的安装管理工具) 和 `Command Line Tools`(命令行工具) ,默认全选即可。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609785.png" width=60%></div>

&emsp;&emsp;接下来指定数据库数据文件的位置。默认即可。然后点击`Next` 。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609786.png" width=60%></div>

&emsp;&emsp;这一步，需要为 `PostgreSQL` 超级用户 (postgres) 设置密码。请务必设置一个强密码并牢记，后续所有与 `PostgreSQL` 相关的操作都需要使用此密码。设置密码后点击 `Next` 。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609787.png" width=60%></div>

&emsp;&emsp;默认端口为 `5432`，如​​果没有特殊要求，可以直接使用默认端口号。点击 `Next` 。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609788.png" width=60%></div>

&emsp;&emsp;继续选择数据库的区域设置。默认设置`Default`即可，也可以选择自定义设置，比如`Yi, China`。选择后继续单击 `Next` 。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609789.png" width=60%></div>

&emsp;&emsp;执行完上述步骤后，安装程序将显示我们之前步骤选择的选项摘要。确认无误后，点击 `Next` 开始安装。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609791.png" width=60%></div>


&emsp;&emsp;最后，等待自动安装完成后，点击 `Finish` 完成安装。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609792.png" width=60%></div>

- **Step 4. 启动 `pgAdmin` 验证**


&emsp;&emsp;完成安装后，可以通过`pgAdmin`可视化工具连接到`PostgreSQL`数据库。我们需要在开始菜单中找到`pgAdmin 4`，并点击`pgAdmin 4`启动`pgAdmin`。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609793.png" width=60%></div>

&emsp;&emsp;`pgAdmin4` 打开后，它会自动检测你的 `PostgreSQL` 服务器。双击服务器名称，然后输入安装时设置的密码进行连接。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609794.png" width=60%></div>

&emsp;&emsp;连接成功后，可以看到`pgAdmin`的界面。如果一切操作执行顺利，则说明`PostgreSQL`安装成功。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609795.png" width=60%></div>

- **Step 5. 创建数据库**

&emsp;&emsp;在`pgAdmin4`中，一旦连接成功，就可以看到`PostgreSQL`的`Servers`列表。可以通过右键单击浏览器中的 `Databases` 节点并选择 `Create > Database` 来创建新的数据库。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609796.png" width=60%></div>

&emsp;&emsp;在弹出的对话框中，输入数据库名称，然后点击`Create`按钮。这里我们使用`adk`作为数据库名称，同时`OID`是对每个数据库的唯一标识。执行 `CREATE DATABASE dbname;` 时，`PostgreSQL` 会自动为 `dbname` 在 `pg_database` 中分配一个新的 `OID`，所以可以不用手动设置。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609797.png" width=60%></div>

&emsp;&emsp;创建成功后，可以看到数据库列表中出现了新创建的数据库。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031609798.png" width=60%></div>

* **Step 6. Python 连接 PostgreSQL 测试**


&emsp;&emsp;当`PostgreSQL Server`正常运行且数据库创建成功后，我们就可以使用`Python`连接`PostgreSQL`数据库，并进行连通性测试。使用`Python`连接`PostgreSQL`数据库可以使用底层的`DB-API`或异步协议，也可以使用封装层的`SQLAlchemy`，其区别如下：

<style>
.center 
{
  width: auto;
  display: table;
  margin-left: auto;
  margin-right: auto;
}
</style>

<p align="center"><font face="黑体" size=4>Python 连接 PostgreSQL 的方式</font></p>
<div class="center">


| 层级                           | 依赖的第三方包                                                                                                                 | 作用                                                                       | 典型连接串示例                                                                                                                                     |
| ---------------------------- | ---------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------- |
| **底层 – 纯驱动（DB‑API 或异步协议）** | - **`psycopg2` / `psycopg2‑binary`**（经典，同步）  <br> - **`psycopg` v3**（新版，可同步也可 `asyncio`）  <br> - **`asyncpg`**（高性能全异步） | 把 Python 语句直接送到 PostgreSQL，返回游标/结果集。自己管理 SQL、事务、连接池                      | `psycopg.connect("dbname=adk user=postgres")`                                                                                               |
| **抽象层 – SQLAlchemy**        | `sqlalchemy`                                                                                                           | 在驱动之上提供 **Engine(连接池) + Core(SQL 构造器) + ORM**| `create_engine("postgresql+psycopg2://user:pwd@localhost/adk")` <br>异步：`create_async_engine("postgresql+asyncpg://user:pwd@localhost/adk")` |


&emsp;&emsp;我们这里先使用`psycopg2`来测试`PostgreSQL`的连通性。首先需要安装`psycopg2`包，执行如下命令在当前虚拟环境中安装即可。

In [3]:
! pip install psycopg2 



&emsp;&emsp;接下里，设置`PostgreSQL`的连接配置信息，需要根据自己的实际情况进行修改。

In [22]:
import psycopg2

# 数据库连接配置
DB_CONFIG = {
    'host': 'localhost',  # 这里替换成实际的 PostgreSQL 服务器地址
    'port': 5432,   # 这里替换成实际的 PostgreSQL 服务器端口
    'database': 'adk',  # 这里替换成实际的 PostgreSQL 数据库名称
    'user': 'postgres',  # 这里替换成实际的 PostgreSQL 用户名
    'password': 'snowball2019'  # 这里替换成实际的 PostgreSQL 密码
}

&emsp;&emsp;编写`psycopg2`的连接测试代码，如下所示:

In [23]:
def test_psycopg2_connection():
    """使用 psycopg2 测试数据库连接"""
    print("🔍 正在使用 psycopg2 测试数据库连接...")
    
    try:
        # 尝试连接数据库
        conn = psycopg2.connect(**DB_CONFIG)
        cursor = conn.cursor()
        
        # 执行简单查询
        cursor.execute("SELECT version();")
        version = cursor.fetchone()
        
        print(f"📊 PostgreSQL 版本: {version[0]}")
        
        # 测试数据库操作
        cursor.execute("SELECT current_database(), current_user;")
        db_info = cursor.fetchone()
        print(f"📂 当前数据库: {db_info[0]}")
        print(f"👤 当前用户: {db_info[1]}")
        
        cursor.close()
        conn.close()
        return "✅ PostgreSQL 连接成功！"
        
    except psycopg2.Error as e:
        print(f"❌ PostgreSQL 连接失败: {e}")
        return False
    except Exception as e:
        print(f"❌ 意外错误: {e}")
        return False

&emsp;&emsp;最后，执行测试代码，如果能正常输出`PostgreSQL`的版本信息，则说明使用`psycopg2`连接`PostgreSQL`成功。

In [24]:
# 基础连接测试
test_psycopg2_connection()

🔍 正在使用 psycopg2 测试数据库连接...
📊 PostgreSQL 版本: PostgreSQL 17.5 on x86_64-windows, compiled by msvc-19.43.34808, 64-bit
📂 当前数据库: adk
👤 当前用户: postgres


'✅ PostgreSQL 连接成功！'

## 3.2 DatabaseSessionService接入PostgreSQL 

&emsp;&emsp;在`ADK`框架中，`DatabaseSessionService`是通过`SQLAlchemy`来连接`PostgreSQL`数据库的，这一点从源码中可以明显看出：

```python
    # 关键导入 - 只有 SQLAlchemy
    from sqlalchemy import Boolean, delete, Dialect, ForeignKeyConstraint, func, Text
    from sqlalchemy.engine import create_engine, Engine
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
    from sqlalchemy.orm import Session as DatabaseSessionFactory, sessionmaker

    def __init__(self, db_url: str, **kwargs: Any):
        # 1. 创建 SQLAlchemy 引擎
        db_engine = create_engine(db_url, **kwargs)
        
        # 2. 创建 session factory
        self.database_session_factory = sessionmaker(bind=self.db_engine)
        
        # 3. 自动创建表结构
        Base.metadata.create_all(self.db_engine)
```

&emsp;&emsp;因此，我们只需要在`DatabaseSessionService`的初始化方法中，传入`PostgreSQL`的连接字符串即可。但在接入之前，先在开发环境中使用`SQLAlchemy`的`create_engine`方法测试`PostgreSQL`的连通性。首先需要安装`SQLAlchemy`包，执行如下命令在当前虚拟环境中安装即可。

In [13]:
! pip install SQLAlchemy



&emsp;&emsp;依然沿用上文中的`DB_CONFIG`配置信息：

In [25]:
# 数据库连接配置
DB_CONFIG = {
    'host': 'localhost',  # 这里替换成实际的 PostgreSQL 服务器地址
    'port': 5432,   # 这里替换成实际的 PostgreSQL 服务器端口
    'database': 'adk',  # 这里替换成实际的 PostgreSQL 数据库名称
    'user': 'postgres',  # 这里替换成实际的 PostgreSQL 用户名
    'password': 'snowball2019'  # 这里替换成实际的 PostgreSQL 密码
}

&emsp;&emsp;编写`SQLAlchemy`的连接测试代码，如下所示:

In [26]:
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError


def test_sqlalchemy_connection():
    """使用 SQLAlchemy 测试数据库连接"""
    print("\n🔍 正在使用 SQLAlchemy 测试数据库连接...")
    
    # 构建数据库URL
    db_url = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
    print(f"🔗 数据库URL: {db_url.replace(DB_CONFIG['password'], '***')}")
    
    try:
        # 创建引擎
        engine = create_engine(db_url)
        
        # 测试连接
        with engine.connect() as conn:
            # 执行简单查询
            result = conn.execute(text("SELECT version()"))
            version = result.fetchone()
            
            print(f"📊 PostgreSQL 版本: {version[0]}")
            
            # 测试数据库信息
            result = conn.execute(text("SELECT current_database(), current_user"))
            db_info = result.fetchone()
            print(f"📂 当前数据库: {db_info[0]}")
            print(f"👤 当前用户: {db_info[1]}")
            
            # 测试表创建权限
            try:
                conn.execute(text("CREATE TABLE IF NOT EXISTS test_table (id SERIAL PRIMARY KEY, name VARCHAR(50))"))
                conn.execute(text("DROP TABLE IF EXISTS test_table"))
                print(f"✅ 数据库写入权限正常")
            except Exception as e:
                print(f"⚠️ 数据库写入权限测试失败: {e}")
            
        return True, db_url
        
    except SQLAlchemyError as e:
        print(f"❌ SQLAlchemy 连接失败: {e}")
        return False, None
    except Exception as e:
        print(f"❌ 意外错误: {e}")
        return False, None

In [27]:
# 执行测试
test_sqlalchemy_connection()


🔍 正在使用 SQLAlchemy 测试数据库连接...
🔗 数据库URL: postgresql://postgres:***@localhost:5432/adk
📊 PostgreSQL 版本: PostgreSQL 17.5 on x86_64-windows, compiled by msvc-19.43.34808, 64-bit
📂 当前数据库: adk
👤 当前用户: postgres
✅ 数据库写入权限正常


(True, 'postgresql://postgres:snowball2019@localhost:5432/adk')

&emsp;&emsp;如果正常输出`PostgreSQL`的版本信息，则说明使用`SQLAlchemy`库连接`PostgreSQL`成功。而对于`DatabaseSessionService`的接入，只需要在`DatabaseSessionService`的初始化方法中，传入`PostgreSQL`的连接字符串即可。即上述代码返回的`db_url（数据库URL）`。其基本形式如下：

In [33]:
from google.adk.sessions import DatabaseSessionService

# 构建数据库URL
db_url = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"

# 创建 DatabaseSessionService
db_session_service = DatabaseSessionService(db_url)
print(f"✅ DatabaseSessionService 创建成功！")

✅ DatabaseSessionService 创建成功！


&emsp;&emsp;而需要重点说明的是，当执行完`db_session_service`的初始化方法后，`ADK`会自动创建`PostgreSQL`的表结构，如下代码：

```python
    def __init__(self, db_url: str, **kwargs: Any):
        """Initializes the database session service with a database URL."""
        # 1. Create DB engine for db connection
        # 2. Create all tables based on schema  
        # 3. Initialize all properties

        try:
            db_engine = create_engine(db_url, **kwargs)
        except Exception as e:
            # ... 错误处理 ...

        self.db_engine: Engine = db_engine
        # ... 其他初始化 ...

        # 🎯 关键：自动创建所有表
        Base.metadata.create_all(self.db_engine)
```

&emsp;&emsp;其源码位置：https://github.com/google/adk-python/blob/main/src/google/adk/sessions/database_session_service.py

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031808784.png" width=60%></div>

&emsp;&emsp;这四个表结构分别对应的`StorageSession`、`StorageEvent`、`StorageAppState`、`StorageUserState`类，对应着"sessions"、"events"、"app_states"、"user_states"表。我们可以在`pgAdmin4`中进行查看，如下所示：

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031857794.png" width=60%></div>

&emsp;&emsp;我们需要重点了解这四个表的结构才能很好的理解为什么`DatabaseSessionService`能够实现会话的持久化存储。因此接下来我们需要详细的深入到各个表中字段的设计和使用以明确每个表之间的关联关系。

## 3.3 DatabaseSessionService的表结构分析

&emsp;&emsp;我们先依次查看各个表中字段的定义及说明，如下所示：

- **sessions 表 (StorageSession) 作为会话主表，存储会话基本信息**

<style>
.center 
{
  width: auto;
  display: table;
  margin-left: auto;
  margin-right: auto;
}
</style>

<p align="center"><font face="黑体" size=4>StorageSession表结构</font></p>
<div class="center">

| 字段名 | 数据类型 | 约束 | 说明 |
|--------|----------|------|------|
| app_name | String(128) | PRIMARY KEY | 应用名称 |
| user_id | String(128) | PRIMARY KEY | 用户ID |
| id | String(128) | PRIMARY KEY | 会话ID (默认UUID) |
| state | DynamicJSON | - | 会话状态数据 (JSON格式) |
| create_time | DateTime | DEFAULT now() | 创建时间 |
| update_time | DateTime | DEFAULT now(), ON UPDATE now() | 更新时间 |

- **events 表 (StorageEvent) 作为事件表，存储对话消息和事件**

<style>
.center 
{
  width: auto;
  display: table;
  margin-left: auto;
  margin-right: auto;
}
</style>

<p align="center"><font face="黑体" size=4>StorageEvent表结构</font></p>
<div class="center">

| 字段名 | 数据类型 | 约束 | 说明 |
|--------|----------|------|------|
| id | String(128) | PRIMARY KEY | 事件ID |
| app_name | String(128) | PRIMARY KEY, FOREIGN KEY | 应用名称 |
| user_id | String(128) | PRIMARY KEY, FOREIGN KEY | 用户ID |
| session_id | String(128) | PRIMARY KEY, FOREIGN KEY | 会话ID |
| invocation_id | String(256) | - | 调用ID |
| author | String(256) | - | 作者 (user/agent) |
| branch | String(256) | NULLABLE | 分支 |
| timestamp | DateTime | DEFAULT now() | 事件时间戳 |
| content | DynamicJSON | NULLABLE | 消息内容 (JSON格式) |
| actions | PickleType | - | 动作数据 (序列化对象) |
| long_running_tool_ids_json | Text | NULLABLE | 长运行工具ID (JSON字符串) |
| grounding_metadata | DynamicJSON | NULLABLE | 基础元数据 (JSON格式) |
| partial | Boolean | NULLABLE | 是否为部分消息 |
| turn_complete | Boolean | NULLABLE | 是否完成轮次 |
| error_code | String(256) | NULLABLE | 错误代码 |
| error_message | String(1024) | NULLABLE | 错误消息 |
| interrupted | Boolean | NULLABLE | 是否被中断 |

- **app_states 表 (StorageAppState) 作为应用级状态表，存储应用全局状态**

<style>
.center 
{
  width: auto;
  display: table;
  margin-left: auto;
  margin-right: auto;
}
</style>

<p align="center"><font face="黑体" size=4>StorageAppState表结构</font></p>
<div class="center">

| 字段名 | 数据类型 | 约束 | 说明 |
|--------|----------|------|------|
| app_name | String(128) | PRIMARY KEY | 应用名称 |
| state | DynamicJSON | DEFAULT {} | 应用状态数据 (JSON格式) |
| update_time | DateTime | DEFAULT now(), ON UPDATE now() | 更新时间 |

- **user_states 表 (StorageUserState) 作为用户级状态表，存储用户在特定应用中的状态**

<style>
.center 
{
  width: auto;
  display: table;
  margin-left: auto;
  margin-right: auto;
}
</style>

<p align="center"><font face="黑体" size=4>StorageUserState表结构</font></p>
<div class="center">

| 字段名 | 数据类型 | 约束 | 说明 |
|--------|----------|------|------|
| app_name | String(128) | PRIMARY KEY | 应用名称 |
| user_id | String(128) | PRIMARY KEY | 用户ID |
| state | DynamicJSON | DEFAULT {} | 用户状态数据 (JSON格式) |
| update_time | DateTime | DEFAULT now(), ON UPDATE now() | 更新时间 |

&emsp;&emsp;这四个表对应的就是`ADK`框架对用户状态、应用状态、会话状态、事件状态的存储与管理机制，其对应的实体关系图如下所示：

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031831418.png" width=60%></div>

&emsp;&emsp;同时当发生交互时，其数据流向如下图所示：

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031838225.png" width=60%></div>

&emsp;&emsp;对于层级管理，`ADK`框架中通过前缀来区分不同的作用域，如下所示：

```bash
                                                                                                    ┌─────────────────────────────────────────┐
                                                                                                    │ app_state[app_name][key]               │  ← app:xxx
                                                                                                    ├─────────────────────────────────────────┤
                                                                                                    │ user_state[app_name][user_id][key]     │  ← user:xxx  
                                                                                                    ├─────────────────────────────────────────┤
                                                                                                    │ sessions[app_name][user_id][session_id] │  ← xxx (无前缀)
                                                                                                    │   .state[key]                          │
                                                                                                    ├─────────────────────────────────────────┤
                                                                                                    │ (内存中，不持久化)                        │  ← temp:xxx
                                                                                                    └─────────────────────────────────────────┘


                                                                                                ┌─────────────────────────────────────────────────────────┐
                                                                                                │                     SessionService                     │  
                                                                                                │ ┌─────────────────┬─────────────────┬─────────────────┐ │
                                                                                                │ │   App Level     │   User Level    │  Session Level  │ │
                                                                                                │ │                 │                 │                 │ │
                                                                                                │ │ app_state       │ user_state      │ session.state   │ │
                                                                                                │ │ {              │ {               │ {               │ │
                                                                                                │ │   app1: {      │   app1: {       │   session1: {   │ │
                                                                                                │ │     config: {} │     user1: {    │     data: {}    │ │
                                                                                                │ │   }            │       pref: {}  │   }             │ │
                                                                                                │ │ }              │     }           │ }               │ │
                                                                                                │ │                 │   }             │                 │ │
                                                                                                │ └─────────────────┴─────────────────┴─────────────────┘ │
                                                                                                └─────────────────────────────────────────────────────────┘
```

&emsp;&emsp;理解了上述的表结构和数据流向后，当我们在进行`DatabaseSessionService`的创建和使用时，就能够很好的理解各个表中数据的存储和使用情况。这里我们创建如下示例依次进行测试。首先创建会话：

In [34]:
# 测试创建会话
test_session = await db_session_service.create_session(
    app_name="test_app",
    user_id="test_user",
    session_id="test_session_001"
)
print(f"✅ 测试会话创建成功: {test_session.id}")

✅ 测试会话创建成功: test_session_001


&emsp;&emsp;`create_session`方法会依次在`sessions`、`app_states`、`user_states`表中创建数据，而不会生成`events`表中的数据。

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506031904640.png" width=60%></div>


&emsp;&emsp;同时，我们也可以通过`get_session`方法获取会话信息，如下所示：

In [35]:
# 测试获取会话
retrieved_session = await db_session_service.get_session(
    app_name="test_app",
    user_id="test_user",
    session_id="test_session_001"
)

print(f"✅ 测试会话获取成功: {retrieved_session.id if retrieved_session else 'None'}")

✅ 测试会话获取成功: test_session_001


&emsp;&emsp;创建多个会话并通过`list_sessions`方法列出所有会话，如下所示：

In [36]:
# 再创建一个新的会话
test_session = await db_session_service.create_session(
    app_name="test_app",
    user_id="test_user",
    session_id="test_session_002"
)

# 测试列出会话
sessions_list = await db_session_service.list_sessions(
    app_name="test_app",
    user_id="test_user"
)

print(f"✅ 会话列表获取成功，共 {len(sessions_list.sessions)} 个会话")
print(f"✅ 会话列表: {[session.id for session in sessions_list.sessions]}")

✅ 会话列表获取成功，共 2 个会话
✅ 会话列表: ['test_session_001', 'test_session_002']


&emsp;&emsp;最后，我们还可以通过`delete_session`方法删除会话，如下所示：

In [37]:
# 清理测试数据
await db_session_service.delete_session(
    app_name="test_app",
    user_id="test_user",
    session_id="test_session_001"
)
print(f"✅ 测试会话删除成功")

✅ 测试会话删除成功


&emsp;&emsp;再次通过`list_sessions`方法列出所有会话，会发现`test_session_001`会话已经被删除，如下所示：

In [38]:
# 测试列出会话
sessions_list = await db_session_service.list_sessions(
    app_name="test_app",
    user_id="test_user"
)

print(f"✅ 会话列表获取成功，共 {len(sessions_list.sessions)} 个会话")
print(f"✅ 会话列表: {[session.id for session in sessions_list.sessions]}")

✅ 会话列表获取成功，共 1 个会话
✅ 会话列表: ['test_session_002']


&emsp;&emsp;通过上述的测试，我们可以发现`DatabaseSessionService`的创建和使用非常简单，只需要通过`create_session`方法创建会话，然后通过`get_session`、`list_sessions`、`delete_session`方法进行会话的获取、列出和删除。同时我们也可以快速利用这些方法来实现会话的持久化存储。这里我们实现了一个`database_session_chat.py`文件，并基于`auto_create_session.py`文件进行改造，主要变化如下：

- **替换存储方式**: InMemorySessionService → DatabaseSessionService；
- **持久化存储**: 所有对话记录保存到PostgreSQL；
- **会话管理**: 支持列出/选择/删除已存在的会话；
- **会话恢复**: 重启程序后可以继续之前的对话；

&emsp;&emsp;运行示例如下：

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506041631955.png" width=60%></div>

&emsp;&emsp;同时在`PGAdmin4`中也可以实时的查看到`Agent`运行过程中产生的数据，如下所示：

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506041632678.png" width=60%></div>

&emsp;&emsp;通过`DatabaseSessionService`实现的持久化存储往往是实际开发中最为常用的存储方式。但从`InMemorySessionService`到`DatabaseSessionService`的切换，其实并不复杂，只需要在初始化时，将`SessionService`的实现替换为`DatabaseSessionService`即可。总的来说，`SessionService` 通过这些实现，使开发者可以根据需求选择不同的存储策略：不需要持久化时用内存`InMemorySessionService`，要求持久化则使用`DatabaseSessionService`。这种模块化设计提高了扩展性，也使测试和部署更加灵活。无论具体实现如何，`SessionService` 都保证对上层提供统一接口，代理逻辑在使用时不必关心底层如何存储数据。

&emsp;&emsp;了解了以上两种不同的存储方式后，接下来需要进一步探讨一下在`sessions`、`app_states`、`user_states`三个表中均出现的`state`字段，它到底是什么？

# 四、State 临时状态

&emsp;&emsp;每个会话 (`Session`) 都维护一个 `state` 对象，用于存储对当前对话有用的动态信息。可以把它理解为智能体运行过程中的“草稿板”或短期记忆，用于记录用户提供的偏好、任务进度、临时结果等数据。`State` 是一个键值对集合，它的特点是只在当前会话范围内有效，随着对话进展不断更新。其源码如下：https://github.com/google/adk-python/blob/main/src/google/adk/sessions/state.py

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202505261514002.png" width=60%></div>

&emsp;&emsp;`State`虽然是作为临时状态，但也是作用于`Session`对象中，因此`State`的更新和存储也是依托于`Session`对象，同时采用前缀来区分不同的作用域，即：

- **无前缀**：特定于会话，仅在当前会话中持续存在；
- **app:** 应用级状态，应用程序范围，在所有用户和会话之间共享；
- **user:** ，用户级状态，在特定用户的所有会话中持续存；
- **temp:** ：处理临时状态，仅在当前执行周期内存在；

&emsp;&emsp;这里能够理解的是，`State`其实是`Session`的一部分，可以在运行时灵活的指定一些数据临时存储在`State`中。而在存储时，则遵循`SessionService`的实现。不同的`SessionService`实现，则`state`的存储形式也会不同。如果是`InMemorySessionService`，则`State`只存储在内存中，当程序重启时会丢失；如果是`DatabaseSessionService`，则`State`会存储到数据库中，当程序重启时会继续存在。存储的基本原则仍然是按照`app`、`user`、`session`三个层级来存储，而`temp`中存储的数据，不论是`InMemorySessionService`还是`DatabaseSessionService`，都会在运行结束后被清空。

&emsp;&emsp;这里我们可以逐步看一下`Google ADK`在运行时`state`的变化情况。为了方便进行测试，我们这里使用预构建的`app`、`user`、`session`，并使用`DatabaseSessionService`进行测试。

In [2]:
import os
import asyncio
from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.genai import types 
from dotenv import load_dotenv
load_dotenv(override=True)

# 配置 模型信息 与 Session 信息
DS_API_KEY = os.getenv("DS_API_KEY")
DS_BASE_URL = os.getenv("DS_BASE_URL")

# 预构建 应用、用户、会话
APP_NAME = "test_app"
USER_ID = "user_101"
SESSION_ID = "session_101"  

# 创建模型和 Agent
model = LiteLlm(
    model="deepseek/deepseek-chat",  
    api_base=DS_BASE_URL,
    api_key=DS_API_KEY
)

&emsp;&emsp;接下来我们配置数据库连接信息，并创建`SessionService`对象：

In [3]:
from google.adk.sessions import DatabaseSessionService

# 数据库配置
DB_CONFIG = {
    'host': 'localhost',
    'port': 5432,
    'database': 'adk',
    'user': 'postgres',
    'password': 'snowball2019'
}

# 生成数据库连接字符串
DATABASE_URL = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"

# 创建 SessionService 对象
session_service = DatabaseSessionService(DATABASE_URL)

&emsp;&emsp;构造 `Agent`对象实例：

In [4]:
agent = Agent(
    name="persistent_chatbot",
    model=model,
    instruction="你是一个乐于助人的中文助手。请根据用户的问题给出回答。",
)

&emsp;&emsp;创建 `Runner`对象实例：

In [5]:
# 创建 Runner
runner = Runner(
    agent=agent,
    app_name=APP_NAME,
    session_service=session_service,
)

&emsp;&emsp;使用`create_session`方法在`PostgreSQL`中生成`Session`数据。

In [6]:
# 测试创建会话
test_session = await session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID
    )
print(f"✅ 测试会话创建成功: {test_session.id}")

✅ 测试会话创建成功: session_101


&emsp;&emsp;获取刚刚创建的`Session`对象，查看在运行前`Session`对象的`state`存储的数据情况：

In [7]:
print("=== ADK 默认状态结构演示 ===\n")

# 2. 获取 刚刚创建的 Session 对象
session = await session_service.get_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID
    )

print("1. 初始会话的默认状态:")
print(f"   session.state 类型: {type(session.state)}")
print(f"   session.state 内容: {session.state}")
print(f"   session.events 长度: {len(session.events)}")


=== ADK 默认状态结构演示 ===

1. 初始会话的默认状态:
   session.state 类型: <class 'dict'>
   session.state 内容: {}
   session.events 长度: 0


&emsp;&emsp;通过输出结果可以看到，当执行`create_session`方法进行初始化操作时，`Session`对象的`state`是一个空字典，同时也不触发任何`events`事件。接下来我们实际运行`Runner`, 再次打印`Session`对象的`state`和`events`信息以观察`state`临时状态中的数据变化情况。

In [8]:
# 3. 运行一个简单对话并查看Events中的状态
print("2. 运行对话前的状态:")

query = "你好，请你介绍一下你自己。"

# 将用户的问题转换为 ADK 格式
content = types.Content(role='user', parts=[types.Part(text=query)])

# 异步运行
async for event in runner.run_async(
    user_id=USER_ID,
    session_id=SESSION_ID,
    new_message=content,
):
    # 打印事件信息
    print(f"   - Author: {event.author}")
    print(f"   - Content type: {type(event.content)}")
    print(f"   - Actions: {event.actions}")


    # 检查事件中的状态相关信息
    if hasattr(event.actions, 'state_delta') and event.actions.state_delta:
        print(f"   - State delta: {event.actions.state_delta}")
    else:
        print(f"   - State delta: None (默认为空)")
        
    # 如果是最终响应，显示完整内容
    if event.content and event.content.parts:
        text_part = event.content.parts[0]
        if hasattr(text_part, 'text') and text_part.text:
            if not event.partial:  # 只显示完整响应
                print(f"   - Response: {text_part.text[:100]}...")

2. 运行对话前的状态:
   - Author: persistent_chatbot
   - Content type: <class 'google.genai.types.Content'>
   - Actions: skip_summarization=None state_delta={} artifact_delta={} transfer_to_agent=None escalate=None requested_auth_configs={}
   - State delta: None (默认为空)
   - Response: 你好！我是一个智能助手，专注于提供信息查询、问题解答、日常陪伴和实用工具支持。我的核心特点包括：

1. **持续记忆**：在单次对话中能记住上下文，提供连贯的交流体验（但新对话会重置记忆）
2. *...


&emsp;&emsp;当运行结束后，再次获取`Session`对象的`state`和`events`信息，可以看到`state`中已经存储了`last_greeting`字段，而`events`中则存储了完整的对话历史记录。

In [9]:
print("\n3. 对话后的会话状态:")
updated_session = await session_service.get_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID
    )
print(f"   updated_session.state 内容: {updated_session.state}")
print(f"   updated_session.events 长度: {len(updated_session.events)}")

for index,event in enumerate(updated_session.events):
    print(f"   events: {index} - {event}")



3. 对话后的会话状态:
   updated_session.state 内容: {}
   updated_session.events 长度: 2
   events: 0 - content=Content(parts=[Part(video_metadata=None, thought=None, inline_data=None, code_execution_result=None, executable_code=None, file_data=None, function_call=None, function_response=None, text='你好，请你介绍一下你自己。')], role='user') grounding_metadata=None partial=None turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=None invocation_id='e-6578abdc-20d1-498a-8a86-dd537443c9bc' author='user' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=set() branch=None id='PSWAs2Gz' timestamp=1749104698.704265
   events: 1 - content=Content(parts=[Part(video_metadata=None, thought=None, inline_data=None, code_execution_result=None, executable_code=None, file_data=None, function_call=None, function_response=None, text='你好！我是一个智能助手，专注于提供信息查

&emsp;&emsp;通过这个手动构建的流程，在`ADK`中的`SessionService`、`Runner`及`Agent`中，其实都不依赖自定义`State`，默认空字典{}照样正常运行，只有事件是随着对话的进行而不断追加的。`State`真正的作用是供开发者提供自定义流程的工具，其核心是用来扩展`ADK`的灵活性和复杂性的。对于一些基础的流程，如简单问答："今天天气怎么样？"、代码生成："写个排序函数"、一次性翻译、摘要等，其实都不需要自定义`State`。但是多一些复杂流程，如多步骤任务：订票流程、上下文对话："继续刚才的话题"、Agent协作：数据传递等，则还需要自定义`State`来存储一些临时数据从而支撑数据的传递，大家可以理解为：

- 📊 State (状态) - 结构化的键值对数据
   - 用途: 存储配置、偏好、设置、当前状态
   - 特点: 可读写、可更新、结构化
   - 示例: 用户语言偏好、任务进度、多代理协作间的信息传递

- 📝 Event (事件) - 时序化的历史记录
   - 用途: 记录对话历史、操作记录、状态变更
   - 特点: 只写入、不可修改、时间序列
   - 示例: 用户消息、AI回复、工具调用、错误记录


&emsp;&emsp;`state` 的触发时机是：每当有新的交互会通过 `SessionService` 将对应的 `Event` 追加到会话历史 (`session.events`) 中，并据此更新会话的 `State`。更深入细节，则是`SessionService` 会读取 `Event` 中的状态变更指令（`state_delta`），合并到会话的 `state` 中，并处理持久化（例如`InMemorySessionService`或`DatabaseSessionService`）。一句话总结：`Event` 是`state`的载体，`state`是`Event`的存储结果。所以才有我们看到的默认情况下`Event`中的`state`是：

- 新`Session`的默认状态：`session.state` 返回空字典 `{}`
- `Event`的默认`state_delta`：`event.actions.state_delta` 默认为 `None`

- `State`对象内部结构：
  - _value: {} (持久化状态，初始为空)
  - _delta: {} (待提交变更，初始为空)

&emsp;&emsp;举一个实际的示例：当用户登录时，我们希望更新多个状态值（如登录次数、最后登录时间等），这可以通过构造一个带有 `state_delta` 的 `Event` 来实现：

In [10]:
APP_NAME = "test_app"
USER_ID = "user_101"
SESSION_ID = "session_102" # 这里新建一个 102 的会话

# 创建 Runner
runner = Runner(
    agent=agent,
    app_name=APP_NAME,
    session_service=session_service,
)

# 测试创建会话
login_session = await session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID,
    state={"user:login_count": 0, "task_status": "idle"}  # 在创建会话时，指定初始状态,idle表示空闲状态
    )


In [11]:
print("1. 初始会话的默认状态:")
print(f"   login_session.state 类型: {type(login_session.state)}")
print(f"   login_session.state 内容: {login_session.state}")
print(f"   login_session.events 长度: {len(login_session.events)}")

1. 初始会话的默认状态:
   login_session.state 类型: <class 'dict'>
   login_session.state 内容: {'task_status': 'idle', 'user:login_count': 0}
   login_session.events 长度: 0


&emsp;&emsp;可以看到，在创建会话时，因为我们指定了初始状态`state={"user:login_count": 0, "task_status": "idle"}`，所以`Session`对象的`state`中会包含这两个字段。接下来，其实我们就可以围绕这两个字段来构造`Event`，并更新`Session`的`state`。


&emsp;&emsp;首先构造一个系统登录成功事件，并写入`state`中进行事件状态传递：

In [12]:
import time
from google.adk.events import Event, EventActions

def login_success_delta(sess):
    ts = time.time()
    return {
        "task_status": "active",   # 无前缀，则作用于会话范围
        "user:login_count": sess.state.get("user:login_count", 0) + 1, # user 前缀，则作用于用户范围
        "user:last_login_ts": ts, #user 前缀，则作用于用户范围
        "temp:validation_needed": True        # temp 前缀，临时状态，仅本轮有效
    }

current_time = time.time()

# 生成状态变更
state_changes = login_success_delta(sess=login_session)  # 这里要替换为login_session

# 生成事件
actions = EventActions(state_delta=state_changes)
system_event = Event(
    invocation_id="login_update",
    author="system",
    actions=actions,
    timestamp=current_time
)

await session_service.append_event(session=login_session, event=system_event)  # 这里要替换为login_session
print("\n✅ 系统已写入登录增量（含 temp:validation_needed）")


✅ 系统已写入登录增量（含 temp:validation_needed）


&emsp;&emsp;然后再次获取`Session`对象的`state`和`events`信息：

In [13]:
# 2. 获取 刚刚创建的 Session 对象
update_session = await session_service.get_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID
    )

print("2. 系统事件写入后的状态:")
print(f"   update_session.state 类型: {type(update_session.state)}")
print(f"   update_session.state 内容: {update_session.state}")
print(f"   update_session.events 长度: {len(update_session.events)}")
print(f"   update_session.events 内容: {update_session.events[0].invocation_id}")

2. 系统事件写入后的状态:
   update_session.state 类型: <class 'dict'>
   update_session.state 内容: {'task_status': 'active', 'user:login_count': 1, 'user:last_login_ts': 1749104797.6528993}
   update_session.events 长度: 1
   update_session.events 内容: login_update


&emsp;&emsp;可以看到，`Session`对象的`state`中已经包含`task_status`和`user:login_count`字段，并且`user:last_login_ts`字段也被更新了。注意：这里为什么读不到`temp:validation_needed`字段？ 是因为带`temp: xxx`前缀的键会在一次提交后立即丢弃，所以读不到它。如何验证临时键只存在一次？ 最简单的方法就是完成事件写入后，通过`state_delta`查看，如下代码所示：

In [14]:
# 测试创建会话
test_temp_session = await session_service.create_session(
    app_name="test_app",
    user_id="user_101",
    session_id="session_103",
    state={"user:login_count": 0, "task_status": "idle"}  # 在创建会话时，指定初始状态,idle表示空闲状态
    )

In [15]:
current_time = time.time()
state_changes = login_success_delta(sess=test_temp_session)  # 这里要替换为test_temp_session

actions = EventActions(state_delta=state_changes)

system_event = Event(
    invocation_id="login_update",
    author="system",
    actions=actions,
    timestamp=current_time
)

await session_service.append_event(session=test_temp_session, event=system_event)  # 这里要替换为test_temp_session
print("\n✅ 系统已写入登录增量（含 temp:validation_needed）")


✅ 系统已写入登录增量（含 temp:validation_needed）


&emsp;&emsp;`temp`前缀的设计目标就是 “只在本轮调用链里用一下，然后马上销毁”，在 `DatabaseSessionService.append_event()` 里，合并逻辑会先把 `temp:` 前缀过滤掉，再更新 `session.state`，所以打印是看不到它的，哪怕没有刷新会话对象，比如如下所示：

In [16]:
print("测试 temp: 前缀状态是否存在")
print(f"   test_temp_session.state 类型: {type(test_temp_session.state)}")
print(f"   test_temp_session.state 内容: {test_temp_session.state}")
print(f"   test_temp_session.events 长度: {len(test_temp_session.events)}")

测试 temp: 前缀状态是否存在
   test_temp_session.state 类型: <class 'dict'>
   test_temp_session.state 内容: {'task_status': 'active', 'user:login_count': 1, 'user:last_login_ts': 1749104828.1962988}
   test_temp_session.events 长度: 1


&emsp;&emsp;能够看到哪怕不刷新`Session`对象，`temp:validation_needed`字段也已经不存在了。因为其内部调用链顺序其实是这样的：

```json
    append_event(...)
    └─ ① 事件写入 events 表
    └─ ② 取 actions.state_delta
        ├─ 删除所有以 "temp:" 开头的键
        └─ 把剩余键合并进 session.state   ← 打印时看到的内容
```

&emsp;&emsp;因此，如果真正想看临时键，则需要直接通过事件里的 `state_delta` 来获取：

In [17]:
print(system_event.actions.state_delta)

{'task_status': 'active', 'user:login_count': 1, 'user:last_login_ts': 1749104828.1962988, 'temp:validation_needed': True}


&emsp;&emsp;`temp`前缀的键主要用来在同一轮内部让多个工具/子Agent传递短期标志数据（比如“当前回合已做过身份验证”），有效防止把纯瞬时数据写进持久存储，避免污染会话状态或占用数据库容量。这一点，我们会在接下来的课程中讲解使用工具/MCP构建复杂`Multi-Agent`时，再详细展开讲解，这里大家先做了解。

&emsp;&emsp;现在，我们再回到原始流程中。现在我们已经通过`Event`的`state_delta`实现了对`Session`的`state`的更新，那么`state`中的数据就可以用来做逻辑的处理从而影响`Agent`的运行状态，比如下面这个场景示例： 我们通过`Session`的`state`中的字段来模拟用 `ADK` 在后台做“登录 + 首次验证”流程，并用同一个持久会话继续对话。

&emsp;&emsp;首先新建一个`Session`对象，并写入初始状态：

In [18]:
import os
import asyncio
from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.genai import types 
from google.adk.sessions import DatabaseSessionService
from dotenv import load_dotenv
load_dotenv(override=True)

# 配置 模型信息 与 Session 信息
DS_API_KEY = os.getenv("DS_API_KEY")
DS_BASE_URL = os.getenv("DS_BASE_URL")

# 配置 应用信息 与 用户信息
APP_NAME = "test_app"
USER_ID = "user_101"
SESSION_ID = "session_109" # 这里新建一个 1001 的会话

# 创建模型和 Agent
model = LiteLlm(
    model="deepseek/deepseek-chat",  
    api_base=DS_BASE_URL,
    api_key=DS_API_KEY
)

# 数据库配置
DB_CONFIG = {
    'host': 'localhost',
    'port': 5432,
    'database': 'adk',
    'user': 'postgres',
    'password': 'snowball2019'
}

# 生成数据库连接字符串
DATABASE_URL = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"

# 创建 SessionService 对象
session_service = DatabaseSessionService(DATABASE_URL)


# 创建 Agent对象
agent = Agent(
    name="chatbot",
    model=model,
    instruction="你是一个乐于助人的中文助手。请用中文回答用户的问题",
)

# 创建 Runner
runner = Runner(
    agent=agent,
    app_name=APP_NAME,
    session_service=session_service,
)

# 测试创建会话
login_session = await session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID,
    state={"user:login_count": 0, "task_status": "idle"}  # 在创建会话时，指定初始状态,idle表示空闲状态
    )


&emsp;&emsp;然后，我们再写入一个系统初始化事件，并更新`Session`的`state`：

In [21]:
import time
from google.adk.events import Event, EventActions

# ── 系统后台：写入一次登录事件 --------------------------
async def mark_login_success():
    # 先获取 Session 对象
    sess = await session_service.get_session(
            app_name=APP_NAME,
            user_id=USER_ID,
            session_id=SESSION_ID,
            )

    # 构造状态变更
    delta = {
        "task_status": "active",
        "user:login_count": sess.state.get("user:login_count", 0) + 1,
        "validation_needed": True  # 这里表示需要验证
    }

    # 写入事件
    await session_service.append_event(
        session=sess,
        event=Event(author="chatbot",
                    invocation_id="login_update",
                    actions=EventActions(state_delta=delta),
                    timestamp=time.time())
    )
    print("✅ 已写登录增量（validation_needed=True）")

&emsp;&emsp;接下来，我们再写一个网关函数，检查是否需要验证用户身份信息：

In [22]:
from google.adk.events import Event, EventActions
from google.genai import types 

# ── 网关函数：检查是否需要验证 ------------------------------------------
async def maybe_run_agent(user_input: str, code: str | None = None):
    # 先获取 Session 对象
    sess = await  session_service.get_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=SESSION_ID
    )

    # 获取需要验证的标志
    need_check = sess.state.get("validation_needed", False)

    # 需要验证但尚未通过
    if need_check:
        if code == "123456":  # 这里假设是验证码场景
            # 写事件把 validation_needed 置 False
            await session_service.append_event(
                session=sess,
                event=Event(
                    author="chatbot",
                    invocation_id="validate_ok",
                    actions=EventActions(state_delta={"validation_needed": False}),
                    timestamp=time.time()
                )
            )
            # 重新拉一次最新会话，防止读到旧缓存
            sess = await session_service.get_session(
                app_name=APP_NAME,
                user_id=USER_ID, 
                session_id=SESSION_ID
            )
            print("🔓 验证成功，放行对话")
        else:
            print("⚠️  需要先完成账号验证。请提交验证码。")
            return

    content = types.Content(role='user', parts=[types.Part(text=user_input)])
    # 如果已登录，正常放行，调用Agent生成回复
    async for ev in runner.run_async(
        user_id=USER_ID,
        session_id=SESSION_ID,
        new_message=content,
    ):
        # 打印回复
        if ev.content and ev.content.parts:
            print("🤖", ev.content.parts[0].text)


&emsp;&emsp;下面进行业务流程的测试。首先，我们进行初始化（对应的业务场景其实就是用户打开某一个页面）：

In [23]:
await mark_login_success()

✅ 已写登录增量（validation_needed=True）


&emsp;&emsp;接下来，尝试未登录状态下发起会话（对应的业务场景就是用户打开页面后，直接在对话框进行提问，而事先没有进行登录）：

In [24]:
await maybe_run_agent("你好，很高兴认识你~")   # 不带验证码 → 会被拦截

⚠️  需要先完成账号验证。请提交验证码。


&emsp;&emsp;然后，模拟登录成功的情况，即携带验证码发起会话（对应的业务场景是系统检测到用户没有登录，弹出一个登录框要求用户进行登录）：

In [25]:
await maybe_run_agent(
    user_input="你好，请你介绍一下你自己", # 用户输入
    code="123456"  # 提交验证码
    ) 

🔓 验证成功，放行对话
🤖 你好！我是一个人工智能助手，中文名字可以叫我“小助”。我的主要功能是帮助你解答问题、提供信息、协助完成任务或陪你聊天。无论是学习、工作、生活中的疑问，还是需要创意灵感、技术支持，我都会尽力提供有用的建议。

我的特点：
1. **多领域知识**：覆盖科学、技术、文化、生活等常见话题
2. **实时帮助**：24小时在线响应
3. **多语言支持**：中英文切换自如
4. **无判断性**：提供中立客观的信息

需要注意的是：
- 我的知识截止于训练数据（2023年），部分实时信息可能需要你进一步核实
- 我不会替代专业领域的医生、律师等建议

现在，有什么我可以帮你的吗？ 😊


&emsp;&emsp;最后，登录成功后，即可不再重复携带验证码，直接进行对话：

In [26]:
await maybe_run_agent(user_input="介绍一下什么是大模型？")  

🤖 **大模型（Large Language Model, LLM）** 是一种基于人工智能（AI）的深度学习模型，通过海量数据和庞大参数规模，学习语言规律和世界知识，从而具备理解、生成和推理文本的能力。以下是关键点解析：

---

### 1. **核心特点**
- **参数规模大**：参数量可达数十亿至万亿级别（例如GPT-3有1750亿参数），能捕捉更复杂的语言模式。
- **预训练+微调**：先在广泛的无标注数据（如书籍、网页）上预训练，再针对特定任务微调。
- **通用性强**：可处理翻译、问答、写作、编程等多种任务，无需为每类任务单独设计模型。

---

### 2. **技术基础**
- **Transformer架构**：依赖自注意力机制（Self-Attention）处理长距离文本依赖。
- **生成能力**：通过概率预测逐词生成连贯文本（如ChatGPT的对话）。
- **多模态扩展**：部分大模型已整合图像、音频等非文本数据（如GPT-4V）。

---

### 3. **典型应用**
- **智能助手**：如ChatGPT、Claude、文心一言等。
- **代码生成**：GitHub Copilot（基于Codex模型）。
- **内容创作**：自动撰写文章、广告文案等。
- **知识问答**：提供结构化信息摘要（需注意时效性）。

---

### 4. **局限性**
- **幻觉（Hallucination）**：可能生成看似合理但事实错误的内容。
- **数据偏见**：反映训练数据中的社会偏见或错误观点。
- **算力需求**：训练需高性能GPU集群，成本高昂。

---

### 5. **代表模型**
- **GPT系列**（OpenAI）：生成式预训练模型。
- **BERT**（Google）：擅长理解类任务（如搜索）。
- **PaLM**（Google）、**LLaMA**（Meta）：开源或轻量化探索。

---

如果需要更具体的细节（如技术原理、行业影响等），可以进一步探讨！ 😊


&emsp;&emsp;这个流程其实就是一套比较标准的用户登录校验的实现机制，同时我们也可以在数据库中看到实时的数据持久化存储信息，如下图所示：

<div align=center><img src="https://muyu20241105.oss-cn-beijing.aliyuncs.com/images/202506051432018.png" width=80%></div>

&emsp;&emsp;由此大家可以理解到的是：追加 `Event` 是更新 `State` 的机制。`SessionService` 是通过读取 `Event` 中的状态变更指令（`state_delta`），合并到会话的 `state` 中并进行处理的。所以`State`是短期记忆，用于存储当前对话的上下文和临时信息，在上下文中是可变的，随着对话进展不断更新。
而`Event` 是长期记忆，用于存储执行过程某个具体步骤的结果，如用户输入、工具调用、错误信息等， 是不可变的，一旦生成后不会改变。

&emsp;&emsp;而`State`除了能适配灵活的业务逻辑开发外，对于工具的权限控制、`Multi-Agent`之间跨会话、跨`Agent`共享等应用场景，我们将在接下来的课程中结合实际的案例再展开详细的讲解。