# 第四部分：Milvus 进阶实战

欢迎来到 Milvus Workshop 的第四部分！这一部分主要介绍 Milvus 的可观测性、运维与调优实战。通过这部分内容，我们可以更好地了解 Milvus 的运行状态，保障 Milvus 稳定高效运行。

## 4.1 Milvus 可观测性运维实战

### 学习目标
- 理解 Milvus 可观测性架构和核心组件
- 掌握基于 Prometheus + Loki + Jaeger + Grafana 的可观测性方案部署
- 学会分析 Milvus 关键性能指标
- 配置监控告警和故障排查

### 本节内容概览
1. **Milvus 可观测性架构概览** - 了解完整的监控框架
2. **监控服务部署实战** - 部署 Prometheus + Grafana 监控栈
3. **核心监控指标解析** - 深入理解关键性能指标
4. **可视化仪表板配置** - 构建专业的监控面板
5. **告警策略配置** - 设置智能告警和通知
6. **故障排查与优化** - 基于监控数据的运维实践


In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import subprocess
import sys
import os
import requests
import time
import socket
from datetime import datetime

def check_docker_compose():
    """Report whether the ``docker compose`` CLI can be invoked.

    Runs ``docker compose version`` and prints the outcome.

    Returns:
        bool: True when the command exits with status 0; False when it
        exits non-zero or cannot be launched at all.
    """
    version_command = ['docker', 'compose', 'version']
    try:
        completed = subprocess.run(version_command,
                                   capture_output=True, text=True)
        if completed.returncode != 0:
            print("❌ Docker Compose 不可用")
            return False

        print("✅ Docker Compose 可用")
        print(f"版本信息: {completed.stdout.strip()}")
        return True
    except Exception as e:
        # Typically raised when the docker binary is missing from PATH.
        print(f"❌ 检查 Docker Compose 时出错: {e}")
        return False

def check_milvus_connection(host="localhost", port="19530"):
    """Try to open the ``default`` pymilvus connection and report the result.

    Args:
        host: Milvus host name; defaults to the local instance.
        port: Milvus gRPC port; defaults to "19530".

    Returns:
        bool: True when the connection was established, False when
        pymilvus is not importable or the connection attempt failed.
    """
    try:
        # ``list_connections`` is a method of the ``connections`` object,
        # so only ``connections`` is imported here; importing it from the
        # package top level would make this check fail on import.
        from pymilvus import connections
    except ImportError as e:
        print(f"❌ Milvus 连接失败: {e}")
        print("请先安装 pymilvus: pip install pymilvus")
        return False

    try:
        connections.connect("default", host=host, port=port)
        print("✅ Milvus 连接成功")

        # Show the registered connection aliases.
        conn_info = connections.list_connections()
        print(f"当前连接: {conn_info}")
        return True
    except Exception as e:
        print(f"❌ Milvus 连接失败: {e}")
        print("请确保 Milvus 实例正在运行")
        return False

