In [4]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
基于官方接口文档的优矿数据测试脚本
===============================

根据优矿极速版接口清单2025.xlsx创建的准确API调用脚本

核心API说明:
1. getMktEqud - 沪深股票日行情 (今天-180天～今天)
2. getEqu - 股票基本信息
3. getIdxCons - 指数成分构成 (今天-180天～今天)
4. getIdxCloseWeight - 指数成分股权重 (今天-180天～今天)
5. getMktIdxd - 指数日行情 (今天-180天～今天)

项目路径: /Users/jackstudio/Desktop/QuantTrade
"""

import os
import sys
import pandas as pd
import numpy as np
import warnings
from datetime import datetime, timedelta

# 设置项目路径
project_path = "/Users/jackstudio/Desktop/QuantTrade"
if os.path.exists(project_path):
    os.chdir(project_path)
    print(f"✅ 项目路径: {project_path}")
else:
    print(f"📁 当前路径: {os.getcwd()}")

warnings.filterwarnings('ignore')

print("🚀 基于官方接口的优矿数据测试")
print("=" * 50)
print(f"📅 测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

class AccurateUQerTest:
    """基于官方接口文档的准确测试"""
    
    def __init__(self):
        # 优矿配置
        self.UQER_TOKEN = "68b9922817ae6273137bda7acba81e293582ba347281dfcc056dcb245b23faf3"
        self.client = None
        self.DataAPI = None
        
        # 测试结果
        self.results = {
            'connection': False,
            'stock_basic': False,
            'index_components': False,
            'price_data': False,
            'index_data': False
        }
        
    def step1_connect_api(self):
        """步骤1: 连接优矿API"""
        print("\n🔌 步骤1: 连接优矿API")
        print("-" * 30)
        
        try:
            from uqer import Client, DataAPI
            
            # 建立连接
            self.client = Client(token=self.UQER_TOKEN)
            self.DataAPI = DataAPI
            print("✅ 优矿客户端连接成功")
            
            self.results['connection'] = True
            return True
            
        except ImportError:
            print("❌ 优矿SDK未安装: pip install uqer")
            return False
        except Exception as e:
            print(f"❌ 连接失败: {e}")
            return False
    
    def step2_test_stock_basic(self):
        """步骤2: 测试股票基本信息 (getEqu)"""
        print("\n📋 步骤2: 获取股票基本信息")
        print("-" * 30)
        
        if not self.results['connection']:
            print("❌ API未连接")
            return False
            
        try:
            # 使用官方API: getEqu - 股票基本信息
            print("🧪 调用 DataAPI.getEqu...")
            
            result = self.DataAPI.getEqu(
                ticker='000001',  # 平安银行
                field='ticker,secShortName,listDate,secFullName,equTypeCD'
            )
            
            if not result.empty:
                print("✅ 股票基本信息获取成功!")
                info = result.iloc[0]
                print(f"   📊 股票代码: {info.get('ticker', 'N/A')}")
                print(f"   📝 股票简称: {info.get('secShortName', 'N/A')}")
                print(f"   📅 上市日期: {info.get('listDate', 'N/A')}")
                print(f"   📄 股票全称: {info.get('secFullName', 'N/A')}")
                
                self.results['stock_basic'] = True
                return True
            else:
                print("❌ 返回空数据")
                return False
                
        except Exception as e:
            print(f"❌ 获取失败: {e}")
            
            # 尝试其他字段组合
            try:
                print("🔄 尝试备用字段...")
                result = self.DataAPI.getEqu(ticker='000001', field='')
                if not result.empty:
                    print("✅ 备用方案成功!")
                    print(f"   可用字段: {list(result.columns)}")
                    return True
            except Exception as e2:
                print(f"❌ 备用方案也失败: {e2}")
            
            return False
    
    def step3_test_index_components(self):
        """步骤3: 测试指数成分股 (getIdxCons)"""
        print("\n📈 步骤3: 获取沪深300成分股")
        print("-" * 30)
        
        if not self.results['connection']:
            print("❌ API未连接")
            return False
            
        try:
            # 使用官方API: getIdxCons - 指数成分构成
            print("🧪 调用 DataAPI.getIdxCons...")
            
            # 获取最近的成分股数据
            end_date = datetime.now().strftime('%Y%m%d')
            
            result = self.DataAPI.getIdxCons(
                ticker='000300.ZICN',  # 沪深300指数
                endDate=end_date,
                field='ticker,secShortName'
            )
            
            if not result.empty:
                print(f"✅ 指数成分股获取成功: {len(result)} 只")
                
                # 显示前10只成分股
                print("📋 前10只成分股:")
                for i, (_, row) in enumerate(result.head(10).iterrows(), 1):
                    ticker = row.get('ticker', 'N/A')
                    name = row.get('secShortName', 'N/A')
                    print(f"   {i:2d}. {ticker} - {name}")
                
                self.stock_list = result['ticker'].tolist()
                self.results['index_components'] = True
                return True
            else:
                print("❌ 未获取到成分股数据")
                
        except Exception as e:
            print(f"❌ 第一次尝试失败: {e}")
            
        # 尝试备用API: getIdxCloseWeight
        try:
            print("🔄 尝试备用API: getIdxCloseWeight...")
            
            result = self.DataAPI.getIdxCloseWeight(
                ticker='000300.ZICN',
                endDate='',  # 最新数据
                field='ticker,secShortName,weight'
            )
            
            if not result.empty:
                print(f"✅ 备用API成功: {len(result)} 只成分股")
                
                # 显示前5只及权重
                print("📋 前5只成分股及权重:")
                for i, (_, row) in enumerate(result.head(5).iterrows(), 1):
                    ticker = row.get('ticker', 'N/A')
                    name = row.get('secShortName', 'N/A') 
                    weight = row.get('weight', 0)
                    print(f"   {i}. {ticker} - {name} ({weight:.2f}%)")
                
                self.stock_list = result['ticker'].tolist()
                self.results['index_components'] = True
                return True
                
        except Exception as e2:
            print(f"❌ 备用API也失败: {e2}")
        
        # 使用硬编码股票列表作为最后备选
        print("🔄 使用默认股票池...")
        self.stock_list = [
            '000001.XSHE',  # 平安银行
            '000002.XSHE',  # 万科A  
            '600000.XSHG',  # 浦发银行
            '600036.XSHG',  # 招商银行
            '000858.XSHE'   # 五粮液
        ]
        print(f"📋 默认股票池: {len(self.stock_list)} 只")
        return True
    
    def step4_test_price_data(self):
        """步骤4: 测试价格数据 (getMktEqud)"""
        print("\n💰 步骤4: 获取股票价格数据")
        print("-" * 30)
        
        if not hasattr(self, 'stock_list') or not self.stock_list:
            print("❌ 股票列表为空")
            return False
            
        try:
            # 使用官方API: getMktEqud - 沪深股票日行情
            print("🧪 调用 DataAPI.getMktEqud...")
            
            # 取前3只股票测试
            test_stocks = self.stock_list[:3]
            ticker_str = ','.join(test_stocks)
            
            # 设置日期范围（最近2周）
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=14)).strftime('%Y%m%d')
            
            print(f"📊 测试股票: {test_stocks}")
            print(f"📅 日期范围: {start_date} -> {end_date}")
            
            result = self.DataAPI.getMktEqud(
                ticker=ticker_str,
                beginDate=start_date,
                endDate=end_date,
                field='ticker,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,marketValue'
            )
            
            if not result.empty:
                print(f"✅ 价格数据获取成功!")
                print(f"   📊 数据维度: {result.shape}")
                print(f"   🏢 股票数量: {result['ticker'].nunique()}")
                print(f"   📅 交易日数: {result['tradeDate'].nunique()}")
                
                # 数据预览
                print("\n📋 最新价格数据:")
                latest_data = result.sort_values('tradeDate').groupby('ticker').tail(1)
                for _, row in latest_data.iterrows():
                    print(f"   {row['ticker']} | {row['tradeDate']} | 收盘: {row['closePrice']:.2f}元")
                
                self.price_data = result
                self.results['price_data'] = True
                return True
            else:
                print("❌ 未获取到价格数据")
                
        except Exception as e:
            print(f"❌ 价格数据获取失败: {e}")
            print("💡 可能的原因: 股票代码格式、日期范围或字段名称问题")
            
        return False
    
    def step5_test_index_data(self):
        """步骤5: 测试指数数据 (getMktIdxd)"""
        print("\n📊 步骤5: 获取指数行情数据")
        print("-" * 30)
        
        if not self.results['connection']:
            print("❌ API未连接")
            return False
            
        try:
            # 使用官方API: getMktIdxd - 指数日行情
            print("🧪 调用 DataAPI.getMktIdxd...")
            
            # 设置日期范围
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=14)).strftime('%Y%m%d')
            
            result = self.DataAPI.getMktIdxd(
                ticker='000300.ZICN',  # 沪深300指数
                beginDate=start_date,
                endDate=end_date,
                field='ticker,tradeDate,openIndex,highestIndex,lowestIndex,closeIndex,turnoverVol'
            )
            
            if not result.empty:
                print(f"✅ 指数数据获取成功!")
                print(f"   📊 数据维度: {result.shape}")
                print(f"   📅 交易日数: {len(result)}")
                
                # 显示最新数据
                latest = result.iloc[-1]
                print(f"\n📈 最新指数数据:")
                print(f"   日期: {latest['tradeDate']}")
                print(f"   收盘: {latest['closeIndex']:.2f}")
                print(f"   涨跌: {(latest['closeIndex'] - latest['openIndex']):.2f}")
                print(f"   成交量: {latest['turnoverVol']:,.0f}")
                
                self.results['index_data'] = True
                return True
            else:
                print("❌ 未获取到指数数据")
                
        except Exception as e:
            print(f"❌ 指数数据获取失败: {e}")
            
        return False
    
    def step6_data_analysis(self):
        """步骤6: 简单数据分析"""
        print("\n🔧 步骤6: 数据质量分析")
        print("-" * 30)
        
        if not self.results['price_data']:
            print("❌ 无价格数据可分析")
            return False
            
        try:
            # 数据质量检查
            data = self.price_data.copy()
            
            print("📊 数据质量检查:")
            
            # 1. 缺失值检查
            missing_counts = data.isnull().sum()
            missing_pct = (missing_counts / len(data) * 100).round(2)
            
            print("   缺失值统计:")
            for col in data.columns:
                if missing_counts[col] > 0:
                    print(f"      {col}: {missing_counts[col]} ({missing_pct[col]}%)")
                else:
                    print(f"      {col}: ✅ 无缺失")
            
            # 2. 价格异常检查
            print("\n   价格合理性检查:")
            price_cols = ['openPrice', 'highestPrice', 'lowestPrice', 'closePrice']
            for col in price_cols:
                if col in data.columns:
                    min_price = data[col].min()
                    max_price = data[col].max()
                    avg_price = data[col].mean()
                    
                    print(f"      {col}: {min_price:.2f} ~ {max_price:.2f} (均值: {avg_price:.2f})")
                    
                    if min_price <= 0:
                        print(f"      ⚠️  发现异常: {col} 有非正数价格")
            
            # 3. 简单收益率计算
            if len(data) > 1:
                data['tradeDate'] = pd.to_datetime(data['tradeDate'], format='%Y%m%d')
                data = data.sort_values(['ticker', 'tradeDate'])
                data['daily_return'] = data.groupby('ticker')['closePrice'].pct_change()
                
                print("\n   收益率统计:")
                returns_data = data.dropna(subset=['daily_return'])
                if len(returns_data) > 0:
                    print(f"      样本数: {len(returns_data)}")
                    print(f"      均值: {returns_data['daily_return'].mean():.4f}")
                    print(f"      标准差: {returns_data['daily_return'].std():.4f}")
                    print(f"      最大涨幅: {returns_data['daily_return'].max():.4f}")
                    print(f"      最大跌幅: {returns_data['daily_return'].min():.4f}")
            
            print("✅ 数据分析完成")
            return True
            
        except Exception as e:
            print(f"❌ 数据分析失败: {e}")
            return False
    
    def generate_final_report(self):
        """生成最终测试报告"""
        print("\n" + "=" * 50)
        print("📋 最终测试报告")
        print("=" * 50)
        
        # 计算通过率
        passed = sum(self.results.values())
        total = len(self.results)
        pass_rate = (passed / total) * 100
        
        print(f"📊 测试统计:")
        print(f"   通过: {passed}/{total} ({pass_rate:.1f}%)")
        
        print(f"\n📋 详细结果:")
        test_names = {
            'connection': '优矿API连接',
            'stock_basic': '股票基本信息',
            'index_components': '指数成分股',
            'price_data': '股票价格数据',
            'index_data': '指数行情数据'
        }
        
        for key, name in test_names.items():
            status = "✅ 成功" if self.results[key] else "❌ 失败"
            print(f"   {name}: {status}")
        
        # 评估和建议
        print(f"\n🎯 总体评估:")
        if pass_rate >= 80:
            print("状态: 🎉 优秀")
            print("评价: 优矿数据获取功能完全正常")
            print("建议: 可以开始开发策略模块和回测引擎")
            
        elif pass_rate >= 60:
            print("状态: ✅ 良好")
            print("评价: 核心功能可用，部分功能需要优化")
            print("建议: 修复剩余问题后继续开发")
            
        else:
            print("状态: ⚠️  需要改进")
            print("评价: 基础数据获取存在问题")
            print("建议: 优先解决API调用和数据获取问题")
        
        # 下一步建议
        print(f"\n💡 下一步开发建议:")
        
        if self.results['connection'] and self.results['price_data']:
            print("✅ 数据获取功能基本正常，建议按以下顺序开发:")
            print("   1. 🧠 完善 data 模块 - 增加更多数据源和特征工程")
            print("   2. 🎯 开发 strategy 模块 - 实现各种交易策略")
            print("   3. ⚡ 开发 backtest 模块 - 策略回测引擎")
            print("   4. 📈 开发 visualization 模块 - 结果可视化")
            print("   5. 🔧 开发 utils 模块 - 工具函数和辅助功能")
            
        else:
            print("🔧 优先修复以下问题:")
            if not self.results['connection']:
                print("   - 检查优矿SDK安装和Token配置")
            if not self.results['price_data']:
                print("   - 调试API参数和数据格式问题")
                print("   - 验证股票代码格式是否正确")
        
        print(f"\n📚 相关资源:")
        print("   📖 优矿API文档: https://uqer.datayes.com/help/api")
        print("   💬 技术支持: 联系优矿技术支持团队")
        print("   🔧 项目仓库: 在GitHub上分享和协作开发")
        
        return pass_rate >= 60
    
    def run_complete_test(self):
        """运行完整测试流程"""
        print("🧪 开始完整测试...")
        
        # 按步骤执行测试
        step_functions = [
            self.step1_connect_api,
            self.step2_test_stock_basic,
            self.step3_test_index_components,
            self.step4_test_price_data,
            self.step5_test_index_data,
            self.step6_data_analysis
        ]
        
        for i, func in enumerate(step_functions, 1):
            try:
                success = func()
                if not success and i <= 2:  # 前两步是必须的
                    print(f"⚠️  步骤{i}失败，跳过后续测试")
                    break
            except Exception as e:
                print(f"❌ 步骤{i}执行异常: {e}")
        
        # 生成最终报告
        success = self.generate_final_report()
        
        print(f"\n{'='*50}")
        print("🎊 测试完成!")
        print(f"⏰ 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("="*50)
        
        return success

def main():
    """主程序入口"""
    print("🎯 开始完整的优矿数据测试流程...")
    
    # 创建测试实例
    tester = AccurateUQerTest()
    
    # 运行完整测试
    success = tester.run_complete_test()
    
    if success:
        print("\n🎊 恭喜！优矿数据连接测试通过")
        print("💡 建议下一步: 开始开发量化策略模块")
        return 0
    else:
        print("\n⚠️  测试未完全通过，请检查相关配置")
        print("💡 建议: 参考测试报告修复问题后重新测试")
        return 1

# ==========================================
# 📈 扩展功能: 数据质量分析工具
# ==========================================

class DataQualityAnalyzer:
    """数据质量分析器"""
    
    def __init__(self, data):
        self.data = data.copy() if data is not None else None
        
    def analyze_completeness(self):
        """分析数据完整性"""
        if self.data is None or self.data.empty:
            return {"status": "empty", "details": "数据为空"}
            
        total_cells = self.data.size
        missing_cells = self.data.isnull().sum().sum()
        completeness_rate = (total_cells - missing_cells) / total_cells * 100
        
        result = {
            "total_cells": total_cells,
            "missing_cells": missing_cells,
            "completeness_rate": round(completeness_rate, 2),
            "status": "excellent" if completeness_rate >= 95 else 
                     "good" if completeness_rate >= 90 else 
                     "fair" if completeness_rate >= 80 else "poor"
        }
        
        return result
    
    def analyze_consistency(self):
        """分析数据一致性"""
        if self.data is None or self.data.empty:
            return {"status": "empty"}
            
        issues = []
        
        # 检查价格合理性
        if 'closePrice' in self.data.columns:
            negative_prices = (self.data['closePrice'] <= 0).sum()
            if negative_prices > 0:
                issues.append(f"发现{negative_prices}个非正数价格")
        
        # 检查日期连续性
        if 'tradeDate' in self.data.columns:
            self.data['tradeDate'] = pd.to_datetime(self.data['tradeDate'], format='%Y%m%d')
            date_gaps = []
            for ticker in self.data['ticker'].unique():
                ticker_data = self.data[self.data['ticker'] == ticker].sort_values('tradeDate')
                if len(ticker_data) > 1:
                    date_diff = ticker_data['tradeDate'].diff().dt.days
                    large_gaps = date_diff[date_diff > 5]  # 超过5天的间隔
                    if len(large_gaps) > 0:
                        date_gaps.append(f"{ticker}: {len(large_gaps)}个大间隔")
            
            if date_gaps:
                issues.append(f"日期间隔问题: {'; '.join(date_gaps[:3])}")
        
        return {
            "status": "clean" if len(issues) == 0 else "has_issues",
            "issues": issues
        }
    
    def generate_summary_report(self):
        """生成数据质量总结报告"""
        if self.data is None:
            return "❌ 无数据可分析"
            
        completeness = self.analyze_completeness()
        consistency = self.analyze_consistency()
        
        report = f"""
