In [177]:
# 从 Mongodb 数据库中加载数据

# 导入所需的库
import pandas as pd
from pymongo import MongoClient, DESCENDING # 导入 DESCENDING 用于排序
from tqdm import tqdm # 在 Jupyter 中使用 tqdm.notebook 获得更好的进度条体验
from datetime import date
# 配置数据库连接信息

MONGO_CONNECTION_STRING = "mongodb://localhost:27017/"
DB_NAME = "barra_financial_data"


# 创建一个辅助函数，用于将集合加载到 DataFrame

def load_collection_to_df(db, collection_name: str, query: dict, projection: dict) -> pd.DataFrame:
    """加载经过筛选和投影的集合数据。"""
    print(f"正在从 '{collection_name}' 加载数据...")
    collection = db[collection_name]
    cursor = collection.find(query, projection)
    df = pd.DataFrame(list(cursor))
    print(f"-> 成功加载 {len(df):,} 行数据。")
    return df

# 连接到数据库
client = MongoClient(MONGO_CONNECTION_STRING)
db = client[DB_NAME]
print("成功连接到 MongoDB。")


# 定义查询时间
start_date_str = "20200101"  # 股票日线开始日期
start_date_financial_str = "20190101"  # 财务数据开始日期
end_date_str = date.today().strftime('%Y%m%d')
print(f"数据筛选范围: {start_date_str} 到 {end_date_str}")


# 选择想要查询的股票池，如：沪深300、上证50等

# 首先连接到指数成分股集合，获取成分股列表
index_components_collection_name = "index_components" #
index_code = "000300.SH"

# 1. 查找最新的交易日期
print(f"\n正在为指数 '{index_code}' 查找最新交易日期...")
collection = db[index_components_collection_name]

# 使用 find_one() 配合 sort 可以高效地找到一个文档
latest_entry = collection.find_one(
    {"index_code": index_code},      # 筛选条件
    sort=[("trade_date", DESCENDING)]       # 按 trade_date 降序排列
)

# 检查是否找到了数据
if latest_entry:
    latest_trade_date = latest_entry['trade_date']
    print(f"-> 找到最新交易日期: {latest_trade_date}")

    # 2. 使用找到的最新日期来查询所有成分股
    index_info_query = {
        "index_code": index_code,
        "trade_date": latest_trade_date  # 使用动态获取的日期
    }
    index_info_projection = {"con_code": 1, "_id": 0}

    # 调用你的辅助函数加载数据
    index_constituents_df = load_collection_to_df(
        db,
        collection_name=index_components_collection_name,
        query=index_info_query,
        projection=index_info_projection
    )

    
    constituent_list = index_constituents_df['con_code'].tolist()
    print(f"\n共获取 {len(constituent_list)} 只成分股。")

else:
    print(f"!! 未能在集合 '{index_components_collection_name}' 中找到指数 '{index_code}' 的任何记录。")

# 根据成分股列表，加载所需的股票数据
# 日线行情：close, total_mv, pb, turnover_rate, pe_ttm, circ_mv

daily_prices_query = {
    "ts_code": {"$in": constituent_list},
    "trade_date": {"$gte": start_date_str, "$lte": end_date_str}
}

daily_prices_projection = {
    "ts_code": 1,
    "trade_date": 1,
    "close": 1,
    "total_mv": 1,
    "circ_mv": 1,
    "pb": 1,
    "turnover_rate": 1,
    "pe_ttm": 1,
    "_id": 0
}

daily_prices_df = load_collection_to_df(
    db,
    collection_name="daily_prices",
    query=daily_prices_query,
    projection=daily_prices_projection
)


# 现金流量：n_cashflow_act 
cashflow_query = {
    "ts_code": {"$in": constituent_list},
    "end_date": {"$gte": start_date_financial_str, "$lte": end_date_str}
}

cashflow_projection = {
    "ts_code": 1,
    "ann_date": 1,
    "f_ann_date": 1,
    "end_date": 1,
    "n_cashflow_act": 1,
    "_id": 0
}

cashflow_df = load_collection_to_df(
    db,
    collection_name="cashflow",
    query=cashflow_query,
    projection=cashflow_projection
)


