# 导入必要的库
import sys
sys.path.append('..')

# Force tqdm's plain-text progress bar instead of the Jupyter widget one.
# `tqdm.tqdm` is already the standard class, so rebinding it alone changes
# nothing. The widget class lives in `tqdm.notebook`; it is the one that
# fails with "IProgress not found" when ipywidgets is missing, so that
# module's names are what has to be replaced.
# NOTE(review): presumably akshare resolves `tqdm.notebook.tqdm` when it runs
# inside a notebook kernel - confirm against the installed akshare version.
import tqdm
import tqdm.notebook
tqdm.tqdm = tqdm.std.tqdm
tqdm.notebook.tqdm = tqdm.std.tqdm
tqdm.notebook.tqdm_notebook = tqdm.std.tqdm

import pandas as pd
import numpy as np
import akshare as ak
import yaml
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 导入数据模型
from models import Base, BalanceSheet, IncomeStatement, CashFlowStatement, create_tables

print("库导入成功！")
print(f"akshare 版本: {ak.__version__}")
print(f"pandas 版本: {pd.__version__}")

In [1]:
# 导入必要的库
import sys
sys.path.append('..')

import pandas as pd
import numpy as np
import akshare as ak
import yaml
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 导入数据模型
from models import Base, BalanceSheet, IncomeStatement, CashFlowStatement, create_tables

print("库导入成功！")
print(f"akshare 版本: {ak.__version__}")
print(f"pandas 版本: {pd.__version__}")

库导入成功！
akshare 版本: 1.18.19
pandas 版本: 3.0.0


In [2]:
# Smoke test: download the balance sheet of Kweichow Moutai.
# The symbol must carry a market prefix (SH = Shanghai, SZ = Shenzhen).
stock_code = "SH600519"

print(f"正在获取 {stock_code} 的资产负债表...")
try:
    df_balance = ak.stock_balance_sheet_by_report_em(symbol=stock_code)
    if df_balance is None or df_balance.empty:
        # The API answered, but with nothing usable
        print("API返回空数据，请检查股票代码格式")
    else:
        print(f"\n成功获取数据！共 {len(df_balance)} 条记录")
        print(f"\n数据列名（前20个）:")
        leading_columns = df_balance.columns.tolist()[:20]
        print(leading_columns)
        print(f"\n数据预览:")
        display(df_balance.head())
except Exception as e:
    print(f"获取数据失败: {e}")

正在获取 SH600519 的资产负债表...


Exception ignored in: <function tqdm.__del__ at 0x110ab36a0>
Traceback (most recent call last):
  File "/Volumes/Frank1T/VibeCoding/Stock_key_indicators2/.venv/lib/python3.13/site-packages/tqdm/std.py", line 1148, in __del__
    self.close()
  File "/Volumes/Frank1T/VibeCoding/Stock_key_indicators2/.venv/lib/python3.13/site-packages/tqdm/notebook.py", line 279, in close
    self.disp(bar_style='danger', check_delay=False)
AttributeError: 'tqdm_notebook' object has no attribute 'disp'


获取数据失败: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html


In [6]:
# Smoke test: download the income statement for the same stock.
print(f"正在获取 {stock_code} 的利润表...")
try:
    df_income = ak.stock_profit_sheet_by_report_em(symbol=stock_code)
    if df_income is None or df_income.empty:
        # The API answered, but with nothing usable
        print("API返回空数据")
    else:
        print(f"\n成功获取数据！共 {len(df_income)} 条记录")
        print(f"\n数据列名（前20个）:")
        leading_columns = df_income.columns.tolist()[:20]
        print(leading_columns)
        print(f"\n数据预览:")
        display(df_income.head())
except Exception as e:
    print(f"获取数据失败: {e}")

NameError: name 'stock_code' is not defined

In [None]:
# Smoke test: download the cash flow statement for the same stock.
print(f"正在获取 {stock_code} 的现金流量表...")
try:
    df_cashflow = ak.stock_cash_flow_sheet_by_report_em(symbol=stock_code)
    if df_cashflow is None or df_cashflow.empty:
        # The API answered, but with nothing usable
        print("API返回空数据")
    else:
        print(f"\n成功获取数据！共 {len(df_cashflow)} 条记录")
        print(f"\n数据列名（前20个）:")
        leading_columns = df_cashflow.columns.tolist()[:20]
        print(leading_columns)
        print(f"\n数据预览:")
        display(df_cashflow.head())
except Exception as e:
    print(f"获取数据失败: {e}")

正在获取 600519 的现金流量表...
获取数据失败: 'NoneType' object is not subscriptable


