In [4]:
file_path = "/home/cuixing/.local/lib/python3.9/site-packages/totalsegmentator/map_to_binary.py"

with open(file_path, "r") as f:
    content = f.read()

    for i, line in enumerate(content.splitlines()):
    print(f"{i+1:3}: {line}")


IndentationError: expected an indented block (2794961134.py, line 7)

In [5]:
class_map = {
    "background": 0,
    "Femur": 1,
    "Fibula": 2,
    "T1_S1": 3,
    "T1_S3": 4,
    "T1_S4": 5,
    "T2_S1": 6,
    "T2_S2": 7,
    "T2_S3": 8,
    "T2_S4": 9,
    "T3_S1": 10,
    "T3_S2": 11,
    "T3_S3": 12,
    "T3_S4": 13,
    "T4_S1": 14,
    "T4_S2": 15,
    "T4_S3": 16,
    "T4_S4": 17,
    "T5_S1": 18,
    "T5_S2": 19,
    "T5_S3": 20,
    "T5_S4": 21,
    "Calcaneus": 22,
    "Cuboid": 23,
    "Cunei_Med": 24,
    "Cunei_Lat": 25,
    "Cunei_Int": 26,
    "Navicular": 27,
    "Talus": 28,
    "Tibia": 29

}


In [None]:
import os
import shutil
import json
import numpy as np
import nibabel as nib
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor


# 配置路径
os.environ['nnUNet_raw'] = '/home/cuixing/Foot&Ankle/Tarsus_split_rawdata'
os.environ['nnUNet_preprocessed'] = '/home/cuixing/Foot&Ankle/Data_Preprocessed'



def combine_labels(ref_img, file_out, class_map, seg_dir):
    ref_img = nib.load(ref_img)
    combined = np.zeros(ref_img.shape, dtype=np.uint8)
    missing = []
    multiple = []

    for label_name, label_id in class_map.items():
        if label_name == "background":
            continue
        matched = list(seg_dir.glob(f"*{label_name}*.nii.gz"))
        if not matched:
            missing.append(label_name)
            continue
        if len(matched) > 1:
            multiple.append(label_name)
        img = nib.load(matched[0])
        combined[img.get_fdata() > 0] = label_id

    nib.save(nib.Nifti1Image(combined, ref_img.affine), file_out)
    return missing


def process_train(subject):
    subject_path = dataset_path / subject
    ct_file = subject_path / f"{subject}_resampled.nii.gz"
    seg_dir = subject_path / f"{subject}_GT_Segmentations"
    if not ct_file.exists():
        return subject, "missing", []

    shutil.copy(ct_file, nnunet_path / "imagesTr" / f"{subject}_0000.nii.gz")
    label_out = nnunet_path / "labelsTr" / f"{subject}.nii.gz"
    missing = combine_labels(ct_file, label_out, class_map, seg_dir)
    return subject, "ok", missing


def process_test(subject):
    subject_path = dataset_path / subject
    ct_file = subject_path / f"{subject}_resampled.nii.gz"
    seg_dir = subject_path / f"{subject}_GT_Segmentations"
    if not ct_file.exists():
        return subject, "missing", []

    shutil.copy(ct_file, nnunet_path / "imagesTs" / f"{subject}_0000.nii.gz")
    label_out = nnunet_path / "labelsTs" / f"{subject}.nii.gz"
    missing = combine_labels(ct_file, label_out, class_map, seg_dir)
    return subject, "ok", missing


def generate_json_from_dir_v2(foldername, subjects_train, subjects_val, labels):
    print("📄 Creating dataset.json...")
    out_base = Path(os.environ['nnUNet_raw']) / foldername
    json_dict = {
        "name": "TotalSegmentator",
        "description": "Segmentation of TotalSegmentator classes",
        "reference": "https://zenodo.org/record/6802614",
        "licence": "Apache 2.0",
        "release": "2.0",
        "channel_names": {"0": "CT"},
        "labels": class_map,
        "numTraining": len(subjects_train + subjects_val),
        "file_ending": ".nii.gz",
        "overwrite_image_reader_writer": "NibabelIOWithReorient",
        "training": [
            {
                "image": f"./imagesTr/{subj}_0000.nii.gz",
                "label": f"./labelsTr/{subj}.nii.gz"
            }
            for subj in subjects_train + subjects_val
        ]
    }
    json.dump(json_dict, open(out_base / "dataset.json", "w"), indent=4)

    print("📄 Creating splits_final.json...")
    output_folder_pkl = Path(os.environ['nnUNet_preprocessed']) / foldername
    output_folder_pkl.mkdir(parents=True, exist_ok=True)
    splits = [{"train": subjects_train, "val": subjects_val}]
    json.dump(splits, open(output_folder_pkl / "splits_final.json", "w"), indent=4)


if __name__ == "__main__":
    dataset_path = Path("/nfs/turbo/coe-mreedsensitive/Processing/Foot_and_Ankle/SK/Tarsus_Separated")
    nnunet_path = Path("/home/cuixing/Foot&Ankle/Tarsus_split_rawdata/Dataset001_TS_app_bones")

    for subdir in ["imagesTr", "labelsTr", "imagesTs", "labelsTs"]:
        (nnunet_path / subdir).mkdir(parents=True, exist_ok=True)

    meta = pd.read_csv(dataset_path / "meta.csv", sep=",")
    subjects_train = list(meta[meta["split"] == "train"]["image_id"].values)
    subjects_val = list(meta[meta["split"] == "val"]["image_id"].values)
    subjects_test = list(meta[meta["split"] == "test"]["image_id"].values)

    print("📦 Processing training+validation set in parallel...")
    missing_counts = {}
    with ProcessPoolExecutor(max_workers=8) as executor:
        for subject, status, missing in tqdm(executor.map(process_train, subjects_train + subjects_val), total=len(subjects_train + subjects_val)):
            if status == "missing":
                print(f"❌ Missing CT for {subject}")
            for m in missing:
                missing_counts[m] = missing_counts.get(m, 0) + 1

    print("📦 Processing test set in parallel...")
    with ProcessPoolExecutor(max_workers=8) as executor:
        for subject, status, missing in tqdm(executor.map(process_test, subjects_test), total=len(subjects_test)):
            if status == "missing":
                print(f"❌ Missing CT for {subject}")
            for m in missing:
                missing_counts[m] = missing_counts.get(m, 0) + 1

    generate_json_from_dir_v2(nnunet_path.name, subjects_train, subjects_val, class_map.keys())

    print("\n📊 缺失结构统计报告:")
    for label, count in sorted(missing_counts.items(), key=lambda x: -x[1]):
        print(f" - {label:<15}: missing in {count} samples")


📦 Processing training+validation set in parallel...


  0%|          | 0/12 [00:00<?, ?it/s]