# 沪深300成分股每日指标获取

本notebook用于获取沪深300指数成分股在指定时间段内的每日指标数据。

## 功能说明
1. 读取沪深300指数成分股代码
2. 使用多线程并行获取每只股票的每日指标
3. 合并所有数据并保存为parquet格式

## 数据源
- 成分股代码：`hs300_index_weight_20250901.csv`
- 每日指标：Tushare daily_basic接口
- 时间范围：环境变量中的START_DATE到END_DATE


In [None]:
# 导入必要的模块
import sys
import os
from pathlib import Path
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time
import threading
from collections import deque
import warnings
warnings.filterwarnings('ignore')

# 添加项目路径
sys.path.append('..')

# 加载环境变量
from dotenv import load_dotenv
load_dotenv()

# 导入项目工具
from src.tools.utils import _init_tushare_api


In [2]:
# 初始化配置
print("初始化配置...")

# 从环境变量获取时间范围（现在是yyyymmdd格式）
start_date_str = os.getenv('START_DATE', '20240901')
end_date_str = os.getenv('END_DATE', '20250901')

print(f"开始日期: {start_date_str}")
print(f"结束日期: {end_date_str}")

# 转换为易读格式用于显示和目录命名
START_DATE = f"{start_date_str[:4]}-{start_date_str[4:6]}-{start_date_str[6:8]}"
END_DATE = f"{end_date_str[:4]}-{end_date_str[4:6]}-{end_date_str[6:8]}"

print(f"易读格式 - 开始: {START_DATE}, 结束: {END_DATE}")

# 初始化Tushare API
try:
    pro = _init_tushare_api()
    print("✓ Tushare API 初始化成功")
except Exception as e:
    print(f"✗ Tushare API 初始化失败: {e}")
    raise


初始化配置...
开始日期: 20240901
结束日期: 20250901
易读格式 - 开始: 2024-09-01, 结束: 2025-09-01
✓ Tushare API 初始化成功


In [3]:
# 读取沪深300成分股代码
print("读取沪深300成分股代码...")

csv_file = Path('hs300_index_weight_20250901.csv')
if not csv_file.exists():
    raise FileNotFoundError(f"找不到文件: {csv_file}")

# 读取CSV文件
df_weight = pd.read_csv(csv_file)
print(f"读取到 {len(df_weight)} 条权重数据")

# 提取唯一的成分股代码
stock_codes = df_weight['con_code'].unique().tolist()
print(f"沪深300成分股数量: {len(stock_codes)}")
print(f"前10只股票代码: {stock_codes[:10]}")


读取沪深300成分股代码...
读取到 300 条权重数据
沪深300成分股数量: 300
前10只股票代码: ['600519.SH', '300750.SZ', '601318.SH', '600036.SH', '601899.SH', '300502.SZ', '000333.SZ', '300059.SZ', '601166.SH', '300308.SZ']


In [None]:
# 定义速率限制器类
class RateLimiter:
    """
    API调用速率限制器
    确保在指定时间窗口内不超过最大调用次数
    """
    def __init__(self, max_calls=200, time_window=60):
        """
        初始化速率限制器
        
        Args:
            max_calls: 时间窗口内最大调用次数
            time_window: 时间窗口长度（秒）
        """
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = deque()  # 存储调用时间戳
        self.lock = threading.Lock()  # 线程锁
        
    def acquire(self):
        """
        请求调用许可，如果超过限制则等待
        """
        with self.lock:
            now = time.time()
            
            # 清理超过时间窗口的旧记录
            while self.calls and now - self.calls[0] >= self.time_window:
                self.calls.popleft()
            
            # 如果当前调用数已达上限，计算需要等待的时间
            if len(self.calls) >= self.max_calls:
                # 等待最老的调用过期
                wait_time = self.time_window - (now - self.calls[0]) + 0.1  # 多等0.1秒确保安全
                print(f"⏳ 达到API调用限制，等待 {wait_time:.1f} 秒...")
                time.sleep(wait_time)
                
                # 重新清理过期记录
                now = time.time()
                while self.calls and now - self.calls[0] >= self.time_window:
                    self.calls.popleft()
            
            # 记录本次调用时间
            self.calls.append(now)
            
    def get_stats(self):
        """获取当前状态统计"""
        with self.lock:
            now = time.time()
            # 清理过期记录
            while self.calls and now - self.calls[0] >= self.time_window:
                self.calls.popleft()
            
            return {
                'current_calls': len(self.calls),
                'max_calls': self.max_calls,
                'time_window': self.time_window,
                'remaining_calls': self.max_calls - len(self.calls)
            }

# 创建全局速率限制器实例
rate_limiter = RateLimiter(max_calls=190, time_window=60)  # 设置为190次/分钟，留点余量