# 财务指标： 'q_profit_yoy', 'q_sales_yoy', debt_to_assets
financial_indicators_query = {
    "ts_code": {"$in": constituent_list},
    "end_date": {"$gte": start_date_financial_str, "$lte": end_date_str}
}

financial_indicators_projection = {
    "ts_code": 1,
    "ann_date": 1, 
    "end_date": 1,
    "q_profit_yoy": 1,
    "q_sales_yoy": 1,
    "debt_to_assets": 1,
    "_id": 0
}

financial_indicators_df = load_collection_to_df(
    db,
    collection_name="financial_indicators",
    query=financial_indicators_query,
    projection=financial_indicators_projection
)




# 资产负债表： total_ncl, total_hldr_eqy_inc_min_int, 
balance_sheet_query = {
    "ts_code": {"$in": constituent_list},
    "end_date": {"$gte": start_date_financial_str, "$lte": end_date_str}
}
balance_sheet_projection = {
    "ts_code": 1,
    "f_ann_date": 1,
    "end_date": 1,
    "total_ncl": 1,
    "total_hldr_eqy_inc_min_int": 1,
    "_id": 0
}
balance_sheet_df = load_collection_to_df(
    db,
    collection_name="balancesheet",
    query=balance_sheet_query,
    projection=balance_sheet_projection 
)



# 指数数据: close

index_prices_query = {
    "ts_code": index_code,
    "trade_date": {"$gte": start_date_str, "$lte": end_date_str}            
}

index_prices_projection = {
    "ts_code": 1,
    "trade_date": 1,
    "close": 1,
    "_id": 0
}

index_prices_df = load_collection_to_df(
    db,
    collection_name="index_daily_prices",
    query=index_prices_query,
    projection=index_prices_projection
)




# 获取数据完毕后，将数据合并，涉及到时间格式

# 首先对财务数据去重
cashflow_processed_df = cashflow_df.copy()
cashflow_processed_df['f_ann_date'] = pd.to_datetime(cashflow_processed_df['f_ann_date'], format='%Y%m%d')
cashflow_processed_df['end_date'] = pd.to_datetime(cashflow_processed_df['end_date'], format='%Y%m%d')

# 先排序（按 f_ann_date 降序保留最新），然后去重保留第一条
cashflow_processed_df = cashflow_processed_df.sort_values(
    ['ts_code', 'end_date', 'f_ann_date'],
    ascending=[True, True, False]  # f_ann_date 降序，最新在前
).drop_duplicates(subset=['ts_code', 'end_date'], keep='first')

# 这次按照 end_date 降序，保留报告期最新的
cashflow_processed_df= cashflow_processed_df.sort_values(
    ['ts_code', 'f_ann_date', 'end_date'],
    ascending=[True, True, False]  # end_date 降序，最新在前
).drop_duplicates(subset=['ts_code', 'f_ann_date'], keep='first')

print('现金流数据去重处理与日期处理完毕')

balance_sheet_processed_df = balance_sheet_df.copy()

balance_sheet_processed_df['f_ann_date'] = pd.to_datetime(balance_sheet_processed_df['f_ann_date'], format='%Y%m%d')
balance_sheet_processed_df['end_date'] = pd.to_datetime(balance_sheet_processed_df['end_date'], format='%Y%m%d')

# 先排序（按 f_ann_date 降序保留最新），然后去重保留第一条
balance_sheet_processed_df = balance_sheet_processed_df.sort_values(
    ['ts_code', 'end_date', 'f_ann_date'],
    ascending=[True, True, False]  # f_ann_date 降序，最新在前
).drop_duplicates(subset=['ts_code', 'end_date'], keep='first')

balance_sheet_processed_df= balance_sheet_processed_df.sort_values(
    ['ts_code', 'f_ann_date', 'end_date'],
    ascending=[True, True, False]  # end_date 降序，最新在前
).drop_duplicates(subset=['ts_code', 'f_ann_date'], keep='first')

print('资产负债表数据去重处理与日期处理完毕')

financial_indicators_processed_df = financial_indicators_df.copy()

