In [7]:
import os  # imported here because the shared import cell appears later in the notebook

import tushare as ts

# Take the Tushare token from the TUSHARE_TOKEN environment variable so the real
# credential never has to be written into (or committed with) the notebook.
# When the variable is unset the original placeholder is used, which the API
# rejects with a token error.
ts.set_token(os.environ.get('TUSHARE_TOKEN', 'xxx'))
pro = ts.pro_api()

In [6]:
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
from pandas import IndexSlice as idx
from functools import reduce
import time
from datetime import datetime, timedelta
# 设置显示所有列
pd.set_option('display.max_columns', None)
import os
from tqdm import tqdm

# A股日线行情获取

In [5]:
def _fetch_with_rate_limit_retry(fetch, api_label, context=""):
    """Call ``fetch()`` and return its result, waiting out API rate limits.

    Parameters:
        fetch: zero-argument callable performing the API request(s).
        api_label: short label of the interface, used in the printed messages.
        context: optional suffix appended to the messages (e.g. the date).

    Returns:
        Whatever ``fetch()`` returns, or ``None`` when it fails with an error
        that is not a rate-limit error. Such failures are reported on stdout
        instead of being dropped silently, and the caller skips that item.
    """
    while True:
        try:
            return fetch()
        except Exception as e:
            if "每分钟最多访问该接口" in str(e):
                print(f"\n⚠️ {api_label}接口频率限制，等待61秒...{context}")
                time.sleep(61)
                continue
            print(f"\n⚠️ {api_label}接口调用失败，已跳过{context}: {e}")
            return None


def _rate_and_eta(done, total, elapsed):
    """Return ``(items per minute, estimated remaining minutes)`` for a progress line."""
    if done <= 0 or elapsed <= 0:
        return 0.0, 0.0
    per_second = done / elapsed
    return per_second * 60, (total - done) / per_second / 60


def _fetch_merged_daily(ts_code, start_date, end_date):
    """Fetch the four per-stock daily tables and inner-join them on (ts_code, trade_date)."""
    adj_factor = pro.adj_factor(ts_code=ts_code, start_date=start_date, end_date=end_date)
    daily_data = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date)
    daily_basic = pro.daily_basic(
        ts_code=ts_code, start_date=start_date, end_date=end_date,
        fields='ts_code,trade_date,turnover_rate,turnover_rate_f,volume_ratio,pe,pe_ttm,pb,ps,ps_ttm,dv_ratio,dv_ttm,total_share,float_share,free_share,total_mv,circ_mv')
    stk_limit = pro.stk_limit(ts_code=ts_code, start_date=start_date, end_date=end_date)

    # Inner join: a trading day is kept only when all four tables have a row for it.
    return reduce(
        lambda left, right: pd.merge(left, right, on=['ts_code', 'trade_date'], how='inner'),
        [daily_data, stk_limit, daily_basic, adj_factor]
    )


def _collapse_suspend_duplicates(suspend_data):
    """Reduce suspension records to one row per (ts_code, trade_date).

    A stock that both resumes ('R') and suspends ('S') on the same day is
    collapsed into a single 'R&S' row that keeps the R row's other columns and
    the S row's ``suspend_timing``. Any other kind of duplicate group keeps its
    first record, so no trading day is lost and no group can raise.
    """
    key_cols = ['ts_code', 'trade_date']
    duplicate_mask = suspend_data.duplicated(subset=key_cols, keep=False)
    if not duplicate_mask.any():
        return suspend_data

    duplicates = suspend_data.loc[duplicate_mask, :]
    print(f"发现 {len(duplicates)} 行重复记录，需要合并处理")

    collapsed_rows = []
    for (ts_code, trade_date), group in duplicates.groupby(key_cols, sort=False):
        resumed = group.loc[group['suspend_type'] == 'R']
        suspended = group.loc[group['suspend_type'] == 'S']
        if not resumed.empty and not suspended.empty:
            row = resumed.iloc[0].copy()
            row['suspend_type'] = 'R&S'
            row['suspend_timing'] = suspended.iloc[0]['suspend_timing']
            print(f"  合并: {ts_code} {trade_date} R+S -> R&S")
        else:
            # Not an R/S pair (e.g. two S records): keep the first record as is.
            row = group.iloc[0].copy()
            print(f"  保留首条: {ts_code} {trade_date} ({len(group)}条重复记录)")
        collapsed_rows.append(row)

    print(f"处理完成，合并了 {len(collapsed_rows)} 组重复记录")
    return pd.concat(
        [suspend_data.loc[~duplicate_mask, :], pd.DataFrame(collapsed_rows)],
        ignore_index=True
    )


