In [5]:
import json
import os
import re
import datetime
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
import requests
pd.set_option('display.max_colwidth', 200)
pd.set_option('display.max_columns', 90)

from tqdm import tqdm
from dotenv import load_dotenv
load_dotenv(verbose=True, override=True)

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all" 
# TEAM_TOKEN = os.getenv('TEAM_TOKEN')
# print(TEAM_TOKEN)

%load_ext autoreload
%autoreload 2
from utils import *
from embedding import find_similar_docs
from LLM import get_embeddings, LLM, init_log_path
from sql_retrieve import execute_sql
init_log_path('../log/test.log')

False

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


### 原始表加载

In [6]:
# Output directory for intermediate artifacts and path of the data-dictionary workbook.
data_save_path = "../data/interim1"
data_info_path  = '../data/raw/数据字典 .xlsx'
# exist_ok=True keeps the cell idempotent without a separate isdir() check.
os.makedirs(data_save_path, exist_ok=True)

In [8]:
# Read both sheets of the data dictionary: the default sheet holds one row per table,
# the '表字段信息' sheet holds one row per column.
df_table = pd.read_excel(data_info_path)
df_col_origin = pd.read_excel(data_info_path, sheet_name='表字段信息')
df_table.shape, df_col_origin.shape

# Qualified labels: Chinese "<database>.<table>" and English "<database>.<table>".
df_table['库_表名'] = df_table['库名中文'] + '.' + df_table['表中文']
df_table['table_name_all'] = df_table['库名英文'] + "." + df_table['表英文']
df_tmp = df_table[['table_name_all', '表英文', '库_表名']].rename(columns={'表英文': 'table_name'})

# Keep only columns that have a description, attach the qualified names by table name,
# then remove exact duplicate rows.
has_description = df_col_origin['column_description'].notnull()
df_col_origin = (
    df_col_origin[has_description]
    .merge(df_tmp, on='table_name', how='left')
    .drop_duplicates()
    .reset_index(drop=True)
)
df_col_origin['注释'] = df_col_origin['注释'].fillna('')
print('字段数量:', df_col_origin.shape)
df_col_origin = df_col_origin.drop(columns=['Annotation'])
df_col_origin.columns
df_col_origin.head(1)

((77, 5), (3489, 5))

字段数量: (3398, 7)


Index(['table_name', 'column_name', 'column_description', '注释',
       'table_name_all', '库_表名'],
      dtype='object')

Unnamed: 0,table_name,column_name,column_description,注释,table_name_all,库_表名
0,LC_StockArchives,ID,ID,,AStockBasicInfoDB.LC_StockArchives,上市公司基本资料.公司概况


### 空值统计

In [5]:
# For every column of the data dictionary, query the percentage of NULL values in the
# live table (one query per column; the recorded run took about 15 minutes).
all_count = []
failed_cols = []  # (table, column) pairs whose query raised
for _, row in tqdm(df_col_origin.iterrows(), total=len(df_col_origin)):
    col = row['column_name']
    table_name_all = row['table_name_all']
    # The SQL text is generated per column from the dictionary's table/column names.
    sql = f"""SELECT 
                ROUND(SUM(CASE WHEN {col} IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS Null_Rate
            FROM {table_name_all};"""
    try:
        res_sql = execute_sql(sql)
        res = res_sql['data'][0]
    except Exception:
        # Best effort: a failed query is recorded with the sentinel -100 rather than
        # aborting the run. Catching Exception (not a bare except) keeps a kernel
        # interrupt / Ctrl-C working during this long loop.
        res = {'Null_Rate': '-100'}
        failed_cols.append((table_name_all, col))
    res['table_name_all'] = table_name_all
    res['col'] = col
    all_count.append(res)

# Results are positionally aligned with df_col_origin, so they are joined side by side.
df_null_count = pd.concat([df_col_origin.reset_index(drop=True), pd.DataFrame(all_count)[['Null_Rate']]], axis=1)
df_null_count['Null_Rate'] = df_null_count['Null_Rate'].astype(float)
print('空值统计:', df_null_count.shape)
if failed_cols:
    # Make swallowed failures visible instead of leaving only the -100 sentinel behind.
    print('空值率查询失败的字段数:', len(failed_cols))
df_null_count.to_excel(f'{data_save_path}/字段空值率统计.xlsx', index=False)

3398it [15:08,  3.74it/s]


