In [None]:
import os
import glob
import pandas as pd
import sqlite3
import datetime
import re
import time
import signal
import sys
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

# Configuration
DATABASE_PATH = 'binance_futures_data.db'
DATA_ROOT = r'\\znas\Main\futures'  # root folder; CSVs are read from its immediate sub-folders
MAX_WORKERS = 32  # thread-pool size; kept moderate to limit SQLite write contention
BATCH_SIZE = 5000  # files submitted to the thread pool per batch; statistics are printed after each batch

# Retry parameters
MAX_RETRIES = 5   # maximum number of attempts
RETRY_DELAY = 1   # base delay in seconds for the exponential backoff
LOCK_WAIT_TIMEOUT = 20  # seconds passed as timeout= to sqlite3.connect

# SQLite PRAGMAs applied to every new connection (see optimize_connection)
PRAGMA_SETTINGS = {
    'journal_mode': 'WAL',      # write-ahead logging
    'synchronous': 'NORMAL',    # fewer fsyncs than FULL (durability trade-off)
    # A positive cache_size is a page count, so this allows up to ~2 GB per
    # connection at the default 4 KiB page size. A negative value means KiB
    # (e.g. -51200 ~= 50 MiB).
    'cache_size': 500000,
    'temp_store': 'MEMORY',     # keep temporary tables/indices in memory
    # Milliseconds to wait on a lock. Applied after connect(), so it replaces
    # LOCK_WAIT_TIMEOUT.
    'busy_timeout': 30000,
    'locking_mode': 'NORMAL',   # standard (non-exclusive) locking
}

# Shared module-level state, also read by the worker threads.
processed_files = set()   # paths of files already imported
is_interrupted = False    # raised by signal_handler to request a graceful stop


def signal_handler(sig, frame):
    """Request a graceful shutdown on SIGINT/SIGTERM.

    The handler only announces the request and sets the module-level
    ``is_interrupted`` flag. The import loops poll that flag and stop on
    their own.
    """
    global is_interrupted
    print("\n程序接收到中断信号，正在保存进度并安全退出...")
    is_interrupted = True


# Install the same handler for Ctrl+C and for termination requests.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)

def optimize_connection(conn):
    """Apply every entry of PRAGMA_SETTINGS to *conn* and commit.

    Each setting is issued as ``PRAGMA <name> = <value>`` in dict order.
    """
    cur = conn.cursor()
    statements = ("PRAGMA {} = {}".format(name, val) for name, val in PRAGMA_SETTINGS.items())
    for stmt in statements:
        cur.execute(stmt)
    conn.commit()

def get_connection_with_retry(max_retries=MAX_RETRIES):
    """Open a tuned SQLite connection to DATABASE_PATH, retrying on failure.

    Each attempt connects with ``timeout=LOCK_WAIT_TIMEOUT`` and
    ``check_same_thread=False``, then applies PRAGMA_SETTINGS through
    optimize_connection. Failed attempts are retried with exponential backoff
    plus random jitter.

    Args:
        max_retries: total number of attempts (default MAX_RETRIES).

    Returns:
        An open ``sqlite3.Connection``.

    Raises:
        sqlite3.Error: the last attempt's error, or a generic error when
            ``max_retries`` is not positive.
    """
    for retry in range(max_retries):
        conn = None
        try:
            conn = sqlite3.connect(DATABASE_PATH, check_same_thread=False, timeout=LOCK_WAIT_TIMEOUT)
            optimize_connection(conn)
            return conn
        except sqlite3.Error as e:
            # connect() may have succeeded and a PRAGMA failed afterwards; close
            # that handle so repeated retries do not leak connections.
            if conn is not None:
                conn.close()
            if retry < max_retries - 1:
                delay = RETRY_DELAY * (2 ** retry) + random.uniform(0, 1)  # exponential backoff with jitter
                print(f"连接数据库失败: {str(e)}，{delay:.2f}秒后重试... ({retry+1}/{max_retries})")
                time.sleep(delay)
            else:
                raise

    raise sqlite3.Error("无法连接数据库，已达到最大重试次数")