financial_indicators_processed_df['ann_date'] = pd.to_datetime(financial_indicators_processed_df['ann_date'], format='%Y%m%d')
financial_indicators_processed_df['end_date'] = pd.to_datetime(financial_indicators_processed_df['end_date'], format='%Y%m%d')

financial_indicators_processed_df = financial_indicators_processed_df.sort_values(
    ['ts_code', 'ann_date', 'end_date'],
    ascending=[True, True, False]  # end_date 降序，最新在前
).drop_duplicates(subset=['ts_code', 'ann_date'], keep='first')

print('财务指标数据日期处理完毕')

daily_prices_processed_df = daily_prices_df.copy()
daily_prices_processed_df['trade_date'] = pd.to_datetime(daily_prices_processed_df['trade_date'], format='%Y%m%d')

print('日线行情数据日期处理完毕')

index_prices_processed_df = index_prices_df.copy()
index_prices_processed_df['trade_date'] = pd.to_datetime(index_prices_processed_df['trade_date'], format='%Y%m%d')
print('指数行情数据日期处理完毕')

成功连接到 MongoDB。
数据筛选范围: 20200101 到 20251018

正在为指数 '000300.SH' 查找最新交易日期...
-> 找到最新交易日期: 20251009
正在从 'index_components' 加载数据...
-> 成功加载 300 行数据。

共获取 300 只成分股。
正在从 'daily_prices' 加载数据...
-> 成功加载 404,561 行数据。
正在从 'cashflow' 加载数据...
-> 成功加载 11,142 行数据。
正在从 'financial_indicators' 加载数据...
-> 成功加载 7,686 行数据。
正在从 'balancesheet' 加载数据...
-> 成功加载 10,389 行数据。
正在从 'index_daily_prices' 加载数据...
-> 成功加载 1,402 行数据。
现金流数据去重处理与日期处理完毕
资产负债表数据去重处理与日期处理完毕
财务指标数据日期处理完毕
日线行情数据日期处理完毕
指数行情数据日期处理完毕


In [115]:
cashflow_processed_df= cashflow_processed_df.sort_values(
    ['ts_code', 'f_ann_date', 'end_date'],
    ascending=[True, True, False]  # end_date 降序，最新在前
).drop_duplicates(subset=['ts_code', 'f_ann_date'], keep='first')

In [116]:
x = cashflow_processed_df[cashflow_processed_df['update_flag'] == '0'].copy()

In [117]:
# 检查重复项的数量
duplicate_count = x.duplicated(subset=['ts_code', 'f_ann_date']).sum()
print(f"在 financial_indicators 中找到了 {duplicate_count} 个 (ts_code, f_ann_date) 的重复项。")

# 如果想查看具体的重复行
if duplicate_count > 0:
    duplicates = x[x.duplicated(subset=['ts_code', 'f_ann_date'], keep=False)]
    print("\n以下是具体的重复数据示例：")
    print(duplicates.sort_values(by=['ts_code', 'f_ann_date']).head())

在 financial_indicators 中找到了 0 个 (ts_code, f_ann_date) 的重复项。


In [119]:
cashflow_processed_df[cashflow_processed_df['ts_code']=='000001.SZ']

Unnamed: 0,ts_code,ann_date,f_ann_date,end_date,n_cashflow_act,update_flag
33,000001.SZ,20190424,2019-04-24,2019-03-31,53184000000.0,0
32,000001.SZ,20190808,2019-08-08,2019-06-30,26432000000.0,1
31,000001.SZ,20191022,2019-10-22,2019-09-30,84480000000.0,0
29,000001.SZ,20200214,2020-02-14,2019-12-31,-40025000000.0,0
28,000001.SZ,20200421,2020-04-21,2020-03-31,17989000000.0,0
26,000001.SZ,20200828,2020-08-28,2020-06-30,-25666000000.0,1
25,000001.SZ,20201022,2020-10-22,2020-09-30,30698000000.0,1
23,000001.SZ,20210202,2021-02-02,2020-12-31,-16161000000.0,1
22,000001.SZ,20210421,2021-04-21,2021-03-31,-11553000000.0,0
21,000001.SZ,20210820,2021-08-20,2021-06-30,-115972000000.0,0