空值统计: (3398, 7)


In [6]:
# Reload the cached null-rate table, drop exact duplicate rows and derive the bare
# table name (second dot-separated part of 'table_name_all').
df_null_count = pd.read_excel(f'{data_save_path}/字段空值率统计.xlsx').drop_duplicates()
df_null_count['table_name'] = [qualified.split('.')[1] for qualified in df_null_count['table_name_all']]
df_null_count.head(1)
len(df_null_count)

Unnamed: 0,table_name,column_name,column_description,注释,table_name_all,库_表名,Null_Rate
0,LC_StockArchives,ID,ID,,AStockBasicInfoDB.LC_StockArchives,上市公司基本资料.公司概况,0.0


3398

In [7]:
# Ratio columns: descriptions that mention a rate ('率') or a proportion ('比例').
df_ratio_col = df_col_origin[df_col_origin['column_description'].str.contains('率|比例')]
len(df_ratio_col)

# Query the maximum stored value of each ratio column; the result is saved so the
# unit of the column can be inspected later.
res_max_v = []
for _, row in tqdm(df_ratio_col.iterrows(), total=len(df_ratio_col)):
    col = row['column_name']
    table_name_all = row['table_name_all']

    sql = f"""SELECT 
                max({col}) as max_v
            FROM {table_name_all};"""
    res_sql = execute_sql(sql)
    res = res_sql['data'][0]
    res['table_name_all'] = table_name_all
    res['column_name'] = col

    res_max_v.append(res)

df_ratio_note = pd.DataFrame(res_max_v)
df_ratio_note.to_excel(f'{data_save_path}/df_ratio_note字段单位统计.xlsx', index=False)

318

0it [00:00, ?it/s]

318it [01:55,  2.74it/s]


11

In [8]:
# Keep the ratio columns whose maximum value is small and tag them as fraction-valued
# ('值为小数').
# To resume without re-querying, first reload the file written by the previous cell:
#   df_ratio_note = pd.read_excel(f'{data_save_path}/df_ratio_note字段单位统计.xlsx')
# NOTE(review): the cut-off of 2 is a heuristic from the original code — confirm it suits the data.
RATIO_FRACTION_MAX = 2

print(df_ratio_note.shape)
df_ratio_note = df_ratio_note[~df_ratio_note['max_v'].isnull()]
print('去除null:', df_ratio_note.shape)

# assign()/copy() produce independent frames, so the column writes below never target
# a filtered slice of the previous frame.
df_ratio_note = df_ratio_note.assign(max_v=pd.to_numeric(df_ratio_note['max_v'], errors='coerce'))
df_ratio_note = df_ratio_note[df_ratio_note['max_v'] <= RATIO_FRACTION_MAX].copy()
print('小数列数量:', df_ratio_note.shape)
df_ratio_note['额外注释'] = '值为小数'

# Spot-check a few columns of interest.
cols_temper = ['AccuPCTOfPled','AccuProportion', 'AccuProportionCalc','YearVolatilityByDay']
df_ratio_note[df_ratio_note['column_name'].isin(cols_temper)]

(318, 3)
去除null: (291, 3)
小数列数量: (39, 3)


Unnamed: 0,max_v,table_name_all,column_name,额外注释
132,1.0,AStockShareholderDB.LC_ShareFPSta,AccuPCTOfPled,值为小数
133,0.7723,AStockShareholderDB.LC_ShareFPSta,AccuProportion,值为小数
134,0.774762,AStockShareholderDB.LC_ShareFPSta,AccuProportionCalc,值为小数
255,1.467523,AStockMarketQuotesDB.QT_StockPerformance,YearVolatilityByDay,值为小数


### 表格处理

