在这个教程中我们将使用openai的模型

In [1]:
import getpass
import os
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass()
# Comment out the below to opt-out of using LangSmith in this notebook. Not required.
if not os.environ.get("LANGCHAIN_API_KEY"):
    os.environ["LANGCHAIN_API_KEY"] = getpass.getpass()
    os.environ["LANGCHAIN_TRACING_V2"] = "true"

我们来看一下使用SQLite连接Chinook数据库，请按照以下安装步骤，在与此笔记本相同的目录中创建Chinook.db：

- 下载保存文件到 `Chinook.sql`
- 执行 `sqlite3 Chinook.db` 命令
- 执行 `.read Chinook.sql` 命令
- 测试查询`SELECT * FROM Artist LIMIT 10;`

现在， `Chinook.db` 已经在我们的目录中，我们可以使用由SQLAlchemy驱动的SQLDatabase类与之交互：

In [1]:
import sqlite3

# 不需要下载和创建数据库的步骤，因为数据库已经存在
# 直接连接到现有的订单数据库
db_path = "./data/order_database.db"
conn = sqlite3.connect(db_path)
cursor = conn.cursor()


# 运行测试查询，这里我们选择一些有意义的字段作为示例
try:
    cursor.execute("""
        SELECT
            order_no,
            order_time,
            sales,
            material_name_cn,
            channel
        FROM new_fact_order_detail
        LIMIT 10;
    """)
    rows = cursor.fetchall()
    for row in rows:
        print(row)
except Exception as e:
    print(f"Failed to run test query: {e}")

# 关闭连接
conn.close()

('3c5db3f9729998569150adceca0fc0ad', '2024-10-21 23:49:35', 93.5, '芝麻开门男士滋养紧致眼部精华露  15ml', '2')
('3c0f0651639911a6de3ce6e72119b92f', '2024-10-30 00:00:00', 121, '芝麻开门男士新亮肤洁面膏 125ml', '1')
('3c0f0651639911a6de3ce6e72119b92f', '2024-10-30 00:00:00', 107.8, '芝麻开门男士全新亮肤清透爽肤水  200ml', '1')
('3c0f0651639911a6de3ce6e72119b92f', '2024-10-30 00:00:00', 173.8, '芝麻开门男士全新亮肤焕颜精华乳 50ml', '1')
('0579c2b642401b5de69edbdf20d80b78', '2024-10-27 00:00:00', 34.1, '芝麻开门男士水动力爽肤水 200ml', '1')
('6436ae0fe566d4b1b7d50840fc7863c2', '2024-10-18 12:47:19', 64.9, '芝麻开门男士淡纹焕肤精华水 150ml', '2')
('08423bb1449d18580efeb603391abdf5', '2024-10-21 20:31:33', 93.5, '芝麻开门男士滋养紧致眼部精华露  15ml', '2')
('b903bb994dc419747b9af476880f8b11', '2024-10-21 20:28:50', 93.5, '芝麻开门男士滋养紧致眼部精华露  15ml', '2')
('36775c99f024740e68cffbee351244dd', '2024-10-21 20:27:17', 93.5, '芝麻开门男士滋养紧致眼部精华露  15ml', '2')
('76fd63c06a56cecd6d14cf526ebcb901', '2024-10-21 20:41:01', 93.5, '芝麻开门男士滋养紧致眼部精华露  15ml', '2')


In [3]:
# 查看表结构
cursor.execute("PRAGMA table_info(new_fact_order_detail);")
table_info = cursor.fetchall()
print("Table Structure:")
for column in table_info:
    print(f"Column: {column[1]}, Type: {column[2]}")

ProgrammingError: Cannot operate on a closed database.

In [12]:
from langchain.llms.base import LLM
from openai import OpenAI
from langchain_community.llms.utils import enforce_stop_tokens
import requests
import os
from typing import Optional
from typing import Optional, List, Any
from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain_community.utilities import SQLDatabase

In [13]:
# 使用相对路径
db = SQLDatabase.from_uri("sqlite:///data/order_database.db")

In [6]:
type(db)

langchain_community.utilities.sql_database.SQLDatabase

In [7]:
context = db.get_context()

In [8]:
context