In [149]:
# 合并数据
daily_prices_processed_df = daily_prices_processed_df.sort_values(by=['trade_date'])
balance_sheet_processed_df = balance_sheet_processed_df.sort_values(by='f_ann_date')
financial_indicators_processed_df = financial_indicators_processed_df.sort_values(by='ann_date')
cashflow_processed_df = cashflow_processed_df.sort_values(by='f_ann_date')

In [148]:
# 检查重复项的数量
duplicate_count = financial_indicators_processed_df.duplicated(subset=['ts_code', 'ann_date']).sum()
print(f"在 financial_indicators_processed_df 中找到了 {duplicate_count} 个 (ts_code, ann_date) 的重复项。")

# 如果想查看具体的重复行
if duplicate_count > 0:
    duplicates = financial_indicators_processed_df[financial_indicators_processed_df.duplicated(subset=['ts_code', 'ann_date'], keep=False)]
    print("\n以下是具体的重复数据示例：")
    print(duplicates.sort_values(by=['ts_code', 'f_ann_date']).head())

在 financial_indicators_processed_df 中找到了 0 个 (ts_code, ann_date) 的重复项。


In [126]:
balance_sheet_processed_df

Unnamed: 0,ts_code,f_ann_date,end_date,total_ncl,total_hldr_eqy_inc_min_int
37,000001.SZ,2019-04-24,2019-03-31,,2.509380e+11
36,000001.SZ,2019-08-08,2019-06-30,,2.566030e+11
35,000001.SZ,2019-10-22,2019-09-30,,2.880730e+11
33,000001.SZ,2020-02-14,2019-12-31,,3.129830e+11
32,000001.SZ,2020-04-21,2020-03-31,,3.523550e+11
...,...,...,...,...,...
10362,688981.SH,2024-08-30,2024-06-30,6.415649e+10,2.207147e+11
10361,688981.SH,2024-11-08,2024-09-30,6.249007e+10,2.202995e+11
10360,688981.SH,2025-03-28,2024-12-31,6.276304e+10,2.291078e+11
10359,688981.SH,2025-05-09,2025-03-31,6.154068e+10,2.312315e+11


In [178]:
daily_prices_processed_df = daily_prices_processed_df.sort_values(by=['ts_code', 'trade_date'])
balance_sheet_processed_df = balance_sheet_processed_df.sort_values(by=['ts_code', 'f_ann_date'])

In [170]:
# 在 merge_asof 之前进行最终检查
print("--- 正在对左表 (daily_prices_processed_df) 进行最终诊断 ---")

# 1. 检查关键列是否存在空值
null_check = daily_prices_processed_df[['ts_code', 'trade_date']].isnull().sum()
print("关键列空值数量:\n", null_check)

# 2. 检查关键列的数据类型
print("\n关键列数据类型:")
print(daily_prices_processed_df[['ts_code', 'trade_date']].dtypes)

if null_check.sum() > 0:
    print("\n[!] 诊断结果: 发现空值！这是导致排序错误的极高概率原因。")
    # (可选) 展示存在空值的行，以便定位
    # print("存在空值的行示例:")
    # print(daily_prices_processed_df[daily_prices_processed_df['ts_code'].isnull() | daily_prices_processed_df['trade_date'].isnull()])
else:
    print("\n[✓] 诊断结果: 未在关键列发现空值。")

--- 正在对左表 (daily_prices_processed_df) 进行最终诊断 ---
关键列空值数量:
 ts_code       0
trade_date    0
dtype: int64

关键列数据类型:
ts_code               object
trade_date    datetime64[ns]
dtype: object

[✓] 诊断结果: 未在关键列发现空值。


In [171]:
# 确保你已经执行了排序
daily_prices_processed_df = daily_prices_processed_df.sort_values(by=['ts_code', 'trade_date'])

# 程序化验证排序是否在每个分组内都正确
is_left_sorted = daily_prices_processed_df.groupby('ts_code')['trade_date'].is_monotonic_increasing.all()