def execute_with_retry(cursor, sql, params=None, max_retries=MAX_RETRIES):
    """Run ``cursor.execute`` and retry while SQLite reports a locked database.

    ``params`` is passed only when truthy. On "database is locked" the call is
    retried up to ``max_retries`` times with exponential backoff plus jitter.
    Any other OperationalError, or a lock on the final attempt, is re-raised.

    Returns:
        Whatever ``cursor.execute`` returns (the cursor), or ``None`` if
        ``max_retries`` is not positive.
    """
    attempt = 0
    while attempt < max_retries:
        try:
            return cursor.execute(sql, params) if params else cursor.execute(sql)
        except sqlite3.OperationalError as exc:
            out_of_attempts = attempt >= max_retries - 1
            if out_of_attempts or "database is locked" not in str(exc):
                raise
            wait = RETRY_DELAY * (2 ** attempt) + random.uniform(0, 1)
            print(f"数据库锁定: {str(exc)}，{wait:.2f}秒后重试... ({attempt+1}/{max_retries})")
            time.sleep(wait)
        attempt += 1

def executemany_with_retry(cursor, sql, params_list, max_retries=MAX_RETRIES):
    """Run ``cursor.executemany`` and retry while SQLite reports a locked database.

    Args:
        cursor: sqlite3 cursor to execute on.
        sql: parameterized statement.
        params_list: iterable of parameter sequences. Anything other than a
            list/tuple is materialized into a list first.
        max_retries: total number of attempts (default MAX_RETRIES).

    Returns:
        The cursor returned by ``executemany``, or ``None`` if ``max_retries``
        is not positive.

    Raises:
        sqlite3.OperationalError: non-lock errors, or a lock on the last attempt.
    """
    # executemany() consumes its iterable. If a generator/iterator were passed
    # straight through, a retry after "database is locked" would re-send
    # nothing, so materialize it once up front.
    if not isinstance(params_list, (list, tuple)):
        params_list = list(params_list)

    for retry in range(max_retries):
        try:
            return cursor.executemany(sql, params_list)
        except sqlite3.OperationalError as e:
            if "database is locked" in str(e) and retry < max_retries - 1:
                delay = RETRY_DELAY * (2 ** retry) + random.uniform(0, 1)  # exponential backoff with jitter
                print(f"数据库锁定: {str(e)}，{delay:.2f}秒后重试... ({retry+1}/{max_retries})")
                time.sleep(delay)
            else:
                raise

def create_database():
    """Create the SQLite schema (tables and indexes) if it does not exist yet.

    Tables:
      * price_data      -- rows of (datetime, open, high, low, close, volume)
                           tagged with symbol_id and the source file_path
      * symbols         -- unique (symbol, quote_asset, timeframe) combinations
      * processed_files -- paths already imported, used to resume a run

    The connection is closed even if one of the statements fails.
    """
    conn = get_connection_with_retry()
    try:
        cursor = conn.cursor()

        # Main table holding the price rows
        execute_with_retry(cursor, '''
        CREATE TABLE IF NOT EXISTS price_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol_id INTEGER,
            datetime TEXT,
            open REAL,
            high REAL,
            low REAL,
            close REAL,
            volume REAL,
            file_path TEXT,  -- 添加文件路径以便去重
            FOREIGN KEY (symbol_id) REFERENCES symbols(id)
        )
        ''')

        # Trading-pair lookup table
        execute_with_retry(cursor, '''
        CREATE TABLE IF NOT EXISTS symbols (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT,
            quote_asset TEXT,
            timeframe TEXT,
            UNIQUE(symbol, quote_asset, timeframe)
        )
        ''')

        # Files already imported (resume support)
        execute_with_retry(cursor, '''
        CREATE TABLE IF NOT EXISTS processed_files (
            file_path TEXT PRIMARY KEY,
            processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''')

        # Indexes for the common lookups
        execute_with_retry(cursor, 'CREATE INDEX IF NOT EXISTS idx_price_data_symbol_datetime ON price_data (symbol_id, datetime)')
        execute_with_retry(cursor, 'CREATE INDEX IF NOT EXISTS idx_symbols_symbol ON symbols (symbol)')
        execute_with_retry(cursor, 'CREATE INDEX IF NOT EXISTS idx_price_data_file_path ON price_data (file_path)')

        conn.commit()
    finally:
        conn.close()

