In [None]:
import sqlite3

# 产品数据（与之前生成的数据一致）
products = [
    {
        "产品名称": "无线蓝牙耳机",
        "简介": "高端降噪技术，适合运动使用，长续航",
        "单价": 299,
        "价格波动区间": "±5%"
    },
    {
        "产品名称": "智能手环",
        "简介": "健康监测（心率/血氧/睡眠），50米防水",
        "单价": 199,
        "价格波动区间": "±10元"
    },
    # 此处省略中间数据（保持与示例一致）
    {
        "产品名称": "蓝牙音箱",
        "简介": "IPX7防水，TWS互联，12小时续航",
        "单价": 159,
        "价格波动区间": "±10元"
    }
]

# 连接SQLite数据库（如果不存在会自动创建）
conn = sqlite3.connect('product_db.sqlite')
cursor = conn.cursor()

# 创建数据表
cursor.execute('''CREATE TABLE IF NOT EXISTS products
               (id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_name TEXT NOT NULL,
                description TEXT,
                unit_price REAL,
                price_range TEXT)''')

# 插入数据的SQL语句（使用参数化查询防止SQL注入）
insert_sql = '''INSERT INTO products 
              (product_name, description, unit_price, price_range)
              VALUES (?, ?, ?, ?)'''

# 批量插入数据
for product in products:
    cursor.execute(insert_sql, (
        product["产品名称"],
        product["简介"],
        product["单价"],
        product["价格波动区间"]
    ))

# 提交事务并关闭连接
conn.commit()
conn.close()

print(f"成功插入 {len(products)} 条产品数据到 product_db.sqlite")

In [None]:
# demo.py
from google.adk.sessions.database_session_service import DatabaseSessionService
session_service = DatabaseSessionService(db_url=f"mysql+pymysql://chaoke:Lvdeng666@localhost:3306/ai_agent")

In [None]:
from google.adk.sessions import InMemorySessionService, Session

# 创建一个简单的会话来检查其属性
# temp_service = InMemorySessionService()
example_session: Session = session_service.create_session(
    app_name="my_app",
    user_id="example_user",
    state={"initial_key": "initial_value"} # 可以初始化状态
)

print(f"--- 检查 Session 属性 ---")
print(f"ID (`id`):                {example_session.id}")
print(f"应用程序名称 (`app_name`): {example_session.app_name}")
print(f"用户 ID (`user_id`):         {example_session.user_id}")
print(f"状态 (`state`):           {example_session.state}") # 注意：这里只显示初始状态
print(f"事件 (`events`):         {example_session.events}") # 初始为空
print(f"最后更新 (`last_update_time`): {example_session.last_update_time:.2f}")
print(f"---------------------------------")


In [None]:

# 清理（此示例可选）
session_service.delete_session(app_name=example_session.app_name,
                            user_id=example_session.user_id, session_id=example_session.id)

In [None]:
from google.adk.agents import LlmAgent
from google.adk.sessions import InMemorySessionService, Session
from google.adk.runners import Runner
from google.genai.types import Content, Part
from google.adk.models.lite_llm import LiteLlm # 用于多模型支持

deepseek_model = LiteLlm(
    model="openai/qwen-plus",  
    api_key="sk-ad30851f6a47453a8b2c56ec4da32957",
    api_base="https://dashscope.aliyuncs.com/compatible-mode/v1"
)

# 定义带有 output_key 的智能体
greeting_agent = LlmAgent(
    name="Greeter",
    model=deepseek_model, # 使用有效模型
    instruction="Generate a short, friendly greeting.",
    output_key="last_greeting" # 将响应保存到 state['last_greeting']
)

# --- 设置 Runner 和 Session ---
app_name, user_id, session_id = "state_app", "user1", "session1"
session_service = InMemorySessionService()
runner = Runner(
    agent=greeting_agent,
    app_name=app_name,
    session_service=session_service
)
session = session_service.create_session(app_name=app_name, 
                                        user_id=user_id, 
                                        session_id=session_id)
