![QuantConnect Logo](https://cdn.quantconnect.com/web/i/icon.png)
<hr>

In [7]:

from AlgorithmImports import *
from datetime import date
import pandas as pd

from orchestrator import run
from footprint_storage import get_year_file_path

# QuantBook is the research-notebook handle used for all data access below.
qb = QuantBook()
# Subscribe to the E-mini Nasdaq-100 future at second resolution and keep its symbol.
symbol = qb.add_future(Futures.Indices.NASDAQ_100_E_MINI, Resolution.SECOND).symbol
# Other contracts noted by the author as candidates to swap in:
# Futures.Indices.NASDAQ_100_E_MINI SP_500_E_MINI Futures.Metals.Gold

# Calendar year to process; `year` is also read by the file-listing cell further down.
year=2022
start = date(year, 1, 1)
# NOTE(review): whether `end` is treated as inclusive depends on `run` - confirm.
end = date(year, 12, 31)

v_unit = 1000  # minimal volume unit per bar

# Minimum price increment of the contract, taken from its symbol properties.
sec = qb.Securities[symbol]
tick_size = sec.SymbolProperties.MinimumPriceVariation
# Root directory handed to `run`; the validation cell below passes the same value.
data_root="/QuantConnect/research-cloud/airlock/footprint_data"
# Process the whole date range. NOTE(review): force_recompute=False presumably
# reuses output that already exists under data_root - confirm in orchestrator.run.
run(qb=qb, symbol=symbol, start_date=start, end_date=end, v_unit=v_unit, tick_size=tick_size, force_recompute=False,data_root=data_root)
print("Done.")


2022-01-02 00:00:00 finished
2022-01-03 00:00:00 finished
2022-01-04 00:00:00 finished
2022-01-05 00:00:00 finished
2022-01-06 00:00:00 finished
2022-01-07 00:00:00 finished
2022-01-09 00:00:00 finished


KeyboardInterrupt: 

In [None]:
# Import the validation function from the module we just created
from validator import validate_daily_open


# --- Run ---
# `symbol`, `start`, `end`, `qb` and `data_root` come from the first cell.
print(f"开始校验合约: {symbol}")
print(f"日期范围: {start} to {end}")

# ============== Cell 2: run the validation ==============
print("\n正在运行校验...")
# data_root is passed explicitly with the value defined in the first cell.
# NOTE(review): an earlier comment here referred to a module default of
# '/LeanCLI/footprint_data'; that default is NOT what this call uses.
validation_errors = validate_daily_open(
    qb=qb,
    symbol=symbol,
    start_date=start,
    end_date=end,
    data_root=data_root
)
print("校验完成。")


# ============== Cell 3: show the results ==============
print("\n--- 校验结果 ---")
# An empty (falsy) result is treated as "no problems found".
if not validation_errors:
    print("✅ 校验通过！在指定日期范围内，所有日期的开盘价均在2个tick的容忍误差内。")
else:
    print(f"❌ 校验发现 {len(validation_errors)} 个问题。")
    
    # Convert the results to a DataFrame so they display clearly
    errors_df = pd.DataFrame(validation_errors)
    
    # Express the difference as a number of ticks, which is easier to judge
    if "difference" in errors_df.columns and "tick_size" in errors_df.columns:
        # Use .loc to avoid SettingWithCopyWarning
        errors_df.loc[:, "difference_in_ticks"] = errors_df["difference"] / errors_df["tick_size"]
    
    # Preferred column order for readability
    cols_order = [
        "date", "status", "daily_open", "daily_open_time", "footprint_open", "footprint_open_time", 
        "difference", "tick_size", "difference_in_ticks", "message"
    ]
    
    # Keep only the columns that actually exist in the DataFrame
    existing_cols = [col for col in cols_order if col in errors_df.columns]
    
    print("\n详细信息:")
    # Lift pandas' display limits so every row and column is printed in full.
    with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', 1000):
        print(errors_df[existing_cols].to_string())

In [None]:

# === Step 1: collect the files whose names match a regex (supports max_depth) ===
import os
import re
from pathlib import Path
import json
def find_files(base_dir=".", pattern=None, max_depth=None):
    """
    Walk a directory tree and collect the files whose names match a regex.

    The matches are printed (a summary line, one path per line, then a
    separator) and returned sorted case-insensitively by their full path.

    :param base_dir: Root directory to walk; it is resolved to an absolute path.
    :param pattern: Regular expression applied with ``re.search`` to each bare
        file name (not the full path). ``None`` or ``""`` matches every file.
    :param max_depth: Deepest directory level whose files are collected, where
        ``base_dir`` itself is level 0. ``None`` means no limit.
    :return: Sorted list of ``pathlib.Path`` objects for the matching files.
    """
    base_path = Path(base_dir).resolve()
    regex = re.compile(pattern) if pattern else None
    matched_files = []

    for root, dirs, files in os.walk(base_path):
        root_path = Path(root)
        # Depth of the directory currently being visited, relative to the root.
        depth = len(root_path.relative_to(base_path).parts)
        if max_depth is not None and depth >= max_depth:
            # Prune in place as soon as the limit is reached, so os.walk never
            # lists directories whose files would be discarded anyway.
            dirs[:] = []
            if depth > max_depth:
                # Only reachable for the root itself when max_depth is negative.
                continue

        matched_files.extend(
            root_path / f for f in files if regex is None or regex.search(f)
        )

    print(f"在目录 {base_path} 中找到 {len(matched_files)} 个匹配文件：")
    matched_files = sorted(matched_files, key=lambda x: str(x).lower())

    for f in matched_files:
        print("   ", f)
    print("-" * 60)
    return matched_files


# === Step 2: generate Base64 download links in parallel ===
import base64
from concurrent.futures import ThreadPoolExecutor, as_completed
from IPython.display import display, HTML

def generate_base64_link(file_path):
    """
    Read one file and build an HTML download link that embeds it as Base64.

    The whole file is read into memory and placed in a ``data:`` URI, so the
    returned string is roughly 4/3 the size of the file.

    :param file_path: Path of the file to embed (``str`` or path-like).
    :return: An ``<a>`` element as a string. Its ``download`` attribute carries
        ``file_path`` as given and its visible text shows the bare file name.
    :raises OSError: If the file cannot be opened or read.
    """
    # Imported locally so this function does not rely on a file-level import.
    from html import escape

    file_name = os.path.basename(file_path)
    with open(file_path, "rb") as f:
        encoded = base64.b64encode(f.read()).decode()

    # Escape everything interpolated into markup: a quote, '&', '<' or '>' in
    # a path would otherwise terminate the attribute or inject HTML.
    safe_path = escape(str(file_path), quote=True)
    safe_name = escape(file_name, quote=True)
    return f'<a download="{safe_path}" href="data:application/zip;base64,{encoded}">下载 {safe_name}</a>'

def create_base64_links_parallel(files, max_workers=8):
    """
    Generate Base64 download links on a thread pool and display them.

    Each file is handed to ``generate_base64_link``; every link is rendered
    with ``display(HTML(...))`` as soon as its worker finishes, so the links
    appear in completion order rather than input order. A failure for one
    file is printed and does not stop the remaining files.

    :param files: Paths of the files to turn into links. An empty or ``None``
        value makes the function return immediately without output.
    :param max_workers: Maximum number of worker threads.
    :return: ``None``.
    """
    if not files:
        return

    print(f" 开始并行生成 {len(files)} 个文件的 Base64 链接...")
    succeeded = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its path so failures can name the file.
        futures = {executor.submit(generate_base64_link, f): f for f in files}

        for future in as_completed(futures):
            file_path = futures[future]
            try:
                link_html = future.result()
                display(HTML(link_html))
            except Exception as e:
                # Best effort: report the failure and carry on with the rest.
                print(f"  生成 {file_path} 链接失败: {e}")
            else:
                succeeded += 1

    # Report the links actually produced, not the number requested.
    print(f" 全部完成，共生成 {succeeded} 个链接。")

def split_list(lst, chunk_size):
    """
    Lazily yield consecutive slices of ``lst`` holding at most ``chunk_size`` items.

    The last slice is shorter when the length is not a multiple of
    ``chunk_size``; an empty input yields nothing.
    """
    offsets = range(0, len(lst), chunk_size)
    yield from (lst[begin:begin + chunk_size] for begin in offsets)

# Collect the files under the footprint data directory whose name contains the
# selected `year` (defined in the first cell), visiting directories at most
# two levels below the root.
file_list=[]
file_list += find_files("/QuantConnect/research-cloud/airlock/footprint_data", pattern=rf"{year}.*", max_depth=2)

# Group the files into batches of 20; later cells render one batch at a time.
batches = list(split_list(file_list, 20))

print(f"共计 {len(file_list)} 个匹配文件")

In [None]:
# Render download links for the first batch only. When no file matched,
# `batches` is empty, so fall back to an empty list (a no-op for the callee)
# instead of raising IndexError on batches[0].
create_base64_links_parallel(batches[0] if batches else [], max_workers=2)