In [9]:
def get_date_note(df):
    """Tag date-like columns in the '额外注释' column of a column-metadata frame.

    Reads 'column_name', 'column_description' and 'table_name'; writes '额外注释'.
    The frame is modified in place and also returned.

    Rules, applied in this order (later rules override earlier ones):
      1. well-known date column names, the description '上市日期', and 'TranDate' of
         table 'LC_ShareTransfer' get the date tag;
      2. 'Year' gets the date tag with an example value;
      3. 'EndDate' of three specific tables is reset to an empty note;
      4. 'BeginDate' of 'LC_ViolatiParty' gets the date tag.
    """
    date_tag = '【日期字段】'
    name = df['column_name']
    table = df['table_name']

    date_column_names = [
        'EndDate', 'InfoPublDate', 'TradingDay', 'TradingDate',
        'EstablishmentDate',  # establishment date
        'BeginDate',          # start date
        'InDate',             # inclusion date
        'OutDate',            # removal date
        'EffectiveDate',      # effective date
        'FirstPublDate',      # first publication date
        'ReceptionDate', 'ReceptionDateE',  # reception date / reception end date
    ]
    is_date = (
        name.isin(date_column_names)
        | (df['column_description'] == '上市日期')
        | ((name == 'TranDate') & (table == 'LC_ShareTransfer'))
    )
    df.loc[is_date, '额外注释'] = date_tag

    df.loc[name == 'Year', '额外注释'] = '【日期字段】,示例: 2020'

    # In these tables 'EndDate' must not carry the date tag.
    untagged_end_date_tables = ['LC_ViolatiParty', 'LC_Mshareholder', 'LC_Buyback']
    df.loc[(name == 'EndDate') & table.isin(untagged_end_date_tables), '额外注释'] = ''
    df.loc[(name == 'BeginDate') & (table == 'LC_ViolatiParty'), '额外注释'] = date_tag
    return df

def get_other_note(df):
    """Attach hand-written usage hints and a few description fixes to column metadata.

    Reads 'column_name', 'column_description', 'table_name' and '注释'; writes
    '额外注释' and, for two specific columns, 'column_description'. The frame is
    modified in place and also returned.

    Rows whose '注释' contains '1-是' get their note from ``get_code_value_yesno``
    (not defined in this notebook; presumably provided by the ``utils`` star import).
    Missing '注释' values are treated as "no match" instead of raising.
    """
    name = df['column_name']
    table = df['table_name']

    # Description-based hint first, so the name-based hints below can override it.
    df.loc[df['column_description']=='证券代码', '额外注释'] = '股票'

    # Hints keyed by column name only (applied regardless of table).
    notes_by_column = {
        'SubclassName': "是'所属1级概念名称'的子类",
        'SubclassCode': "是'SubclassName'的对应代码",
        'ConceptName': "是'所属2级概念名称'的子类",
        'ChiName': "公司全称",
        'ChiNameAbbr': "公司中文名称缩写,中文简称",
        'RegAddr': "可用 LIKE '%{city}%'对注册地址进行模糊匹配查询",
        'PartyName': "受到处罚的公司名称(公司全称)",  # could also be a person's name
        'Manager': "Manager字段中可能包含多个管理人的名字, 作为筛选条件要用Manager like '%某某%'",
    }
    for column, note in notes_by_column.items():
        df.loc[name == column, '额外注释'] = note

    # Table-specific hints and description overrides.
    serial_in_supp_cust = (name == 'SerialNumber') & (table == 'LC_SuppCustDetail')
    df.loc[serial_in_supp_cust, '额外注释'] = "999表示前5大客户、前5大供应商的合计值；990表示前5大客户、前5大供应商关联方合计值"
    df.loc[serial_in_supp_cust, 'column_description'] = "序号-前5大客户"
    df.loc[(name == 'IndustryName') & (table == 'LC_IndustryValuation'), 'column_description'] = "行业名称"

    # Free-text columns holding delimiter-separated lists: the hint carries a SQL
    # snippet that counts the items.
    df.loc[(name == 'Participant') & (table == 'LC_InvestorRa'), '额外注释'] = "记录了所有的人员，不同人员用；或者、分隔开。统计人员数量可以用以下SQL(计算替换；和、字符前后的长度差异)：(CHAR_LENGTH(Participant) - CHAR_LENGTH(REPLACE(REPLACE(Participant, '；', ''), '、', ''))) +1 AS Participant_count"

    df.loc[(name == 'ListingCreper') & (table == 'LC_InvestorRa'), '额外注释'] = """记录了所有人员，不同人员存在：时用；分隔，不存在：时用、分隔)，统计人员数量可以用以下SQL：
(CASE WHEN ListingCreper LIKE '%：%' THEN (CHAR_LENGTH(ListingCreper) - CHAR_LENGTH(REPLACE(ListingCreper, '；', ''))) +1
      WHEN ListingCreper LIKE '%、%' THEN (CHAR_LENGTH(ListingCreper) - CHAR_LENGTH(REPLACE(ListingCreper, '、', ''))) +1
      ELSE 1 END
) AS ListingCreper_count"""

    df.loc[(name == 'IssueObject') & (table == 'LC_AShareSeasonedNewIssue'), '额外注释'] = """记录了所有的发行对象，不同人员用、分隔开。统计人员数量可以用以下SQL(计算替换、字符前后的长度差异)：
(CHAR_LENGTH(IssueObject) - CHAR_LENGTH(REPLACE(IssueObject, '、', ''))) +1 AS IssueObject_count"""

    df.loc[(name == 'MainBusiness') & (table == 'LC_Mshareholder'), '额外注释'] = """记录了所有主要业务，不同业务用、或者；或者;分隔开。统计数量可以用以下SQL：
CHAR_LENGTH(MainBusiness) - CHAR_LENGTH(REGEXP_REPLACE(MainBusiness, '[;、；]', '')) + 1 as MainBusiness_count"""

    # NOTE(review): the hint text lists three separators but the SQL pattern only
    # contains '、' — confirm which one matches the data before changing either.
    df.loc[(name == 'Lawyer') & (table == 'LC_SMAttendInfo'), '额外注释'] = """记录了所有律师，不同律师用、或者；或者;分隔开。统计数量可以用以下SQL：
CHAR_LENGTH(Lawyer) - CHAR_LENGTH(REGEXP_REPLACE(Lawyer, '[、]', '')) + 1 as Lawyer_count"""

    # Yes/no flags: derive the hint from the dictionary annotation itself.
    # na=False keeps the mask strictly boolean when '注释' has missing values.
    is_yes_no = df['注释'].str.contains('1-是', na=False)
    df.loc[is_yes_no, '额外注释'] = df.loc[is_yes_no, '注释'].apply(get_code_value_yesno)
    return df