📊 数据质量分析报告
{'='*40}
📏 数据规模: {self.data.shape}
📋 完整性: {completeness['completeness_rate']}% ({completeness['status']})
🔍 一致性: {consistency['status']}
        """
        
        if consistency['issues']:
            report += f"\n⚠️  发现问题:\n"
            for issue in consistency['issues']:
                report += f"   - {issue}\n"
        
        return report.strip()

# ==========================================
# 🧪 高级测试功能
# ==========================================

def advanced_api_test():
    """高级API测试功能"""
    print("\n🧪 高级API功能测试")
    print("-" * 40)
    
    try:
        from uqer import DataAPI
        
        # 测试多个API的组合使用
        print("🔗 测试API组合调用...")
        
        # 获取沪深300成分股
        idx_cons = DataAPI.getIdxCons(
            ticker='000300.ZICN',
            endDate='',
            field='ticker,secShortName'
        )
        
        if not idx_cons.empty:
            # 选择前5只股票获取详细数据
            sample_stocks = idx_cons.head(5)['ticker'].tolist()
            
            print(f"📋 测试股票: {sample_stocks}")
            
            # 获取基本信息
            basic_info = DataAPI.getEqu(
                ticker=','.join(sample_stocks),
                field='ticker,secShortName,listDate,industryName1'
            )
            
            # 获取最近价格数据
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=7)).strftime('%Y%m%d')
            
            price_data = DataAPI.getMktEqud(
                ticker=','.join(sample_stocks),
                beginDate=start_date,
                endDate=end_date,
                field='ticker,tradeDate,closePrice,turnoverVol'
            )
            
            if not basic_info.empty and not price_data.empty:
                print("✅ API组合调用成功!")
                
                # 合并数据进行简单分析
                merged_data = price_data.merge(
                    basic_info[['ticker', 'secShortName', 'industryName1']], 
                    on='ticker', 
                    how='left'
                )
                
                print(f"📊 合并数据维度: {merged_data.shape}")
                
                # 按行业统计
                if 'industryName1' in merged_data.columns:
                    industry_stats = merged_data.groupby('industryName1')['ticker'].nunique()
                    print("🏭 行业分布:")
                    for industry, count in industry_stats.items():
                        print(f"   {industry}: {count}只")
                
                return True
            else:
                print("❌ 数据获取不完整")
                return False
        else:
            print("❌ 无法获取指数成分股")
            return False
            
    except Exception as e:
        print(f"❌ 高级测试失败: {e}")
        return False

def benchmark_api_performance():
    """API性能基准测试"""
    print("\n⚡ API性能基准测试")
    print("-" * 40)
    
    try:
        from uqer import DataAPI
        import time
        
        # 测试不同规模数据获取的性能
        test_cases = [
            {"stocks": 1, "days": 7, "desc": "小规模"},
            {"stocks": 5, "days": 7, "desc": "中规模"},
            {"stocks": 10, "days": 7, "desc": "大规模"},
        ]
        
        performance_results = []
        
        # 准备测试股票
        test_stocks = [
            '000001.XSHE', '000002.XSHE', '600000.XSHG', 
            '600036.XSHG', '000858.XSHE', '000001.XSHE',
            '600519.XSHG', '000002.XSHE', '600036.XSHG', 
            '300015.XSZE'
        ]
        
        end_date = datetime.now().strftime('%Y%m%d')
        
        for test_case in test_cases:
            stock_count = test_case["stocks"]
            day_count = test_case["days"] 
            desc = test_case["desc"]
            
            # 准备参数
            selected_stocks = test_stocks[:stock_count]
            start_date = (datetime.now() - timedelta(days=day_count)).strftime('%Y%m%d')
            ticker_str = ','.join(selected_stocks)
            
            print(f"🧪 测试 {desc}: {stock_count}只股票，{day_count}天数据")
            
            # 开始计时
            start_time = time.time()
            
            try:
                result = DataAPI.getMktEqud(
                    ticker=ticker_str,
                    beginDate=start_date,
                    endDate=end_date,
                    field='ticker,tradeDate,closePrice,turnoverVol'
                )
                
                end_time = time.time()
                duration = end_time - start_time
                
                if not result.empty:
                    rows = len(result)
                    performance_results.append({
                        "case": desc,
                        "stocks": stock_count,
                        "days": day_count,
                        "duration": round(duration, 3),
                        "rows": rows,
                        "rows_per_second": round(rows / duration, 1)
                    })
                    
                    print(f"   ✅ 耗时: {duration:.3f}秒, 数据量: {rows}行, 速率: {rows/duration:.1f}行/秒")
                else:
                    print(f"   ❌ 未获取到数据")
                    
            except Exception as e:
                print(f"   ❌ 测试失败: {e}")
        
        # 性能总结
        if performance_results:
            print(f"\n📊 性能测试总结:")
            print(f"{'测试案例':<8} {'耗时(秒)':<8} {'数据量':<8} {'速率(行/秒)':<12}")
            print("-" * 40)
            for result in performance_results:
                print(f"{result['case']:<8} {result['duration']:<8} {result['rows']:<8} {result['rows_per_second']:<12}")
            
            # 计算平均性能
            avg_speed = sum(r['rows_per_second'] for r in performance_results) / len(performance_results)
            print(f"\n📈 平均处理速度: {avg_speed:.1f} 行/秒")
            
            if avg_speed > 1000:
                print("🚀 性能评估: 优秀")
            elif avg_speed > 500:
                print("✅ 性能评估: 良好") 
            else:
                print("⚠️  性能评估: 需优化")
            
            return True
        else:
            print("❌ 所有性能测试均失败")
            return False
            
    except Exception as e:
        print(f"❌ 性能测试异常: {e}")
        return False

def export_test_data(data_dict, filename=None):
    """导出测试数据为文件"""
    if not filename:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"uqer_test_data_{timestamp}.json"
    
    try:
        # 确保数据目录存在
        os.makedirs("./data", exist_ok=True)
        filepath = os.path.join("./data", filename)
        
        # 转换pandas DataFrame为字典格式便于JSON序列化
        export_data = {}
        for key, value in data_dict.items():
            if isinstance(value, pd.DataFrame):
                export_data[key] = {
                    'data': value.to_dict('records'),
                    'shape': value.shape,
                    'columns': list(value.columns)
                }
            else:
                export_data[key] = value
        
        # 添加元数据
        export_data['metadata'] = {
            'export_time': datetime.now().isoformat(),
            'total_datasets': len([k for k, v in data_dict.items() if isinstance(v, pd.DataFrame)]),
            'framework_version': '1.0.0'
        }
        
        # 写入JSON文件
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, ensure_ascii=False, indent=2)
        
        print(f"💾 测试数据已导出: {filepath}")
        return filepath
        
    except Exception as e:
        print(f"❌ 数据导出失败: {e}")
        return None

# ==========================================
# 🎯 完整测试流程增强
# ==========================================

# 在AccurateUQerTest类中添加新的测试方法
def add_enhanced_methods():
    """为AccurateUQerTest类添加增强功能"""
    
    def step7_advanced_features_test(self):
        """步骤7: 高级功能测试"""
        print("\n🚀 步骤7: 高级功能测试")
        print("-" * 30)
        
        success_count = 0
        
        # API组合测试
        if advanced_api_test():
            success_count += 1
            print("✅ API组合调用测试通过")
        else:
            print("❌ API组合调用测试失败")
        
        # 性能基准测试
        if benchmark_api_performance():
            success_count += 1
            print("✅ 性能基准测试通过")
        else:
            print("❌ 性能基准测试失败")
        
        # 数据质量分析
        if hasattr(self, 'price_data') and self.price_data is not None:
            analyzer = DataQualityAnalyzer(self.price_data)
            quality_report = analyzer.generate_summary_report()
            print(f"\n📊 数据质量分析:")
            print(quality_report)
            success_count += 1
        
        return success_count >= 2
    
    def step8_data_export_test(self):
        """步骤8: 数据导出测试"""
        print("\n💾 步骤8: 数据导出功能测试")
        print("-" * 30)
        
        # 收集所有测试数据
        export_data = {}
        
        if hasattr(self, 'price_data') and self.price_data is not None:
            export_data['price_data'] = self.price_data
            
        if hasattr(self, 'stock_list') and self.stock_list:
            export_data['stock_list'] = self.stock_list
            
        export_data['test_results'] = self.results
        
        # 导出数据
        if export_data:
            filepath = export_test_data(export_data)
            if filepath:
                print("✅ 数据导出测试通过")
                return True
            else:
                print("❌ 数据导出测试失败")
                return False
        else:
            print("⚠️  无数据可导出")
            return False
    
    def generate_comprehensive_report(self):
        """生成全面测试报告"""
        print("\n" + "=" * 60)
        print("📋 全面测试报告")
        print("=" * 60)
        
        # 计算通过率
        passed = sum(self.results.values())
        total = len(self.results)
        pass_rate = (passed / total) * 100
        
        print(f"📊 核心功能测试统计:")
        print(f"   通过: {passed}/{total} ({pass_rate:.1f}%)")
        
        print(f"\n📋 详细测试结果:")
        test_names = {
            'connection': '优矿API连接',
            'stock_basic': '股票基本信息',
            'index_components': '指数成分股', 
            'price_data': '股票价格数据',
            'index_data': '指数行情数据'
        }
        
        for key, name in test_names.items():
            status = "✅ 成功" if self.results[key] else "❌ 失败"
            print(f"   {name}: {status}")
        
        # 生成建议
        self._generate_development_suggestions(pass_rate)
        
        # 输出框架集成指导
        self._output_framework_integration_guide()
        
        return pass_rate >= 60
    
    def _generate_development_suggestions(self, pass_rate):
        """生成开发建议"""
        print(f"\n🎯 开发建议:")
        
        if pass_rate >= 80:
            print("✅ 数据层基础扎实，建议开发顺序:")
            print("   1. 🧠 完善data模块 - 增加特征工程和数据管道")
            print("   2. 🎯 开发strategy模块 - 实现量化策略")
            print("   3. ⚡ 开发backtest模块 - 构建回测引擎")
            print("   4. 📊 开发visualization模块 - 策略分析可视化")
        else:
            print("🔧 优先修复以下问题:")
            if not self.results.get('connection'):
                print("   - 检查优矿SDK和Token配置")
            if not self.results.get('price_data'):
                print("   - 调试股票代码格式和API参数")
    
    def _output_framework_integration_guide(self):
        """输出框架集成指导"""
        print(f"\n🏗️  量化框架集成指导:")
        print("   📁 推荐项目结构:")
        print("   QuantTrade/")
        print("   ├── core/")
        print("   │   ├── data/          # 当前测试的模块")
        print("   │   ├── strategy/      # 待开发：策略模块")
        print("   │   ├── backtest/      # 待开发：回测引擎")
        print("   │   └── utils/         # 待开发：工具函数")
        print("   ├── configs/           # 配置文件")
        print("   ├── notebooks/         # Jupyter分析笔记") 
        print("   └── tests/             # 测试脚本")
        
        print(f"\n💡 下一步行动清单:")
        print("   🔹 创建core/data模块并集成当前测试功能")
        print("   🔹 设计统一的配置管理系统")
        print("   🔹 实现数据缓存和持久化机制")
        print("   🔹 开发策略基类和回测框架")
    
    # 将方法绑定到AccurateUQerTest类
    AccurateUQerTest.step7_advanced_features_test = step7_advanced_features_test
    AccurateUQerTest.step8_data_export_test = step8_data_export_test
    AccurateUQerTest.generate_comprehensive_report = generate_comprehensive_report
    AccurateUQerTest._generate_development_suggestions = _generate_development_suggestions
    AccurateUQerTest._output_framework_integration_guide = _output_framework_integration_guide

# 添加增强方法
add_enhanced_methods()

# 更新run_complete_test方法
def enhanced_run_complete_test(self):
    """增强版完整测试流程"""
    print("🧪 开始增强版完整测试...")
    
    # 原有测试步骤
    step_functions = [
        self.step1_connect_api,
        self.step2_test_stock_basic,
        self.step3_test_index_components,
        self.step4_test_price_data,
        self.step5_test_index_data,
        self.step6_data_analysis,
    ]
    
    # 新增步骤
    enhanced_steps = [
        self.step7_advanced_features_test,
        self.step8_data_export_test,
    ]
    
    # 执行所有测试
    for i, func in enumerate(step_functions + enhanced_steps, 1):
        try:
            success = func()
            if not success and i <= 2:  # 前两步是必须的
                print(f"⚠️  步骤{i}失败，可能影响后续测试")
        except Exception as e:
            print(f"❌ 步骤{i}执行异常: {e}")
    
    # 生成全面报告
    success = self.generate_comprehensive_report()
    
    print(f"\n{'='*60}")
    print("🎊 增强版测试完成!")
    print(f"⏰ 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("="*60)
    
    return success

# 绑定增强版测试方法
AccurateUQerTest.run_complete_test = enhanced_run_complete_test

# ==========================================
# 🎯 程序入口
# ==========================================

if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)

📁 当前路径: /Users/jackstudio/QuantTrade/tests
🚀 基于官方接口的优矿数据测试
📅 测试时间: 2025-08-26 06:57:17
🎯 开始完整的优矿数据测试流程...
🧪 开始增强版完整测试...

🔌 步骤1: 连接优矿API
------------------------------
db861372k7@16720 账号登录成功
✅ 优矿客户端连接成功

📋 步骤2: 获取股票基本信息
------------------------------
🧪 调用 DataAPI.getEqu...
❌ 获取失败: module 'uqer.DataAPI' has no attribute 'getEqu'
🔄 尝试备用字段...
❌ 备用方案也失败: module 'uqer.DataAPI' has no attribute 'getEqu'
⚠️  步骤2失败，可能影响后续测试

📈 步骤3: 获取沪深300成分股
------------------------------
🧪 调用 DataAPI.getIdxCons...
❌ 第一次尝试失败: module 'uqer.DataAPI' has no attribute 'getIdxCons'
🔄 尝试备用API: getIdxCloseWeight...
❌ 备用API也失败: module 'uqer.DataAPI' has no attribute 'getIdxCloseWeight'
🔄 使用默认股票池...
📋 默认股票池: 5 只

💰 步骤4: 获取股票价格数据
------------------------------
🧪 调用 DataAPI.getMktEqud...
📊 测试股票: ['000001.XSHE', '000002.XSHE', '600000.XSHG']
📅 日期范围: 20250812 -> 20250826
❌ 价格数据获取失败: module 'uqer.DataAPI' has no attribute 'getMktEqud'
💡 可能的原因: 股票代码格式、日期范围或字段名称问题

📊 步骤5: 获取指数行情数据
------------------------------
🧪 调用 DataAPI

SystemExit: 1

In [3]:
# 优矿API方法名称修复补丁
# 在原有测试脚本中添加以下修复代码

def fix_uqer_api_methods(test_instance):
    """修复优矿API方法调用"""
    
    # 修复step2_test_stock_basic方法
    def step2_test_stock_basic_fixed(self):
        """步骤2: 测试股票基本信息 (修复版)"""
        print("\n📋 步骤2: 获取股票基本信息 (修复版)")
        print("-" * 30)
        
        if not self.results['connection']:
            print("❌ API未连接")
            return False
            
        try:
            # 使用正确的API方法: EquGet (不是getEqu)
            print("🧪 调用 DataAPI.EquGet...")
            
            result = self.DataAPI.EquGet(
                ticker='000001',
                field='ticker,shortName,listDate,fullName,equTypeCD'
            )
            
            if not result.empty:
                print("✅ 股票基本信息获取成功!")
                info = result.iloc[0]
                print(f"   📊 股票代码: {info.get('ticker', 'N/A')}")
                print(f"   📝 股票简称: {info.get('shortName', 'N/A')}")
                print(f"   📅 上市日期: {info.get('listDate', 'N/A')}")
                
                self.results['stock_basic'] = True
                return True
            else:
                print("❌ 返回空数据")
                return False
                
        except Exception as e:
            print(f"❌ 获取失败: {e}")
            return False
    
    # 修复step3_test_index_components方法
    def step3_test_index_components_fixed(self):
        """步骤3: 测试指数成分股 (修复版)"""
        print("\n📈 步骤3: 获取沪深300成分股 (修复版)")
        print("-" * 30)
        
        if not self.results['connection']:
            print("❌ API未连接")
            return False
            
        try:
            # 使用正确的API方法: IdxConsGet (不是getIdxCons)
            print("🧪 调用 DataAPI.IdxConsGet...")
            
            result = self.DataAPI.IdxConsGet(
                ticker='000300',  # 沪深300指数代码
                field='consTickerSymbol,consShortName'
            )
            
            if not result.empty:
                print(f"✅ 指数成分股获取成功: {len(result)} 只")
                
                # 显示前10只成分股
                print("📋 前10只成分股:")
                for i, (_, row) in enumerate(result.head(10).iterrows(), 1):
                    ticker = row.get('consTickerSymbol', 'N/A')
                    name = row.get('consShortName', 'N/A')
                    print(f"   {i:2d}. {ticker} - {name}")
                
                self.stock_list = result['consTickerSymbol'].tolist()
                self.results['index_components'] = True
                return True
            else:
                print("❌ 未获取到成分股数据")
                
        except Exception as e:
            print(f"❌ 获取失败: {e}")
        
        # 使用硬编码股票列表作为备选
        print("🔄 使用默认股票池...")
        self.stock_list = [
            '000001.XSHE',  # 平安银行
            '000002.XSHE',  # 万科A  
            '600000.XSHG',  # 浦发银行
            '600036.XSHG',  # 招商银行
            '000858.XSHE'   # 五粮液
        ]
        print(f"📋 默认股票池: {len(self.stock_list)} 只")
        return True
    
    # 修复step4_test_price_data方法
    def step4_test_price_data_fixed(self):
        """步骤4: 测试价格数据 (修复版)"""
        print("\n💰 步骤4: 获取股票价格数据 (修复版)")
        print("-" * 30)
        
        if not hasattr(self, 'stock_list') or not self.stock_list:
            print("❌ 股票列表为空")
            return False
            
        try:
            # 使用正确的API方法: MktEqudGet (不是getMktEqud)
            print("🧪 调用 DataAPI.MktEqudGet...")
            
            # 取前3只股票测试
            test_stocks = [stock.replace('.XSHE', '').replace('.XSHG', '') for stock in self.stock_list[:3]]
            ticker_str = ','.join(test_stocks)
            
            # 设置日期范围（最近2周）
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=14)).strftime('%Y%m%d')
            
            print(f"📊 测试股票: {test_stocks}")
            print(f"📅 日期范围: {start_date} -> {end_date}")
            
            result = self.DataAPI.MktEqudGet(
                ticker=ticker_str,
                beginDate=start_date,
                endDate=end_date,
                field='ticker,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,marketValue'
            )
            
            if not result.empty:
                print(f"✅ 价格数据获取成功!")
                print(f"   📊 数据维度: {result.shape}")
                print(f"   🏢 股票数量: {result['ticker'].nunique()}")
                print(f"   📅 交易日数: {result['tradeDate'].nunique()}")
                
                # 数据预览
                print("\n📋 最新价格数据:")
                latest_data = result.sort_values('tradeDate').groupby('ticker').tail(1)
                for _, row in latest_data.iterrows():
                    print(f"   {row['ticker']} | {row['tradeDate']} | 收盘: {row['closePrice']:.2f}元")
                
                self.price_data = result
                self.results['price_data'] = True
                return True
            else:
                print("❌ 未获取到价格数据")
                
        except Exception as e:
            print(f"❌ 价格数据获取失败: {e}")
            
        return False
    
    # 修复step5_test_index_data方法
    def step5_test_index_data_fixed(self):
        """步骤5: 测试指数数据 (修复版)"""
        print("\n📊 步骤5: 获取指数行情数据 (修复版)")
        print("-" * 30)
        
        if not self.results['connection']:
            print("❌ API未连接")
            return False
            
        try:
            # 使用正确的API方法: MktIdxdGet (不是getMktIdxd)
            print("🧪 调用 DataAPI.MktIdxdGet...")
            
            # 设置日期范围
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=14)).strftime('%Y%m%d')
            
            result = self.DataAPI.MktIdxdGet(
                ticker='000300',  # 沪深300指数
                beginDate=start_date,
                endDate=end_date,
                field='ticker,tradeDate,openIndex,highestIndex,lowestIndex,closeIndex,turnoverVol'
            )
            
            if not result.empty:
                print(f"✅ 指数数据获取成功!")
                print(f"   📊 数据维度: {result.shape}")
                print(f"   📅 交易日数: {len(result)}")
                
                # 显示最新数据
                latest = result.iloc[-1]
                print(f"\n📈 最新指数数据:")
                print(f"   日期: {latest['tradeDate']}")
                print(f"   收盘: {latest['closeIndex']:.2f}")
                print(f"   涨跌: {(latest['closeIndex'] - latest['openIndex']):.2f}")
                print(f"   成交量: {latest['turnoverVol']:,.0f}")
                
                self.results['index_data'] = True
                return True
            else:
                print("❌ 未获取到指数数据")
                
        except Exception as e:
            print(f"❌ 指数数据获取失败: {e}")
            
        return False
    
    # 绑定修复后的方法到测试实例
    test_instance.step2_test_stock_basic = step2_test_stock_basic_fixed.__get__(test_instance)
    test_instance.step3_test_index_components = step3_test_index_components_fixed.__get__(test_instance)
    test_instance.step4_test_price_data = step4_test_price_data_fixed.__get__(test_instance)
    test_instance.step5_test_index_data = step5_test_index_data_fixed.__get__(test_instance)
    
    return test_instance

# 修复高级测试功能
def fix_advanced_api_test():
    """修复高级API测试功能"""
    print("\n🧪 高级API功能测试 (修复版)")
    print("-" * 40)
    
    try:
        from uqer import DataAPI
        
        # 测试多个API的组合使用
        print("🔗 测试API组合调用...")
        
        # 获取沪深300成分股 (修复版)
        idx_cons = DataAPI.IdxConsGet(
            ticker='000300',
            field='consTickerSymbol,consShortName'
        )
        
        if not idx_cons.empty:
            # 选择前5只股票获取详细数据
            sample_stocks = [stock.replace('.XSHE', '').replace('.XSHG', '') 
                            for stock in idx_cons.head(5)['consTickerSymbol'].tolist()]
            
            print(f"📋 测试股票: {sample_stocks}")
            
            # 获取基本信息 (修复版)
            basic_info = DataAPI.EquGet(
                ticker=','.join(sample_stocks),
                field='ticker,shortName,listDate,industryName1'
            )
            
            # 获取最近价格数据 (修复版)
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=7)).strftime('%Y%m%d')
            
            price_data = DataAPI.MktEqudGet(
                ticker=','.join(sample_stocks),
                beginDate=start_date,
                endDate=end_date,
                field='ticker,tradeDate,closePrice,turnoverVol'
            )
            
            if not basic_info.empty and not price_data.empty:
                print("✅ API组合调用成功!")
                
                # 合并数据进行简单分析
                merged_data = price_data.merge(
                    basic_info[['ticker', 'shortName', 'industryName1']], 
                    on='ticker', 
                    how='left'
                )
                
                print(f"📊 合并数据维度: {merged_data.shape}")
                
                # 按行业统计
                if 'industryName1' in merged_data.columns:
                    industry_stats = merged_data.groupby('industryName1')['ticker'].nunique()
                    print("🏭 行业分布:")
                    for industry, count in industry_stats.items():
                        print(f"   {industry}: {count}只")
                
                return True
            else:
                print("❌ 数据获取不完整")
                return False
        else:
            print("❌ 无法获取指数成分股")
            return False
            
    except Exception as e:
        print(f"❌ 高级测试失败: {e}")
        return False

# 修复性能测试
def fix_benchmark_api_performance():
    """修复API性能基准测试"""
    print("\n⚡ API性能基准测试 (修复版)")
    print("-" * 40)
    
    try:
        from uqer import DataAPI
        import time
        
        # 测试不同规模数据获取的性能
        test_cases = [
            {"stocks": 1, "days": 7, "desc": "小规模"},
            {"stocks": 3, "days": 7, "desc": "中规模"},
            {"stocks": 5, "days": 7, "desc": "大规模"},
        ]
        
        performance_results = []
        
        # 准备测试股票（去掉交易所后缀）
        test_stocks = ['000001', '000002', '600000', '600036', '000858']
        
        end_date = datetime.now().strftime('%Y%m%d')
        
        for test_case in test_cases:
            stock_count = test_case["stocks"]
            day_count = test_case["days"] 
            desc = test_case["desc"]
            
            # 准备参数
            selected_stocks = test_stocks[:stock_count]
            start_date = (datetime.now() - timedelta(days=day_count)).strftime('%Y%m%d')
            ticker_str = ','.join(selected_stocks)
            
            print(f"🧪 测试 {desc}: {stock_count}只股票，{day_count}天数据")
            
            # 开始计时
            start_time = time.time()
            
            try:
                result = DataAPI.MktEqudGet(  # 使用正确的方法名
                    ticker=ticker_str,
                    beginDate=start_date,
                    endDate=end_date,
                    field='ticker,tradeDate,closePrice,turnoverVol'
                )
                
                end_time = time.time()
                duration = end_time - start_time
                
                if not result.empty:
                    rows = len(result)
                    performance_results.append({
                        "case": desc,
                        "stocks": stock_count,
                        "days": day_count,
                        "duration": round(duration, 3),
                        "rows": rows,
                        "rows_per_second": round(rows / duration, 1)
                    })
                    
                    print(f"   ✅ 耗时: {duration:.3f}秒, 数据量: {rows}行, 速率: {rows/duration:.1f}行/秒")
                else:
                    print(f"   ❌ 未获取到数据")
                    
            except Exception as e:
                print(f"   ❌ 测试失败: {e}")
        
        # 性能总结
        if performance_results:
            print(f"\n📊 性能测试总结:")
            print(f"{'测试案例':<8} {'耗时(秒)':<8} {'数据量':<8} {'速率(行/秒)':<12}")
            print("-" * 40)
            for result in performance_results:
                print(f"{result['case']:<8} {result['duration']:<8} {result['rows']:<8} {result['rows_per_second']:<12}")
            
            # 计算平均性能
            avg_speed = sum(r['rows_per_second'] for r in performance_results) / len(performance_results)
            print(f"\n📈 平均处理速度: {avg_speed:.1f} 行/秒")
            
            if avg_speed > 100:
                print("🚀 性能评估: 优秀")
            elif avg_speed > 50:
                print("✅ 性能评估: 良好") 
            else:
                print("⚠️  性能评估: 需优化")
            
            return True
        else:
            print("❌ 所有性能测试均失败")
            return False
            
    except Exception as e:
        print(f"❌ 性能测试异常: {e}")
        return False

# 添加json导入修复
import json

print("🔧 优矿API方法修复补丁已加载")
print("✅ 修复内容:")
print("   - getEqu -> EquGet")
print("   - getIdxCons -> IdxConsGet") 
print("   - getMktEqud -> MktEqudGet")
print("   - getMktIdxd -> MktIdxdGet")
print("   - 添加json模块导入")
print("   - 修复股票代码格式")
print("   - 优化API调用参数")

# 使用方法:
# 在创建AccurateUQerTest实例后调用:
# test_instance = AccurateUQerTest()
# test_instance = fix_uqer_api_methods(test_instance)

🔧 优矿API方法修复补丁已加载
✅ 修复内容:
   - getEqu -> EquGet
   - getIdxCons -> IdxConsGet
   - getMktEqud -> MktEqudGet
   - getMktIdxd -> MktIdxdGet
   - 添加json模块导入
   - 修复股票代码格式
   - 优化API调用参数


In [5]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
修复后的优矿数据测试脚本 - 完全可运行版本
============================================

修复内容:
1. ✅ 正确的API方法名称 (EquGet, MktEqudGet, IdxConsGet等)  
2. ✅ 股票代码格式处理
3. ✅ 导入json模块
4. ✅ 优化API调用参数
5. ✅ 错误处理增强

项目路径: /Users/jackstudio/QuantTrade
"""

