In [None]:
from clickhouse_connect import get_client
import pandas as pd
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
import time
import statistics
from datetime import datetime
import json
import os

# Connect to ClickHouse.
# NOTE(review): every value below is written as xxx (port is even a bare
# name) - these look like redacted placeholders; fill them in before running.
client = get_client(
    host="xxx",
    port=xxx,
    username="xxx",
    password="xxx"
)

# Configuration parameters
MAX_WORKERS = 24  # worker threads per batch (original note: use 24 of the server's 32 threads)
BATCH_SIZE = 100  # number of stocks processed per batch
LOOP_COUNT = 50   # number of benchmark iterations

# 1. Fetch cancelled orders (a single query for the whole market)
@lru_cache(maxsize=10)
def get_all_canceled_orders(date: str = '2025-03-03'):
    """Return the cancelled order numbers for ``date``, grouped by stock code.

    Queries ``stock_base.zb`` for rows with ``trade_flag='4'`` and ``exg='1'``
    whose ``date_time`` lies between 09:15:00 and 09:25:00 on ``date``.

    Args:
        date: Trading date, interpolated verbatim into the SQL text
            (the query compares it against ``date`` and uses it as the
            prefix of the ``date_time`` bounds).

    Returns:
        A dict mapping each stock code found in the result to a set of
        ``int`` order numbers taken from the ``bidno`` and ``askno``
        columns. Null and empty-string values are skipped; a code whose
        rows carry no usable number still maps to an empty set.

    Note:
        The function is wrapped in ``lru_cache``, so repeated calls with
        the same ``date`` return the *same* dict object without querying
        again. Callers must treat the result as read-only.
    """
    sql = f"""
    SELECT code, bidno, askno
    FROM stock_base.zb
    WHERE date='{date}'
        AND date_time BETWEEN '{date} 09:15:00' AND '{date} 09:25:00'
        AND trade_flag='4'
        AND exg='1'
    """

    cancel_df = client.query_df(sql)
    cancel_dict = {}

    if not cancel_df.empty:
        # Walk the three columns in lockstep instead of DataFrame.iterrows(),
        # which builds a Series object for every single row.
        columns = (cancel_df['code'], cancel_df['bidno'], cancel_df['askno'])
        for code, bidno, askno in zip(*columns):
            order_ids = cancel_dict.setdefault(code, set())
            for order_no in (bidno, askno):
                if pd.notna(order_no) and order_no != '':
                    order_ids.add(int(order_no))

    print(f"撤单数据: {len(cancel_dict)}只股票")
    return cancel_dict

# 2. Fetch the orders of a batch of stocks
def get_orders_batch(codes: list, date: str):
    """Query the order rows of several stocks at once and split them by code.

    Reads ``code, price, volume, side, ordno`` from ``stock_base.zb`` for the
    given codes on ``date`` between 09:15:00 and 09:25:00, excluding rows
    whose ``trade_flag`` is ``'4'`` or ``'F'`` and keeping only ``exg='1'``.

    Args:
        codes: Stock codes as strings; they are joined into the SQL
            ``IN (...)`` list verbatim.
        date: Trading date, interpolated verbatim into the SQL text.

    Returns:
        ``{}`` when ``codes`` is empty or the query returns no rows.
        Otherwise a dict with one entry per requested code, each an
        independent DataFrame holding that code's rows (original row order
        and index labels preserved); codes without rows map to an empty
        DataFrame with the same columns.
    """
    if not codes:
        return {}

    codes_str = "','".join(codes)
    sql = f"""
    SELECT code, price, volume, side, ordno
    FROM stock_base.zb
    WHERE code IN ('{codes_str}')
        AND date='{date}'
        AND date_time BETWEEN '{date} 09:15:00' AND '{date} 09:25:00'
        AND trade_flag NOT IN ('4', 'F')
        AND exg='1'
    """

    df = client.query_df(sql)
    if df.empty:
        return {}

    # Split the frame once with groupby instead of running one full boolean
    # scan of the frame per requested code.
    by_code = {
        key: group.copy()
        for key, group in df.groupby('code', sort=False)
    }
    no_rows = df.iloc[0:0]
    return {
        code: by_code[code] if code in by_code else no_rows.copy()
        for code in codes
    }