def get_stock_daily(start_date, end_date):
    """
    Fetch A-share daily bars enriched with an ST flag, suspension info and
    the number of days since listing.

    Parameters:
    start_date: first date of the range (format: 'YYYYMMDD')
    end_date: last date of the range (format: 'YYYYMMDD')

    Returns:
    DataFrame indexed by (trade_date, ts_code) with one row per stock and
    trading day. Added columns: ``is_st`` (0/1), ``listed_days`` and, when any
    suspension record was fetched, ``suspend_timing`` / ``suspend_type``
    ('N' = no record, 'R&S' = resumed and suspended on the same day).
    An empty DataFrame is returned when no daily data could be fetched.

    Notes:
    Rate-limit errors are retried after a 61 second wait. Any other API error
    skips the affected batch/stock/date and is reported on stdout.
    """
    key_cols = ['ts_code', 'trade_date']

    # Open trading days inside [start_date, end_date], ascending.
    all_trade_dates = pro.trade_cal(exchange='', start_date=start_date, end_date=end_date, is_open=1)['cal_date'].sort_values().to_list()

    print(f"获取交易日历完成，共{len(all_trade_dates)}个交易日")

    # Stock universe: currently listed ('L') plus delisted ('D') stocks.
    basic_fields = 'ts_code,symbol,name,area,industry,market,list_status,list_date,delist_date,is_hs'
    stock_basic_data = pd.concat(
        [pro.stock_basic(exchange='', list_status=status, fields=basic_fields) for status in ('L', 'D')],
        axis=0
    )
    print(f"获取股票列表完成，共{len(stock_basic_data)}只股票")

    # Drop stocks that were already delisted before start_date.
    still_listed = (stock_basic_data['delist_date'] >= start_date) | stock_basic_data['delist_date'].isna()
    stock_basic_data = stock_basic_data.loc[still_listed, :].sort_values('ts_code').reset_index(drop=True)
    print(f"筛选后退市股票后，剩余{len(stock_basic_data)}只股票")

    # ---- ST records, requested in batches of BATCH_SIZE trading days ----
    print("开始获取ST股票数据...")
    BATCH_SIZE = 3
    total_dates = len(all_trade_dates)
    batches = [all_trade_dates[i:i + BATCH_SIZE] for i in range(0, total_dates, BATCH_SIZE)]
    total_batches = len(batches)

    st_data_list = []
    start_time = time.time()
    for batch_idx, batch_dates in enumerate(batches, 1):
        start_date_batch, end_date_batch = batch_dates[0], batch_dates[-1]
        stock_st_data = _fetch_with_rate_limit_retry(
            lambda: pro.stock_st(start_date=start_date_batch, end_date=end_date_batch),
            "ST"
        )
        if stock_st_data is not None and not stock_st_data.empty:
            st_data_list.append(stock_st_data)

        processed_dates = min(batch_idx * BATCH_SIZE, total_dates)
        progress = processed_dates / total_dates * 100
        speed, remaining = _rate_and_eta(batch_idx, total_batches, time.time() - start_time)
        print(f"\rST进度: {processed_dates}/{total_dates} ({progress:.1f}%) | 速度: {speed:.1f}批次/分钟 | 预计剩余: {remaining:.1f}分钟 | 当前批次: {start_date_batch}到{end_date_batch}", end="")

    print(f"\n\n{'='*60}")
    if st_data_list:
        all_st_data = pd.concat(st_data_list, ignore_index=True)
        print(f"✅ ST股票数据获取完成! 共{len(all_st_data)}条记录")
    else:
        all_st_data = pd.DataFrame()
        print("❌ 未获取到任何ST股票数据")
    print(f"{'='*60}")

    # ---- Daily bars, one stock at a time ----
    print("开始获取股票日线数据...")
    stock_daily_list = []
    total_stocks = len(stock_basic_data)
    start_time = time.time()
    for i, stock in enumerate(stock_basic_data['ts_code'], 1):
        merged_data = _fetch_with_rate_limit_retry(
            lambda: _fetch_merged_daily(stock, start_date, end_date),
            "日线"
        )
        if merged_data is not None and not merged_data.empty:
            stock_daily_list.append(merged_data)

        progress = i / total_stocks * 100
        speed, remaining = _rate_and_eta(i, total_stocks, time.time() - start_time)
        print(f"\r日线进度: {i}/{total_stocks} ({progress:.1f}%) | 速度: {speed:.1f}只/分钟 | 预计剩余: {remaining:.1f}分钟 | 当前: {stock}", end="")

    print(f"\n\n{'='*60}")
    if stock_daily_list:
        all_daily_data = pd.concat(stock_daily_list, ignore_index=True)
        print(f"✅ 日线数据获取完成! 共{len(stock_daily_list)}只股票")
    else:
        all_daily_data = pd.DataFrame()
        print("❌ 未获取到任何日线数据")
    print(f"{'='*60}")

    if all_daily_data.empty:
        # Nothing to enrich: the merges below need the ts_code/trade_date columns.
        return all_daily_data

    # ---- Suspension records, one trading day at a time ----
    print("开始获取股票停牌数据...")
    suspend_data_list = []
    total_suspend_dates = len(all_trade_dates)
    start_time = time.time()
    for i, trade_date in enumerate(all_trade_dates, 1):
        daily_suspend_data = _fetch_with_rate_limit_retry(
            lambda: pro.suspend_d(trade_date=trade_date),
            "停牌",
            f" (日期: {trade_date})"
        )
        if daily_suspend_data is not None and not daily_suspend_data.empty:
            suspend_data_list.append(daily_suspend_data)

        progress = i / total_suspend_dates * 100
        speed, remaining = _rate_and_eta(i, total_suspend_dates, time.time() - start_time)
        print(f"\r停牌进度: {i}/{total_suspend_dates} ({progress:.1f}%) | 速度: {speed:.1f}天/分钟 | 预计剩余: {remaining:.1f}分钟 | 当前: {trade_date}", end="")

    print(f"\n\n{'='*60}")
    if suspend_data_list:
        all_suspend_data = pd.concat(suspend_data_list, ignore_index=True)
        print(f"✅ 停牌数据获取完成! 共{len(all_suspend_data)}条停牌记录")
    else:
        all_suspend_data = pd.DataFrame()
        print("❌ 未获取到任何停牌数据")
    print(f"{'='*60}")

    print("开始处理ST标记、上市天数和停牌数据...")

    # ST flag: 1 when the stock has a record of type 'ST' on that day.
    # The ST keys are de-duplicated first so the left join cannot multiply daily rows.
    if not all_st_data.empty:
        st_flags = (
            all_st_data.loc[all_st_data['type'] == 'ST', key_cols]
            .drop_duplicates()
            .assign(is_st=1)
        )
        stock_daily = pd.merge(all_daily_data, st_flags, on=key_cols, how='left')
        stock_daily['is_st'] = stock_daily['is_st'].fillna(0).astype(int)
    else:
        stock_daily = all_daily_data.copy()
        stock_daily['is_st'] = 0

    # Suspension info: collapse to one record per key *before* joining, so every
    # daily row appears exactly once in the result.
    if not all_suspend_data.empty:
        print("处理停复牌重复行（R+S合并为R&S）...")
        suspend_unique = _collapse_suspend_duplicates(all_suspend_data)
        stock_daily = pd.merge(stock_daily, suspend_unique, on=key_cols, how='left')

        # Use None rather than NaN for "no timing information".
        stock_daily['suspend_timing'] = stock_daily['suspend_timing'].apply(lambda x: None if pd.isna(x) else x)
        # 'N' marks days without any suspension record.
        stock_daily['suspend_type'] = stock_daily['suspend_type'].fillna('N')

    # Calendar days between the listing date and the trade date.
    list_dates = stock_daily['ts_code'].map(stock_basic_data.set_index('ts_code')['list_date'])
    stock_daily['listed_days'] = (
        pd.to_datetime(stock_daily['trade_date'], format='%Y%m%d')
        - pd.to_datetime(list_dates, format='%Y%m%d')
    ).dt.days

    stock_daily['trade_date'] = pd.to_datetime(stock_daily['trade_date'], format='%Y%m%d')
    stock_daily = stock_daily.set_index(['trade_date', 'ts_code']).sort_index()

    print(f"数据处理完成，最终数据形状: {stock_daily.shape}")
    print(f"列名: {list(stock_daily.columns)}")

    # Distribution of suspension types (only present when suspension data was fetched).
    if 'suspend_type' in stock_daily.columns:
        type_dist = stock_daily['suspend_type'].value_counts()
        print(f"停牌类型分布:")
        for suspend_type, count in type_dist.items():
            print(f"  {suspend_type}: {count} 条记录")

    return stock_daily

