In [1]:
# 单元格 1: 导入必要的库
import dxpy
import numpy as np
import pandas as pd
import re
import subprocess
import glob
import os
import shlex # 用于安全地分割命令行参数

print("库导入完成。")


库导入完成。


In [2]:
# 单元格 2: 定义辅助函数

def convert_phenotype_name(ukb_name):
    """
    将 UK Biobank 表型名称转换为 R 兼容的格式 'x[varid]_[instance]_[array]'。
    示例: 'p12345_i0_a0' 变为 'x12345_0_0'
             'eid' 变为 'userId'
    """
    if ukb_name == 'eid':
        return 'userId'

    # 尝试匹配 p<varid>_i<instance>_a<array> 格式
    match_standard = re.match(r'p(\d+)(?:_i(\d+))?(?:_a(\d+))?$', ukb_name)
    if match_standard:
        varid = match_standard.group(1)
        instance = match_standard.group(2) if match_standard.group(2) is not None else '0'
        array = match_standard.group(3) if match_standard.group(3) is not None else '0'
        return f'x{varid}_{instance}_{array}'
    else:
        # 对于不完全匹配上述格式的名称 (例如，只有 varid)
        match_simple_p = re.match(r'p(\d+)$', ukb_name)
        if match_simple_p:
            varid = match_simple_p.group(1)
            return f'x{varid}_0_0' # 默认为 instance 0, array 0

        # 对于其他不以 'p' 开头或不符合上述规则的名称，进行通用 R 兼容性处理
        r_compatible_name = re.sub(r'[^a-zA-Z0-9_.]', '_', ukb_name) # 允许点号
        if not r_compatible_name: # 如果替换后为空
            return f"unknown_field_{ukb_name}"
        if r_compatible_name[0].isdigit():
            r_compatible_name = 'x' + r_compatible_name
        # 确保名称在 R 中有效（例如，不能以下划线或点开头，尽管 R 本身有时允许）
        if r_compatible_name.startswith('_') or r_compatible_name.startswith('.'):
            r_compatible_name = 'x' + r_compatible_name
        return r_compatible_name

In [3]:
# 单元格 3: 自动发现数据集和项目 ID
dispensed_dataset_id = None
project_id = None
dataset_full_id = None

try:
    dispensed_dataset_obj = dxpy.find_one_data_object(
        typename="Dataset", name="app*.dataset", folder="/", name_mode="glob", return_handler=False
    )
    if not dispensed_dataset_obj:
        raise Exception("错误：未找到分发的数据集。请确保根目录中存在名为 'app*.dataset' 的数据集。")
    dispensed_dataset_id = dispensed_dataset_obj["id"]
    print(f"找到分发的数据集 ID: {dispensed_dataset_id}")

    project_obj = dxpy.find_one_project(return_handler=False)
    if not project_obj:
        raise Exception("错误：未能找到当前项目。")
    project_id = project_obj["id"]
    print(f"找到项目 ID: {project_id}")

    dataset_full_id = f"{project_id}:{dispensed_dataset_id}"
    print(f"将使用数据集: {dataset_full_id}")

except Exception as e:
    print(f"初始化错误: {e}")
    # 如果在notebook中，你可能希望在这里停止执行或进行手动干预
    # raise e # 或者重新抛出异常以停止notebook



找到分发的数据集 ID: record-GvQp8V8JJ68073BBp2PjfgKg
找到项目 ID: project-GvQgb10JjKY3XGbpZbGVg0v3
将使用数据集: project-GvQgb10JjKY3XGbpZbGVg0v3:record-GvQp8V8JJ68073BBp2PjfgKg


In [4]:
# 单元格 4: 提取或加载数据字典
current_path = os.getcwd()
data_dict_filename = ""
data_dict_df = None

# 检查数据字典文件是否已存在
existing_dict_files = glob.glob(os.path.join(current_path, "*.data_dictionary.csv"))
if existing_dict_files:
    data_dict_filename = existing_dict_files[0]
    print(f"找到已存在的数据字典: {data_dict_filename}")
