diff --git a/clinica/iotools/utils/data_handling.py b/clinica/iotools/utils/data_handling.py
index 4a714484c4..94ef87d8c6 100644
--- a/clinica/iotools/utils/data_handling.py
+++ b/clinica/iotools/utils/data_handling.py
@@ -2,27 +2,38 @@
 from os import PathLike
 from pathlib import Path
-from typing import Iterable, List, Optional
+from typing import Iterable, List, Optional, Tuple
 
 import click
+import pandas as pd
 from nibabel.nifti1 import Nifti1Header
 from numpy import ndarray
 
 
-def _validate_output_tsv_path(out_path: Path) -> Path:
+def _validate_output_tsv_path(out_path: PathLike) -> Path:
     """Validate that provided file path is a TSV file.
 
     If folders do not exist, this function will create them.
 
     If provided path is a directory, this function will return
     a file named 'merge.tsv' within this directory.
     """
-    out_path = out_path.resolve()
+    import warnings
+
+    from clinica.utils.stream import cprint
+
+    out_path = Path(out_path).resolve()
     if out_path.is_dir():
         out_path = out_path / "merge.tsv"
     elif "." not in out_path.name:
         out_path = out_path.with_suffix(".tsv")
     elif out_path.suffix != ".tsv":
         raise TypeError("Output path extension must be tsv.")
+    if out_path.exists():
+        msg = (
+            f"Path to TSV file {out_path} already exists. The file will be overwritten."
+        )
+        warnings.warn(msg)
+        cprint(msg=msg, lvl="warning")
     out_dir = out_path.parent
     if not out_dir.exists():
         out_dir.mkdir(parents=True)
@@ -64,24 +75,49 @@ def create_merge_file(
     ignore_sessions_files : bool, optional
         If True the information related to sessions and scans is not read.
     """
-    import json
     from os import path
-    from pathlib import Path
 
-    import numpy as np
-    import pandas as pd
-
-    from clinica.utils.participant import get_subject_session_list
     from clinica.utils.stream import cprint
 
-    from .pipeline_handling import DatasetError
+    bids_dir = Path(bids_dir)
+    caps_dir = _validate_caps_dir(caps_dir)
+    out_path = _validate_output_tsv_path(out_tsv)
+    participants_df, sub_ses_df = _get_participants_and_subjects_sessions_df(
+        bids_dir, tsv_file, ignore_sessions_files
+    )
+    merged_df = _create_bids_merge_file(
+        bids_dir, sub_ses_df, participants_df, ignore_scan_files, ignore_sessions_files
+    )
+    merged_df.to_csv(out_path, sep="\t", index=False)
+    cprint("End of BIDS information merge.", lvl="debug")
+    merged_df.reset_index(drop=True, inplace=True)
+    if caps_dir is not None:
+        merged_df, merged_summary_df = _create_caps_merge_file(
+            caps_dir, merged_df, pipelines, **kwargs
+        )
+        summary_path = path.splitext(str(out_path))[0] + "_summary.tsv"
+        merged_summary_df.to_csv(summary_path, sep="\t", index=False)
+        merged_df.to_csv(out_path, sep="\t", index=False)
+        cprint("End of CAPS information merge.", lvl="debug")
+
+
+def _validate_caps_dir(caps_dir: Optional[PathLike] = None) -> Optional[Path]:
     if caps_dir is not None:
         caps_dir = Path(caps_dir)
         if not caps_dir.is_dir():
             raise IOError("The path to the CAPS directory is wrong")
+    return caps_dir
 
-    bids_dir = Path(bids_dir)
+
+def _get_participants_and_subjects_sessions_df(
+    bids_dir: Path,
+    tsv_file: Optional[PathLike] = None,
+    ignore_sessions_files: bool = False,
+) -> Tuple[pd.DataFrame, pd.DataFrame]:
+    from clinica.utils.participant import get_subject_session_list
+    from clinica.utils.stream import cprint
+
+    index_cols = ["participant_id", "session_id"]
     sessions, subjects = get_subject_session_list(
         bids_dir, ss_file=tsv_file, use_session_tsv=(not ignore_sessions_files)
    )
@@ -89,28 +125,39 @@ def create_merge_file(
         participants_df = pd.read_csv(bids_dir / "participants.tsv", sep="\t")
     else:
         participants_df = pd.DataFrame(list(set(subjects)), columns=["participant_id"])
-
     sub_ses_df = pd.DataFrame(
         [[subject, session] for subject, session in zip(subjects, sessions)],
-        columns=["participant_id", "session_id"],
+        columns=index_cols,
     )
     try:
-        sub_ses_df.set_index(
-            ["participant_id", "session_id"], inplace=True, verify_integrity=True
-        )
+        sub_ses_df.set_index(index_cols, inplace=True, verify_integrity=True)
     except ValueError:
         cprint(
             "Found duplicate subject/session pair. Keeping first occurrence.",
             lvl="warning",
         )
-        sub_ses_df = sub_ses_df.drop_duplicates(subset=["participant_id", "session_id"])
-        sub_ses_df.set_index(["participant_id", "session_id"], inplace=True)
+        sub_ses_df = sub_ses_df.drop_duplicates(subset=index_cols)
+        sub_ses_df.set_index(index_cols, inplace=True)
 
-    out_path = _validate_output_tsv_path(Path(out_tsv))
-    merged_df = pd.DataFrame(columns=participants_df.columns.values)
+    return participants_df, sub_ses_df
+
+
+def _create_bids_merge_file(
+    bids_dir: Path,
+    sub_ses_df: pd.DataFrame,
+    participants_df: pd.DataFrame,
+    ignore_scan_files: bool = False,
+    ignore_sessions_files: bool = False,
+) -> pd.DataFrame:
+    import json
+
+    import numpy as np
+
+    from clinica.utils.stream import cprint
 
-    # BIDS part
+    from .pipeline_handling import DatasetError
+
+    merged_df = pd.DataFrame(columns=participants_df.columns.values)
     for subject, subject_df in sub_ses_df.groupby(level=0):
         sub_path = bids_dir / subject
         row_participant_df = participants_df[
@@ -143,24 +190,22 @@ def create_merge_file(
             scan_path = (
                 bids_dir / subject / session / f"{subject}_{session}_scans.tsv"
             )
+            row_scans_df = pd.DataFrame()
             if scan_path.is_file() and not ignore_scan_files:
                 scans_dict = dict()
                 scans_df = pd.read_csv(scan_path, sep="\t")
                 for idx in scans_df.index.values:
                     filepath = scans_df.loc[idx, "filename"]
                     if filepath.endswith(".nii.gz"):
-                        filename = path.basename(filepath).split(".")[0]
+                        filename = Path(filepath).name.split(".")[0]
                         modality = "_".join(filename.split("_")[2::])
                         for col in scans_df.columns.values:
-                            if col == "filename":
-                                pass
-                            else:
+                            if col != "filename":
                                 value = scans_df.loc[idx, col]
                                 new_col_name = f"{modality}_{col}"
                                 scans_dict.update({new_col_name: value})
                         json_path = (
-                            bids_dir / subject / session / filepath.split(".")[0]
-                            + ".json"
+                            bids_dir / subject / session / (filepath.split(".")[0] + ".json")
                         )
                         if json_path.exists():
                             with open(json_path, "r") as f:
@@ -172,8 +217,6 @@ def create_merge_file(
                         str(key): str(value) for key, value in scans_dict.items()
                     }
                     row_scans_df = pd.DataFrame(scans_dict, index=[0])
-                else:
-                    row_scans_df = pd.DataFrame()
 
             row_df = pd.concat(
                 [row_participant_df, row_session_df, row_scans_df], axis=1
@@ -187,77 +230,72 @@ def create_merge_file(
     col_list.insert(0, col_list.pop(col_list.index("participant_id")))
     col_list.insert(1, col_list.pop(col_list.index("session_id")))
     merged_df = merged_df[col_list]
-
     tmp = merged_df.select_dtypes(include=[np.number])
-    # Round numeric values in dataframe to 6 decimal values
     merged_df.loc[:, tmp.columns] = np.round(tmp, 6)
-    merged_df.to_csv(out_path, sep="\t", index=False)
-    cprint("End of BIDS information merge.", lvl="debug")
-    merged_df.reset_index(drop=True, inplace=True)
+    return merged_df
 
-    # CAPS
-    if caps_dir is not None:
-        # Call the different pipelines
-        from .pipeline_handling import (
-            dwi_dti_pipeline,
-            pet_volume_pipeline,
-            t1_freesurfer_longitudinal_pipeline,
-            t1_freesurfer_pipeline,
-            t1_volume_pipeline,
-        )
 
-        pipeline_options = {
-            "t1-volume": t1_volume_pipeline,
"pet-volume": pet_volume_pipeline, - "t1-freesurfer": t1_freesurfer_pipeline, - "t1-freesurfer-longitudinal": t1_freesurfer_longitudinal_pipeline, - "dwi-dti": dwi_dti_pipeline, - } - merged_summary_df = pd.DataFrame() - if not pipelines: - for pipeline_name, pipeline_fn in pipeline_options.items(): - merged_df, summary_df = pipeline_fn(caps_dir, merged_df, **kwargs) - if summary_df is not None and not summary_df.empty: - merged_summary_df = pd.concat([merged_summary_df, summary_df]) - - if summary_df is None or summary_df.empty: - cprint( - f"{pipeline_name} outputs were not found in the CAPS folder." - ) - else: - for pipeline in pipelines: - merged_df, summary_df = pipeline_options[pipeline]( - caps_dir, merged_df, **kwargs - ) - merged_summary_df = pd.concat([merged_summary_df, summary_df]) +def _create_caps_merge_file( + caps_dir: Path, + merged_df: pd.DataFrame, + pipelines: Optional[List[str]] = None, + **kwargs, +) -> Tuple[pd.DataFrame, pd.DataFrame]: + import numpy as np - n_atlas = len(merged_summary_df) - if n_atlas == 0: - raise FileNotFoundError( - "No outputs were found for any pipeline in the CAPS folder. " - "The output only contains BIDS information." - ) - columns = merged_df.columns.values.tolist() - merged_summary_df.reset_index(inplace=True, drop=True) - for idx in merged_summary_df.index: - first_column_name = merged_summary_df.loc[idx, "first_column_name"] - last_column_name = merged_summary_df.loc[idx, "last_column_name"] - merged_summary_df.loc[idx, "first_column_index"] = columns.index( - first_column_name - ) - merged_summary_df.loc[idx, "last_column_index"] = columns.index( - last_column_name + from clinica.utils.stream import cprint + + from .pipeline_handling import ( + dwi_dti_pipeline, + pet_volume_pipeline, + t1_freesurfer_longitudinal_pipeline, + t1_freesurfer_pipeline, + t1_volume_pipeline, + ) + + pipeline_options = { + "t1-volume": t1_volume_pipeline, + "pet-volume": pet_volume_pipeline, + "t1-freesurfer": t1_freesurfer_pipeline, + "t1-freesurfer-longitudinal": t1_freesurfer_longitudinal_pipeline, + "dwi-dti": dwi_dti_pipeline, + } + merged_summary_df = pd.DataFrame() + if not pipelines: + for pipeline_name, pipeline_fn in pipeline_options.items(): + merged_df, summary_df = pipeline_fn(caps_dir, merged_df, **kwargs) + if summary_df is not None and not summary_df.empty: + merged_summary_df = pd.concat([merged_summary_df, summary_df]) + if summary_df is None or summary_df.empty: + cprint(f"{pipeline_name} outputs were not found in the CAPS folder.") + else: + for pipeline in pipelines: + merged_df, summary_df = pipeline_options[pipeline]( + caps_dir, merged_df, **kwargs ) + merged_summary_df = pd.concat([merged_summary_df, summary_df]) - summary_path = path.splitext(str(out_path))[0] + "_summary.tsv" - merged_summary_df.to_csv(summary_path, sep="\t", index=False) + if len(merged_summary_df) == 0: + raise FileNotFoundError( + "No outputs were found for any pipeline in the CAPS folder. " + "The output only contains BIDS information." 
+        )
+    columns = merged_df.columns.values.tolist()
+    merged_summary_df.reset_index(inplace=True, drop=True)
+    for idx in merged_summary_df.index:
+        first_column_name = merged_summary_df.loc[idx, "first_column_name"]
+        last_column_name = merged_summary_df.loc[idx, "last_column_name"]
+        merged_summary_df.loc[idx, "first_column_index"] = columns.index(
+            first_column_name
+        )
+        merged_summary_df.loc[idx, "last_column_index"] = columns.index(
+            last_column_name
+        )
+    tmp = merged_df.select_dtypes(include=[np.number])
+    merged_df.loc[:, tmp.columns] = np.round(tmp, 12)
 
-        tmp = merged_df.select_dtypes(include=[np.number])
-        # Round numeric values in dataframe to 12 floating point values
-        merged_df.loc[:, tmp.columns] = np.round(tmp, 12)
-        merged_df.to_csv(out_path, sep="\t", index=False)
-        cprint("End of CAPS information merge.", lvl="debug")
+    return merged_df, merged_summary_df
 
 
 def find_mods_and_sess(bids_dir):
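
As a quick way to exercise the refactored entry point, here is a minimal usage sketch in Python. The ./bids, ./caps, and ./merge.tsv paths are illustrative placeholders, and only the parameter names visible in the hunks above are assumed; the full create_merge_file signature is not shown in this diff.

    from clinica.iotools.utils.data_handling import create_merge_file

    # BIDS-only merge: _validate_output_tsv_path resolves a directory to
    # <dir>/merge.tsv and now warns before overwriting an existing file.
    create_merge_file(bids_dir="./bids", out_tsv="./merge.tsv")

    # BIDS + CAPS merge restricted to a single pipeline; a companion
    # merge_summary.tsv is written next to the main TSV.
    create_merge_file(
        bids_dir="./bids",
        out_tsv="./merge.tsv",
        caps_dir="./caps",
        pipelines=["t1-volume"],
    )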