def load_processed_files():
    """Load already-imported file paths from the database into ``processed_files``.

    Does nothing if the database file or the ``processed_files`` table does
    not exist yet. Prints the loaded count on success. The connection is
    closed even if a query fails.
    """
    # Skip on a first run: connecting would create an empty database file.
    if not os.path.exists(DATABASE_PATH):
        return

    conn = get_connection_with_retry()
    try:
        cursor = conn.cursor()

        # Make sure the table exists before reading from it.
        execute_with_retry(cursor, "SELECT name FROM sqlite_master WHERE type='table' AND name='processed_files'")
        if cursor.fetchone() is None:
            return

        execute_with_retry(cursor, "SELECT file_path FROM processed_files")
        processed_files.update(row[0] for row in cursor.fetchall())
    finally:
        conn.close()

    print(f"从数据库中加载了 {len(processed_files)} 个已处理文件记录")

def mark_file_as_processed(conn, file_path):
    """Record *file_path* as imported, in the database and in memory.

    Inserts (or replaces) the row in ``processed_files`` and commits ``conn``.
    The commit also commits anything else pending on that connection; in
    process_csv_file that includes the price rows inserted in the same
    transaction.

    The in-memory ``processed_files`` set is updated only after the commit
    succeeds, so a failed write does not make the file look imported for the
    rest of the run.
    """
    cursor = conn.cursor()
    execute_with_retry(cursor, "INSERT OR REPLACE INTO processed_files (file_path) VALUES (?)", (file_path,))
    conn.commit()
    processed_files.add(file_path)

def get_or_create_symbol_id(conn, symbol, quote_asset, timeframe):
    """Return ``symbols.id`` for (symbol, quote_asset, timeframe), creating the row if needed.

    Many worker threads can see the same new symbol at once. With a plain
    SELECT-then-INSERT, two of them can both miss and both INSERT, and the
    loser fails with "UNIQUE constraint failed". ``INSERT OR IGNORE`` followed
    by a re-SELECT makes creation idempotent.

    Commits ``conn`` only when an insert was attempted. The three values are
    expected to be non-NULL, because the UNIQUE constraint and ``= ?`` never
    match NULLs.
    """
    cursor = conn.cursor()
    select_sql = 'SELECT id FROM symbols WHERE symbol = ? AND quote_asset = ? AND timeframe = ?'
    key = (symbol, quote_asset, timeframe)

    execute_with_retry(cursor, select_sql, key)
    row = cursor.fetchone()
    if row:
        return row[0]

    execute_with_retry(cursor,
        'INSERT OR IGNORE INTO symbols (symbol, quote_asset, timeframe) VALUES (?, ?, ?)',
        key
    )
    conn.commit()

    # lastrowid is meaningless when the INSERT was ignored, so read the id back.
    execute_with_retry(cursor, select_sql, key)
    return cursor.fetchone()[0]

# <date>_<symbol>_<QUOTE>_<timeframe>.csv, e.g. 2024-01-29_1000PEPEUSDT_USDT_1m.csv
_FILE_NAME_PATTERN = re.compile(r'(\d{4}-\d{2}-\d{2})_(.+)_([A-Z]+)_([a-zA-Z0-9]+)\.csv')


def extract_file_info(filename):
    """Parse date, symbol, quote asset and timeframe out of a CSV file name.

    Only the base name is examined. The whole base name must match the pattern.
    ``re.match`` used to accept trailing junk such as ``...1m.csv.bak``; a full
    match rejects it.

    Args:
        filename: path or base name of the CSV file.

    Returns:
        dict with keys 'date', 'symbol', 'quote_asset' and 'timeframe', or
        ``None`` if the name does not follow the pattern.
    """
    match = _FILE_NAME_PATTERN.fullmatch(os.path.basename(filename))
    if match is None:
        return None
    date, symbol, quote_asset, timeframe = match.groups()
    return {
        'date': date,
        'symbol': symbol,
        'quote_asset': quote_asset,
        'timeframe': timeframe,
    }

def _build_price_rows(df, symbol_id, file_path):
    """Return one INSERT tuple (symbol_id, datetime, open, high, low, close, volume, file_path) per CSV row.

    ``itertuples(name=None)`` over the six required columns is much faster
    than ``iterrows()``. It also keeps each column's dtype, whereas
    ``iterrows()`` upcasts each row Series to a common dtype. A missing column
    still raises KeyError.
    """
    columns = ['datetime', 'open', 'high', 'low', 'close', 'volume']
    return [
        (symbol_id, *values, file_path)
        for values in df[columns].itertuples(index=False, name=None)
    ]


