In [3]:
# Reflect the database schema, save it as JSON, and display it.
import json
import os
import pprint

from sqlalchemy import create_engine, inspect
# NOTE(review): Geometry is not referenced directly in this cell; presumably it is
# imported so SQLAlchemy reflection can resolve PostGIS geometry columns - confirm.
from geoalchemy2 import Geometry

dbName = "parks"

# DB_URL is required; fail with a clear message instead of the opaque
# "unsupported operand type(s) for +: 'NoneType' and 'str'" TypeError.
db_url_prefix = os.getenv("DB_URL")
if not db_url_prefix:
    raise RuntimeError("Environment variable DB_URL is not set")

# DB_PARMS holds the trailing part of the URL; treat it as empty when unset.
dburl = db_url_prefix + dbName + os.getenv("DB_PARMS", "")
engine = create_engine(dburl)
inspector = inspect(engine)

# Map every table name to {"columns": [{"name": ..., "type": ...}, ...]}.
# Column types are converted to strings because SQLAlchemy type objects are
# not JSON-serializable.
custom_schema = {
    table_name: {
        "columns": [
            {"name": col["name"], "type": str(col["type"])}
            for col in inspector.get_columns(table_name)
        ]
    }
    for table_name in inspector.get_table_names()
}

# Save to file. Create the target directory first so a fresh checkout works,
# and write UTF-8 so non-ASCII identifiers stay readable.
os.makedirs("data", exist_ok=True)
with open("data/schema.json", "w", encoding="utf-8") as f:
    json.dump(custom_schema, f, indent=4, ensure_ascii=False)

# Show the result.
pprint.pprint(custom_schema)