# 3. Fetch the real opening prices (batched)
def get_real_prices_batch(codes: list, date: str):
    """Query the recorded open and previous-close prices for several stocks.

    Reads ``code, open, pclose`` from ``stock_base.daily`` for the given
    codes on ``date`` with ``exg='1'``.

    Args:
        codes: Stock codes as strings; they are joined into the SQL
            ``IN (...)`` list verbatim.
        date: Trading date, interpolated verbatim into the SQL text.

    Returns:
        A dict mapping code -> ``(open, pclose)`` as Python floats. Codes
        absent from the query result are absent from the dict; an empty
        ``codes`` list or an empty result yields ``{}``.
    """
    if not codes:
        return {}

    codes_str = "','".join(codes)
    sql = f"""
    SELECT code, open, pclose
    FROM stock_base.daily
    WHERE code IN ('{codes_str}')
        AND date='{date}'
        AND exg='1'
    """

    df = client.query_df(sql)
    if df.empty:
        return {}

    # Iterate the columns in lockstep; DataFrame.iterrows() would allocate a
    # Series per row just to read three scalars.
    return {
        code: (float(open_price), float(prev_close))
        for code, open_price, prev_close in zip(df['code'], df['open'], df['pclose'])
    }

# 4. Optimised matching algorithm
def calculate_open_price_fast(bid_list, ask_list, prev_close=None):
    """Choose the opening price that maximises the matched volume.

    Every distinct price appearing on either side is tried as a candidate.
    For a candidate, the buy volume is the sum of bids priced at or above it
    and the sell volume is the sum of asks priced at or below it; the matched
    volume is the smaller of the two.

    Candidates are ranked by, in order: larger matched volume, smaller
    absolute buy/sell imbalance, and - only when ``prev_close`` is truthy -
    smaller distance to ``prev_close``. Remaining ties keep the lowest price.

    Args:
        bid_list: DataFrame of buy orders with ``price`` and ``volume`` columns.
        ask_list: DataFrame of sell orders with ``price`` and ``volume`` columns.
        prev_close: Optional previous close used as the last tie-breaker.

    Returns:
        ``(price, volume)``; ``(None, 0)`` when either side is empty or no
        candidate produces a non-zero match.
    """
    if bid_list.empty or ask_list.empty:
        return None, 0

    # Work on raw numpy arrays for the masked sums below.
    buy_px = bid_list['price'].values
    buy_qty = bid_list['volume'].values
    sell_px = ask_list['price'].values
    sell_qty = ask_list['volume'].values

    # np.unique already returns the distinct prices in ascending order.
    candidates = np.unique(np.concatenate([buy_px, sell_px]))

    best_price, best_volume, best_diff = None, 0, float('inf')

    for px in candidates:
        demand = buy_qty[buy_px >= px].sum()   # cumulative buy side
        supply = sell_qty[sell_px <= px].sum()  # cumulative sell side

        matched = min(demand, supply)
        if matched == 0:
            continue

        # "Rule three" check: when both sides have orders exactly at this
        # price, reject the candidate if neither side's at-price quantity
        # fits within the matched volume.
        buy_here = buy_qty[buy_px == px].sum()
        sell_here = sell_qty[sell_px == px].sum()
        if (buy_here > 0 and sell_here > 0
                and matched < buy_here and matched < sell_here):
            continue

        imbalance = abs(demand - supply)

        if matched > best_volume:
            best_price, best_volume, best_diff = px, matched, imbalance
            continue
        if matched != best_volume:
            continue

        # Same matched volume as the current best: apply the tie-breakers.
        if imbalance < best_diff:
            best_price, best_diff = px, imbalance
        elif imbalance == best_diff and prev_close:
            if abs(px - prev_close) < abs(best_price - prev_close):
                best_price = px

    return best_price, best_volume

# 5. Process a single stock
def process_stock(code, orders_df, cancel_dict, price_dict):
    """Compute the opening price of one stock from its order rows.

    Orders whose ``ordno`` appears in ``cancel_dict[code]`` are dropped, the
    remainder is split into bids (``side == b'B'``) and asks
    (``side == b'S'``), and both sides are handed to
    ``calculate_open_price_fast``.

    Args:
        code: Stock code.
        orders_df: DataFrame with ``price``, ``volume``, ``side`` and
            ``ordno`` columns (may be empty).
        cancel_dict: Mapping code -> collection of cancelled order numbers.
        price_dict: Mapping code -> ``(real_open, prev_close)``.

    Returns:
        ``(code, calc_open, real_open, volume, error)``. ``error`` is
        ``None`` on success; otherwise it is a message and the three middle
        fields are ``None``. Any exception is caught and reported through
        ``error`` as ``str(exception)``.
    """
    try:
        if orders_df.empty:
            return code, None, None, None, "无数据"

        # Drop the orders that were cancelled.
        live_orders = orders_df
        if code in cancel_dict:
            cancelled = orders_df['ordno'].isin(cancel_dict[code])
            live_orders = orders_df[~cancelled]

        # Split into buy and sell books.
        side = live_orders['side']
        bid_df = live_orders[side == b'B']
        ask_df = live_orders[side == b'S']

        if bid_df.empty or ask_df.empty:
            return code, None, None, None, "买卖盘不全"

        # Bids: highest price first; asks: lowest price first; ties by ordno.
        bid_df = bid_df.sort_values(['price', 'ordno'], ascending=[False, True])
        ask_df = ask_df.sort_values(['price', 'ordno'], ascending=[True, True])

        # Recorded prices, when available for this code.
        real_open, prev_close = (
            price_dict[code] if code in price_dict else (None, None)
        )

        calc_open, volume = calculate_open_price_fast(bid_df, ask_df, prev_close)
        return code, calc_open, real_open, volume, None
    except Exception as e:
        return code, None, None, None, str(e)

