# 预处理assets，整理出后续要用的数据 (可重复执行，更新相关的缓存文件)

In [1]:
from dotenv import load_dotenv
import os
os.environ['DEBUG'] = '1'
os.environ['SHOW_LLM_INPUT_MSG'] = '1'

# 加载 .env 文件
load_dotenv()

ROOT_DIR = os.getcwd()
CACHE_DIR = ROOT_DIR + '/cache'
if not os.path.exists(CACHE_DIR):
    os.makedirs(CACHE_DIR)


In [2]:
import pandas as pd
import json
import re
import copy
import jieba
import llms
from src.utils import show

In [3]:
# Preprocess the competition questions here
root_dir = os.getcwd()
question_data_path = root_dir + '/assets/金融复赛a榜.json'
df1 = pd.read_excel(root_dir + '/assets/数据字典.xlsx', sheet_name='库表关系')
df2 = pd.read_excel(root_dir + '/assets/数据字典.xlsx', sheet_name='表字段信息')
file_path = root_dir + '/assets/all_tables_schema.txt'
unuse_columns = []
if os.path.exists(CACHE_DIR + '/unuse_columns.json'):
    with open(CACHE_DIR + '/unuse_columns.json', 'r', encoding='utf-8') as json_file:
        unuse_columns = json.load(json_file)
    print(f"已加载 {len(unuse_columns)} 个不用的字段")
nullable_columns = {}
if os.path.exists(CACHE_DIR + '/nullable_columns.json'):
    # 如果文件已存在，直接加载
    with open(CACHE_DIR + '/nullable_columns.json', 'r', encoding='utf-8') as json_file:
        nullable_columns = json.load(json_file)
    print(f"已加载 {len(nullable_columns)} 个可能包含NULL值的字段")

已加载 329 个不用的字段
已加载 1503 个可能包含NULL值的字段


