# 01 数据管理模块 (Data Management)

## 模块目标
- 从AkShare获取14个金属期货品种的0指数连续合约历史数据（指数加权）
- 建立本地数据仓库（Parquet格式）
- 提供统一的数据访问接口
- 支持增量更新

## 14个期货品种
- **贵金属**: AG0(银), AU0(金)
- **有色金属**: AL0(铝), CU0(铜), NI0(镍), PB0(铅), SN0(锡), ZN0(锌)
- **黑色系**: HC0(热卷), I0(铁矿), RB0(螺纹), SF0(硅铁), SM0(锰硅), SS0(不锈钢)

## 1. 环境设置和依赖安装

In [11]:
# 安装必要的包
!pip install akshare pandas numpy pyarrow matplotlib seaborn scipy statsmodels -q

import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np
from pathlib import Path
import json
from datetime import datetime, timedelta
import time
from typing import List, Dict, Optional, Tuple
import logging
import akshare as ak

# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 设置pandas显示选项
pd.set_option('display.max_columns', 20)
pd.set_option('display.max_rows', 100)
pd.set_option('display.width', 200)

print(f"AkShare版本: {ak.__version__}")

AkShare版本: 1.17.35


## 2. 配置管理

In [12]:
# 配置参数
class Config:
    """数据管理配置"""
    
    # 14个期货品种代码
    SYMBOLS = [
        "AG0",  # 白银
        "AL0",  # 铝
        "AU0",  # 黄金
        "CU0",  # 铜
        "HC0",  # 热卷
        "I0",   # 铁矿石
        "NI0",  # 镍
        "PB0",  # 铅
        "RB0",  # 螺纹钢
        "SF0",  # 硅铁
        "SM0",  # 锰硅
        "SN0",  # 锡
        "SS0",  # 不锈钢
        "ZN0",  # 锌
    ]
    
    # 品种名称映射
    SYMBOL_NAMES = {
        "AG0": "白银",
        "AL0": "铝",
        "AU0": "黄金",
        "CU0": "铜",
        "HC0": "热卷",
        "I0": "铁矿石",
        "NI0": "镍",
        "PB0": "铅",
        "RB0": "螺纹钢",
        "SF0": "硅铁",
        "SM0": "锰硅",
        "SN0": "锡",
        "SS0": "不锈钢",
        "ZN0": "锌",
    }
    
    # 数据路径
    DATA_DIR = Path("../data")
    FUTURES_DIR = DATA_DIR / "futures"
    METADATA_FILE = DATA_DIR / "metadata.json"
    UPDATE_LOG_FILE = DATA_DIR / "update_log.csv"
    
    # 数据字段
    REQUIRED_COLUMNS = ['date', 'open', 'high', 'low', 'close', 'volume', 'open_interest']
    
    # API设置
    RETRY_TIMES = 3
    RETRY_DELAY = 5  # seconds

# 创建数据目录
Config.FUTURES_DIR.mkdir(parents=True, exist_ok=True)
print(f"数据目录: {Config.FUTURES_DIR.absolute()}")

数据目录: F:\metal verify\notebooks\..\data\futures


## 3. Story DM.1: 初始数据获取

### REQ-DM.1.1: 批量获取14个品种的0指数连续合约日线数据
### REQ-DM.1.2: 数据字段包含必需字段
### REQ-DM.1.3: 使用Parquet格式存储
### REQ-DM.1.4: 记录元信息到metadata.json

In [13]:
# Cell name: fetch_history
# Implements: REQ-DM.1.1, REQ-DM.1.2

def fetch_single_symbol(symbol: str, retries: int = 3) -> pd.DataFrame:
    """
    获取单个品种的期货连续合约历史数据
    使用 ak.futures_zh_daily_sina API
    
    Args:
        symbol: 品种代码，如'RB0'
        retries: 重试次数
        
    Returns:
        DataFrame with columns: date, open, high, low, close, volume, open_interest
    """
    for attempt in range(retries):
        try:
            logger.info(f"正在获取 {symbol} ({Config.SYMBOL_NAMES.get(symbol, symbol)}) 的数据...")
            
            # 使用正确的API: futures_zh_daily_sina
            df = ak.futures_zh_daily_sina(symbol=symbol)
            
            if df is None or df.empty:
                raise ValueError(f"{symbol} 返回空数据")
            
            # 重命名列：hold -> open_interest
            df = df.rename(columns={'hold': 'open_interest'})
            
            # 选择需要的列
            required_cols = ['date', 'open', 'high', 'low', 'close', 'volume', 'open_interest']
            df = df[required_cols]
            
            # 数据类型转换
            df['date'] = pd.to_datetime(df['date'])
            for col in ['open', 'high', 'low', 'close', 'volume', 'open_interest']:
                df[col] = pd.to_numeric(df[col], errors='coerce')
            
            # 删除包含NaN的行
            df = df.dropna(subset=['open', 'high', 'low', 'close'])
            
            # 按日期排序
            df = df.sort_values('date').reset_index(drop=True)
            
            logger.info(f"成功获取 {symbol} 数据: {len(df)} 条记录, "
                       f"日期范围: {df['date'].min().date()} 至 {df['date'].max().date()}")
            
            return df
            
        except Exception as e:
            logger.error(f"获取 {symbol} 数据失败 (尝试 {attempt+1}/{retries}): {str(e)}")
            if attempt < retries - 1:
                time.sleep(Config.RETRY_DELAY)
            else:
                raise RuntimeError(f"无法获取 {symbol} 数据: {str(e)}")