def _discard_connection(conn):
    """Roll back and close *conn* after a failure; a no-op for ``None``."""
    if conn is None:
        return
    try:
        conn.rollback()
    except sqlite3.Error:
        pass  # best effort: closing without commit discards the transaction anyway
    finally:
        conn.close()


def _import_file(conn, file_path, file_info):
    """Import one CSV over *conn* and return its result dict.

    Returns a "skipped" result if the file is already recorded in
    processed_files, and a "read failed" result if pandas cannot read it.
    Otherwise it inserts the price rows and the processed marker together and
    returns "成功" with the row count. SQLite errors propagate so the caller
    can decide whether to retry.
    """
    cursor = conn.cursor()

    # The in-memory set may be stale; the database is authoritative.
    execute_with_retry(cursor, "SELECT COUNT(*) FROM processed_files WHERE file_path = ?", (file_path,))
    if cursor.fetchone()[0] > 0:
        return {"processed": 0, "file_path": file_path, "status": "已跳过(已存在)"}

    try:
        df = pd.read_csv(file_path)
    except Exception as e:
        return {"processed": 0, "file_path": file_path, "status": f"读取CSV失败: {str(e)}"}

    symbol_id = get_or_create_symbol_id(
        conn,
        file_info['symbol'],
        file_info['quote_asset'],
        file_info['timeframe']
    )
    data_to_insert = _build_price_rows(df, symbol_id, file_path)

    # Price rows and the processed_files marker go into one transaction.
    execute_with_retry(cursor, 'BEGIN TRANSACTION')
    executemany_with_retry(cursor,
        'INSERT INTO price_data (symbol_id, datetime, open, high, low, close, volume, file_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
        data_to_insert
    )
    mark_file_as_processed(conn, file_path)
    conn.commit()
    return {"processed": len(data_to_insert), "file_path": file_path, "status": "成功"}


def process_csv_file(file_path, file_index, total_files):
    """Import a single CSV file into the database and report the outcome.

    Args:
        file_path: path of the CSV to import.
        file_index: position of the file in the run; only drives the periodic
            progress line (printed when divisible by 10).
        total_files: number of files in the run, for the progress percentage.

    Returns:
        dict with "processed" (rows inserted), "file_path" and "status". The
        status is one of "成功", "已跳过(...)", "中断", "无法解析文件名",
        "读取CSV失败: ..." or "处理失败: ...".
    """
    if is_interrupted:
        return {"processed": 0, "file_path": file_path, "status": "中断"}

    if file_path in processed_files:
        return {"processed": 0, "file_path": file_path, "status": "已跳过(已处理)"}

    file_info = extract_file_info(file_path)
    if not file_info:
        return {"processed": 0, "file_path": file_path, "status": "无法解析文件名"}

    # Random jitter so threads do not hit the database in lockstep.
    time.sleep(random.uniform(0.05, 0.2))

    for retry in range(MAX_RETRIES):
        # Reset every attempt so a failure never touches a previous attempt's handle.
        conn = None
        try:
            conn = get_connection_with_retry()
            result = _import_file(conn, file_path, file_info)
        except sqlite3.OperationalError as e:
            _discard_connection(conn)
            if "database is locked" in str(e) and retry < MAX_RETRIES - 1:
                delay = RETRY_DELAY * (2 ** retry) + random.uniform(0, 1)
                print(f"处理文件 {file_path} 时数据库锁定，{delay:.2f}秒后重试... ({retry+1}/{MAX_RETRIES})")
                time.sleep(delay)
                continue
            return {"processed": 0, "file_path": file_path, "status": f"处理失败: {str(e)}"}
        except Exception as e:
            _discard_connection(conn)
            return {"processed": 0, "file_path": file_path, "status": f"处理失败: {str(e)}"}

        conn.close()

        if result["status"] == "成功":
            # Brief pause so other threads get a turn at the write lock.
            time.sleep(random.uniform(0.1, 0.3))
            if file_index % 10 == 0:
                progress = (file_index / total_files) * 100
                print(f"\r当前进度: {progress:.2f}% - 正在处理: {file_path}", end="")
        return result

    return {"processed": 0, "file_path": file_path, "status": "处理失败: 超过最大重试次数"}

