# 5. SQEP 的性能再优化

In [1]:
from clickhouse_driver import Client
import time
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import json
import csv
from datetime import datetime, timedelta
import random
import io
import redis
import argparse

In [2]:
from matplotlib import font_manager
font_path = '/Volumes/share/data/WBQ/temp/SimHei.ttf'  # 替换为SimHei.ttf的实际路径
font_manager.fontManager.addfont(font_path)
plt.rcParams['font.family'] = 'SimHei'

## ClickHouse表结构创建

In [3]:
# ClickHouse配置
CLICKHOUSE_HOST = "localhost"
CLICKHOUSE_PORT = 9000
CLICKHOUSE_DB = "test_data_json_csv"

# 初始化ClickHouse客户端
client = Client(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT, database=CLICKHOUSE_DB)

In [4]:
# 删除已存在的表（如果有）
client.execute("DROP TABLE IF EXISTS bar_minute_json")
client.execute("DROP TABLE IF EXISTS bar_minute_csv")

[]

In [5]:
# 创建JSON格式表
json_table_query = """
CREATE TABLE bar_minute_json (
    symbol Int32,
    frame DateTime,
    open Float64,
    high Float64,
    low Float64,
    close Float64,
    vol Float64,
    amount Float64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(frame)
ORDER BY (symbol, frame);
"""

In [7]:
# 创建CSV格式表
csv_table_query = """
CREATE TABLE bar_minute_csv (
    symbol Int32,
    frame DateTime,
    open Float64,
    high Float64,
    low Float64,
    close Float64,
    vol Float64,
    amount Float64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(frame)
ORDER BY (symbol, frame);
"""

In [8]:
# 执行创建表操作
client.execute(json_table_query)
client.execute(csv_table_query)
print("表结构创建完成")

表结构创建完成


## 数据生成

In [9]:
# 将股票代码和交易所编码为整数
def encode_symbol(code, exchange):
    """
    Args:
        code: 股票代码，如 '000001'
        exchange: 交易所，'SH'或'SZ'
        
    Returns:
        整型编码的股票代码
    """
    if exchange.upper() == 'SH':
        prefix = '1'
    elif exchange.upper() == 'SZ':
        prefix = '2'
    else:
        raise ValueError(f"不支持的交易所: {exchange}")
        
    return int(prefix + code)

