In [9]:
import os
import getpass
import langsmith
from dotenv import load_dotenv
from langsmith import wrappers, traceable
import openai

# Configure LangSmith through environment variables: the tracing flag and the
# project name used for this notebook.
os.environ.update(
    {
        "LANGSMITH_TRACING": "true",
        "LANGCHAIN_PROJECT": "Patent_Project_Local_Text2Sql_Test",
    }
)

In [11]:
# Load variables from a .env file into the process environment.
# The `True` displayed as this cell's output is the call's return value.
load_dotenv()

True

In [13]:
# (1) Resolve the SQLite database path.
db_path = os.path.expanduser("~/Desktop/patents/data/patent.db")

# SQLite creates a brand-new, empty database when the target file is missing,
# which would make the "Successfully connected" message below misleading and
# leave the agent querying an empty schema. Fail early with a clear error.
if not os.path.isfile(db_path):
    raise FileNotFoundError(f"SQLite database not found at {db_path}")

# (2) Build the SQLAlchemy engine.
from sqlalchemy import create_engine, text
from sqlalchemy.pool import StaticPool

# StaticPool together with check_same_thread=False lets one underlying SQLite
# connection be reused from other threads (e.g. tool calls made by the agent).
engine = create_engine(
    f"sqlite:///{db_path}",
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,
)

# Smoke test: list the tables. This is a read-only query, so a plain connection
# is enough; no explicit transaction block is required.
with engine.connect() as conn:
    result = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table';"))
    tables = result.fetchall()
print(f"Successfully connected to database at {db_path}")
print("Detected tables:", tables)


Successfully connected to database at /Users/yuxiangwang/Desktop/patents/data/patent.db
Detected tables: [('patents',), ('inventors',), ('sqlite_sequence',), ('assignees',), ('prior_art_keywords',), ('events',), ('external_links',), ('images',), ('classifications',), ('claims',), ('applications_claiming_priority',), ('worldwide_applications',), ('patent_citations',), ('cited_by',), ('legal_events',), ('concepts',), ('child_applications',), ('parent_applications',), ('priority_applications',), ('non_patent_citations',), ('similar_documents',), ('error_logs',)]


In [15]:
# (3) Wrap the engine in LangChain's SQLDatabase helper.
from langchain_community.utilities.sql_database import SQLDatabase

db = SQLDatabase(engine)
print(f"SQLDatabase object: {type(db)}")

SQLDatabase object: <class 'langchain_community.utilities.sql_database.SQLDatabase'>


In [17]:
# (4) Initialise the chat model (temperature 0.0).
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model_name="gpt-4o", temperature=0.0)  # or "gpt-3.5-turbo"
print("LLM init done.")

LLM init done.


In [19]:
# (5) Build the SQL toolkit and list the tools it exposes.
from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit

toolkit = SQLDatabaseToolkit(db=db, llm=llm)
tools = toolkit.get_tools()

print("\nDefault SQL Tools:")
for t in tools:
    print(f" - {t.name} : {t.description}")



Default SQL Tools:
 - sql_db_query : Input to this tool is a detailed and correct SQL query, output is a result from the database. If the query is not correct, an error message will be returned. If an error is returned, rewrite the query, check the query, and try again. If you encounter an issue with Unknown column 'xxxx' in 'field list', use sql_db_schema to query the correct table fields.
 - sql_db_schema : Input to this tool is a comma-separated list of tables, output is the schema and sample rows for those tables. Be sure that the tables actually exist by calling sql_db_list_tables first! Example Input: table1, table2, table3
 - sql_db_list_tables : Input is an empty string, output is a comma-separated list of tables in the database.
 - sql_db_query_checker : Use this tool to double check if your query is correct before executing it. Always use this tool before executing a query with sql_db_query!


In [25]:
#######################################
# (3) Custom schema_doc_tool: hand-written schema documentation
#######################################
from langchain.tools import BaseTool