def get_all_csv_files(root=None):
    """Return all ``*.csv`` paths found one directory level below *root*.

    Only immediate sub-folders of *root* are scanned (for example one folder
    per year); CSVs directly in *root* are ignored.

    Args:
        root: folder to scan; defaults to DATA_ROOT.

    Returns:
        Sorted list of paths. The deterministic order keeps batches
        reproducible between runs.
    """
    if root is None:
        root = DATA_ROOT

    all_files = []
    # glob.escape keeps '[', '*' or '?' in real folder names from being read
    # as wildcards.
    for year_folder in sorted(glob.glob(os.path.join(glob.escape(root), "*"))):
        if os.path.isdir(year_folder):
            pattern = os.path.join(glob.escape(year_folder), "*.csv")
            all_files.extend(sorted(glob.glob(pattern)))
    return all_files

def process_batch(batch_files, batch_num, total_batches, total_files):
    """Process *batch_files* one after another in the calling thread, with a tqdm bar.

    Args:
        batch_files: CSV paths to import.
        batch_num: 1-based number of this batch (for the bar label).
        total_batches: total number of batches (for the bar label).
        total_files: unused; kept for interface compatibility.

    Returns:
        dict with "processed_records", "successful_files", "skipped_files",
        "failed_files" and "interrupted".
    """
    results = []

    batch_size = len(batch_files)
    with tqdm(total=batch_size, desc=f"批次 {batch_num}/{total_batches}", unit="文件") as pbar:
        for i, file_path in enumerate(batch_files):
            if is_interrupted:
                break

            result = process_csv_file(file_path, i, batch_size)
            results.append(result)

            pbar.update(1)
            pbar.set_postfix(文件=os.path.basename(file_path)[:20], 状态=result["status"])

    successful = sum(1 for r in results if r["status"] == "成功")
    skipped = sum(1 for r in results if "已跳过" in r["status"])
    # Count every other outcome as a failure, matching import_data_parallel.
    # The old substring test for "失败" silently dropped "无法解析文件名".
    failed = len(results) - successful - skipped

    return {
        "processed_records": sum(r["processed"] for r in results),
        "successful_files": successful,
        "skipped_files": skipped,
        "failed_files": failed,
        "interrupted": is_interrupted
    }