In [11]:
# 生成测试数据
def generate_test_data(num_records, batch_size=10000):
    """
    Args:
        num_records: 要生成的记录数
        batch_size: 每批生成的记录数
        
    Returns:
        生成的数据列表
    """
    # 股票代码列表
    stock_codes = [f"{i:06d}" for i in range(1, 5001)]  # 5000支股票
    exchanges = ['SH', 'SZ']
    
    # 生成交易日期和时间
    start_date = datetime(2023, 1, 1, 9, 30, 0)
    
    # 生成数据
    all_data = []
    start_time = time.time()
    
    for batch in range(0, num_records, batch_size):
        batch_data = []
        current_size = min(batch_size, num_records - batch)
        
        for i in range(current_size):
            # 随机选择股票和交易所
            stock_code = random.choice(stock_codes)
            exchange = random.choice(exchanges)
            symbol = encode_symbol(stock_code, exchange)
            
            # 生成随机时间（在交易时间内）
            days_offset = random.randint(0, 365)
            minutes_offset = random.randint(0, 240)  # 4小时交易时间
            frame = start_date + timedelta(days=days_offset, minutes=minutes_offset)
            
            # 生成OHLC数据
            base_price = random.uniform(10.0, 100.0)
            price_range = base_price * 0.02  # 2%的价格波动
            
            open_price = base_price
            close_price = base_price + random.uniform(-price_range, price_range)
            high_price = max(open_price, close_price) + random.uniform(0, price_range)
            low_price = min(open_price, close_price) - random.uniform(0, price_range)
            
            vol = random.uniform(10000, 1000000)
            amount = vol * ((open_price + close_price) / 2) * random.uniform(0.9, 1.1)
            
            # 创建记录
            record = {
                'symbol': symbol,
                'frame': frame.strftime('%Y-%m-%d %H:%M:%S'),
                'open': round(open_price, 2),
                'high': round(high_price, 2),
                'low': round(low_price, 2),
                'close': round(close_price, 2),
                'vol': round(vol, 2),
                'amount': round(amount, 2)
            }
            
            batch_data.append(record)
        
        all_data.extend(batch_data)
        
        # 打印进度
        if (batch + batch_size) % (num_records // 10) == 0 or batch + batch_size >= num_records:
            progress = min(100, (batch + batch_size) / num_records * 100)
            elapsed = time.time() - start_time
            print(f"生成进度: {progress:.1f}%, 已生成 {len(all_data)} 条记录, 耗时: {elapsed:.2f}秒")
    
    return all_data

In [12]:
# 将数据保存为JSON格式
def save_as_json(data, output_file):
    """
    Args:
        data: 要保存的数据
        output_file: 输出文件路径
    """
    start_time = time.time()
    with open(output_file, 'w', encoding='utf-8') as f:
        for record in data:
            f.write(json.dumps(record) + '\n')
    
    elapsed = time.time() - start_time
    file_size = os.path.getsize(output_file) / (1024 * 1024)  # MB
    print(f"JSON格式保存完成，文件大小: {file_size:.2f} MB, 耗时: {elapsed:.2f}秒")

In [13]:
# 将数据保存为CSV格式
def save_as_csv(data, output_file):
    """
    Args:
        data: 要保存的数据
        output_file: 输出文件路径
    """
    start_time = time.time()
    
    # 获取字段名
    fieldnames = list(data[0].keys())
    
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        # 不写入字段名，因为我们要测试不带key的CSV
        for record in data:
            writer.writerow([record[field] for field in fieldnames])
    
    elapsed = time.time() - start_time
    file_size = os.path.getsize(output_file) / (1024 * 1024)  # MB
    print(f"CSV格式保存完成，文件大小: {file_size:.2f} MB, 耗时: {elapsed:.2f}秒")

In [14]:
# 创建输出目录
os.makedirs("data", exist_ok=True)
    
# 生成测试数据
print("开始生成测试数据...")
test_data = generate_test_data(1000000)  # 生成1亿条记录用于测试
    
# 保存为不同格式
print("\n保存为JSON格式...")
save_as_json(test_data, "data/test_data.json")
    
print("\n保存为CSV格式...")
save_as_csv(test_data, "data/test_data.csv")
    
print("\n数据生成完成")

开始生成测试数据...
生成进度: 10.0%, 已生成 100000 条记录, 耗时: 0.89秒
生成进度: 20.0%, 已生成 200000 条记录, 耗时: 1.91秒
生成进度: 30.0%, 已生成 300000 条记录, 耗时: 3.00秒
生成进度: 40.0%, 已生成 400000 条记录, 耗时: 4.04秒
生成进度: 50.0%, 已生成 500000 条记录, 耗时: 5.11秒
生成进度: 60.0%, 已生成 600000 条记录, 耗时: 6.19秒
生成进度: 70.0%, 已生成 700000 条记录, 耗时: 7.26秒
生成进度: 80.0%, 已生成 800000 条记录, 耗时: 8.31秒
生成进度: 90.0%, 已生成 900000 条记录, 耗时: 9.37秒
生成进度: 100.0%, 已生成 1000000 条记录, 耗时: 10.42秒

保存为JSON格式...
JSON格式保存完成，文件大小: 145.02 MB, 耗时: 5.58秒

保存为CSV格式...
CSV格式保存完成，文件大小: 70.63 MB, 耗时: 4.24秒

数据生成完成


## 数据生产者 —— JSON格式

In [15]:
# 配置参数
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_PASSWORD = "quantide666"  # 添加Redis密码
REDIS_QUEUE_NAME = "bar_minute_json_queue"

In [16]:
# 初始化Redis连接
redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, decode_responses=True)

In [15]:
def produce_data(redis_client, input_file, batch_size=10000):
    """将JSON格式数据推送到Redis队列
    
    Args:
        input_file: 输入文件路径
        batch_size: 每批处理的记录数
    """
    
    # 清空队列
    redis_client.delete(REDIS_QUEUE_NAME)
    
    start_time = time.time()
    total_records = 0
    batch_records = []
    
    with open(input_file, 'r', encoding='utf-8') as f:
        for line in f:
            record = json.loads(line.strip())
            batch_records.append(record)
            
            if len(batch_records) >= batch_size:
                # 创建数据包
                data_package = {
                    "timestamp": datetime.now().isoformat(),
                    "format": "json",
                    "batch_id": total_records // batch_size + 1,
                    "records": batch_records
                }
                
                # 推送到Redis
                redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(data_package))
                
                total_records += len(batch_records)
                batch_records = []
                
                # 打印进度
                if total_records % (batch_size * 100) == 0:
                    elapsed = time.time() - start_time
                    print(f"已推送 {total_records} 条记录, 耗时: {elapsed:.2f}秒, 速率: {total_records/elapsed:.2f} 条/秒")
    
    # 处理剩余的记录
    if batch_records:
        data_package = {
            "timestamp": datetime.now().isoformat(),
            "format": "json",
            "batch_id": total_records // batch_size + 1,
            "records": batch_records
        }
        redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(data_package))
        total_records += len(batch_records)
    
    total_time = time.time() - start_time
    print(f"\nJSON数据推送完成，共 {total_records} 条记录")
    print(f"总耗时: {total_time:.2f}秒, 平均速率: {total_records/total_time:.2f} 条/秒")