else:
    if dataset_full_id:
        print("本地未找到数据字典，尝试从数据集中提取...")
        cmd_extract_dict = ["dx", "extract_dataset", dataset_full_id, "-ddd", "--delimiter", ","]
        try:
            print(f"执行命令: {shlex.join(cmd_extract_dict)}")
            subprocess.check_call(cmd_extract_dict)
            downloaded_dicts = glob.glob(os.path.join(current_path, "*.data_dictionary.csv"))
            if downloaded_dicts:
                data_dict_filename = downloaded_dicts[0]
                print(f"成功提取并找到数据字典: {data_dict_filename}")
            else:
                raise Exception("错误：提取后未能找到数据字典文件。")
        except subprocess.CalledProcessError as e:
            print(f"提取数据集字典时出错: {e}")
            downloaded_dicts = glob.glob(os.path.join(current_path, "*.data_dictionary.csv"))
            if downloaded_dicts: # 尝试查找是否已部分创建或错误是由于重复运行造成的
                data_dict_filename = downloaded_dicts[0]
                print(f"尽管子流程出错，但找到了数据字典 (可能已存在或部分创建): {data_dict_filename}")
            else:
                print("由于提取数据字典失败，中止操作。")
        except Exception as e:
            print(f"提取字典时发生意外错误: {e}")
    else:
        print("错误：数据集 ID 未定义，无法提取数据字典。")


# 读取数据字典
if data_dict_filename:
    try:
        data_dict_df = pd.read_csv(data_dict_filename, low_memory=False)
        print("数据字典加载成功。")
        # display(data_dict_df.head()) # 在RAP环境中，display可能不可用，用print替代
        print("数据字典 (前5行):")
        print(data_dict_df.head())
    except Exception as e:
        print(f"读取数据字典 CSV '{data_dict_filename}' 时出错: {e}")
        data_dict_df = None #确保后续检查能识别失败
else:
    print("未能获取数据字典文件名，无法加载。")

本地未找到数据字典，尝试从数据集中提取...
执行命令: dx extract_dataset project-GvQgb10JjKY3XGbpZbGVg0v3:record-GvQp8V8JJ68073BBp2PjfgKg -ddd --delimiter ,
成功提取并找到数据字典: /opt/notebooks/app83497_20241030011331.dataset.data_dictionary.csv
数据字典加载成功。
数据字典 (前5行):
        entity   name     type primary_key_type coding_name  concept  \
0  participant    eid   string           global         NaN      NaN   
1  participant  p3_i0  integer              NaN         NaN      NaN   
2  participant  p3_i1  integer              NaN         NaN      NaN   
3  participant  p3_i2  integer              NaN         NaN      NaN   
4  participant  p3_i3  integer              NaN         NaN      NaN   

   description                                        folder_path  \
0          NaN                            Participant Information   
1          NaN  Assessment centre > Procedural metrics > Proce...   
2          NaN  Assessment centre > Procedural metrics > Proce...   
3          NaN  Assessment centre > Procedural metrics > 

In [5]:
# 单元格 5: 定义 fields_for_id 函数 (来自 06 脚本)
# 这个函数需要 data_dict_df 已经加载完毕

def fields_for_id(field_ids_list, data_dictionary_df):
    """
    根据提供的 field_id 列表，从数据字典中构建用于 dx extract_dataset 的字段字符串。
    确保 eid 始终被包含，并且字段属于 'participant' 实体。
    """
    if data_dictionary_df is None:
        print("错误: fields_for_id 需要一个有效的数据字典 DataFrame。")
        return None

    # 始终包含 participant.eid
    final_field_names_to_extract = ["participant.eid"]
    
    # 从数据字典中筛选出 'participant' 实体的字段
    participant_data_dict = data_dictionary_df[data_dictionary_df['entity'] == 'participant']

    for _id in field_ids_list:
        # 构建正则表达式以匹配字段ID，包括可能的实例和数组后缀
        # 例如, field_id '3' 应该匹配 'p3', 'p3_i0', 'p3_i0_a0' 等
        regex_pattern = r"^p{}(?:_i\d+)?(?:_a\d+)?$".format(re.escape(str(_id)))
        
        select_field_names = list(
            participant_data_dict[
                participant_data_dict.name.str.match(regex_pattern, na=False)
            ].name.values
        )
        
        for fname in select_field_names:
            qualified_name = f"participant.{fname}"
            if qualified_name not in final_field_names_to_extract: # 确保唯一性
                 final_field_names_to_extract.append(qualified_name)
    
    if len(final_field_names_to_extract) == 1 and "participant.eid" in final_field_names_to_extract:
        print(f"警告: 除了 'eid' 外，没有找到与提供的 ID 列表匹配的 'participant' 实体字段: {field_ids_list}")
        return "participant.eid" # 只返回eid，避免提取空数据导致错误

    return ",".join(final_field_names_to_extract)