print(f"\n--- 排序验证 ---")
print(f"左表 (daily_prices) 是否在每个ts_code内都按trade_date正确排序? -> {is_left_sorted}")

if not is_left_sorted:
    print("\n[!] 验证失败: 左表排序不满足 merge_asof 的要求。")
    # 找出排序有问题的股票代码，帮助进一步分析
    bad_groups = daily_prices_processed_df.groupby('ts_code')['trade_date'].is_monotonic_increasing
    problematic_codes = bad_groups[~bad_groups].index.unique().tolist()
    print(f"排序有问题的股票代码 (前5个): {problematic_codes[:5]}")


--- 排序验证 ---
左表 (daily_prices) 是否在每个ts_code内都按trade_date正确排序? -> True


In [172]:
# --- 正在对右表 (balance_sheet_processed_df) 进行最终诊断 ---
print("--- 正在对右表 (balance_sheet_processed_df) 进行最终诊断 ---")

# 1. 检查关键列是否存在空值
null_check_right = balance_sheet_processed_df[['ts_code', 'f_ann_date']].isnull().sum()
print("关键列空值数量:\n", null_check_right)

# 2. 检查关键列的数据类型
print("\n关键列数据类型:")
print(balance_sheet_processed_df[['ts_code', 'f_ann_date']].dtypes)

if null_check_right.sum() > 0:
    print("\n[!] 诊断结果: 在右表中发现空值！这很可能是根本原因。")
else:
    print("\n[✓] 诊断结果: 未在右表关键列发现空值。")


# --- 对右表进行排序验证 ---
# 确保你已经执行了排序
balance_sheet_processed_df = balance_sheet_processed_df.sort_values(by=['ts_code', 'f_ann_date'])

# 程序化验证排序是否在每个分组内都正确
is_right_sorted = balance_sheet_processed_df.groupby('ts_code')['f_ann_date'].is_monotonic_increasing.all()

print(f"\n--- 右表排序验证 ---")
print(f"右表 (balance_sheet) 是否在每个ts_code内都按f_ann_date正确排序? -> {is_right_sorted}")

if not is_right_sorted:
    print("\n[!] 验证失败: 右表排序不满足 merge_asof 的要求。")
    # 找出排序有问题的股票代码
    bad_groups_right = balance_sheet_processed_df.groupby('ts_code')['f_ann_date'].is_monotonic_increasing
    problematic_codes_right = bad_groups_right[~bad_groups_right].index.unique().tolist()
    print(f"排序有问题的股票代码 (前5个): {problematic_codes_right[:5]}")

--- 正在对右表 (balance_sheet_processed_df) 进行最终诊断 ---
关键列空值数量:
 ts_code       0
f_ann_date    0
dtype: int64

关键列数据类型:
ts_code               object
f_ann_date    datetime64[ns]
dtype: object

[✓] 诊断结果: 未在右表关键列发现空值。

--- 右表排序验证 ---
右表 (balance_sheet) 是否在每个ts_code内都按f_ann_date正确排序? -> True


In [175]:
# --- Final Isolation Debugging ---

# Ensure both dataframes are fully sanitized and sorted one last time
daily_prices_processed_df = daily_prices_processed_df.reset_index(drop=True).sort_values(by=['ts_code', 'trade_date'])
balance_sheet_processed_df = balance_sheet_processed_df.reset_index(drop=True).sort_values(by=['ts_code', 'f_ann_date'])

# Get a unique list of all stock codes to iterate over
all_ts_codes = daily_prices_processed_df['ts_code'].unique()
problematic_code = None
merged_chunks = []

print(f"Starting isolation test for {len(all_ts_codes)} unique stock codes...")