: 

In [None]:
input = 'data/test_data.json'
batch_size = 10000

if not os.path.exists(input):
    print(f"错误: 输入文件 {input} 不存在")
    exit(1)
    
produce_data(redis_client, input, batch_size)

已推送 1000000 条记录, 耗时: 4.81秒, 速率: 207904.87 条/秒
已推送 2000000 条记录, 耗时: 10.09秒, 速率: 198150.78 条/秒
已推送 3000000 条记录, 耗时: 15.08秒, 速率: 198964.02 条/秒
已推送 4000000 条记录, 耗时: 20.16秒, 速率: 198437.68 条/秒
已推送 5000000 条记录, 耗时: 25.53秒, 速率: 195852.91 条/秒
已推送 6000000 条记录, 耗时: 31.14秒, 速率: 192695.91 条/秒
已推送 7000000 条记录, 耗时: 36.50秒, 速率: 191789.71 条/秒
已推送 8000000 条记录, 耗时: 41.90秒, 速率: 190946.45 条/秒
已推送 9000000 条记录, 耗时: 46.96秒, 速率: 191635.71 条/秒
已推送 10000000 条记录, 耗时: 52.07秒, 速率: 192059.43 条/秒
已推送 11000000 条记录, 耗时: 57.34秒, 速率: 191839.01 条/秒
已推送 12000000 条记录, 耗时: 62.86秒, 速率: 190914.00 条/秒
已推送 13000000 条记录, 耗时: 68.67秒, 速率: 189323.47 条/秒
已推送 14000000 条记录, 耗时: 74.21秒, 速率: 188649.72 条/秒
已推送 15000000 条记录, 耗时: 79.49秒, 速率: 188710.85 条/秒
已推送 16000000 条记录, 耗时: 84.90秒, 速率: 188449.27 条/秒
已推送 17000000 条记录, 耗时: 90.54秒, 速率: 187765.46 条/秒
已推送 18000000 条记录, 耗时: 96.75秒, 速率: 186039.79 条/秒
已推送 19000000 条记录, 耗时: 102.34秒, 速率: 185659.40 条/秒
已推送 20000000 条记录, 耗时: 107.82秒, 速率: 185501.58 条/秒
已推送 21000000 条记录, 耗时: 113.29秒, 速率: 185360.03 条/秒

## 数据生产者 —— CSV格式

In [35]:
# 配置参数
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_PASSWORD = "quantide666"  # 添加Redis密码
REDIS_QUEUE_NAME = "bar_minute_csv_queue"

In [36]:
# 初始化Redis连接
redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, decode_responses=True)

In [37]:
# 将CSV格式数据推送到Redis队列
def produce_data(redis_client,input_file, batch_size=10000):
    """
    Args:
        input_file: 输入文件路径
        batch_size: 每批处理的记录数
    """
    
    # 清空队列
    redis_client.delete(REDIS_QUEUE_NAME)
    
    start_time = time.time()
    total_records = 0
    batch_records = []
    
    # 字段名列表（与data_generator.py中保持一致）
    fieldnames = ['symbol', 'frame', 'open', 'high', 'low', 'close', 'vol', 'amount']
    
    with open(input_file, 'r', encoding='utf-8') as f:
        reader = csv.reader(f)
        for row in reader:
            # CSV不带key，所以我们需要手动映射字段
            record = []
            for i, value in enumerate(row):
                # 根据字段类型进行转换
                if i == 0:  # symbol
                    record.append(int(value))
                elif i == 1:  # frame
                    record.append(value)
                else:  # 数值字段
                    record.append(float(value))
            
            batch_records.append(record)
            
            if len(batch_records) >= batch_size:
                # 创建数据包
                data_package = {
                    "timestamp": datetime.now().isoformat(),
                    "format": "csv",
                    "batch_id": total_records // batch_size + 1,
                    "fieldnames": fieldnames,
                    "records": batch_records
                }
                
                # 推送到Redis
                redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(data_package))
                
                total_records += len(batch_records)
                batch_records = []
                
                # 打印进度
                if total_records % (batch_size * 100) == 0:
                    elapsed = time.time() - start_time
                    print(f"已推送 {total_records} 条记录, 耗时: {elapsed:.2f}秒, 速率: {total_records/elapsed:.2f} 条/秒")
    
    # 处理剩余的记录
    if batch_records:
        data_package = {
            "timestamp": datetime.now().isoformat(),
            "format": "csv",
            "batch_id": total_records // batch_size + 1,
            "fieldnames": fieldnames,
            "records": batch_records
        }
        redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(data_package))
        total_records += len(batch_records)
    
    total_time = time.time() - start_time
    print(f"\nCSV数据推送完成，共 {total_records} 条记录")
    print(f"总耗时: {total_time:.2f}秒, 平均速率: {total_records/total_time:.2f} 条/秒")