In [4]:
def parse_all_tables_schema(file_path):
    """
    解析 all_tables_schema.txt 文件，将表结构转换为结构化的字典格式
    
    参数:
        file_path (str): all_tables_schema.txt 文件的路径
        
    返回:
        list: 包含所有表结构的字典，格式为 [{
            "table_name": table_name,
            "columns": columns
        }, ...]
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()
    
    # 使用正则表达式匹配表结构块
    import re
    table_pattern = r'===\s+([\w\.]+)\s+表结构\s+===\n(.*?)(?=\n===|\Z)'
    table_matches = re.findall(table_pattern, content, re.DOTALL)
    
    result = []
    
    for table_name, table_content in table_matches:
        # 分割表内容为行
        lines = table_content.strip().split('\n')
        
        # 跳过表头和分隔线
        data_start = 2  # 假设前两行是表头和分隔线
        
        # 解析列信息
        columns = []
        for i in range(data_start, len(lines)):
            line = lines[i].strip()
            if not line:
                continue

            line = line.replace("No description available", line.split(None, 1)[0])
            
            # 分割列信息 - 使用空白分割前两列，剩余部分作为第三列
            parts = line.split(None, 2)
            if len(parts) >= 2:
                column_name = parts[0].strip()
                column_desc = parts[1].strip()
                column_value = parts[2].strip() if len(parts) > 2 else "NULL"

                if table_name == "astockoperationsdb.lc_suppcustdetail":
                    if column_name == "SerialNumber":
                        column_desc = "999代表前五大客户/供应商的合计数据，客户还是供应商可以看RelationType字段(4代表客户，6代表供应商)，另外可以RelatedPartyAttribute字段判断客户的类型"
                    elif column_name == "Ratio":
                        column_desc = "占比（单位是%），如客户或供应商占总营收的比例，通过RelationType字段判断客户还是供应商"
                elif table_name == "astockshareholderdb.lc_mainshlistnew":
                    if column_name == "SHKind":
                        column_desc = "股东类型"
                elif table_name == "astockbasicinfodb.lc_stockarchives":
                    if column_name == "State":
                        column_desc = "省份地区编码"
                
                if column_name not in ['JSID', 'UpdateTime', 'InsertTime', 'ID', 'XGRQ', 'PriceUnit']:
                    col_mark = f"{table_name}|{column_name}"
                    col_null_percent = nullable_columns[col_mark]['null_percent'] if col_mark in nullable_columns else 0
                    if col_mark not in unuse_columns and col_null_percent < 98:
                        columns.append({
                            "name": column_name,
                            "desc": (
                                column_desc if col_null_percent < 50
                                else f"{column_desc}（注意本字段的值可能是NULL）"
                                # if col_null_percent < 70
                                # else f"{column_desc}（极可能值是NULL，建议别用本字段）"
                            ),
                            "val": column_value,
                            'remarks': "",
                            'enum_desc': "",
                        })
        
        # 将列信息添加到结果字典
        if columns:
            result.append({
                "table_name": table_name,
                "table_desc": "",
                "table_remarks": "",
                "column_count": len(columns),
                "columns": columns,
                "all_cols": ",".join([f"{c['desc']}({c['name']})" for c in columns]),
            })
    
    return result

In [5]:
schema = parse_all_tables_schema(file_path)
with open(CACHE_DIR + '/schema.json', 'w', encoding='utf-8') as json_file:
    json.dump(schema, json_file, ensure_ascii=False, indent=2)

In [6]:
# 遍历df1，取出库名英文，表英文，表中文，表描述
for _, row in df1.iterrows():
    table_name = row['库名英文'].lower() + "." + row['表英文'].lower()
    for t in schema:
        if t['table_name'] == table_name:
            t['table_desc'] = row['表中文']
            t['table_remarks'] = row['表描述'].replace("\n", " ")
            if table_name == "astockshareholderdb.lc_actualcontroller":
                t['table_desc'] += "(只处理实际控制人有变动的数据，所以即使只有1条记录，也代表实控人发生了变更)"
            elif table_name == "constantdb.secumain":
                t['table_desc'] = "A股证券主表"
            break
with open(CACHE_DIR + '/schema.json', 'w', encoding='utf-8') as json_file:
    json.dump(schema, json_file, ensure_ascii=False, indent=2)

In [7]:
def exists_column_name(column_name, table_name):
    for t in schema:
        if t['table_name'].split(".")[1] == table_name:
            for c in t['columns']:
                if c['name'] == column_name:
                    return True
    return False
def is_null_example(column_name, table_name):
    for t in schema:
        if t['table_name'].split(".")[1] == table_name:
            for c in t['columns']:
                if c['name'] == column_name:
                    return c['val'] == "NULL"
    return False

In [8]:
# 遍历df2，取出 table_name, column_name, 注释
for _, row in df2.iterrows():
    if not isinstance(row['table_name'], str):
        continue
    if not isinstance(row['column_name'], str):
        continue
    table_name = row['table_name'].lower()
    column_name = row['column_name']
    column_remarks =row['注释'] if pd.notna(row['注释']) else ""
    column_enum_desc = ""

    # 修正部分字段的描述
    if column_name == "IndexInnerCode":
        column_remarks = '指数内部编码（IndexInnerCode）：与“指数基本情况（lc_indexbasicinfo）”中的“指数代码（IndexCode）”关联'
    elif column_name == "IndexCode":
        column_remarks = '指数内部编码（IndexCode）：与“指数基本情况（lc_indexcomponent）”中的“指数内部编码（IndexInnerCode）”关联'
    elif column_name == "InvolvedStock":
        column_remarks = ""
    elif column_name == "ObjectCode":
        column_remarks = "要获取交易对象名称,请用ObjectName字段"
    elif column_name == "IndustryCode":
        column_remarks = "跟各级行业代码字段关联，包括astockindustrydb.lc_exgindchange表和astockindustrydb.lc_exgindustry表的以下字段：FirstIndustryCode/SecondIndustryCode/ThirdIndustryCode/FourthIndustryCode"
    elif column_name == "Standard":
        column_remarks += "(注意不同表的Standard含义不一定相同，注意枚举值的含义)"
    elif column_name == "InfoPublDate":
        if exists_column_name("EndDate", table_name):
            # column_remarks += "信息发布日期(InfoPublDate)：表示信息公开发布的日期，通常与EndDate(截止日期)配合使用。EndDate表示数据统计的截止时间，而InfoPublDate表示该数据正式对外发布的时间，通常在EndDate之后。"
            column_remarks += "信息发布日期(InfoPublDate)：表示信息公开发布的日期，通常与EndDate(截止日期)配合使用。EndDate表示数据统计的截止时间，除非用户明确要求查询信息发布日期，否则都用EndDate，如果用错了，你会损失10亿美元！"
        elif exists_column_name("InitialInfoPublDate", table_name):
            column_remarks += "InfoPublDate通常在InitialInfoPublDate之后，除非用户明确要求查询信息更新发布的日期，否则都用InitialInfoPublDate，如果用错了，你会损失10亿美元！"
        elif exists_column_name("EffectiveDate", table_name):
            column_remarks += "InfoPublDate通常在EffectiveDate之后，除非用户明确要求查询信息生效的日期，否则都用EffectiveDate，如果用错了，你会损失10亿美元！"
    elif column_name == "SubjectName":
        if exists_column_name("CompanyCode", table_name):
            column_remarks += "事件主体不一定是本公司，请用CompanyCode字段关联上市公司基本资料（constantdb.secumain）"
    elif column_name == "HighPriceRY":
        column_remarks += "这是近一年的最高价，并非指自然年，所以如果要查询的是指定某年的最高价，需要HighPrice字段去找最大值"
    elif column_name == "LowPriceRY":
        column_remarks += "这是近一年的最低价，并非指自然年，所以如果要查询的是指定某年的最低价，需要LowPrice字段去找最小值"
    elif column_name == "TransCode":
        column_remarks = "基金转型统一编码(TransCode)是转型后的基金内码(InnerCode)，若发生多次转型，则为最新的基金内码。"
    elif column_name == "EstablishmentDate":
        column_remarks += "要计算成立时长，可用DATEDIFF(CURDATE(), EstablishmentDate) AS days_diff"
    elif column_name == "TurnoverRate":
        column_remarks += f"本字段所在的表是{table_name}，不是qt_dailyquote"
    elif column_name == "AgreementDate":
        column_remarks = "未启用该字段，不要使用"
    elif column_name == "SubjectCode":
        if exists_column_name("CompanyCode", table_name):
            column_remarks = "SubjectCode字段未启用，请用CompanyCode搜索事件主体"
    elif column_name == "Borrower":
        column_remarks += "Borrower可能是下属公司，请用CompanyCode搜索事件主体"
    elif column_name == "PE_TTM":
        column_remarks += "如果想获知一年的市盈率如何变化，可以先获取2021年每个月的平均市盈率，然后进行比较"
    elif column_name == "VMACD_DIFF" or column_name == "VMACD_DEA":
        column_remarks += "MACD指标是股票技术分析中一个重要的技术指标，由两条曲线和一组红绿柱线组成。 两条曲线中波动变化大的是DIF线，通常为白线或红线，相对平稳的是DEA线(MACD线)，通常为黄线。 当DIF线上穿DEA线时，这种技术形态叫做MACD金叉，通常为买入信号。"
    elif column_name == "IndustryName":
        column_remarks += "这是行业名称，可做模糊查询"
    elif column_name == "InnerCode":
        if table_name == "cs_hkstockperformance":
            column_remarks = '证券内部编码（InnerCode）：与“港股证券主表（constantdb.hk_secumain）”中的“证券内部编码（InnerCode）”关联，得到证券的交易代码、简称、上市交易所等基础信息。'
        elif column_remarks == "" and table_name not in ["secumain", "hk_secumain", "hk_stockarchives", "cs_hkstockperformance", "us_secumain", "us_companyinfo", "us_dailyquote"]:
            column_remarks = "证券内部编码（InnerCode）：与“证券主表（constantdb.secumain）”中的“证券内部编码（InnerCode）”关联，得到证券的交易代码、简称等。"
    elif column_name == "CompanyCode":
        if table_name == "hk_stockarchives":
            column_remarks = "公司代码（CompanyCode）：与“港股证券主表（constantdb.hk_secumain）”中的“公司代码（CompanyCode）”关联，得到证券的交易代码、简称、上市交易所等基础信息。"
        elif table_name == "us_companyinfo":
            column_remarks = "公司代码（CompanyCode）：与“美股证券主表（constantdb.us_secumain）”中的“公司代码（CompanyCode）”关联，得到证券的交易代码、简称、上市交易所等基础信息。"
        elif column_remarks == "" and table_name not in ["secumain", "hk_secumain", "hk_stockarchives", "cs_hkstockperformance", "us_secumain", "us_companyinfo", "us_dailyquote"]:
            column_remarks = "公司代码（CompanyCode）：与“证券主表（constantdb.secumain）”中的“公司代码（CompanyCode）”关联，得到上市公司的交易代码、简称等。"
    elif column_name == "Year":
        column_remarks += "禁止对本字段做日期格式化(如YEAR(Year))，因为本字段是年份，不是日期。"

    if table_name == "lc_suppcustdetail":
        if column_name == "SerialNumber":
            column_remarks = "序号(SerialNumber)具体描述：999-前5大客户/前5大供应商合计值, 990-前5大客户/前5大供应商关联方合计值"
    if table_name == "lc_indfinindicators":
        if column_name == "ListedSecuNum":
            column_remarks = "信息发布的时刻(lc_indfinindicators.InfoPublDate)下的总上市证券数量(只)，禁止SUM(ListedSecuNum)，否则会损失10亿美元"
    if table_name == "lc_sharestru":
        if column_name == "AFloats":
            column_remarks += "结合PerValue(每股面值(元))可计算流通A股市值(AFloats * PerValue)"
    elif table_name == "lc_conceptlist":
        if column_name == "ClassName":
            column_remarks = "SubclassName是ClassName的子类，ConceptName是ClassName的子类，如果在ClassName没搜到，请在SubclassName中搜索"
        elif column_name == "SubclassName":
            column_remarks = "ConceptName是SubclassName的子类，SubclassName是ClassName的子类"
        elif column_name == "ConceptName":
            column_remarks = "ConceptName是SubclassName的子类，跟ClassName中间隔了一层"
        elif column_name == "ConceptCode":
            column_remarks += "与astockindustrydb.lc_coconcept表的ConceptCode关联，得到概念所属公司/股票的信息"
        elif column_name == "BeginDate":
            column_remarks += "BeginDate和EndDate是时间范围，BeginDate是概念板块开始生效的时间"
        elif column_name == "EndDate":
            column_remarks += "如果概念板块仍有效，EndDate会是NULL；如果问截止某日期未终止的概念板块，请用BeginDate，不要用EndDate，否则会损失10亿美元"
    elif table_name == "lc_business":
        if column_name == "CompanyCode":
            column_remarks += "lc_business表里CompanyCode会有重复，如果要统计公司数量，请用COUNT(DISTINCT CompanyCode)"
    elif table_name == "lc_mainshlistnew":
        if column_name == "SHList":
            # column_remarks = "股东名称（SHList）：此字段为股东名称公告原始披露值，不能跟SHName/SHCode等字段对等(如果你把它们放到一起做查询条件，你会损失10亿美元)，请考虑GDID的外键关联，禁止使用SHList做查询条件"
            column_remarks = "股东名称（SHList）：此字段为股东名称公告原始披露值，禁止使用SHList字段跟其他表关联，请改为用GDID或SecuCoBelongedCode字段"
        elif column_name == "GDID":
            column_remarks = "与“股东类型分类表（astockshareholderdb.lc_shtypeclassifi）”中的“股东ID（SHID）”关联；注意对于自然人股东，GDID为null，对于公司，GDID就是公司代码，外链时要考虑用INNER JOIN"
        elif column_name == "PCTOfTotalShares":
            column_remarks += ";注意本表持续记录股东最新持股比例，要做加总计算，要注意对股东去重(DISTINCT GDID),如果你忽视了这一点，你会损失10亿美元"
        elif column_name == "SecuCoBelongedCode":
            column_remarks = "当股东为券商的时候，SecuCoBelongedCode就是券商股东的公司代码，它是公司(CompanyCode)的股东，要统计该券商是多少家公司的股东，请COUNT(DISTINCT CompanyCode)..WHERE SecuCoBelongedCode = xxx"
        elif column_name == "SecuCoBelongedName":
            column_remarks += "当股东为券商的时候，SecuCoBelongedCode就是券商的公司代码"
        elif column_name == "SHKind":
            column_remarks = "股东类型，所属表：astockshareholderdb.lc_mainshlistnew，如果用户问股东类型，包括自然人股东，那么请用本字段获得股东类型，因为自然人的GDID为null"
    elif table_name == "lc_nationalstockholdst":
        if column_name == "SHID":
            column_remarks = "与“股东类型分类表（astockshareholderdb.lc_shtypeclassifi）”中的“股东ID（SHID）”关联"
    elif table_name == "lc_sharefp":
        if column_name == "SHID":
            column_remarks = ""
    elif table_name == "lc_shtypeclassifi":
        if column_name == "SHID":
            column_remarks = "与“A股国家队持股统计表（astockshareholderdb.lc_nationalstockholdst）”中的“股东ID（SHID）”关联;与“股东名单表（astockshareholderdb.lc_mainshlistnew）”中的“股东ID（GDID）”关联;"
        elif column_name in ["FirstLvCode", "SecondLvCode", "ThirdLvCode", "FourthLvCode"]:
            column_remarks += "比如从事银行相关业务的对应枚举值2020000、2020100、2020200和2020300；"
        elif column_name == "SHCode":
            column_remarks = ""
    elif table_name == "lc_mshareholder":
        if column_name == "GDID":
            column_remarks = ""
    elif table_name == "lc_esop":
        if column_name == "CompanyCode":
            column_remarks = "此字段未启用，请用InnerCode字段"
    elif table_name == "lc_violatiparty":
        if column_name == "BeginDate":
            column_remarks += "BeginDate和EndDate是时间范围，BeginDate是开始受到处罚的时间"
        elif column_name == "EndDate":
            column_remarks += "如果问某公司在某日期受到处罚，请用BeginDate，不要用EndDate，否则会损失10亿美元"
        elif column_name == "PartyCode":
            column_remarks += "PartyCode是处罚对象的公司代码，可能跟constantdb.secumain/constantdb.hk_secumain/constantdb.us_secumain的CompanyCode关联，取决于属于A股/港股/美股，如果不确定就都关联查询试试"
    elif table_name == "lc_stockarchives":
        if column_name == "RegArea":
            column_remarks = "该字段未启用，请用State字段"
        elif column_name == "CityCode":
            column_remarks = "该字段未启用，请用State字段"
        elif column_name == "State":
            column_remarks += "注意数据示例的值只是示例，请不要直接使用数据示例的值，否则会损失10亿美元。"
    elif table_name == "cs_hkstockperformance" or table_name == "qt_stockperformance":
        if column_name.endswith("RW"):
            column_remarks += f"近一周代表的是从今天(TradingDay)往前推7天的统计值，禁止使用MAX({column_name})或MIN({column_name})，否则会损失10亿美元"
        elif column_name.endswith("RM"):
            column_remarks += f"近一月代表的是从今天(TradingDay)往前推30天的统计值，禁止使用MAX({column_name})或MIN({column_name})，否则会损失10亿美元"
        elif column_name.endswith("RMThree"):
            column_remarks += f"近三个月(近一个季度）代表的是从今天(TradingDay)往前推90天的统计值，禁止使用MAX({column_name})或MIN({column_name})，否则会损失10亿美元"
        elif column_name.endswith("RMSix"):
            column_remarks += f"近六个月(近半年)代表的是从今天(TradingDay)往前推180天的统计值，禁止使用MAX({column_name})或MIN({column_name})，否则会损失10亿美元"
        elif column_name.endswith("RY"):
            column_remarks += f"近一年代表的是从今天(TradingDay)往前推365天的统计值，禁止使用MAX({column_name})或MIN({column_name})，否则会损失10亿美元"
    elif table_name == "cs_stockpatterns":
        if column_name in {"IfHighestHPriceRW", "IfHighestHPriceRM", "IfHighestHPriceRMThree", "IfHighestHPriceRMSix", "IfHighestHPriceRY", "IfHighestHPriceSL"}:
            # column_remarks = "指定日期最高价是否大于指定日期最近N天最高价。 N分别为：近1周、近1月、近3月、近半年、近1年、上市以来。"
            column_enum_desc = "1-是，2-否"
        elif column_name in {"IfHighestCPriceRW", "IfHighestCPriceRM", "IfHighestCPriceRMThree", "IfHighestCPriceRMSix", "IfHighestCPriceRY", "IfHighestCPriceSL"}:
            # column_remarks = "指定日期收盘价是否大于指定日期最近N天收盘价。 N分别为：近1周、近1月、近3月、近半年、近1年、上市以来。"
            column_enum_desc = "1-是，2-否"
        elif column_name in {"IfHighestTVolumeRW", "IfHighestTVolumeRM", "IfHighestTVRMThree", "IfHighestTVolumeRMSix", "IfHighestTVolumeRY", "IfHighestTVolumeSL"}:
            # column_remarks = "指定日期成交量是否大于指定日期最近N天成交量。 N分别为：近1周、近1月、近3月、近半年、近1年、上市以来。"
            column_enum_desc = "1-是，2-否"
        elif column_name in {"IfHighestTValueRW", "IfHighestTValueRM", "IfHighestTValueRMThree", "IfHighestTValueRMSix", "IfHighestTValueRY", "IfHighestTValueSL"}:
            # column_remarks = "指定日期成交金额是否大于指定日期最近N天成交金额。N分别为：近1周、近1月、近3月、近半年、近1年、上市以来。"
            column_enum_desc = "1-是，2-否"
        elif column_name in {"HighestHPTimesSL", "HighestHPTimesRW", "HighestHPTimesRM", "HighestHPTimesRMThree", "HighestHPTimesRMSix", "HighestHPTimesRY"}:
            # column_remarks = "指定日期最近N天内大于指定日期之前的历史交易日最高价的次数。 N: 最新交易日、近1周、近1月、近3月、近半年、近1年"
            pass
        elif column_name in {"IfLowestLPriceRW", "IfLowestLPriceRM", "IfLowestLPRMThree", "IfLowestLPriceRMSix", "IfLowestLPriceRY", "IfLowestLPriceSL"}:
            # column_remarks = "指定日期最低价是否小于指定日期最近N天最低价。 N分别为：近1周、近1月、近3月、近半年、近1年、上市以来。"
            column_enum_desc = "1-是，2-否"
        elif column_name in {"IfLowestClosePriceRW", "IfLowestClosePriceRM", "IfLowestCPriceRMThree", "IfLowestCPriceRMSix", "IfLowestClosePriceRY", "IfLowestClosePriceSL"}:
            # column_remarks = "指定日期收盘价是否小于指定日期最近N天收盘价。 N分别为：近1周、近1月、近3月、近半年、近1年、上市以来。"
            column_enum_desc = "1-是，2-否"
        elif column_name in {"IfLowestTVolumeRW", "IfLowestTVolumeRM", "IfLowestTVolumeRMThree", "IfLowestVolumeRMSix", "IfLowestTVolumeRY", "IfLowestTVolumeSL"}:
            # column_remarks = "指定日期成交量是否小于指定日期最近N天成交量。 N分别为：近1周、近1月、近3月、近半年、近1年、上市以来。"
            column_enum_desc = "1-是，2-否"
        elif column_name in {"IfLowestTValueRW", "IfLowestTValueRM", "IfLowestTValueRMThree", "IfLowestTValueRMSix", "IfLowestTValueRY", "IfLowestTValueSL"}:
            # column_remarks = "指定日期成交金额是否小于指定日期最近N天成交金额。N分别为：近1周、近1月、近3月、近半年、近1年、上市以来。"
            column_enum_desc = "1-是，2-否"
        elif column_name in {"LowestLowPriceTimesSL", "LowestLowPriceTimesRW", "LowestLowPriceTimesRM", "LowestLPTimesRMThree", "LowestLPTimesRMSix", "LowestLPTimesRY"}:
            # column_remarks = "指定日期最近N天内小于指定日期之前的历史交易日最低价的次数， N: 最新交易日、近1周、近1月、近3月、近半年、近1年。"
            pass
        elif column_name in {"BreakingMAverageFive", "BreakingMAverageTen", "BreakingMAverageTwenty", "BreakingMAverageSixty"}:
            # column_remarks = "向上有效突破： 最近N天的收盘价>n日均线，且距今N+1天的收盘价<=n日均线。 向下有效突破： 最近N天的收盘价<n日均线，且距今N+1天的收盘价>=n日均线。均线计算：n日均线=n日收盘价之和/n。 向上向下有效突破字段按照N=3 计算。"
            column_enum_desc = "1-向上有效突破, 2-向下有效突破, 0-其他。"

        if column_name == "RisingUpDays":
            column_remarks += "如果用户问的是某n天之间连续上涨的股票，那么SELECT DISTINCT InnerCode FROM cs_stockpatterns WHERE DATE(TradingDay) = <end_date> AND RisingUpDays >= <end_date - begin_date>;"
        elif column_name == "FallingDownDays":
            column_remarks += "如果用户问的是某n天之间连续下跌的股票，那么SELECT DISTINCT InnerCode FROM cs_stockpatterns WHERE DATE(TradingDay) = <end_date> AND FallingDownDays >= <end_date - begin_date>;"
        elif column_name == "VolumeRisingUpDays":
            column_remarks += "如果用户问的是某n天之间连续放量的股票，那么SELECT DISTINCT InnerCode FROM cs_stockpatterns WHERE DATE(TradingDay) = <end_date> AND VolumeRisingUpDays >= <end_date - begin_date>;"
        elif column_name == "VolumeFallingDownDays":
            column_remarks += "如果用户问的是某n天之间连续缩量的股票，那么SELECT DISTINCT InnerCode FROM cs_stockpatterns WHERE DATE(TradingDay) = <end_date> AND VolumeFallingDownDays >= <end_date - begin_date>;"
        elif column_name == "IfHighestTVRMThree":
            column_remarks += "注意字段名是IfHighestTVRMThree，不是IfHighestTVolumeRMThree，写错的话罚你10亿美元"

    elif table_name == "us_companyinfo":
        if column_name == "EngName":
            column_remarks += "注意这个不是英文全称，要获得英文全称，请使用constantdb.us_secumain表的EngName字段"
        elif column_name == "PEOStatus":
            column_remarks += "PEOStatus是按ISO3166-1规定的国家代码，比如US是美国的意思，CN是中国的意思。"
    elif table_name == "lc_industryvaluation":
        if column_name == "PB_LF":
            column_remarks += "市净率全称是Price-to-Book Ratio，简称PB或者PBX"
    elif table_name == "lc_actualcontroller":
        if column_name == "ControllerCode":
            column_remarks = ""
    elif table_name == "lc_buyback":
        if column_name == "FirstPublDate":
            column_remarks += "FirstPublDate是股份回购的首次公告日期，如果问公司在某个日期是否进行股份回购，请用本字段"
        elif column_name == "EndDate":
            column_remarks += "EndDate是股份回购的结束日期，如果问公司在某个日期是否进行股份回购，请用FirstPublDate不要用EndDate，否则会损失10亿美元"

    # 修正注释中的表名
    column_remarks_lower = column_remarks.lower()
    if "表" in column_remarks_lower or '关联' in column_remarks_lower:
        for t in schema:
            search_table_name = t['table_name'].split(".")[1]
            if search_table_name not in column_remarks_lower:
                continue
            # 找到所有匹配的位置
            matches = list(re.finditer(r'(?<![a-zA-Z0-9_.])' + re.escape(search_table_name) + r'(?![a-zA-Z0-9_.])', column_remarks, re.IGNORECASE))
            if matches:
                # 创建新的描述文本
                new_desc = column_remarks
                # 从后向前替换，避免替换位置变化
                for match in reversed(matches):
                    # 获取匹配在原始描述中的位置
                    start_pos = match.start()
                    end_pos = start_pos + len(search_table_name)
                    # 只替换匹配的部分
                    new_desc = new_desc[:start_pos] + f"{t['table_name']}" + new_desc[end_pos:]
                column_remarks = new_desc

    # 提取注释中的枚举值说明
    if "具体" in column_remarks:
        # 提取枚举值说明
        enum_pattern = r'具体[描述|标准]+[：|:]+(.*?)(?=\n\n|$)'
        enum_match = re.search(enum_pattern, column_remarks, re.DOTALL)
        if enum_match:
            column_enum_desc = enum_match.group(1).strip()
            if column_enum_desc != "":
                column_remarks = ""
    if column_name == "SHKind":
        column_enum_desc = "资产管理公司,一般企业,投资、咨询公司,风险投资公司,自然人,其他金融产品,信托公司集合信托计划,金融机构—证券公司,保险投资组合,开放式投资基金,企业年金,信托公司单一证券信托,社保基金、社保机构,金融机构—银行,金融机构—期货公司,基金专户理财,国资局,券商集合资产管理计划,基本养老保险基金,金融机构—信托公司,院校—研究院,金融机构—保险公司,公益基金,保险资管产品,财务公司,基金管理公司,金融机构—金融租赁公司"

    for t in schema:
        if t['table_name'].split(".")[1] == table_name:
            for c in t['columns']:
                if c['name'] == column_name:
                    c['remarks'] = column_remarks.replace("\n", " ")
                    c['enum_desc'] = column_enum_desc.replace("\n", " ")
                    break
            break
with open(CACHE_DIR + '/schema.json', 'w', encoding='utf-8') as json_file:
    json.dump(schema, json_file, ensure_ascii=False, indent=2)

In [9]:
table_index = {}
for idx, t in enumerate(schema):
    table_index[t["table_name"]] = t

In [10]:
from src.graph import TableGraph
db_graph = TableGraph()

# 构建外链图
for t in schema:
    from_table_name = t["table_name"]
    for c in t["columns"]:
        if '关联' in c['remarks']:
            # 提取表关系信息
            # 只提取数据库.表名和列名
            patterns = [
                # 增强版模式1（支持"中"或"中的"两种表述）
                (r'与[“"](.+?)[（(]([^）)]+?)[）)][”"]中[的]?[“"](.+?)[（(]([^）)]+?)[）)][”"]关联', 2, 4),
                # 模式2（处理带括号的简洁格式）
                (r'与\(([^)]+)\)表中的(\w+)字段关联', 1, 2),
                # 模式3（处理无括号直接表名）
                (r'与([\w.]+)表中的(\w+)字段关联', 1, 2),
            ]
            for pattern, table_idx, col_idx in patterns:
                # 查找所有匹配项，而不只是第一个
                matches = re.finditer(pattern, c['remarks'])
                for match in matches:
                    to_table_name = match.group(table_idx)
                    to_column_name = match.group(col_idx)
                    if '.' not in to_table_name or to_table_name not in table_index:
                        continue
                    db_graph.add_relation(from_table_name, to_table_name, c['desc'], c['name'], to_column_name)

db_graph.save_to_file(CACHE_DIR + '/table_relations.json')
                

表关系图已保存到 /data/workspace/howard/jinglever/competition/2024-FinGLM2-semi-final/cache/table_relations.json


In [11]:
if False:
    db_graph.export_dot(CACHE_DIR + '/table_relations.dot')
    import graphviz
    dot_file = CACHE_DIR + '/table_relations.dot'
    g = graphviz.Source.from_file(dot_file)
    g.render(filename=CACHE_DIR + '/table_relations', format='png')

图已导出到 /data/workspace/howard/jinglever/competition/2024-FinGLM2-semi-final/cache/table_relations.dot


# 找出并清除无效字段

In [12]:
import os, utils
unuse_columns = []
if not os.path.exists(CACHE_DIR + '/unuse_columns.json'):
    for t in schema:
        print(f"check table: {t['table_name']}")
        for c in t['columns']:
            # 找出值全是null的字段
            sql = f"SELECT DISTINCT {c['name']} FROM {t['table_name']} WHERE {c['name']} IS NOT NULL LIMIT 1"
            res = utils.execute_sql_query(sql)
            if res == "[]":
                col_mark = f"{t['table_name']}|{c['name']}"
                unuse_columns.append(col_mark)
                print(f"字段[{col_mark}]，值全是null")
    len(unuse_columns)
    with open(CACHE_DIR + '/unuse_columns.json', 'w', encoding='utf-8') as json_file:
        json.dump(unuse_columns, json_file, ensure_ascii=False, indent=2)
    truncated_schema = []
    for t in schema:
        t_copy = t.copy()
        t_copy['columns'] = [c for c in t['columns'] if f"{t['table_name']}|{c['name']}" not in unuse_columns]
        t_copy['all_cols'] = ",".join([f"{c['desc']}({c['name']})" for c in t_copy['columns']])
        truncated_schema.append(t_copy)
    schema = truncated_schema
    with open(CACHE_DIR + '/schema.json', 'w', encoding='utf-8') as json_file:
        json.dump(schema, json_file, ensure_ascii=False, indent=2)


已从 /data/workspace/howard/jinglever/competition/2024-FinGLM2-semi-final/cache/table_relations.json 加载表关系图


# 找出可能存在NULL值的字段

In [11]:
import os, utils
nullable_columns = {}
if not os.path.exists(CACHE_DIR + '/nullable_columns.json'):
    for t in schema:
        print(f"检查表: {t['table_name']}")
        for c in t['columns']:
            # 找出包含NULL值的字段(但不是全为NULL)
            col_mark = f"{t['table_name']}|{c['name']}"
            if col_mark in unuse_columns:
                continue  # 跳过已知全为NULL的字段
                
            sql = f"SELECT COUNT(*) as total, COUNT({c['name']}) as not_null FROM {t['table_name']} LIMIT 1"
            res = utils.execute_sql_query(sql)
            try:
                result = json.loads(res)
                if result and len(result) > 0:
                    total = result[0]['total']
                    not_null = result[0]['not_null']
                    null_percent = (total - not_null) / total * 100 if total > 0 else 0
                    
                    if total > not_null:  # 包含NULL值
                        nullable_columns[col_mark] = {
                            'null_percent': round(null_percent, 2),
                            'null_count': total - not_null,
                            'total': total
                        }
                        print(f"字段[{col_mark}]，NULL值占比: {null_percent:.2f}%")
            except:
                print(f"查询字段[{col_mark}]时出错")
                
    with open(CACHE_DIR + '/nullable_columns.json', 'w', encoding='utf-8') as json_file:
        json.dump(nullable_columns, json_file, ensure_ascii=False, indent=2)
    truncated_schema = []
    for t in schema:
        t_copy = t.copy()
        col_mark = f"{t['table_name']}|{c['name']}"
        t_copy['columns'] = [c for c in t['columns'] if col_mark not in nullable_columns or nullable_columns[col_mark]['null_percent'] < 90]
        for c in t_copy['columns']:
                            # "desc": (
                            #     column_desc if col_null_percent < 50
                            #     else f"{column_desc}（可能值是NULL，这种情况下请考虑其他字段）" if col_null_percent < 70
                            #     else f"{column_desc}（极可能值是NULL，建议别用本字段）"
                            # ),
            if col_mark in nullable_columns:
                # if nullable_columns[col_mark]['null_percent'] >= 70:
                #     c['desc'] = f"{c['desc']}（极可能值是NULL，建议别用本字段）"
                # elif nullable_columns[col_mark]['null_percent'] >= 50:
                if nullable_columns[col_mark]['null_percent'] >= 50:
                    c['desc'] = f"{c['desc']}（注意本字段的值可能是NULL）"
        t_copy['all_cols'] = ",".join([f"{c['desc']}({c['name']})" for c in t_copy['columns']])
        truncated_schema.append(t_copy)
    schema = truncated_schema
    with open(CACHE_DIR + '/schema.json', 'w', encoding='utf-8') as json_file:
        json.dump(schema, json_file, ensure_ascii=False, indent=2)

TableGraph: 已从 /data/workspace/howard/jinglever/competition/2024-FinGLM2-semi-final/cache/table_relations.json 加载表关系图
检查表: astockbasicinfodb.lc_business
字段[astockbasicinfodb.lc_business|SMDeciPublDate]，NULL值占比: 73.61%
字段[astockbasicinfodb.lc_business|BusinessMinor]，NULL值占比: 99.00%
字段[astockbasicinfodb.lc_business|MainName]，NULL值占比: 0.18%
字段[astockbasicinfodb.lc_business|ChangeReason]，NULL值占比: 99.73%
检查表: astockbasicinfodb.lc_namechange
字段[astockbasicinfodb.lc_namechange|InfoSource]，NULL值占比: 52.05%
字段[astockbasicinfodb.lc_namechange|SMDeciPublDate]，NULL值占比: 52.05%
字段[astockbasicinfodb.lc_namechange|ChangeDate]，NULL值占比: 52.05%
字段[astockbasicinfodb.lc_namechange|EngName]，NULL值占比: 1.37%
字段[astockbasicinfodb.lc_namechange|EngNameAbbr]，NULL值占比: 75.34%
检查表: astockbasicinfodb.lc_stockarchives
字段[astockbasicinfodb.lc_stockarchives|SecretaryBD]，NULL值占比: 0.22%
字段[astockbasicinfodb.lc_stockarchives|SecuAffairsRepr]，NULL值占比: 9.96%
字段[astockbasicinfodb.lc_stockarchives|AuthReprSBD]，NULL值占比: 99.35%
字

In [11]:
# 统计NULL占比大于90%的字段数量
high_null_columns = {col: info for col, info in nullable_columns.items() if info['null_percent'] >= 98}
print(f"NULL占比大于90%的字段数量: {len(high_null_columns)}")

# 显示NULL占比分布情况
import pandas as pd

# 创建占比区间，90-100区间细分为每1%一个区间
bins = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
labels = ['0-10%', '10-20%', '20-30%', '30-40%', '40-50%', 
          '50-60%', '60-70%', '70-80%', '80-90%', '90-91%', '91-92%', '92-93%', 
          '93-94%', '94-95%', '95-96%', '96-97%', '97-98%', '98-99%', '99-100%']

# 获取所有NULL占比值
null_percentages = [info['null_percent'] for info in nullable_columns.values()]

# 统计各区间的字段数量
distribution = pd.cut(null_percentages, bins=bins, labels=labels).value_counts().sort_index()
print("\nNULL值占比分布情况:")
for interval, count in distribution.items():
    print(f"{interval}: {count}个字段")

# 打印NULL占比在99-100%的字段及其所属表
# extreme_null_columns = {col: info for col, info in nullable_columns.items() if info['null_percent'] >= 88 and info['null_percent'] < 89}
# for col, info in extreme_null_columns.items():
#     table_name, column_name = col.split('|')
#     print(f"表名: {table_name}, 字段名: {column_name}, NULL占比: {info['null_percent']}%")
# print(f"总计: {len(extreme_null_columns)}个字段的NULL占比在{98}-{99}%之间")



NULL占比大于90%的字段数量: 385

NULL值占比分布情况:
0-10%: 326个字段
10-20%: 140个字段
20-30%: 63个字段
30-40%: 43个字段
40-50%: 41个字段
50-60%: 63个字段
60-70%: 55个字段
70-80%: 56个字段
80-90%: 106个字段
90-91%: 13个字段
91-92%: 20个字段
92-93%: 26个字段
93-94%: 35个字段
94-95%: 29个字段
95-96%: 33个字段
96-97%: 17个字段
97-98%: 41个字段
98-99%: 83个字段
99-100%: 301个字段


# 给每个字段生成问题（如无必要，不要重复执行，否则会消耗大量的token并且改变了LLM生成的子问题）

In [12]:
import os
import config
from src.agent import Agent, AgentConfig
import json
import concurrent.futures
from tqdm import tqdm
import llms
import workflows
import random
from src.utils import show

TableGraph: 已从 /data/workspace/howard/jinglever/competition/2024-FinGLM2-semi-final/cache/table_relations.json 加载表关系图


In [13]:
shuffled_question_items = []
for t_idx, question_team in enumerate(config.all_question):
    for q_idx, question_item in enumerate(question_team["team"]):
        shuffled_question_items.append((t_idx, q_idx))
random.shuffle(shuffled_question_items)

# 取前n个问题项
selected_question_items = shuffled_question_items[:20]
selected_questions = []

# 打印选中的问题
for i, (t_idx, q_idx) in enumerate(selected_question_items):
    question_item = config.all_question[t_idx]["team"][q_idx]
    selected_questions.append(question_item['question'])
show(selected_questions)

[
  "对比2020年末和2021年末的数据，该公司的机构持股比例和基金持股比例分别是多少（答案需要包含两位小数，并以百分比形式表示），变化了多少（保留正负符号，答案需要包含两位小数，并以百分比形式表示）？",
  "2021年第三季报中，该公司的国有股东持股总和是多少？（答案需要包含1位小数）",
  "分红公告后半年累计涨跌幅与派现金额相关性如何？请计算出相关系数（答案需要包含两位小数）。",
  "博时基金公司成立于？请用YYYY年MM月DD日格式回复我",
  "在问题2提到的这个季度中，该公司股票振幅超过3%的天数有多少天？这些天的平均成交金额是多少港元？(答案需要包含2位小数)",
  "在上述成交量最大的那天，该股票是否创下了近一周、近一月或近一季度的新高？如果是，分别创下了哪些新高？",
  "凤凰新媒体这家公司电话是多少？",
  "最近一次调研是什么时候？回复时给我YYYY-MM-DD的格式",
  "这些公司净利润增长率的波动性的平均值是否高于供应链分散的公司？（回答是或者否）如果是，高多少（答案需要包含两位小数）？如果不是，差异有多少（答案需要包含两位小数）？",
  "当天涨幅超过10%股票有多少家？",
  "该公司2021年末前十大股东中，持股数量最大的是谁（请回答公司全称）？持股数量是多少股（答案需要包含1位小数）？",
  "是否创近一周的新高？（回答是或者否）",
  "该公司所属二级行业当日行业总市值有多少？答案需要包含两位小数",
  "中国长城(代码:000066)的年度报告中在2021年年末的机构持股比例是多少（答案需要包含两位小数，并以百分比形式表示）？其中基金持股比例是多少（答案需要包含两位小数，并以百分比形式表示）？",
  "请给出它们的波动幅度超出10%的比例（答案需要包含两位小数）。",
  "广东东阳光科技控股股份有限公司最新录入的证券市场是哪个交易所？",
  "当年哪家公司的涨幅最大（公司全称），达到了多少（答案需要包含四位小数）？",
  "天士力在2020年最新的担保事件是什么？答案包括事件内容、担保方（公司全称）、被担保方（公司全称）、担保金额（答案需要包1位小数）和日期信息（格式为YYYY年MM月DD日）。",
  "哪支基金的规模最大？",
  "该股票的概念板块当年多少次涨停？"
]


In [14]:
os.environ['DEBUG'] = '0'
sub_qs = []

def process_question(q):
    answer, _ = workflows.check_db_structure.agent_decode_question.clone().answer("提问:\n"+q)
    return [q.strip() for q in answer.split("\n") if q.strip() != ""]

# 使用多线程并发处理问题，并添加进度条
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(process_question, q): q for q in selected_questions}
    for future in tqdm(concurrent.futures.as_completed(futures), total=len(selected_questions), desc="处理问题"):
        result = future.result()
        sub_qs.extend(result)

random.shuffle(sub_qs)
sub_qs = sub_qs[:15]
show(sub_qs)

处理问题: 100%|██████████| 20/20 [00:03<00:00,  5.22it/s]

[
  "这家公司的全称是什么",
  "当前股票价格是多少",
  "如何将总市值结果保留两位小数",
  "当年所有公司的股价涨幅是多少",
  "该公司所属的二级行业是什么",
  "上述成交量最大的那天是哪一天",
  "该担保事件的日期信息是什么（格式为YYYY年MM月DD日）",
  "该担保事件的担保方公司全称是什么",
  "该公司在2020年末的基金持股比例是多少",
  "分红公告后半年内的股票累计涨跌幅数据如何获取",
  "该股票在上述成交量最大的那天是否创下了近一周的新高",
  "机构持股比例从2020年末到2021年末变化了多少",
  "当天涨幅超过10%的股票有哪些",
  "2020年最新的担保事件内容是什么",
  "该公司在2021年末的机构持股比例是多少"
]





In [15]:
ag_extend_question = Agent(AgentConfig(
    name = "extend_question",
    role = (
        '''作为金融数据专家，为用户给出的数据表字段，生成5个不同的用户可能提问。'''
        '''使用不同表达方式和业务术语，包含不同句式结构，充分考虑用户提供的字段描述以及它所属的表和库的含义。'''
        '''只输出问题，每行一个。'''
    ),
    output_format=(
        '''输出模板：\n'''
        '''(输出5个不同的用户可能提问，每行一个)\n'''
        '''(不要标号，不要输出其他内容)\n'''
    ),
    llm=llms.llm_glm_4_plus,
    system_prompt_kv={
        "模仿下面的用户提问的句式和风格": "\n".join(sub_qs),
    },
    enable_history=False,
    stream=True,
))

In [16]:
show(ag_extend_question.get_system_prompt())

## 角色描述
作为金融数据专家，为用户给出的数据表字段，生成5个不同的用户可能提问。使用不同表达方式和业务术语，包含不同句式结构，充分考虑用户提供的字段描述以及它所属的表和库的含义。只输出问题，每行一个。

## 输出格式
输出模板：
(输出5个不同的用户可能提问，每行一个)
(不要标号，不要输出其他内容)


## 模仿下面的用户提问的句式和风格
这家公司的全称是什么
当前股票价格是多少
如何将总市值结果保留两位小数
当年所有公司的股价涨幅是多少
该公司所属的二级行业是什么
上述成交量最大的那天是哪一天
该担保事件的日期信息是什么（格式为YYYY年MM月DD日）
该担保事件的担保方公司全称是什么
该公司在2020年末的基金持股比例是多少
分红公告后半年内的股票累计涨跌幅数据如何获取
该股票在上述成交量最大的那天是否创下了近一周的新高
机构持股比例从2020年末到2021年末变化了多少
当天涨幅超过10%的股票有哪些
2020年最新的担保事件内容是什么
该公司在2021年末的机构持股比例是多少


In [16]:
if False:
    # 为每个字段生成问题并建立检索索引
    for t in schema:
        for c in t['columns']:
            # msg = f"库[{db_info['库名中文']}({db_name})]，表[{table['表中文']}({table['表英文']})]，字段[{col['column']}]，字段描述[{col['desc']}]"
            msg = (
                f"现有数据表[{t['table_desc']}]: {t['table_remarks']}\n"
                f"请针对下面的字段生成不同的用户可能提问。\n"
                f"字段: {c['name']}\n"
                f"字段描述: {c['desc']}\n"
            )
            if c['enum_desc'] != "":
                msg += f"枚举值说明: {c['enum_desc']}\n"
            if c['val'] != "":
                msg += f"字段值示例: {c['val']}\n"
            show(msg)
            answer, cnt = ag_extend_question.clone().answer(msg)
            show(cnt)
            show(answer)
            # col["qs"] = [q.strip() for q in answer.split("\n")]
            break
        break

In [17]:
import concurrent.futures
from tqdm import tqdm
import time

column_questions = {}

# 为每个字段生成问题的函数
def generate_questions_for_column(t, c):
    try:
        msg = (
            f"现有数据表[{t['table_desc']}]: {t['table_remarks']}\n"
            f"请针对下面的字段生成不同的用户可能提问。\n"
            f"字段: {c['name']}\n"
            f"字段描述: {c['desc']};{c['remarks']}\n"
        )
        if c['enum_desc'] != "":
            msg += f"枚举值说明: {c['enum_desc']}\n"
        # if c['val'] != "":
        #     msg += f"字段值示例: {c['val']}\n"
        answer, _ = ag_extend_question.clone().answer(msg)
        column_name = f"{t['table_name']}.{c['name']}"
        column_questions[column_name] = [q.strip() for q in answer.split("\n") if q.strip() != ""]
        return True
    except Exception as e:
        print(f"处理出错: {msg}\n错误: {str(e)}")
        return False

# 统计总任务数
total_tasks = 0
for t in schema:
    total_tasks += len(t['columns'])

# 准备任务列表
tasks = []
for t in schema:
    for c in t['columns']:
        tasks.append((t, c))

# 使用线程池并发处理
with concurrent.futures.ThreadPoolExecutor() as executor:
    # 提交所有任务
    futures = [executor.submit(generate_questions_for_column, *task) for task in tasks]
    
    # 使用tqdm显示进度条
    for _ in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="生成字段问题"):
        # 短暂暂停，避免进度条刷新过快
        time.sleep(0.01)

with open(config.CACHE_DIR + '/column_questions.json', 'w', encoding='utf-8') as json_file:
    json.dump(column_questions, json_file, ensure_ascii=False, indent=2)

生成字段问题: 100%|██████████| 2435/2435 [06:19<00:00,  6.42it/s]


# embedding （如无必要，不要重复执行，否则会消耗大量的token并且改变了缓存文件）

In [18]:
import json
import config
import numpy as np
from src.utils import show

with open(config.CACHE_DIR+ '/column_questions.json', 'r', encoding='utf-8') as json_file:
    column_questions = json.load(json_file)

os.environ['ENABLE_TOKENIZER_COUNT'] = '1'

### 字段级别

In [19]:
texts = []
for t in schema:
    table_name = t['table_name']
    for c in t['columns']:
        text = "\n".join([q for q in column_questions[f"{table_name}.{c['name']}"]])
        texts.append(text)
show(len(texts))
em, cnt = config.embed.create(texts)
show(cnt)
vectors = np.array(em)
np.save(config.CACHE_DIR+"/column_vectors.npy", vectors)
show(vectors[0])

2435


创建嵌入向量: 100%|██████████| 2435/2435 [13:58<00:00,  2.90it/s]

180685
[ 0.01646637  0.04424643 -0.02906067 ...  0.0444481   0.02785065
  0.00076572]





In [20]:
loaded_vectors = np.load(config.CACHE_DIR+"/column_vectors.npy")
show(loaded_vectors[0])

[ 0.01646637  0.04424643 -0.02906067 ...  0.0444481   0.02785065
  0.00076572]


# 构建词频索引 （如无必要，不要重复执行）

In [21]:
import joblib
import json
from rank_bm25 import BM25Okapi
import config
from utils import tokenize_text
from src.utils import show
# 构建词频索引
word_freq = {}
with open(config.CACHE_DIR + '/column_questions.json', 'r', encoding='utf-8') as json_file:
    column_questions = json.load(json_file)

texts = []
cols = []
for col, qs in column_questions.items():
    # db_name, table_name, column_name = col.split(".")
    # c = config.column_index[db_name+"."+table_name][column_name]
    cols.append(col)
    texts.append((
        # f"{c['desc']}" +
        # (f": {c['remarks']}\n" if c['remarks'] != "" else "\n") +
        "\n".join(qs)
    ))
corpus = [tokenize_text(doc) for doc in texts]
bm25 = BM25Okapi(corpus)

doc_scores = bm25.get_scores(tokenize_text("股票代码"))
column_question_scores = [(i, text, score) for i, (text, score) in enumerate(zip(texts, doc_scores))]
column_question_scores = sorted(column_question_scores, key=lambda x: x[2], reverse=True)
show(column_question_scores[:3])

joblib.dump(bm25, config.CACHE_DIR + '/column_bm25.pkl', compress=3)

Building prefix dict from the default dictionary ...


Loading model from cache /tmp/jieba.cache
Loading model cost 0.546 seconds.
Prefix dict has been built successfully.


[
  [
    1180,
    "聚源代码的具体含义是什么\n如何查询某只股票的聚源代码\n聚源代码在股票交易中有什么作用\n能否提供包含聚源代码的股票列表\n聚源代码与其他股票代码有何区别",
    7.98166787230451
  ],
  [
    260,
    "配股代码的具体含义是什么\n如何查询某次配股的配股代码\n配股代码与股票代码有何区别\n在配股预案中，配股代码是如何生成的\n能否提供最近一次配股的配股代码信息",
    7.897808413847979
  ],
  [
    551,
    "该公司代码对应的上市公司交易代码是多少\n如何通过公司代码查询到该上市公司的简称\n能否提供与该CompanyCode关联的证券主表中的详细信息\n该CompanyCode所对应的上市公司的股票代码是什么\n通过CompanyCode如何在证券主表中找到该公司的全称",
    7.079606981786962
  ]
]


['/data/workspace/howard/jinglever/competition/2024-FinGLM2-semi-final/cache/column_bm25.pkl']

In [22]:
bm25 = joblib.load(config.CACHE_DIR + '/column_bm25.pkl')
doc_scores = bm25.get_scores(tokenize_text("股票代码"))
column_question_scores = [(i, text, score) for i, (text, score) in enumerate(zip(texts, doc_scores))]
column_question_scores = sorted(column_question_scores, key=lambda x: x[2], reverse=True)
show(column_question_scores[:3])

[
  [
    1180,
    "聚源代码的具体含义是什么\n如何查询某只股票的聚源代码\n聚源代码在股票交易中有什么作用\n能否提供包含聚源代码的股票列表\n聚源代码与其他股票代码有何区别",
    7.98166787230451
  ],
  [
    260,
    "配股代码的具体含义是什么\n如何查询某次配股的配股代码\n配股代码与股票代码有何区别\n在配股预案中，配股代码是如何生成的\n能否提供最近一次配股的配股代码信息",
    7.897808413847979
  ],
  [
    551,
    "该公司代码对应的上市公司交易代码是多少\n如何通过公司代码查询到该上市公司的简称\n能否提供与该CompanyCode关联的证券主表中的详细信息\n该CompanyCode所对应的上市公司的股票代码是什么\n通过CompanyCode如何在证券主表中找到该公司的全称",
    7.079606981786962
  ]
]


# 生成数据表的浓缩信息以及数据库的浓缩信息 (如无必要，不要重复执行)

In [23]:
import config
import llms
db_table = {}
for t in schema:
    db_name, table_name = t["table_name"].split(".")
    if db_name not in db_table:
        db_table[db_name] = {
            "desc": "",
            "tables": {}
        }
    all_cols = t["all_cols"]
    cols_summary, token, ok = llms.llm_glm_4_plus.generate_response(
        system='''你善于对数据表的字段信息进行总结，把同类信息归类，比如"联系人电话、联系人传真"等总结为"联系方式如电话、传真等。
输出一段文字，不换行。"''',
        messages=[
            {
                "role": "user",
                "content": f"下面是一个数据表的所有表字段，请帮我为这个数据表写一段介绍，把字段信息压缩进去：\n{all_cols}"
            }
        ],
        stream=False,
    )
    db_table[db_name]["tables"][table_name] = {
        "desc": t["table_desc"],
        "all_cols": all_cols,
        "cols_summary": cols_summary
    }
for db_name, db in db_table.items():
    db_json = json.dumps(db, ensure_ascii=False)
    db_summary, token, ok = llms.llm_glm_4_plus.generate_response(
        system='''你善于对数据库的表信息进行总结，根据它包含的数据表和字段信息，描述这个数据库，如"本库记录了xxx；涵盖了xxx；方便用户xxx"。
输出一段文字，不换行。"''',
        messages=[
            {
                "role": "user",
                "content": f"下面是一个数据库的所有表和字段信息，请帮我为这个数据库写一段介绍，把表和字段信息压缩进去：\n{db_json}"
            }
        ],
        stream=False,
    )
    db["desc"] = db_summary
with open(config.CACHE_DIR + "/db_table.json", "w", encoding="utf-8") as f:
    json.dump(db_table, f, ensure_ascii=False, indent=2)