# 定义获取单只股票每日指标的函数（带速率限制）
def fetch_stock_daily_basic(ts_code, start_date, end_date, retry_count=3):
    """
    获取单只股票的每日指标数据（带速率限制）
    
    Args:
        ts_code: 股票代码
        start_date: 开始日期 (YYYYMMDD)
        end_date: 结束日期 (YYYYMMDD)
        retry_count: 重试次数
    
    Returns:
        DataFrame: 该股票的每日指标数据
    """
    for attempt in range(retry_count):
        try:
            # 获取API调用许可（速率限制）
            rate_limiter.acquire()
            
            # 调用Tushare API
            df = pro.daily_basic(
                ts_code=ts_code,
                start_date=start_date,
                end_date=end_date
            )
            
            if not df.empty:
                return df
            else:
                print(f"⚠️  {ts_code}: 无数据")
                return pd.DataFrame()
                
        except Exception as e:
            error_msg = str(e)
            if "每分钟最多访问该接口200次" in error_msg:
                # 如果遇到速率限制错误，等待更长时间
                print(f"⚠️  {ts_code}: API速率限制，等待60秒后重试...")
                time.sleep(60)
            elif attempt < retry_count - 1:
                print(f"⚠️  {ts_code}: 第{attempt+1}次尝试失败 ({e})，重试中...")
                time.sleep(2)  # 等待2秒后重试
            else:
                print(f"✗ {ts_code}: 获取失败 - {e}")
                return pd.DataFrame()
    
    return pd.DataFrame()

print("定义了速率限制器和股票数据获取函数")


定义了股票数据获取函数


In [None]:
# 测试速率限制器功能（可选执行）
def test_rate_limiter():
    """测试速率限制器是否正常工作"""
    print("🧪 测试速率限制器...")
    
    # 创建一个测试用的速率限制器（更小的限制用于快速测试）
    test_limiter = RateLimiter(max_calls=5, time_window=10)
    
    start_time = time.time()
    for i in range(8):  # 尝试8次调用，应该在第6次时开始等待
        print(f"第 {i+1} 次调用...")
        test_limiter.acquire()
        stats = test_limiter.get_stats()
        elapsed = time.time() - start_time
        print(f"  耗时: {elapsed:.1f}s, 状态: {stats['current_calls']}/{stats['max_calls']}")
    
    total_time = time.time() - start_time
    print(f"✓ 测试完成，总耗时: {total_time:.1f}s")
    print(f"  预期: 前5次快速执行，后3次需要等待")
    return total_time > 5  # 如果总时间超过5秒，说明速率限制生效了

# 取消注释下面这行来运行测试
# test_passed = test_rate_limiter()
# print(f"速率限制器测试{'通过' if test_passed else '失败'}")

print("速率限制器测试函数已定义（如需测试请取消注释相关代码）")