def filter_df(df, drop_cols, drop_tables):
    """Remove unwanted columns and whole tables from a column-metadata frame.

    Args:
        df: frame with 'table_name' and 'column_name' columns.
        drop_cols: iterable of dicts ``{'table_name': str, 'column_name': [str, ...]}``;
            the listed columns of that table are removed.
        drop_tables: table names whose rows are removed entirely.

    Returns:
        A filtered frame (the input is not modified). The shape after the column-level
        filtering, before the table-level filtering, is printed.
    """
    remaining = df
    for spec in drop_cols:
        same_table = remaining['table_name'].eq(spec['table_name'])
        listed_column = remaining['column_name'].isin(spec['column_name'])
        remaining = remaining.loc[~(same_table & listed_column)]
    print('filter_df:', remaining.shape)
    return remaining.loc[~remaining['table_name'].isin(drop_tables)]

In [10]:
# Build the final column-metadata frame `df_col`: join table info and ratio notes,
# add hand-written notes, drop unwanted columns/tables and attach null rates.
# (Commented-out) drop_table = ['上市公司股票行情.日行情表'] # duplicates '股票行情表现(新)'
df_table['库_表名'] = df_table['库名中文'] + '.' + df_table['表中文']
df_table['table_name_all'] = df_table['库名英文'] + "." + df_table['表英文']
# NOTE(review): extract_data_for_desp is not defined in this notebook (presumably from
# the `utils` star import); its exact output format cannot be verified here.
df_table['表描述简介'] = df_table['表描述'].apply(extract_data_for_desp)
# Remove every literal '1.' occurrence from the short description.
df_table['表描述简介'] = df_table['表描述简介'].apply(lambda x: x.replace('1.', ''))
# Prints the number of tables whose short description ended up empty.
print('表描述简介完整性:', len(df_table[df_table['表描述简介']=='']))


####### Column processing
df_col = df_col_origin.merge(df_table[['库_表名', '库名中文', '表中文']], on='库_表名', how='left')
# Left join: columns without a row in df_ratio_note get NaN for the joined fields.
df_col = df_col.merge(df_ratio_note, how='left', on=['table_name_all', 'column_name'])
print('df_col:', df_col.shape)

# Per-table columns to remove, and tables to remove entirely.
drop_cols = [{'table_name': 'HK_SecuMain', 'column_name':['ChiNameAbbr', 'SecuCategory']},
             {'table_name': 'LC_InvestorDetail', 'column_name':['PersonalID']},
             {'table_name': 'LC_ESOP', 'column_name':['CompanyCode']}, # empty column
             ]
drop_tables = ['LC_InvestorDetail']