In [4]:
# Download the enriched daily bars for 2017-01-01 .. 2025-11-30 ('YYYYMMDD' strings).
# NOTE(review): the recorded output of this cell is a token error, so a valid
# Tushare token must be configured before this call can succeed.
stock_daily = get_stock_daily('20170101','20251130')

Exception: 您的token不对，请确认。

# A股分钟频行情数据获取

In [None]:
def _call_waiting_out_rate_limit(fetch, api_label, ts_code):
    """Call ``fetch()`` and return its result, retrying while the API reports a rate limit.

    A rate-limit error triggers a 61 second wait followed by a retry. Any other
    exception is re-raised so the caller decides how to handle it.
    """
    while True:
        try:
            return fetch()
        except Exception as e:
            if "每分钟最多访问该接口" not in str(e):
                raise
            print(f"\n⚠️ {api_label}接口频率限制，等待61秒... (股票: {ts_code})")
            time.sleep(61)


def get_stock_min(start_date, end_date, freq, skip_existing=False):
    """
    Download A-share minute bars (joined with the daily adjustment factor) and
    save one parquet file per stock under ``data/stock_{freq}/``.

    Parameters:
    start_date: first date of the range (format: 'YYYYMMDD')
    end_date: last date of the range (format: 'YYYYMMDD')
    freq: bar frequency, one of '1min', '5min', '15min', '30min', '60min'
    skip_existing: when True, a stock whose parquet file already exists is not
        downloaded again, so an interrupted run can be resumed. The default
        (False) re-downloads and overwrites every file.

    Returns:
    None. Results are written to disk and progress is printed to stdout.

    Notes:
    Rate-limit errors are retried after a 61 second wait. Any other error from
    the minute-bar request skips that date window (reported on stdout, so the
    saved file may be incomplete). Other errors from the adjustment-factor
    request propagate to the caller.
    """
    # Open trading days inside [start_date, end_date], ascending.
    all_trade_dates = pro.trade_cal(exchange='', start_date=start_date, end_date=end_date, is_open=1)['cal_date'].sort_values().to_list()

    print(f"获取交易日历完成，共{len(all_trade_dates)}个交易日")

    if not all_trade_dates:
        # Nothing to download; also avoids indexing an empty calendar in the summary.
        print("❌ 指定区间内没有交易日，未下载任何数据")
        return

    # Stock universe: currently listed ('L') plus delisted ('D') stocks.
    basic_fields = 'ts_code,symbol,name,area,industry,market,list_status,list_date,delist_date,is_hs'
    stock_basic_data = pd.concat(
        [pro.stock_basic(exchange='', list_status=status, fields=basic_fields) for status in ('L', 'D')],
        axis=0
    )
    print(f"获取股票列表完成，共{len(stock_basic_data)}只股票")

    # Drop stocks that were already delisted before start_date.
    still_listed = (stock_basic_data['delist_date'] >= start_date) | stock_basic_data['delist_date'].isna()
    stock_basic_data = stock_basic_data.loc[still_listed, :].sort_values('ts_code').reset_index(drop=True)
    print(f"筛选后退市股票后，剩余{len(stock_basic_data)}只股票")

    save_dir = f'data/stock_{freq}'
    os.makedirs(save_dir, exist_ok=True)
    print(f"数据将保存到: {save_dir}/")

    # Each request covers WINDOW_SIZE consecutive trading days.
    # NOTE(review): 33 looks tuned to a per-request row cap for 1min bars; confirm
    # against the API documentation before changing it or using coarser freqs.
    WINDOW_SIZE = 33

    # The request windows are the same for every stock, so build them once.
    # The interface expects 'YYYY-MM-DD HH:MM:SS' bounds.
    windows = []
    for i in range(0, len(all_trade_dates), WINDOW_SIZE):
        batch_dates = all_trade_dates[i:i + WINDOW_SIZE]
        first_day, last_day = batch_dates[0], batch_dates[-1]
        windows.append((
            f"{first_day[:4]}-{first_day[4:6]}-{first_day[6:]} 09:00:00",
            f"{last_day[:4]}-{last_day[4:6]}-{last_day[6:]} 19:00:00",
        ))

    total_stocks = len(stock_basic_data)
    start_time = time.time()

    for stock_no, ts_code in enumerate(stock_basic_data['ts_code'], 1):
        save_path = os.path.join(save_dir, f"{ts_code}.parquet")

        if skip_existing and os.path.exists(save_path):
            status = "已存在，跳过"
        else:
            stock_min_list = []
            for start_datetime, end_datetime in windows:
                try:
                    stk_mins = _call_waiting_out_rate_limit(
                        lambda: pro.stk_mins(
                            ts_code=ts_code,
                            freq=freq,
                            start_date=start_datetime,
                            end_date=end_datetime
                        ),
                        "分钟线",
                        ts_code
                    )
                except Exception as e:
                    # Best effort: skip this window, but say so.
                    print(f"\n⚠️ 分钟线获取失败，跳过该批次 (股票: {ts_code}, {start_datetime} ~ {end_datetime}): {e}")
                    continue

                if not stk_mins.empty:
                    stock_min_list.append(stk_mins)

            row_count = 0
            if stock_min_list:
                stock_data = pd.concat(stock_min_list, ignore_index=True)

                stock_data['trade_time'] = pd.to_datetime(stock_data['trade_time'])
                # Midnight-normalised date of each bar, kept as datetime for the join below.
                stock_data['trade_date'] = pd.to_datetime(stock_data['trade_time'].dt.date)

                # The adjustment factor is daily; it gets the same rate-limit
                # protection as the minute bars so a long run is not aborted.
                adj_factor = _call_waiting_out_rate_limit(
                    lambda: pro.adj_factor(
                        ts_code=ts_code,
                        start_date=start_date,
                        end_date=end_date
                    ),
                    "复权因子",
                    ts_code
                )
                adj_factor['trade_date'] = pd.to_datetime(adj_factor['trade_date'])

                # Inner join: bars on days without an adjustment factor are dropped.
                stock_data = pd.merge(
                    stock_data,
                    adj_factor,
                    on=['ts_code', 'trade_date'],
                    how='inner',
                )

                stock_data = stock_data.set_index(['trade_date', 'trade_time']).sort_index()
                stock_data.to_parquet(save_path)
                row_count = len(stock_data)

            status = f"{row_count}行" if row_count else "无数据"

        # Stock-level progress line.
        elapsed = time.time() - start_time
        progress = stock_no / total_stocks * 100
        speed = stock_no / elapsed * 60 if elapsed > 0 else 0
        print(f"\r进度: {stock_no}/{total_stocks} ({progress:.1f}%) | 速度: {speed:.1f}只/分钟 | 当前: {ts_code} ({status})", end="")

    print(f"\n\n{'='*60}")
    print(f"✅ 所有股票分钟数据获取完成!")
    print(f"   数据保存到: {save_dir}/")
    print(f"   频率: {freq}")
    print(f"   时间范围: {all_trade_dates[0]} 到 {all_trade_dates[-1]}")
    print(f"{'='*60}")