for i, code in enumerate(all_ts_codes):
    # Create small, single-stock dataframes
    left_chunk = daily_prices_processed_df[daily_prices_processed_df['ts_code'] == code]
    right_chunk = balance_sheet_processed_df[balance_sheet_processed_df['ts_code'] == code]
    
    # Skip if there's no financial data for this stock
    if right_chunk.empty:
        merged_chunks.append(left_chunk) # Append the daily data directly
        continue

    try:
        # Attempt the merge on this single stock
        merged_chunk = pd.merge_asof(
            left=left_chunk,
            right=right_chunk,
            left_on='trade_date',
            right_on='f_ann_date',
            by='ts_code',
            direction='backward'
        )
        merged_chunks.append(merged_chunk)
        
        # Progress indicator
        if (i + 1) % 100 == 0:
            print(f"  ... successfully processed {i + 1} / {len(all_ts_codes)} stocks")

    except ValueError as e:
        # If it fails, we have found the culprit!
        print("\n" + "="*50)
        print(f"❌ FAILURE DETECTED!")
        print(f"The merge failed on ts_code: '{code}'")
        print(f"Error message: {e}")
        problematic_code = code
        
        # Print the data for the failing stock for inspection
        print("\n--- Data for failing left_chunk (Daily Prices) ---")
        print(left_chunk)
        print("\n--- Data for failing right_chunk (Balance Sheet) ---")
        print(right_chunk)
        print("="*50)
        break # Stop the loop

if problematic_code is None:
    print("\n✅ All stocks processed successfully individually.")
    # If the loop completes, we can safely combine the results
    final_merged_df = pd.concat(merged_chunks, ignore_index=True)
    print("Final DataFrame created successfully.")
else:
    print("\nPlease inspect the data printed above for the problematic stock code.")
    print("Look for duplicate dates, strange values, or anything unusual.")

Starting isolation test for 300 unique stock codes...
  ... successfully processed 100 / 300 stocks
  ... successfully processed 200 / 300 stocks
  ... successfully processed 300 / 300 stocks

✅ All stocks processed successfully individually.
Final DataFrame created successfully.


In [187]:
def robust_merge_asof(left_df, right_df, left_on, right_on, by):
    """
    A robust version of pd.merge_asof that merges group by group
    to avoid potential bugs with large DataFrames.
    """
    print(f"--- Performing robust merge_asof on '{by}' ---")
    
    # Sanitize and sort inputs
    left_df = left_df.reset_index(drop=True).sort_values(by=[by, left_on])
    right_df = right_df.reset_index(drop=True).sort_values(by=[by, right_on])
    
    all_keys = left_df[by].unique()
    merged_chunks = []
    
    for key in all_keys:
        left_chunk = left_df[left_df[by] == key]
        right_chunk = right_df[right_df[by] == key]
        
        merged_chunk = pd.merge_asof(
            left=left_chunk,
            right=right_chunk,
            left_on=left_on,
            right_on=right_on,
            by=by,
            direction='backward'
        )
        merged_chunks.append(merged_chunk)
        
    print("Robust merge successful.")
    return pd.concat(merged_chunks, ignore_index=True)

# --- How to use it ---
merged_df_1 = robust_merge_asof(
     daily_prices_processed_df,
     balance_sheet_processed_df,
     left_on='trade_date',
     right_on='f_ann_date',
     by='ts_code'
)

merged_df_1.rename(columns={
    'f_ann_date': 'balance_sheet_f_ann_date',
}, inplace=True)


merged_df_2 = robust_merge_asof(
     merged_df_1,
     financial_indicators_processed_df,
     left_on='trade_date',
     right_on='ann_date',
     by='ts_code'
)
merged_df_2.rename(columns={
    'ann_date': 'financial_indicators_ann_date',
}, inplace=True)

final_merged_df = robust_merge_asof(
     merged_df_2,
     cashflow_processed_df,
     left_on='trade_date',
     right_on='f_ann_date',
     by='ts_code'
)
final_merged_df.rename(columns={
    'f_ann_date': 'cashflow_f_ann_date',
}, inplace=True)    


columns_to_drop = ['end_date_y','ann_date','end_date_x'] 
# The date columns from the right tables are redundant after the merge.
# Pandas might add suffixes like _x, _y if column names conflict. Check final_merged_df.columns to be sure.

# It's good practice to check for and remove potentially conflicting columns
# For example, if 'f_ann_date' was in both balance_sheet and cashflow, it becomes f_ann_date_x and f_ann_date_y
final_merged_df = final_merged_df.drop(columns=columns_to_drop)