print("fields_for_id 函数定义完成。")


fields_for_id 函数定义完成。


In [6]:
# 单元格 6: 获取所有表型名称并构建提取列表
all_phenotype_ukb_names_from_dict = [] # UKB原始名称，如 pXXXX_iY_aZ
all_phenotype_field_ids_for_extraction = [] # 仅包含数字ID，如 XXXX
fields_to_extract_str = ""

if data_dict_df is not None:
    # 1. 获取所有可能的表型名 (name 列)
    # 我们只对 'participant' entity 感兴趣
    participant_fields_df = data_dict_df[data_dict_df['entity'] == 'participant']
    
    # 提取所有在 'name' 列中的有效表型字段 (通常以 'p' 开头)
    # 并确保 'eid' 存在
    raw_names = participant_fields_df['name'].dropna().astype(str).unique().tolist()
    
    if 'eid' not in raw_names:
        all_phenotype_ukb_names_from_dict.append('eid') # 确保userId列的基础'eid'存在

    for name in raw_names:
        if name not in all_phenotype_ukb_names_from_dict: # 避免重复
            all_phenotype_ukb_names_from_dict.append(name)
        
        # 从表型名称 (如 'p3_i0', 'p21003_i0_a0', 'p41270') 中提取数字ID部分
        match_p_id = re.match(r'p(\d+)', name)
        if match_p_id:
            field_id_num = match_p_id.group(1)
            if field_id_num not in all_phenotype_field_ids_for_extraction:
                all_phenotype_field_ids_for_extraction.append(field_id_num)
        elif name != 'eid': # 其他非'p'开头的也尝试加入，如果需要的话
            # 根据您的数据字典，这里可能不需要添加非'p'开头的，除非有特殊情况
            pass
            
    print(f"从数据字典的 'participant' 实体中找到 {len(all_phenotype_ukb_names_from_dict)} 个原始表型名称。")
    print(f"提取到 {len(all_phenotype_field_ids_for_extraction)} 个唯一的表型数字ID。")
    # print(f"原始表型名称示例 (前10): {all_phenotype_ukb_names_from_dict[:10]}")
    # print(f"表型数字ID示例 (前10): {all_phenotype_field_ids_for_extraction[:10]}")

    # 2. 使用 fields_for_id 函数来构建提取字符串
    # 注意：如果表型ID列表过长，直接作为命令行参数可能会超限。
    # dx extract_dataset 也接受 --fields-file 参数，这更适合大量字段。
    # 这里我们先用 fields_for_id 准备要传递给 --fields-file 的 *原始* UKB 字段名列表 (不含 participant. 前缀)
    
    # fields_for_id 函数是设计来接收数字ID列表的。
    # 我们需要的是所有 'participant.pXXXX...' 格式的字段
    # 我们已经有了 all_phenotype_ukb_names_from_dict，其中包含了 'eid' 和所有 'pXXXX...'
    
    fields_to_extract_for_file = []
    if 'eid' in all_phenotype_ukb_names_from_dict:
        fields_to_extract_for_file.append('eid') # dx extract_dataset --fields-file 需要的是不带 entity 前缀的名称
    
    for name in all_phenotype_ukb_names_from_dict:
        if name.startswith('p') and name not in fields_to_extract_for_file:
            fields_to_extract_for_file.append(name)
    
    print(f"准备了 {len(fields_to_extract_for_file)} 个字段写入 --fields-file。")

else:
    print("数据字典未成功加载，无法获取表型名称。")


从数据字典的 'participant' 实体中找到 29378 个原始表型名称。
提取到 9460 个唯一的表型数字ID。
准备了 29378 个字段写入 --fields-file。