# 6. Run a single full pass
def run_single_iteration(date='2025-03-03'):
    """Process every stock once and return timing and accuracy statistics.

    Loads the distinct codes of ``stock_base.daily`` for ``date`` (``exg='1'``),
    then works through them in slices of ``BATCH_SIZE``: orders and recorded
    prices are fetched per batch and each stock is handed to
    ``process_stock`` on a thread pool of ``MAX_WORKERS`` threads.

    Args:
        date: Trading date, interpolated verbatim into the SQL text.

    Returns:
        A dict with ``total_time`` (seconds), ``total_stocks``,
        ``success_stocks``, ``correct_count``, ``error_stocks`` (category ->
        list of codes), ``accuracy`` (percent of successful stocks whose
        computed open is within 0.001 of the recorded open) and
        ``avg_time_per_stock`` (milliseconds).
    """
    started_at = time.time()

    # All stock codes for the day.
    sql = f"""
    SELECT DISTINCT code
    FROM stock_base.daily
    WHERE date='{date}' AND exg='1'
    ORDER BY toInt32(code)
    """
    all_codes = client.query_df(sql)['code'].tolist()

    # Market-wide cancelled orders.
    cancel_dict = get_all_canceled_orders(date)

    results = []
    # no_data: no order rows; no_price: recorded price missing;
    # no_match: could not be matched; calc_error: exception during
    # processing; incorrect: computed open differs from the recorded open.
    error_stocks = {
        bucket: []
        for bucket in ('no_data', 'no_price', 'no_match', 'calc_error', 'incorrect')
    }

    for offset in range(0, len(all_codes), BATCH_SIZE):
        batch_codes = all_codes[offset:offset + BATCH_SIZE]

        # Batched data fetch.
        orders_dict = get_orders_batch(batch_codes, date)
        price_dict = get_real_prices_batch(batch_codes, date)

        # Fan the stocks of this batch out to the thread pool; results are
        # collected in completion order.
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = [
                executor.submit(
                    process_stock,
                    code,
                    orders_dict.get(code, pd.DataFrame()),
                    cancel_dict,
                    price_dict,
                )
                for code in batch_codes
            ]
            batch_results = [done.result() for done in as_completed(futures)]

        # Classify each outcome.
        for code, calc_open, real_open, volume, error in batch_results:
            if error:
                if "无数据" in error:
                    bucket = 'no_data'
                elif "买卖盘不全" in error:
                    bucket = 'no_match'
                else:
                    bucket = 'calc_error'
                error_stocks[bucket].append(code)
                continue

            if real_open is None:
                error_stocks['no_price'].append(code)
                continue

            if calc_open is None:
                error_stocks['no_match'].append(code)
                continue

            deviation = calc_open - real_open
            is_correct = abs(deviation) < 0.001
            results.append({
                'code': code,
                'calc_open': calc_open,
                'real_open': real_open,
                'volume': volume,
                'is_correct': is_correct,
                'error': deviation if calc_open else None
            })

            if not is_correct:
                error_stocks['incorrect'].append(code)

    total_time = time.time() - started_at
    correct_count = len([entry for entry in results if entry['is_correct']])

    return {
        'total_time': total_time,
        'total_stocks': len(all_codes),
        'success_stocks': len(results),
        'correct_count': correct_count,
        'error_stocks': error_stocks,
        'accuracy': correct_count / max(1, len(results)) * 100,
        'avg_time_per_stock': total_time / len(all_codes) * 1000 if all_codes else 0
    }

