# 比赛提交文件生成

本notebook用于处理比赛规定的数据格式，进行模型推理，并生成最终的submission.csv文件。

## 主要步骤：
1. 加载测试数据（composition + PXRD）
2. 加载训练好的模型
3. 批量推理生成晶体结构
4. 后处理优化（可选）
5. 生成submission.csv

In [1]:
# 导入必要的库
import json
import pickle
import time
from pathlib import Path
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
import torch
from tqdm.auto import tqdm
from pymatgen.core import Composition, Element, Structure, Lattice
import matplotlib.pyplot as plt

# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'使用设备: {device}')

使用设备: cuda


## 1. 数据加载函数

In [2]:
def load_test_data(data_path: str) -> Tuple[Dict, Dict[str, np.ndarray]]:
    """
    加载测试数据，包括组成和PXRD谱
    
    Args:
        data_path: 测试数据路径，如 'docs/data/test_v3/A'
    
    Returns:
        compositions_dict: 组成信息字典 {sample_id: {"composition": [str, str]}}
        pxrd_dict: PXRD谱字典 {sample_id: np.ndarray(11051,)}
    """
    data_path = Path(data_path)
    
    # 加载组成信息
    with open(data_path / 'composition.json', 'r') as f:
        compositions_dict = json.load(f)
    
    # 加载PXRD谱
    pxrd_dict = {}
    pattern_dir = data_path / 'pattern'
    
    for sample_id in tqdm(compositions_dict.keys(), desc="加载PXRD谱"):
        pxrd_file = pattern_dir / f"{sample_id}.xy"
        
        # 读取xy文件，跳过第一行注释
        data = np.loadtxt(pxrd_file, skiprows=1)
        # 只取强度值（第二列）
        pxrd_dict[sample_id] = data[:, 1].astype(np.float32)
    
    print(f"加载了 {len(compositions_dict)} 个样本")
    print(f"PXRD谱长度: {len(next(iter(pxrd_dict.values())))}")
    
    return compositions_dict, pxrd_dict

In [3]:
def parse_composition(comp_str: str) -> Tuple[int, List[int]]:
    """
    解析组成字符串为原子数量和类型
    
    Args:
        comp_str: 如 "Sr4 Be2 Re2 N8"
    
    Returns:
        num_atoms: 总原子数
        atom_types: 原子序数列表
    """
    comp = Composition(comp_str)
    atom_list = []
    
    for element, count in comp.items():
        atomic_num = Element(element).Z
        atom_list.extend([atomic_num] * int(count))
    
    # 确保原子列表长度不超过52（根据CLAUDE.md的说明）
    if len(atom_list) > 52:
        raise ValueError(f"原子数 {len(atom_list)} 超过最大限制52")
    
    # padding到52维
    padded_list = atom_list + [0] * (52 - len(atom_list))
    
    return len(atom_list), padded_list

## 2. 模型加载

In [4]:
def load_model(model_path: str):
    """
    加载训练好的模型
    
    Args:
        model_path: 模型checkpoint路径
    
    Returns:
        model: 加载的模型
    """
    # TODO: 这里需要根据实际的模型架构进行修改
    # 目前只是占位符
    
    print(f"加载模型: {model_path}")
    
    # 示例代码，需要替换为实际的模型加载逻辑
    # from src.trainer import LitModule
    # model = LitModule.load_from_checkpoint(model_path)
    # model.eval()
    # model = model.to(device)
    
    return None  # 返回占位符

## 3. 批量推理