In [None]:
# 单元格 7: 提取表型数据 (分批处理)
output_phenotype_csv_name = "all_phenotypes_extracted_raw.csv" # 最终合并的原始文件名
phenotype_df = None # 这将是最终合并的 DataFrame
list_of_batch_dfs = []
temp_batch_files_to_delete = []


output_phenotype_csv_name = "all_phenotypes_extracted_raw_from_batch49_onwards.csv" # 最终合并的原始文件名
phenotype_df = None # 这将是最终合并的 DataFrame
list_of_batch_dfs = [] # 只存储从 start_batch_index 开始新提取的批次
temp_batch_files_to_delete = [] # 本次运行中创建的临时文件

# --- 确保以下变量已由之前的单元格正确定义和加载 ---
# data_dict_df: Pandas DataFrame，数据字典
# all_phenotype_field_ids_for_extraction: Python list，包含所有待提取表型的数字ID字符串 (不含'eid')
# dataset_full_id: 字符串, 格式如 "project-XXXX:record-YYYY"
# fields_for_id: 函数，用于根据ID列表生成 --fields 字符串
# shlex: 模块，shlex.join 用于安全地显示命令
# ---------------------------------------------------------

if (data_dict_df is not None and
        'all_phenotype_field_ids_for_extraction' in globals() and
        all_phenotype_field_ids_for_extraction and # 确保列表不是None也不是空的
        dataset_full_id):
    
    field_ids_to_process = all_phenotype_field_ids_for_extraction.copy() 
    
    batch_size = 1 # 您之前设定的每个表型ID一批
    num_total_batches = (len(field_ids_to_process) + batch_size - 1) // batch_size # 计算总批次数
    
    print(f"总共可处理的表型ID数量: {len(field_ids_to_process)}.")
    print(f"总理论批次数: {num_total_batches}，每批最多 {batch_size} 个表型ID。")

    # --- 从第49个索引 (即第50个批次) 开始提取 ---
    start_batch_index = 49 
    # ---------------------------------------------

    if start_batch_index >= num_total_batches:
        print(f"起始批次索引 {start_batch_index} 超出或等于总批次数 {num_total_batches}。无需提取新批次。")
    else:
        print(f"将从批次索引 {start_batch_index} (即第 {start_batch_index + 1} 个批次) 开始执行，直到批次索引 {num_total_batches - 1}。")

        for i in range(start_batch_index, num_total_batches):
            current_loop_batch_number = i + 1 # 用户友好的批次号 (从1开始)
            
            # 从 field_ids_to_process 中获取当前批次的ID
            current_slice_start = i * batch_size
            current_slice_end = current_slice_start + batch_size
            current_batch_field_ids = field_ids_to_process[current_slice_start:current_slice_end]

            if not current_batch_field_ids: # 理论上在 range(start_batch_index, num_total_batches) 内不应发生
                print(f"警告: 批次 {current_loop_batch_number} (索引 {i}) 的表型ID列表为空，跳过。")
                continue

            print(f"\n正在处理批次 {current_loop_batch_number}/{num_total_batches} (索引 {i})，包含 {len(current_batch_field_ids)} 个表型ID(s)...")
            
            batch_fields_str = fields_for_id(current_batch_field_ids, data_dict_df)
            
            if not batch_fields_str or (batch_fields_str == "participant.eid" and current_batch_field_ids):
                print(f"警告: 批次 {current_loop_batch_number} 未能从ID列表 {current_batch_field_ids} 中找到任何有效字段（除了eid），跳过此提取批次。")
                continue
            
            batch_output_filename = f"temp_pheno_batch_{i}.csv" # 文件名使用当前循环索引 i
            temp_batch_files_to_delete.append(batch_output_filename)

            # 如果希望每次都强制重新提取，可以取消下面这块检查已存在文件的逻辑
            # if os.path.exists(batch_output_filename) and os.path.getsize(batch_output_filename) > 0:
            #     print(f"批次文件 {batch_output_filename} 已存在。为确保最新数据，将重新提取。")
            #     try: os.remove(batch_output_filename)
            #     except OSError as e_del_old: print(f"删除旧批次文件 {batch_output_filename} 失败: {e_del_old}")
            # 或者，如果想跳过已存在的：
            # if os.path.exists(batch_output_filename) and os.path.getsize(batch_output_filename) > 0:
            #     print(f"批次文件 {batch_output_filename} 已存在且非空，尝试加载。")
            #     try:
            #         batch_df = pd.read_csv(batch_output_filename, low_memory=False)
            #         if "participant.eid" not in batch_df.columns:
            #             if "eid" in batch_df.columns: batch_df.rename(columns={"eid": "participant.eid"}, inplace=True)
            #             else: raise ValueError("eid列缺失")
            #         list_of_batch_dfs.append(batch_df)
            #         print(f"已从现有文件加载批次 {current_loop_batch_number} 数据。Shape: {batch_df.shape}")
            #         continue # 跳过提取
            #     except Exception as e_load: print(f"加载文件 {batch_output_filename} 失败 ({e_load})，将尝试重新提取。")
            
            # 强制重新提取（如果需要，或者上面跳过逻辑未执行）
            cmd_extract_batch = [
                "dx", "extract_dataset", dataset_full_id,
                "--fields", batch_fields_str,
                "--delimiter", ",",
                "--output", batch_output_filename,
            ]

            print(f"执行命令 (批次 {current_loop_batch_number}): {shlex.join(cmd_extract_batch)}")
            process = subprocess.run(cmd_extract_batch, capture_output=True, text=True, check=False)

            if process.returncode == 0:
                if process.stdout: print(f"dx STDOUT (批次 {current_loop_batch_number}):\n{process.stdout}")
                if process.stderr: print(f"dx STDERR (批次 {current_loop_batch_number}, 可能包含成功摘要):\n{process.stderr}")

                if os.path.exists(batch_output_filename) and os.path.getsize(batch_output_filename) > 0:
                    try:
                        batch_df = pd.read_csv(batch_output_filename, low_memory=False)
                        if "participant.eid" not in batch_df.columns:
                            potential_eid_col = "eid" if "eid" in batch_df.columns else None
                            if potential_eid_col:
                                batch_df.rename(columns={potential_eid_col: "participant.eid"}, inplace=True)
                            else:
                                print(f"错误: 批次 {current_loop_batch_number} 输出文件 '{batch_output_filename}' 中未找到 'participant.eid' 或 'eid' 列。跳过。")
                                continue
                        list_of_batch_dfs.append(batch_df)
                        print(f"批次 {current_loop_batch_number} 数据加载成功。DataFrame shape: {batch_df.shape}")
                    except Exception as e_read:
                        print(f"读取批次 {current_loop_batch_number} 的 CSV '{batch_output_filename}' 时出错: {e_read}")
                else:
                    print(f"错误: 批次 {current_loop_batch_number} 提取命令成功，但输出文件 '{batch_output_filename}' 未创建或为空。")
            else:
                print(f"执行批次 {current_loop_batch_number} 的 'dx extract_dataset' 时出错。返回码: {process.returncode}")
                print(f"STDERR (批次 {current_loop_batch_number}):\n{process.stderr if process.stderr else '无 STDERR 输出。'}")
                print(f"STDOUT (批次 {current_loop_batch_number}):\n{process.stdout if process.stdout else '无 STDOUT 输出。'}")
                print(f"批次 {current_loop_batch_number} 失败，建议检查错误并决定是否中止整个过程。当前设置为继续处理后续批次。")
                # 若要在一个批次失败时停止，取消下面一行的注释：
                # break 

        # --- 合并从 start_batch_index 开始提取的所有批次的 DataFrame ---
        if list_of_batch_dfs: # 确保至少有一个批次成功加载
            print(f"\n开始合并 {len(list_of_batch_dfs)} 个新提取的批次 (从索引 {start_batch_index} 开始)...")
            merge_key = "participant.eid" 
            
            phenotype_df = list_of_batch_dfs[0] # 第一个成功加载的批次作为基础
            if merge_key not in phenotype_df.columns:
                print(f"错误：第一个用于合并的DataFrame中缺少合并键 '{merge_key}'。其列为: {phenotype_df.columns.tolist()}")
                phenotype_df = None # 阻止后续操作
            else:
                print(f"基础 DataFrame (来自批次索引 {temp_batch_files_to_delete[0].split('_')[-1].split('.')[0]}) shape: {phenotype_df.shape}")

                for k in range(1, len(list_of_batch_dfs)):
                    df_to_merge = list_of_batch_dfs[k]
                    df_to_merge_filename = temp_batch_files_to_delete[k] # 获取对应文件名用于日志
                    print(f"准备合并来自文件 '{df_to_merge_filename}' 的DataFrame (shape: {df_to_merge.shape})")
                    
                    if merge_key not in df_to_merge.columns:
                        print(f"错误: 合并键 '{merge_key}' 在文件 '{df_to_merge_filename}' 的DataFrame中未找到。跳过合并此批次。")
                        continue
                    
                    # 只合并新的列 (除了合并键)
                    new_cols_to_add = [col for col in df_to_merge.columns if col not in phenotype_df.columns or col == merge_key]
                    if not new_cols_to_add or (len(new_cols_to_add) == 1 and new_cols_to_add[0] == merge_key):
                        print(f"文件 '{df_to_merge_filename}' 没有新的列（除了合并键）可以添加到主DataFrame，跳过合并。")
                        continue
                    df_to_merge_selected = df_to_merge[new_cols_to_add]

                    try:
                        phenotype_df = pd.merge(phenotype_df, df_to_merge_selected, on=merge_key, how='outer')
                        print(f"合并后 DataFrame shape: {phenotype_df.shape}")
                    except Exception as e_merge:
                        print(f"合并文件 '{df_to_merge_filename}' 的DataFrame时出错: {e_merge}")
            
            if phenotype_df is not None:
                 print("所有指定批次合并完成。")
                 print(f"最终合并的原始 DataFrame (从批次索引 {start_batch_index} 开始) shape: {phenotype_df.shape}")
                 # 可以选择在这里保存这个中间的合并结果，或者等待单元格8和9的处理
                 # phenotype_df.to_csv(f"temp_merged_from_batch_{start_batch_index}.csv", index=False)
            else:
                print("错误：合并后 phenotype_df 为 None 或未成功初始化。")
        else:
            print("没有从指定起始批次成功提取或加载的数据可供合并。")

        # 清理本次运行中创建的临时批处理文件
        print("\n清理本次运行中创建的临时批处理文件...")
        for temp_file in temp_batch_files_to_delete:
            if os.path.exists(temp_file):
                try:
                    os.remove(temp_file)
                    print(f"已删除临时文件: {temp_file}")
                except OSError as e_rem_temp:
                    print(f"删除临时文件 '{temp_file}' 时出错: {e_rem_temp}")