In [None]:
input = 'data/test_data.csv'
batch_size = 10000
    
if not os.path.exists(input):
    print(f"错误: 输入文件 {input} 不存在")
    exit(1)
    
produce_data(redis_client, input, batch_size)

## 数据消费者 —— JSON格式

In [43]:
# 配置参数
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_PASSWORD = "quantide666"  # 添加Redis密码
REDIS_QUEUE_NAME = "bar_minute_json_queue"

# 初始化Redis连接
redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, decode_responses=True)

In [44]:
# ClickHouse配置
CLICKHOUSE_HOST = "localhost"
CLICKHOUSE_PORT = 9000
CLICKHOUSE_DB = "test_data_json_csv"

# 初始化ClickHouse客户端
client = Client(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT, database=CLICKHOUSE_DB)

In [45]:
# 将JSON格式数据插入到ClickHouse
def insert_to_clickhouse(client, data_package):
    """
    Args:
        client: ClickHouse客户端
        data_package: 数据包
        
    Returns:
        插入的记录数
    """
    records = data_package["records"]
    if not records:
        return 0
    
    # 准备插入数据
    values = []
    for record in records:
        # 转换日期时间格式
        frame = datetime.fromisoformat(record["frame"])
        
        # 准备行数据
        row = (
            record["symbol"],
            frame,
            record["open"],
            record["high"],
            record["low"],
            record["close"],
            record["vol"],
            record["amount"]
        )
        values.append(row)
    
    # 执行插入
    query = """
    INSERT INTO bar_minute_json (
        symbol, frame, open, high, low, close, vol, amount
    ) VALUES
    """
    
    client.execute(query, values)
    return len(values)

In [46]:
# 从Redis队列消费数据并插入到ClickHouse
def consume_data(redis_client, clickhouse_client, timeout=1, max_records=None):
    """
    Args:
        timeout: Redis阻塞超时时间（秒）
        max_records: 最大处理记录数，None表示不限制
    """
    start_time = time.time()
    total_records = 0
    batch_count = 0
    
    print("开始消费JSON格式数据...")
    
    while max_records is None or total_records < max_records:
        try:
            # 阻塞式获取队列数据
            result = redis_client.brpop(REDIS_QUEUE_NAME, timeout=timeout)
            if result is None:
                # 如果没有获取到数据，说明队列为空，退出循环
                print("Redis队列为空，停止消费数据。")
                break
            
            _, json_data = result
            data_package = json.loads(json_data)
            
            # 记录批次开始时间
            batch_start_time = time.time()
            
            # 插入数据
            inserted_count = insert_to_clickhouse(clickhouse_client, data_package)
            total_records += inserted_count
            batch_count += 1
            
            # 计算批次耗时
            batch_time = time.time() - batch_start_time
            
            # 打印进度
            if batch_count % 10 == 0:
                elapsed = time.time() - start_time
                print(f"已处理 {batch_count} 批次, 共 {total_records} 条记录")
                print(f"当前批次耗时: {batch_time:.2f}秒, 速率: {inserted_count/batch_time:.2f} 条/秒")
                print(f"总体耗时: {elapsed:.2f}秒, 平均速率: {total_records/elapsed:.2f} 条/秒")
                
        except Exception as e:
            print(f"数据处理异常: {str(e)}")
            continue
    
    total_time = time.time() - start_time
    print(f"\nJSON数据消费完成，共处理 {batch_count} 批次, {total_records} 条记录")
    print(f"总耗时: {total_time:.2f}秒, 平均速率: {total_records/total_time:.2f} 条/秒")

In [None]:
timeout = 1  # Redis阻塞超时时间（秒）
max_records = None  # 最大处理记录数，None表示不限制

consume_data(timeout, max_records)

## 数据消费者 —— CSV格式

## 测试前的准备

## 性能测试

## 主测试脚本

## 小规模测试（百万级数据）

## 大规模数据生成脚本（亿级数据）

本测试已配置为生成和处理1亿条数据记录，包含5000支不同的股票。主要修改包括：

1. 股票数量从100增加到5000
2. 数据量从100万增加到1亿
3. 批处理大小从1000增加到10000
4. 数据生成批次大小从10000增加到100000

这些修改确保了测试能够更好地模拟真实环境下的大规模数据处理场景，并提供更加严谨和公平的JSON与CSV格式性能对比结果。