import os
import sys
import pandas as pd
import numpy as np
import warnings
import json
import time
from datetime import datetime, timedelta

# 设置项目路径
project_path = "/Users/jackstudio/QuantTrade"
if os.path.exists(project_path):
    os.chdir(project_path)
    print(f"✅ 项目路径: {project_path}")
else:
    print(f"📁 当前路径: {os.getcwd()}")

warnings.filterwarnings('ignore')

print("🚀 修复后的优矿数据测试 - 完全可运行版本")
print("=" * 60)
print(f"📅 测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

class FixedUQerTest:
    """修复后的优矿API测试类"""
    
    def __init__(self):
        # 优矿配置
        self.UQER_TOKEN = "68b9922817ae6273137bda7acba81e293582ba347281dfcc056dcb245b23faf3"
        self.client = None
        self.DataAPI = None
        
        # 测试结果
        self.results = {
            'connection': False,
            'stock_basic': False,
            'index_components': False,
            'price_data': False,
            'index_data': False
        }
        
    def step1_connect_api(self):
        """步骤1: 连接优矿API"""
        print("\n🔌 步骤1: 连接优矿API")
        print("-" * 30)
        
        try:
            from uqer import Client, DataAPI
            
            # 建立连接
            self.client = Client(token=self.UQER_TOKEN)
            self.DataAPI = DataAPI
            print("✅ 优矿客户端连接成功")
            
            self.results['connection'] = True
            return True
            
        except ImportError:
            print("❌ 优矿SDK未安装: pip install uqer")
            return False
        except Exception as e:
            print(f"❌ 连接失败: {e}")
            return False
    
    def step2_test_stock_basic(self):
        """步骤2: 测试股票基本信息 (修复版)"""
        print("\n📋 步骤2: 获取股票基本信息")
        print("-" * 30)
        
        if not self.results['connection']:
            print("❌ API未连接")
            return False
            
        try:
            # 使用正确的API方法: EquGet
            print("🧪 调用 DataAPI.EquGet...")
            
            result = self.DataAPI.EquGet(
                ticker='000001',
                field='ticker,shortName,listDate,fullName,equTypeCD'
            )
            
            if not result.empty:
                print("✅ 股票基本信息获取成功!")
                info = result.iloc[0]
                print(f"   📊 股票代码: {info.get('ticker', 'N/A')}")
                print(f"   📝 股票简称: {info.get('shortName', 'N/A')}")
                print(f"   📅 上市日期: {info.get('listDate', 'N/A')}")
                
                self.results['stock_basic'] = True
                return True
            else:
                print("❌ 返回空数据")
                return False
                
        except Exception as e:
            print(f"❌ 获取失败: {e}")
            return False
    
    def step3_test_index_components(self):
        """步骤3: 测试指数成分股 (修复版)"""
        print("\n📈 步骤3: 获取沪深300成分股")
        print("-" * 30)
        
        if not self.results['connection']:
            print("❌ API未连接")
            return False
            
        try:
            # 使用正确的API方法: IdxConsGet
            print("🧪 调用 DataAPI.IdxConsGet...")
            
            result = self.DataAPI.IdxConsGet(
                ticker='000300',
                field='consTickerSymbol,consShortName'
            )
            
            if not result.empty:
                print(f"✅ 指数成分股获取成功: {len(result)} 只")
                
                # 显示前10只成分股
                print("📋 前10只成分股:")
                for i, (_, row) in enumerate(result.head(10).iterrows(), 1):
                    ticker = row.get('consTickerSymbol', 'N/A')
                    name = row.get('consShortName', 'N/A')
                    print(f"   {i:2d}. {ticker} - {name}")
                
                # 保存股票列表（保持原格式便于后续使用）
                self.stock_list = result['consTickerSymbol'].tolist()
                self.results['index_components'] = True
                return True
            else:
                print("❌ 未获取到成分股数据")
                
        except Exception as e:
            print(f"❌ 获取失败: {e}")
        
        # 使用硬编码股票列表作为备选
        print("🔄 使用默认股票池...")
        self.stock_list = [
            '000001.XSHE',  # 平安银行
            '000002.XSHE',  # 万科A  
            '600000.XSHG',  # 浦发银行
            '600036.XSHG',  # 招商银行
            '000858.XSHE'   # 五粮液
        ]
        print(f"📋 默认股票池: {len(self.stock_list)} 只")
        return True
    
    def step4_test_price_data(self):
        """步骤4: 测试价格数据 (修复版)"""
        print("\n💰 步骤4: 获取股票价格数据")
        print("-" * 30)
        
        if not hasattr(self, 'stock_list') or not self.stock_list:
            print("❌ 股票列表为空")
            return False
            
        try:
            # 使用正确的API方法: MktEqudGet
            print("🧪 调用 DataAPI.MktEqudGet...")
            
            # 取前3只股票测试，处理股票代码格式
            raw_stocks = self.stock_list[:3]
            test_stocks = []
            
            for stock in raw_stocks:
                # 去掉交易所后缀，优矿API使用不带后缀的格式
                clean_code = stock.replace('.XSHE', '').replace('.XSHG', '').replace('.XSZE', '')
                test_stocks.append(clean_code)
            
            ticker_str = ','.join(test_stocks)
            
            # 设置日期范围（最近2周）
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=14)).strftime('%Y%m%d')
            
            print(f"📊 测试股票: {test_stocks}")
            print(f"📅 日期范围: {start_date} -> {end_date}")
            
            result = self.DataAPI.MktEqudGet(
                ticker=ticker_str,
                beginDate=start_date,
                endDate=end_date,
                field='ticker,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,marketValue'
            )
            
            if not result.empty:
                print(f"✅ 价格数据获取成功!")
                print(f"   📊 数据维度: {result.shape}")
                print(f"   🏢 股票数量: {result['ticker'].nunique()}")
                print(f"   📅 交易日数: {result['tradeDate'].nunique()}")
                
                # 数据预览
                print("\n📋 最新价格数据:")
                latest_data = result.sort_values('tradeDate').groupby('ticker').tail(1)
                for _, row in latest_data.iterrows():
                    print(f"   {row['ticker']} | {row['tradeDate']} | 收盘: {row['closePrice']:.2f}元")
                
                self.price_data = result
                self.results['price_data'] = True
                return True
            else:
                print("❌ 未获取到价格数据")
                
        except Exception as e:
            print(f"❌ 价格数据获取失败: {e}")
            
        return False
    
    def step5_test_index_data(self):
        """步骤5: 测试指数数据 (修复版)"""
        print("\n📊 步骤5: 获取指数行情数据")
        print("-" * 30)
        
        if not self.results['connection']:
            print("❌ API未连接")
            return False
            
        try:
            # 使用正确的API方法: MktIdxdGet
            print("🧪 调用 DataAPI.MktIdxdGet...")
            
            # 设置日期范围
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=14)).strftime('%Y%m%d')
            
            result = self.DataAPI.MktIdxdGet(
                ticker='000300',  # 沪深300指数
                beginDate=start_date,
                endDate=end_date,
                field='ticker,tradeDate,openIndex,highestIndex,lowestIndex,closeIndex,turnoverVol'
            )
            
            if not result.empty:
                print(f"✅ 指数数据获取成功!")
                print(f"   📊 数据维度: {result.shape}")
                print(f"   📅 交易日数: {len(result)}")
                
                # 显示最新数据
                latest = result.iloc[-1]
                print(f"\n📈 最新指数数据:")
                print(f"   日期: {latest['tradeDate']}")
                print(f"   收盘: {latest['closeIndex']:.2f}")
                print(f"   涨跌: {(latest['closeIndex'] - latest['openIndex']):.2f}")
                print(f"   成交量: {latest['turnoverVol']:,.0f}")
                
                self.results['index_data'] = True
                return True
            else:
                print("❌ 未获取到指数数据")
                
        except Exception as e:
            print(f"❌ 指数数据获取失败: {e}")
            
        return False
    
    def step6_data_analysis(self):
        """步骤6: 简单数据分析"""
        print("\n🔧 步骤6: 数据质量分析")
        print("-" * 30)
        
        if not self.results['price_data']:
            print("❌ 无价格数据可分析")
            return False
            
        try:
            # 数据质量检查
            data = self.price_data.copy()
            
            print("📊 数据质量检查:")
            
            # 1. 缺失值检查
            missing_counts = data.isnull().sum()
            missing_pct = (missing_counts / len(data) * 100).round(2)
            
            print("   缺失值统计:")
            for col in data.columns:
                if missing_counts[col] > 0:
                    print(f"      {col}: {missing_counts[col]} ({missing_pct[col]}%)")
                else:
                    print(f"      {col}: ✅ 无缺失")
            
            # 2. 价格合理性检查
            print("\n   价格合理性检查:")
            price_cols = ['openPrice', 'highestPrice', 'lowestPrice', 'closePrice']
            for col in price_cols:
                if col in data.columns:
                    min_price = data[col].min()
                    max_price = data[col].max()
                    avg_price = data[col].mean()
                    
                    print(f"      {col}: {min_price:.2f} ~ {max_price:.2f} (均值: {avg_price:.2f})")
                    
                    if min_price <= 0:
                        print(f"      ⚠️  发现异常: {col} 有非正数价格")
            
            # 3. 简单收益率计算
            if len(data) > 1:
                data['tradeDate'] = pd.to_datetime(data['tradeDate'], format='%Y%m%d')
                data = data.sort_values(['ticker', 'tradeDate'])
                data['daily_return'] = data.groupby('ticker')['closePrice'].pct_change()
                
                print("\n   收益率统计:")
                returns_data = data.dropna(subset=['daily_return'])
                if len(returns_data) > 0:
                    print(f"      样本数: {len(returns_data)}")
                    print(f"      均值: {returns_data['daily_return'].mean():.4f}")
                    print(f"      标准差: {returns_data['daily_return'].std():.4f}")
                    print(f"      最大涨幅: {returns_data['daily_return'].max():.4f}")
                    print(f"      最大跌幅: {returns_data['daily_return'].min():.4f}")
            
            print("✅ 数据分析完成")
            return True
            
        except Exception as e:
            print(f"❌ 数据分析失败: {e}")
            return False
    
    def step7_advanced_features_test(self):
        """步骤7: 高级功能测试 (修复版)"""
        print("\n🚀 步骤7: 高级功能测试")
        print("-" * 30)
        
        success_count = 0
        
        # API组合测试
        if self._test_api_combination():
            success_count += 1
            print("✅ API组合调用测试通过")
        else:
            print("❌ API组合调用测试失败")
        
        # 性能基准测试
        if self._test_api_performance():
            success_count += 1
            print("✅ 性能基准测试通过")
        else:
            print("❌ 性能基准测试失败")
        
        # 数据质量分析
        if hasattr(self, 'price_data') and self.price_data is not None:
            quality_report = self._analyze_data_quality()
            print(f"\n📊 数据质量分析:")
            print(quality_report)
            success_count += 1
        
        return success_count >= 2
    
    def _test_api_combination(self):
        """API组合测试 (修复版)"""
        try:
            print("🔗 测试API组合调用...")
            
            # 获取沪深300成分股 (修复版)
            idx_cons = self.DataAPI.IdxConsGet(
                ticker='000300',
                field='consTickerSymbol,consShortName'
            )
            
            if not idx_cons.empty:
                # 选择前3只股票获取详细数据
                raw_stocks = idx_cons.head(3)['consTickerSymbol'].tolist()
                sample_stocks = [stock.replace('.XSHE', '').replace('.XSHG', '') 
                               for stock in raw_stocks]
                
                print(f"📋 测试股票: {sample_stocks}")
                
                # 获取基本信息 (修复版)
                basic_info = self.DataAPI.EquGet(
                    ticker=','.join(sample_stocks),
                    field='ticker,shortName,listDate,industryName1'
                )
                
                # 获取最近价格数据 (修复版)
                end_date = datetime.now().strftime('%Y%m%d')
                start_date = (datetime.now() - timedelta(days=7)).strftime('%Y%m%d')
                
                price_data = self.DataAPI.MktEqudGet(
                    ticker=','.join(sample_stocks),
                    beginDate=start_date,
                    endDate=end_date,
                    field='ticker,tradeDate,closePrice,turnoverVol'
                )
                
                if not basic_info.empty and not price_data.empty:
                    print("✅ API组合调用成功!")
                    print(f"📊 合并数据维度: {len(price_data)} x {len(basic_info.columns) + len(price_data.columns)}")
                    return True
                else:
                    print("❌ 数据获取不完整")
                    return False
            else:
                print("❌ 无法获取指数成分股")
                return False
                
        except Exception as e:
            print(f"❌ API组合测试失败: {e}")
            return False
    
    def _test_api_performance(self):
        """API性能基准测试 (修复版)"""
        try:
            print("⚡ API性能基准测试...")
            
            # 测试不同规模数据获取的性能
            test_cases = [
                {"stocks": 1, "days": 5, "desc": "小规模"},
                {"stocks": 3, "days": 5, "desc": "中规模"},
                {"stocks": 5, "days": 5, "desc": "大规模"},
            ]
            
            performance_results = []
            test_stocks = ['000001', '000002', '600000', '600036', '000858']
            end_date = datetime.now().strftime('%Y%m%d')
            
            for test_case in test_cases:
                stock_count = test_case["stocks"]
                day_count = test_case["days"] 
                desc = test_case["desc"]
                
                selected_stocks = test_stocks[:stock_count]
                start_date = (datetime.now() - timedelta(days=day_count)).strftime('%Y%m%d')
                ticker_str = ','.join(selected_stocks)
                
                print(f"🧪 测试 {desc}: {stock_count}只股票，{day_count}天数据")
                
                start_time = time.time()
                
                try:
                    result = self.DataAPI.MktEqudGet(
                        ticker=ticker_str,
                        beginDate=start_date,
                        endDate=end_date,
                        field='ticker,tradeDate,closePrice,turnoverVol'
                    )
                    
                    end_time = time.time()
                    duration = end_time - start_time
                    
                    if not result.empty:
                        rows = len(result)
                        performance_results.append({
                            "case": desc,
                            "duration": round(duration, 3),
                            "rows": rows,
                            "rows_per_second": round(rows / duration, 1)
                        })
                        
                        print(f"   ✅ 耗时: {duration:.3f}秒, 数据量: {rows}行")
                    else:
                        print(f"   ❌ 未获取到数据")
                        
                except Exception as e:
                    print(f"   ❌ 测试失败: {e}")
                
                # API调用间隔
                time.sleep(0.5)
            
            # 性能总结
            if performance_results:
                print(f"\n📊 性能测试总结:")
                avg_speed = sum(r['rows_per_second'] for r in performance_results) / len(performance_results)
                print(f"📈 平均处理速度: {avg_speed:.1f} 行/秒")
                
                if avg_speed > 50:
                    print("🚀 性能评估: 优秀")
                elif avg_speed > 20:
                    print("✅ 性能评估: 良好") 
                else:
                    print("⚠️  性能评估: 需优化")
                
                return True
            else:
                return False
                
        except Exception as e:
            print(f"❌ 性能测试异常: {e}")
            return False
    
    def _analyze_data_quality(self):
        """数据质量分析"""
        if not hasattr(self, 'price_data') or self.price_data is None:
            return "❌ 无数据可分析"
        
        data = self.price_data
        total_cells = data.size
        missing_cells = data.isnull().sum().sum()
        completeness_rate = (total_cells - missing_cells) / total_cells * 100
        
        report = f"""📊 数据质量分析:
   📏 数据规模: {data.shape}
   📋 完整性: {completeness_rate:.1f}%
   🔍 状态: {'优秀' if completeness_rate >= 95 else '良好' if completeness_rate >= 90 else '需改进'}"""
        
        return report
    
    def step8_data_export_test(self):
        """步骤8: 数据导出功能测试"""
        print("\n💾 步骤8: 数据导出功能测试")
        print("-" * 30)
        
        # 收集所有测试数据
        export_data = {}
        
        if hasattr(self, 'price_data') and self.price_data is not None:
            export_data['price_data'] = {
                'data': self.price_data.to_dict('records'),
                'shape': self.price_data.shape,
                'columns': list(self.price_data.columns)
            }
            
        if hasattr(self, 'stock_list') and self.stock_list:
            export_data['stock_list'] = self.stock_list
            
        export_data['test_results'] = self.results
        export_data['metadata'] = {
            'export_time': datetime.now().isoformat(),
            'framework_version': '1.0.0'
        }
        
        # 导出数据
        if export_data:
            try:
                os.makedirs("./data", exist_ok=True)
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                filename = f"uqer_test_data_{timestamp}.json"
                filepath = os.path.join("./data", filename)
                
                with open(filepath, 'w', encoding='utf-8') as f:
                    json.dump(export_data, f, ensure_ascii=False, indent=2)
                
                print(f"💾 测试数据已导出: {filepath}")
                print("✅ 数据导出测试通过")
                return True
                
            except Exception as e:
                print(f"❌ 数据导出失败: {e}")
                return False
        else:
            print("⚠️  无数据可导出")
            return False
    
    def generate_comprehensive_report(self):
        """生成全面测试报告"""
        print("\n" + "=" * 60)
        print("📋 全面测试报告")
        print("=" * 60)
        
        # 计算通过率
        passed = sum(self.results.values())
        total = len(self.results)
        pass_rate = (passed / total) * 100
        
        print(f"📊 核心功能测试统计:")
        print(f"   通过: {passed}/{total} ({pass_rate:.1f}%)")
        
        print(f"\n📋 详细测试结果:")
        test_names = {
            'connection': '优矿API连接',
            'stock_basic': '股票基本信息',
            'index_components': '指数成分股',
            'price_data': '股票价格数据',
            'index_data': '指数行情数据'
        }
        
        for key, name in test_names.items():
            status = "✅ 成功" if self.results[key] else "❌ 失败"
            print(f"   {name}: {status}")
        
        # 评估和建议
        print(f"\n🎯 总体评估:")
        if pass_rate >= 80:
            print("状态: 🎉 优秀")
            print("评价: 优矿数据获取功能完全正常")
            print("建议: 可以开始开发策略模块和回测引擎")
            
        elif pass_rate >= 60:
            print("状态: ✅ 良好")
            print("评价: 核心功能可用，部分功能需要优化")
            print("建议: 修复剩余问题后继续开发")
            
        else:
            print("状态: ⚠️  需要改进")
            print("评价: 基础数据获取存在问题")
            print("建议: 优先解决API调用和数据获取问题")
        
        # 下一步建议
        print(f"\n💡 下一步开发建议:")
        
        if self.results['connection'] and self.results['price_data']:
            print("✅ 数据获取功能基本正常，建议按以下顺序开发:")
            print("   1. 🧠 完善 data 模块 - 增加更多数据源和特征工程")
            print("   2. 🎯 开发 strategy 模块 - 实现各种交易策略")
            print("   3. ⚡ 开发 backtest 模块 - 策略回测引擎")
            print("   4. 📈 开发 visualization 模块 - 结果可视化")
            print("   5. 🔧 开发 utils 模块 - 工具函数和辅助功能")
            
        else:
            print("🔧 优先修复以下问题:")
            if not self.results['connection']:
                print("   - 检查优矿SDK安装和Token配置")
            if not self.results['price_data']:
                print("   - 调试API参数和数据格式问题")
                print("   - 验证股票代码格式是否正确")
        
        print(f"\n📚 相关资源:")
        print("   📖 优矿API文档: https://uqer.datayes.com/help/api")
        print("   💬 技术支持: 联系优矿技术支持团队")
        print("   🔧 项目仓库: 在GitHub上分享和协作开发")
        
        return pass_rate >= 60
    
    def run_complete_test(self):
        """运行完整测试流程"""
        print("🧪 开始完整测试...")
        
        # 按步骤执行测试
        test_steps = [
            ("步骤1: API连接", self.step1_connect_api),
            ("步骤2: 股票基本信息", self.step2_test_stock_basic),
            ("步骤3: 指数成分股", self.step3_test_index_components),
            ("步骤4: 价格数据", self.step4_test_price_data),
            ("步骤5: 指数数据", self.step5_test_index_data),
            ("步骤6: 数据分析", self.step6_data_analysis),
            ("步骤7: 高级功能", self.step7_advanced_features_test),
            ("步骤8: 数据导出", self.step8_data_export_test)
        ]
        
        for step_name, step_func in test_steps:
            try:
                success = step_func()
                if not success and step_name in ["步骤1: API连接", "步骤2: 股票基本信息"]:
                    print(f"⚠️  {step_name}失败，可能影响后续测试")
            except Exception as e:
                print(f"❌ {step_name}执行异常: {e}")
        
        # 生成最终报告
        success = self.generate_comprehensive_report()
        
        print(f"\n{'='*60}")
        print("🎊 测试完成!")
        print(f"⏰ 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("="*60)
        
        return success

def main():
    """主程序入口"""
    print("🎯 开始修复版优矿数据测试流程...")
    
    # 创建测试实例
    tester = FixedUQerTest()
    
    # 运行完整测试
    success = tester.run_complete_test()
    
    if success:
        print("\n🎊 恭喜！优矿数据连接测试通过")
        print("💡 建议下一步: 开始开发量化策略模块")
        return 0
    else:
        print("\n⚠️  测试未完全通过，请检查相关配置")
        print("💡 建议: 参考测试报告修复问题后重新测试")
        return 1

if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)