In [None]:
# Smoke test: download the cash flow statement for the same stock.
# A None/empty response is reported explicitly, like the other fetch cells do,
# instead of failing later on len()/columns with a misleading error message.
print(f"正在获取 {stock_code} 的现金流量表...")
try:
    df_cashflow = ak.stock_cash_flow_sheet_by_report_em(symbol=stock_code)
    if df_cashflow is not None and not df_cashflow.empty:
        print(f"\n成功获取数据！共 {len(df_cashflow)} 条记录")
        print(f"\n数据列名（前20个）:")
        print(df_cashflow.columns.tolist()[:20])
        print(f"\n数据预览:")
        display(df_cashflow.head())
    else:
        print("API返回空数据")
except Exception as e:
    print(f"获取数据失败: {e}")

正在获取 600519 的现金流量表...
获取数据失败: 'NoneType' object is not subscriptable


## 步骤2：实现列名标准化

加载列名映射配置，实现标准化函数

In [None]:
# Read the column-name mapping configuration from its YAML file.
mapping_path = '../config/column_mapping.yaml'
with open(mapping_path, 'r', encoding='utf-8') as config_file:
    column_mapping = yaml.safe_load(config_file)

print("列名映射配置加载成功！")
# Report how many entries each statement section contains
balance_count = len(column_mapping['balance_sheet'])
print(f"\n资产负债表映射数量: {balance_count}")
income_count = len(column_mapping['income_statement'])
print(f"利润表映射数量: {income_count}")
cashflow_count = len(column_mapping['cash_flow_statement'])
print(f"现金流量表映射数量: {cashflow_count}")

列名映射配置加载成功！

资产负债表映射数量: 46
利润表映射数量: 24
现金流量表映射数量: 33


In [None]:
def standardize_columns(df, report_type, mapping_config):
    """
    Rename a DataFrame's columns to their standardized names.

    The rename table is the ``report_type`` section of ``mapping_config``
    merged with its ``'common'`` section; when both define the same source
    column, the ``'common'`` entry wins. Columns without an entry keep their
    original name and are reported back to the caller.

    Args:
        df: Source DataFrame. It is not modified; a renamed copy is returned.
        report_type: Key of the report-specific section
            ('balance_sheet', 'income_statement', 'cash_flow_statement').
        mapping_config: Dict of sections, each mapping an original column
            name to its standardized name. A missing section, or one whose
            value is None (what an empty YAML section loads as), is treated
            as an empty mapping.

    Returns:
        A ``(standardized_df, unmapped_columns)`` tuple, where
        ``unmapped_columns`` lists, in column order, the names that had no
        mapping entry.
    """
    # `or {}` also covers sections that are present but None
    report_mapping = mapping_config.get(report_type) or {}
    common_mapping = mapping_config.get('common') or {}
    all_mapping = {**report_mapping, **common_mapping}

    # Restrict the rename table to columns that actually occur in df
    renamed_columns = {
        col: all_mapping[col] for col in df.columns if col in all_mapping
    }
    unmapped_columns = [col for col in df.columns if col not in all_mapping]

    return df.rename(columns=renamed_columns), unmapped_columns

print("列名标准化函数定义完成！")

列名标准化函数定义完成！


In [None]:
# Try the column standardization on the balance sheet DataFrame.
df_balance_std, unmapped_balance = standardize_columns(
    df_balance, 'balance_sheet', column_mapping
)

columns_before = len(df_balance.columns)
columns_after = len(df_balance_std.columns)
print(f"标准化前列数: {columns_before}")
print(f"标准化后列数: {columns_after}")
print(f"\n未映射的列名 ({len(unmapped_balance)} 个):")
# Only the first 10 unmapped names are listed
for col in unmapped_balance[:10]:
    print(f"  - {col}")

print(f"\n标准化后的列名（前20个）:")
leading_std_columns = df_balance_std.columns.tolist()[:20]
print(leading_std_columns)

NameError: name 'df_balance' is not defined

## 步骤3：创建数据库并存储数据

In [None]:
# Create the database engine for the prototype SQLite file
database_url = "sqlite:///../database_prototype.sqlite"
engine = create_engine(database_url, echo=False)

# Create all tables declared on Base.metadata
Base.metadata.create_all(engine)
print("数据库表创建成功！")

# Open a session bound to the engine; later cells reuse this `session`
Session = sessionmaker(bind=engine)
session = Session()
print("数据库会话创建成功！")