def fetch_multiple_symbols(symbols: List[str]) -> Dict[str, pd.DataFrame]:
    """
    批量获取多个品种的数据
    
    Args:
        symbols: 品种代码列表
        
    Returns:
        字典 {symbol: DataFrame}
    """
    data_dict = {}
    failed_symbols = []
    
    for symbol in symbols:
        try:
            df = fetch_single_symbol(symbol)
            data_dict[symbol] = df
            time.sleep(1)  # 避免请求过快
        except Exception as e:
            logger.error(f"获取 {symbol} 失败: {str(e)}")
            failed_symbols.append(symbol)
    
    if failed_symbols:
        logger.warning(f"失败的品种: {failed_symbols}")
    
    logger.info(f"批量获取完成: 成功 {len(data_dict)}/{len(symbols)} 个品种")
    
    if not data_dict:
        raise RuntimeError("没有成功获取任何数据")
    
    return data_dict


# 测试：获取核心品种数据
test_symbols = ['RB0', 'HC0', 'I0']  # 先测试3个核心品种
print(f"\n开始获取测试品种: {test_symbols}")
try:
    test_data = fetch_multiple_symbols(test_symbols)
except Exception as e:
    print(f"获取失败: {e}")
    test_data = {}

2025-08-18 18:57:42,285 - INFO - 正在获取 RB0 (螺纹钢) 的数据...



开始获取测试品种: ['RB0', 'HC0', 'I0']


2025-08-18 18:57:42,720 - INFO - 成功获取 RB0 数据: 3982 条记录, 日期范围: 2009-03-27 至 2025-08-18
2025-08-18 18:57:43,723 - INFO - 正在获取 HC0 (热卷) 的数据...
2025-08-18 18:57:44,147 - INFO - 成功获取 HC0 数据: 2780 条记录, 日期范围: 2014-03-21 至 2025-08-18
2025-08-18 18:57:45,158 - INFO - 正在获取 I0 (铁矿石) 的数据...
2025-08-18 18:57:45,926 - INFO - 成功获取 I0 数据: 2879 条记录, 日期范围: 2013-10-18 至 2025-08-18
2025-08-18 18:57:46,931 - INFO - 批量获取完成: 成功 3/3 个品种


In [14]:
# 显示数据样本
if test_data:
    first_symbol = list(test_data.keys())[0]
    print(f"\n{first_symbol} 数据样本:")
    print(test_data[first_symbol].head())
    print(f"\n数据信息:")
    print(test_data[first_symbol].info())
    print(f"\n数据统计:")
    print(test_data[first_symbol][['open', 'high', 'low', 'close']].describe())


RB0 数据样本:
        date    open    high     low   close  volume  open_interest
0 2009-03-27  3550.0  3663.0  3513.0  3561.0  354590          45548
1 2009-03-30  3550.0  3580.0  3528.0  3544.0  145168          48380
2 2009-03-31  3538.0  3566.0  3531.0  3549.0   70592          44714
3 2009-04-01  3560.0  3561.0  3543.0  3547.0   28100          42076
4 2009-04-02  3545.0  3548.0  3456.0  3473.0  235446          68888