print(f"初始状态：{session.state}")

# --- 运行智能体 ---
# Runner 处理调用 append_event，使用 output_key
# 自动创建 state_delta。
user_message = Content(parts=[Part(text="Hello")])
for event in runner.run(user_id=user_id, 
                        session_id=session_id, 
                        new_message=user_message):
    if event.is_final_response():
    #   print(f"响应：{event.content.text}")
      print(f"智能体已响应。") # 响应文本也在 event.content 中


In [None]:

# --- 检查更新后的状态 ---
updated_session = session_service.get_session(app_name=app_name, user_id=user_id, session_id=session_id)
print(f"智能体运行后的状态：{updated_session.state}")
# 预期输出可能包括：{'last_greeting': '你好！今天我能帮你什么忙？'}

In [None]:
from google.adk.sessions import InMemorySessionService, Session
from google.adk.events import Event, EventActions
from google.genai.types import Part, Content
import time

# --- 设置 ---
session_service = InMemorySessionService()
app_name, user_id, session_id = "state_app_manual", "user2", "session2"
session = session_service.create_session(
    app_name=app_name,
    user_id=user_id,
    session_id=session_id,
    state={"user:login_count": 0, "task_status": "idle"}
)
print(f"初始状态：{session.state}")

# --- 定义状态更改 ---
current_time = time.time()
state_changes = {
    "task_status": "active",              # 更新会话状态
    "user:login_count": session.state.get("user:login_count", 0) + 1, # 更新用户状态
    "user:last_login_ts": current_time,   # 添加用户状态
    "temp:validation_needed": True        # 添加临时状态（将被丢弃）
}

# --- 创建带有 Actions 的事件 ---
actions_with_update = EventActions(state_delta=state_changes)
# 此事件可能代表内部系统操作，而不仅仅是智能体响应
system_event = Event(
    invocation_id="inv_login_update",
    author="system", # 或 'agent', 'tool' 等
    actions=actions_with_update,
    timestamp=current_time
    # content 可能为 None 或表示所采取的操作
)

# --- 追加事件（这会更新状态） ---
session_service.append_event(session, system_event)
print("`append_event` 使用显式状态增量调用。")

# --- 检查更新后的状态 ---
updated_session = session_service.get_session(app_name=app_name,
                                            user_id=user_id, 
                                            session_id=session_id)
print(f"事件后的状态：{updated_session.state}")
# 预期：{'user:login_count': 1, 'task_status': 'active', 'user:last_login_ts': <timestamp>}
# 注意：'temp:validation_needed' 不存在。

In [None]:
from google.adk.memory import InMemoryMemoryService
from google.adk.tools import load_memory # 查询内存的工具

In [None]:
import asyncio
from google.adk.agents import LlmAgent
from google.adk.sessions import InMemorySessionService, Session
from google.adk.memory import InMemoryMemoryService # 导入 MemoryService
from google.adk.runners import Runner
from google.adk.tools import load_memory # 查询内存的工具
from google.genai.types import Content, Part

# --- 常量 ---
APP_NAME = "memory_example_app"
USER_ID = "mem_user"
MODEL = "gemini-2.0-flash" # 使用有效模型

# --- 智能体定义 ---
# 智能体 1：简单的信息捕获智能体
info_capture_agent = LlmAgent(
    model=MODEL,
    name="InfoCaptureAgent",
    instruction="确认用户的陈述。",
    # output_key="captured_info" # 也可以选择保存到状态
)

# 智能体 2：可以使用内存的智能体
memory_recall_agent = LlmAgent(
    model=MODEL,
    name="MemoryRecallAgent",
    instruction="回答用户的问题。如果答案可能在过去的对话中，请使用 'load_memory' 工具",
    tools=[load_memory] # 给智能体提供工具
)

# --- 服务和 Runner ---
session_service = InMemorySessionService()
memory_service = InMemoryMemoryService() # 用于演示的内存服务

