In [1]:
# 导入相关模块，包括运算符、输出解析器、聊天模板、ChatOpenAI 和 运行器
from operator import itemgetter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnablePassthrough, RunnableMap

# 初始化 ChatOpenAI 模型，指定使用的模型为 'gpt-4o-mini'
llm = ChatOpenAI(
    model="gpt-4o-mini",
    openai_api_base="https://api.gptsapi.net/v1",
    openai_api_key="sk-XPp5a40638eec6f1ce1c0333e36bf6305e53c8ea95ccTUXc"
)


In [2]:

# 1.开始节点================================
# 创建一个开始节点，用于传递初始输入
start = RunnablePassthrough()


In [3]:

# 2.小模型节点================================
# 小模型本质返回指标名称后，再通过指标名称去查到该指标名称的维度列表，最终输出是如下：
## ["指标名：经营收入，维度列表：[完工日期, 报表项目分类, 数据类型, 描述, 阿米巴, 来源单据编号, 产成品物料编码, 客户区域, 统计项目]","",""]

# 模拟指标查询接口
def mock_indicator_api(input_text):
    # 模拟指标数据库
    indicators = {
        "营业收入": 0.95,
        "退货及时率": 0.78,
        "客户满意度": 0.85,
        "库存周转率": 0.72,
        "毛利率": 0.88
    }
    
    # 构造返回结果
    result = {
        "data": {
            "indicatorResult": []
        },
        "description": "success", 
        "errorCode": 0,
        "requestId": "mock-request-id"
    }
    
    # 查找最匹配的指标
    matched = False
    for name, score in indicators.items():
        if input_text in name:
            result["data"]["indicatorResult"].append({
                "指标名称": name,
                "相似度": score
            })
            matched = True
            break
    
    # 如果没找到匹配项,返回空结果
    if not matched:
        result["data"]["indicatorResult"] = []
        
    return result
## ["指标名：经营收入，维度列表：[完工日期, 报表项目分类, 数据类型, 描述, 阿米巴, 来源单据编号, 产成品物料编码, 客户区域, 统计项目]","",""]
print("小模型节点结果：", mock_indicator_api("营业收入"))



小模型节点结果： {'data': {'indicatorResult': [{'指标名称': '营业收入', '相似度': 0.95}]}, 'description': 'success', 'errorCode': 0, 'requestId': 'mock-request-id'}


In [4]:

# 2.1 小模型节点-请求指标平台接口获取具体维度================================


# 2.1.1 先定义一个维度查询接口
def mock_indicator_dimensions_api(indicator_name):
    # 模拟维度数据库
    dimensions = {
        "营业收入": ["完工日期", "报表项目分类", "数据类型", "描述", "阿米巴", "来源单据编号", "产成品物料编码", "客户区域", "统计项目"],
        "退货及时率": ["退货日期", "产品类型", "客户类型"],
        "客户满意度": ["评价时间", "客户区域", "产品类型"],
        "库存周转率": ["统计日期", "仓库", "物料类型"],
        "毛利率": ["统计月份", "产品线", "客户类型", "销售区域"]
    }
    
    # 构造返回结果
    result = {
        "data": {
            "indicatorName": indicator_name,
            "dimensionList": []
        },
        "description": "success",
        "errorCode": 0,
        "requestId": "mock-dimension-request-id"
    }
    
    # 查找指标对应的维度
    if indicator_name in dimensions:
        result["data"]["dimensionList"] = dimensions[indicator_name]
    
    return result


# 2.1.2调用2.小模型提供的召回的指标名称，通过mock_indicator_api获取指标结果
indicator_result = mock_indicator_api("营业收入")

# 从结果中提取指标名称和维度
if indicator_result["data"]["indicatorResult"]:
    indicator_name = indicator_result["data"]["indicatorResult"][0]["指标名称"]
    # 将指标名称传入维度查询接口
    dimensions_result = mock_indicator_dimensions_api(indicator_name)
    
    # 构造返回格式
    dimensions_str = ", ".join(dimensions_result["data"]["dimensionList"])
    result = [f"指标名：{indicator_name}，维度列表：[{dimensions_str}]", "", ""]
else:
    result = ["", "", ""]

print(result)


['指标名：营业收入，维度列表：[完工日期, 报表项目分类, 数据类型, 描述, 阿米巴, 来源单据编号, 产成品物料编码, 客户区域, 统计项目]', '', '']


In [11]:

# 3.查询参数提取================================
# 创建提示词模板
param_extract_prompt = ChatPromptTemplate.from_template("""
你是一个数据分析专家，擅长对用户自然语言进行解析，识别用户问题中的统计维度和筛选条件，以便获取更准确的指标数据。

技能: 数据分析、指标识别、维度匹配、逻辑推理。

目标: 根据用户问题从背景信息中选出与问题最相关的维度，并从用户问题中识别出其中的统计维度（group by）。

背景信息:
指标：{indicator}

约束条件:
1. 所有的思考和输出都严格基于给定的背景信息，不得超出范围，不得伪造或猜测内容。
2. 必须严格按照以下JSON格式输出，禁止附加任何其他内容。
3. 所有指标名称和维度名称都必须来自你选定的指标。
4. 牢记！当问题中的维度有具体维度值时，禁止将该维度加入统计维度。

输出格式:
{{"指标名称":"XXX","统计维度名称":[{{"维度名称":"XXX"}}],"排序维度名称":"XXX","排序方式":"排序方式(1-升序，2-降序，0-不需要排序)","结果限制条数":"结果限制条数(数字，1，2，3...)","卡片类型":"XXX(NumberCard/Chart)","图表类型":"XXX(bar/line/pie)"}}

若通过背景信息的指标无法回答用户的问题或用户的问题与指标查询无关，不要伪造内容，返回指标名称为空，按以下JSON格式返回，不要附加任何其他内容：
{{"指标名称":""}}

如果统计维度为时间维度，则"统计维度名称"节点按照：{{"维度名称":"XXX","时间粒度":"year/quarter/month/day"}} 格式输出。注意！仅统计维度为时间维度时增加"时间粒度"节点，非时间维度不增加"时间粒度"节点。

用户问题: {question}
""")

