In [1]:
# 配置管理器
import os
import json
import logging
from typing import Dict, Any

class ConfigManager:
    """配置管理器"""
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self):
        if not hasattr(self, 'initialized'):
            # 基础路径配置
            self.BASE_DIR = 'D:\\JupyterWork'
            self.LOG_DIR = os.path.join(self.BASE_DIR, 'logs')
            self.MODEL_DIR = os.path.join(self.BASE_DIR, 'models')
            self.DATA_DIR = os.path.join(self.BASE_DIR, 'data')
            self.CHECKPOINT_DIR = os.path.join(self.BASE_DIR, 'checkpoints')
            
            # 创建必要的目录
            for dir_path in [self.LOG_DIR, self.MODEL_DIR, self.DATA_DIR, self.CHECKPOINT_DIR]:
                os.makedirs(dir_path, exist_ok=True)
            
            # 数据库配置
            self.DB_CONFIG: dict = {
                'host': 'localhost',
                'port': 5432,
                'user': 'ace_user'
            }
            
            # 训练配置
            self.TRAINING_CONFIG: dict = {
                'max_epochs': 50,
                'batch_size': 32
            }
            
            # 系统配置
            self.SYSTEM_CONFIG = {
                'memory_limit': 8000,  # MB
                'gpu_memory_limit': 4000,  # MB
                'cleanup_interval': 300,  # seconds
                'log_retention_days': 7,
                'check_interval': 60,  # 检查新期号的间隔
                'AUTO_TUNING': {
                    'enable_per_sample': True,  # 启用逐样本调整
                    'adjustment_steps': 5,      # 每个样本最大调整次数
                    'learning_rate_range': (1e-5, 1e-2)  # 学习率调整范围
                },
                'DATA_CONFIG': {
                    'cache_size': 10000,
                    'normalize_range': (-1, 1)
                },
                'max_sequence_gap': 1,          # 允许的最大期号间隔
                'max_threads': 8,  # 留出4线程给系统
                'base_batch_size': 16,  # 初始批次
                'gpu_mem_limit': 1536,  # MB (保留500MB给系统)
                'cpu_util_threshold': 70,  # CPU使用率阈值
                'SAMPLE_CONFIG': {
                    'input_length': 144000,  # 修改这里
                    'target_length': 2880,   # 修改这里
                    'total_fetch': lambda: (  # 自动计算总获取量
                        self.SYSTEM_CONFIG['SAMPLE_CONFIG']['input_length'] 
                        + self.SYSTEM_CONFIG['SAMPLE_CONFIG']['target_length']
                    )
                }
            }
            
            self.initialized = True
    
    def get_db_config(self) -> Dict[str, str]:
        """获取数据库配置"""
        return self.DB_CONFIG.copy()
    
    def get_training_config(self) -> Dict[str, Any]:
        """获取训练配置"""
        return self.TRAINING_CONFIG.copy()
    
    def get_system_config(self) -> Dict[str, Any]:
        """获取系统配置"""
        return self.SYSTEM_CONFIG.copy()
    
    def update_config(self, config_name: str, updates: Dict[str, Any]) -> bool:
        """更新指定配置"""
        try:
            config = getattr(self, f'{config_name}_CONFIG')
            config.update(updates)
            return True
        except AttributeError:
            logging.error(f"配置 {config_name} 不存在")
            return False
    
    def save_config(self, config_name: str) -> bool:
        """保存配置到文件"""
        try:
            config = getattr(self, f'{config_name}_CONFIG')
            save_path = os.path.join(self.BASE_DIR, 'configs', f'{config_name.lower()}_config.json')
            os.makedirs(os.path.dirname(save_path), exist_ok=True)
            
            with open(save_path, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=4, ensure_ascii=False)
            return True
        except Exception as e:
            logging.error(f"保存配置失败: {str(e)}")
            return False
    
    def load_config(self, config_name: str) -> bool:
        """从文件加载配置"""
        try:
            load_path = os.path.join(self.BASE_DIR, 'configs', f'{config_name.lower()}_config.json')
            if not os.path.exists(load_path):
                return False
            
            with open(load_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
            
            setattr(self, f'{config_name}_CONFIG', config)
            return True
        except Exception as e:
            logging.error(f"加载配置失败: {str(e)}")
            return False

class ConfigValidator:
    """配置验证器"""
    @staticmethod
    def validate_db_config(config: Dict[str, str]) -> bool:
        """验证数据库配置"""
        required_fields = ['host', 'user', 'password', 'database', 'charset']
        return all(field in config for field in required_fields)
    
    @staticmethod
    def validate_training_config(config: Dict[str, Any]) -> bool:
        """验证训练配置"""
        try:
            assert config['batch_size'] > 0
            assert 0 < config['learning_rate'] < 1
            assert config['epochs'] > 0
            return True
        except (AssertionError, KeyError):
            return False
    
    @staticmethod
    def validate_system_config(config: Dict[str, Any]) -> bool:
        """验证系统配置"""
        try:
            assert config['memory_limit'] > 0
            assert config['gpu_memory_limit'] > 0
            assert config['cleanup_interval'] > 0
            assert config['log_retention_days'] > 0
            return True
        except (AssertionError, KeyError):
            return False

# 创建全局实例
config_instance = ConfigManager()

# 导出常用配置变量
BASE_DIR = config_instance.BASE_DIR
LOG_DIR = config_instance.LOG_DIR
MODEL_DIR = config_instance.MODEL_DIR
DATA_DIR = config_instance.DATA_DIR
CHECKPOINT_DIR = config_instance.CHECKPOINT_DIR

if __name__ == "__main__":
    print("配置验证：")
    print(f"数据库配置: {config_instance.DB_CONFIG}")
    print(f"训练配置: {config_instance.TRAINING_CONFIG}")

配置验证：
数据库配置: {'host': 'localhost', 'port': 5432, 'user': 'ace_user'}
训练配置: {'max_epochs': 50, 'batch_size': 32}


In [None]:
# 数据管理器
import os
import numpy as np
import pandas as pd
import logging
import pymysql
from datetime import datetime, timedelta
import threading
from collections import deque
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from core.database_manager import db_manager  # 导入数据库管理器实例
from core.config_manager import config_instance  # 导入配置管理器实例

# 获取logger实例
logger = logging.getLogger(__name__)

class DataManager:
    """数据管理器 - 单例模式"""
    _instance = None
    
    def __new__(cls, db_config=None):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self, db_config=None):
        if not hasattr(self, 'initialized'):
            # 初始化组件
            self.data_pool = DataPool()
            self.data_processor = DataProcessor()
            self.data_validator = DataValidator()
            
            # 数据缓存参数
            self.cache_size = config_instance.SYSTEM_CONFIG['DATA_CONFIG']['cache_size']
            self.batch_size = 32
            self.sequence_length = 14400
            
            # 线程锁
            self.lock = threading.Lock()
            
            # 新增配置项
            self.comparison_dir = os.path.join(config_instance.BASE_DIR, 'comparison')
            os.makedirs(self.comparison_dir, exist_ok=True)
            self.issue_file = os.path.join(self.comparison_dir, 'issue_number.txt')
            
            # 标记初始化完成
            self.initialized = True
            logger.info("数据管理器初始化完成")
            
            # 在DataManager中增加锁
            self.issue_lock = threading.Lock()
            
            # 获取配置参数
            self.normalize_range = config_instance.SYSTEM_CONFIG['DATA_CONFIG']['normalize_range']
    
    def get_training_batch(self, batch_size=None):
        """获取训练批次"""
        try:
            batch_size = batch_size or self.batch_size
            with self.lock:
                return self.data_processor.get_training_batch(
                    self.data_pool.get_latest_data(),
                    batch_size
                )
        except Exception as e:
            logger.error(f"获取训练批次时出错: {str(e)}")
            return None, None
    
    def update_data(self):
        """更新数据"""
        try:
            new_data = self._fetch_new_data()
            if new_data is not None:
                with self.lock:
                    self.data_pool.update_data(new_data)
                logger.info(f"数据更新成功，当前数据量: {len(self.data_pool.data)}")
                return True
            return False
        except Exception as e:
            logger.error(f"更新数据时出错: {str(e)}")
            return False
    
    def _get_next_issue(self, current_issue):
        """计算下一期号"""
        date_str, period = current_issue.split('-')
        date = datetime.strptime(date_str, '%Y%m%d')
        period = int(period)
        
        if period == 1440:
            next_date = date + timedelta(days=1)
            next_period = 1
        else:
            next_date = date
            next_period = period + 1
            
        return f"{next_date.strftime('%Y%m%d')}-{next_period:04d}"

    def _fetch_new_data(self):
        """根据144000+2880的数据需求获取样本"""
        try:
            with self.issue_lock:
                with open(self.issue_file, 'r+') as f:
                    last_issue = f.read().strip()
                    
                    # 使用配置获取总数
                    total = config_instance.SYSTEM_CONFIG['SAMPLE_CONFIG']['total_fetch']()
                    
                    # 构建精确查询
                    query = f"""
                        SELECT date_period, number 
                        FROM admin_tab 
                        WHERE date_period {'>' if last_issue else ''}= '{last_issue}'
                        ORDER BY date_period 
                        LIMIT {total}
                    """
                    
                    records = db_manager.execute_query(query)
                    
                    # 验证数据连续性
                    if not self._validate_sequence(records):
                        raise ValueError("数据存在断层")
                    
                    # 更新期号文件
                    new_last = records[-1]['date_period']
                    f.seek(0)
                    f.write(new_last)
                    
                    return self._process_numbers(records)  # 处理五位号码
                    
        except Exception as e:
            logger.error(f"数据获取失败: {str(e)}")
            return None

    def _process_numbers(self, records):
        """处理五位数字号码"""
        processed = []
        for r in records:
            # 将"00236"转换为[0,0,2,3,6]
            numbers = [int(d) for d in r['number'].zfill(5)]
            processed.append({
                'date_period': r['date_period'],
                'numbers': numbers,
                'time_features': self.time_feature_extractor.extract_features(r['date_period'])
            })
        return processed

    def validate_data(self, data):
        """验证数据有效性"""
        return self.data_validator.validate(data)
    
    def get_data_stats(self):
        """获取数据统计信息"""
        try:
            with self.lock:
                return {
                    'total_samples': len(self.data_pool.data),
                    'cache_size': self.data_pool.get_cache_size(),
                    'last_update': self.data_pool.last_update_time
                }
        except Exception as e:
            logger.error(f"获取数据统计信息时出错: {str(e)}")
            return None

    def get_data_by_period(self, start_period, end_period):
        """获取指定期间的数据"""
        try:
            query = """
                SELECT date_period, number 
                FROM admin_tab 
                WHERE date_period BETWEEN %s AND %s
                ORDER BY date_period
            """
            records = db_manager.execute_query(
                query, 
                params=(start_period, end_period),
                use_cache=True
            )
            
            if not records:
                return None
            
            return self._process_numbers(records)
            
        except Exception as e:
            logger.error(f"获取指定期间数据时出错: {str(e)}")
            return None

