In [1]:
# Cell 1: 导入必要的库
import os
import pandas as pd
from datetime import datetime
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
import os
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import threading
import time
import traceback
import gc
import psutil

def process_file(args):
    """
    处理单个文件
    """
    try:
        file_path, folder_path = args
        full_path = os.path.join(folder_path, file_path)
        
        if not os.path.exists(full_path):
            return f"文件不存在: {full_path}"
            
        # 快速检查表头
        try:
            with open(full_path, 'r', encoding='utf-8') as f:
                header = f.readline().strip()
        except UnicodeDecodeError:
            with open(full_path, 'r', encoding='gbk') as f:
                header = f.readline().strip()
        
        # 如果不需要更新，直接跳过
        if 'candle_begin_time' not in header:
            return f"跳过: {full_path}"
            
        # 分块读取和处理文件
        temp_path = full_path + '.tmp'
        chunk_size = 1024 * 1024  # 1MB chunks
        
        with open(full_path, 'rb') as f_in, open(temp_path, 'wb') as f_out:
            # 写入新表头
            new_header = header.replace('candle_begin_time', 'datetime') + '\n'
            f_out.write(new_header.encode('utf-8'))
            
            # 跳过原文件的表头行
            f_in.readline()
            
            # 分块复制剩余内容
            while True:
                chunk = f_in.read(chunk_size)
                if not chunk:
                    break
                f_out.write(chunk)
                
        # 替换原文件
        os.replace(temp_path, full_path)
        
        return f"已更新: {full_path}"
        
    except Exception as e:
        return f"错误 {full_path if 'full_path' in locals() else '未知文件'}: {str(e)}"
    finally:
        # 强制清理内存
        gc.collect()

def get_memory_usage():
    """
    获取当前进程的内存使用情况
    """
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024  # 转换为MB

def update_csv_headers_threaded(base_path):
    """
    使用多线程处理文件
    """
    print(f"[{time.strftime('%H:%M:%S')}] 开始收集文件列表...")
    
    # 获取所有文件路径
    all_files = []
    for date_folder in os.listdir(base_path):
        folder_path = os.path.join(base_path, date_folder)
        if os.path.isdir(folder_path):
            try:
                csv_files = [(f, folder_path) for f in os.listdir(folder_path) if f.endswith('.csv')]
                all_files.extend(csv_files)
                print(f"[{time.strftime('%H:%M:%S')}] 已添加文件夹 {date_folder}: {len(csv_files)} 个文件")
            except Exception as e:
                print(f"[{time.strftime('%H:%M:%S')}] 读取文件夹 {date_folder} 时出错: {e}")
    
    total_files = len(all_files)
    print(f"\n[{time.strftime('%H:%M:%S')}] 总文件数: {total_files}")
    
    # 创建计数器和内存监控
    processed_count = 0
    lock = threading.Lock()
    max_memory = 0
    
    def update_progress(future):
        nonlocal processed_count, max_memory
        with lock:
            processed_count += 1
            current_memory = get_memory_usage()
            max_memory = max(max_memory, current_memory)
            
            if processed_count % 10 == 0:
                elapsed_time = time.time() - start_time
                speed = processed_count / elapsed_time
                remaining = (total_files - processed_count) / speed if speed > 0 else 0
                print(f"[{time.strftime('%H:%M:%S')}] 进度: {processed_count}/{total_files} "
                      f"({processed_count/total_files*100:.2f}%) "
                      f"速度: {speed:.2f} 文件/秒 "
                      f"预计剩余时间: {remaining/60:.2f} 分钟 "
                      f"内存使用: {current_memory:.1f}MB")
                
                # 如果内存使用过高，强制进行垃圾回收
                if current_memory > 1000:  # 超过1GB时
                    gc.collect()
    
    start_time = time.time()
    
    # 使用线程池处理文件
    with ThreadPoolExecutor(max_workers=8) as executor:  # 减少线程数
        # 分批提交任务
        batch_size = 100
        for i in range(0, len(all_files), batch_size):
            batch = all_files[i:i+batch_size]
            
            # 提交批次任务
            future_to_file = {executor.submit(process_file, file_info): file_info 
                            for file_info in batch}
            
            # 添加回调
            for future in future_to_file:
                future.add_done_callback(update_progress)
            
            # 获取结果
            for future in future_to_file:
                try:
                    result = future.result()
                    if result.startswith(("已更新", "错误")):
                        print(f"[{time.strftime('%H:%M:%S')}] {result}")
                except Exception as e:
                    print(f"[{time.strftime('%H:%M:%S')}] 处理出错: {e}")
            
            # 批次处理完成后强制清理内存
            gc.collect()
    
    total_time = time.time() - start_time
    print(f"\n[{time.strftime('%H:%M:%S')}] 处理完成！")
    print(f"[{time.strftime('%H:%M:%S')}] 总处理文件数: {processed_count}")
    print(f"[{time.strftime('%H:%M:%S')}] 总用时: {total_time:.2f}秒")
    print(f"[{time.strftime('%H:%M:%S')}] 平均速度: {processed_count/total_time:.2f} 文件/秒")
    print(f"[{time.strftime('%H:%M:%S')}] 最大内存使用: {max_memory:.1f}MB")

# 执行处理
base_path = r'\\znas\Main\spot'
update_csv_headers_threaded(base_path)