## Annotate columns
df_col = get_date_note(df_col)
df_col = get_other_note(df_col)

# Filter columns
df_col = filter_df(df_col, drop_cols, drop_tables)
df_col = df_col.merge(df_null_count[['table_name_all', 'column_name', 'Null_Rate']], how='left',
                      on=['table_name_all', 'column_name'])

# Keep columns whose null rate is below 99%. Rows with a NaN Null_Rate fail the
# comparison and are dropped as well.
# NOTE(review): the -100 sentinel recorded for failed null-rate queries passes this
# filter — confirm that is intended.
df_col = df_col[df_col['Null_Rate']<99]
# Strip trailing '1' characters from the description.
df_col['column_description'] = df_col['column_description'].str.replace(r'1+$', '', regex=True)
# Human-readable null-ratio hint, only filled when the null rate exceeds 15%.
df_col['Null_Rate_str'] = df_col['Null_Rate'].apply(lambda x: f"null ratio:{int(x)}%" if x > 15 else "")

# (Commented-out) table-level drop that went with `drop_table` above:
# df_table = df_table[~df_table['库_表名'].isin(drop_table)]
# df_col = df_col[~df_col['库_表名'].isin(drop_table)]
print('-'*5, 'drop table 字段后', '-'*5)
print('表数量', df_table.shape)
print('字段数量:', df_col.shape)
print('日期字段数量', (df_col['额外注释']=='【日期字段】').sum())
print('带额外注释数据:', len(df_col[~df_col['额外注释'].isnull()]))

表描述简介完整性: 0
df_col: (3398, 10)
filter_df: (3394, 10)
----- drop table 字段后 -----
表数量 (77, 8)
字段数量: (2772, 12)
日期字段数量 94
带额外注释数据: 189


In [12]:
# (Commented-out exploration) list the columns whose extra note contains a
# CHAR_LENGTH-based counting snippet:
# df_tmp = df_col[~df_col['额外注释'].isnull()]
# df_tmp.shape
# df_tmp[df_tmp['额外注释'].str.contains('CHAR_LENGTH')][['table_name', 'column_name', 'column_description', '额外注释']]
# Preview the first row of the column-level and the table-level frame.
df_col.head(1)
df_table.head(1)

Unnamed: 0,table_name,column_name,column_description,注释,table_name_all,库_表名,库名中文,表中文,max_v,额外注释,Null_Rate,Null_Rate_str
0,LC_StockArchives,ID,ID,,AStockBasicInfoDB.LC_StockArchives,上市公司基本资料.公司概况,上市公司基本资料,公司概况,,,0.0,


Unnamed: 0,库名中文,库名英文,表英文,表中文,表描述,库_表名,table_name_all,表描述简介
0,上市公司基本资料,AStockBasicInfoDB,LC_StockArchives,公司概况,收录上市公司的基本情况，包括：联系方式、注册信息、中介机构、行业和产品、公司证券品种及背景资料等内容。,上市公司基本资料.公司概况,AStockBasicInfoDB.LC_StockArchives,收录上市公司的基本情况，包括：联系方式、注册信息、中介机构、行业和产品、公司证券品种及背景资料等内容


In [15]:
# Persist the table-level and column-level metadata frames as parquet files.
for frame, stem in ((df_table, 'df_table'), (df_col, 'df_col')):
    frame.to_parquet(f'{data_save_path}/{stem}.parquet')
print('ok')

ok


### embedding初始化

In [16]:
# Embeddings for the schema labels ('库_表名') and for every column description.
# NOTE(review): get_embeddings appears to return (text, embedding) pairs, judging by
# the two-column frames built below — confirm against LLM.get_embeddings.
schema_texts = df_table['库_表名'].tolist()
column_texts = df_col['column_description'].tolist()
schema_embeddings_list = get_embeddings(schema_texts, tqdm_flag=True)
table_columns_embeddings_list = get_embeddings(column_texts, tqdm_flag=True)
len(schema_embeddings_list), len(table_columns_embeddings_list)

# Column embeddings are written first, then schema embeddings.
column_embedding_frame = pd.DataFrame(table_columns_embeddings_list, columns=['text', 'embedding'])
column_embedding_frame.to_parquet(f'{data_save_path}/table_columns_embeddings.parquet')
schema_embedding_frame = pd.DataFrame(schema_embeddings_list, columns=['schema', 'embedding'])
schema_embedding_frame.to_parquet(f'{data_save_path}/schema_embeddings.parquet')