In [None]:
# 速率限制配置和预估工具
def estimate_completion_time(total_stocks, rate_limit_per_minute=190):
    """
    预估完成时间
    
    Args:
        total_stocks: 总股票数量
        rate_limit_per_minute: 每分钟API调用限制
    
    Returns:
        dict: 包含预估时间的字典
    """
    # 每只股票需要1次API调用
    total_api_calls = total_stocks
    
    # 计算需要的分钟数
    minutes_needed = total_api_calls / rate_limit_per_minute
    
    # 考虑重试和网络延迟，增加20%的缓冲时间
    estimated_minutes = minutes_needed * 1.2
    
    # 转换为小时和分钟
    hours = int(estimated_minutes // 60)
    minutes = int(estimated_minutes % 60)
    
    return {
        'total_api_calls': total_api_calls,
        'rate_limit_per_minute': rate_limit_per_minute,
        'estimated_minutes': estimated_minutes,
        'estimated_time_str': f"{hours}小时{minutes}分钟" if hours > 0 else f"{minutes}分钟"
    }

# 显示当前配置和预估时间
current_estimate = estimate_completion_time(len(stock_codes))
print("📊 速率限制配置和预估:")
print(f"  股票数量: {len(stock_codes)}")
print(f"  API调用限制: {rate_limiter.max_calls}次/分钟")
print(f"  预估API调用总数: {current_estimate['total_api_calls']}")
print(f"  预估完成时间: {current_estimate['estimated_time_str']}")
print(f"  工作线程数: {max_workers}")

print("\n💡 优化建议:")
if len(stock_codes) > 200:
    print(f"  - 由于股票数量较多({len(stock_codes)}只)，建议分批处理")
    batch_size = 190  # 每批处理的股票数量
    batches = (len(stock_codes) + batch_size - 1) // batch_size
    print(f"  - 可分为 {batches} 批，每批 {batch_size} 只股票")
    print(f"  - 每批预计耗时: {estimate_completion_time(batch_size)['estimated_time_str']}")
else:
    print(f"  - 股票数量适中，可一次性处理完成")

print(f"  - 当前线程数({max_workers})已针对速率限制优化")
print(f"  - 如遇到速率限制错误，系统会自动等待和重试")


In [None]:
# 使用多线程并行获取所有股票的每日指标（带速率限制）
print(f"开始并行获取 {len(stock_codes)} 只股票的每日指标数据...")
print(f"时间范围: {start_date_str} - {end_date_str}")
print(f"速率限制: 每分钟最多 {rate_limiter.max_calls} 次API调用")

# 存储所有结果的列表
all_data = []
failed_stocks = []

# 调整线程数，配合速率限制
# 由于有速率限制，不需要太多线程，3-5个线程足够
max_workers = 5  # 减少并发数，配合速率限制器

print(f"使用 {max_workers} 个工作线程")

# 添加进度监控
def log_progress():
    """定期输出进度和速率限制状态"""
    while True:
        stats = rate_limiter.get_stats()
        print(f"📊 API调用状态: {stats['current_calls']}/{stats['max_calls']} (剩余: {stats['remaining_calls']})")
        time.sleep(30)  # 每30秒输出一次状态
        if len(all_data) + len(failed_stocks) >= len(stock_codes):
            break

# 启动进度监控线程
import threading
monitor_thread = threading.Thread(target=log_progress, daemon=True)
monitor_thread.start()

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    # 提交所有任务
    future_to_stock = {
        executor.submit(fetch_stock_daily_basic, stock_code, start_date_str, end_date_str): stock_code
        for stock_code in stock_codes
    }
    
    # 使用tqdm显示进度
    with tqdm(total=len(stock_codes), desc="获取进度", 
              bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]') as pbar:
        
        for future in as_completed(future_to_stock):
            stock_code = future_to_stock[future]
            try:
                df_stock = future.result()
                if not df_stock.empty:
                    all_data.append(df_stock)
                    # 显示当前API调用统计
                    stats = rate_limiter.get_stats()
                    pbar.set_postfix({
                        "成功": len(all_data), 
                        "失败": len(failed_stocks),
                        "API": f"{stats['current_calls']}/{stats['max_calls']}"
                    })
                else:
                    failed_stocks.append(stock_code)
                    stats = rate_limiter.get_stats()
                    pbar.set_postfix({
                        "成功": len(all_data), 
                        "失败": len(failed_stocks),
                        "API": f"{stats['current_calls']}/{stats['max_calls']}"
                    })
            except Exception as e:
                failed_stocks.append(stock_code)
                print(f"✗ {stock_code}: 处理异常 - {e}")
                stats = rate_limiter.get_stats()
                pbar.set_postfix({
                    "成功": len(all_data), 
                    "失败": len(failed_stocks),
                    "API": f"{stats['current_calls']}/{stats['max_calls']}"
                })
            
            pbar.update(1)

print(f"\n数据获取完成!")
print(f"成功获取: {len(all_data)} 只股票")
print(f"获取失败: {len(failed_stocks)} 只股票")

# 显示最终API调用统计
final_stats = rate_limiter.get_stats()
print(f"最终API调用统计: {final_stats}")

if failed_stocks:
    print(f"失败的股票代码: {failed_stocks[:10]}{'...' if len(failed_stocks) > 10 else ''}")


开始并行获取 300 只股票的每日指标数据...
时间范围: 20240901 - 20250901


获取进度:  66%|██████▋   | 199/300 [00:03<00:01, 59.19it/s, 成功=200, 失败=0]

⚠️  000617.SZ: 第1次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  600588.SH: 第1次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  601633.SH: 第1次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  601878.SH: 第1次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  600011.SH: 第1次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  600029.SH: 第1次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  601800.SH: 第1次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  603369.SH: 第1次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  601868.SH: 第1次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  002601.SZ: 第1次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tus

获取进度:  70%|██████▉   | 209/300 [00:06<00:08, 10.45it/s, 成功=200, 失败=10]

✗ 000617.SZ: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 600011.SH: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 600588.SH: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 601878.SH: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 601633.SH: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 600029.SH: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 601800.SH: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 002601.SZ: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 601868.SH: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
⚠️  300782.SZ: 第1次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
✗ 603369.SH: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro

获取进度:  73%|███████▎  | 219/300 [00:08<00:15,  5.25it/s, 成功=200, 失败=20]

✗ 300782.SZ: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 002493.SZ: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 600372.SH: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 600362.SH: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 302132.SZ: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 600741.SH: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 600233.SH: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 601877.SH: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 003816.SZ: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
✗ 300896.SZ: 获取失败 - 抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。
⚠️  002709.SZ: 第1次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/documen

获取进度:  73%|███████▎  | 220/300 [00:09<00:03, 22.77it/s, 成功=200, 失败=20]


⚠️  002709.SZ: 第2次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  603296.SH: 第2次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  301269.SZ: 第2次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  600674.SH: 第2次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  601111.SH: 第2次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  601117.SH: 第2次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  688506.SH: 第2次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  000596.SZ: 第2次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  601319.SH: 第2次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tushare.pro/document/1?doc_id=108。)，重试中...
⚠️  600219.SH: 第2次尝试失败 (抱歉，您每分钟最多访问该接口200次，权限的具体详情访问：https://tus

In [None]:
# 合并所有数据
if all_data:
    print("合并所有股票的每日指标数据...")
    
    # 将所有DataFrame合并为一个
    df_all_daily = pd.concat(all_data, ignore_index=True)
    
    print(f"合并后数据形状: {df_all_daily.shape}")
    print(f"包含股票数量: {df_all_daily['ts_code'].nunique()}")
    print(f"日期范围: {df_all_daily['trade_date'].min()} - {df_all_daily['trade_date'].max()}")
    
    # 显示数据概览
    print("\n数据概览:")
    print(df_all_daily.head())
    
    print("\n数据类型:")
    print(df_all_daily.dtypes)
    
    print("\n数据统计:")
    print(df_all_daily.describe())
    
else:
    print("⚠️ 没有获取到任何数据！")
    df_all_daily = pd.DataFrame()


In [None]:
# 保存数据为parquet格式
if not df_all_daily.empty:
    print("保存数据为parquet格式...")
    
    # 创建保存目录（使用yyyymmdd格式）
    save_dir = Path(f"{start_date_str}{end_date_str}")
    save_dir.mkdir(parents=True, exist_ok=True)
    
    # 保存文件
    save_path = save_dir / "daily_ind.parquet"
    df_all_daily.to_parquet(save_path, index=False)
    
    print(f"✓ 数据已保存到: {save_path}")
    print(f"文件大小: {save_path.stat().st_size / 1024 / 1024:.2f} MB")
    
    # 验证保存的数据
    df_verify = pd.read_parquet(save_path)
    print(f"验证: 保存的数据形状为 {df_verify.shape}")
    
    # 同时保存一份CSV格式用于查看
    csv_path = save_dir / "daily_ind.csv"
    df_all_daily.to_csv(csv_path, index=False)
    print(f"✓ 同时保存CSV格式到: {csv_path}")
    
else:
    print("⚠️ 没有数据可保存！")


In [None]:
# 数据质量检查和统计报告
if not df_all_daily.empty:
    print("=" * 50)
    print("数据质量检查报告")
    print("=" * 50)
    
    # 基本统计
    total_stocks = len(stock_codes)
    successful_stocks = df_all_daily['ts_code'].nunique()
    success_rate = successful_stocks / total_stocks * 100
    
    print(f"目标股票数量: {total_stocks}")
    print(f"成功获取数据的股票数量: {successful_stocks}")
    print(f"成功率: {success_rate:.1f}%")
    
    # 日期覆盖情况
    date_range = pd.date_range(start=START_DATE, end=END_DATE, freq='D')
    trading_days = df_all_daily['trade_date'].unique()
    trading_days_count = len(trading_days)
    
    print(f"\n时间范围: {START_DATE} 到 {END_DATE}")
    print(f"实际交易日数量: {trading_days_count}")
    print(f"总数据记录数: {len(df_all_daily)}")
    print(f"平均每个交易日股票数量: {len(df_all_daily) / trading_days_count:.1f}")
    
    # 缺失值检查
    print("\n缺失值统计:")
    missing_stats = df_all_daily.isnull().sum()
    for col, missing_count in missing_stats.items():
        if missing_count > 0:
            missing_pct = missing_count / len(df_all_daily) * 100
            print(f"  {col}: {missing_count} ({missing_pct:.1f}%)")
    
    if missing_stats.sum() == 0:
        print("  ✓ 无缺失值")
    
    # 重要指标统计
    print("\n重要指标统计:")
    key_metrics = ['pe', 'pb', 'turnover_rate', 'total_mv']
    for metric in key_metrics:
        if metric in df_all_daily.columns:
            valid_count = df_all_daily[metric].notna().sum()
            valid_pct = valid_count / len(df_all_daily) * 100
            print(f"  {metric}: {valid_count} 有效值 ({valid_pct:.1f}%)")
    
    print("\n✓ 数据获取和保存任务完成！")
else:
    print("⚠️ 未能获取到任何有效数据，请检查网络连接和API配置。")