In [2]:
# Cell 2: 定义配置
TIME_INTERVAL = '1m'  # 添加时间间隔配置
COLORS = {
    0: 'red',     # 文件不存在
    1: 'green',   # 数据完整
    2: 'yellow',  # 有时间断层
    3: 'orange',  # 数据点不足
    4: 'purple'   # 处理错误
}

In [9]:
def check_spot_data_integrity(base_path, symbols, start_date, end_date):
    """
    检查现货数据完整性
    
    参数:
    base_path (str): 数据根目录
    symbols (list): 交易对列表
    start_date (str): 起始日期 (YYYY-MM-DD)
    end_date (str): 结束日期 (YYYY-MM-DD)
    
    返回:
    DataFrame: 数据完整性检查结果
    """
    # 生成日期范围
    date_range = pd.date_range(start=start_date, end=end_date, freq='D')
    date_list = [str(date.date()) for date in date_range]
    
    # 创建结果DataFrame
    results = pd.DataFrame(index=date_list, columns=symbols)
    results = results.fillna(0)  # 默认填充0，表示数据不存在
    
    # 遍历每个交易对和日期
    for symbol in symbols:
        for date in date_list:
            file_path = os.path.join(
                base_path, 
                'data', 
                'spot',
                date,
                f"{date}_{symbol.replace('/', '')}_{TIME_INTERVAL}.csv"  # 使用定义的时间间隔
            )
            
            if not os.path.exists(file_path):
                print(f"缺失文件: {file_path}")
                continue
                
            try:
                # 读取数据
                df = pd.read_csv(file_path)
                
                # 基本数据检查
                if df.empty:
                    print(f"空文件: {file_path}")
                    continue
                    
                # 检查必要列
                required_cols = ['datetime', 'open', 'high', 'low', 'close', 'volume']
                if not all(col in df.columns for col in required_cols):
                    print(f"列缺失: {file_path}")
                    continue
                
                # 转换时间列
                df['datetime'] = pd.to_datetime(df['datetime'])
                
                # 检查时间间隔（对于1分钟数据）
                time_diffs = df['datetime'].diff()
                expected_diff = pd.Timedelta(minutes=1)
                
                # 检查是否有时间断层
                gaps = time_diffs[time_diffs > expected_diff]
                if not gaps.empty:
                    print(f"发现时间断层 {file_path}:")
                    for idx in gaps.index:
                        gap_start = df['datetime'][idx-1]
                        gap_end = df['datetime'][idx]
                        print(f"  断层: {gap_start} 到 {gap_end}")
                    results.loc[date, symbol] = 2  # 2表示有时间断层
                else:
                    # 检查数据点数量（一天应该有1440个1分钟数据）
                    expected_points = 1440
                    if len(df) < expected_points:
                        print(f"数据点不足: {file_path}, 实际: {len(df)}, 应有: {expected_points}")
                        results.loc[date, symbol] = 3  # 3表示数据点不足
                    else:
                        results.loc[date, symbol] = 1  # 1表示数据完整
                        
            except Exception as e:
                print(f"处理文件出错 {file_path}: {str(e)}")
                results.loc[date, symbol] = 4  # 4表示处理错误
    
    # 保存检查结果
    result_path = os.path.join(base_path, 'data_integrity_check.xlsx')
    
    # 创建一个样式化的Excel文件
    def color_cells(val):
        colors = {
            0: 'background-color: red',     # 文件不存在
            1: 'background-color: green',   # 数据完整
            2: 'background-color: yellow',  # 有时间断层
            3: 'background-color: orange',  # 数据点不足
            4: 'background-color: purple'   # 处理错误
        }
        return colors.get(val, '')
    
    # 保存带样式的结果
    styled_results = results.style.applymap(color_cells)
    styled_results.to_excel(result_path)
    
    print(f"\n检查结果已保存至: {result_path}")
    print("\n状态说明:")
    print("0 (红色) - 文件不存在")
    print("1 (绿色) - 数据完整")
    print("2 (黄色) - 有时间断层")
    print("3 (橙色) - 数据点不足")
    print("4 (紫色) - 处理错误")
    
    return results

In [10]:
# Cell 4: 运行示例
if __name__ == '__main__':
    # 配置参数
    params = {
        'base_path': 'D:/CryptoData',  # 修改为正确的路径
        'begin_date': '2024-01-01',
        'end_date': '2024-01-31'
    }
    
    target_symbols = [
        'BTC/USDT',
        'ETH/USDT',
        'BNB/USDT'
    ]
    
    # 运行检查
    results = check_spot_data_integrity(
        base_path=params['base_path'],
        symbols=target_symbols,
        start_date=params['begin_date'],
        end_date=params['end_date']
    )

缺失文件: D:/CryptoData\data\spot\2024-01-08\2024-01-08_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-09\2024-01-09_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-10\2024-01-10_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-11\2024-01-11_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-12\2024-01-12_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-13\2024-01-13_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-14\2024-01-14_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-15\2024-01-15_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-16\2024-01-16_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-17\2024-01-17_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-18\2024-01-18_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-19\2024-01-19_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-20\2024-01-20_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-21\2024-01-21_BTCUSDT_1m.csv
缺失文件: D:/CryptoData\data\spot\2024-01-22\2024-01-22_BTCUSDT_1m

  results = results.fillna(0)  # 默认填充0，表示数据不存在
  styled_results = results.style.applymap(color_cells)