--- Performing robust merge_asof on 'ts_code' ---
Robust merge successful.
--- Performing robust merge_asof on 'ts_code' ---
Robust merge successful.
--- Performing robust merge_asof on 'ts_code' ---
Robust merge successful.


In [189]:
final_merged_df.columns

Index(['ts_code', 'trade_date', 'close', 'turnover_rate', 'pe_ttm', 'pb',
       'total_mv', 'circ_mv', 'balance_sheet_f_ann_date', 'total_ncl',
       'total_hldr_eqy_inc_min_int', 'financial_indicators_ann_date',
       'debt_to_assets', 'q_sales_yoy', 'q_profit_yoy', 'cashflow_f_ann_date',
       'end_date', 'n_cashflow_act'],
      dtype='object')

In [179]:
merged_df_1 = pd.merge_asof(
    left=daily_prices_processed_df,               # 日度数据作为左表
    right=balance_sheet_processed_df,          # 季度数据作为右表
    left_on='trade_date',        # 左表的日期键
    right_on='f_ann_date',         # 右表的日期键 (必须用公告日!)
    by='ts_code',                # 必须指定股票代码，确保数据在股票内部匹配
    direction='backward'         # 关键参数：'backward'表示对于每一行日度数据，向前(过去)寻找最近的财务数据
)

ValueError: left keys must be sorted

In [166]:
merged_df_1 = pd.merge_asof(
    left=daily_prices_processed_df,               # 日度数据作为左表
    right=balance_sheet_processed_df,          # 季度数据作为右表
    left_on='trade_date',        # 左表的日期键
    right_on='f_ann_date',         # 右表的日期键 (必须用公告日!)
    by='ts_code',                # 必须指定股票代码，确保数据在股票内部匹配
    direction='backward'         # 关键参数：'backward'表示对于每一行日度数据，向前(过去)寻找最近的财务数据
)

# --- 3. Second Merge: Result + Financial Indicators ---
# Now, merge the result with the financial indicators.
# Note that the 'right_on' key changes to 'ann_date' for this DataFrame.
merged_df_2 = pd.merge_asof(
    left=merged_df_1,
    right=financial_indicators_processed_df,
    left_on='trade_date',
    right_on='ann_date',    # Use the announcement date from financial indicators
    by='ts_code',
    direction='backward'
)

# --- 4. Final Merge: Result + Cash Flow ---
# Finally, merge the result with the cash flow data.
final_merged_df = pd.merge_asof(
    left=merged_df_2,
    right=cashflow_processed_df,
    left_on='trade_date',
    right_on='f_ann_date',  # Use the announcement date from the cash flow statement
    by='ts_code',
    direction='backward'
)

# The 'final_merged_df' now contains the combined data.
# You might want to clean up the extra date columns from the right tables.
columns_to_drop = ['f_ann_date_x', 'ann_date_x', 'f_ann_date_y','end_date_y','ann_date_y','end_date_x'] 
# The date columns from the right tables are redundant after the merge.
# Pandas might add suffixes like _x, _y if column names conflict. Check final_merged_df.columns to be sure.

# It's good practice to check for and remove potentially conflicting columns
# For example, if 'f_ann_date' was in both balance_sheet and cashflow, it becomes f_ann_date_x and f_ann_date_y
final_merged_df = final_merged_df.drop(columns=columns_to_drop)

ValueError: left keys must be sorted

In [163]:
final_merged_df =  final_merged_df.sort_values(by=['ts_code', 'trade_date'])

In [164]:
final_merged_df