# 7. Main entry point - run the benchmark several times
def main_with_loops(date='2025-03-03', loop_count=LOOP_COUNT):
    """Run ``run_single_iteration`` repeatedly and report aggregate statistics.

    After the loop, the per-iteration statistics, a one-row summary, the
    de-duplicated error codes and a JSON dump of everything are written to a
    new ``performance_results_<timestamp>`` directory under the current
    working directory, and a summary is printed.

    Args:
        date: Trading date passed to every iteration.
        loop_count: Number of iterations to run; must be at least 1.

    Returns:
        A dict with ``detailed_df`` (one row per iteration), ``summary_df``
        (one summary row), ``error_dfs`` (error category -> DataFrame, only
        for non-empty categories) and ``output_dir``.

    Raises:
        ValueError: If ``loop_count`` is smaller than 1 (the summary
            statistics are undefined for zero iterations).
    """
    if loop_count < 1:
        raise ValueError(f"loop_count must be at least 1, got {loop_count}")

    print(f"开始处理 {date} 的数据，共运行 {loop_count} 次...")

    error_types = ('no_data', 'no_price', 'no_match', 'calc_error', 'incorrect')

    # Results of every iteration.
    all_stats = []
    all_error_stocks = {error_type: [] for error_type in error_types}

    for i in range(loop_count):
        print(f"\n第 {i+1}/{loop_count} 次运行...")
        loop_start = time.time()

        stats = run_single_iteration(date)
        stats['loop_num'] = i + 1
        all_stats.append(stats)

        # Accumulate the problem codes across iterations.
        for error_type in all_error_stocks:
            all_error_stocks[error_type].extend(stats['error_stocks'][error_type])

        loop_time = time.time() - loop_start
        print(f"第 {i+1} 次完成，耗时: {loop_time:.2f}秒，正确率: {stats['accuracy']:.1f}%")

    # Aggregate statistics.
    total_times = [s['total_time'] for s in all_stats]
    accuracies = [s['accuracy'] for s in all_stats]

    stats_summary = {
        'date': date,
        'loop_count': loop_count,
        'total_time_mean': statistics.mean(total_times),
        # stdev needs at least two samples.
        'total_time_std': statistics.stdev(total_times) if len(total_times) > 1 else 0,
        'total_time_min': min(total_times),
        'total_time_max': max(total_times),
        'accuracy_mean': statistics.mean(accuracies),
        'accuracy_std': statistics.stdev(accuracies) if len(accuracies) > 1 else 0,
        'accuracy_min': min(accuracies),
        'accuracy_max': max(accuracies),
        'avg_stocks_per_loop': statistics.mean([s['total_stocks'] for s in all_stats]),
        'avg_success_per_loop': statistics.mean([s['success_stocks'] for s in all_stats]),
        'avg_correct_per_loop': statistics.mean([s['correct_count'] for s in all_stats])
    }

    # De-duplicate the problem codes. Sorting keeps the saved files stable
    # from run to run; a bare set would yield an arbitrary order.
    for error_type in all_error_stocks:
        all_error_stocks[error_type] = sorted(set(all_error_stocks[error_type]))
        stats_summary[f'error_{error_type}_count'] = len(all_error_stocks[error_type])

    # Per-iteration table, with a leading sequence-number column.
    detailed_df = pd.DataFrame(all_stats)
    detailed_df.insert(0, 'iteration', range(1, len(detailed_df) + 1))

    # One-row summary table.
    summary_df = pd.DataFrame([stats_summary])

    # One table per non-empty error category.
    error_dfs = {}
    for error_type, codes in all_error_stocks.items():
        if codes:
            error_dfs[error_type] = pd.DataFrame({
                'error_type': error_type,
                'code': codes,
                'count': len(codes)
            })

    # Write everything to a timestamped directory.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = f"performance_results_{timestamp}"
    os.makedirs(output_dir, exist_ok=True)

    detailed_file = os.path.join(output_dir, f"detailed_results_{timestamp}.csv")
    detailed_df.to_csv(detailed_file, index=False, encoding='utf-8-sig')

    summary_file = os.path.join(output_dir, f"summary_{timestamp}.csv")
    summary_df.to_csv(summary_file, index=False, encoding='utf-8-sig')

    # The error file is only written when at least one category is non-empty,
    # which is exactly when error_dfs has entries.
    error_file = None
    if error_dfs:
        error_file = os.path.join(output_dir, f"error_stocks_{timestamp}.csv")
        all_error_df = pd.concat(error_dfs.values(), ignore_index=True)
        all_error_df.to_csv(error_file, index=False, encoding='utf-8-sig')

    # Full statistics as JSON.
    json_file = os.path.join(output_dir, f"full_stats_{timestamp}.json")
    full_stats = {
        'summary': stats_summary,
        'error_stocks': all_error_stocks,
        'all_iterations': all_stats
    }
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(full_stats, f, ensure_ascii=False, indent=2)

    # Print the summary.
    print(f"\n{'='*60}")
    print("性能统计汇总:")
    print(f"{'='*60}")
    print(f"运行日期: {date}")
    print(f"循环次数: {loop_count}")
    print(f"\n时间统计 (秒):")
    print(f"  平均值: {stats_summary['total_time_mean']:.2f}")
    print(f"  标准差: {stats_summary['total_time_std']:.2f}")
    print(f"  最小值: {stats_summary['total_time_min']:.2f}")
    print(f"  最大值: {stats_summary['total_time_max']:.2f}")

    print(f"\n正确率统计 (%):")
    print(f"  平均值: {stats_summary['accuracy_mean']:.1f}")
    print(f"  标准差: {stats_summary['accuracy_std']:.1f}")
    print(f"  最小值: {stats_summary['accuracy_min']:.1f}")
    print(f"  最大值: {stats_summary['accuracy_max']:.1f}")

    print(f"\n股票处理统计:")
    print(f"  平均每轮股票数: {stats_summary['avg_stocks_per_loop']:.0f}")
    print(f"  平均成功处理: {stats_summary['avg_success_per_loop']:.0f}")
    print(f"  平均正确计算: {stats_summary['avg_correct_per_loop']:.0f}")

    print(f"\n错误股票统计:")
    for error_type in error_types:
        count = stats_summary.get(f'error_{error_type}_count', 0)
        if count > 0:
            print(f"  {error_type}: {count} 只")

    print(f"\n文件保存:")
    print(f"  详细结果: {detailed_file}")
    print(f"  汇总统计: {summary_file}")
    if error_file is not None:
        print(f"  错误股票: {error_file}")
    print(f"  完整统计: {json_file}")
    print(f"{'='*60}")

    return {
        'detailed_df': detailed_df,
        'summary_df': summary_df,
        'error_dfs': error_dfs,
        'output_dir': output_dir
    }