✅ 项目路径: /Users/jackstudio/QuantTrade
🚀 修复后的优矿数据测试 - 完全可运行版本
📅 测试时间: 2025-08-26 16:26:44
🎯 开始修复版优矿数据测试流程...
🧪 开始完整测试...

🔌 步骤1: 连接优矿API
------------------------------
db861372k7@16720 账号登录成功
✅ 优矿客户端连接成功

📋 步骤2: 获取股票基本信息
------------------------------
🧪 调用 DataAPI.EquGet...
✅ 股票基本信息获取成功!
   📊 股票代码: 000001
   📝 股票简称: N/A
   📅 上市日期: 1991-04-03

📈 步骤3: 获取沪深300成分股
------------------------------
🧪 调用 DataAPI.IdxConsGet...
✅ 指数成分股获取成功: 300 只
📋 前10只成分股:
    1. 000001 - 平安银行
    2. 000002 - 万科A
    3. 000063 - 中兴通讯
    4. 000157 - 中联重科
    5. 000425 - 徐工机械
    6. 000568 - 泸州老窖
    7. 000625 - 长安汽车
    8. 000651 - 格力电器
    9. 000858 - 五粮液
   10. 600000 - 浦发银行

💰 步骤4: 获取股票价格数据
------------------------------
🧪 调用 DataAPI.MktEqudGet...
📊 测试股票: ['000001', '000002', '000063']
📅 日期范围: 20250812 -> 20250826
✅ 价格数据获取成功!
   📊 数据维度: (33, 8)
   🏢 股票数量: 3
   📅 交易日数: 11

📋 最新价格数据:
   000002 | 2025-08-26 | 收盘: 6.99元
   000001 | 2025-08-26 | 收盘: 12.36元
   000063 | 2025-08-26 | 收盘: 45.02元

📊 步骤5: 获取指数行情数据
-------

SystemExit: 0