{'table_info': '\nCREATE TABLE new_fact_order_detail (\n\torder_no VARCHAR(255), \n\torder_time TIMESTAMP, \n\torder_date DATE, \n\tbrand_code VARCHAR(255), \n\tprogram_code VARCHAR(255), \n\torder_type INTEGER, \n\tsales DECIMAL(18, 2), \n\titem_qty INTEGER, \n\titem_price DECIMAL(18, 2), \n\tchannel VARCHAR(255), \n\tsubchannel VARCHAR(255), \n\tsub_subchannel VARCHAR(255), \n\tmaterial_code VARCHAR(255), \n\tmaterial_name_cn VARCHAR(255), \n\tmaterial_type VARCHAR(255), \n\tmerged_c_code VARCHAR(255), \n\ttier_code VARCHAR(255), \n\tfirst_order_date DATE, \n\tis_mtd_active_member_flag INTEGER, \n\tytd_active_arr VARCHAR(255), \n\tr12_active_arr VARCHAR(255), \n\tmanager_counter_code VARCHAR(255), \n\tba_code VARCHAR(255), \n\tprovince_name VARCHAR(255), \n\tline_city_name VARCHAR(255), \n\tline_city_level VARCHAR(255), \n\tstore_no VARCHAR(255), \n\tterminal_name VARCHAR(255), \n\tterminal_code VARCHAR(255), \n\tterminal_region VARCHAR(255), \n\tdefault_flag INTEGER\n)\n\n/*\n3 rows

In [34]:

# 设置API密钥和基础URL环境变量
API_KEY = "9f414214-5dda-48c5-8960-332b8125e086"
BASE_URL = "https://api-inference.modelscope.cn/v1/"

class SiliconFlow(LLM):
    def __init__(self):
        super().__init__()

    @property
    def _llm_type(self) -> str:
        """返回 LLM 的类型标识符"""
        return "silicon_flow"

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """实现实际的 LLM 调用逻辑"""
        # 初始化OpenAI客户端
        client = OpenAI(api_key=API_KEY, base_url=BASE_URL)

        # 发送请求到模型
        response = client.chat.completions.create(
            model='Qwen/Qwen2.5-Coder-32B-Instruct',
            messages=[
                {'role': 'user', 'content': prompt}
            ],
        )

        # 收集响应内容
        content = ""
        if hasattr(response, 'choices') and response.choices:
            for choice in response.choices:
                if hasattr(choice, 'message') and hasattr(choice.message, 'content'):
                    content += choice.message.content
        else:
            raise ValueError("Unexpected response structure")

        # 处理停止词
        if stop is not None:
            content = enforce_stop_tokens(content, stop)

        return content

In [14]:
from langchain.llms.base import LLM
from openai import OpenAI
from typing import Optional, List, Any
from langchain.callbacks.manager import CallbackManagerForLLMRun

# 设置API密钥和基础URL环境变量
API_KEY = "9f414214-5dda-48c5-8960-332b8125e086"
BASE_URL = "https://api-inference.modelscope.cn/v1/"

class SiliconFlow(LLM):
    def __init__(self):
        super().__init__()

    @property
    def _llm_type(self) -> str:
        return "silicon_flow"

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        try:
            client = OpenAI(
                api_key=API_KEY,
                base_url=BASE_URL  # 修改API路径
            )

            response = client.chat.completions.create(
                model='Qwen/Qwen2.5-Coder-32B-Instruct',
                messages=[
                    {'role': 'user', 'content': prompt}
                ],
            )

            content = ""
            if hasattr(response, 'choices') and response.choices:
                for choice in response.choices:
                    if hasattr(choice, 'message') and hasattr(choice.message, 'content'):
                        content += choice.message.content
            else:
                raise ValueError("Unexpected response structure")

            if stop is not None:
                content = enforce_stop_tokens(content, stop)

            return content
        except Exception as e:
            print(f"API调用出错: {str(e)}")
            print(f"完整错误: {e.__class__.__name__}")
            raise

# 测试代码
if __name__ == "__main__":
    # 创建 LLM 实例
    llm = SiliconFlow()

    # 测试简单问题
    test_prompt = "你好,请用简短的话介绍一下你自己"

    print("发送问题:", test_prompt)
    try:
        # 使用 invoke 方法替代直接调用
        response = llm.invoke(test_prompt)
        print("\n收到回复:", response)
    except Exception as e:
        print("发生错误:", str(e))

发送问题: 你好,请用简短的话介绍一下你自己

收到回复: 你好！我是一个由阿里巴巴云开发的人工智能助手，旨在帮助回答问题、提供信息和支持各种任务。


In [9]:
llm = SiliconFlow()

In [16]:
type(llm)

__main__.SiliconFlow

In [10]:
llm = SiliconFlow()

# 测试简单问题
test_prompt = "你好,请用简短的话介绍一下你自己"

print("发送问题:", test_prompt)
try:
    response = llm(test_prompt)
    print("\n收到回复:", response)
except Exception as e:
    print("发生错误:", str(e))

发送问题: 你好,请用简短的话介绍一下你自己


  warn_deprecated(



收到回复: 你好！我是一个由阿里巴巴云开发的人工智能助手，旨在帮助用户回答问题、提供信息和完成各种任务。


In [41]:
from langchain.chains import create_sql_query_chain
chain = create_sql_query_chain(llm, db)
response = chain.invoke({"question": "what is the highest sales"})
response

'SQLQuery: SELECT "sales" FROM "new_fact_order_detail" ORDER BY "sales" DESC LIMIT 1'

In [44]:
db.run('SELECT "sales" FROM "new_fact_order_detail" ORDER BY "sales" DESC LIMIT 1')

'[(275013.75,)]'

In [16]:
from langchain_core.runnables import RunnableLambda

# 定义清洗函数：去除 "SQLQuery: " 前缀
def clean_sql_response(response: str) -> str:
    # 检查前缀是否存在
    if response.startswith("SQLQuery:"):
        # 提取前缀后的内容并去除首尾空格
        return response.split("SQLQuery:", 1)[1].strip()
    return response  # 若无前缀，直接返回

# 创建链的完整流程
sql_chain = create_sql_query_chain(llm, db)
execute_query = QuerySQLDataBaseTool(db=db)  # 确保已正确导入执行工具

# 组合链：生成 SQL → 清洗 → 执行
chain = sql_chain | RunnableLambda(clean_sql_response) | execute_query

# 调用链
result = chain.invoke({"question": "what is the lowest sales"})
print(result)

[(0,)]


In [None]:
from operator import itemgetter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough, RunnableLambda

# ================ 1. 定义组件 ================
# 假设以下组件已初始化
# llm = ...
# db = ...

# SQL 生成链（原始输出含 "SQLQuery: " 前缀）
write_query = create_sql_query_chain(llm, db)
execute_query = QuerySQLDataBaseTool(db=db)

# ================ 2. 数据处理函数 ================
def clean_sql_response(response: str) -> str:
    """清洗 SQL 前缀"""
    if response.startswith("SQLQuery:"):
        return response.split("SQLQuery:", 1)[1].strip()
    return response

def format_result_wrapper(result: str) -> dict:
    """将执行结果包装为字典，保留原始 SQL 和结果"""
    return {"raw_result": result}

# ================ 3. 回答生成提示模板 ================
answer_prompt = PromptTemplate.from_template(
    """基于以下信息回答问题：
问题：{question}
生成的 SQL 查询：{clean_query}
数据库返回结果：{result}

请用自然语言给出简洁答案。如果结果中的数值为 0，明确说明“没有记录”"""
)

# ================ 4. 构建完整链 ================
chain = (
    # 第一步：接收原始输入，保留问题字段
    RunnablePassthrough.assign(question=lambda x: x["question"])
    # 第二步：生成并清洗 SQL
    .assign(
        clean_query=write_query | RunnableLambda(clean_sql_response)
    )
    # 第三步：执行 SQL 并包装结果
    .assign(
        result=itemgetter("clean_query") | execute_query | RunnableLambda(format_result_wrapper)
    )
    # 第四步：组合所有数据到提示模板
    | {
        "question": itemgetter("question"),
        "clean_query": itemgetter("clean_query"),
        "result": itemgetter("result")
    }
    | answer_prompt  # 填充模板
    | llm  # 生成自然语言回答
    | StrOutputParser()  # 解析输出
)

# ================ 5. 调用示例 ================
response = chain.invoke({"question": "How many orders are there"})
print(response)
# 输出示例：根据数据库结果，可能为 "当前共有42条订单记录" 或 "没有订单记录"

根据数据库返回的结果，有 116771 条订单。
