In [1]:
import pandas as pd
import akshare as ak
import numpy as np
import sqlite3 # 导入SQLite数据库接口

# --- 核心函数：定义合并逻辑 (与之前相同) ---
def safe_coerce_numeric(series):
    """辅助函数：安全地将Series转换为数值类型，无法转换的变为NaN"""
    return pd.to_numeric(series, errors='coerce')

def merge_columns(df, col1, col2, priority='ths', data_type='object'):
    """智能合并两个列的函数"""
    primary_col = df[col2] if priority == 'ths' else df[col1]
    secondary_col = df[col1] if priority == 'ths' else df[col2]
    merged_series = primary_col.fillna(secondary_col)
    if data_type == 'numeric':
        return safe_coerce_numeric(merged_series)
    elif data_type == 'datetime':
        # 在存入SQLite时，Pandas会自动处理datetime对象，这里先保持对象状态
        return pd.to_datetime(merged_series, errors='coerce')
    else:
        return merged_series.astype(str).replace('nan', '')

# --- 主程序开始 ---
print("启动最终数据整合与规范化任务...")

try:
    # 步骤 1 & 2: 获取数据并进行精准过滤
    print("正在获取并过滤数据...")
    em_df = ak.bond_zh_cov()
    ths_df = ak.bond_zh_cov_info_ths()
    em_df_prefixed = em_df.add_prefix('东方财富_')
    ths_df_prefixed = ths_df.add_prefix('同花顺_')
    merged_df = pd.merge(em_df_prefixed, ths_df_prefixed,
                         left_on='东方财富_债券代码', right_on='同花顺_债券代码',
                         how='outer', indicator=True)
    status_map = {'left_only': '仅东方财富', 'right_only': '仅同花顺', 'both': '同时存在'}
    merged_df['数据来源'] = merged_df['_merge'].map(status_map)
    merged_df.drop(columns=['_merge'], inplace=True)

    is_non_convertible_bond = (merged_df['东方财富_债券简称'].str.contains('债', na=False)) & \
                              (~merged_df['东方财富_债券简称'].str.contains('转债', na=False))
    filtered_out_df = merged_df[is_non_convertible_bond].copy()
    main_df = merged_df[~is_non_convertible_bond].copy()
    
    removed_file_name = "非可转债记录清单.xlsx"
    filtered_out_df.to_excel(removed_file_name, index=False, engine='openpyxl')
    print(f"已将非可转债记录保存至: {removed_file_name}")

    # 步骤 3: 标记状态与诊断差异
    main_df['债券状态'] = '正常'
    main_df.loc[main_df['东方财富_债券简称'].str.contains('退', na=False), '债券状态'] = '已退市'
    
    column_pairs_to_compare = {
        '东方财富_债券简称': ('同花顺_债券简称', '债券简称'),'东方财富_上市时间': ('同花顺_上市日期', '上市日期'),
        '东方财富_申购日期': ('同花顺_申购日期', '申购日期'),'东方财富_正股简称': ('同花顺_正股简称', '正股简称'),
        '东方财富_转股价': ('同花顺_转股价格', '转股价'),'东方财富_中签率': ('同花顺_中签率', '中签率'),
        '东方财富_发行规模': ('同花顺_计划发行量', '发行规模')
    }
    main_df['数据差异说明'] = ''
    for em_col, (ths_col, field_name) in column_pairs_to_compare.items():
        em_series = safe_coerce_numeric(main_df[em_col]) if field_name not in ['债券简称', '正股简称'] else main_df[em_col]
        ths_series = safe_coerce_numeric(main_df[ths_col]) if field_name not in ['债券简称', '正股简称'] else main_df[ths_col]
        diff_mask = (em_series != ths_series) & (em_series.notna()) & (ths_series.notna())
        main_df.loc[diff_mask & (main_df['数据来源'] == '同时存在'), '数据差异说明'] += f'{field_name}不一致; '
    main_df['数据差异说明'] = main_df['数据差异说明'].str.strip('; ')
    print("状态标记与差异诊断完成。")
    
    # 步骤 4: 创建最终的统一数据框
    print("正在整合列并创建最终数据集...")
    final_df = pd.DataFrame()
    final_df['债券代码'] = merge_columns(main_df, '东方财富_债券代码', '同花顺_债券代码', priority='ths')
    final_df['债券简称'] = merge_columns(main_df, '东方财富_债券简称', '同花顺_债券简称', priority='em')
    final_df['正股代码'] = merge_columns(main_df, '东方财富_正股代码', '同花顺_正股代码', priority='ths')
    final_df['正股简称'] = merge_columns(main_df, '东方财富_正股简称', '同花顺_正股简称', priority='ths')
    final_df['转股价'] = merge_columns(main_df, '东方财富_转股价', '同花顺_转股价格', priority='ths', data_type='numeric')
    final_df['中签率_percent'] = merge_columns(main_df, '东方财富_中签率', '同花顺_中签率', priority='ths', data_type='numeric')
    final_df['发行规模_yuan'] = merge_columns(main_df, '东方财富_发行规模', '同花顺_计划发行量', priority='ths', data_type='numeric')
    final_df['申购日期'] = merge_columns(main_df, '东方财富_申购日期', '同花顺_申购日期', priority='ths', data_type='datetime')
    final_df['上市日期'] = merge_columns(main_df, '东方财富_上市时间', '同花顺_上市日期', priority='ths', data_type='datetime')
    final_df['中签公布日'] = merge_columns(main_df, '东方财富_中签号发布日', '同花顺_中签公布日', priority='ths', data_type='datetime')
    final_df['正股价'] = safe_coerce_numeric(main_df['东方财富_正股价'])
    final_df['转股价值'] = safe_coerce_numeric(main_df['东方财富_转股价值'])
    final_df['债现价'] = safe_coerce_numeric(main_df['东方财富_债现价'])
    final_df['转股溢价率_percent'] = safe_coerce_numeric(main_df['东方财富_转股溢价率'])
    final_df['信用评级'] = main_df['东方财富_信用评级'].astype(str).replace('nan', '')
    final_df['到期时间'] = pd.to_datetime(main_df['同花顺_到期时间'], errors='coerce')
    final_df['申购代码'] = merge_columns(main_df, '东方财富_申购代码', '同花顺_申购代码', priority='ths')
    final_df['中签号'] = main_df['同花顺_中签号'].replace('nan', '')
    final_df['债券状态'] = main_df['债券状态']
    final_df['数据来源'] = main_df['数据来源']
    final_df['数据差异说明'] = main_df['数据差异说明']
    
    # === 步骤 5: **【核心变更】** 存储到SQLite数据库 ===
    db_file = "convertible_bonds_database.db"
    table_name = "bonds_data"
    
    # 建立数据库连接 (如果文件不存在，会自动创建)
    conn = sqlite3.connect(db_file)
    print(f"正在将数据写入SQLite数据库 '{db_file}' 的 '{table_name}' 表中...")
    
    # 使用 a.to_sql() 方法将DataFrame写入数据库
    # if_exists='replace' 表示如果表已存在，则删除旧表并创建新表。这确保了每次运行都是最新数据。
    # index=False 表示不将DataFrame的索引写入数据库。
    final_df.to_sql(name=table_name, con=conn, if_exists='replace', index=False)
    
    # 关闭数据库连接
    conn.close()
    
    print("\n任务圆满成功！")
    print(f"最终数据集已成功存入SQLite数据库: {db_file}")