# 创建全局实例
data_manager = DataManager()

In [None]:
# 模型集成类
import numpy as np
import tensorflow as tf
import logging
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, LSTM, GRU, Bidirectional, Conv1D
from tensorflow.keras.layers import MultiHeadAttention, LayerNormalization, Add
import os
import json
import threading
from core import config_manager
from optimizers import model_optimizer
from concurrent.futures import ThreadPoolExecutor
import torch

# 获取logger实例
logger = logging.getLogger(__name__)

class ModelEnsemble:
    """模型集成类"""
    
    def __init__(self, sequence_length=14400, feature_dim=5):
        """
        初始化模型集成
        Args:
            sequence_length: 输入序列长度
            feature_dim: 特征维度
        """
        self.sequence_length = sequence_length
        self.feature_dim = feature_dim
        self.prediction_range = 2880  # 预测范围
        self.models = []  # 模型列表
        self.weights = np.ones(6) / 6  # 初始权重平均分配
        
        # 模型性能追踪
        self.performance_history = {i: [] for i in range(6)}
        
        # 新增训练同步机制
        self.training_lock = threading.Lock()
        self.finished_models = 0
        self.total_models = 6
        
        # 初始化模型
        self._build_models()
        logger.info("模型集成初始化完成")
        
    @staticmethod
    def enhanced_match_loss(y_true, y_pred):
        """
        增强型匹配损失函数 - 结合匹配度评判和方向性引导
        
        Args:
            y_true: 真实值 (batch_size, 2880, 5) - 2880期目标值
            y_pred: 预测值 (batch_size, 5) - 预测的一组五位数
            
        Returns:
            total_loss: 综合损失值,包含匹配损失和方向性损失
        """
        try:
            # 1. 预处理
            y_pred_expanded = tf.expand_dims(y_pred, axis=1)  # (batch_size, 1, 5)
            y_pred_rounded = tf.round(y_pred_expanded)
            
            # 2. 计算匹配情况
            matches = tf.cast(tf.equal(y_true, y_pred_rounded), tf.float32)  # (batch_size, 2880, 5)
            match_counts = tf.reduce_sum(matches, axis=-1)  # (batch_size, 2880)
            
            # 3. 找出最佳匹配的目标值
            best_match_indices = tf.argmax(match_counts, axis=1)  # (batch_size,)
            best_targets = tf.gather(y_true, best_match_indices, batch_dims=1)  # (batch_size, 5)
            best_match_counts = tf.reduce_max(match_counts, axis=1)  # (batch_size,)
            
            # 4. 计算基础匹配损失
            base_loss = 5.0 - best_match_counts  # (batch_size,)
            
            # 5. 计算方向性损失
            # 计算预测值与最佳匹配目标的差异
            value_diff = tf.squeeze(best_targets - y_pred, axis=1)  # (batch_size, 5)
            
            # 创建方向性掩码(只对未匹配的位置计算方向性损失)
            direction_mask = tf.cast(
                tf.not_equal(y_pred_rounded, tf.expand_dims(best_targets, axis=1)),
                tf.float32
            )  # (batch_size, 1, 5)
            
            # 使用sigmoid函数平滑方向性梯度
            direction_factor = tf.sigmoid(value_diff * 2.0) * 2.0 - 1.0  # 范围(-1, 1)
            
            # 计算方向性损失(差异越大,损失越大)
            direction_loss = tf.reduce_mean(
                direction_mask * direction_factor * tf.abs(value_diff),
                axis=-1
            )  # (batch_size,)
            
            # 6. 完全匹配时损失为0
            perfect_match = tf.cast(tf.equal(best_match_counts, 5.0), tf.float32)
            
            # 7. 组合损失(动态权重)
            # 匹配数越少,方向性损失权重越大
            direction_weight = tf.exp(-best_match_counts / 5.0) * 0.5  # 随匹配数增加呈指数衰减
            total_loss = base_loss * (1.0 - perfect_match) + direction_weight * direction_loss
            
            # 8. 添加调试信息
            tf.debugging.assert_all_finite(
                total_loss,
                "Loss computation resulted in invalid values"
            )
            
            return total_loss
            
        except Exception as e:
            logger.error(f"计算损失时出错: {str(e)}")
            return 5.0 * tf.ones_like(y_pred[:, 0])
        
    def _build_models(self):
        """构建所有模型"""
        try:
            model_params = {
                'model_1': {
                    'conv_filters': [32, 64, 128],
                    'lstm_units': 128,
                    'attention_heads': 8
                },
                'model_2': {
                    'conv_filters': [64, 128],
                    'lstm_units': 256,
                    'dropout': 0.2
                },
                'model_3': {
                    'gru_units': 128,
                    'dense_units': [256, 128],
                    'learning_rate': 0.001
                },
                'model_4': {
                    'num_heads': 8,
                    'key_dim': 64,
                    'ff_dim': 256
                },
                'model_5': {
                    'lstm_units': 128,
                    'attention_heads': 4,
                    'dropout': 0.1
                },
                'model_6': {
                    'conv_filters': [32, 64],
                    'gru_units': 128,
                    'dense_units': [128, 64]
                }
            }
            
            for i in range(6):
                logger.info(f"构建模型 {i+1}/6...")
                # 这里省略了具体的模型构建代码
                # 你可以根据需要添加具体的模型构建逻辑
                # self.models.append(构建的模型)
            
        except Exception as e:
            logger.error(f"构建模型时出错: {str(e)}")
    
    def parallel_training(self, data):
        """补全并行训练逻辑"""
        try:
            # 根据当前线程数调整
            with ThreadPoolExecutor(max_workers=config_manager.SYSTEM_CONFIG['max_threads']) as executor:
                # 动态分配任务
                chunk_size = len(data) // self.total_models
                futures = []
                for i in range(self.total_models):
                    chunk = data[i*chunk_size : (i+1)*chunk_size]
                    futures.append(executor.submit(self._train_chunk, chunk))
            return True
        except Exception as e:
            logger.error(f"训练失败: {str(e)}")
            return False

    def _train_chunk(self, chunk):
        """训练数据块"""
        # 这里省略了具体的训练逻辑
        # 你可以根据需要添加具体的训练逻辑
        pass

    def save_model(self, model_idx):
        model = self.models[model_idx]
        model.save(os.path.join(config_manager.MODEL_DIR, f'model_{model_idx}')) 

    def save_ensemble_info(self):
        with open('ensemble_info.json', 'w') as f:
            json.dump({
                'weights': self.weights.tolist(),
                'performance': self.performance_history
            }, f) 

    def load_pretrained_models(self, model_paths):
        for path in model_paths:
            # 添加device参数适配CPU
            model = torch.load(path, map_location=torch.device('cpu'))  
            self.models.append(model) 