# Hard-coded dictionary describing the purpose of each table and the meaning of
# each of its columns.
# Shape: {table_name: {"table_comment": str, "columns": {column_name: description}}}
SCHEMA_DOCS = {
    "patents": {
        "table_comment": "Stores the core patent data using a text-based primary key.",
        "columns": {
            "patent_id": "Unique text-based primary key (e.g., US20180044418A1).",
            "title": "Patent title.",
            "type": "Patent type (e.g., patent).",
            "pdf_link": "Link to the PDF of the patent.",
            "publication_number": "Publication number (e.g., US20180044418A1).",
            "country": "Country/region (e.g., United States).",
            "application_number": "Application number.",
            "priority_date": "Priority date in YYYY-MM-DD format.",
            "filing_date": "Filing date in YYYY-MM-DD format.",
            "publication_date": "Publication date in YYYY-MM-DD format.",
            "prior_art_date": "Prior art date if applicable (YYYY-MM-DD).",
            "family_id": "Family ID if provided.",
            "abstract": "Abstract of the patent.",
            "description_link": "Link to the detailed description."
        }
    },

    "inventors": {
        "table_comment": "Records the list of inventors for each patent.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "inventor_name": "Name of the inventor.",
            "link": "URL to inventor info (optional).",
            "serpapi_link": "SerpApi link to inventor info (optional)."
        }
    },

    "assignees": {
        "table_comment": "Stores the assignees (applicants/patent owners) of each patent.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "name": "Name of the assignee (e.g., Merck Sharp & Dohme LLC)."
        }
    },

    "prior_art_keywords": {
        "table_comment": "Keeps track of keywords related to prior art.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "keyword": "One keyword (e.g., cancer, tumor)."
        }
    },

    "events": {
        "table_comment": "Stores various patent events like filings, publications, assignments, etc.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "event_date": "Date of the event in YYYY-MM-DD format.",
            "title": "Title or short description of the event.",
            "type": "Type of event (e.g., filed, publication, legal-status).",
            "critical": "Indicator if the event is critical (0/1).",
            "assignee_search": "Assignee details if relevant.",
            "description": "Extended description (may concatenate arrays if needed)."
        }
    },

    "external_links": {
        "table_comment": "Contains external links related to the patent (e.g., USPTO link).",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "text": "Display text or label for the link.",
            "link": "URL of the external link."
        }
    },

    "images": {
        "table_comment": "Stores image URLs related to the patent.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "image_url": "URL of an image."
        }
    },

    "classifications": {
        "table_comment": "Keeps track of classification codes for the patent (CPC, IPC, etc.).",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "code": "Classification code (e.g., C07K16/2818).",
            "description": "Description of the classification code.",
            "leaf": "Indicator whether this classification is a leaf node (0/1).",
            "first_code": "Whether it is the first code from the JSON (0/1).",
            "is_cpc": "Whether this classification is a CPC (0/1).",
            "additional": "Whether this is an additional classification (0/1)."
        }
    },

    "claims": {
        "table_comment": "Stores textual claims for each patent.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "claim_no": "Claim number (integer).",
            "claim_txt": "Full text of the claim."
        }
    },

    "applications_claiming_priority": {
        "table_comment": "Applications that claim priority to the given patent.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "application_number": "Application number of the priority claimant.",
            "priority_date": "Priority date in YYYY-MM-DD format.",
            "filing_date": "Filing date in YYYY-MM-DD format.",
            "representative_publication": "Representative publication number.",
            "primary_language": "Primary language (e.g., en).",
            "title": "Application title."
        }
    },

    "worldwide_applications": {
        "table_comment": "Stores worldwide family application data for the patent.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "year": "Year (integer).",
            "application_number": "Application number.",
            "country_code": "Country or region code (e.g., US, WO).",
            "document_id": "Document identifier.",
            "filing_date": "Filing date in YYYY-MM-DD format.",
            "legal_status": "Legal status (e.g., Active).",
            "legal_status_cat": "Category for the legal status (active, not_active, etc.).",
            "this_app": "Indicator if this is the same application (0/1)."
        }
    },

    "patent_citations": {
        "table_comment": "Patent citations referenced by the given patent.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "is_family_to_family": "Family-to-family citation indicator (0/1).",
            "publication_number": "Publication number of the cited patent.",
            "primary_language": "Language of the cited patent.",
            "examiner_cited": "Indicator if cited by an examiner (0/1).",
            "priority_date": "Priority date of the cited patent.",
            "publication_date": "Publication date of the cited patent.",
            "assignee_original": "Original assignee name of the cited patent.",
            "title": "Title of the cited patent.",
            "serpapi_link": "Related SerpApi link.",
            "patent_id_ref": "Internal reference to the cited patent ID."
        }
    },

    "cited_by": {
        "table_comment": "Patents that cite this patent (reverse citations).",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "is_family_to_family": "Family-to-family citation indicator (0/1).",
            "publication_number": "Publication number of the citing patent.",
            "primary_language": "Language of the citing patent.",
            "examiner_cited": "Indicator if cited by an examiner (0/1).",
            "priority_date": "Priority date of the citing patent.",
            "publication_date": "Publication date of the citing patent.",
            "assignee_original": "Original assignee of the citing patent.",
            "title": "Title of the citing patent.",
            "serpapi_link": "Related SerpApi link.",
            "patent_id_ref": "Patent ID reference of the citing patent."
        }
    },

    "legal_events": {
        "table_comment": "Legal events or changes in patent status.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "date": "Event date in YYYY-MM-DD format.",
            "code": "Event code (e.g., AS, STPP).",
            "title": "Event title (e.g., Assignment).",
            "attributes_json": "JSON string of event attributes."
        }
    },

    "concepts": {
        "table_comment": "Stores recognized concepts or compounds within the patent data.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "concept_id": "Unique ID of the concept (e.g., from data['concepts']['match'][*].id).",
            "domain": "Domain (e.g., Diseases).",
            "name": "Name of the concept (e.g., Neoplasm).",
            "similarity": "Similarity score (float).",
            "sections": "Concatenated list of sections where the concept appeared.",
            "count": "Occurrence count of the concept.",
            "inchi_key": "InChI Key for chemical compounds.",
            "smiles": "SMILES notation for chemical compounds."
        }
    },

    "child_applications": {
        "table_comment": "Continuation or child applications referencing the current patent.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "application_number": "Child application number.",
            "relation_type": "Type of relationship (e.g., Continuation).",
            "representative_publication": "Representative publication number for the child.",
            "primary_language": "Primary language of the child application.",
            "priority_date": "Priority date in YYYY-MM-DD format.",
            "filing_date": "Filing date in YYYY-MM-DD format.",
            "title": "Title of the child application."
        }
    },

    "parent_applications": {
        "table_comment": "Parent applications of the current patent (e.g., parent continuation).",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "application_number": "Parent application number.",
            "relation_type": "Type of relationship (e.g., Continuation).",
            "representative_publication": "Representative publication number for the parent.",
            "primary_language": "Primary language of the parent application.",
            "priority_date": "Priority date in YYYY-MM-DD format.",
            "filing_date": "Filing date in YYYY-MM-DD format.",
            "title": "Title of the parent application."
        }
    },

    "priority_applications": {
        "table_comment": "Priority applications that the current patent claims.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "application_number": "Priority application number.",
            "representative_publication": "Representative publication number.",
            "primary_language": "Language of the priority application.",
            "priority_date": "Priority date in YYYY-MM-DD format.",
            "filing_date": "Filing date in YYYY-MM-DD format.",
            "title": "Title of the priority application."
        }
    },

    "non_patent_citations": {
        "table_comment": "Non-patent literature cited by the patent.",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "citation_title": "Title of the cited non-patent reference.",
            "examiner_cited": "Indicator if cited by an examiner (0/1)."
        }
    },

    "similar_documents": {
        "table_comment": "Keeps track of documents deemed similar to this patent (patent or non-patent).",
        "columns": {
            "id": "Auto-increment primary key.",
            "patent_id": "References patents(patent_id).",
            "is_patent": "Indicator if the similar document is a patent (0/1).",
            "doc_patent_id": "If a patent, ID (e.g., patent/US11734097B1/en).",
            "serpapi_link": "SerpApi link for the similar document.",
            "publication_number": "Publication number for the similar document.",
            "primary_language": "Language of the similar document.",
            "publication_date": "Publication date in YYYY-MM-DD format.",
            "title": "Title of the similar document."
        }
    },

    "error_logs": {
        "table_comment": "Logs errors and exceptions encountered during processing.",
        "columns": {
            "id": "Auto-increment primary key.",
            "error_message": "Description of the error.",
            "stack_trace": "Full stack trace if available.",
            "created_at": "Timestamp (YYYY-MM-DD HH:MM:SS) when the error was recorded."
        }
    }
}