def import_data_parallel():
    """Import every not-yet-processed CSV under DATA_ROOT using a thread pool.

    Steps:
      1. Ensure the schema exists and load the processed-file list.
      2. Split the remaining files into batches of BATCH_SIZE.
      3. Run each batch on MAX_WORKERS threads via process_csv_file.
      4. Print per-batch and final statistics.

    Stops once ``is_interrupted`` is set. Runs ``PRAGMA optimize`` only when
    the run was not interrupted.
    """
    create_database()
    load_processed_files()

    all_csv_files = get_all_csv_files()
    total_files = len(all_csv_files)
    files_to_process = [f for f in all_csv_files if f not in processed_files]
    # Count only files found on disk that are already imported. processed_files
    # may also hold records for files that have since been moved or deleted.
    already_processed = total_files - len(files_to_process)

    print(f"找到 {total_files} 个CSV文件，其中 {already_processed} 个已处理，需要处理 {len(files_to_process)} 个")

    if not files_to_process:
        print("所有文件已处理完毕！")
        return

    start_time = datetime.datetime.now()

    batches = [files_to_process[i:i + BATCH_SIZE] for i in range(0, len(files_to_process), BATCH_SIZE)]
    total_batches = len(batches)

    print(f"将处理 {len(files_to_process)} 个文件，分成 {total_batches} 个批次，每批 {BATCH_SIZE} 个文件")
    print(f"使用 {MAX_WORKERS} 个工作线程处理数据")

    total_processed = 0
    total_successful = 0
    total_skipped = 0
    total_failed = 0

    for batch_num, batch_files in enumerate(batches, 1):
        if is_interrupted:
            break

        print(f"\n开始处理批次 {batch_num}/{total_batches}...")

        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = [
                executor.submit(process_csv_file, file_path, (batch_num - 1) * BATCH_SIZE + i, len(files_to_process))
                for i, file_path in enumerate(batch_files)
            ]

            batch_results = []
            for future in tqdm(as_completed(futures), total=len(futures), desc=f"批次 {batch_num}/{total_batches}", unit="文件"):
                result = future.result()
                batch_results.append(result)

                tqdm.write(f"已处理: {os.path.basename(result['file_path'])} - {result['status']}")

                if is_interrupted:
                    # Leaving the with-block still waits for the submitted futures;
                    # they return quickly because they see is_interrupted.
                    break

        successful = sum(1 for r in batch_results if r["status"] == "成功")
        skipped = sum(1 for r in batch_results if "已跳过" in r["status"])
        failed = len(batch_results) - successful - skipped
        processed_records = sum(r["processed"] for r in batch_results)

        total_processed += processed_records
        total_successful += successful
        total_skipped += skipped
        total_failed += failed

        print(f"\n批次 {batch_num}/{total_batches} 统计:")
        print(f"- 成功处理: {successful} 个文件")
        print(f"- 跳过的文件: {skipped} 个文件")
        print(f"- 失败的文件: {failed} 个文件")
        print(f"- 导入记录数: {processed_records} 条")

        overall_progress = (batch_num / total_batches) * 100
        print(f"\n总体进度: {overall_progress:.2f}% 已完成")

        # batch_num starts at 1, so the per-batch average is always defined.
        elapsed = (datetime.datetime.now() - start_time).total_seconds()
        avg_time_per_batch = elapsed / batch_num
        eta_seconds = avg_time_per_batch * (total_batches - batch_num)
        print(f"预计剩余时间: {datetime.timedelta(seconds=int(eta_seconds))}")

        # Short pause between batches to ease database pressure.
        time.sleep(1)

    duration = (datetime.datetime.now() - start_time).total_seconds()

    if not is_interrupted:
        print("\n正在优化数据库...")
        conn = get_connection_with_retry()
        try:
            execute_with_retry(conn.cursor(), 'PRAGMA optimize')
        finally:
            conn.close()

    print("\n导入任务完成!")
    print(f"总耗时: {datetime.timedelta(seconds=int(duration))}")
    print(f"总导入记录: {total_processed}")
    print(f"成功处理文件: {total_successful}")
    print(f"跳过的文件: {total_skipped}")
    print(f"失败的文件: {total_failed}")

    if is_interrupted:
        print("\n注意: 程序被中断，未完成所有文件的处理。")
        print("您可以随时重新运行程序继续处理剩余文件。")

def query_data_example(symbol='1000PEPEUSDT', quote_asset='USDT', timeframe='1m', date='2024-01-29', limit=5):
    """Create the ``vw_price_data`` view if needed and print a sample query.

    The sample selects up to *limit* rows for one (symbol, quote_asset,
    timeframe) whose ``datetime`` text starts with *date*. The defaults
    reproduce the original hard-coded example. Values are bound as SQL
    parameters, and the connection is closed even if a statement fails.
    """
    conn = get_connection_with_retry()
    try:
        cursor = conn.cursor()

        # Convenience view joining prices with their symbol metadata.
        execute_with_retry(cursor, '''
        CREATE VIEW IF NOT EXISTS vw_price_data AS
        SELECT
            s.symbol,
            s.quote_asset,
            s.timeframe,
            p.datetime,
            p.open,
            p.high,
            p.low,
            p.close,
            p.volume
        FROM price_data p
        JOIN symbols s ON p.symbol_id = s.id
        ''')

        print("\n示例查询1: 特定交易对在特定日期的数据")
        query1 = '''
        SELECT datetime, open, high, low, close, volume
        FROM vw_price_data
        WHERE symbol = ? AND quote_asset = ? AND timeframe = ?
        AND datetime LIKE ?
        LIMIT ?
        '''

        try:
            execute_with_retry(cursor, query1, (symbol, quote_asset, timeframe, f"{date}%", limit))
            column_names = [description[0] for description in cursor.description]
            rows = cursor.fetchall()
            df1 = pd.DataFrame(rows, columns=column_names)
            print(df1)
        except Exception as e:
            print(f"查询1执行失败: {str(e)}")
    finally:
        conn.close()


In [None]:

if __name__ == "__main__":
    import traceback  # only needed for error reporting when run as a script

    try:
        # Import every pending CSV file.
        import_data_parallel()

        # Run the sample query only after a complete (non-interrupted) import.
        if not is_interrupted:
            query_data_example()

    except Exception as e:
        print(f"程序执行出错: {str(e)}")
        # The one-line message alone hides where the failure happened.
        traceback.print_exc()
        sys.exit(1)