Unnamed: 0,ts_code,trade_date,close,turnover_rate,pe_ttm,pb,total_mv,circ_mv,total_ncl,total_hldr_eqy_inc_min_int,debt_to_assets,q_sales_yoy,q_profit_yoy,end_date,n_cashflow_act
0,000001.SZ,2020-01-02,16.87,0.7885,11.6992,1.2210,3.273778e+07,3.273750e+07,,2.880730e+11,92.2304,19.3930,16.0079,2019-09-30,8.448000e+10
318,000001.SZ,2020-01-03,17.18,0.5752,11.9142,1.2434,3.333937e+07,3.333908e+07,,2.880730e+11,92.2304,19.3930,16.0079,2019-09-30,8.448000e+10
602,000001.SZ,2020-01-06,17.07,0.4442,11.8379,1.2355,3.312590e+07,3.312562e+07,,2.880730e+11,92.2304,19.3930,16.0079,2019-09-30,8.448000e+10
806,000001.SZ,2020-01-07,17.15,0.3755,11.8933,1.2413,3.328115e+07,3.328087e+07,,2.880730e+11,92.2304,19.3930,16.0079,2019-09-30,8.448000e+10
1189,000001.SZ,2020-01-08,16.66,0.4369,11.5535,1.2058,3.233026e+07,3.232998e+07,,2.880730e+11,92.2304,19.3930,16.0079,2019-09-30,8.448000e+10
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
403077,688981.SH,2025-10-13,136.47,7.0165,250.7944,7.2284,1.091770e+08,2.728803e+07,6.755544e+10,2.345195e+11,33.7830,17.3407,-12.1687,2025-06-30,5.897793e+09
403572,688981.SH,2025-10-14,127.20,6.6065,233.7587,6.7374,1.017609e+08,2.543444e+07,6.755544e+10,2.345195e+11,33.7830,17.3407,-12.1687,2025-06-30,5.897793e+09
403960,688981.SH,2025-10-15,129.56,4.0408,238.0957,6.8624,1.036489e+08,2.590633e+07,6.755544e+10,2.345195e+11,33.7830,17.3407,-12.1687,2025-06-30,5.897793e+09
404225,688981.SH,2025-10-16,127.30,3.0622,233.9425,6.7427,1.018409e+08,2.545443e+07,6.755544e+10,2.345195e+11,33.7830,17.3407,-12.1687,2025-06-30,5.897793e+09


正在从 'sw_industries' 加载数据...
-> 成功加载 3,000 行数据。


In [206]:
sw_industry_query = {}
sw_industry_projection = {
    "ts_code": 1,
    "l1_code": 1,
    "l1_name": 1,
    "in_date": 1,
    "out_date": 1,
    "is_new": 1,
    "_id": 0
}

sw_industry_data = load_collection_to_df(db, 'sw_industries', {}, sw_industry_projection)

正在从 'sw_industries' 加载数据...
-> 成功加载 5,724 行数据。


In [207]:
sw_industry_data

Unnamed: 0,l1_code,l1_name,ts_code,in_date,out_date,is_new
0,801150.SI,医药生物,688068.SH,20190911,,Y
1,801140.SI,轻工制造,002862.SZ,20170330,,Y
2,801080.SI,电子,300408.SZ,20140127,,Y
3,801710.SI,建筑材料,600819.SH,19940128,,Y
4,801880.SI,汽车,300580.SZ,20161212,,Y
...,...,...,...,...,...,...
5719,801080.SI,电子,600074.SH,20150401,,Y
5720,801740.SI,国防军工,600677.SH,20070702,,Y
5721,801770.SI,通信,600485.SH,20030804,,Y
5722,801180.SI,房地产,600240.SH,20000628,,Y


In [200]:
df = pd.read_csv('Barra_factor_cal/data/stk_sw_industry.csv')

In [203]:
df

Unnamed: 0,l1_code,l1_name,ts_code,name,in_date,out_date,is_new
0,801150.SI,医药生物,688068.SH,热景生物,20190911,,Y
1,801140.SI,轻工制造,002862.SZ,实丰文化,20170330,,Y
2,801080.SI,电子,300408.SZ,三环集团,20140127,,Y
3,801710.SI,建筑材料,600819.SH,耀皮玻璃,19940128,,Y
4,801880.SI,汽车,300580.SZ,贝斯特,20161212,,Y
...,...,...,...,...,...,...,...
5719,801080.SI,电子,600074.SH,退市保千(退市),20150401,,Y
5720,801740.SI,国防军工,600677.SH,*ST航通(退市),20070702,,Y
5721,801770.SI,通信,600485.SH,*ST信威(退市),20030804,,Y
5722,801180.SI,房地产,600240.SH,退市华业(退市),20000628,,Y