# 创建全局实例
model_ensemble = ModelEnsemble()

In [None]:
# 训练策略优化器
import numpy as np
from bayes_opt import BayesianOptimization
from bayes_opt.util import Events
import tensorflow as tf
import logging
import os
import json
from sklearn.model_selection import cross_val_score
import psutil

# 获取logger实例
logger = logging.getLogger(__name__)

class TrainingOptimizer:
    """训练策略优化器类"""
    
    def __init__(self, model_ensemble, data_processor):
        """
        初始化训练优化器
        Args:
            model_ensemble: 模型集成实例
            data_processor: 数据处理器实例
        """
        self.model_ensemble = model_ensemble
        self.data_processor = data_processor
        self.best_params = None
        self.optimization_history = []
        self.best_match_history = []  # 记录最佳匹配数历史
        self.match_distribution = {i: 0 for i in range(6)}  # 记录各匹配数的分布
        
        # 定义参数范围
        self.param_ranges = {
            # 1. 优化器参数
            'optimizer_params': {
                'learning_rate': (1e-5, 1e-2),
                'beta_1': (0.8, 0.999),
                'beta_2': (0.8, 0.999),
                'epsilon': (1e-8, 1e-6)
            },
            
            # 2. 学习率调度参数
            'lr_schedule_params': {
                'decay_rate': (0.9, 0.99),
                'decay_steps': (100, 1000),
                'warmup_steps': (0, 100),
                'min_lr': (1e-6, 1e-4)
            },
            
            # 3. 训练控制参数
            'training_control': {
                'batch_size': (16, 128),
                'epochs_per_iteration': (1, 10),
                'validation_frequency': (1, 10),
                'early_stopping_patience': (10, 50)
            }
        }
        
        # 定义离散参数
        self.discrete_params = {
            'optimizer_type': ['adam', 'adamw', 'radam'],
            'scheduler_type': ['exponential', 'cosine', 'step']
        }
        
        logger.info("训练优化器初始化完成")

    def optimize(self, n_iter=50):
        """
        运行优化过程
        Args:
            n_iter: 优化迭代次数
        Returns:
            dict: 最佳参数
        """
        try:
            optimizer = BayesianOptimization(
                f=self._objective_function,
                pbounds=self._flatten_param_ranges(),
                random_state=42
            )
            
            # 定义优化回调
            def optimization_callback(res):
                progress = self._check_optimization_progress()
                if progress and not progress['is_improving'] and progress['is_stable']:
                    logger.info("优化进入稳定阶段，但仍将继续探索")
            
            optimizer.subscribe(Events.OPTIMIZATION_STEP, optimization_callback)
            
            optimizer.maximize(
                init_points=5,
                n_iter=n_iter
            )
            
            self.best_params = self._process_params(optimizer.max['params'])
            self._save_best_params()
            
            # 最终优化结果总结
            final_progress = self._check_optimization_progress()
            if final_progress:
                logger.info("优化完成总结:")
                logger.info(f"最终得分: {final_progress['current_score']:.4f}")
                logger.info(f"最佳得分: {final_progress['best_score']:.4f}")
                logger.info(f"优化稳定性: {final_progress['std_score']:.4f}")
            
            return self.best_params
            
        except Exception as e:
            logger.error(f"运行优化过程时出错: {str(e)}")
            return None

    def _objective_function(self, **params):
        """优化目标函数"""
        try:
            # 1. 验证参数
            is_valid, message = self._validate_params(params)
            if not is_valid:
                logger.warning(f"参数验证失败: {message}")
                return float('-inf')
            
            # 2. 更新训练参数
            self._update_training_params(params)
            
            # 3. 获取评估数据
            X, y = self.data_processor.get_validation_data()
            if X is None or y is None:
                return float('-inf')
            
            # 4. 评估模型性能
            score = self._evaluate_model(X, y)
            
            # 5. 记录优化历史
            self.optimization_history.append({
                'params': params,
                'score': score
            })
            
            return score
            
        except Exception as e:
            logger.error(f"优化目标函数执行出错: {str(e)}")
            return float('-inf')

    def _evaluate_model(self, X, y):
        """评估模型性能"""
        try:
            # 这里省略了具体的评估逻辑
            # 你可以根据需要添加具体的评估逻辑
            return np.random.rand()  # 返回随机得分作为示例
        except Exception as e:
            logger.error(f"评估模型性能时出错: {str(e)}")
            return 0.0

    def _validate_params(self, params):
        """验证参数"""
        try:
            # 这里省略了具体的验证逻辑
            # 你可以根据需要添加具体的验证逻辑
            return True, ""
        except Exception as e:
            logger.error(f"验证参数时出错: {str(e)}")
            return False, str(e)

    def _update_training_params(self, params):
        """更新训练参数"""
        # 这里省略了具体的更新逻辑
        # 你可以根据需要添加具体的更新逻辑
        pass

    def _flatten_param_ranges(self):
        """将嵌套参数范围展平"""
        flat_ranges = {}
        for category, params in self.param_ranges.items():
            for param_name, param_range in params.items():
                flat_name = f"{category}__{param_name}"
                flat_ranges[flat_name] = param_range
        return flat_ranges

    def _process_params(self, flat_params):
        """将展平的参数重构为嵌套结构"""
        nested_params = {}
        for flat_name, value in flat_params.items():
            category, param_name = flat_name.split('__')
            if category not in nested_params:
                nested_params[category] = {}
            nested_params[category][param_name] = value
        return nested_params

    def _save_best_params(self):
        """保存最佳参数"""
        try:
            save_path = os.path.join('best_params.json')
            with open(save_path, 'w') as f:
                json.dump(self.best_params, f, indent=4)
            logger.info(f"最佳参数已保存到: {save_path}")
        except Exception as e:
            logger.error(f"保存最佳参数时出错: {str(e)}")