In [None]:
def save_to_database(df, model_class, session):
    """
    Insert the rows of a DataFrame through a SQLAlchemy session.

    Only DataFrame columns that exist as attributes on ``model_class`` are
    stored, and NaN/NA values are written as None. A row is skipped when a
    record with the same ``stock_code`` and ``report_date`` already exists.
    All inserts are committed together at the end; if anything fails, the
    session is rolled back so that it stays usable, and the error is
    re-raised.

    Args:
        df: Standardized DataFrame.
        model_class: SQLAlchemy model class to instantiate for each row.
        session: Database session used for the lookups, inserts and commit.

    Raises:
        Exception: whatever the session raised while querying, flushing or
            committing, re-raised after the rollback.
    """
    # The set of storable columns does not depend on the row, so compute it once
    model_columns = [col for col in df.columns if hasattr(model_class, col)]
    records_added = 0
    records_skipped = 0

    try:
        for _, row in df.iterrows():
            data_dict = {
                col: (None if pd.isna(row[col]) else row[col])
                for col in model_columns
            }

            # Duplicate check on the (stock_code, report_date) pair
            existing = session.query(model_class).filter_by(
                stock_code=data_dict.get('stock_code'),
                report_date=data_dict.get('report_date')
            ).first()

            if existing:
                records_skipped += 1
            else:
                session.add(model_class(**data_dict))
                records_added += 1

        session.commit()
    except Exception:
        # Without a rollback the session rejects every later operation
        session.rollback()
        raise

    print(f"新增记录: {records_added}, 跳过重复记录: {records_skipped}")

print("数据保存函数定义完成！")

In [None]:
# Persist the standardized balance sheet rows
print("正在保存资产负债表数据...")
save_to_database(df_balance_std, BalanceSheet, session)

In [None]:
# Standardize the income statement columns, then persist the rows
df_income_std, unmapped_income = standardize_columns(df_income, 'income_statement', column_mapping)
print("正在保存利润表数据...")
save_to_database(df_income_std, IncomeStatement, session)

In [None]:
# Standardize the cash flow statement columns, then persist the rows
df_cashflow_std, unmapped_cashflow = standardize_columns(df_cashflow, 'cash_flow_statement', column_mapping)
print("正在保存现金流量表数据...")
save_to_database(df_cashflow_std, CashFlowStatement, session)

## 步骤4：从数据库读取数据

In [None]:
# Load this stock's balance sheet rows back from the database, newest report first.
query = f"SELECT * FROM balance_sheets WHERE stock_code = '{stock_code}' ORDER BY report_date DESC"
df_balance_from_db = pd.read_sql(query, engine)

balance_row_count = len(df_balance_from_db)
print(f"从数据库读取到 {balance_row_count} 条资产负债表记录")
print(f"\n数据预览:")
# Preview only the key columns
balance_preview_columns = [
    'stock_code',
    'report_date',
    'total_assets',
    'total_liabilities',
    'total_equity',
]
display(df_balance_from_db[balance_preview_columns].head())

In [None]:
# Load this stock's income statement rows back from the database, newest report first.
query = f"SELECT * FROM income_statements WHERE stock_code = '{stock_code}' ORDER BY report_date DESC"
df_income_from_db = pd.read_sql(query, engine)

income_row_count = len(df_income_from_db)
print(f"从数据库读取到 {income_row_count} 条利润表记录")
print(f"\n数据预览:")
# Preview only the key columns
income_preview_columns = [
    'stock_code',
    'report_date',
    'operating_revenue',
    'net_profit',
]
display(df_income_from_db[income_preview_columns].head())

## 步骤5：计算 ROE（净资产收益率）

ROE = 净利润 / 平均所有者权益

平均所有者权益 = (期初所有者权益 + 期末所有者权益) / 2

In [None]:
def calculate_roe(income_df, balance_df):
    """
    Compute return on equity (ROE) for every report period.

    ROE = net profit / average equity, where average equity is the mean of
    a report's ``total_equity`` and the ``total_equity`` of the same stock's
    previous report (reports ordered by ``report_date``). The previous report
    is looked up per ``stock_code``, so input covering several stocks never
    mixes one company's equity into another's average.

    Args:
        income_df: Income statement DataFrame with the columns
            ``stock_code``, ``report_date`` and ``net_profit``.
        balance_df: Balance sheet DataFrame with the columns
            ``stock_code``, ``report_date`` and ``total_equity``.

    Returns:
        DataFrame sorted by ``report_date`` holding the merge keys,
        ``net_profit``, ``total_equity`` and the derived columns
        ``equity_avg``, ``roe`` (ratio) and ``roe_pct`` (percent). Only
        periods present in both inputs are kept. The earliest report of each
        stock has no previous period, so its derived columns are NaN.

    Raises:
        KeyError: if a required column is missing from either input.
    """
    # NOTE(review): net_profit is used as reported. If interim reports carry
    # year-to-date figures, the ratio is not annualized - confirm before
    # comparing against published ROE numbers.
    merged = pd.merge(
        income_df[['stock_code', 'report_date', 'net_profit']],
        balance_df[['stock_code', 'report_date', 'total_equity']],
        on=['stock_code', 'report_date'],
        how='inner',
    ).sort_values('report_date')

    # Opening equity = closing equity of the same stock's previous report
    previous_equity = merged.groupby('stock_code')['total_equity'].shift(1)
    merged['equity_avg'] = (merged['total_equity'] + previous_equity) / 2

    merged['roe'] = merged['net_profit'] / merged['equity_avg']
    merged['roe_pct'] = merged['roe'] * 100

    return merged