In [5]:
def prepare_batch(
    sample_ids: List[str],
    compositions_dict: Dict,
    pxrd_dict: Dict[str, np.ndarray],
    target_pxrd_dict: Optional[Dict[str, np.ndarray]] = None
) -> Dict[str, torch.Tensor]:
    """
    准备一个批次的数据
    
    Args:
        sample_ids: 批次中的样本ID列表
        compositions_dict: 组成信息字典
        pxrd_dict: PXRD谱字典
        target_pxrd_dict: 目标PXRD谱字典（用于采样过程）
    
    Returns:
        batch_data: 包含批次数据的字典
    """
    batch_comps = []  # [batch_size, 52]
    batch_pxrd = []   # [batch_size, 11051]
    batch_target_pxrd = []  # [batch_size, 11051]
    batch_num_atoms = []  # [batch_size]
    
    for sample_id in sample_ids:
        # 获取组成（使用第一个，即niggli reduced cell）
        comp_str = compositions_dict[sample_id]["composition"][0]
        num_atoms, atom_types = parse_composition(comp_str)
        
        batch_comps.append(atom_types)
        batch_num_atoms.append(num_atoms)
        batch_pxrd.append(pxrd_dict[sample_id])
        
        # 如果有目标PXRD（用于采样中间过程）
        if target_pxrd_dict:
            batch_target_pxrd.append(target_pxrd_dict[sample_id])
        else:
            # 初始时使用相同的PXRD
            batch_target_pxrd.append(pxrd_dict[sample_id])
    
    # 转换为tensor
    batch_data = {
        'comp': torch.tensor(batch_comps, dtype=torch.long, device=device),  # [batch_size, 52]
        'pxrd': torch.tensor(np.stack(batch_pxrd), dtype=torch.float32, device=device),  # [batch_size, 11051]
        'target_pxrd': torch.tensor(np.stack(batch_target_pxrd), dtype=torch.float32, device=device),  # [batch_size, 11051]
        'num_atoms': torch.tensor(batch_num_atoms, dtype=torch.long, device=device),  # [batch_size]
        'sample_ids': sample_ids
    }
    
    return batch_data

In [None]:
def batch_inference(
    model,
    batch_data: Dict[str, torch.Tensor],
    num_sampling_steps: int = 100
) -> Dict[str, torch.Tensor]:
    """
    批量推理生成晶体结构
    
    Args:
        model: 训练好的模型
        batch_data: 批次数据
        num_sampling_steps: 采样步数
    
    Returns:
        results: 包含生成结果的字典
    """
    # TODO: 这里需要实现实际的推理逻辑
    # 包括：
    # 1. 初始化z（噪声）
    # 2. 逐步采样（meanflow或cfm）
    # 3. 在每步计算PXRD作为条件
    # 4. 返回最终的晶体结构
    
    batch_size = batch_data['comp'].shape[0]
    
    # 示例输出（需要替换为实际推理）
    # 模型输出: [batch_size, 3 + 52, 3]
    # 前3个是晶格参数(3x3矩阵)，后52个是分数坐标
    dummy_output = torch.randn(batch_size, 55, 3, device=device)
    
    # 解析输出
    lattice_matrix = dummy_output[:, :3, :]  # [batch_size, 3, 3]
    frac_coords = dummy_output[:, 3:, :]  # [batch_size, 52, 3]
    
    # 从晶格矩阵计算长度和角度
    # 这里需要实际的转换逻辑
    lengths = torch.rand(batch_size, 3, device=device) * 10 + 5  # 示例
    angles = torch.rand(batch_size, 3, device=device) * 30 + 75  # 示例
    
    results = {
        'lattice_matrix': lattice_matrix,
        'frac_coords': frac_coords,
        'lengths': lengths,
        'angles': angles,
        'num_atoms': batch_data['num_atoms'],
        'comp': batch_data['comp'],
        'sample_ids': batch_data['sample_ids']
    }
    
    return results

## 4. 后处理与优化