else:
    if data_dict_df is None: print("数据字典 (data_dict_df) 未加载。")
    if 'all_phenotype_field_ids_for_extraction' not in globals() or not all_phenotype_field_ids_for_extraction:
        print("表型字段ID列表 (all_phenotype_field_ids_for_extraction) 未定义或为空。")
    if not dataset_full_id: print("数据集ID (dataset_full_id) 未定义。")
    print("因此跳过表型数据提取步骤。")

# phenotype_df 现在将包含从批次索引 start_batch_index 开始提取并合并的数据。
# 后续的单元格 8 (重命名和排序列) 将基于这个 phenotype_df 进行操作。
# 如果 phenotype_df 为 None (因为没有批次成功)，单元格8和9应该能正确处理这种情况。

总共可处理的表型ID数量: 9460.
总理论批次数: 9460，每批最多 1 个表型ID。
将从批次索引 49 (即第 50 个批次) 开始执行，直到批次索引 9459。

正在处理批次 50/9460 (索引 49)，包含 1 个表型ID(s)...
执行命令 (批次 50): dx extract_dataset project-GvQgb10JjKY3XGbpZbGVg0v3:record-GvQp8V8JJ68073BBp2PjfgKg --fields participant.eid,participant.p136_i0,participant.p136_i1,participant.p136_i2,participant.p136_i3 --delimiter , --output temp_pheno_batch_49.csv
dx STDERR (批次 50, 可能包含成功摘要):
INFO:dxpy:[Sat May 17 06:56:15 2025] POST http://10.0.3.1:8124/VIZ/SRV/data/3.0/record-GvQp8V8JJ68073BBp2PjfgKg/raw: Recovered after 1 retries