In [40]:
def create_schema_texts(schema_docs: dict) -> list:
    """
    Convert a schema-documentation mapping into records ready for embedding.

    One record is produced per table; split the output per column yourself if
    finer retrieval granularity is needed.

    Args:
        schema_docs: Mapping of table name -> info dict. The info dict may hold
            "table_comment" (defaults to "" when absent) and "columns", a
            mapping of column name -> description (treated as empty when
            absent or None).

    Returns:
        A list of {"text": str, "metadata": {"table_name": str}} dicts, in the
        iteration order of ``schema_docs``.
    """
    docs = []
    for table_name, info in schema_docs.items():
        table_comment = info.get("table_comment", "")
        # Read "columns" as leniently as "table_comment": a table documented
        # only by its comment still yields a record instead of a KeyError.
        columns = info.get("columns") or {}
        col_section = "\n".join(f"{col}: {desc}" for col, desc in columns.items())

        docs.append(
            {
                "text": f"Table: {table_name}\nComment: {table_comment}\nColumns:\n{col_section}",
                "metadata": {"table_name": table_name},
            }
        )
    return docs

# Build one record per table, then print each record's text followed by its metadata.
schema_list = create_schema_texts(SCHEMA_DOCS)
for item in schema_list:
    print(f'{item["text"]} {item["metadata"]}')