In [None]:
# Download 1-minute bars for 2017-01-01 .. 2025-11-30; one parquet file per stock
# is written under data/stock_1min/ (see the progress log recorded below).
get_stock_min('20170101', '20251130', '1min')

获取交易日历完成，共2163个交易日
获取股票列表完成，共5774只股票
筛选后退市股票后，剩余5684只股票
数据将保存到: data/stock_1min/
进度: 209/3279 (6.4%) | 速度: 7.2只/分钟 | 当前: 301055.SZ (246784行)
⚠️ 分钟线接口频率限制，等待61秒... (股票: 301056.SZ)
进度: 224/3279 (6.8%) | 速度: 7.0只/分钟 | 当前: 301071.SZ (243892行)
⚠️ 分钟线接口频率限制，等待61秒... (股票: 301072.SZ)
进度: 239/3279 (7.3%) | 速度: 6.8只/分钟 | 当前: 301088.SZ (239313行)
⚠️ 分钟线接口频率限制，等待61秒... (股票: 301089.SZ)
进度: 258/3279 (7.9%) | 速度: 6.6只/分钟 | 当前: 301109.SZ (212562行)
⚠️ 分钟线接口频率限制，等待61秒... (股票: 301110.SZ)
进度: 340/3279 (10.4%) | 速度: 6.5只/分钟 | 当前: 301207.SZ (221238行)
⚠️ 分钟线接口频率限制，等待61秒... (股票: 301208.SZ)
进度: 360/3279 (11.0%) | 速度: 6.4只/分钟 | 当前: 301229.SZ (220756行)
⚠️ 分钟线接口频率限制，等待61秒... (股票: 301230.SZ)
进度: 473/3279 (14.4%) | 速度: 6.5只/分钟 | 当前: 301373.SZ (164603行)
⚠️ 分钟线接口频率限制，等待61秒... (股票: 301376.SZ)
进度: 493/3279 (15.0%) | 速度: 6.4只/分钟 | 当前: 301398.SZ (172556行)
⚠️ 分钟线接口频率限制，等待61秒... (股票: 301399.SZ)
进度: 507/3279 (15.5%) | 速度: 6.3只/分钟 | 当前: 301459.SZ (114475行)
⚠️ 分钟线接口频率限制，等待61秒... (股票: 301468.SZ)
进度: 518/3279 (15.8%) | 速度: 6.0只/