In [None]:
def postprocess_structure(results: Dict[str, torch.Tensor]) -> List[Structure]:
    """
    后处理生成的结构
    
    Args:
        results: 批量推理结果
    
    Returns:
        structures: pymatgen Structure对象列表
    """
    structures = []
    batch_size = results['lengths'].shape[0]
    
    for i in range(batch_size):
        # 获取当前样本数据
        num_atoms = results['num_atoms'][i].item()
        comp = results['comp'][i].cpu().numpy()
        frac_coords = results['frac_coords'][i][:num_atoms].cpu().numpy()  # 只取有效原子
        lengths = results['lengths'][i].cpu().numpy()
        angles = results['angles'][i].cpu().numpy()
        
        # 获取原子类型（去除padding）
        atom_types = [z for z in comp[:num_atoms] if z > 0]
        species = [Element.from_Z(z) for z in atom_types]
        
        # 创建晶格
        lattice = Lattice.from_parameters(
            lengths[0], lengths[1], lengths[2],
            angles[0], angles[1], angles[2]
        )
        
        # 创建结构
        structure = Structure(lattice, species, frac_coords)
        
        # 可选：能量最小化或其他优化
        # structure = optimize_structure(structure)
        
        structures.append(structure)
    
    return structures

## 5. 生成提交文件

In [None]:
def generate_submission(
    structures: List[Structure],
    sample_ids: List[str],
    output_path: str = "submission.csv"
):
    """
    生成比赛提交文件
    
    Args:
        structures: 生成的晶体结构列表
        sample_ids: 样本ID列表
        output_path: 输出文件路径
    """
    rows = []
    
    for structure, sample_id in zip(structures, sample_ids):
        # 转换为CIF格式
        cif_str = structure.to(fmt="cif")
        rows.append([sample_id, cif_str])
    
    # 创建DataFrame并保存
    df = pd.DataFrame(rows, columns=["ID", "cif"])
    df.to_csv(output_path, index=False)
    
    print(f"提交文件已保存: {output_path}")
    print(f"包含 {len(df)} 个结构")

## 6. 主流程

In [None]:
# 配置参数
TEST_DATA_PATH = "docs/data/test_v3/A"  # 测试数据路径
MODEL_PATH = "outputs/model.ckpt"  # 模型路径（需要修改为实际路径）
BATCH_SIZE = 32  # 批处理大小
NUM_SAMPLING_STEPS = 100  # 采样步数
DRY_RUN = False  # 是否测试模式（只处理前10个样本）

In [None]:
def main():
    """
    主函数：完整的推理流程
    """
    # 1. 加载测试数据
    print("="*50)
    print("步骤1: 加载测试数据")
    compositions_dict, pxrd_dict = load_test_data(TEST_DATA_PATH)
    
    # 获取所有样本ID
    sample_ids = list(compositions_dict.keys())
    if DRY_RUN:
        sample_ids = sample_ids[:10]
        print(f"DRY_RUN模式：只处理前{len(sample_ids)}个样本")
    
    # 2. 加载模型
    print("="*50)
    print("步骤2: 加载模型")
    model = load_model(MODEL_PATH)
    
    # 3. 批量推理
    print("="*50)
    print("步骤3: 批量推理生成晶体结构")
    print(f"批处理大小: {BATCH_SIZE}")
    print(f"采样步数: {NUM_SAMPLING_STEPS}")
    
    all_structures = []
    all_sample_ids = []
    
    start_time = time.time()
    
    # 分批处理
    for i in tqdm(range(0, len(sample_ids), BATCH_SIZE), desc="批量推理"):
        batch_ids = sample_ids[i:i + BATCH_SIZE]
        
        # 准备批次数据
        batch_data = prepare_batch(batch_ids, compositions_dict, pxrd_dict)
        
        # 批量推理
        with torch.no_grad():
            results = batch_inference(model, batch_data, NUM_SAMPLING_STEPS)
        
        # 后处理
        structures = postprocess_structure(results)
        
        all_structures.extend(structures)
        all_sample_ids.extend(batch_ids)
    
    elapsed_time = time.time() - start_time
    
    # 4. 生成提交文件
    print("="*50)
    print("步骤4: 生成提交文件")
    output_path = "submission.csv" if not DRY_RUN else "submission_dryrun.csv"
    generate_submission(all_structures, all_sample_ids, output_path)
    
    # 打印统计信息
    print("="*50)
    print("推理完成！")
    print(f"处理样本数: {len(all_structures)}")
    print(f"总用时: {elapsed_time:.2f} 秒")
    print(f"平均每个样本: {elapsed_time/len(all_structures):.3f} 秒")
    print("="*50)