Table: patents
Comment: Stores the core patent data using a text-based primary key.
Columns:
patent_id: Unique text-based primary key (e.g., US20180044418A1).
title: Patent title.
type: Patent type (e.g., patent).
pdf_link: Link to the PDF of the patent.
publication_number: Publication number (e.g., US20180044418A1).
country: Country/region (e.g., United States).
application_number: Application number.
priority_date: Priority date in YYYY-MM-DD format.
filing_date: Filing date in YYYY-MM-DD format.
publication_date: Publication date in YYYY-MM-DD format.
prior_art_date: Prior art date if applicable (YYYY-MM-DD).
family_id: Family ID if provided.
abstract: Abstract of the patent.
description_link: Link to the detailed description. {'table_name': 'patents'}
Table: inventors
Comment: Records the list of inventors for each patent.
Columns:
id: Auto-increment primary key.
patent_id: References patents(patent_id).
inventor_name: Name of the inventor.
link: URL to inventor info (optional).
se

In [46]:
from langchain.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain.docstore.document import Document


In [48]:
embedding = OpenAIEmbeddings()  # requires OPENAI_API_KEY


In [50]:
# Convert each schema_list record into a LangChain Document.
documents = [
    Document(page_content=x["text"], metadata=x["metadata"])
    for x in schema_list
]

# Use the table name as a stable document id. Without explicit ids, each run of
# this cell adds the same documents again under freshly generated ids, so
# re-running it against the same in-process collection makes similarity search
# return duplicate hits. With stable ids the add acts as an upsert and the cell
# becomes safe to re-run.
vectorstore = Chroma.from_documents(
    documents=documents,
    embedding=embedding,
    ids=[doc.metadata["table_name"] for doc in documents],
    collection_name="schema_docs_collection",
)

In [52]:
from langchain.tools import BaseTool

class SchemaDocRAGTool(BaseTool):
    """Tool that answers schema questions via semantic search over the schema docs.

    The query is matched against the documents held in ``vectorstore`` and the
    ``top_k`` best hits are returned as a single formatted text block.
    """

    name: str = "schema_doc_rag_tool"
    description: str = (
        "Do a semantic search over the large schema doc. "
        "Input: any question about tables or columns. Output: the relevant snippet(s)."
    )

    # Vector store to search; supplied by the caller at construction time.
    vectorstore: Chroma
    # Number of snippets returned per query. Defaults to 3, the value that used
    # to be hard-coded in _run.
    top_k: int = 3

    def _run(self, query: str) -> str:
        """Search the vector store and format the hits.

        Args:
            query: Free-text question about tables or columns.

        Returns:
            One section per hit (score, table name, document content) separated
            by blank lines, or a fixed "not found" message when there are no hits.
        """
        docs_and_scores = self.vectorstore.similarity_search_with_score(query, k=self.top_k)
        if not docs_and_scores:
            return "No relevant schema snippet found."

        # NOTE(review): the score looks like a distance (the closest match in the
        # recorded output has the lowest value) - confirm against the store's docs.
        return "\n\n".join(
            f"score={score:.2f}, table_name={doc.metadata.get('table_name', '')}\nContent:\n{doc.page_content}"
            for doc, score in docs_and_scores
        )

    async def _arun(self, query: str) -> str:
        """Async entry point; delegates to the synchronous implementation."""
        return self._run(query)