runner = Runner(
    # 从信息捕获智能体开始
    agent=info_capture_agent,
    app_name=APP_NAME,
    session_service=session_service,
    memory_service=memory_service # 向 Runner 提供内存服务
)

# --- 场景 ---

# 回合 1：在会话中捕获一些信息
print("--- 回合 1：捕获信息 ---")
session1_id = "session_info"
session1 = session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=session1_id)
user_input1 = Content(parts=[Part(text="我最喜欢的项目是Project Alpha。")], role="user")
''
# 运行代理
final_response_text = "(No final response)"
for event in runner.run(user_id=USER_ID, session_id=session1_id, new_message=user_input1):
    if event.is_final_response() and event.content and event.content.parts:
        final_response_text = event.content.parts[0].text
print(f"智能体 1 响应: {final_response_text}")

# Get the completed session
completed_session1 = session_service.get_session(app_name=APP_NAME, user_id=USER_ID, session_id=session1_id)

# 将此会话的内容添加到 Memory Service
print("\n--- 将会话 1 添加到内存 ---")
memory_service.add_session_to_memory(completed_session1)
print("会话已添加到内存。")

# 回合 2：在*新*（或相同）会话中，提出需要内存的问题
print("\n--- 回合 2：回忆信息 ---")
session2_id = "session_recall" # 可以是相同或不同的会话 ID
session2 = session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=session2_id)

# 将 runner 切换到回忆智能体
runner.agent = memory_recall_agent
user_input2 = Content(parts=[Part(text="我最喜欢的项目是什么？")], role="user")

# 运行回忆智能体
print("运行 MemoryRecallAgent...")
final_response_text_2 = "(无最终响应)"
for event in runner.run(user_id=USER_ID, session_id=session2_id, new_message=user_input2):
    print(f"  事件: {event.author} - 类型: {'Text' if event.content and event.content.parts and event.content.parts[0].text else ''}"
        f"{'FuncCall' if event.get_function_calls() else ''}"
        f"{'FuncResp' if event.get_function_responses() else ''}")
    if event.is_final_response() and event.content and event.content.parts:
        final_response_text_2 = event.content.parts[0].text
        print(f"智能体 2 最终响应: {final_response_text_2}")
        break # 在最终响应后停止

# 回合 2 的预期事件序列：
# 1. 用户发送 "我最喜欢的项目是什么？"
# 2. 智能体（LLM）决定使用 `load_memory` 工具，查询如 "favorite project"。
# 3. Runner 执行 `load_memory` 工具，它调用 `memory_service.search_memory`。
# 4. `InMemoryMemoryService` 找到来自 session1 的相关文本（"我最喜欢的项目是 Project Alpha。"）。
# 5. 工具在 FunctionResponse 事件中返回此文本。
# 6. 智能体（LLM）接收函数响应，处理检索到的文本。
# 7. 智能体生成最终答案（例如，"你最喜欢的项目是 Project Alpha。"）。

In [None]:
# 最小化测试代码
import pymysql
try:
    conn = pymysql.connect(
        host='localhost',
        user='chaoke',
        password='Lvdeng666'
    )
    print("连接成功！")
except Exception as e:
    print(f"错误详情：{str(e)}")

In [None]:
import sqlite3
from textwrap import indent