# 创建参数提取链
param_extract_chain = (
    RunnableMap({
        "indicator": lambda x: x["indicator"],
        "question": lambda x: x["question"]
    })
    | param_extract_prompt
    | llm
    | StrOutputParser()
)
# 测试数据
test_question = "营业收入"

# 测试参数提取链
async def test_param_extract():
    print(f"\n测试问题: {test_question}")
    result = await param_extract_chain.ainvoke({
        "indicator": indicator_result,
        "question": test_question
    })
    print(f"提取结果: {result}")

await test_param_extract()



测试问题: 营业收入
提取结果: {"指标名称":"营业收入","统计维度名称":[],"排序维度名称":"","排序方式":"0","结果限制条数":"1","卡片类型":"NumberCard","图表类型":""}


In [13]:


# 4.获取时间和指标================================
import json
import re
from datetime import datetime

def invoke_action(s: str, data_map: dict) -> dict:
    """
    与 Java 中的 invokeAction(String s, Map<String, String> map) 相对应的函数。
    """
    # 从 data_map 中获取 JSON 字符串
    indicators_json = data_map.get("indicatorsJson", "")
    indicator_json = data_map.get("indicatorJson", "")

    # 获取当前日期和时间并格式化
    now = datetime.now()
    formatted_date_time = now.strftime("%Y-%m-%d %H:%M:%S")

    # 创建返回结果字典
    map_output = {
        "time": formatted_date_time
    }

    # 调用 find_matching_indicator，并捕获可能的异常
    try:
        result = find_matching_indicator(indicators_json, indicator_json)
        map_output["matchedIndicator"] = result
    except Exception as e:
        map_output["error"] = f"Error processing JSON input: {str(e)}"

    return map_output

def find_matching_indicator(indicators_json: str, indicator_json: str) -> str:
    """
    查找和匹配指标名称。
    """
    # 提取指标名
    indicators = extract_indicator_names(indicators_json)
    if not indicators:
        return "null"

    # 将指标名列表字符串转换为数组
    indicators_array = convert_string_to_array(indicators_json)

    # 解析 JSON，获取目标指标名称
    data = json.loads(indicator_json) if indicator_json else {}
    target_indicator_name = data.get("指标名称", "")
    if not target_indicator_name:
        return "null"

    # 在提取的指标名中查找
    for i, indicator_name in enumerate(indicators):
        if indicator_name == target_indicator_name:
            # 找到匹配则返回原始字符串数组里的对应值
            return indicators_array[i]

    return "null"

def extract_indicator_names(input_str: str) -> list:
    """
    使用正则表达式从输入字符串中提取“指标名：”后面的内容。
    """
    pattern = re.compile(r"指标名：([^，]+)")  # 与 Java 中的正则相对应
    return pattern.findall(input_str)

def convert_string_to_array(indicators_string: str) -> list:
    """
    将类似 ["指标名：ABC","指标名：XYZ"] 这样的字符串转换为 list。
    """
    if not indicators_string.startswith("[") or not indicators_string.endswith("]"):
        return []

    trimmed = indicators_string[1:-1]  # 去掉 '[' 和 ']'
    # 按照 "," 拆分，然后去除多余的引号
    indicators = trimmed.split("\",\"")
    indicators = [x.replace("\"", "").strip() for x in indicators]
    return indicators

# -------------------------------------
# 下面是测试用例示例：
if __name__ == "__main__":
    # 模拟提供的测试用例数据
    data_map = {
        # 对应 Java 中的 @String[indicatorsJson]
        "indicatorsJson": "[\"指标名：经营收入，维度列表：[完工日期, 报表项目分类, 数据类型, 描述, 阿米巴, 来源单据编号, 产成品物料编码, 客户区域, 统计项目]\",\"\",\"\"]",
        # 对应 Java 中的 @String[indicatorJson]
        "indicatorJson": '{"图表类型":"bar","排序方式":"0","结果限制条数":"1","卡片类型":"Chart","指标名称":"经营收入","统计维度名称":[{"维度名称":"完工日期","时间粒度":"month"}],"排序维度名称":""}'
    }

    # 调用测试方法
    result = invoke_action("test", data_map)
    print("测试结果: ", result)


测试结果:  {'time': '2025-03-06 20:04:19', 'matchedIndicator': '指标名：经营收入，维度列表：[完工日期, 报表项目分类, 数据类型, 描述, 阿米巴, 来源单据编号, 产成品物料编码, 客户区域, 统计项目]'}