# 创建全局实例
training_optimizer = TrainingOptimizer(model_ensemble=None, data_processor=None)

In [None]:
# 性能监控器
import logging
import numpy as np
import time
from datetime import datetime
import json
import os
from collections import deque
import threading

# 获取logger实例
logger = logging.getLogger(__name__)

class PerformanceMonitor:
    """性能监控器 - 负责收集和分析性能指标"""
    def __init__(self, save_dir, window_size=100):
        self.save_dir = save_dir
        self.window_size = window_size
        self.lock = threading.Lock()
        
        # 性能指标存储
        self.metrics = {
            'loss': deque(maxlen=window_size),
            'accuracy': deque(maxlen=window_size),
            'training_time': deque(maxlen=window_size),
            'prediction_time': deque(maxlen=window_size),
            'memory_usage': deque(maxlen=window_size),
            'gpu_usage': deque(maxlen=window_size),  # 新增GPU使用率指标
            'batch_time': deque(maxlen=window_size)  # 新增批次处理时间指标
        }
        
        # 警报阈值
        self.thresholds = {
            'loss_increase': 0.2,        # 损失增加超过20%
            'accuracy_drop': 0.1,        # 准确率下降超过10%
            'training_time_increase': 0.5,  # 训练时间增加超过50%
            'memory_usage': 0.9,         # 内存使用率超过90%
            'gpu_usage': 0.95,           # GPU使用率超过95%
            'batch_time_increase': 0.3   # 批次时间增加超过30%
        }
        
        # 性能趋势分析
        self.trend_window = 10  # 趋势分析窗口
        self.trend_threshold = 0.05  # 趋势判定阈值
        
        # 初始化性能日志文件
        self._init_log_file()
        
    def _init_log_file(self):
        """初始化性能日志文件"""
        try:
            self.log_file = os.path.join(
                self.save_dir, 
                f'performance_{datetime.now().strftime("%Y%m%d")}.json'
            )
            os.makedirs(self.save_dir, exist_ok=True)
            
            if not os.path.exists(self.log_file):
                with open(self.log_file, 'w') as f:
                    json.dump([], f)
                    
        except Exception as e:
            logger.error(f"初始化性能日志文件失败: {str(e)}")
    
    def update_metrics(self, metrics_dict):
        """更新性能指标"""
        try:
            with self.lock:
                # 更新指标
                for metric_name, value in metrics_dict.items():
                    if metric_name in self.metrics:
                        self.metrics[metric_name].append(value)
                
                # 记录到日志文件
                self._log_metrics(metrics_dict)
                
                # 检查警报
                self._check_alerts()
                
                # 分析趋势
                self._analyze_trends()
                
        except Exception as e:
            logger.error(f"更新性能指标失败: {str(e)}")
    
    def _log_metrics(self, metrics_dict):
        """记录性能指标到日志文件"""
        try:
            log_entry = {
                'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'metrics': metrics_dict
            }
            
            # 读取现有日志
            with open(self.log_file, 'r') as f:
                logs = json.load(f)
                
            # 添加新记录
            logs.append(log_entry)
            
            # 写回文件
            with open(self.log_file, 'w') as f:
                json.dump(logs, f, indent=4)
                
        except Exception as e:
            logger.error(f"记录性能指标失败: {str(e)}")
    
    def _check_alerts(self):
        """检查性能警报"""
        try:
            for metric_name, values in self.metrics.items():
                if len(values) < 2:
                    continue
                
                # 检查是否超过阈值
                if metric_name in self.thresholds:
                    threshold = self.thresholds[metric_name]
                    if values[-1] > threshold:
                        logger.warning(f"{metric_name} 超过阈值: {values[-1]:.2f} > {threshold:.2f}")
                
        except Exception as e:
            logger.error(f"检查性能警报失败: {str(e)}")
    
    def _analyze_trends(self):
        """分析性能趋势"""
        try:
            for metric_name, values in self.metrics.items():
                if len(values) < self.trend_window:
                    continue
                
                # 计算趋势
                trend = np.polyfit(range(self.trend_window), list(values)[-self.trend_window:], 1)[0]
                if abs(trend) > self.trend_threshold:
                    trend_direction = "上升" if trend > 0 else "下降"
                    logger.info(f"{metric_name} 趋势: {trend_direction} (斜率: {trend:.4f})")
                
        except Exception as e:
            logger.error(f"分析性能趋势失败: {str(e)}")

