Allow nested event loops.

In [1]:
# Allow nested event loops: patch asyncio so that a loop can be run from
# within an already-running one. Presumably required by the pydra Submitter
# used at the end of this notebook -- TODO confirm.
import nest_asyncio
nest_asyncio.apply()

Source and raw dataset path setup.

Using the standalone storage convention.

In [2]:
from pathlib import Path

# Standalone storage convention: the downloaded collection ("sourcedata") and
# the converted dataset ("rawdata") are siblings under one dataset directory.
datasets_path = Path.home().joinpath("Datasets", "nifd-testing")
sourcedata_path = datasets_path.joinpath("sourcedata")
rawdata_path = datasets_path.joinpath("rawdata")

print(f"Converting from {sourcedata_path} to {rawdata_path}.")

Converting from /Users/ghislain.vaillant/Datasets/nifd-testing/sourcedata to /Users/ghislain.vaillant/Datasets/nifd-testing/rawdata.


### First task definition

In [3]:
from os import PathLike
from typing import Tuple
from pydra.mark import annotate, task


@task
@annotate({"return": {"clinical_data": PathLike, "imaging_data": PathLike}})
def get_collection_files(sourcedata: PathLike):
    """Locate the clinical CSV file and the imaging ZIP archive of a collection.

    Parameters
    ----------
    sourcedata : PathLike
        Directory holding the downloaded collection. Only its top level is
        searched.

    Returns
    -------
    tuple of pathlib.Path
        ``(clinical_data, imaging_data)``: the first ``*.csv`` and the first
        ``*.zip`` file of ``sourcedata``, in alphabetical order.

    Raises
    ------
    FileNotFoundError
        If ``sourcedata`` contains no ``*.csv`` or no ``*.zip`` file.
    """
    # Local import, following the convention of the task functions in this notebook.
    from pathlib import Path

    sourcedata_path = Path(sourcedata)

    def first_match(pattern):
        # Sorting makes the pick deterministic when several files match;
        # glob() alone yields entries in arbitrary filesystem order.
        matches = sorted(sourcedata_path.glob(pattern))
        if not matches:
            # A bare next() on an empty glob would leak an opaque StopIteration.
            raise FileNotFoundError(
                f"No file matching {pattern!r} found in {sourcedata_path}"
            )
        return matches[0]

    return first_match("*.csv"), first_match("*.zip")

Testing our first task.

In [4]:
# Named `collection_task` rather than `task` so that the `task` decorator
# imported from pydra.mark is not overwritten in the notebook namespace.
collection_task = get_collection_files(sourcedata=sourcedata_path)
res1 = collection_task()

res1

Result(output=Output(clinical_data=PosixPath('/Users/ghislain.vaillant/Datasets/nifd-testing/sourcedata/NIFD_TESTING_8_09_2021.csv'), imaging_data=PosixPath('/Users/ghislain.vaillant/Datasets/nifd-testing/sourcedata/NIFD_TESTING.zip')), runtime=None, errored=False)

### More task definitions

In [5]:
from os import PathLike
from pandas import DataFrame
from pydra.mark import annotate, task