def check_port_availability(port, service_name):
    """Probe a local TCP port and report whether it is free.

    A port counts as free when a TCP connect to ``localhost`` is refused.

    Args:
        port: Port number, as an int or a numeric string.
        service_name: Human-readable service name used in the printed report.

    Returns:
        bool: True when nothing accepts connections on the port; False when
        the port is in use or the probe itself fails.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.settimeout(1)
        try:
            # connect_ex returns 0 when something is already listening.
            occupied = probe.connect_ex(('localhost', int(port))) == 0
        except Exception as e:
            print(f"❓ 无法检查端口 {port}: {e}")
            return False

    if occupied:
        print(f"⚠️  端口 {port} ({service_name}) 已被占用")
        return False

    print(f"✅ 端口 {port} ({service_name}) 可用")
    return True

def check_ports():
    """Check every port the monitoring stack needs and summarise the result.

    Returns:
        bool: True only when all listed ports are free.
    """
    service_by_port = {
        '9090': 'Prometheus',
        '3000': 'Grafana', 
        '9093': 'AlertManager',
        '16686': 'Jaeger UI',
        '14268': 'Jaeger Collector',
        '3100': 'Loki'
    }

    print("检查端口占用情况：")
    # Probe each port in declaration order; every probe prints its own line.
    probe_results = [
        check_port_availability(port, service)
        for port, service in service_by_port.items()
    ]
    free_count = sum(1 for is_free in probe_results if is_free)
    total = len(service_by_port)

    print(f"\n📊 端口检查结果: {free_count}/{total} 个端口可用")
    return free_count == total

def check_system_requirements():
    """Check the Python version and the packages this workshop relies on.

    Prints one status line per check and, when packages are missing, the
    ``pip install`` command that installs them.

    Returns:
        list[str]: pip names of the packages that could not be found
        (empty when everything is installed).
    """
    # Imported locally so the function does not depend on file-level imports.
    import importlib.util

    print("\n🔍 检查系统要求...")

    # Compare as a tuple so that any future major version also passes.
    python_version = sys.version_info
    version_text = f"{python_version.major}.{python_version.minor}.{python_version.micro}"
    if tuple(python_version[:2]) >= (3, 8):
        print(f"✅ Python 版本: {version_text}")
    else:
        print(f"⚠️  Python 版本过低: {version_text} (建议 3.8+)")

    # Map pip package name -> importable module name. They differ for
    # PyYAML, which installs as "pyyaml" but is imported as "yaml".
    required_packages = {
        'pymilvus': 'pymilvus',
        'requests': 'requests',
        'pyyaml': 'yaml',
        'pandas': 'pandas',
        'matplotlib': 'matplotlib',
    }
    missing_packages = []

    for package, module_name in required_packages.items():
        try:
            # find_spec locates the module without importing it.
            installed = importlib.util.find_spec(module_name) is not None
        except (ImportError, ValueError):
            installed = False

        if installed:
            print(f"✅ {package} 已安装")
        else:
            print(f"❌ {package} 未安装")
            missing_packages.append(package)

    if missing_packages:
        print(f"\n📦 需要安装的包: {' '.join(missing_packages)}")
        print("安装命令: pip install " + " ".join(missing_packages))

    return missing_packages

# Run all environment checks in order and print a closing summary.
print("=== Milvus 监控环境检查 ===")
print(f"检查时间: {datetime.now():%Y-%m-%d %H:%M:%S}")
print()

# System environment (Python version and required packages).
check_system_requirements()
print()

# Docker environment.
check_docker_compose()
print()

# Milvus connectivity.
check_milvus_connection()
print()

# Port availability; only this result decides the closing message.
ports_available = check_ports()

print("\n" + "=" * 50)
print(
    "🎉 环境检查完成！可以开始部署监控栈"
    if ports_available
    else "⚠️  部分端口被占用，可能需要调整配置或停止占用端口的服务"
)


In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import yaml
import subprocess
from pathlib import Path

def create_monitoring_configs():
    """Generate every configuration file the monitoring stack needs.

    Writes ``docker-compose.monitoring.yml`` into the current working
    directory and, under ``./monitoring_config``, the Prometheus config,
    the alert rules, the AlertManager config and the Grafana datasource
    provisioning file. Existing files are overwritten.
    """

    def _write_yaml(path, data):
        # UTF-8 plus allow_unicode keeps the Chinese alert texts readable
        # in the generated files instead of \uXXXX escapes.
        with open(path, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True)

    # Create the configuration directories.
    config_dir = Path("./monitoring_config")
    config_dir.mkdir(exist_ok=True)

    grafana_dir = config_dir / "grafana" / "provisioning" / "datasources"
    grafana_dir.mkdir(parents=True, exist_ok=True)

    # 1. Docker Compose file.
    # The extra_hosts entry makes host.docker.internal resolvable inside the
    # Prometheus container on Linux, where Docker does not define it by
    # default; the scrape targets below rely on that name.
    docker_compose_content = '''
version: '3.8'

services:
  prometheus:
    image: prom/prometheus:latest
    container_name: milvus-prometheus
    ports:
      - "9090:9090"
    extra_hosts:
      - "host.docker.internal:host-gateway"
    volumes:
      - ./monitoring_config/prometheus.yml:/etc/prometheus/prometheus.yml
      - ./monitoring_config/alert_rules.yml:/etc/prometheus/alert_rules.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/usr/share/prometheus/console_libraries'
      - '--web.console.templates=/usr/share/prometheus/consoles'
      - '--web.enable-lifecycle'
      - '--web.enable-admin-api'
      - '--storage.tsdb.retention.time=30d'
    networks:
      - milvus_monitoring

  grafana:
    image: grafana/grafana:latest
    container_name: milvus-grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
      - ./monitoring_config/grafana/provisioning:/etc/grafana/provisioning
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
      - GF_INSTALL_PLUGINS=grafana-clock-panel,grafana-simple-json-datasource
    depends_on:
      - prometheus
    networks:
      - milvus_monitoring

  alertmanager:
    image: prom/alertmanager:latest
    container_name: milvus-alertmanager
    ports:
      - "9093:9093"
    volumes:
      - ./monitoring_config/alertmanager.yml:/etc/alertmanager/alertmanager.yml
      - alertmanager_data:/alertmanager
    command:
      - '--config.file=/etc/alertmanager/alertmanager.yml'
      - '--storage.path=/alertmanager'
    depends_on:
      - prometheus
    networks:
      - milvus_monitoring

networks:
  milvus_monitoring:
    driver: bridge

volumes:
  prometheus_data:
  grafana_data:
  alertmanager_data:
'''

    with open('docker-compose.monitoring.yml', 'w', encoding='utf-8') as f:
        f.write(docker_compose_content)

    # 2. Prometheus configuration.
    # NOTE(review): the datacoord target below uses host port 9093, which is
    # also the host port published by the alertmanager service above. Adjust
    # the targets to the metrics ports your Milvus deployment exposes.
    prometheus_config = {
        'global': {
            'scrape_interval': '15s',
            'evaluation_interval': '15s'
        },
        'rule_files': ['alert_rules.yml'],
        'alerting': {
            'alertmanagers': [{
                'static_configs': [{
                    'targets': ['alertmanager:9093']
                }]
            }]
        },
        'scrape_configs': [
            {
                'job_name': 'prometheus',
                'static_configs': [{'targets': ['localhost:9090']}]
            },
            {
                'job_name': 'milvus-proxy',
                'static_configs': [{'targets': ['host.docker.internal:9091']}],
                'metrics_path': '/metrics',
                'scrape_interval': '10s'
            },
            {
                'job_name': 'milvus-components',
                'static_configs': [{'targets': [
                    'host.docker.internal:9092',  # rootcoord
                    'host.docker.internal:9093',  # datacoord
                    'host.docker.internal:9094',  # querycoord
                    'host.docker.internal:9095',  # indexcoord
                    'host.docker.internal:9096',  # datanode
                    'host.docker.internal:9097',  # querynode
                    'host.docker.internal:9098',  # indexnode
                ]}],
                'metrics_path': '/metrics',
                'scrape_interval': '10s'
            }
        ]
    }

    _write_yaml(config_dir / 'prometheus.yml', prometheus_config)

    # 3. Alert rules.
    alert_rules = {
        'groups': [{
            'name': 'milvus_alerts',
            'rules': [
                {
                    'alert': 'MilvusHighMemoryUsage',
                    'expr': 'milvus_querynode_memory_usage_bytes > 8e9',
                    'for': '5m',
                    'labels': {'severity': 'warning'},
                    'annotations': {
                        'summary': 'Milvus QueryNode 内存使用率过高',
                        'description': 'QueryNode {{ $labels.instance }} 内存使用超过 8GB'
                    }
                },
                {
                    'alert': 'MilvusHighCPUUsage',
                    'expr': 'rate(milvus_querynode_cpu_usage_total[5m]) > 0.8',
                    'for': '5m',
                    'labels': {'severity': 'warning'},
                    'annotations': {
                        'summary': 'Milvus QueryNode CPU 使用率过高',
                        'description': 'QueryNode {{ $labels.instance }} CPU 使用率超过 80%'
                    }
                },
                {
                    'alert': 'MilvusSearchLatencyHigh',
                    # Buckets are cumulative counters: take rate() first so
                    # the quantile reflects the last 5 minutes rather than
                    # the whole process lifetime.
                    'expr': 'histogram_quantile(0.95, rate(milvus_proxy_search_duration_seconds_bucket[5m])) > 1.0',
                    'for': '5m',
                    'labels': {'severity': 'warning'},
                    'annotations': {
                        'summary': 'Milvus 搜索延迟过高',
                        'description': 'Proxy {{ $labels.instance }} 95%搜索延迟超过 1秒'
                    }
                }
            ]
        }]
    }

    _write_yaml(config_dir / 'alert_rules.yml', alert_rules)

    # 4. AlertManager configuration.
    # email_configs has no "subject"/"body" keys: the subject is set through
    # "headers" and the plain-text body through "text". Unknown keys make
    # AlertManager reject the configuration at startup.
    alertmanager_config = {
        'global': {
            'smtp_smarthost': 'smtp.gmail.com:587',
            'smtp_from': 'your-email@gmail.com'
        },
        'route': {
            'group_by': ['alertname', 'instance'],
            'group_wait': '10s',
            'group_interval': '10s',
            'repeat_interval': '1h',
            'receiver': 'web.hook'
        },
        'receivers': [{
            'name': 'web.hook',
            'email_configs': [{
                'to': 'admin@yourcompany.com',
                'headers': {
                    'Subject': 'Milvus Alert: {{ .GroupLabels.alertname }}'
                },
                'text': (
                    '{{ range .Alerts }}\n'
                    'Alert: {{ .Annotations.summary }}\n'
                    'Description: {{ .Annotations.description }}\n'
                    '{{ end }}'
                )
            }]
        }],
        'inhibit_rules': [{
            'source_match': {'severity': 'critical'},
            'target_match': {'severity': 'warning'},
            'equal': ['alertname', 'instance']
        }]
    }

    _write_yaml(config_dir / 'alertmanager.yml', alertmanager_config)

    # 5. Grafana datasource provisioning.
    grafana_datasource = {
        'apiVersion': 1,
        'datasources': [{
            'name': 'Prometheus',
            'type': 'prometheus',
            'url': 'http://prometheus:9090',
            'access': 'proxy',
            'isDefault': True
        }]
    }

    _write_yaml(grafana_dir / 'prometheus.yml', grafana_datasource)

    print("✅ 监控配置文件创建完成")
    print("📁 配置文件位置:")
    print("  - Docker Compose: docker-compose.monitoring.yml")
    print(f"  - Prometheus: {config_dir}/prometheus.yml")
    print(f"  - Alert Rules: {config_dir}/alert_rules.yml")
    print(f"  - AlertManager: {config_dir}/alertmanager.yml")
    print(f"  - Grafana: {grafana_dir}/prometheus.yml")

def deploy_monitoring_stack():
    """Start the monitoring services with ``docker compose up -d``.

    Uses ``docker-compose.monitoring.yml`` from the current directory and
    prints either the service URLs or the captured error output.
    """
    compose_up = [
        'docker', 'compose',
        '-f', 'docker-compose.monitoring.yml',
        'up', '-d',
    ]
    try:
        print("🚀 开始部署监控栈...")

        outcome = subprocess.run(compose_up, capture_output=True, text=True)

        if outcome.returncode != 0:
            print("❌ 监控栈部署失败")
            print("错误信息:", outcome.stderr)
            return

        print("✅ 监控栈部署成功")
        for line in (
            "\n🌐 访问地址:",
            "  - Prometheus: http://localhost:9090",
            "  - Grafana: http://localhost:3000 (admin/admin123)",
            "  - AlertManager: http://localhost:9093",
        ):
            print(line)

    except Exception as e:
        print(f"❌ 部署过程中出错: {e}")

# Generate the configuration files, print a separator, then start the stack.
create_monitoring_configs()
print(f"\n{'=' * 50}")
deploy_monitoring_stack()


In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# 监控栈管理工具类
class MilvusMonitoringStack:
    """Manage the Docker Compose based monitoring stack for Milvus.

    Wraps the ``docker compose`` calls needed to validate, deploy, inspect,
    stop and clean up the Prometheus / Grafana / AlertManager services.
    """

    def __init__(self, config_dir="./monitoring_config"):
        """Remember where the generated configuration lives.

        Args:
            config_dir: Directory holding the generated config files.
        """
        self.config_dir = Path(config_dir)
        self.compose_file = "docker-compose.monitoring.yml"
        # Container names as declared in the compose file.
        self.services = [
            "milvus-prometheus",
            "milvus-grafana", 
            "milvus-alertmanager"
        ]

    def _run_compose(self, *args):
        """Run ``docker compose -f <compose_file> <args>`` and capture output."""
        return subprocess.run(
            ['docker', 'compose', '-f', self.compose_file, *args],
            capture_output=True, text=True
        )

    def check_prerequisites(self):
        """Verify that all config files and the compose file exist.

        Returns:
            bool: True when everything required for deployment is present.
        """
        print("🔍 检查部署前提条件...")

        # Paths are relative to ``self.config_dir``.
        required_configs = [
            "prometheus.yml",
            "alert_rules.yml", 
            "alertmanager.yml",
            "grafana/provisioning/datasources/prometheus.yml"
        ]

        missing_configs = []
        for config in required_configs:
            if (self.config_dir / config).exists():
                print(f"✅ 配置文件存在: {config}")
            else:
                missing_configs.append(config)

        if missing_configs:
            print(f"❌ 缺少配置文件: {', '.join(missing_configs)}")
            return False

        if not Path(self.compose_file).exists():
            print(f"❌ Docker Compose 文件不存在: {self.compose_file}")
            return False

        print(f"✅ Docker Compose 文件存在: {self.compose_file}")
        return True

    def deploy_stack(self):
        """Deploy the stack after validating the prerequisites.

        Returns:
            bool: True when ``docker compose up -d`` succeeded.
        """
        if not self.check_prerequisites():
            print("❌ 前提条件检查失败，无法部署")
            return False

        print("🚀 开始部署监控栈...")

        try:
            result = self._run_compose('up', '-d')
        except Exception as e:
            print(f"❌ 部署过程中出错: {e}")
            return False

        if result.returncode != 0:
            print("❌ 监控栈部署失败")
            print(f"错误信息: {result.stderr}")
            return False

        print("✅ 监控栈部署成功")
        self.show_access_info()
        return True

    def show_access_info(self):
        """Print the service URLs and suggested next steps."""
        print("\n🌐 监控服务访问地址:")
        print("  - Prometheus: http://localhost:9090")
        print("  - Grafana: http://localhost:3000 (admin/admin123)")
        print("  - AlertManager: http://localhost:9093")
        print("\n📊 建议操作:")
        print("  1. 访问 Grafana 导入 Milvus 监控面板")
        print("  2. 在 Prometheus 中验证指标收集")
        print("  3. 配置 AlertManager 通知渠道")

    def check_service_status(self):
        """Print ``docker compose ps`` output and a per-container status line."""
        print("🔍 检查监控服务状态...")

        # The compose file defines no healthcheck, so ``.State.Health`` is
        # absent on these containers. Guard it in the template and fall back
        # to the plain run state instead of failing the inspect call.
        status_format = (
            '{{.State.Status}}|'
            '{{if .State.Health}}{{.State.Health.Status}}{{end}}'
        )

        try:
            result = self._run_compose('ps')

            if result.returncode != 0:
                print(f"❌ 获取服务状态失败: {result.stderr}")
                return

            print("📊 服务状态:")
            print(result.stdout)

            for service in self.services:
                inspect_result = subprocess.run(
                    ['docker', 'inspect', '--format', status_format, service],
                    capture_output=True, text=True
                )

                if inspect_result.returncode != 0:
                    print(f"❓ {service}: 状态未知")
                    continue

                state, _, health = inspect_result.stdout.strip().partition('|')
                if health == "healthy":
                    print(f"✅ {service}: 健康")
                elif not health and state == "running":
                    # Running container without a healthcheck.
                    print(f"✅ {service}: 运行中")
                else:
                    print(f"⚠️  {service}: {health or state}")

        except Exception as e:
            print(f"❌ 检查服务状态时出错: {e}")

    def stop_stack(self):
        """Stop and remove the containers (``docker compose down``)."""
        print("🛑 停止监控栈...")

        try:
            result = self._run_compose('down')

            if result.returncode == 0:
                print("✅ 监控栈已停止")
            else:
                print(f"❌ 停止监控栈失败: {result.stderr}")

        except Exception as e:
            print(f"❌ 停止过程中出错: {e}")

    def cleanup(self):
        """Remove containers and their named volumes (``down -v``)."""
        print("🧹 清理监控资源...")

        try:
            # ``-v`` also deletes the named volumes, i.e. all stored data.
            result = self._run_compose('down', '-v')

            if result.returncode == 0:
                print("✅ 监控资源已清理")
            else:
                print(f"❌ 清理失败: {result.stderr}")

        except Exception as e:
            print(f"❌ 清理过程中出错: {e}")

# 使用示例
def main():
    """Deploy the monitoring stack and report the status of its services."""
    print("=== Milvus 监控栈管理 ===")

    stack = MilvusMonitoringStack()

    # Guard clause: nothing more to do when the deployment did not succeed.
    if not stack.deploy_stack():
        print("❌ 监控栈部署失败")
        return

    # Give the containers time to start before inspecting them.
    print("\n⏳ 等待服务启动...")
    time.sleep(30)

    stack.check_service_status()

    print("\n🎉 监控栈部署完成！")
    print("现在可以开始使用监控功能了。")


if __name__ == "__main__":
    main()


In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import requests
import json
import time
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

class MilvusMonitor:
    """Query Milvus metrics from Prometheus and print or plot them."""

    def __init__(self, prometheus_url="http://localhost:9090", timeout=10):
        """Create a monitor bound to one Prometheus server.

        Args:
            prometheus_url: Base URL of the Prometheus HTTP API.
            timeout: Seconds to wait for each HTTP request, so that an
                unreachable server cannot block the caller indefinitely.
        """
        # Strip a trailing slash so the API paths never contain "//".
        self.prometheus_url = prometheus_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()

    def query_prometheus(self, query, start_time=None, end_time=None, step='15s'):
        """Run an instant or range query against Prometheus.

        Args:
            query: PromQL expression.
            start_time: Range start (Unix timestamp); with ``end_time`` it
                selects a range query.
            end_time: Range end (Unix timestamp).
            step: Resolution of a range query.

        Returns:
            The ``data.result`` list of the response, or None when the
            request fails or Prometheus reports an error.
        """
        # Compare against None: a timestamp of 0 is a valid range bound.
        if start_time is not None and end_time is not None:
            url = f"{self.prometheus_url}/api/v1/query_range"
            params = {
                'query': query,
                'start': start_time,
                'end': end_time,
                'step': step
            }
        else:
            url = f"{self.prometheus_url}/api/v1/query"
            params = {'query': query}

        try:
            response = self.session.get(url, params=params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            print(f"查询 Prometheus 时出错: {e}")
            return None

        if data.get('status') == 'success':
            return data.get('data', {}).get('result')

        print(f"查询失败: {data.get('error', 'Unknown error')}")
        return None

    def check_milvus_health(self):
        """Print the ``up`` state of every scraped Milvus target."""
        print("🔍 检查 Milvus 组件健康状态...")

        # Job names as configured in the Prometheus scrape config.
        components = ['milvus-proxy', 'milvus-components']

        for component in components:
            result = self.query_prometheus(f'up{{job="{component}"}}')

            if not result:
                print(f"❌ 无法获取 {component} 状态")
                continue

            print(f"\n📊 {component} 状态:")
            for item in result:
                instance = item['metric'].get('instance', 'Unknown')
                # Sample values arrive as strings: "1" means the scrape worked.
                status = "🟢 在线" if item['value'][1] == '1' else "🔴 离线"
                print(f"  - {instance}: {status}")

    def get_performance_metrics(self):
        """Query and print the core performance metrics.

        Returns:
            dict: Metric display name -> raw Prometheus result list, only
            for the metrics that returned data.
        """
        print("\n📈 获取性能指标...")

        metrics = {
            'QPS': 'rate(milvus_proxy_search_total[5m])',
            'P95延迟': 'histogram_quantile(0.95, rate(milvus_proxy_search_duration_seconds_bucket[5m]))',
            '内存使用': 'milvus_querynode_memory_usage_bytes',
            'CPU使用': 'rate(milvus_querynode_cpu_usage_total[5m])',
            '错误率': 'rate(milvus_proxy_error_total[5m])',
        }

        # Per-metric display formatting; anything not listed uses two decimals.
        formatters = {
            'P95延迟': lambda v: f"{float(v) * 1000:.2f}ms",
            '内存使用': lambda v: f"{float(v) / (1024 ** 3):.2f}GB",
        }

        results = {}
        for name, query in metrics.items():
            result = self.query_prometheus(query)
            if not result:
                print(f"❌ 无法获取 {name} 数据")
                continue

            results[name] = result
            print(f"\n🔍 {name}:")
            format_value = formatters.get(name, lambda v: f"{float(v):.2f}")
            for item in result[:3]:  # show at most three series per metric
                instance = item['metric'].get('instance', 'Unknown')
                print(f"  - {instance}: {format_value(item['value'][1])}")

        return results

    def get_business_metrics(self):
        """Query and print business-level metrics."""
        print("\n📊 获取业务指标...")

        queries = {
            # Aggregate both sides with sum(): without it the division is
            # matched label-by-label, so the status="success" series is
            # divided by itself and the ratio is always 100%.
            '搜索成功率': (
                'sum(rate(milvus_proxy_search_total{status="success"}[5m])) / '
                'sum(rate(milvus_proxy_search_total[5m])) * 100'
            ),
            '查询类型分布': 'sum by (query_type) (rate(milvus_proxy_request_total[5m]))',
            '段状态分布': 'sum by (segment_state) (milvus_querynode_segment_num)',
            '活跃连接数': 'milvus_proxy_connection_count',
        }

        for name, query in queries.items():
            result = self.query_prometheus(query)
            if not result:
                print(f"❌ 无法获取 {name} 数据")
                continue

            print(f"\n📈 {name}:")
            suffix = "%" if name == '搜索成功率' else ""
            for item in result:
                labels = item['metric']
                # Use the first distinguishing label that is present.
                key = next(
                    (labels[label]
                     for label in ('query_type', 'segment_state', 'instance')
                     if label in labels),
                    'total'
                )
                print(f"  - {key}: {float(item['value'][1]):.2f}{suffix}")

    def plot_metrics_trend(self, hours=1):
        """Plot QPS, P95 latency and memory usage over the last ``hours``.

        Args:
            hours: Length of the time window ending now.
        """
        print(f"\n📊 绘制最近 {hours} 小时的指标趋势...")

        end_time = datetime.now()
        start_time = end_time - timedelta(hours=hours)

        # Prometheus expects Unix timestamps.
        start_timestamp = int(start_time.timestamp())
        end_timestamp = int(end_time.timestamp())

        metrics = {
            'QPS': 'rate(milvus_proxy_search_total[5m])',
            'P95延迟(ms)': 'histogram_quantile(0.95, rate(milvus_proxy_search_duration_seconds_bucket[5m])) * 1000',
            # In PromQL "^" is the power operator, so 1024^3 is bytes per GiB.
            '内存使用(GB)': 'milvus_querynode_memory_usage_bytes / (1024^3)',
        }

        # squeeze=False always yields a 2-D array, even for a single metric.
        fig, axes = plt.subplots(len(metrics), 1, figsize=(12, 8), squeeze=False)

        for ax, (name, query) in zip(axes[:, 0], metrics.items()):
            result = self.query_prometheus(query, start_timestamp, end_timestamp)
            series_list = [s for s in (result or []) if s.get('values')]

            if not series_list:
                ax.text(0.5, 0.5, f'无 {name} 数据',
                        transform=ax.transAxes,
                        ha='center', va='center')
                continue

            # Draw every returned series, not only the first one.
            for series in series_list:
                values = series['values']
                timestamps = [datetime.fromtimestamp(float(v[0])) for v in values]
                data = [float(v[1]) for v in values]
                label = series.get('metric', {}).get('instance', name)
                ax.plot(timestamps, data, label=label)

            ax.set_title(f'{name} 趋势')
            ax.set_ylabel(name)
            ax.grid(True)
            ax.legend()
            # Rotate tick labels on each subplot, not only the last one.
            ax.tick_params(axis='x', labelrotation=45)

        plt.tight_layout()
        plt.show()

# 使用示例
def main():
    """Run the health check, the metric reports and the trend plot in turn."""
    print("=== Milvus 监控指标查询 ===")

    monitor = MilvusMonitor()

    # Text reports: component health, performance metrics, business metrics.
    for report in (
        monitor.check_milvus_health,
        monitor.get_performance_metrics,
        monitor.get_business_metrics,
    ):
        report()

    # Plotting is best-effort; a failure here only prints a message.
    try:
        monitor.plot_metrics_trend(hours=1)
    except Exception as e:
        print(f"📊 绘制趋势图时出错: {e}")


# Entry point when executed as a script.
if __name__ == "__main__":
    main()