100%|██████████| 4/4 [00:02<00:00,  1.62it/s]
100%|██████████| 139/139 [01:31<00:00,  1.52it/s]


(77, 2772)

In [19]:
# # 读取
# schema_embeddings_list = pd.read_parquet('../data/interim/schema_embeddings.parquet')
# schema_embeddings_list = [tuple(x.values()) for x in schema_embeddings_list.to_dict('records')]
# table_columns_embeddings_list = pd.read_parquet('../data/interim/table_columns_embeddings.parquet')
# table_columns_embeddings_list = [tuple(x.values()) for x in table_columns_embeddings_list.to_dict('records')]
# len(table_columns_embeddings_list), len(schema_embeddings_list)

#### 码值处理embedding

In [20]:
# Standard code-value processing: parse the '注释' text of each column into a
# code-value string, then into structured entries.
# NOTE(review): get_code_value / code_value2dict are not defined in this notebook
# (presumably from the `utils` star import); the sample output suggests the parsed
# result is a list of {'code', 'doc'} dicts — confirm against the helpers.
df_col['code_value'] = df_col['注释'].apply(lambda x: get_code_value(x) if isinstance(x, str) else '')
df_col['code_v_dict'] = df_col['code_value'].apply(lambda x: code_value2dict(x) if isinstance(x, str) else {})
# Keep only the columns for which a non-empty code-value string was extracted.
df_code_v = df_col[df_col['code_value']!=''][['code_v_dict','column_description', 'table_name', 'column_name', 'table_name_all']]
# The parsed entries live on in df_code_v only; df_col keeps the plain string.
df_col = df_col.drop('code_v_dict', axis=1)
# Random preview: the displayed rows differ between runs.
df_code_v.sample(2)
df_code_v.shape
df_code_v.to_parquet(f'{data_save_path}/df_code_v完整码值信息.parquet')

Unnamed: 0,code_v_dict,column_description,table_name,column_name,table_name_all
781,"[{'code': '1000', 'doc': '美元'}, {'code': '1001', 'doc': '美分'}, {'code': '1002', 'doc': '美元(次日'}, {'code': '1003', 'doc': '美元(同日'}, {'code': '1100', 'doc': '港元'}, {'code': '1110', 'doc': '印度卢比'}, {...",货币单位,LC_BuybackAttach,CurrencyUnit,AStockShareholderDB.LC_BuybackAttach
1118,"[{'code': '0', 'doc': '否'}, {'code': '1', 'doc': '是'}, {'code': '8', 'doc': '对价'}, {'code': '24', 'doc': '重整计划'}, {'code': '25', 'doc': '特殊分红'}, {'code': '26', 'doc': '面值拆分'}, {'code': '99', 'doc'...",是否分红,LC_Dividend,IfDividend,AStockFinanceDB.LC_Dividend


(244, 5)

In [21]:
def tmp_get_code_v_dict(data):
    """Turn value records into code-value entries whose description equals the code.

    Every item of ``data`` must provide a 'code' key; any other key (such as 'num')
    is discarded. Returns a new list of ``{'doc': code, 'code': code}`` dicts in the
    same order as the input.
    """
    return [{'doc': entry['code'], 'code': entry['code']} for entry in data]

## Add code values that the data dictionary does not document
### Annual reports: 'InfoSource' columns of tables with no documented InfoSource codes
table_contain_InfoSource = df_code_v[(df_code_v['column_name']=='InfoSource')]['table_name'].to_list()
df_report = df_col[(df_col['column_name']=='InfoSource') & (~df_col['table_name'].isin(table_contain_InfoSource))]

### Columns selected by description (case-cause description, shareholder category)
cols2_desc = ['案由描述', '股东类别']
df_report2 = df_col[(df_col['column_description'].isin(cols2_desc))]
df_report = pd.concat([df_report2, df_report])
df_report.shape

# Query the distinct values stored in each selected column. A value is kept as a code
# only when it is not NULL and occurs more than MIN_CODE_COUNT times.
MIN_CODE_COUNT = 8
all_count = []
for _, row in tqdm(df_report.iterrows(), total=len(df_report)):
    col = row['column_name']
    table_name_all = row['table_name_all']
    
    sql = f"""SELECT {col} as code,
                    count(*) as num
    FROM {table_name_all}
    GROUP BY {col}
    """
    res_sql = execute_sql(sql)
    res_data = [x for x in res_sql['data'] if x['num'] > MIN_CODE_COUNT and x['code'] is not None]
    all_count.append(res_data)