In [54]:
# Register the RAG tool alongside the default SQL toolkit tools.
# NOTE: all_tools, system_message_with_tool and agent_executor are assigned again
# by later cells in this notebook, so their values depend on which cell ran last.
rag_tool = SchemaDocRAGTool(vectorstore=vectorstore)
all_tools = tools + [rag_tool]

# System prompt telling the agent when to call the schema RAG tool.
system_message_with_tool = """
You are an SQL agent with knowledge of a large schema doc. 
When you have questions about table or column details, call the 'schema_doc_rag_tool'.
If the user asks "what is the 'country' column of the 'patents' table", you can retrieve the snippet from that tool.
"""

from langgraph.prebuilt import create_react_agent
agent_executor = create_react_agent(
    model=llm,
    tools=all_tools,
    prompt=system_message_with_tool
)

# Sample question (in Chinese): "In the 'inventors' table, what does inventor_name mean?"
question = "在 'inventors' 表里, inventor_name 是什么含义？"
events = agent_executor.stream({"messages": [("user", question)]}, stream_mode="values")

# Each streamed event carries a "messages" list; print the newest message of each.
for event in events:
    msg = event["messages"][-1]
    msg.pretty_print()


在 'inventors' 表里, inventor_name 是什么含义？
Tool Calls:
  schema_doc_rag_tool (call_0PyqB14ksvH8K6iH2098fL0y)
 Call ID: call_0PyqB14ksvH8K6iH2098fL0y
  Args:
    query: inventor_name column in inventors table
Name: schema_doc_rag_tool

score=0.30, table_name=inventors
Content:
Table: inventors
Comment: Records the list of inventors for each patent.
Columns:
id: Auto-increment primary key.
patent_id: References patents(patent_id).
inventor_name: Name of the inventor.
link: URL to inventor info (optional).
serpapi_link: SerpApi link to inventor info (optional).

score=0.39, table_name=assignees
Content:
Table: assignees
Comment: Stores the assignees (applicants/patent owners) of each patent.
Columns:
id: Auto-increment primary key.
patent_id: References patents(patent_id).
name: Name of the assignee (e.g., Merck Sharp & Dohme LLC).

score=0.42, table_name=cited_by
Content:
Table: cited_by
Comment: Patents that cite this patent (reverse citations).
Columns:
id: Auto-increment primary key.
pat

In [29]:
class SchemaDocTool(BaseTool):
    """Return the hand-written documentation (table comment and column notes) for one table."""

    # 1. Fields carry type annotations.
    name: str = "schema_doc_tool"
    description: str = (
        "Use this tool to get a table's doc/metadata. Input is a single table name, "
        "output is the textual description, with each column's meaning. "
        "If the table name not found, return an error message."
    )

    # 2. _run takes a str and returns a str.
    def _run(self, table_name: str) -> str:
        """Look up ``table_name`` in SCHEMA_DOCS and render its documentation.

        Args:
            table_name: Name of the table. Surrounding whitespace is ignored. If
                there is no exact match, the name is retried with surrounding
                quotes/backticks/brackets removed and compared case-insensitively.

        Returns:
            A multi-line description (table comment followed by one line per
            column), or a "not found" message when no table matches.
        """
        table_name = table_name.strip()

        if table_name in SCHEMA_DOCS:
            key = table_name
        else:
            # The model may pass the name quoted, backticked or in a different
            # case; fall back to a normalised comparison before giving up.
            wanted = table_name.strip("'\"`[]").strip().lower()
            key = next((k for k in SCHEMA_DOCS if k.lower() == wanted), None)

        if key is None:
            return f"无法找到表 '{table_name}' 的描述。"

        info = SCHEMA_DOCS[key]
        lines = [f"【表 {key}】: {info['table_comment']}", "列信息："]
        lines.extend(f" - {col}: {desc}" for col, desc in info['columns'].items())
        return "\n".join(lines)

    async def _arun(self, table_name: str) -> str:
        """Async entry point; delegates to the synchronous implementation."""
        return self._run(table_name)