In [None]:
# 运行主流程
if __name__ == "__main__":
    main()

## 7. 可视化与分析（可选）

In [None]:
def visualize_pxrd(sample_id: str, pxrd_dict: Dict[str, np.ndarray]):
    """
    可视化PXRD谱
    """
    pxrd = pxrd_dict[sample_id]
    theta = np.linspace(5, 115, len(pxrd))  # 2theta范围
    
    plt.figure(figsize=(12, 4))
    plt.plot(theta, pxrd)
    plt.xlabel('2θ (degrees)')
    plt.ylabel('Intensity')
    plt.title(f'PXRD Pattern - {sample_id}')
    plt.grid(True, alpha=0.3)
    plt.show()

In [None]:
def analyze_results(structures: List[Structure]):
    """
    分析生成的晶体结构
    """
    # 统计晶格参数分布
    lengths_a = [s.lattice.a for s in structures]
    lengths_b = [s.lattice.b for s in structures]
    lengths_c = [s.lattice.c for s in structures]
    
    angles_alpha = [s.lattice.alpha for s in structures]
    angles_beta = [s.lattice.beta for s in structures]
    angles_gamma = [s.lattice.gamma for s in structures]
    
    fig, axes = plt.subplots(2, 3, figsize=(15, 8))
    
    # 晶格长度分布
    axes[0, 0].hist(lengths_a, bins=30, alpha=0.7)
    axes[0, 0].set_title('Lattice a')
    axes[0, 0].set_xlabel('Length (Å)')
    
    axes[0, 1].hist(lengths_b, bins=30, alpha=0.7)
    axes[0, 1].set_title('Lattice b')
    axes[0, 1].set_xlabel('Length (Å)')
    
    axes[0, 2].hist(lengths_c, bins=30, alpha=0.7)
    axes[0, 2].set_title('Lattice c')
    axes[0, 2].set_xlabel('Length (Å)')
    
    # 晶格角度分布
    axes[1, 0].hist(angles_alpha, bins=30, alpha=0.7)
    axes[1, 0].set_title('Angle α')
    axes[1, 0].set_xlabel('Angle (degrees)')
    
    axes[1, 1].hist(angles_beta, bins=30, alpha=0.7)
    axes[1, 1].set_title('Angle β')
    axes[1, 1].set_xlabel('Angle (degrees)')
    
    axes[1, 2].hist(angles_gamma, bins=30, alpha=0.7)
    axes[1, 2].set_title('Angle γ')
    axes[1, 2].set_xlabel('Angle (degrees)')
    
    plt.tight_layout()
    plt.show()
    
    # 打印统计信息
    print("晶格参数统计:")
    print(f"a: {np.mean(lengths_a):.2f} ± {np.std(lengths_a):.2f} Å")
    print(f"b: {np.mean(lengths_b):.2f} ± {np.std(lengths_b):.2f} Å")
    print(f"c: {np.mean(lengths_c):.2f} ± {np.std(lengths_c):.2f} Å")
    print(f"α: {np.mean(angles_alpha):.2f} ± {np.std(angles_alpha):.2f}°")
    print(f"β: {np.mean(angles_beta):.2f} ± {np.std(angles_beta):.2f}°")
    print(f"γ: {np.mean(angles_gamma):.2f} ± {np.std(angles_gamma):.2f}°")

## TODO: 与实际模型集成

当前notebook提供了完整的推理框架，但需要与实际训练的模型集成：

1. **模型加载**：需要导入实际的模型类（如从`src.trainer`或`src.networks`）
2. **推理逻辑**：实现`batch_inference`函数中的实际推理流程，包括：
   - meanflow或cfm的采样过程
   - 实时PXRD计算与条件注入
   - 正确的输出解析
3. **PXRD计算**：集成`src.PXRDSimulator`用于采样过程中的PXRD计算
4. **后处理优化**：可选添加能量最小化等优化步骤