@task
@annotate(
    {
        "return": {
            "participant_metadata": DataFrame,
            "session_metadata": DataFrame,
            "image_data_id": DataFrame,
        }
    }
)
def extract_metadata(clinical_data: PathLike):
    """Extract participant, session and image selection tables from clinical data.

    The CSV file is indexed by its ``Image Data ID`` column; index and column
    names are normalised to lower snake case (e.g. ``Acq Date`` becomes
    ``acq_date``).

    Parameters
    ----------
    clinical_data : PathLike
        CSV file providing at least the ``Image Data ID``, ``Subject``,
        ``Visit``, ``Sex``, ``Group``, ``Age``, ``Modality``, ``Format``,
        ``Description``, ``Acq Date`` and ``Downloaded`` columns.

    Returns
    -------
    participant_metadata : DataFrame
        ``sex`` and ``group`` per ``participant_id``, taken from the earliest
        visit of each participant.
    session_metadata : DataFrame
        ``age`` per ``(participant_id, session_id)``.
    image_data_id : DataFrame
        ``image_data_id`` of the earliest acquired MRI / DCM image whose
        description contains ``mprage``, per ``(participant_id, session_id)``.

    Notes
    -----
    ``GroupBy.first()`` returns the first non-null value of each column, so a
    missing value on the first row of a group is filled from a later row.
    """
    from pandas import read_csv

    def to_snake_case(label):
        return label.lower().replace(" ", "_")

    # Read and sanitise clinical data.
    dataframe = (
        read_csv(
            clinical_data,
            index_col="Image Data ID",
            parse_dates=["Acq Date", "Downloaded"],
        )
        .rename_axis(index=to_snake_case)
        .rename(columns=to_snake_case)
        .convert_dtypes()
    )

    # Normalise participant and session identifiers.
    # Bracket access is used throughout: attribute access silently breaks for
    # columns whose name collides with a DataFrame attribute.
    dataframe["participant_id"] = "sub-" + dataframe["subject"].str.replace(
        "_", "", regex=False
    )
    dataframe["session_id"] = "ses-" + dataframe["visit"].astype("string")

    # Extract participant-level metadata, i.e. sex and group.
    # `groupby(sort=...)` is a boolean flag ordering the group keys only, so
    # rows are ordered by visit explicitly. A stable sort keeps the file order
    # for rows of the same visit.
    participant_metadata = (
        dataframe.sort_values("visit", kind="stable")
        .groupby(by="participant_id", sort=True)[["sex", "group"]]
        .first()
    )

    # Extract session-level metadata, i.e. age.
    session_metadata = (
        dataframe.groupby(by=["participant_id", "session_id"], sort=True)[["age"]]
        .first()
    )

    # Extract images to convert: earliest matching acquisition of each session.
    is_candidate = (
        (dataframe["modality"] == "MRI")
        & (dataframe["format"] == "DCM")
        & dataframe["description"].str.contains("mprage")
    )
    image_data_id = (
        dataframe[is_candidate]
        .reset_index()
        .sort_values("acq_date", kind="stable")
        .groupby(by=["participant_id", "session_id"], sort=True)[["image_data_id"]]
        .first()
    )

    return participant_metadata, session_metadata, image_data_id

Testing the metadata extraction task on the clinical data file located by the first task.

In [6]:
# Named `metadata_task` rather than `task` so that the `task` decorator
# imported from pydra.mark is not overwritten in the notebook namespace.
metadata_task = extract_metadata(clinical_data=res1.output.clinical_data)
res2 = metadata_task()

print(res2.output.participant_metadata)
print(res2.output.session_metadata)
print(res2.output.image_data_id)

               sex    group
participant_id             
sub-1S0006       M  Patient
sub-2S0004       M  Patient
sub-3S0004       F  Patient
                           age
participant_id session_id     
sub-1S0006     ses-1        62
               ses-2        62
               ses-3        62
sub-2S0004     ses-1        66
               ses-2        66
sub-3S0004     ses-1        57
               ses-2        57
               ses-3        58
                          image_data_id
participant_id session_id              
sub-1S0006     ses-1            I216358
               ses-2            I227726
               ses-3            I245467
sub-3S0004     ses-1            I710048
               ses-2            I709763
               ses-3            I709749


Task writing the participant-level and session-level metadata as TSV files into the raw dataset.

In [7]:
from os import PathLike
from pandas import DataFrame
from pydra.mark import task


@task
def write_metadata(
    rawdata: PathLike,
    participant_metadata: DataFrame,
    session_metadata: DataFrame,
) -> None:
    """Write participant and session metadata as TSV files under ``rawdata``.

    Files written, with missing values rendered as ``n/a``:

    - ``<rawdata>/participants.tsv`` from ``participant_metadata``;
    - ``<rawdata>/<participant_id>/<participant_id>_sessions.tsv`` for each
      participant of ``session_metadata``, without the ``participant_id``
      index level.

    Parameters
    ----------
    rawdata : PathLike
        Root directory of the output dataset; created if it does not exist.
    participant_metadata : DataFrame
        Participant-level table, written with its index.
    session_metadata : DataFrame
        Session-level table whose index has a ``participant_id`` level.
    """
    from pathlib import Path

    rawdata_path = Path(rawdata)
    # The output dataset may not exist yet on a first conversion, and
    # participants.tsv is written before any participant directory is made.
    rawdata_path.mkdir(parents=True, exist_ok=True)

    # Write participants.tsv
    # Paths (not open text handles) are given to to_csv so that pandas controls
    # the encoding and newline translation of the output.
    participant_metadata.to_csv(
        rawdata_path / "participants.tsv", sep="\t", na_rep="n/a"
    )

    # Write sessions.tsv
    for participant_id, session_group in session_metadata.groupby("participant_id"):
        participant_dir = rawdata_path / str(participant_id)
        participant_dir.mkdir(parents=True, exist_ok=True)

        session_group.droplevel("participant_id").to_csv(
            participant_dir / f"{participant_id}_sessions.tsv",
            sep="\t",
            na_rep="n/a",
        )