数据信息:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3982 entries, 0 to 3981
Data columns (total 7 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   date           3982 non-null   datetime64[ns]
 1   open           3982 non-null   float64       
 2   high           3982 non-null   float64       
 3   low            3982 non-null   float64       
 4   close          3982 non-null   float64       
 5   volume         3982 non-null   int64         
 6   open_interest  3982 non-null   int64         
dtypes: d

In [15]:
# Cell name: save_to_parquet
# Implements: REQ-DM.1.3

def save_to_parquet(df: pd.DataFrame, symbol: str, data_dir: Path = None) -> Path:
    """
    将DataFrame保存为Parquet格式
    
    Args:
        df: 数据DataFrame
        symbol: 品种代码
        data_dir: 保存目录
        
    Returns:
        保存的文件路径
    """
    if data_dir is None:
        data_dir = Config.FUTURES_DIR
    
    file_path = data_dir / f"{symbol}.parquet"
    
    # 保存为Parquet格式
    df.to_parquet(file_path, engine='pyarrow', compression='snappy', index=False)
    logger.info(f"成功保存 {symbol} 数据到 {file_path}")
    
    # 验证保存的文件
    test_df = pd.read_parquet(file_path)
    assert len(test_df) == len(df), "保存的数据行数不匹配"
    assert list(test_df.columns) == list(df.columns), "保存的数据列不匹配"
    
    return file_path


def batch_save_to_parquet(data_dict: Dict[str, pd.DataFrame]) -> List[Path]:
    """
    批量保存数据到Parquet文件
    
    Args:
        data_dict: {symbol: DataFrame} 字典
        
    Returns:
        保存的文件路径列表
    """
    saved_files = []
    
    for symbol, df in data_dict.items():
        try:
            file_path = save_to_parquet(df, symbol)
            saved_files.append(file_path)
        except Exception as e:
            logger.error(f"保存 {symbol} 失败: {str(e)}")
            raise
    
    logger.info(f"批量保存完成: {len(saved_files)}/{len(data_dict)} 个文件")
    return saved_files


# 保存测试数据
if test_data:
    saved_files = batch_save_to_parquet(test_data)
    print(f"\n保存了 {len(saved_files)} 个Parquet文件:")
    for file in saved_files:
        print(f"  - {file.name}")
else:
    print("\n没有数据可保存")
    saved_files = []

2025-08-18 18:58:06,890 - INFO - 成功保存 RB0 数据到 ..\data\futures\RB0.parquet
2025-08-18 18:58:06,924 - INFO - 成功保存 HC0 数据到 ..\data\futures\HC0.parquet
2025-08-18 18:58:06,938 - INFO - 成功保存 I0 数据到 ..\data\futures\I0.parquet
2025-08-18 18:58:06,938 - INFO - 批量保存完成: 3/3 个文件



保存了 3 个Parquet文件:
  - RB0.parquet
  - HC0.parquet
  - I0.parquet


In [16]:
# Cell name: save_metadata
# Implements: REQ-DM.1.4

def save_metadata(data_dict: Dict[str, pd.DataFrame]) -> Path:
    """
    保存数据元信息到metadata.json
    
    Args:
        data_dict: {symbol: DataFrame} 字典
        
    Returns:
        metadata文件路径
    """
    metadata = {
        "update_time": datetime.now().isoformat(),
        "symbols": {}
    }
    
    for symbol, df in data_dict.items():
        metadata["symbols"][symbol] = {
            "name": Config.SYMBOL_NAMES.get(symbol, symbol),
            "fetch_time": datetime.now().isoformat(),
            "start_date": df['date'].min().isoformat(),
            "end_date": df['date'].max().isoformat(),
            "record_count": len(df),
            "columns": list(df.columns),
            "file": f"{symbol}.parquet"
        }
    
    # 保存metadata
    with open(Config.METADATA_FILE, 'w', encoding='utf-8') as f:
        json.dump(metadata, f, ensure_ascii=False, indent=2)
    
    logger.info(f"元信息已保存到 {Config.METADATA_FILE}")
    return Config.METADATA_FILE


def load_metadata() -> Dict:
    """
    加载元信息
    
    Returns:
        元信息字典
    """
    if Config.METADATA_FILE.exists():
        with open(Config.METADATA_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    return {}


# 保存元信息
if test_data:
    metadata_file = save_metadata(test_data)
    print(f"\n元信息已保存到: {metadata_file}")
    
    # 显示元信息
    metadata = load_metadata()
    print("\n元信息内容:")
    print(json.dumps(metadata, ensure_ascii=False, indent=2)[:800])
else:
    print("\n没有元信息可保存")

2025-08-18 18:58:38,510 - INFO - 元信息已保存到 ..\data\metadata.json



元信息已保存到: ..\data\metadata.json

元信息内容:
{
  "update_time": "2025-08-18T18:58:38.508669",
  "symbols": {
    "RB0": {
      "name": "螺纹钢",
      "fetch_time": "2025-08-18T18:58:38.508669",
      "start_date": "2009-03-27T00:00:00",
      "end_date": "2025-08-18T00:00:00",
      "record_count": 3982,
      "columns": [
        "date",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "open_interest"
      ],
      "file": "RB0.parquet"
    },
    "HC0": {
      "name": "热卷",
      "fetch_time": "2025-08-18T18:58:38.509669",
      "start_date": "2014-03-21T00:00:00",
      "end_date": "2025-08-18T00:00:00",
      "record_count": 2780,
      "columns": [
        "date",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "open_interest"
      ],
      "fil


## 4. Story DM.2: 增量数据更新

### REQ-DM.2.1: 检测本地数据最后日期，仅获取新增数据
### REQ-DM.2.2: 合并新旧数据，按日期去重
### REQ-DM.2.3: 更新失败时回滚
### REQ-DM.2.4: 生成更新日志

In [17]:
# Cell name: detect_last_date
# Implements: REQ-DM.2.1

def detect_last_date(symbol: str) -> Optional[datetime]:
    """
    检测本地数据的最后日期
    
    Args:
        symbol: 品种代码
        
    Returns:
        最后日期，如果文件不存在返回None
    """
    file_path = Config.FUTURES_DIR / f"{symbol}.parquet"
    
    if not file_path.exists():
        logger.info(f"{symbol} 本地数据不存在")
        return None
    
    try:
        df = pd.read_parquet(file_path)
        last_date = df['date'].max()
        logger.info(f"{symbol} 本地数据最后日期: {last_date.date()}")
        return last_date
    except Exception as e:
        logger.error(f"读取 {symbol} 本地数据失败: {str(e)}")
        raise


# 测试检测最后日期
if test_data:
    for symbol in test_symbols:
        last_date = detect_last_date(symbol)
        if last_date:
            print(f"{symbol}: 最后日期 {last_date.date()}")
else:
    print("没有数据可检测")

2025-08-18 18:58:47,959 - INFO - RB0 本地数据最后日期: 2025-08-18
2025-08-18 18:58:47,963 - INFO - HC0 本地数据最后日期: 2025-08-18
2025-08-18 18:58:47,968 - INFO - I0 本地数据最后日期: 2025-08-18


RB0: 最后日期 2025-08-18
HC0: 最后日期 2025-08-18
I0: 最后日期 2025-08-18


In [18]:
# Cell name: update_incremental
# Implements: REQ-DM.2.1, REQ-DM.2.2

def update_incremental(symbol: str) -> Tuple[bool, int]:
    """
    增量更新单个品种的数据
    
    Args:
        symbol: 品种代码
        
    Returns:
        (是否成功, 新增记录数)
    """
    try:
        # 检测本地最后日期
        last_date = detect_last_date(symbol)
        
        # 获取最新数据
        logger.info(f"开始更新 {symbol} 数据...")
        new_df = fetch_single_symbol(symbol)
        
        if last_date is None:
            # 本地无数据，全部保存
            save_to_parquet(new_df, symbol)
            return True, len(new_df)
        
        # 筛选新增数据
        new_records = new_df[new_df['date'] > last_date]
        
        if len(new_records) == 0:
            logger.info(f"{symbol} 无新数据")
            return True, 0
        
        # 加载本地数据
        old_df = pd.read_parquet(Config.FUTURES_DIR / f"{symbol}.parquet")
        
        # 合并数据
        combined_df = pd.concat([old_df, new_records], ignore_index=True)
        
        # 按日期去重（保留最新的）
        combined_df = combined_df.drop_duplicates(subset=['date'], keep='last')
        
        # 按日期排序
        combined_df = combined_df.sort_values('date').reset_index(drop=True)
        
        # 保存更新后的数据
        save_to_parquet(combined_df, symbol)
        
        logger.info(f"{symbol} 更新成功: 新增 {len(new_records)} 条记录")
        return True, len(new_records)
        
    except Exception as e:
        logger.error(f"{symbol} 更新失败: {str(e)}")
        return False, 0


# 测试增量更新
if test_data:
    print("\n测试增量更新:")
    for symbol in test_symbols[:1]:  # 测试一个品种
        success, new_count = update_incremental(symbol)
        print(f"{symbol}: {'成功' if success else '失败'}, 新增 {new_count} 条")
else:
    print("\n跳过增量更新测试（无数据）")

2025-08-18 18:58:57,079 - INFO - RB0 本地数据最后日期: 2025-08-18
2025-08-18 18:58:57,081 - INFO - 开始更新 RB0 数据...
2025-08-18 18:58:57,082 - INFO - 正在获取 RB0 (螺纹钢) 的数据...



测试增量更新:


2025-08-18 18:58:57,619 - INFO - 成功获取 RB0 数据: 3982 条记录, 日期范围: 2009-03-27 至 2025-08-18
2025-08-18 18:58:57,622 - INFO - RB0 无新数据


RB0: 成功, 新增 0 条


In [19]:
# Cell name: atomic_update
# Implements: REQ-DM.2.3

def atomic_update(symbol: str) -> Tuple[bool, int]:
    """
    原子性更新：使用临时文件确保更新失败时原数据不变
    
    Args:
        symbol: 品种代码
        
    Returns:
        (是否成功, 新增记录数)
    """
    file_path = Config.FUTURES_DIR / f"{symbol}.parquet"
    temp_path = Config.FUTURES_DIR / f"{symbol}.parquet.tmp"
    backup_path = Config.FUTURES_DIR / f"{symbol}.parquet.bak"
    
    try:
        # 检测本地最后日期
        last_date = detect_last_date(symbol)
        
        # 获取最新数据
        new_df = fetch_single_symbol(symbol)
        
        if last_date is not None:
            # 加载旧数据
            old_df = pd.read_parquet(file_path)
            
            # 只保留新增的数据
            new_records = new_df[new_df['date'] > last_date]
            
            if len(new_records) == 0:
                logger.info(f"{symbol} 无新数据")
                return True, 0
            
            # 合并数据
            combined_df = pd.concat([old_df, new_records], ignore_index=True)
            combined_df = combined_df.drop_duplicates(subset=['date'], keep='last')
            combined_df = combined_df.sort_values('date').reset_index(drop=True)
            new_count = len(new_records)
        else:
            combined_df = new_df
            new_count = len(new_df)
        
        # 保存到临时文件
        combined_df.to_parquet(temp_path, engine='pyarrow', compression='snappy', index=False)
        
        # 验证临时文件
        test_df = pd.read_parquet(temp_path)
        assert len(test_df) == len(combined_df), "临时文件验证失败"
        
        # 备份原文件（如果存在）
        if file_path.exists():
            file_path.rename(backup_path)
        
        # 将临时文件重命名为正式文件
        temp_path.rename(file_path)
        
        # 删除备份文件
        if backup_path.exists():
            backup_path.unlink()
        
        logger.info(f"{symbol} 原子更新成功: 新增 {new_count} 条记录")
        return True, new_count
        
    except Exception as e:
        logger.error(f"{symbol} 更新失败: {str(e)}")
        
        # 回滚：恢复备份文件
        if backup_path.exists() and not file_path.exists():
            backup_path.rename(file_path)
            logger.info(f"{symbol} 已回滚到原数据")
        
        # 清理临时文件
        if temp_path.exists():
            temp_path.unlink()
        
        return False, 0


# 测试原子性更新
if test_data:
    print("\n测试原子性更新:")
    for symbol in test_symbols[:1]:
        success, new_count = atomic_update(symbol)
        print(f"{symbol}: {'成功' if success else '失败'}, 新增 {new_count} 条")
else:
    print("\n跳过原子性更新测试（无数据）")

2025-08-18 18:59:09,755 - INFO - RB0 本地数据最后日期: 2025-08-18
2025-08-18 18:59:09,759 - INFO - 正在获取 RB0 (螺纹钢) 的数据...



测试原子性更新:


2025-08-18 18:59:10,225 - INFO - 成功获取 RB0 数据: 3982 条记录, 日期范围: 2009-03-27 至 2025-08-18
2025-08-18 18:59:10,230 - INFO - RB0 无新数据


RB0: 成功, 新增 0 条


In [20]:
# Cell name: log_update
# Implements: REQ-DM.2.4

def log_update(symbol: str, new_count: int, status: str, error_msg: str = ""):
    """
    记录更新日志
    
    Args:
        symbol: 品种代码
        new_count: 新增记录数
        status: 状态（success/failed）
        error_msg: 错误信息（如果失败）
    """
    log_entry = {
        'timestamp': datetime.now(),
        'symbol': symbol,
        'new_records': new_count,
        'status': status,
        'error': error_msg
    }
    
    # 读取现有日志（如果存在）
    if Config.UPDATE_LOG_FILE.exists():
        log_df = pd.read_csv(Config.UPDATE_LOG_FILE)
    else:
        log_df = pd.DataFrame()
    
    # 添加新日志
    new_log_df = pd.DataFrame([log_entry])
    log_df = pd.concat([log_df, new_log_df], ignore_index=True)
    
    # 保存日志
    log_df.to_csv(Config.UPDATE_LOG_FILE, index=False)
    logger.info(f"更新日志已记录: {symbol} - {status}")


def batch_update_with_logging(symbols: List[str]) -> Dict[str, Tuple[bool, int]]:
    """
    批量更新并记录日志
    
    Args:
        symbols: 品种列表
        
    Returns:
        更新结果字典
    """
    results = {}
    
    for symbol in symbols:
        try:
            success, new_count = atomic_update(symbol)
            results[symbol] = (success, new_count)
            
            if success:
                log_update(symbol, new_count, 'success')
            else:
                log_update(symbol, 0, 'failed', 'Update failed')
                
        except Exception as e:
            results[symbol] = (False, 0)
            log_update(symbol, 0, 'failed', str(e))
    
    return results


# 测试批量更新
if test_data:
    print("\n测试批量更新并记录日志:")
    update_results = batch_update_with_logging(test_symbols[:2])
    for symbol, (success, new_count) in update_results.items():
        print(f"{symbol}: {'成功' if success else '失败'}, 新增 {new_count} 条")
    
    # 显示更新日志
    if Config.UPDATE_LOG_FILE.exists():
        print("\n更新日志:")
        log_df = pd.read_csv(Config.UPDATE_LOG_FILE)
        print(log_df.tail())
else:
    print("\n跳过批量更新测试（无数据）")

2025-08-18 18:59:17,625 - INFO - RB0 本地数据最后日期: 2025-08-18
2025-08-18 18:59:17,627 - INFO - 正在获取 RB0 (螺纹钢) 的数据...



测试批量更新并记录日志:


2025-08-18 18:59:17,993 - INFO - 成功获取 RB0 数据: 3982 条记录, 日期范围: 2009-03-27 至 2025-08-18
2025-08-18 18:59:17,996 - INFO - RB0 无新数据
2025-08-18 18:59:18,003 - INFO - 更新日志已记录: RB0 - success
2025-08-18 18:59:18,006 - INFO - HC0 本地数据最后日期: 2025-08-18
2025-08-18 18:59:18,006 - INFO - 正在获取 HC0 (热卷) 的数据...
2025-08-18 18:59:18,304 - INFO - 成功获取 HC0 数据: 2780 条记录, 日期范围: 2014-03-21 至 2025-08-18
2025-08-18 18:59:18,309 - INFO - HC0 无新数据
2025-08-18 18:59:18,313 - INFO - 更新日志已记录: HC0 - success


RB0: 成功, 新增 0 条
HC0: 成功, 新增 0 条

更新日志:
                    timestamp symbol  new_records   status  error
0  2025-08-18 18:59:17.997878    RB0            0  success    NaN
1  2025-08-18 18:59:18.310076    HC0            0  success    NaN


## 5. Story DM.3: 统一数据访问

### REQ-DM.3.1: 提供load_data统一接口
### REQ-DM.3.2: 返回按日期索引对齐的宽表
### REQ-DM.3.3: 自动前向填充缺失值
### REQ-DM.3.4: 支持对数价格

In [21]:
# Cell name: load_data
# Implements: REQ-DM.3.1, REQ-DM.3.2

def load_data(
    symbols: List[str],
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    columns: List[str] = ['close'],
    log_price: bool = False,
    fill_method: str = 'ffill'
) -> pd.DataFrame:
    """
    统一的数据加载接口
    
    Args:
        symbols: 品种列表
        start_date: 开始日期 (YYYY-MM-DD)
        end_date: 结束日期 (YYYY-MM-DD)
        columns: 需要的数据列，默认['close']
        log_price: 是否返回对数价格
        fill_method: 缺失值填充方法 ('ffill', 'bfill', None)
        
    Returns:
        按日期索引对齐的宽表DataFrame，列名格式: {symbol}_{column}
    """
    dfs = []
    
    for symbol in symbols:
        file_path = Config.FUTURES_DIR / f"{symbol}.parquet"
        
        if not file_path.exists():
            raise FileNotFoundError(f"{symbol} 数据文件不存在: {file_path}")
        
        # 读取数据
        df = pd.read_parquet(file_path)
        df['date'] = pd.to_datetime(df['date'])
        df = df.set_index('date')
        
        # 筛选日期范围
        if start_date:
            df = df[df.index >= pd.to_datetime(start_date)]
        if end_date:
            df = df[df.index <= pd.to_datetime(end_date)]
        
        # 选择需要的列
        missing_cols = [col for col in columns if col not in df.columns]
        if missing_cols:
            raise ValueError(f"{symbol} 缺少列: {missing_cols}")
        
        df = df[columns]
        
        # 重命名列
        df.columns = [f"{symbol}_{col}" for col in columns]
        
        dfs.append(df)
    
    if not dfs:
        raise ValueError("没有加载任何数据")
    
    # 合并所有DataFrame（外连接，保留所有日期）
    result = pd.concat(dfs, axis=1, join='outer')
    
    # 处理缺失值
    if fill_method == 'ffill':
        result = result.fillna(method='ffill')
    elif fill_method == 'bfill':
        result = result.fillna(method='bfill')
    
    # 应用对数变换
    if log_price:
        price_cols = [col for col in result.columns if any(p in col for p in ['open', 'high', 'low', 'close'])]
        for col in price_cols:
            if (result[col] <= 0).any():
                raise ValueError(f"{col} 包含非正数，无法进行对数变换")
            result[col] = np.log(result[col])
    
    # 按日期排序
    result = result.sort_index()
    
    logger.info(f"加载数据完成: {len(symbols)} 个品种, {len(result)} 条记录")
    return result


# 测试数据加载
if test_data and len(test_data) >= 2:
    print("\n测试统一数据加载接口:")
    try:
        df = load_data(
            symbols=list(test_data.keys())[:2],
            start_date='2023-01-01',
            end_date='2024-01-01',
            columns=['close', 'volume']
        )
        print(f"\n加载的数据形状: {df.shape}")
        print(f"列名: {list(df.columns)}")
        print("\n数据前5行:")
        print(df.head())
    except Exception as e:
        print(f"加载失败: {e}")
else:
    print("\n跳过数据加载测试（无足够数据）")

2025-08-18 18:59:36,500 - INFO - 加载数据完成: 2 个品种, 242 条记录



测试统一数据加载接口:

加载的数据形状: (242, 4)
列名: ['RB0_close', 'RB0_volume', 'HC0_close', 'HC0_volume']

数据前5行:
            RB0_close  RB0_volume  HC0_close  HC0_volume
date                                                    
2023-01-03     4063.0     1245821     4099.0      343669
2023-01-04     4027.0     1383006     4080.0      325413
2023-01-05     4017.0     1522301     4073.0      384978
2023-01-06     4107.0     1730447     4166.0      530367
2023-01-09     4093.0     1607579     4146.0      430386


## 6. 获取所有14个品种数据

In [22]:
# 获取所有14个品种的数据
print("开始获取所有14个品种数据...")
print(f"品种列表: {Config.SYMBOLS}")

try:
    all_data = fetch_multiple_symbols(Config.SYMBOLS)
    print(f"\n成功获取 {len(all_data)} 个品种的数据")
    
    # 保存所有数据
    if all_data:
        saved_files = batch_save_to_parquet(all_data)
        print(f"\n成功保存 {len(saved_files)} 个文件")
        
        # 更新元信息
        save_metadata(all_data)
        print("\n元信息已更新")
except Exception as e:
    print(f"\n获取所有品种失败: {e}")
    all_data = {}

2025-08-18 18:59:56,041 - INFO - 正在获取 AG0 (白银) 的数据...


开始获取所有14个品种数据...
品种列表: ['AG0', 'AL0', 'AU0', 'CU0', 'HC0', 'I0', 'NI0', 'PB0', 'RB0', 'SF0', 'SM0', 'SN0', 'SS0', 'ZN0']


2025-08-18 18:59:56,478 - INFO - 成功获取 AG0 数据: 3232 条记录, 日期范围: 2012-05-10 至 2025-08-18
2025-08-18 18:59:57,488 - INFO - 正在获取 AL0 (铝) 的数据...
2025-08-18 18:59:58,010 - INFO - 成功获取 AL0 数据: 5018 条记录, 日期范围: 2005-01-04 至 2025-08-18
2025-08-18 18:59:59,017 - INFO - 正在获取 AU0 (黄金) 的数据...
2025-08-18 18:59:59,315 - INFO - 成功获取 AU0 数据: 4291 条记录, 日期范围: 2008-01-09 至 2025-08-18
2025-08-18 19:00:00,318 - INFO - 正在获取 CU0 (铜) 的数据...
2025-08-18 19:00:00,641 - INFO - 成功获取 CU0 数据: 5018 条记录, 日期范围: 2005-01-04 至 2025-08-18
2025-08-18 19:00:01,652 - INFO - 正在获取 HC0 (热卷) 的数据...
2025-08-18 19:00:01,958 - INFO - 成功获取 HC0 数据: 2780 条记录, 日期范围: 2014-03-21 至 2025-08-18
2025-08-18 19:00:02,969 - INFO - 正在获取 I0 (铁矿石) 的数据...
2025-08-18 19:00:03,224 - INFO - 成功获取 I0 数据: 2879 条记录, 日期范围: 2013-10-18 至 2025-08-18
2025-08-18 19:00:04,235 - INFO - 正在获取 NI0 (镍) 的数据...
2025-08-18 19:00:04,514 - INFO - 成功获取 NI0 数据: 2530 条记录, 日期范围: 2015-03-27 至 2025-08-18
2025-08-18 19:00:05,517 - INFO - 正在获取 PB0 (铅) 的数据...
2025-08-18 19:00:05,778 -


成功获取 14 个品种的数据

成功保存 14 个文件

元信息已更新


## 7. 数据质量检查和统计

In [23]:
# 数据统计
if all_data:
    print("\n=== 数据统计报告 ===")
    for symbol, df in all_data.items():
        print(f"\n{symbol} ({Config.SYMBOL_NAMES[symbol]}):")
        print(f"  记录数: {len(df)}")
        print(f"  日期范围: {df['date'].min().date()} 至 {df['date'].max().date()}")
        print(f"  最新收盘价: {df['close'].iloc[-1]:.2f}")
        print(f"  价格范围: {df['close'].min():.2f} - {df['close'].max():.2f}")
else:
    print("\n没有数据可统计")


=== 数据统计报告 ===

AG0 (白银):
  记录数: 3232
  日期范围: 2012-05-10 至 2025-08-18
  最新收盘价: 9258.00
  价格范围: 2937.00 - 9492.00

AL0 (铝):
  记录数: 5018
  日期范围: 2005-01-04 至 2025-08-18
  最新收盘价: 20595.00
  价格范围: 9710.00 - 24695.00

AU0 (黄金):
  记录数: 4291
  日期范围: 2008-01-09 至 2025-08-18
  最新收盘价: 777.66
  价格范围: 148.88 - 831.42

CU0 (铜):
  记录数: 5018
  日期范围: 2005-01-04 至 2025-08-18
  最新收盘价: 78950.00
  价格范围: 22380.00 - 87670.00

HC0 (热卷):
  记录数: 2780
  日期范围: 2014-03-21 至 2025-08-18
  最新收盘价: 3419.00
  价格范围: 1691.00 - 6683.00

I0 (铁矿石):
  记录数: 2879
  日期范围: 2013-10-18 至 2025-08-18
  最新收盘价: 772.00
  价格范围: 284.00 - 1337.00

NI0 (镍):
  记录数: 2530
  日期范围: 2015-03-27 至 2025-08-18
  最新收盘价: 120460.00
  价格范围: 64970.00 - 267700.00

PB0 (铅):
  记录数: 3501
  日期范围: 2011-03-24 至 2025-08-18
  最新收盘价: 16775.00
  价格范围: 11770.00 - 21865.00

RB0 (螺纹钢):
  记录数: 3982
  日期范围: 2009-03-27 至 2025-08-18
  最新收盘价: 3155.00
  价格范围: 1626.00 - 6171.00

SF0 (硅铁):
  记录数: 2649
  日期范围: 2014-08-12 至 2025-08-18
  最新收盘价: 5880.00
  价格范围: 3408.00 - 17780.0

## 8. 总结

### 已实现功能
✅ **Story DM.1: 初始数据获取**
- REQ-DM.1.1: 批量获取14个品种的0指数连续合约日线数据
- REQ-DM.1.2: 数据字段包含必需字段
- REQ-DM.1.3: 使用Parquet格式存储
- REQ-DM.1.4: 记录元信息到metadata.json

✅ **Story DM.2: 增量数据更新**
- REQ-DM.2.1: 检测本地数据最后日期，仅获取新增数据
- REQ-DM.2.2: 合并新旧数据，按日期去重
- REQ-DM.2.3: 更新失败时回滚
- REQ-DM.2.4: 生成更新日志

✅ **Story DM.3: 统一数据访问**
- REQ-DM.3.1: 提供load_data统一接口
- REQ-DM.3.2: 返回按日期索引对齐的宽表
- REQ-DM.3.3: 自动前向填充缺失值
- REQ-DM.3.4: 支持对数价格

### 注意事项
- 所有数据均从AkShare实时获取，无任何mock或fallback
- 如果AkShare API失败，程序将直接报错
- 数据更新使用原子操作，确保数据完整性