# 创建全局实例
performance_monitor = PerformanceMonitor(save_dir='logs/performance')

In [None]:
# 资源监控器
import psutil
import logging
import threading
import time
import numpy as np
from collections import deque
from datetime import datetime

# 获取logger实例
logger = logging.getLogger(__name__)

class ResourceMonitor:
    """资源监控器 - 负责监控CPU、内存、GPU等资源使用情况"""
    
    def __init__(self, window_size=100, check_interval=5):
        self.window_size = window_size
        self.check_interval = check_interval
        self.lock = threading.Lock()
        
        # 监控指标存储
        self.metrics = {
            'cpu_usage': deque(maxlen=window_size),
            'memory_usage': deque(maxlen=window_size),
            'disk_usage': deque(maxlen=window_size),
            'gpu_usage': None,
            'gpu_memory': None
        }
        
        # 警报阈值
        self.thresholds = {
            'cpu_usage': 90,    # CPU使用率超过90%
            'memory_usage': 90,  # 内存使用率超过90%
            'disk_usage': 90,    # 磁盘使用率超过90%
            'gpu_usage': 90,     # GPU使用率超过90%
            'gpu_memory': 90     # GPU内存使用率超过90%
        }
        
        # 监控线程
        self.monitor_thread = None
        self.is_running = False
        
        # 警报历史
        self.alerts = []
        
        self.memory_usage = 0.0  # 确保有此属性初始化
        self.cpu_usage = 0.0
        self.gpu_usage = 0.0
        
        logger.info("资源监控器初始化完成")
    
    def start(self):
        """启动资源监控"""
        if self.monitor_thread and self.monitor_thread.is_alive():
            logger.warning("资源监控器已在运行")
            return
            
        self.is_running = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        logger.info("资源监控器已启动")
    
    def stop(self):
        """停止资源监控"""
        self.is_running = False
        if self.monitor_thread:
            self.monitor_thread.join()
        logger.info("资源监控器已停止")
    
    def _monitor_loop(self):
        """监控循环"""
        while self.is_running:
            try:
                self._collect_metrics()
                self._check_alerts()
                time.sleep(self.check_interval)
            except Exception as e:
                logger.error(f"资源监控循环出错: {str(e)}")
    
    def _collect_metrics(self):
        """收集资源指标"""
        with self.lock:
            # CPU使用率
            self.cpu_usage = psutil.cpu_percent()
            self.metrics['cpu_usage'].append(self.cpu_usage)
            
            # 内存使用率
            mem = psutil.virtual_memory()
            self.memory_usage = mem.percent  # 确保有此属性更新
            self.metrics['memory_usage'].append(self.memory_usage)
            
            # 磁盘使用率
            disk = psutil.disk_usage('/')
            self.metrics['disk_usage'].append(disk.percent)
    
    def _check_alerts(self):
        """检查警报"""
        with self.lock:
            for metric, values in self.metrics.items():
                if not values:
                    continue
                current = values[-1]
                if current > self.thresholds[metric]:
                    alert = {
                        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                        'metric': metric,
                        'value': current,
                        'threshold': self.thresholds[metric]
                    }
                    self.alerts.append(alert)
                    logger.warning(f"资源警报: {metric} = {current}% (阈值: {self.thresholds[metric]}%)")

