
Commit

refactor complex function
NicolasGensollen committed Nov 9, 2023
1 parent 662f778 commit a230608
Showing 1 changed file with 130 additions and 92 deletions.
222 changes: 130 additions & 92 deletions clinica/iotools/utils/data_handling.py
@@ -2,27 +2,38 @@

from os import PathLike
from pathlib import Path
from typing import Iterable, List, Optional
from typing import Iterable, List, Optional, Tuple

import click
import pandas as pd
from nibabel.nifti1 import Nifti1Header
from numpy import ndarray


def _validate_output_tsv_path(out_path: Path) -> Path:
def _validate_output_tsv_path(out_path: PathLike) -> Path:
"""Validate that provided file path is a TSV file.
If folders do not exist, this function will create them.
If provided path is a directory, this function will return
a file named 'merge.tsv' within this directory.
"""
out_path = out_path.resolve()
import warnings

from clinica.utils.stream import cprint

out_path = Path(out_path).resolve()
if out_path.is_dir():
out_path = out_path / "merge.tsv"
elif "." not in out_path.name:
out_path = out_path.with_suffix(".tsv")
elif out_path.suffix != ".tsv":
raise TypeError("Output path extension must be tsv.")
if out_path.exists():
msg = (
f"Path to TSV file {out_path} already exists. The file will be overwritten."
)
warnings.warn(msg)
cprint(msg=msg, lvl="warning")
out_dir = out_path.parent
if not out_dir.exists():
out_dir.mkdir(parents=True)
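
For illustration only (not part of this commit), here is a minimal sketch of the path-validation behaviour described in the docstring above; it assumes the private helper is importable from clinica.iotools.utils.data_handling and uses hypothetical paths:

    from pathlib import Path
    from clinica.iotools.utils.data_handling import _validate_output_tsv_path  # private helper shown in this diff

    # An existing directory resolves to <dir>/merge.tsv.
    print(_validate_output_tsv_path(Path("/tmp/results")))         # /tmp/results/merge.tsv
    # A name without an extension gets a .tsv suffix appended.
    print(_validate_output_tsv_path(Path("/tmp/results/merged")))  # /tmp/results/merged.tsv
    # Any other extension is rejected.
    _validate_output_tsv_path(Path("/tmp/results/merged.csv"))     # raises TypeError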
@@ -64,53 +75,89 @@ def create_merge_file(
ignore_sessions_files : bool, optional
If True the information related to sessions and scans is not read.
"""
import json
from os import path
from pathlib import Path

import numpy as np
import pandas as pd

from clinica.utils.participant import get_subject_session_list
from clinica.utils.stream import cprint

from .pipeline_handling import DatasetError
bids_dir = Path(bids_dir)
caps_dir = _validate_caps_dir(caps_dir)
out_path = _validate_output_tsv_path(out_tsv)
sub_ses_df, participants_df = _get_participants_and_subjects_sessions_df(
bids_dir, tsv_file, ignore_sessions_files
)
merged_df = _create_bids_merge_file(
bids_dir, sub_ses_df, participants_df, ignore_scan_files, ignore_sessions_files
)
merged_df.to_csv(out_path, sep="\t", index=False)
cprint("End of BIDS information merge.", lvl="debug")
merged_df.reset_index(drop=True, inplace=True)
if caps_dir is not None:
merged_df, merged_summary_df = _create_caps_merge_file(
caps_dir, merged_df, pipelines, **kwargs
)
summary_path = path.splitext(str(out_path))[0] + "_summary.tsv"
merged_summary_df.to_csv(summary_path, sep="\t", index=False)
merged_df.to_csv(out_path, sep="\t", index=False)
cprint("End of CAPS information merge.", lvl="debug")


def _validate_caps_dir(caps_dir: Optional[PathLike] = None) -> Optional[Path]:
if caps_dir is not None:
caps_dir = Path(caps_dir)
if not caps_dir.is_dir():
raise IOError("The path to the CAPS directory is wrong")
return caps_dir

bids_dir = Path(bids_dir)

def _get_participants_and_subjects_sessions_df(
bids_dir: Path,
tsv_file: Optional[PathLike] = None,
ignore_sessions_files: bool = False,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
from clinica.utils.participant import get_subject_session_list
from clinica.utils.stream import cprint

index_cols = ["participant_id", "session_id"]
sessions, subjects = get_subject_session_list(
bids_dir, ss_file=tsv_file, use_session_tsv=(not ignore_sessions_files)
)
if (bids_dir / "participants.tsv").is_file():
participants_df = pd.read_csv(bids_dir / "participants.tsv", sep="\t")
else:
participants_df = pd.DataFrame(list(set(subjects)), columns=["participant_id"])

sub_ses_df = pd.DataFrame(
[[subject, session] for subject, session in zip(subjects, sessions)],
columns=["participant_id", "session_id"],
columns=index_cols,
)

try:
sub_ses_df.set_index(
["participant_id", "session_id"], inplace=True, verify_integrity=True
)
sub_ses_df.set_index(index_cols, inplace=True, verify_integrity=True)
except ValueError:
cprint(
"Found duplicate subject/session pair. Keeping first occurrence.",
lvl="warning",
)
sub_ses_df = sub_ses_df.drop_duplicates(subset=["participant_id", "session_id"])
sub_ses_df.set_index(["participant_id", "session_id"], inplace=True)
sub_ses_df = sub_ses_df.drop_duplicates(subset=index_cols)
sub_ses_df.set_index(index_cols, inplace=True)

out_path = _validate_output_tsv_path(Path(out_tsv))
merged_df = pd.DataFrame(columns=participants_df.columns.values)
return participants_df, sub_ses_df


def _create_bids_merge_file(
bids_dir: Path,
sub_ses_df: pd.DataFrame,
participants_df: pd.DataFrame,
ignore_scan_files: bool = False,
ignore_sessions_files: bool = False,
) -> pd.DataFrame:
import json

import numpy as np

from clinica.utils.stream import cprint

# BIDS part
from .pipeline_handling import DatasetError

merged_df = pd.DataFrame(columns=participants_df.columns.values)
for subject, subject_df in sub_ses_df.groupby(level=0):
sub_path = bids_dir / subject
row_participant_df = participants_df[
@@ -143,24 +190,22 @@ def create_merge_file(
scan_path = (
bids_dir / subject / session / f"{subject}_{session}_scans.tsv"
)
row_scans_df = pd.DataFrame()
if scan_path.is_file() and not ignore_scan_files:
scans_dict = dict()
scans_df = pd.read_csv(scan_path, sep="\t")
for idx in scans_df.index.values:
filepath = scans_df.loc[idx, "filename"]
if filepath.endswith(".nii.gz"):
filename = path.basename(filepath).split(".")[0]
filename = Path(filepath).name.split(".")[0]
modality = "_".join(filename.split("_")[2::])
for col in scans_df.columns.values:
if col == "filename":
pass
else:
if col != "filename":
value = scans_df.loc[idx, col]
new_col_name = f"{modality}_{col}"
scans_dict.update({new_col_name: value})
json_path = (
bids_dir / subject / session / filepath.split(".")[0]
+ ".json"
bids_dir / subject / session / f"{filename}.json"
)
if json_path.exists():
with open(json_path, "r") as f:
@@ -172,8 +217,6 @@
str(key): str(value) for key, value in scans_dict.items()
}
row_scans_df = pd.DataFrame(scans_dict, index=[0])
else:
row_scans_df = pd.DataFrame()

row_df = pd.concat(
[row_participant_df, row_session_df, row_scans_df], axis=1
@@ -187,77 +230,72 @@
col_list.insert(0, col_list.pop(col_list.index("participant_id")))
col_list.insert(1, col_list.pop(col_list.index("session_id")))
merged_df = merged_df[col_list]

tmp = merged_df.select_dtypes(include=[np.number])
# Round numeric values in dataframe to 6 decimal values
merged_df.loc[:, tmp.columns] = np.round(tmp, 6)
merged_df.to_csv(out_path, sep="\t", index=False)
cprint("End of BIDS information merge.", lvl="debug")

merged_df.reset_index(drop=True, inplace=True)
return merged_df

# CAPS
if caps_dir is not None:
# Call the different pipelines
from .pipeline_handling import (
dwi_dti_pipeline,
pet_volume_pipeline,
t1_freesurfer_longitudinal_pipeline,
t1_freesurfer_pipeline,
t1_volume_pipeline,
)

pipeline_options = {
"t1-volume": t1_volume_pipeline,
"pet-volume": pet_volume_pipeline,
"t1-freesurfer": t1_freesurfer_pipeline,
"t1-freesurfer-longitudinal": t1_freesurfer_longitudinal_pipeline,
"dwi-dti": dwi_dti_pipeline,
}
merged_summary_df = pd.DataFrame()
if not pipelines:
for pipeline_name, pipeline_fn in pipeline_options.items():
merged_df, summary_df = pipeline_fn(caps_dir, merged_df, **kwargs)
if summary_df is not None and not summary_df.empty:
merged_summary_df = pd.concat([merged_summary_df, summary_df])

if summary_df is None or summary_df.empty:
cprint(
f"{pipeline_name} outputs were not found in the CAPS folder."
)
else:
for pipeline in pipelines:
merged_df, summary_df = pipeline_options[pipeline](
caps_dir, merged_df, **kwargs
)
merged_summary_df = pd.concat([merged_summary_df, summary_df])
def _create_caps_merge_file(
caps_dir: Path,
merged_df: pd.DataFrame,
pipelines: Optional[List[str]] = None,
**kwargs,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
import numpy as np

n_atlas = len(merged_summary_df)
if n_atlas == 0:
raise FileNotFoundError(
"No outputs were found for any pipeline in the CAPS folder. "
"The output only contains BIDS information."
)
columns = merged_df.columns.values.tolist()
merged_summary_df.reset_index(inplace=True, drop=True)
for idx in merged_summary_df.index:
first_column_name = merged_summary_df.loc[idx, "first_column_name"]
last_column_name = merged_summary_df.loc[idx, "last_column_name"]
merged_summary_df.loc[idx, "first_column_index"] = columns.index(
first_column_name
)
merged_summary_df.loc[idx, "last_column_index"] = columns.index(
last_column_name
from clinica.utils.stream import cprint

from .pipeline_handling import (
dwi_dti_pipeline,
pet_volume_pipeline,
t1_freesurfer_longitudinal_pipeline,
t1_freesurfer_pipeline,
t1_volume_pipeline,
)

pipeline_options = {
"t1-volume": t1_volume_pipeline,
"pet-volume": pet_volume_pipeline,
"t1-freesurfer": t1_freesurfer_pipeline,
"t1-freesurfer-longitudinal": t1_freesurfer_longitudinal_pipeline,
"dwi-dti": dwi_dti_pipeline,
}
merged_summary_df = pd.DataFrame()
if not pipelines:
for pipeline_name, pipeline_fn in pipeline_options.items():
merged_df, summary_df = pipeline_fn(caps_dir, merged_df, **kwargs)
if summary_df is not None and not summary_df.empty:
merged_summary_df = pd.concat([merged_summary_df, summary_df])
if summary_df is None or summary_df.empty:
cprint(f"{pipeline_name} outputs were not found in the CAPS folder.")
else:
for pipeline in pipelines:
merged_df, summary_df = pipeline_options[pipeline](
caps_dir, merged_df, **kwargs
)
merged_summary_df = pd.concat([merged_summary_df, summary_df])

summary_path = path.splitext(str(out_path))[0] + "_summary.tsv"
merged_summary_df.to_csv(summary_path, sep="\t", index=False)
if len(merged_summary_df) == 0:
raise FileNotFoundError(
"No outputs were found for any pipeline in the CAPS folder. "
"The output only contains BIDS information."
)
columns = merged_df.columns.values.tolist()
merged_summary_df.reset_index(inplace=True, drop=True)
for idx in merged_summary_df.index:
first_column_name = merged_summary_df.loc[idx, "first_column_name"]
last_column_name = merged_summary_df.loc[idx, "last_column_name"]
merged_summary_df.loc[idx, "first_column_index"] = columns.index(
first_column_name
)
merged_summary_df.loc[idx, "last_column_index"] = columns.index(
last_column_name
)
tmp = merged_df.select_dtypes(include=[np.number])
merged_df.loc[:, tmp.columns] = np.round(tmp, 12)

tmp = merged_df.select_dtypes(include=[np.number])
# Round numeric values in dataframe to 12 floating point values
merged_df.loc[:, tmp.columns] = np.round(tmp, 12)
merged_df.to_csv(out_path, sep="\t", index=False)
cprint("End of CAPS information merge.", lvl="debug")
return merged_df, merged_summary_df


def find_mods_and_sess(bids_dir):