except Exception as e:
    print(f"\n处理过程中发生严重错误: {e}")

启动最终数据整合与规范化任务...
正在获取并过滤数据...


  0%|          | 0/2 [00:00<?, ?it/s]

  big_df = pd.concat(objs=[big_df, temp_df], ignore_index=True)


已将非可转债记录保存至: 非可转债记录清单.xlsx
状态标记与差异诊断完成。
正在整合列并创建最终数据集...
正在将数据写入SQLite数据库 'convertible_bonds_database.db' 的 'bonds_data' 表中...

任务圆满成功！
最终数据集已成功存入SQLite数据库: convertible_bonds_database.db


In [2]:
import akshare as ak
import pandas as pd
import sqlite3
import time
from tqdm import tqdm

# --- 配置区 ---
DB_NAME = "convertible_bonds_database.db"
SOURCE_TABLE_NAME = "bonds_data"
TARGET_TABLE_NAME = "bond_details_eastmoney"
SUCCESS_LOG_FILE = "success_log.txt"
ERROR_LOG_FILE = "error_log.txt"

def initialize_database_connection(db_name):
    """建立数据库连接"""
    try:
        conn = sqlite3.connect(db_name)
        print(f"成功连接到数据库: {db_name}")
        return conn
    except sqlite3.Error as e:
        print(f"数据库连接失败: {e}")
        return None