批次 50 数据加载成功。DataFrame shape: (502137, 5)

正在处理批次 51/9460 (索引 50)，包含 1 个表型ID(s)...
执行命令 (批次 51): dx extract_dataset project-GvQgb10JjKY3XGbpZbGVg0v3:record-GvQp8V8JJ68073BBp2PjfgKg --fields participant.eid,participant.p137_i0,participant.p137_i1,participant.p137_i2,participant.p137_i3 --delimiter , --output temp_pheno_batch_50.csv
批次 51 数据加载成功。DataFrame shape: (502137, 5)

正在处理批次 52/9460 (索引 51)，包含 1 个表型ID(s)...
执行命令 (批次 52): dx extract_dataset 

In [None]:
# 单元格 8: 重命名和排序列
processed_phenotype_df = None
final_output_filename = "ukbiobank_phenotypes_r_compatible.csv"

if phenotype_df is not None:
    print("开始处理已加载的表型数据...")
    # dx extract_dataset --fields-file 提取出的列名是不带 'participant.' 前缀的
    # 所以我们直接使用这些列名进行转换和排序
    processed_phenotype_df = phenotype_df.copy() 

    original_columns = processed_phenotype_df.columns.tolist()
    renamed_columns_map = {} 
    final_columns_ordered_r_names = [] # 存储R兼容的列名，用于最终排序

    # 1. 处理 'eid' -> 'userId'
    eid_col_original = None
    if 'eid' in original_columns: # dx extract --fields-file 应该输出不带前缀的 'eid'
        eid_col_original = 'eid'
    
    if eid_col_original:
        renamed_columns_map[eid_col_original] = 'userId'
        final_columns_ordered_r_names.append('userId') # userId 应该是第一列
    else:
        print("错误：在提取的数据中未找到 'eid' 列。无法设置 'userId'。")
        # 脚本可能需要停止或采取其他措施
        
    # 2. 处理其他表型列
    temp_pheno_cols_for_sorting = [] # 存储包含排序信息的字典列表
    for col_name in original_columns:
        if col_name == eid_col_original: # 跳过已处理的 eid
            continue

        # col_name 此时应该是原始的UKB字段名，如 pXXXX_iY_aZ (不带 'participant.' 前缀)
        r_compat_name = convert_phenotype_name(col_name) # 'eid' 已被上面处理
        renamed_columns_map[col_name] = r_compat_name

        # 提取 varid, instance, array 用于排序
        match = re.match(r'x(\d+)(?:_(\d+))?(?:_(\d+))?$', r_compat_name)
        if match:
            varid = int(match.group(1))
            instance_str = match.group(2)
            array_str = match.group(3)
            instance = int(instance_str) if instance_str is not None else 0
            array_val = int(array_str) if array_str is not None else 0
            temp_pheno_cols_for_sorting.append({
                'original': col_name,      # 原始列名 (e.g., 'p21003_0_0')
                'r_name': r_compat_name,   # R兼容列名 (e.g., 'x21003_0_0')
                'varid': varid,
                'instance': instance,
                'array': array_val
            })
        else: # 处理非标准格式的列名 (convert_phenotype_name已处理，这里主要用于排序)
             temp_pheno_cols_for_sorting.append({
                'original': col_name,
                'r_name': r_compat_name,
                'varid': float('inf'), # 使其排在标准格式之后
                'instance': float('inf'),
                'array': float('inf')
            })
            if r_compat_name not in final_columns_ordered_r_names: # 避免userId重复
                 print(f"警告：列 '{col_name}' (转换为 '{r_compat_name}') 未能解析出varid/instance/array，将按名称排序。")


    # 按 varid, then instance, then array 排序表型列
    sorted_pheno_cols_info = sorted(temp_pheno_cols_for_sorting, key=lambda x: (x['varid'], x['instance'], x['array'], x['r_name']))

    for item in sorted_pheno_cols_info:
        if item['r_name'] not in final_columns_ordered_r_names: 
            final_columns_ordered_r_names.append(item['r_name'])
    
    # 应用重命名
    processed_phenotype_df.rename(columns=renamed_columns_map, inplace=True)

    # 按最终顺序排序列
    try:
        # 确保 final_columns_ordered_r_names 中的所有列都存在于 DataFrame 中
        existing_final_columns = [col for col in final_columns_ordered_r_names if col in processed_phenotype_df.columns]
        
        # 如果 'userId' 由于某种原因不在列表的开头 (例如，它是唯一的非数字ID字段且排序逻辑不同)
        # 或者某些列丢失了，这里尝试确保 'userId' 是第一列 (如果它存在)
        if 'userId' in processed_phenotype_df.columns:
            if 'userId' in existing_final_columns:
                existing_final_columns.remove('userId')
            existing_final_columns.insert(0, 'userId')
        else: # 如果 userId 真的丢失了，这是一个严重问题
            print("严重错误: 'userId' 列在重命名后丢失。")
        
        # 去重，以防万一 (userId 可能在 renamed_columns_map 和 final_columns_ordered_r_names 中都出现)
        seen = set()
        unique_existing_final_columns = [x for x in existing_final_columns if not (x in seen or seen.add(x))]

        processed_phenotype_df = processed_phenotype_df[unique_existing_final_columns]
        print("列已成功重命名并排序。")
        print("处理后的表型数据 (前5行):")
        print(processed_phenotype_df.head())

    except KeyError as e:
        print(f"排序列时发生 KeyError: {e}。可能是某些列未能正确重命名或意外丢失。")
        print("重命名后的可用列:", processed_phenotype_df.columns.tolist())
        print("尝试使用可用列继续。")
        available_cols = [col for col in final_columns_ordered_r_names if col in processed_phenotype_df.columns]
        if 'userId' in processed_phenotype_df.columns and 'userId' not in available_cols:
            available_cols.insert(0, 'userId')
        
        seen = set()
        unique_available_cols = [x for x in available_cols if not (x in seen or seen.add(x))]

        if unique_available_cols:
             processed_phenotype_df = processed_phenotype_df[unique_available_cols]
             print("已使用可用列重新组织 DataFrame。")
             print(processed_phenotype_df.head())
        else:
            print("没有可用的列来创建最终的 DataFrame。")
            processed_phenotype_df = None