print("ROE计算函数定义完成！")

In [None]:
# Compute ROE from the statements read back from the database
roe_result = calculate_roe(df_income_from_db, df_balance_from_db)

# Show the 10 most recent periods (the result is sorted by report_date ascending)
print(f"\nROE计算结果（最近10期）:")
display(roe_result[['stock_code', 'report_date', 'net_profit', 'total_equity', 'equity_avg', 'roe_pct']].tail(10))

## 步骤6：验证结果准确性

将计算结果与公开数据进行对比验证

In [None]:
# Show the full calculation for the most recent report period so that it can
# be checked by hand against a public data source.
if roe_result.empty:
    # iloc[-1] would raise IndexError when no period could be computed
    print("没有可用的ROE计算结果，请先确认数据已成功获取并保存")
else:
    latest = roe_result.iloc[-1]

    print("="*60)
    print(f"股票代码: {latest['stock_code']}")
    print(f"报告日期: {latest['report_date']}")
    print("="*60)
    print(f"净利润: {latest['net_profit']:,.2f}")
    print(f"期末所有者权益: {latest['total_equity']:,.2f}")
    print(f"平均所有者权益: {latest['equity_avg']:,.2f}")
    print(f"ROE: {latest['roe_pct']:.2f}%")
    print("="*60)
    print("\n请手动验证此结果是否与财经网站（如东方财富）的数据一致")

# Run the fetch -> standardize -> save pipeline for other stocks
# (symbols need the market prefix).
test_stocks = ['SZ300750', 'SZ000725']  # CATL, BOE Technology (A shares)

for stock in test_stocks:
    print(f"\n{'='*60}")
    print(f"正在测试股票: {stock}")
    print(f"{'='*60}")

    try:
        # Fetch both statements
        df_b = ak.stock_balance_sheet_by_report_em(symbol=stock)
        df_i = ak.stock_profit_sheet_by_report_em(symbol=stock)

        if df_b is None or df_b.empty or df_i is None or df_i.empty:
            print(f"✗ {stock} 获取数据失败：API返回空数据")
            continue

        # Standardize column names
        df_b_std, _ = standardize_columns(df_b, 'balance_sheet', column_mapping)
        df_i_std, _ = standardize_columns(df_i, 'income_statement', column_mapping)

        # Persist
        save_to_database(df_b_std, BalanceSheet, session)
        save_to_database(df_i_std, IncomeStatement, session)

        print(f"✓ {stock} 数据处理成功")

    except Exception as e:
        # The session is shared across iterations: discard any half-finished
        # transaction so that one failing stock does not break the next one.
        session.rollback()
        print(f"✗ {stock} 处理失败: {e}")

In [None]:
# Run the fetch -> standardize -> save pipeline for CATL and BOE.
# The symbols carry the market prefix (SZ = Shenzhen); the bare numeric codes
# used previously are not accepted by these akshare endpoints.
test_stocks = ['SZ300750', 'SZ000725']  # CATL, BOE Technology (A shares)

for stock in test_stocks:
    print(f"\n{'='*60}")
    print(f"正在测试股票: {stock}")
    print(f"{'='*60}")

    try:
        # Fetch both statements
        df_b = ak.stock_balance_sheet_by_report_em(symbol=stock)
        df_i = ak.stock_profit_sheet_by_report_em(symbol=stock)

        # Report an empty response explicitly instead of failing further down
        if df_b is None or df_b.empty or df_i is None or df_i.empty:
            print(f"✗ {stock} 获取数据失败：API返回空数据")
            continue

        # Standardize column names
        df_b_std, _ = standardize_columns(df_b, 'balance_sheet', column_mapping)
        df_i_std, _ = standardize_columns(df_i, 'income_statement', column_mapping)

        # Persist
        save_to_database(df_b_std, BalanceSheet, session)
        save_to_database(df_i_std, IncomeStatement, session)

        print(f"✓ {stock} 数据处理成功")

    except Exception as e:
        # The session is shared across iterations: discard any half-finished
        # transaction so that one failing stock does not break the next one.
        session.rollback()
        print(f"✗ {stock} 处理失败: {e}")

## 总结

### 验证结果

- [ ] 数据管道验证通过（获取→存储→读取）
- [ ] 列名标准化机制工作正常
- [ ] ROE计算结果准确
- [ ] 发现的问题和改进点

### 下一步

根据原型验证的结果，进入阶段2：健壮的数据更新器开发

In [None]:
# 关闭数据库连接
session.close()
print("数据库连接已关闭")