In [31]:
# 把 schema_doc_tool 加到原有Toolkit工具列表里
# Append the custom schema_doc_tool to the toolkit's default tool list.
schema_tool = SchemaDocTool()
all_tools = [*tools, schema_tool]

print("\nFinal Tools with custom schema_doc_tool:")
for t in all_tools:
    print(f" - {t.name} : {t.description}")



Final Tools with custom schema_doc_tool:
 - sql_db_query : Input to this tool is a detailed and correct SQL query, output is a result from the database. If the query is not correct, an error message will be returned. If an error is returned, rewrite the query, check the query, and try again. If you encounter an issue with Unknown column 'xxxx' in 'field list', use sql_db_schema to query the correct table fields.
 - sql_db_schema : Input to this tool is a comma-separated list of tables, output is the schema and sample rows for those tables. Be sure that the tables actually exist by calling sql_db_list_tables first! Example Input: table1, table2, table3
 - sql_db_list_tables : Input is an empty string, output is a comma-separated list of tables in the database.
 - sql_db_query_checker : Use this tool to double check if your query is correct before executing it. Always use this tool before executing a query with sql_db_query!
 - schema_doc_tool : Use this tool to get a table's doc/metada

In [33]:
#######################################
# (4) Create the ReAct agent
#######################################
from langchain import hub
# Pull the SQL-agent system prompt template from the hub.
prompt_template = hub.pull("langchain-ai/sql-agent-system-prompt")

# The template is formatted with dialect and top_k.
system_message = prompt_template.format(dialect="SQLite", top_k=5)
# Remind the agent that it can call schema_doc_tool.
additional_note = (
    "\nYou also have a tool called 'schema_doc_tool' which provides table commentary and column meaning. "
    "Call it if you need to clarify a table's usage or columns."
)
system_message_with_tool = system_message + additional_note


In [35]:
# Build the ReAct agent from the current values of llm, all_tools and
# system_message_with_tool. Those names are also assigned in other cells of this
# notebook, so the resulting agent depends on which cells ran beforehand.
from langgraph.prebuilt import create_react_agent
agent_executor = create_react_agent(
    model=llm,
    tools=all_tools,
    prompt=system_message_with_tool
)


In [37]:
# (4) A sample question (in Chinese): "What is the inventors table for, and what
# does each of its columns mean?"
user_question = "inventors这个表是干嘛用的？里面的每个列分别是什么意思？"

events = agent_executor.stream(
    {"messages": [("user", user_question)]},
    stream_mode="values"
)

print("\n--- Agent conversation log ---")
for event in events:
    # each event["messages"] is a list of messages; take the newest one
    msg = event["messages"][-1]
    # pretty_print renders the message (user input, tool call, tool result or final answer)
    msg.pretty_print()


--- Agent conversation log ---

inventors这个表是干嘛用的？里面的每个列分别是什么意思？
Tool Calls:
  schema_doc_tool (call_lJkLaXBGMNZeQQ6Pzmpyu1rs)
 Call ID: call_lJkLaXBGMNZeQQ6Pzmpyu1rs
  Args:
    table_name: inventors
Name: schema_doc_tool

【表 inventors】: Records the list of inventors for each patent.
列信息：
 - id: Auto-increment primary key.
 - patent_id: References patents(patent_id).
 - inventor_name: Name of the inventor.
 - link: URL to inventor info (optional).
 - serpapi_link: SerpApi link to inventor info (optional).

表 `inventors` 用于记录每个专利的发明者列表。每个列的含义如下：

- `id`: 自动递增的主键。
- `patent_id`: 引用专利表中的 `patent_id`，用于关联专利。
- `inventor_name`: 发明者的姓名。
- `link`: 指向发明者信息的URL（可选）。
- `serpapi_link`: 指向发明者信息的SerpApi链接（可选）。