df_report['code_num'] = all_count
print('num:', df_report.shape)
# copy() so that the column added below is written to an independent frame rather
# than to a filtered slice.
df_report = df_report[(df_report['code_num'].apply(len)>0)].copy()
print('code_num 有数据', df_report.shape)

df_report['code_v_dict'] = df_report['code_num'].apply(tmp_get_code_v_dict)
df_report.head(1)

(34, 13)

0it [00:00, ?it/s]

34it [00:16,  2.06it/s]

num: (34, 14)
code_num 有数据 (33, 14)





Unnamed: 0,table_name,column_name,column_description,注释,table_name_all,库_表名,库名中文,表中文,max_v,额外注释,Null_Rate,Null_Rate_str,code_value,code_num,code_v_dict
278,LC_MainSHListNew,SHType,股东类别,,AStockShareholderDB.LC_MainSHListNew,上市公司股东与股本/公司治理.股东名单(新),上市公司股东与股本/公司治理,股东名单(新),,,49.67,null ratio:49%,,"[{'code': '其他股东', 'num': 41804}, {'code': '外资股东', 'num': 4394}, {'code': '国有股东', 'num': 12065}]","[{'doc': '其他股东', 'code': '其他股东'}, {'doc': '外资股东', 'code': '外资股东'}, {'doc': '国有股东', 'code': '国有股东'}]"


In [22]:
# Combine the documented code values with the ones discovered from the data, then
# keep, per column, only the codes that actually occur in the database.
df_code_v_all = pd.concat([df_code_v, df_report[df_code_v.columns]]).reset_index(drop=True)
df_code_v_all.shape

all_count = []
code_v_new_all = []
for _, row in tqdm(df_code_v_all.iterrows(), total=len(df_code_v_all)):
    col = row['column_name']
    table_name_all = row['table_name_all']
    code_v = row['code_v_dict']
    sql = f"""SELECT {col} as code,
                    count(*) as num
    FROM {table_name_all}
    GROUP BY {col}
    """
    res_sql = execute_sql(sql)
    all_count.append(res_sql['data'])

    # Database values are compared as strings against the dictionary codes; a set
    # gives constant-time membership tests for columns with many distinct values.
    codes_in_db = {str(x['code']) for x in res_sql['data']}
    code_v_new_all.append([x for x in code_v if x['code'] in codes_in_db])

df_code_v_all['code_num'] = all_count
df_code_v_all['code_v_new'] = code_v_new_all
df_code_v_all.head(1)

(277, 5)

277it [01:11,  3.90it/s]


Unnamed: 0,code_v_dict,column_description,table_name,column_name,table_name_all,code_num,code_v_new
0,"[{'code': '1', 'doc': '是；0-否'}]",是否否决,LC_NameChange,IfPassed,AStockBasicInfoDB.LC_NameChange,"[{'code': 0, 'num': 73}]",[]


In [23]:
# Flatten the retained code values into one long table: one row per code, labelled
# with the column description and table name it belongs to.
code_frames = []
for _, row in df_code_v_all.iterrows():
    code_frame = pd.DataFrame(row['code_v_new'])
    code_frame['column_description'] = row['column_description']
    code_frame['table_name'] = row['table_name']
    code_frames.append(code_frame)
# Concatenate once: growing a DataFrame inside the loop re-copies all earlier rows on
# every iteration. pd.concat rejects an empty list, hence the fallback.
df_code_v_desc = pd.concat(code_frames) if code_frames else pd.DataFrame()
df_code_v_desc.shape

(2410, 4)

In [24]:
# Store embeddings of the distinct code-value descriptions together with the
# flattened description table.
code_v_list = df_code_v_desc['doc'].unique().tolist()
len(code_v_list)

codev_emb_list = get_embeddings(code_v_list, tqdm_flag=True)
codev_emb_frame = pd.DataFrame(codev_emb_list, columns=['code_value', 'embedding'])
codev_emb_frame.to_parquet(f'{data_save_path}/codev_emb_list.parquet')
df_code_v_desc.to_parquet(f'{data_save_path}/df_code_v_desc.parquet')
print('ok')

1699

100%|██████████| 85/85 [00:55<00:00,  1.53it/s]

ok