else:
    print("原始表型数据 (phenotype_df) 未成功加载，跳过处理步骤。")


In [None]:


# 单元格 9: 保存最终的 CSV 文件
if processed_phenotype_df is not None:
    try:
        processed_phenotype_df.to_csv(final_output_filename, index=False, na_rep='NA') # R 中常用的缺失值表示
        print(f"最终表型数据已保存到 '{final_output_filename}'。")
        print(f"文件格式: CSV, 逗号分隔, {processed_phenotype_df.shape[0]} 行, {processed_phenotype_df.shape[1]} 列。")
        if not processed_phenotype_df.empty:
            print(f"第一列: '{processed_phenotype_df.columns[0]}'")
            print("最终数据的前5行示例:")
            print(processed_phenotype_df.head())
        else:
            print("警告：最终的DataFrame为空。")

        # 可选：上传到 UKB RAP (根据需要取消注释并调整路径)
        if project_id: # 确保 project_id 已定义
             dx_upload_folder = "/output_data/" # 例如，在项目中的 output_data 文件夹
             print(f"准备上传 '{final_output_filename}' 到 UKB RAP 项目 '{project_id}' 的文件夹 '{dx_upload_folder}'...")
             try:
                 # 确保文件夹存在
                 dxpy.DXProject(project_id).new_folder(dx_upload_folder, parents=True)
                 dxpy.upload_local_file(filename=final_output_filename, project=project_id, folder=dx_upload_folder)
                 print(f"文件成功上传到 '{dx_upload_folder}{final_output_filename}'")
             except Exception as e_upload:
                 print(f"上传文件到 DNAnexus 时出错: {e_upload}")
        else:
            print("项目ID未定义，跳过上传步骤。")


    except Exception as e:
        print(f"保存最终 CSV 时出错: {e}")
else:
    print("没有处理好的表型数据可供保存。")

print("脚本执行完毕。")