{'facilities': {'columns': [{'name': 'facility_id', 'type': 'VARCHAR'},
                            {'name': 'park_id', 'type': 'VARCHAR'},
                            {'name': 'facility_name', 'type': 'VARCHAR'},
                            {'name': 'facility_type', 'type': 'VARCHAR'},
                            {'name': 'location_wkt',
                             'type': 'geometry(GEOMETRY,-1)'}]},
 'maintenance_logs': {'columns': [{'name': 'log_id', 'type': 'VARCHAR'},
                                  {'name': 'facility_id', 'type': 'VARCHAR'},
                                  {'name': 'technician', 'type': 'VARCHAR'},
                                  {'name': 'status', 'type': 'VARCHAR'},
                                  {'name': 'date', 'type': 'DATE'},
                                  {'name': 'maintenance_notes',
                                   'type': 'TEXT'}]},
 'parks': {'columns': [{'name': 'park_id', 'type': 'VARCHAR'},
                       {'name': 'park_name',

In [2]:
# Build the vector-store retrieval tool.
import os  # imported here so this cell does not depend on another cell having run

from langchain_chroma import Chroma
from langchain_core.tools import create_retriever_tool
from langchain_openai import OpenAIEmbeddings  # or your own embedding implementation

# 1. Create the embedding instance.
embeddings = OpenAIEmbeddings(model='text-embedding-3-small')

# 2. Load the previously persisted vector store.
vectordb = Chroma(
    persist_directory="chroma_db",
    embedding_function=embeddings,
)

# 3. Number of documents to retrieve per query. When VECTOR_QURY_TOP_K is unset
#    or empty, pass no "k" at all so the retriever uses the library default
#    instead of crashing on int(None).
top_k_raw = os.environ.get("VECTOR_QURY_TOP_K")
search_kwargs = {"k": int(top_k_raw)} if top_k_raw else {}

retriever = vectordb.as_retriever(search_kwargs=search_kwargs)

# 4. Expose the retriever to the agent as a tool.
retriever_tool = create_retriever_tool(
    retriever,
    name="pdf_knowledge_retriever",
    description="A tool for retrieving relevant information",
)

In [3]:
from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit
from langchain_community.utilities import SQLDatabase
from langchain_openai import ChatOpenAI

# Deterministic chat model shared by the SQL toolkit and the agent.
model = ChatOpenAI(model='gpt-4.1-mini', temperature=0)

# SQLDatabaseToolkit only declares the `db` and `llm` fields. The previous
# `custom_schema=custom_schema` keyword was not one of them and had no effect,
# so it is dropped here; the reflected schema reaches the agent through the
# system prompt instead.
toolkit = SQLDatabaseToolkit(
    db=SQLDatabase.from_uri(dburl),
    llm=model,
)

# SQL tools from the toolkit plus the vector-store retriever tool.
tools = toolkit.get_tools()
tools.append(retriever_tool)

In [4]:
# System prompt for the SQL agent.

# Result limit used when DB_QURY_TOP_K is not set.
DEFAULT_DB_QUERY_TOP_K = "5"

SYSTEM_PROMPT_TEMPLATE = """
    You are an agent designed to interact with a SQL database.
    Given an input question, create a syntactically correct {dialect} query to run,
    then look at the results of the query and return the answer. Unless the user
    specifies a specific number of examples they wish to obtain, always limit your
    query to at most {top_k} results.

    This is the schema of the database: {schema}

    You can order the results by a relevant column to return the most interesting
    examples in the database. Never query for all the columns from a specific table,
    only ask for the relevant columns given the question.

    You MUST double check your query before executing it. If you get an error while
    executing a query, rewrite the query and try again.

    DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the
    database.

    To start you should ALWAYS look at the tables in the database to see what you
    can query. Do NOT skip this step.

    Then you should query the schema of the most relevant tables.

"""

# Fall back to sensible values when the environment variables are unset or
# empty; otherwise the literal text "None" would be rendered into the prompt.
system_prompt = SYSTEM_PROMPT_TEMPLATE.format(
    # Prefer the explicit override, else the dialect name of the connected engine.
    dialect=os.environ.get("DIALECT") or engine.dialect.name,
    top_k=os.environ.get("DB_QURY_TOP_K") or DEFAULT_DB_QUERY_TOP_K,
    schema=custom_schema,
)

In [5]:
# Build the agent application with LangGraph.

from langchain.agents import create_agent
from langgraph.graph import MessagesState, StateGraph

agent = create_agent(
    model=model,
    tools=tools,
    system_prompt=system_prompt,
)

# The graph state is a dict carrying a "messages" list, so declare it with
# MessagesState rather than `str`. The previous `str` schema did not describe
# the {"messages": [...]} payload this graph is invoked with, and the raw
# input dict was echoed back as the first streamed value.
graph = StateGraph(MessagesState)

graph.add_node("agent", agent)

graph.set_entry_point("agent")   # entry node
graph.set_finish_point("agent")  # finish node

app = graph.compile()

In [9]:
# Ask a question and print the conversation.

question = "生成数据库er图"
# Alternative sample question: "植物生长需要什么条件？"

# With stream_mode="values" every step carries the cumulative message list,
# so remember how many messages were already shown and print only new ones.
printed_count = 0

for step in app.stream(
    {"messages": [{"role": "user", "content": question}]},
    stream_mode="values",
):
    messages = step['messages']
    # Skip empty snapshots and snapshots that still hold the raw input dicts
    # (plain dicts have no pretty_print()).
    if not messages or isinstance(messages[0], dict):
        continue
    for msg in messages[printed_count:]:
        msg.pretty_print()
    printed_count = len(messages)


生成数据库er图
Tool Calls:
  sql_db_list_tables (call_qvoMsapXYVJ5k3l9Z6BTDrNe)
 Call ID: call_qvoMsapXYVJ5k3l9Z6BTDrNe
  Args:
Name: sql_db_list_tables

facilities, maintenance_logs, parks, spatial_ref_sys
Tool Calls:
  sql_db_schema (call_HQXhUVw7DrUTTGDgyOWIHzGM)
 Call ID: call_HQXhUVw7DrUTTGDgyOWIHzGM
  Args:
    table_names: facilities
  sql_db_schema (call_auYrRXcGKYrzWcNw29MTtAqU)
 Call ID: call_auYrRXcGKYrzWcNw29MTtAqU
  Args:
    table_names: maintenance_logs
  sql_db_schema (call_szjCGQllErZYFfEMWssnTVeX)
 Call ID: call_szjCGQllErZYFfEMWssnTVeX
  Args:
    table_names: parks
  sql_db_schema (call_lo4JXFJOFZOhBjvbPAAvu2aN)
 Call ID: call_lo4JXFJOFZOhBjvbPAAvu2aN
  Args:
    table_names: spatial_ref_sys
Name: sql_db_schema


CREATE TABLE facilities (
	facility_id VARCHAR NOT NULL, 
	park_id VARCHAR, 
	facility_name VARCHAR, 
	facility_type VARCHAR, 
	location_wkt geometry(GEOMETRY,-1), 
	CONSTRAINT facilities_pkey PRIMARY KEY (facility_id), 
	CONSTRAINT facilities_park_id_fkey FOREI

In [None]:
# Ask a question and print the conversation.

question = "最南边的公园是什么，距离他最近的设施最近的维护日期是什么，设施是什么？"
# Alternative sample question: "植物生长需要什么条件？"

# With stream_mode="values" every step carries the cumulative message list,
# so remember how many messages were already shown and print only new ones.
printed_count = 0

for step in app.stream(
    {"messages": [{"role": "user", "content": question}]},
    stream_mode="values",
):
    messages = step['messages']
    # Skip empty snapshots and snapshots that still hold the raw input dicts
    # (plain dicts have no pretty_print()).
    if not messages or isinstance(messages[0], dict):
        continue
    for msg in messages[printed_count:]:
        msg.pretty_print()
    printed_count = len(messages)


最南边的公园是什么，距离他最近的设施最近的维护日期是什么
Tool Calls:
  sql_db_list_tables (call_vkOufwxNzOvK9M78dMx4kJiG)
 Call ID: call_vkOufwxNzOvK9M78dMx4kJiG
  Args:
Name: sql_db_list_tables

facilities, maintenance_logs, parks, spatial_ref_sys
Tool Calls:
  sql_db_schema (call_mOI00yPdAq8GA4K4m6lzpOZi)
 Call ID: call_mOI00yPdAq8GA4K4m6lzpOZi
  Args:
    table_names: parks
  sql_db_schema (call_1GePqzO852Df4odAy1rUuNZZ)
 Call ID: call_1GePqzO852Df4odAy1rUuNZZ
  Args:
    table_names: facilities
  sql_db_schema (call_JJI6p3FDkIkvVq6VF2w1U0zq)
 Call ID: call_JJI6p3FDkIkvVq6VF2w1U0zq
  Args:
    table_names: maintenance_logs
Name: sql_db_schema


CREATE TABLE parks (
	park_id VARCHAR NOT NULL, 
	park_name VARCHAR, 
	area_name VARCHAR, 
	boundary_wkt geometry(GEOMETRY,-1), 
	CONSTRAINT parks_pkey PRIMARY KEY (park_id)
)

/*
3 rows from parks table:
park_id	park_name	area_name	boundary_wkt
P001	南湖湿地公园	洪山区	01030000000100000005000000a2f8e582318e5c409736969cfc943e405a176b6e838e5c409736969cfc943e405a176b6e83
P002	纱帽江