def get_bond_symbols_to_fetch(conn):
    """
    从源数据表中获取所有债券代码。
    同时，检查目标表中已存在的代码，避免重复抓取。
    """
    try:
        # 检查目标表是否存在，如果不存在，则所有代码都需要获取
        cursor = conn.cursor()
        cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{TARGET_TABLE_NAME}';")
        if cursor.fetchone() is None:
            print(f"目标表 {TARGET_TABLE_NAME} 不存在，将获取所有债券信息。")
            existing_symbols = set()
        else:
            df_existing = pd.read_sql_query(f'SELECT DISTINCT "SECURITY_CODE" FROM {TARGET_TABLE_NAME}', conn)
            existing_symbols = set(df_existing["SECURITY_CODE"].astype(str))
            print(f"目标表中已存在 {len(existing_symbols)} 条债券的详细信息。")

        # 从源表获取所有债券代码，不再根据“债券状态”进行过滤
        df_source = pd.read_sql_query(f'SELECT "债券代码" FROM {SOURCE_TABLE_NAME}', conn)
        source_symbols = set(df_source["债券代码"].astype(str))
        
        # 计算需要新获取的债券代码
        symbols_to_fetch = list(source_symbols - existing_symbols)
        print(f"源表中共有 {len(source_symbols)} 条债券，其中 {len(symbols_to_fetch)} 条是新增的，需要获取详细数据。")
        
        return symbols_to_fetch
        
    except Exception as e:
        print(f"从数据库获取债券代码列表时出错: {e}")
        return []

def fetch_and_store_bond_details(conn, symbol):
    """获取单个可转债的详细信息并存入数据库"""
    try:
        # 调用akshare接口获取"基本信息"
        bond_info_df = ak.bond_zh_cov_info(symbol=symbol, indicator="基本信息")
        
        if bond_info_df.empty:
            # 对于已退市的债券，接口可能返回空，这是正常现象，但我们仍然记录下来
            raise ValueError("API返回数据为空（可能该债券信息已无法获取）")

        # 使用pandas的to_sql方法
        bond_info_df.to_sql(
            name=TARGET_TABLE_NAME,
            con=conn,
            if_exists='append',
            index=False
        )
        return True, None # (成功状态, 错误信息)
        
    except Exception as e:
        return False, str(e) # (成功状态, 错误信息)

def log_activity(file_path, message):
    """记录成功或失败的日志"""
    with open(file_path, 'a', encoding='utf-8') as f:
        f.write(message + '\n')

def main():
    """主执行函数"""
    conn = initialize_database_connection(DB_NAME)
    if not conn:
        return

    symbols_to_fetch = get_bond_symbols_to_fetch(conn)
    
    if not symbols_to_fetch:
        print("所有债券的详细信息均已在库中，无需更新。程序结束。")
        conn.close()
        return

    print(f"\n即将开始为 {len(symbols_to_fetch)} 条新债券获取详细信息...")
    
    # 使用tqdm创建进度条
    for symbol in tqdm(symbols_to_fetch, desc="处理进度"):
        success, error_msg = fetch_and_store_bond_details(conn, symbol)
        
        if success:
            log_msg = f"{time.strftime('%Y-%m-%d %H:%M:%S')} - 成功获取并存储: {symbol}"
            log_activity(SUCCESS_LOG_FILE, log_msg)
        else:
            log_msg = f"{time.strftime('%Y-%m-%d %H:%M:%S')} - 获取失败: {symbol}, 原因: {error_msg}"
            log_activity(ERROR_LOG_FILE, log_msg)
        
        # 礼貌性延时
        time.sleep(0.5) 
        
    print("\n所有任务处理完毕。")
    conn.close()
    print("数据库连接已关闭。")


if __name__ == "__main__":
    main()

成功连接到数据库: convertible_bonds_database.db
目标表 bond_details_eastmoney 不存在，将获取所有债券信息。
源表中共有 964 条债券，其中 964 条是新增的，需要获取详细数据。

即将开始为 964 条新债券获取详细信息...


处理进度: 100%|██████████| 964/964 [14:11<00:00,  1.13it/s]


所有任务处理完毕。
数据库连接已关闭。