def analyze_sqlite_db(db_path):
    """分析SQLite数据库结构并打印详细信息"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        # 获取所有用户表（排除系统表）
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';")
        tables = cursor.fetchall()

        if not tables:
            print("数据库中没有用户表")
            return

        print(f"发现 {len(tables)} 个用户表：")
        for table in tables:
            table_name = table[0]
            print(f"\n表名：{table_name}")
            
            # 获取表结构信息
            cursor.execute(f"PRAGMA table_info({table_name})")
            columns = cursor.fetchall()
            
            # 打印列信息
            print("\n列信息：")
            print(f"{'序号':<4}{'列名':<20}{'数据类型':<15}{'允许NULL':<10}{'默认值':<15}{'主键'}")
            print("-" * 70)
            for idx, col in enumerate(columns, 1):
                cid, name, data_type, notnull, default_val, pk = col
                print(f"{idx:<4}{name:<20}{data_type:<15}{notnull:<10}{str(default_val):<15}{pk}")

            # 获取索引信息（修复点1：安全引用表名）
            cursor.execute(f'PRAGMA index_list("{table_name}")')  # 使用双引号包裹表名
            indexes = cursor.fetchall()
            if indexes:
                print("\n索引信息：")
                for idx_info in indexes:
                    # 修复点2：增加异常处理
                    try:
                        idx_name, unique, origin = idx_info[:3]  # 安全解包前3列
                        print(f"索引名：{idx_name}，唯一性：{unique}，来源：{origin}")
                    except ValueError:
                        print(f"⚠️ 解析索引信息失败：{idx_info}")

            # 获取外键信息（SQLite需要特殊处理）
            cursor.execute(f'PRAGMA foreign_key_list("{table_name}")')  # 修复点3：安全引用表名
            foreign_keys = cursor.fetchall()
            if foreign_keys:
                print("\n外键信息：")
                print(f"{'ID':<4}{'列':<20}{'目标表':<15}{'目标列':<15}{'更新规则':<10}{'删除规则'}")
                print("-" * 70)
                for fk in foreign_keys:
                    try:
                        # 安全解包外键信息
                        fk_id, _, _, col, target_table, target_col, on_update, on_delete, _ = fk
                        print(f"{fk_id:<4}{col:<20}{target_table:<15}{target_col:<15}{on_update:<10}{on_delete}")
                    except ValueError:
                        print(f"⚠️ 解析外键信息失败：{fk}")

    finally:
        conn.close()

# 使用示例
if __name__ == "__main__":
    db_path = "./my_agent_data.db"  # 替换为你的数据库路径
    analyze_sqlite_db(db_path)

## 定义两个tools
    1. 语音转文本
    2. 图片描述

In [9]:
import os
import requests
from http import HTTPStatus
from openai import OpenAI
from dashscope.audio.asr import Transcription
import json
# 加载环境变量
from dotenv import load_dotenv
load_dotenv()

qwen_api_key = os.getenv("Qwen_API_KEY")
# 若没有将API Key配置到环境变量中，需将下面这行代码注释放开，并将apiKey替换为自己的API Key
import dashscope
dashscope.api_key = qwen_api_key

In [10]:
# 语音转文本工具

def speech_to_text(audio_url: str) -> dict:
    """
    将音频文件转换为文本

    Args:
        audio_url (str): 音频文件的URL

    Returns:
        dict: 一个包含识别结果的字典。
              包含一个'status'键，值为'success'或'error'。
              如果'success',包含一个'text'键，值为识别结果。
              如果'error',包含一个'error_message'键，值为错误信息。
    """
    try:
        transcribe_response = Transcription.async_call(
        model='paraformer-v2',
        file_urls=[audio_url])

        while True:
            if transcribe_response.output.task_status == 'SUCCEEDED' or transcribe_response.output.task_status == 'FAILED':
                break
            transcribe_response = Transcription.fetch(task=transcribe_response.output.task_id)

        if transcribe_response.status_code == HTTPStatus.OK:
            text_url = transcribe_response.output["results"][0]["transcription_url"]
            status = transcribe_response.output["results"][0]["subtask_status"]
            response = requests.get(text_url, timeout=10)

            # 检查请求是否成功
            response.raise_for_status()

            # 解析JSON内容
            data = response.json()
            text = data["transcripts"][0]["text"]
            # print(json.dumps(transcribe_response.output, indent=4, ensure_ascii=False))
            print('transcription done!')
        return {"status": "success", "text": text}
    except Exception as e:
        return {"status": "error", "error_message": str(e)}

In [11]:
# 图像描述工具
def image_comprehension(image_url: str) -> dict:
    """
    将图像文件转换为文本描述，描述图像内容。

    Args:
        image_url (str): 需要识别的图像文件的URL。

    Returns:
        dict: 一个包含识别结果的字典。
              包含一个'status'键，值为'success'或'error'。
              如果'success',包含一个'image_description'键，值为图片描述结果。
              如果'error',包含一个'error_message'键，值为错误信息。
    """
    try:

        client = OpenAI(
            api_key=os.getenv("Qwen_API_KEY"),
            base_url=os.getenv("Qwen_BASE_URL"),
        )

        completion = client.chat.completions.create(
            model="qwen-vl-plus-latest",
            messages=[
                {
                    "role": "system",
                    "content": [{"type": "text", "text": "You are a helpful assistant."}],
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"{image_url}"
                            },
                        },
                        {"type": "text", "text": "图中描绘的是什么景象?"},
                    ],
                },
            ],
        )

        # print(completion.choices[0].message.content)
        return {"status": "success", "image_description": completion.choices[0].message.content}
    except Exception as e:
        return {"status": "error", "error_message": str(e)}

In [12]:
# 视频理解工具
def video_comprehension(video_url: str) -> dict:
    """
    通过分析视频的视觉和音频组件来了解视频的内容。

    Args:
        video_url (str): 需要分析的视频的 URL。

    Returns:
        dict: 一个包含视频理解结果的字典。
             包含一个'status'键，值为'success'或'error'。
             如果'success'，包含一个'video_description'键，值为视频描述结果。
             如果'error'，包含一个'error_message'键，值为错误信息。
    """
    try:
        client = OpenAI(
        # 若没有配置环境变量，请用百炼API Key将下行替换为：api_key="sk-xxx",
        api_key=os.getenv("Qwen_API_KEY"),
        base_url=os.getenv("Qwen_BASE_URL"),
        )
        completion = client.chat.completions.create(
            model="qwen-vl-max-latest",
            messages=[
                {"role": "system",
                "content": [{"type": "text","text": "You are a helpful assistant."}]},
                {"role": "user","content": [{
                    # 直接传入视频文件时，请将type的值设置为video_url
                    # 使用OpenAI SDK时，视频文件默认每间隔0.5秒抽取一帧，且不支持修改，如需自定义抽帧频率，请使用DashScope SDK.
                    "type": "video_url",            
                    "video_url": {"url": video_url}},
                    {"type": "text","text": "这段视频的内容是什么?"}]
                }]
        )
        # print(completion.choices[0].message.content)
        return {"status": "success", "video_description": completion.choices[0].message.content}
    except Exception as e:
        return {"status": "error", "error_message": str(e)}


In [13]:
# 使用示例
video_url = 'https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241115/cqqkru/1.mp4'
video_comprehension(video_url)

{'status': 'success',
 'video_description': '这段视频展示了一位年轻女性的特写镜头。她有着短发，面带微笑，看起来非常友好和愉快。她的表情从轻微的笑容逐渐变为大笑，显得非常自然和开心。她穿着一件粉色的针织开衫，内搭白色上衣，整体装扮简洁而温馨。\n\n背景模糊，但可以看出是在户外，可能是一个校园或住宅区，环境明亮且温暖，给人一种轻松愉快的感觉。视频的整体风格清新自然，传递出一种积极向上的氛围。'}

In [14]:
image_url  = "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"
image_comprehension(image_url)

{'status': 'success',
 'image_description': '这张图片展示了一位女士和一只狗在海滩上互动的场景。背景是海浪拍打沙滩的画面，天空呈现出日落时分柔和的颜色渐变效果。这位女士穿着格子衬衫坐在沙地上与狗狗进行友好交流，并且似乎正在给它一个高五的动作或者是在玩耍。整体氛围显得非常温馨和谐、充满乐趣以及放松的感觉，在夕阳下更显宁静美好。'}