# Run the benchmark when executed as a script
if __name__ == "__main__":
    # The number of iterations can be adjusted through loop_count
    result = main_with_loops('2025-03-03', loop_count=LOOP_COUNT)
    
    # Show the detailed results of the first few runs
    print("\n前5次运行详细结果:")
    print(result['detailed_df'].head(5).to_string())

开始处理 2025-03-03 的数据，共运行 50 次...

第 1/50 次运行...
撤单数据: 2634只股票
第 1 次完成，耗时: 49.44秒，正确率: 95.3%

第 2/50 次运行...
第 2 次完成，耗时: 46.65秒，正确率: 95.3%

第 3/50 次运行...
第 3 次完成，耗时: 47.95秒，正确率: 95.3%

第 4/50 次运行...
第 4 次完成，耗时: 47.38秒，正确率: 95.3%

第 5/50 次运行...
第 5 次完成，耗时: 44.63秒，正确率: 95.3%

第 6/50 次运行...
第 6 次完成，耗时: 45.38秒，正确率: 95.3%

第 7/50 次运行...
第 7 次完成，耗时: 46.02秒，正确率: 95.3%

第 8/50 次运行...
第 8 次完成，耗时: 45.98秒，正确率: 95.3%

第 9/50 次运行...
第 9 次完成，耗时: 44.01秒，正确率: 95.3%

第 10/50 次运行...
第 10 次完成，耗时: 48.00秒，正确率: 95.3%

第 11/50 次运行...
第 11 次完成，耗时: 46.02秒，正确率: 95.3%

第 12/50 次运行...
第 12 次完成，耗时: 45.98秒，正确率: 95.3%

第 13/50 次运行...
第 13 次完成，耗时: 46.64秒，正确率: 95.3%

第 14/50 次运行...
第 14 次完成，耗时: 44.06秒，正确率: 95.3%

第 15/50 次运行...
第 15 次完成，耗时: 49.33秒，正确率: 95.3%

第 16/50 次运行...
第 16 次完成，耗时: 46.75秒，正确率: 95.3%

第 17/50 次运行...
第 17 次完成，耗时: 47.98秒，正确率: 95.3%

第 18/50 次运行...
第 18 次完成，耗时: 45.97秒，正确率: 95.3%

第 19/50 次运行...
第 19 次完成，耗时: 47.37秒，正确率: 95.3%

第 20/50 次运行...
第 20 次完成，耗时: 49.73秒，正确率: 95.3%

第 21/50 次运行...
第 21 次完成，耗时: 46.