In [8]:
from os import PathLike
from pandas import DataFrame
from pydra.mark import annotate, task


@task
@annotate({"return": {"image_data_files": list}})
def extract_imaging_data(imaging_data: PathLike, image_data_id: DataFrame):
    """List the image files of a ZIP archive with their image data identifier.

    The identifier of a file is the part of its stem following the last
    underscore, e.g. ``I216358`` for ``..._I216358.dcm``.

    Parameters
    ----------
    imaging_data : PathLike
        ZIP archive holding the imaging data.
    image_data_id : DataFrame
        Currently unused: the returned table covers every file of the archive.
        NOTE(review): presumably meant to restrict the table to the images
        selected for conversion -- confirm the intent.

    Returns
    -------
    DataFrame
        One row per file of the archive, with columns ``image_data_id`` and
        ``file``; ``file`` holds the parent directory of the file within the
        archive.
        NOTE(review): the decorator above declares this output as ``list``.
    """
    from pathlib import PurePath
    from zipfile import ZipFile

    def parse_image_data_id(name: str) -> str:
        return PurePath(name).stem.rsplit("_", 1)[-1]

    # The context manager closes the archive handle once its listing is read.
    with ZipFile(imaging_data) as archive:
        records = [
            (parse_image_data_id(member.filename), PurePath(member.filename).parent)
            for member in archive.infolist()
            # Directory entries are not image files and carry no identifier.
            if not member.is_dir()
        ]

    return DataFrame.from_records(records, columns=["image_data_id", "file"])

In [9]:
# The keyword must match the `image_data_id` parameter of extract_imaging_data.
# Named `imaging_task` rather than `task` so that the `task` decorator
# imported from pydra.mark is not overwritten in the notebook namespace.
imaging_task = extract_imaging_data(
    imaging_data=res1.output.imaging_data,
    image_data_id=res2.output.image_data_id,
)
res4 = imaging_task()

res4

Result(output=Output(image_data_files=     image_data_id                                               file
0          I216349  NIFD_1_S_0006_MRI_2010_08_17_20210729171246574...
1          I216349  NIFD_1_S_0006_MRI_2010_08_17_20210729171246574...
2          I216349  NIFD_1_S_0006_MRI_2010_08_17_20210729171246574...
3          I216349  NIFD_1_S_0006_MRI_2010_08_17_20210729171246574...
4          I216349  NIFD_1_S_0006_MRI_2010_08_17_20210729171246574...
...            ...                                                ...
5992       I710051  NIFD_3_S_0004_MRI_2012_02_01_20210729215714866...
5993       I710051  NIFD_3_S_0004_MRI_2012_02_01_20210729215714866...
5994       I710051  NIFD_3_S_0004_MRI_2012_02_01_20210729215714866...
5995       I710051  NIFD_3_S_0004_MRI_2012_02_01_20210729215714866...
5996       I710051  NIFD_3_S_0004_MRI_2012_02_01_20210729215714866...

[5997 rows x 2 columns]), runtime=None, errored=False)

### Workflow definition

In [10]:
from pydra import Workflow

# Workflow-level inputs: where the collection is read from and where the
# converted dataset is written to.
workflow_inputs = {"sourcedata": sourcedata_path, "rawdata": rawdata_path}

workflow = Workflow(
    name="nifd_to_bids",
    input_spec=list(workflow_inputs),
    **workflow_inputs,
)

# Step 1: locate the clinical data and imaging data files of the collection.
collection_node = get_collection_files(
    name="get_collection_files",
    sourcedata=workflow.lzin.sourcedata,
)
workflow.add(collection_node)

# Step 2: derive the metadata tables from the clinical data file.
metadata_node = extract_metadata(
    name="extract_metadata",
    clinical_data=workflow.get_collection_files.lzout.clinical_data,
)
workflow.add(metadata_node)

# Step 3: write the participant and session tables to the raw dataset.
writer_node = write_metadata(
    name="write_metadata",
    rawdata=workflow.lzin.rawdata,
    participant_metadata=workflow.extract_metadata.lzout.participant_metadata,
    session_metadata=workflow.extract_metadata.lzout.session_metadata,
)
workflow.add(writer_node)

# No workflow-level output is declared.
workflow.set_output([])

### Workflow execution

In [11]:
from pydra import Submitter

# Run the workflow defined above through a submitter using the "cf" plugin
# (presumably the concurrent.futures backend -- confirm against the pydra
# documentation). The context manager releases the submitter on exit.
with Submitter(plugin="cf") as submitter:
    submitter(workflow)