# 创建全局实例
resource_monitor = ResourceMonitor()

In [None]:
# 数据库管理器
import pymysql
import logging
import threading
from pymysql.cursors import DictCursor
from sqlalchemy.pool import QueuePool  # 使用SQLAlchemy自带的连接池
from datetime import datetime, timedelta
from core.config_manager import config_instance

# 获取logger实例
logger = logging.getLogger(__name__)

class DatabaseManager:
    """数据库管理器 - 单例模式"""
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls, db_config=None):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self, db_config=None):
        if not hasattr(self, 'initialized'):
            self.DB_CONFIG = {
                'host': 'localhost',
                'user': 'root',
                'password': 'tt198803',  # 使用提供的密码
                'database': 'admin_data',
                'charset': 'utf8mb4'
            }
            
            # 创建数据库连接池
            self.pool = self._create_pool()
            
            # 初始化查询缓存
            self.query_cache = {}
            self.cache_timeout = 300  # 5分钟缓存超时
            
            # 标记初始化完成
            self.initialized = True
            logger.info("数据库管理器初始化完成")
    
    def _create_pool(self):
        """创建数据库连接池"""
        try:
            self.pool = QueuePool(
                creator=lambda: pymysql.connect(**self.DB_CONFIG),
                pool_size=10,
                max_overflow=20,
                timeout=30
            )
            logger.info("数据库连接池创建成功")
            return self.pool
        except Exception as e:
            logger.error(f"创建数据库连接池失败: {str(e)}")
            raise
    
    def execute_query(self, query, params=None, use_cache=False):
        """执行查询"""
        try:
            # 检查缓存
            if use_cache:
                cache_key = f"{query}_{str(params)}"
                cached_result = self._get_from_cache(cache_key)
                if cached_result is not None:
                    return cached_result
            
            # 获取连接和游标
            connection = self.pool.connect()
            try:
                cursor = connection.cursor(DictCursor)
                # 执行查询
                cursor.execute(query, params)
                result = cursor.fetchall()
                
                # 更新缓存
                if use_cache:
                    self._update_cache(cache_key, result)
                
                return result
            finally:
                cursor.close()
                connection.close()
                
        except Exception as e:
            logger.error(f"执行查询失败: {str(e)}")
            return None
    
    def execute_batch(self, query, params_list):
        """批量执行查询"""
        try:
            connection = self.pool.connect()
            cursor = connection.cursor()
            
            try:
                cursor.executemany(query, params_list)
                connection.commit()
                return True
            finally:
                cursor.close()
                connection.close()
                
        except Exception as e:
            logger.error(f"批量执行查询失败: {str(e)}")
            return False
    
    def _get_from_cache(self, key):
        """从缓存获取数据"""
        if key in self.query_cache:
            timestamp, data = self.query_cache[key]
            if datetime.now() - timestamp < timedelta(seconds=self.cache_timeout):
                return data
            else:
                del self.query_cache[key]
        return None
    
    def _update_cache(self, key, data):
        """更新缓存"""
        self.query_cache[key] = (datetime.now(), data)
    
    def clear_cache(self):
        """清理缓存"""
        self.query_cache.clear()
        logger.info("查询缓存已清理")

    def get_records_by_issue(self, start_issue, limit):
        """按期号范围获取记录"""
        query = f"""
            SELECT * FROM admin_tab 
            WHERE date_period >= %s
            ORDER BY date_period ASC
            LIMIT %s
        """
        return self.execute_query(query, (start_issue, limit))

# 创建全局实例
db_manager = DatabaseManager()