# Imports

In [1]:
import os
import os.path as op
import datetime
from glob import glob
import re
import shutil
import sys

import numpy as np
import pandas as pd

from general.basic import helper_funcs as hf

In [2]:
# Set globals
overwrite = True
process_unused_mris = False

# Find this file
try:
    print(f"- Running {__file__}")
except NameError:
    __file__ = "/mnt/coredata/processing/leads/code/working/scratchpad.ipynb"
    print(f"- Could not find this script file, setting __file__ = {__file__}")

# Set global paths and check that they exist
ROOT = "/mnt/coredata"
PATHS = {}
if not __file__.startswith(ROOT):
    raise ValueError(
        f"Could not parse the project path because {__file__} is not in {ROOT}"
    )
PATHS["proj"] = "/".join(__file__.split("/")[:5])
PATHS["code"] = op.join(PATHS["proj"], "code")
PATHS["config"] = op.join(PATHS["code"], "config")
PATHS["metadata"] = op.join(PATHS["proj"], "metadata")
PATHS["scans_to_process"] = op.join(PATHS["metadata"], "scans_to_process")
PATHS["ssheets"] = op.join(PATHS["metadata"], "ssheets")
PATHS["data"] = op.join(PATHS["proj"], "data")
PATHS["newdata"] = op.join(PATHS["data"], "newdata")
PATHS["raw"] = op.join(PATHS["data"], "raw")
PATHS["processed"] = op.join(PATHS["data"], "processed")
for k, v in PATHS.items():
    if not op.isdir(v):
        raise ValueError(f"Expected {v}, but this directory does not exist")

PATHS

- Could not find this script file, setting __file__ = /mnt/coredata/processing/leads/code/working/scratchpad.ipynb


{'proj': '/mnt/coredata/processing/leads',
 'code': '/mnt/coredata/processing/leads/code',
 'config': '/mnt/coredata/processing/leads/code/config',
 'metadata': '/mnt/coredata/processing/leads/metadata',
 'scans_to_process': '/mnt/coredata/processing/leads/metadata/scans_to_process',
 'ssheets': '/mnt/coredata/processing/leads/metadata/ssheets',
 'data': '/mnt/coredata/processing/leads/data',
 'newdata': '/mnt/coredata/processing/leads/data/newdata',
 'raw': '/mnt/coredata/processing/leads/data/raw',
 'processed': '/mnt/coredata/processing/leads/data/processed'}

# Initial defs

In [8]:
def scrape_raw(raw_dir, verbose=True):
    """Scrape raw directory for all nifti files and parse them.

    Returns a pandas DataFrame with columns:
    - subj: subject ID
    - scan_date: scan date (YYYY-MM-DD)
    - scan_type: scan type (MRI modality or PET tracer)
    - raw_petf: full path to the nifti file in raw
    """
    raw_scans = glob(op.join(raw_dir, "**", "*.nii"), recursive=True)

    output = []
    for scanf in raw_scans:
        subj = _get_subj(scanf, raw_dir)
        scan_date = _get_scan_date(scanf)
        scan_type = _get_scan_type(scanf)
        output.append([subj, scan_date, scan_type, scanf])

    cols = ["subj", "scan_date", "scan_type", "raw_petf"]
    output = pd.DataFrame(output, columns=cols)

    # Add FDG to scan type for LONI files that don't save "FDG" in filename
    output.loc[output["scan_type"].isnull(), "scan_type"] = output.loc[
        output["scan_type"].isnull(), "raw_petf"
    ].apply(lambda x: "FDG" if (op.join(raw_dir, "fdg") in x) else np.nan)

    if verbose:
        if len(output) == 0:
            print("No scans found in raw")
        else:
            print(
                f"Found {len(output)} scans from {output['subj'].nunique()} subjects in {raw_dir}"
            )
    return output


def _get_subj(filepath, raw_dir):
    """Return the subject ID from filepath to the recon'd nifti.

    Parameters
    ----------
    filepath : str
        The filepath to the reconstructed nifti.

    Returns
    -------
    subj : str
        The subject ID parsed from the input file basename.
    """
    try:
        subj = filepath.replace(raw_dir + "/", "").split("/")[1]
        if len(subj) > 0:
            return subj
        else:
            return np.nan
    except IndexError:
        return np.nan


def _get_scan_date(filepath):
    """Return the scan date from filepath to the recon'd nifti.

    Iterates over filepath directories from right to left until it finds
    a filename or directory whose first 10 characters matches the date
    format YYYY-MM-DD.

    Returns np.nan if no scan date is found, otherwise a string like
    'YYYY-MM-DD'.
    """
    for d in filepath.split(op.sep)[::-1]:
        try:
            acqdate = check_dt_fmt(d[:10], raise_error=True)
            return acqdate
        except ValueError:
            pass
    return np.nan


def check_dt_fmt(datestr, raise_error=False):
    """Return datestr if formatted like YYYY-MM-DD.

    If raise_error is True, raise a ValueError if datestr is not
    formatted like YYYY-MM-DD. Otherwise return np.nan.
    """
    try:
        datestr_to_datetime(datestr)
        return datestr
    except ValueError:
        if raise_error:
            raise ValueError(f"{datestr} is not formatted like YYYY-MM-DD")
        else:
            return np.nan


def datestr_to_datetime(datestr):
    """Convert a date string to a datetime object."""
    return datetime.datetime.strptime(datestr, "%Y-%m-%d")


def _get_scan_type(filepath, scan_types=None):
    """Parse the filepath and return the scan type."""
    if scan_types is None:
        scan_types = load_scan_typesf(scan_typesf)
    basename = op.basename(filepath).lower()
    for k, v in scan_types.items():
        if k in basename:
            return v
    return np.nan


def date_diff(date1, date2, abs=False):
    """Return date2 - date1 in days."""
    try:
        diff = (date2 - date1).days
        if abs:
            return np.abs(diff)
        else:
            return diff
    except TypeError:
        return np.nan


def find_closest_mri(
    subj, scan_date, freesurfer_dir, limit_days=365, strict_limit=False
):
    """Return closest MRI date, days from PET, and Freesurfer path.

    Parameters
    ----------
    subj : str
        The subject ID.
    scan_date : str
        Scan date (YYYY-MM-DD) to match the closest MRI scan to.
    freesurfer_dir : str
        Path to the top-level freesurfer directory containing individual
        processed MRI directories like <subj>_<scan_date>.
    limit_days : int
        A warning is raised if no MRI scan is found within limit days
        and np.nan is returned.
    strict_limit : bool
        If True, np.nan is returned if no MRI scan is found within limit days.
    """
    proc_mris = glob(op.join(freesurfer_dir, f"{subj}_*"))
    if len(proc_mris) == 0:
        print(
            f"WARNING: {subj} scan on {scan_date} has no processed MRI scans in {freesurfer_dir}"
        )
        return np.nan, np.nan, np.nan

    proc_mri_dates = [check_dt_fmt(op.basename(p).split("_")[1]) for p in proc_mris]

    days_to_scan = []
    for d in proc_mri_dates:
        days_to_scan.append(
            date_diff(datestr_to_datetime(d), datestr_to_datetime(scan_date), abs=True)
        )
    closest_mri = proc_mris[np.argmin(days_to_scan)]
    closest_mri_date = proc_mri_dates[np.argmin(days_to_scan)]
    min_days = min(days_to_scan)

    if min_days > limit_days:
        print(
            f"WARNING: {subj} scan on {scan_date} has no matching MRI within {limit_days} days"
        )
        if strict_limit:
            return np.nan, np.nan, np.nan

    return closest_mri_date, min_days, closest_mri


def get_upstream_path(start_path, target_dir_name="code"):
    """Traverse upwards from start_path to find a directory with the name target_dir_name."""
    current_path = start_path
    while op.dirname(current_path) != current_path:  # Check if we've reached the root
        current_path = op.dirname(current_path)
        if op.basename(current_path) == target_dir_name:
            return current_path
    return None

# Move newdata to raw

In [13]:
# Ensure the base directory paths are absolute and normalized
newdata_dir = op.abspath(PATHS["newdata"])
raw_dir = op.abspath(PATHS["raw"])
do_cleanup = True
overwrite = False
verbose = True


def do_cleanup():
    print("Function removes everything in newdata here")


# Find all niftis in newdata
check_exts = (".nii", ".nii.gz", ".dcm")
glob_files = []
for ext in check_exts:
    glob_files.extend(glob(op.join(newdata_dir, f"**/*{ext}"), recursive=True))

# Print the welcome message
if verbose:
    print("- Moving MRI and PET scan data from newdata to raw")
    print(f" (newdata = {newdata_dir})")
    print(f" (raw     = {raw_dir})")

# If no niftis are found, print a message and return
if len(glob_files) == 0:
    if verbose:
        print(f"  * No niftis found in {newdata_dir}")
    if cleanup:
        do_cleanup()
    if verbose:
        print("")
    raise AssertionError("Function returns here")

# Find all unique nifti-containing directories in newdata
source_dirs = set([op.dirname(f) for f in glob_files])
if verbose:
    print(f"  * Found {len(source_dirs)} directories with niftis")

- Moving MRI and PET scan data from newdata to raw
 (newdata = /mnt/coredata/processing/leads/data/newdata)
 (raw     = /mnt/coredata/processing/leads/data/raw)
  * Found 1241 directories with niftis


In [28]:
skip_dirs = ["LEADS", "ADNI", "IDEAS", "SCAN", "PAD"]
ii = 0
for source_dir in source_dirs:
    # Create a matching file hierarchy in raw as in newdata
    scan_path = op.relpath(source_dir, newdata_dir)
    scan_path_parts = scan_path.split("/")
    if scan_path_parts[0].upper() in skip_dirs:
        scan_path = op.join(*scan_path_parts[1:])
    target_dir = op.join(raw_dir, scan_path)
    if ii > 9:
        break
    else:
        print(f"source: {source_dir}")
        print(f"target: {target_dir}")
        print("")
        ii += 1

# Load raw_scans

In [4]:
# Load the most recent raw_scans df
raw_scans = pd.read_csv(
    glob_sort_mtime(op.join(PATHS["metadata"], "log", f"raw_pet_scans_*.csv"))[0]
)
raw_scans = raw_scans.dropna().reset_index(drop=True)

print(f"raw_scans: {raw_scans.shape}")

raw_scans: (2067, 12)


# Match raw PET to closest MRI

In [8]:
PATHS = {}
SCAN_TYPES = None
TIMESTAMP = None


def set_globals():
    """Set module-level global variables"""
    global PATHS, SCAN_TYPES, TIMESTAMP

    # The project directory is assumed to be the 4th directory up from /
    # (e.g., /mnt/coredata/processing/leads)
    PATHS["proj"] = "/mnt/coredata/processing/leads"

    # Set global paths and check that they exist
    PATHS["code"] = op.join(PATHS["proj"], "code")
    PATHS["config"] = op.join(PATHS["code"], "config")
    PATHS["metadata"] = op.join(PATHS["proj"], "metadata")
    PATHS["scans_to_process"] = op.join(PATHS["metadata"], "scans_to_process")
    PATHS["ssheets"] = op.join(PATHS["metadata"], "ssheets")
    PATHS["data"] = op.join(PATHS["proj"], "data")
    PATHS["newdata"] = op.join(PATHS["data"], "newdata")
    PATHS["raw"] = op.join(PATHS["data"], "raw")
    PATHS["processed"] = op.join(PATHS["data"], "processed")
    for k in PATHS:
        if not op.isdir(PATHS[k]):
            raise ValueError(f"Expected {PATHS[k]}, but this directory does not exist")

    # Define the SCAN_TYPES dict
    SCAN_TYPES = load_scan_typesf()

    # Set the timestamp
    TIMESTAMP = now()


def load_scan_typesf(scan_typesf=None):
    """Load scan types CSV and return a {name_in: name_out} dict."""
    if scan_typesf is None:
        scan_typesf = op.join(PATHS["config"], "scan_types_and_tracers.csv")
    scan_types = pd.read_csv(scan_typesf)
    scan_types["name_in"] = scan_types["name_in"].str.lower()
    scan_types = scan_types.drop_duplicates("name_in").dropna()
    scan_types = scan_types.set_index("name_in")["name_out"].to_dict()
    return scan_types


def now():
    """Return the current date and time down to seconds."""
    return datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")


set_globals()


def main(overwrite, process_unused_mris):
    """Save CSV files for MRI and PET scans in the raw directory.

    Indicate which scans need to be processed.
    """
    # Print the welcome message
    print("- Selecting scans to process")

    # Get a list of all directories containing .nii files
    print(f"  * Searching {PATHS['raw']} for all *.nii files")
    raw_niis = fast_recursive_glob_nii(PATHS["raw"])

    # Find the subject ID, scan type, acquisition date, and LONI image ID
    # for each nifti file in raw
    raw_niis = pd.DataFrame(raw_niis, columns=["raw_niif"])
    raw_niis.insert(0, "subj", raw_niis["raw_niif"].apply(get_subj))
    raw_niis.insert(1, "scan_type", raw_niis["raw_niif"].apply(get_scan_type))
    raw_niis.insert(2, "scan_date", raw_niis["raw_niif"].apply(get_scan_date))
    raw_niis.insert(3, "image_id", raw_niis["raw_niif"].apply(get_image_id))

    # Convert the date column to datetime
    raw_niis["scan_date"] = pd.to_datetime(raw_niis["scan_date"])

    # Separate MRI and PET scans into separate dataframes
    raw_mris = raw_niis.query("(scan_type=='MRI-T1')").reset_index(drop=True)
    raw_pets = raw_niis.query("(scan_type!='MRI-T1')").reset_index(drop=True)

    # Rename columns
    raw_mris = raw_mris.rename(
        columns={
            "scan_date": "mri_date",
            "image_id": "mri_image_id",
            "raw_niif": "mri_raw_niif",
        }
    )
    raw_pets = raw_pets.rename(
        columns={
            "scan_type": "tracer",
            "scan_date": "pet_date",
            "image_id": "pet_image_id",
            "raw_niif": "pet_raw_niif",
        }
    )

    # Get rid of scan_type column in the MRI dataframe (all scans are T1)
    raw_mris = raw_mris.drop(columns=["scan_type"])

    # Sort MRIs by subject and scan date
    raw_mris = raw_mris.sort_values(["subj", "mri_date"]).reset_index(drop=True)

    # Sort PET scans by subject, tracer, and scan date
    raw_pets = raw_pets.sort_values(["subj", "tracer", "pet_date"]).reset_index(
        drop=True
    )

    # Get PET resolution from filename
    raw_pets.insert(
        raw_pets.columns.tolist().index("pet_date") + 1,
        "pet_res",
        raw_pets["pet_raw_niif"].apply(get_pet_resolution),
    )

    # Report how many MRI and PET scans are in the raw directory
    print(
        "  * Found {:,} niftis in {:,} subdirectories".format(
            len(raw_niis), len([op.dirname(f) for f in raw_niis["raw_niif"]])
        )
    )
    print(f"    - {len(raw_mris):,} T1 MRIs")
    print(f"    - {len(raw_pets):,} PET scans")
    for scan_type in raw_pets["tracer"].unique():
        print(
            "      * {} {}".format(
                len(raw_pets.loc[raw_pets["tracer"] == scan_type]), scan_type
            )
        )

    # Calculate the number of MRIs for each subject and the time between
    # them
    raw_mris = add_mri_date_columns(raw_mris)

    # Calculate the number of PET scans for each subject and tracer, and
    # the time between them
    raw_pets = add_pet_date_columns(raw_pets)

    # Match each PET scan to its closest MRI
    raw_pets = find_closest_mri_to_pet(raw_pets, raw_mris)
    print("  * Matching PET scans to their closest T1 MRIs")

    # Figure out which MRIs are actually used for PET processing
    raw_mris["mri_used_for_pet_proc"] = raw_mris["mri_image_id"].apply(
        lambda x: 1 if np.isin(x, raw_pets["mri_image_id"]) else 0
    )
    idx = raw_mris.loc[raw_mris["mri_used_for_pet_proc"] == 0].index.tolist()
    if process_unused_mris:
        msg = "      to any PET scan but will be processed in any case"
    else:
        msg = (
            "      to any PET scan and will not be added to the list of MRIs to process"
        )
    print(
        "  * Auditing MRI scans",
        "    - {:,}/{:,} MRIs in {} are not the closest MRI".format(
            len(idx),
            len(raw_mris),
            PATHS["raw"],
        ),
        msg,
        sep="\n",
    )

    # Flag PET scans with issues that preclude processing
    raw_pets = audit_pet(raw_pets)
    print("  * Auditing PET scans")
    print(
        "    - Flagged {:,} scans with issues to be resolved before processing".format(
            len(raw_pets.loc[raw_pets["flag"] == 1])
        )
    )

    # Add path to the processed MRI directory
    ii = raw_mris.columns.tolist().index("mri_raw_niif")
    raw_mris["mri_proc_dir"] = raw_mris.apply(
        lambda x: get_mri_proc_dir(x["subj"], x["mri_date"]), axis=1
    )

    # Add path to the processed PET directory
    ii = raw_pets.columns.tolist().index("pet_raw_niif")
    raw_pets.insert(
        ii + 1,
        "pet_proc_dir",
        raw_pets.apply(
            lambda x: get_pet_proc_dir(x["subj"], x["tracer"], x["pet_date"]), axis=1
        ),
    )

    # Determine which MRIs have been processed
    raw_mris["freesurfer_run"] = raw_mris["mri_proc_dir"].apply(check_if_freesurfer_run)
    print(
        "  * {:,}/{:,} MRIs have been processed through Freesurfer".format(
            len(raw_mris.loc[raw_mris["freesurfer_run"] == 1]),
            len(raw_mris),
        )
    )

    raw_mris["mri_processed"] = raw_mris["mri_proc_dir"].apply(check_if_mri_processed)
    print(
        "  * {:,}/{:,} MRIs have been fully processed".format(
            len(raw_mris.loc[raw_mris["mri_processed"] == 1]),
            len(raw_mris),
        )
    )

    # Determine which PET scans have been processed
    raw_pets.insert(
        ii + 1, "pet_processed", raw_pets["pet_proc_dir"].apply(check_if_pet_processed)
    )
    print(
        "  * {:,}/{:,} PET scans have already been processed".format(
            len(raw_pets.loc[raw_pets["pet_processed"] == 1]),
            len(raw_pets),
        )
    )

    # Determine which MRI scans need to be processed
    raw_mris["need_to_process"] = raw_mris.apply(
        lambda x: get_mri_to_process(
            x["mri_used_for_pet_proc"],
            x["mri_processed"],
            overwrite,
            process_unused_mris,
        ),
        axis=1,
    )
    print(
        "  * {:,}/{:,} MRIs are scheduled for processing".format(
            len(raw_mris.loc[raw_mris["need_to_process"] == 1]),
            len(raw_mris),
        )
    )

    # Determine which PET scans need to be processed
    raw_pets["need_to_process"] = raw_pets.apply(
        lambda x: get_pet_to_process(x["flag"], x["pet_processed"], overwrite), axis=1
    )
    print(
        "  * {:,}/{:,} PET scans are scheduled for processing".format(
            len(raw_pets.loc[raw_pets["need_to_process"] == 1]),
            len(raw_pets),
        )
    )

    # Convert date columns to string
    fmt = "%Y-%m-%d"
    raw_mris["mri_date"] = raw_mris["mri_date"].dt.strftime(fmt)
    for col in ["pet_date", "mri_date"]:
        raw_pets[col] = raw_pets[col].dt.strftime(fmt)

    # Save the raw MRI scans dataframe to a CSV file
    save_mri_scan_index(raw_mris)

    # Save the raw PET scans dataframe to a CSV file
    save_pet_scan_index(raw_pets)


def fast_recursive_glob_nii(path):
    """Return a list of all files in path that end in .nii"""

    def _path_recurse(path):
        """Recursively traverse a directory and its subdirectories"""
        with os.scandir(path) as entries:
            for entry in entries:
                if entry.is_dir():
                    _path_recurse(entry.path)
                elif entry.is_file() and entry.name.endswith(".nii"):
                    nii_files.append(entry.path)

    nii_files = []
    _path_recurse(path)
    return nii_files


def glob_sort_mtime(pattern):
    """Return files matching pattern in most recent modified order.

    Returns
    -------
    files : list of str
        List of files matching pattern, sorted by most recent modified
        (files[0] is the most recently modified).
    """
    files = sorted(glob(pattern), key=op.getmtime, reverse=True)
    return files


def get_subj(filepath, raw_dir=None):
    """Return the subject ID from filepath to the recon'd nifti.

    Parameters
    ----------
    filepath : str
        The filepath to the reconstructed nifti.

    Returns
    -------
    subj : str
        The subject ID parsed from the input file basename.
    """
    if raw_dir is None:
        raw_dir = PATHS["raw"]
    try:
        subj = filepath.replace(raw_dir + "/", "").split("/")[0]
        if len(subj) > 0:
            return subj
        else:
            return np.nan
    except IndexError:
        return np.nan


def get_scan_type(filepath):
    """Search filepath and return the scan type"""

    def find_matches(text, scan_types, scan_type_keys):
        """Helper function to find matches"""
        matches = [key for key in scan_type_keys if key in text]
        unique_values = list(set([scan_types[key] for key in matches]))
        return unique_values

    # Load the SCAN_TYPES dict and get lowercase keys
    scan_type_keys = list(SCAN_TYPES)

    # Convert filepath to lowercase and get the basename
    filepath = filepath.lower()
    basename = op.basename(filepath)

    # First attempt to match in the basename
    match_values = find_matches(basename, SCAN_TYPES, scan_type_keys)
    if len(match_values) == 1:
        return match_values[0]
    elif len(match_values) > 1:
        return "FILENAME MATCHED MULTIPLE PET TRACERS OR MRI MODALITIES"

    # Check for specific substring in basename
    if "coreg,_avg,_std_img_and_vox_siz,_uniform" in basename:
        return "FDG"

    # Attempt to match in the directory path after "raw"
    dirname = op.dirname(filepath)
    raw_base = "/{}/".format(op.basename(PATHS["raw"]))
    raw_idx = dirname.find(raw_base)
    if raw_idx == -1:
        return "FAILED TO IDENTIFY PET TRACER OR MRI MODALITY FROM FILENAME"

    # Only consider part of the path after "raw"
    search_path = dirname[raw_idx + len(raw_base) :]
    match_values = find_matches(search_path, SCAN_TYPES, scan_type_keys)
    if len(match_values) == 1:
        return match_values[0]
    elif len(match_values) > 1:
        return "FILENAME MATCHED MULTIPLE PET TRACERS OR MRI MODALITIES"
    else:
        if "coreg,_avg,_std_img_and_vox_siz,_uniform" in search_path:
            return "FDG"
        else:
            return "FAILED TO IDENTIFY PET TRACER OR MRI MODALITY FROM FILENAME"


def get_scan_date(filepath):
    """Search filepath and return the scan date"""

    def test_datestr(str_to_search, date_start="2000-01-01", date_stop=None):
        """Return date if start of string is in YYYYMMDD format

        The date must be >= date_start and <= today's date.
        """
        # Remove hypens and underscores
        str_to_search = str_to_search.replace("-", "").replace("_", "")

        # Check if the first 8 characters are numeric
        if str_to_search is None or not re.match(r"^\d{8}", str_to_search):
            return False

        # Extract the date part of the string
        datestr = "{}-{}-{}".format(
            str_to_search[:4], str_to_search[4:6], str_to_search[6:8]
        )

        # Check if the date is valid and within the specified range
        date_fmt = "%Y-%m-%d"
        try:
            date_test = datetime.datetime.strptime(datestr, date_fmt).date()
            date_start = datetime.datetime.strptime(date_start, date_fmt).date()
            if date_stop is None:
                date_stop = datetime.date.today()
            else:
                date_stop = datetime.datetime.strptime(date_stop, date_fmt).date()
            if date_start <= date_test <= date_stop:
                return datestr
            else:
                return False
        except ValueError:
            return False

    basename = op.basename(filepath)
    parts = basename.split("_")
    for part in parts:
        datestr = test_datestr(part)
        if datestr:
            return datestr

    # If no date was found in the basename, try the dirname but limit
    # search to everything forward from the raw/ directory
    dirname = op.dirname(filepath)
    raw_base = "/{}/".format(op.basename(PATHS["raw"]))
    raw_idx = dirname.find(raw_base)
    if raw_idx == -1:
        return np.nan
    search_path = dirname[raw_idx + len(raw_base) :]
    parts = search_path.split("/")
    for part in parts:
        datestr = test_datestr(part)
        if datestr:
            return datestr

    return np.nan


def datetime_to_datestr(dt):
    """Convert a datetime object to a date string."""
    try:
        return dt.strftime("%Y-%m-%d")
    except ValueError:
        return np.nan


def get_image_id(filepath):
    """Search basename and return the LONI Image ID"""

    def test_image_id(str_to_search, pattern=r"^I[0-9]+$"):
        """Return str_to_search if it starts with 'I' and continues with numbers"""
        if str_to_search is None:
            return False
        if bool(re.match(pattern, str_to_search)):
            return str_to_search
        else:
            return False

    basename = op.basename(filepath)
    parts = basename.split("_")
    for part in parts:
        image_id = test_image_id(part)
        if image_id:
            return image_id
    return np.nan


def get_pet_resolution(filepath):
    """Search basename and return the PET resolution"""

    def test_resolution(str_to_search, pattern=r"uniform_([0-9])mm_res"):
        """Return scan resolution if the expected pattern is found"""
        if str_to_search is None:
            return False
        match = re.search(pattern, str_to_search)
        if match:
            return int(match.group(1))
        return False

    res = test_resolution(op.basename(filepath).lower())
    if res:
        return res
    else:
        return np.nan


def add_mri_date_columns(mri_scans):
    """Add info on time between PET and MRI and adjacent PET scans"""
    # Copy the input dataframe
    mri_scans_cp = mri_scans.copy()

    # Add columns for PET scan number and total number of scans per tracer
    ii = mri_scans_cp.columns.tolist().index("mri_date")
    grp = mri_scans_cp.groupby("subj")
    mri_scans_cp.insert(ii + 1, "mri_scan_number", grp.cumcount() + 1)
    mri_scans_cp.insert(
        ii + 2, "n_mri_scans", grp["mri_scan_number"].transform("count")
    )

    # Add columns for days from each PET scan to baseline and days between
    # consecutive PET scans per tracer
    baseline_mri_dates = grp["mri_date"].min()
    mri_scans_cp.insert(
        ii + 3,
        "days_from_baseline_mri",
        mri_scans_cp.apply(
            lambda x: (x["mri_date"] - baseline_mri_dates[x["subj"]]).days,
            axis=1,
        ),
    )
    mri_scans_cp.insert(
        ii + 4, "days_from_last_mri", grp["mri_date"].diff().dt.days.fillna(0)
    )

    return mri_scans_cp


def add_pet_date_columns(pet_scans):
    """Add info on time between PET and MRI and adjacent PET scans"""
    # Copy the input dataframe
    pet_scans_cp = pet_scans.copy()

    # Add columns for PET scan number and total number of scans per tracer
    ii = pet_scans_cp.columns.tolist().index("pet_date")
    grp = pet_scans_cp.groupby(["subj", "tracer"])
    pet_scans_cp = pet_scans_cp.sort_values(["subj", "tracer", "pet_date"])
    pet_scans_cp.insert(ii + 1, "pet_scan_number", grp.cumcount() + 1)
    pet_scans_cp.insert(
        ii + 2, "n_pet_scans", grp["pet_scan_number"].transform("count")
    )

    # Add columns for days from each PET scan to baseline and days between
    # consecutive PET scans per tracer
    baseline_pet_dates = grp["pet_date"].min()
    pet_scans_cp.insert(
        ii + 3,
        "days_from_baseline_pet",
        pet_scans_cp.apply(
            lambda x: (
                x["pet_date"] - baseline_pet_dates[(x["subj"], x["tracer"])]
            ).days,
            axis=1,
        ),
    )
    pet_scans_cp.insert(
        ii + 4, "days_from_last_pet", grp["pet_date"].diff().dt.days.fillna(0)
    )

    return pet_scans_cp


def find_closest_mri_to_pet(pet_scans, mri_scans):
    """Return a merged dataframe with the closest MRI to each PET scan"""
    # Check that the necessary columns are present
    assert np.all(
        np.isin(["subj", "tracer", "pet_date", "pet_image_id"], pet_scans.columns)
    )
    assert np.all(
        np.isin(
            ["subj", "mri_date", "mri_scan_number", "n_mri_scans"], mri_scans.columns
        )
    )

    # Copy the input dataframes
    pet_scans_cp = pet_scans.copy()
    mri_scans_cp = mri_scans.copy()

    # Merge the dataframes
    merged = pet_scans_cp.merge(mri_scans_cp, on="subj", how="left")

    # Compute the date difference between each PET and MRI scan
    ii = merged.columns.tolist().index("pet_date")
    merged.insert(
        ii + 1,
        "pet_to_mri_days",
        (merged["pet_date"] - merged["mri_date"]).abs().dt.days,
    )

    # Sort by |pet_date - mri_date| and drop duplicates
    merged_min = merged.sort_values("pet_to_mri_days").drop_duplicates("pet_image_id")

    # Resort the dataframe and reset index before returning
    merged_min = merged_min.sort_values(["subj", "tracer", "pet_date"]).reset_index(
        drop=True
    )

    return merged_min


def audit_pet(pet_scans):
    """Audit each PET scan and flag scans with potential issues"""
    pet_scans_cp = pet_scans.copy()
    pet_scans_cp["flag"] = 0
    pet_scans_cp["notes"] = ""

    # Missing tracer
    idx = pet_scans_cp.loc[
        pet_scans_cp["tracer"].str.startswith("FAILED TO IDENTIFY")
    ].index.tolist()
    pet_scans_cp.loc[idx, "flag"] = 1
    pet_scans_cp.loc[
        idx, "notes"
    ] += "Unknown tracer, check raw PET filename against scan_types_and_tracers.csv\n"

    # Multiple tracers
    idx = pet_scans_cp.loc[
        pet_scans_cp["tracer"].str.startswith("FILENAME MATCHED MULTIPLE")
    ].index.tolist()
    pet_scans_cp.loc[idx, "flag"] = 1
    pet_scans_cp.loc[
        idx, "notes"
    ] += (
        "Matched >1 tracer, check raw PET filename against scan_types_and_tracers.csv\n"
    )

    # Missing PET date
    idx = pet_scans_cp.loc[pd.isna(pet_scans_cp["pet_date"])].index.tolist()
    pet_scans_cp.loc[idx, "flag"] = 1
    pet_scans_cp.loc[idx, "notes"] += "Missing PET date\n"

    # PET resolution unclear or not at 6mm
    idx = pet_scans_cp.loc[pet_scans_cp["pet_res"] != 6].index.tolist()
    pet_scans_cp.loc[idx, "flag"] = 1
    pet_scans_cp.loc[idx, "notes"] += "PET resolution is unclear or not at 6mm\n"

    # Missing MRI
    idx = pet_scans_cp.loc[pd.isna(pet_scans_cp).any(axis=1)].index.tolist()
    pet_scans_cp.loc[idx, "flag"] = 1
    pet_scans_cp.loc[idx, "notes"] += f"No available MRI in {PATHS['raw']}\n"

    # PET and MRI scan dates > 365 days apart
    idx = pet_scans_cp.query("(pet_to_mri_days>365)").index.tolist()
    pet_scans_cp.loc[idx, "flag"] = 1
    pet_scans_cp.loc[idx, "notes"] += "Closest MRI is more than 1 year from PET date\n"

    # Same MRI used to process multiple PET scans for the same tracer
    idx = pet_scans_cp.loc[
        pet_scans_cp.groupby(["subj", "tracer"])["mri_image_id"].transform(
            lambda x: (
                True if (np.max(np.unique(x, return_counts=True)[1]) > 1) else False
            )
        )
    ].index.tolist()
    pet_scans_cp.loc[idx, "flag"] = 1
    pet_scans_cp.loc[
        idx, "notes"
    ] += "Same MRI would be used to process multiple timepoints for the same PET tracer\n"

    return pet_scans_cp


def get_mri_proc_dir(subj, mri_date):
    """Return the processed MRI directory for each MRI scan"""
    return op.join(
        PATHS["processed"], subj, "MRI-T1_{}".format(datetime_to_datestr(mri_date))
    )


def get_pet_proc_dir(subj, tracer, pet_date):
    """Return the processed PET directory for each PET scan"""
    return op.join(
        PATHS["processed"],
        subj,
        "{}_{}".format(tracer, datetime_to_datestr(pet_date)),
    )


def check_if_freesurfer_run(mri_proc_dir):
    """Return True if the MRI scan has been processed by Freesurfer"""
    freesurfer_dir = op.join(mri_proc_dir, "freesurfer")
    if not op.isdir(freesurfer_dir):
        return 0
    # Search for the Freesurfer output directory
    nuf = op.join(freesurfer_dir, "mri", "nu.mgz")
    aparcf = op.join(freesurfer_dir, "mri", "aparc+aseg.mgz")
    bstemf = op.join(freesurfer_dir, "mri", "brainstemSsLabels.v12.FSvoxelSpace.mgz")
    if op.isfile(nuf) and op.isfile(aparcf) and op.isfile(bstemf):
        return 1
    else:
        return 0


def check_if_mri_processed(mri_proc_dir):
    """Return True if the PET scan has been processed"""
    if not op.isdir(mri_proc_dir):
        return 0
    # Search for the warped and affine transformed nu.nii,
    # respectively, as these are among the last files created in the MRI
    # processing pipeline. Also make sure we find at least one mask.
    mfiles = glob(op.join(mri_proc_dir, "*mask-*.nii"))
    wfiles = glob(op.join(mri_proc_dir, "w*_nu.nii"))
    afiles = glob(op.join(mri_proc_dir, "a*_nu.nii"))
    if mfiles and wfiles and afiles:
        return 1
    else:
        return 0


def check_if_pet_processed(pet_proc_dir):
    """Return True if the PET scan has been processed"""
    if not op.isdir(pet_proc_dir):
        return 0
    # Search for the warped and affine transformed PET SUVRs,
    # respectively, as these are among the last files created in the PET
    # processing pipeline
    wfiles = glob(op.join(pet_proc_dir, "w*suvr*.nii"))
    afiles = glob(op.join(pet_proc_dir, "a*suvr*.nii"))
    if wfiles and afiles:
        return 1
    else:
        return 0


def get_mri_to_process(used_for_pet, processed, overwrite, process_unused_mris):
    """Return 1 if the MRI scan should be processed, otherwise 0"""
    if process_unused_mris or used_for_pet:
        if overwrite or not processed:
            return 1
    return 0


def get_pet_to_process(flagged, processed, overwrite):
    """Return 1 if the PET scan should be processed, otherwise 0"""
    if not flagged:
        if overwrite or not processed:
            return 1
    return 0


def save_mri_scan_index(mri_scans):
    """Save the MRI scan index to a CSV file"""
    # Move any existing files to archive
    archive_dir = op.join(PATHS["scans_to_process"], "archive")
    files = glob(op.join(PATHS["scans_to_process"], "Raw_MRI_Scan_Index_*.csv"))
    if files:
        if not op.isdir(archive_dir):
            os.makedirs(archive_dir)
        for f in files:
            os.rename(f, op.join(archive_dir, op.basename(f)))

    # Save the mri_scans dataframe
    outf = op.join(PATHS["scans_to_process"], f"Raw_MRI_Scan_Index_{TIMESTAMP}.csv")
    mri_scans.to_csv(outf, index=False)
    print(f"  * Saved raw MRI scan index to {outf}")


def save_pet_scan_index(pet_scans):
    """Save the PET scan index to a CSV file"""
    # Move any existing files to archive
    archive_dir = op.join(PATHS["scans_to_process"], "archive")
    files = glob(op.join(PATHS["scans_to_process"], "Raw_PET_Scan_Index_*.csv"))
    if files:
        if not op.isdir(archive_dir):
            os.makedirs(archive_dir)
        for f in files:
            os.rename(f, op.join(archive_dir, op.basename(f)))

    # Save the pet_scans dataframe
    outf = op.join(PATHS["scans_to_process"], f"Raw_PET_Scan_Index_{TIMESTAMP}.csv")
    pet_scans.to_csv(outf, index=False)
    print(f"  * Saved raw PET scan index to {outf}")

In [9]:
print(f"  * Searching {PATHS['raw']} for all *.nii files")
raw_niis = fast_recursive_glob_nii(PATHS["raw"])

# Find the subject ID, scan type, acquisition date, and LONI image ID
# for each nifti file in raw
raw_niis = pd.DataFrame(raw_niis, columns=["raw_niif"])
raw_niis.insert(0, "subj", raw_niis["raw_niif"].apply(get_subj))
raw_niis.insert(1, "scan_type", raw_niis["raw_niif"].apply(get_scan_type))
raw_niis.insert(2, "scan_date", raw_niis["raw_niif"].apply(get_scan_date))
raw_niis.insert(3, "image_id", raw_niis["raw_niif"].apply(get_image_id))

# Convert the date column to datetime
raw_niis["scan_date"] = pd.to_datetime(raw_niis["scan_date"])

# Separate MRI and PET scans into separate dataframes
raw_mris = raw_niis.query("(scan_type=='MRI-T1')").reset_index(drop=True)
raw_pets = raw_niis.query("(scan_type!='MRI-T1')").reset_index(drop=True)

# Rename columns
raw_mris = raw_mris.rename(
    columns={
        "scan_date": "mri_date",
        "image_id": "mri_image_id",
        "raw_niif": "mri_raw_niif",
    }
)
raw_pets = raw_pets.rename(
    columns={
        "scan_type": "tracer",
        "scan_date": "pet_date",
        "image_id": "pet_image_id",
        "raw_niif": "pet_raw_niif",
    }
)

# Get rid of scan_type column in the MRI dataframe (all scans are T1)
raw_mris = raw_mris.drop(columns=["scan_type"])

# Sort MRIs by subject and scan date
raw_mris = raw_mris.sort_values(["subj", "mri_date"]).reset_index(drop=True)

# Sort PET scans by subject, tracer, and scan date
raw_pets = raw_pets.sort_values(["subj", "tracer", "pet_date"]).reset_index(drop=True)

# Get PET resolution from filename
raw_pets.insert(
    raw_pets.columns.tolist().index("pet_date") + 1,
    "pet_res",
    raw_pets["pet_raw_niif"].apply(get_pet_resolution),
)

# Report how many MRI and PET scans are in the raw directory
print(
    "  * Found {:,} niftis in {:,} subdirectories".format(
        len(raw_niis), len([op.dirname(f) for f in raw_niis["raw_niif"]])
    )
)
print(f"    - {len(raw_mris):,} T1 MRIs")
print(f"    - {len(raw_pets):,} PET scans")
for scan_type in raw_pets["tracer"].unique():
    print(
        "      * {} {}".format(
            len(raw_pets.loc[raw_pets["tracer"] == scan_type]), scan_type
        )
    )

  * Searching /mnt/coredata/processing/leads/data/raw for all *.nii files
  * Found 3,309 niftis in 3,309 subdirectories
    - 1,240 T1 MRIs
    - 2,069 PET scans
      * 973 FBB
      * 159 FDG
      * 937 FTP


In [14]:
# pet_scans_cp.insert(ii + 1, "pet_scan_number", grp.cumcount() + 1)
pet_scans_cp.insert(ii + 2, "n_pet_scans", grp["pet_scan_number"].transform("count"))
pet_scans_cp.head(20)

Unnamed: 0,subj,tracer,pet_date,pet_scan_number,n_pet_scans,pet_res,pet_image_id,pet_raw_niif
0,LDS0070120,FBB,2019-06-19,1,1,6,I1779724,/mnt/coredata/processing/leads/data/raw/LDS007...
1,LDS0070120,FDG,2021-07-14,1,1,6,I1779574,/mnt/coredata/processing/leads/data/raw/LDS007...
2,LDS0070120,FTP,2019-06-20,1,1,6,I1778313,/mnt/coredata/processing/leads/data/raw/LDS007...
3,LDS0070166,FBB,2019-08-22,1,4,6,I1779005,/mnt/coredata/processing/leads/data/raw/LDS007...
4,LDS0070166,FBB,2020-09-16,2,4,6,I1778781,/mnt/coredata/processing/leads/data/raw/LDS007...
5,LDS0070166,FBB,2021-10-28,3,4,6,I1777871,/mnt/coredata/processing/leads/data/raw/LDS007...
6,LDS0070166,FBB,2022-11-08,4,4,6,I1779415,/mnt/coredata/processing/leads/data/raw/LDS007...
7,LDS0070166,FTP,2019-08-21,1,4,6,I1777826,/mnt/coredata/processing/leads/data/raw/LDS007...
8,LDS0070166,FTP,2020-09-11,2,4,6,I1777397,/mnt/coredata/processing/leads/data/raw/LDS007...
9,LDS0070166,FTP,2021-10-29,3,4,6,I1779041,/mnt/coredata/processing/leads/data/raw/LDS007...


In [21]:
# Copy the input dataframe
pet_scans_cp = raw_pets.copy()

# Add columns for PET scan number and total number of scans per tracer
ii = pet_scans_cp.columns.tolist().index("pet_date")
grp = pet_scans_cp.groupby(["subj", "tracer"])
# pet_scans_cp = pet_scans_cp.sort_values(["subj", "tracer", "pet_date"])
pet_scans_cp.insert(ii + 1, "pet_scan_number", grp.cumcount() + 1)
pet_scans_cp.insert(ii + 2, "n_pet_scans", grp["pet_scan_number"].transform("count"))

# Add columns for days from each PET scan to baseline and days between
# consecutive PET scans per tracer
baseline_pet_dates = grp["pet_date"].min()
pet_scans_cp.insert(
    ii + 3,
    "days_from_baseline_pet",
    pet_scans_cp.apply(
        lambda x: (x["pet_date"] - baseline_pet_dates[(x["subj"], x["tracer"])]).days,
        axis=1,
    ),
)
pet_scans_cp.insert(
    ii + 4, "days_from_last_pet", grp["pet_date"].diff().dt.days.fillna(0)
)

In [25]:
pet_scans_cp.loc[
    :,
    [
        "pet_date",
        "pet_scan_number",
        "n_pet_scans",
        "days_from_baseline_pet",
        "days_from_last_pet",
    ],
].max()

pet_date                  2024-04-02 00:00:00
pet_scan_number                             4
n_pet_scans                                 4
days_from_baseline_pet                   1686
days_from_last_pet                     1490.0
dtype: object

In [112]:
# Get a list of all directories containing .nii files
print(f"- Searching {PATHS['raw']} for all *.nii files")
raw_niis = fast_recursive_glob_nii(PATHS["raw"])

# Load the scan types CSV file
# __file__ = "/mnt/coredata/processing/leads/code/working/scratchpad.ipynb"
scan_types = load_scan_typesf()
scan_type_keys = [x.lower() for x in list(scan_types)]

# Find the subject ID, scan type, acquisition date, and LONI image ID
# for each nifti file in raw
raw_niis = pd.DataFrame(raw_niis, columns=["raw_niif"])
raw_niis.insert(0, "subj", raw_niis["raw_niif"].apply(get_subj))
raw_niis.insert(1, "scan_type", raw_niis["raw_niif"].apply(get_scan_type))
raw_niis.insert(2, "scan_date", raw_niis["raw_niif"].apply(get_scan_date))
raw_niis.insert(3, "image_id", raw_niis["raw_niif"].apply(get_image_id))

# Separate MRI and PET scans into separate dataframes
raw_mris = raw_niis.query("(scan_type=='MRI-T1')").reset_index(drop=True)
raw_pets = raw_niis.query("(scan_type!='MRI-T1')").reset_index(drop=True)

# Rename columns
raw_mris = raw_mris.rename(
    columns={
        "scan_date": "mri_date",
        "image_id": "mri_image_id",
        "raw_niif": "mri_raw_niif",
    }
)
raw_pets = raw_pets.rename(
    columns={
        "scan_type": "tracer",
        "scan_date": "pet_date",
        "image_id": "pet_image_id",
        "raw_niif": "pet_raw_niif",
    }
)

# Get rid of scan_type column in the MRI dataframe (all scans are T1)
raw_mris = raw_mris.drop(columns=["scan_type"])

# Sort MRIs by subject and scan date
raw_mris = raw_mris.sort_values(["subj", "mri_date"]).reset_index(drop=True)

# Get PET resolution from filename
raw_pets.insert(
    raw_pets.columns.tolist().index("pet_date") + 1,
    "pet_res",
    raw_pets["pet_raw_niif"].apply(get_pet_resolution),
)

# Print some basic stats on many scans we've found in the raw directory
print(
    "  * Found {:,} .nii files in {:,} subdirectories".format(
        len(raw_niis), len([op.dirname(f) for f in raw_niis["raw_niif"]])
    )
)
print(f"    - {len(raw_mris):,} T1 MRIs")
print(f"    - {len(raw_pets):,} PET scans")
for scan_type in raw_pets["tracer"].unique():
    print(
        "      * {} {}".format(
            len(raw_pets.loc[raw_pets["tracer"] == scan_type]), scan_type
        )
    )

# Match each PET scan to its closest MRI
raw_pets = find_closest_mri_to_pet(raw_pets, raw_mris)
print("  * Matching PET scans to their closest T1 MRIs")

# ----------------------------------------------------------------------
# Add MRI date columns
raw_mris = add_mri_date_columns(raw_mris)

# Check if each MRI is used for PET processing
raw_mris["mri_used_for_pet_proc"] = raw_mris["mri_image_id"].apply(
    lambda x: 1 if np.isin(x, raw_pets["mri_image_id"]) else 0
)
idx = raw_mris.loc[raw_mris["mri_used_for_pet_proc"] == 0].index.tolist()
if process_unused_mris:
    msg = "      to any PET scan but will be processed in any case"
else:
    msg = "      to any PET scan and will not be added to the list of MRIs to process"
print(
    "  * Auditing MRI scans",
    "    - {:,}/{:,} MRIs in {} are not the closest MRI".format(
        len(idx),
        len(raw_mris),
        PATHS["raw"],
    ),
    msg,
    sep="\n",
)

# Add path to the processed MRI directory
ii = raw_mris.columns.tolist().index("mri_raw_niif")
raw_mris["mri_proc_dir"] = raw_mris.apply(
    lambda x: get_mri_proc_dir(x["subj"], x["mri_date"]), axis=1
)

# Determine which MRIs have been processed
raw_mris["freesurfer_run"] = raw_mris["mri_proc_dir"].apply(check_if_freesurfer_run)
print(
    "  * {:,}/{:,} MRIs have been processed through Freesurfer".format(
        len(raw_mris.loc[raw_mris["freesurfer_run"] == 1]),
        len(raw_mris),
    )
)

raw_mris["mri_processed"] = raw_mris["mri_proc_dir"].apply(check_if_mri_processed)
print(
    "  * {:,}/{:,} MRIs have been fully processed".format(
        len(raw_mris.loc[raw_mris["mri_processed"] == 1]),
        len(raw_mris),
    )
)

# Determine which MRI scans need to be processed
raw_mris["need_to_process"] = raw_mris.apply(
    lambda x: get_mri_to_process(
        x["mri_used_for_pet_proc"], x["mri_processed"], overwrite, process_unused_mris
    ),
    axis=1,
)
print(
    "  * {:,}/{:,} MRIs are scheduled for processing".format(
        len(raw_mris.loc[raw_mris["need_to_process"] == 1]),
        len(raw_mris),
    )
)

# Save the raw MRI scans dataframe to a CSV file
outf = op.join(PATHS["scans_to_process"], f"Raw_MRI_Scan_Index_{now()}.csv")
raw_pets.to_csv(outf, index=False)
print(f"  * Saved raw MRI scan index to {outf}")

# ----------------------------------------------------------------------
# Calculate time between serial PET scans
raw_pets = add_pet_date_columns(raw_pets)
print("  * Calculating days between scans")

# Flag PET scans with issues that preclude processing
raw_pets = audit_pet(raw_pets)
print("  * Auditing PET scans")
print(
    "    - Flagged {:,} scans with issues that may need to be resolved before processing".format(
        len(raw_pets.loc[raw_pets["flag"] == 1])
    )
)

# Add path to the processed PET directory
ii = raw_pets.columns.tolist().index("pet_raw_niif")
raw_pets.insert(
    ii + 1,
    "pet_proc_dir",
    raw_pets.apply(
        lambda x: get_pet_proc_dir(x["subj"], x["tracer"], x["pet_date"]), axis=1
    ),
)

# Determine which PET scans have been processed
raw_pets.insert(
    ii + 1, "pet_processed", raw_pets["pet_proc_dir"].apply(check_if_pet_processed)
)
print(
    "  * {:,}/{:,} PET scans have already been processed".format(
        len(raw_pets.loc[raw_pets["pet_processed"] == 1]),
        len(raw_pets),
    )
)

# Determine which PET scans need to be processed
raw_pets["need_to_process"] = raw_pets.apply(
    lambda x: get_pet_to_process(x["flag"], x["pet_processed"], overwrite), axis=1
)
print(
    "  * {:,}/{:,} PET scans are scheduled for processing".format(
        len(raw_pets.loc[raw_pets["need_to_process"] == 1]),
        len(raw_pets),
    )
)

# Convert PET date columns to string
fmt = "%Y-%m-%d"
for col in ["pet_date", "mri_date"]:
    raw_pets[col] = raw_pets[col].dt.strftime(fmt)

# Save the raw PET scans dataframe to a CSV file
outf = op.join(PATHS["scans_to_process"], f"Raw_PET_Scan_Index_{now()}.csv")
raw_pets.to_csv(outf, index=False)
print(f"  * Saved raw PET scan index to {outf}")

- Searching /mnt/coredata/processing/leads/data/raw for all *.nii files
  * Found 3,306 .nii files in 3,306 subdirectories
    - 1,240 T1 MRIs
    - 2,066 PET scans
      * 973 FBB
      * 937 FTP
      * 156 FDG
  * Matching PET scans to their closest T1 MRIs
  * Auditing MRI scans
    - 145/1,240 MRIs in /mnt/coredata/processing/leads/data/raw are not the closest MRI
      to any PET scan and will not be added to the list of MRIs to process
  * 1,088/1,240 MRIs have been processed through Freesurfer
  * 0/1,240 MRIs have been fully processed
  * 1,095/1,240 MRIs are scheduled for processing
  * Saved raw MRI scan index to /mnt/coredata/processing/leads/metadata/scans_to_process/Raw_MRI_Scan_Index_2024-05-14-17-09-16.csv
  * Calculating days between scans
  * Auditing PET scans
    - Flagged 28 scans with issues that may need to be resolved before processing
  * 0/2,066 PET scans have already been processed
  * 2,038/2,066 PET scans are scheduled for processing


NameError: name 'merged_min' is not defined

In [None]:
def check_if_pet_processed(subj, tracer, pet_date):
    """Return 1 if a PET scan has been fully processed and 0 otherwise"""
    # Check if the processed directory exists
    processed_dir = op.join(PATHS["processed"], subj, tracer, pet_date)
    if op.isdir(processed_dir):
        return 1
    else:
        return 0

In [43]:
# Add MRI quality info to raw_mris
files = glob_sort_mtime(op.join(PATHS["ssheets"], "Mayo_ADIRL_MRI_Quality*.csv"))
if files:
    print("  * Loading Mayo MRI QC spreadsheet")
    mri_qc_mayof = files[0]
    mri_qc_mayo = pd.read_csv(mri_qc_mayof)

files = glob_sort_mtime(op.join(PATHS["ssheets"], "ext_mgh_mri_3t*.csv"))
if files:
    print("  * Loading MGH MRI QC spreadsheet")
    mri_qc_mghf = files[0]
    mri_qc_mgh = pd.read_csv(mri_qc_mghf)

  * Loading Mayo MRI QC spreadsheet
  * Loading MGH MRI QC spreadsheet


In [48]:
rename_cols = {
    "subject_id": "subj",
    "series_date": "mri_date",
    "study_overallpass": "mayo_pass",
    "series_quality": "mayo_quality",
    "series_comments": "mayo_comments",
}
mri_qc_mayo.rename(
    columns={
        "subject_id": "subj",
        "series_date": "mri_date",
    },
    inplace=True,
)

Unnamed: 0,subject_id,series_date,studyinstanceuid,study_overallpass,study_comments,study_protocol_status,study_protocol_comment,study_rescan_requested,seriesinstanceuid,series_description,...,study_medical_abnormalities,serial,medical_exclusion,release_from_quarantine,pay_site,qualification,field_strength,t1_accelerated,update_stamp,mri_image_id
0,LDS0370001,2018-05-09 13:07:28.0,2.16.124.113543.6006.99.8966031846000857470,1,,1,QC'd,FALSE,2.16.124.113543.6006.99.03789794833543640529,Axial 3D PASL (Eyes Open),...,,,,1.0,,0,3.0,,2021-02-09 18:05:02.0,I995988
1,LDS0370001,2018-05-09 13:07:28.0,2.16.124.113543.6006.99.8966031846000857470,1,,1,QC'd,FALSE,2.16.124.113543.6006.99.03997603284249532785,Field Mapping,...,,,,1.0,,0,3.0,,2021-02-09 18:05:02.0,I995992
2,LDS0370001,2018-05-09 13:07:28.0,2.16.124.113543.6006.99.8966031846000857470,1,,1,QC'd,FALSE,2.16.124.113543.6006.99.08658171394967486900,Accelerated Sagittal MPRAGE,...,,,,1.0,,0,3.0,,2021-02-09 18:05:02.0,I995986
3,LDS0370001,2018-05-09 13:07:28.0,2.16.124.113543.6006.99.8966031846000857470,1,,1,QC'd,FALSE,2.16.124.113543.6006.99.09071441183906983916,Field Mapping,...,,,,1.0,,0,3.0,,2021-02-09 18:05:02.0,I996000
4,LDS0370001,2018-05-09 13:07:28.0,2.16.124.113543.6006.99.8966031846000857470,1,,1,QC'd,FALSE,2.16.124.113543.6006.99.1453356740595980212,HighResHippocampus,...,,,,1.0,,0,3.0,,2021-02-09 18:05:02.0,I995994
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
16317,LDS0350597,2024-04-22 13:17:49.0,2.16.124.113543.6006.99.6815290193205908847,-1,,3,Not all series passed.,FALSE,2.16.124.113543.6006.99.6813834101512967121,Axial DTI,...,,,,,,0,3.0,,2024-05-10 18:45:32.0,I1827079
16318,LDS0350597,2024-04-22 13:17:49.0,2.16.124.113543.6006.99.6815290193205908847,-1,,3,Not all series passed.,FALSE,2.16.124.113543.6006.99.6813911971215949911,Axial 3TE T2 STAR,...,,,,,,0,3.0,,2024-05-10 18:45:32.0,I1827052
16319,LDS0350597,2024-04-22 13:17:49.0,2.16.124.113543.6006.99.6815290193205908847,-1,,3,Not all series passed.,FALSE,2.16.124.113543.6006.99.6814080161784333564,3 Plane Localizer,...,,,,,,0,3.0,,2024-05-10 18:45:32.0,I1827078
16320,LDS0350597,2024-04-22 13:17:49.0,2.16.124.113543.6006.99.6815290193205908847,-1,,3,Not all series passed.,FALSE,2.16.124.113543.6006.99.6814261004355890729,Field Mapping,...,,,,,,0,3.0,,2024-05-10 18:45:32.0,I1827063


In [47]:
mri_qc_mayo["mri_image_id"] = mri_qc_mayo["loni_image"].apply(lambda x: f"I{x}")
mri_qc_mgh["mri_image_id"] = mri_qc_mgh["loni_image"].apply(lambda x: f"I{x}")

Unnamed: 0,subject_code,download_date,run_number,loni_image,loni_series,loni_study,coil,Description,Acq Date,OVERALLQC,...,lh_rostralmiddlefrontal_thicknessstd_aparc,lh_superiorfrontal_thicknessstd_aparc,lh_superiorparietal_thicknessstd_aparc,lh_superiortemporal_thicknessstd_aparc,lh_supramarginal_thicknessstd_aparc,lh_frontalpole_thicknessstd_aparc,lh_temporalpole_thicknessstd_aparc,lh_transversetemporal_thicknessstd_aparc,lh_insula_thicknessstd_aparc,update_stamp
0,LDS0370001,12/19/2018,2,995986,684237.0,122553.0,64ch,Accelerated Sagittal MPRAGE,5/9/2018,Partial,...,0.500,0.533,0.443,0.592,0.514,0.538,0.701,0.360,0.823,2023-07-02 21:24:01.0
1,LDS0370002,3/20/2023,2,1006346,,,64ch,Accelerated Sagittal MPRAGE,6/6/2018,Partial,...,0.405,0.411,0.335,0.517,0.408,0.333,0.565,0.318,0.728,2023-07-02 21:24:01.0
2,LDS0370006,12/17/2019,5,1026569,708999.0,125027.0,64ch,Accelerated Sagittal MPRAGE,7/26/2018,Partial,...,0.467,0.519,0.433,0.579,0.461,0.448,0.797,0.362,0.793,2023-07-02 21:24:01.0
3,LDS0370007,11/9/2021,2,1029698,711584.0,125258.0,64ch,Accelerated Sagittal MPRAGE,8/1/2018,Pass,...,0.491,0.487,0.433,0.573,0.434,0.454,0.744,0.435,0.713,2023-07-02 21:24:01.0
4,LDS0370005,12/19/2018,2,1030751,712380.0,125364.0,64ch,Accelerated Sagittal MPRAGE,8/2/2018,Partial,...,0.472,0.469,0.419,0.578,0.472,0.391,0.809,0.338,0.723,2023-07-02 21:24:01.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
865,LDS3600602,9/29/2021,2,1663804,1193184.0,222555.0,64ch,Accelerated Sagittal MPRAGE,2/8/2023,Pass,...,0.528,0.563,0.506,0.578,0.465,0.551,0.604,0.285,0.889,2023-07-02 21:24:01.0
866,LDS9410184,4/1/2023,2,1681240,,,64ch,Accelerated Sagittal MPRAGE,3/22/2023,Partial,...,0.401,0.464,0.481,0.545,0.425,0.391,0.710,0.432,0.670,2023-07-02 21:24:01.0
867,LDS9410568,4/1/2023,2,1646937,,,64ch,Accelerated Sagittal MPRAGE,12/2/2022,Pass,...,0.459,0.490,0.423,0.526,0.469,0.501,0.760,0.440,0.794,2023-07-02 21:24:01.0
868,LDS9410572,4/1/2023,2,1647499,,,64ch,Accelerated Sagittal MPRAGE,12/5/2022,Partial,...,0.484,0.518,0.423,0.607,0.421,0.594,0.571,0.388,0.689,2023-07-02 21:24:01.0


In [22]:
raw_mris.query("(mri_image_id=='I1467558')")

Unnamed: 0,subj,mri_date,mri_image_id,mri_raw_niif,mri_used_for_pet_proc
1,LDS0070120,2021-07-13,I1467558,/mnt/coredata/processing/leads/data/raw/LDS007...,True


In [32]:
raw_pets["pet_raw_niif"].apply(get_pet_resolution).value_counts(dropna=False)

pet_raw_niif
6    2066
Name: count, dtype: int64

In [36]:
raw_pets["pet_res"]

array([6])

In [68]:
raw_pets.groupby(["subj", "tracer"])["mri_image_id"].apply(
    lambda x: np.max(np.unique(x, return_counts=True)[1])
).reset_index().query("(mri_image_id!=1)")

Unnamed: 0,subj,tracer,mri_image_id
237,LDS0220071,FBB,2
238,LDS0220071,FTP,2
331,LDS0350342,FBB,2
374,LDS0360283,FTP,2
488,LDS0370038,FBB,2
750,LDS0730083,FTP,2


In [None]:
# # Merge raw_pets with raw_mris
# dxf = op.join(PATHS["metadata"], "ssheets", "LEADS_Internal_PET-Screening.xlsx")
# if op.isfile(dxf):
#     dx = pd.read_excel(dxf)
#     dx_map = {"ID": "subj", "Cohort": "dx"}
#     dx = dx.rename(columns=dx_map)[["subj", "dx"]]
#     raw_scans = dx.merge(raw_scans, on="subj", how="right")

# # Add controls.
# subj_regf = op.join(
#     PATHS["metadata"], "ssheets", "Participant Registration_vertical.csv"
# )
# if op.isfile(subj_regf):
#     subj_reg = pd.read_csv(subj_regf)
#     subj_reg_map = {"subject.label": "subj", "dd_revision_field.translated_value": "dx"}
#     subj_reg = subj_reg.rename(columns=subj_reg_map)[["subj", "dx"]]
# cn_subjs = subj_reg.query("(dx=='Cognitively Normal Participant')")["subj"].tolist()
# raw_scans.loc[pd.isna(raw_scans["dx"]), "dx"] = raw_scans.loc[
#     pd.isna(raw_scans["dx"]), "subj"
# ].apply(lambda x: "CN" if np.isin(x, cn_subjs) else np.nan)

# Old

In [173]:
# Scrape the raw directory for all PET niftis
raw_scans = scrape_raw(PATHS["raw"])

# Get paths for the raw nifti files copied into the processed directory
# raw_cp_str = op.join(
#     PATHS["processed"],
#     "{subj}",
#     "{scan_type}_{scan_date}",
#     "{subj}_{scan_type}_{scan_date}.nii",
# )

# Search processed Freesurfer dirs for closest MRI to each PET scan
raw_scans["mri_date"], raw_scans["days_to_mri"], raw_scans["fs_dir"] = zip(
    *raw_scans.apply(
        lambda x: find_closest_mri(x["subj"], x["scan_date"], PATHS["freesurfer"]),
        axis=1,
    )
)

print(f"raw_scans: {raw_scans.shape}")

Found 2069 scans from 619 subjects in /mnt/coredata/processing/leads/data/raw
raw_scans: (2069, 7)


In [22]:
fname = op.join(
    "/mnt/coredata/Projects/LEADS/data_f7p1/processed/LDS0370001/Timepoint2/MRI_T1_2020-10-14",
    "LDS0370001_MRI_T1_2020-10-14_nu.nii",
)
op.islink(fname)
freesurfer_scan_dir = op.dirname(op.dirname(op.abspath(os.readlink(fname))))

'/mnt/coredata/Projects/LEADS/data_f7p1/freesurfer_processing/LDS0370001_MRI_T1_2020-10-14'

In [174]:
# # Add processed MRI directories to raw_scans
# proc_dir_old = "/mnt/coredata/Projects/LEADS/data_f7p1/processed"
# mri_dirs = []
# proc_mri_dirs_old = []
# freesurfer_dirs_old = []
# for idx, scan in raw_scans.iterrows():
#     if scan["fs_dir"] is np.nan:
#         mri_dirs.append(np.nan)
#         continue

#     subj = scan["subj"]
#     mri_date = scan["fs_dir"].split("_")[1]
#     proc_mri_dir_new = op.join(PATHS["processed"], subj, f"MRI-T1_{mri_date}")

#     # Find the old processed MRI and FreeSurfer directories
#     max_tp = 6
#     for tp in range(1, max_tp + 1):
#         proc_mri_dir_old = op.join(
#             proc_dir_old, subj, f"Timepoint{tp}", f"MRI_T1_{mri_date}"
#         )
#         if op.isdir(proc_mri_dir_old):
#             os.makedirs(op.dirname(proc_mri_dir_new), exist_ok=True)
#             # if not op.exists(proc_mri_dir_new):
#             #     os.symlink(proc_mri_dir_old, proc_mri_dir_new)
#             if op.islink(proc_mri_dir_new):
#                 os.unlink(proc_mri_dir_new)
#             os.makedirs(proc_mri_dir_new, exist_ok=True)
#             proc_mri_dirs_old.append(proc_mri_dir_old)

#             # Find the FreeSurfer directory from the nu.nii symlink
#             nu_old = op.join(proc_mri_dir_old, f"{subj}_MRI_T1_{mri_date}_nu.nii")
#             if op.islink(nu_old):
#                 freesurfer_dir_old = op.dirname(
#                     op.dirname(op.abspath(os.readlink(fname)))
#                 )
#                 freesurfer_dir_new = op.join(proc_mri_dir_new, "freesurfer_7p1")
#                 freesurfer_link_new = op.join(proc_mri_dir_new, "freesurfer")
#                 shutil.copytree(freesurfer_scan_dir, proc_mri_dir_new)
#             break
#         if tp == max_tp:
#             proc_mri_dir_old = np.nan

#     # Add the processed MRI directory to raw_scans["mri_dir"]
#     if op.exists(proc_mri_dir_new):
#         mri_dirs.append(proc_mri_dir_new)
#     else:
#         mri_dirs.append(np.nan)

# raw_scans["mri_dir"] = mri_dirs

In [176]:
# Drop raw_scans rows with missing data, then sort rows.
raw_scans = (
    raw_scans.dropna()
    .sort_values(["subj", "scan_type", "scan_date"])
    .reset_index(drop=True)
)

# Convert days_to_mri to int
raw_scans["days_to_mri"] = raw_scans["days_to_mri"].astype(int)

# Add a visit column to raw_scans, with visit 1 being the earliest date
# for a given subject and scan_type, visit 2 being the next earliest
# date, and so on.
raw_scans["visit"] = raw_scans.groupby(["subj", "scan_type"]).cumcount() + 1
cols = raw_scans.columns.tolist()
cols.insert(cols.index("scan_type") + 1, cols.pop(cols.index("visit")))
raw_scans = raw_scans[cols]

print(f"raw_scans: {raw_scans.shape}")

raw_scans: (2067, 9)


In [177]:
# Add a diagnosis column to raw_scans
dxf = op.join(PATHS["metadata"], "ssheets", "LEADS_Internal_PET-Screening.xlsx")
if op.isfile(dxf):
    dx = pd.read_excel(dxf)
    dx_map = {"ID": "subj", "Cohort": "dx"}
    dx = dx.rename(columns=dx_map)[["subj", "dx"]]
    raw_scans = dx.merge(raw_scans, on="subj", how="right")

# Add controls.
subj_regf = op.join(
    PATHS["metadata"], "ssheets", "Participant Registration_vertical.csv"
)
if op.isfile(subj_regf):
    subj_reg = pd.read_csv(subj_regf)
    subj_reg_map = {"subject.label": "subj", "dd_revision_field.translated_value": "dx"}
    subj_reg = subj_reg.rename(columns=subj_reg_map)[["subj", "dx"]]
cn_subjs = subj_reg.query("(dx=='Cognitively Normal Participant')")["subj"].tolist()
raw_scans.loc[pd.isna(raw_scans["dx"]), "dx"] = raw_scans.loc[
    pd.isna(raw_scans["dx"]), "subj"
].apply(lambda x: "CN" if np.isin(x, cn_subjs) else np.nan)

In [178]:
# Find PET scans that are ready and needing to be processed
proc_pet_dirs = []
need_to_process = []
for idx, scan in raw_scans.iterrows():
    if scan["mri_dir"] is np.nan:
        proc_pet_dirs.append(np.nan)
        need_to_process.append(False)
        continue

    proc_pet_dir = op.join(
        PATHS["processed"], scan["subj"], f"{scan['scan_type']}_{scan['scan_date']}"
    )
    proc_pet_dirs.append(proc_pet_dir)
    need_to_process.append(not op.exists(proc_pet_dir))

raw_scans["proc_pet_dir"] = proc_pet_dirs
raw_scans["need_to_process"] = need_to_process

In [77]:
# Save raw_scans to a CSV file
outf = op.join(PATHS["metadata"], "log", f"raw_pet_scans_{now()}.csv")
raw_scans.to_csv(outf, index=False)
print(f"Saved raw_scans to {outf}")

Saved raw_scans to /mnt/coredata/processing/leads/metadata/log/raw_pet_scans_2024-04-22-19-12-02.csv


# Copy MRI files

In [89]:
from concurrent.futures import ThreadPoolExecutor

In [109]:
def copy_freesurfer(scan, rm_existing=False):
    """Copy FreeSurfer directory to the processed MRI directory."""
    # Add processed MRI directories to raw_scans
    fs_dir_old = "/mnt/coredata/Projects/LEADS/data_f7p1/freesurfer_processing"

    # Create the processed MRI directory, if it doesn't already exist
    if op.islink(scan["mri_dir"]):
        os.unlink(scan["mri_dir"])
    os.makedirs(scan["mri_dir"], exist_ok=True)

    # Copy the FreeSurfer directory to the processed MRI directory
    fs_scan_dir_old = op.join(fs_dir_old, f"{scan['subj']}_MRI_T1_{scan['mri_date']}")
    fs_scan_dir_new = op.join(scan["mri_dir"], f"freesurfer_7p1")
    fs_scan_link_new = fs_scan_dir_new.replace("freesurfer_7p1", "freesurfer")
    if op.isdir(fs_scan_dir_old):
        if op.isdir(fs_scan_dir_new) and rm_existing:
            shutil.rmtree(fs_scan_dir_new)
        if not op.isdir(fs_scan_dir_new):
            # Find files in fs_scan_dir_old that are dirs
            subfiles = [
                f
                for f in os.listdir(fs_scan_dir_old)
                if not op.islink(op.join(fs_scan_dir_old, f))
            ]
            if len(subfiles) > 0:
                os.makedirs(fs_scan_dir_new, exist_ok=True)
                for f in subfiles:
                    shutil.copytree(
                        op.join(fs_scan_dir_old, f),
                        op.join(fs_scan_dir_new, f),
                        symlinks=True,
                    )

            # Add a symlink to the new freesurfer directory
            if op.islink(fs_scan_link_new):
                os.unlink(fs_scan_link_new)
            os.symlink(fs_scan_dir_new, fs_scan_link_new)

In [112]:
from concurrent.futures import ThreadPoolExecutor

max_workers = 16
mris_to_copy = raw_scans.drop_duplicates(["subj", "mri_date"]).to_dict(orient="records")
print(f"Copying FreeSurfer directories for {len(mris_to_copy)} MRIs")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    executor.map(copy_freesurfer, mris_to_copy)

Copying FreeSurfer directories for 1092 MRIs


In [67]:
# Add processed MRI directories to raw_scans
cp_freesurfer = True
fs_dir_old = "/mnt/coredata/Projects/LEADS/data_f7p1/freesurfer_processing"
for idx, scan in raw_scans.drop_duplicates(["subj", "mri_date"]).iterrows():
    if scan["subj"] != "LDS0370008":
        continue

    # Create the processed MRI directory, if it doesn't already exist
    if op.islink(scan["mri_dir"]):
        os.unlink(scan["mri_dir"])
    os.makedirs(scan["mri_dir"], exist_ok=True)

    # Copy the FreeSurfer directory to the processed MRI directory
    fs_scan_dir_old = op.join(fs_dir_old, f"{scan['subj']}_MRI_T1_{scan['mri_date']}")
    fs_scan_dir_new = op.join(scan["mri_dir"], f"freesurfer_7p1")
    fs_scan_link_new = fs_scan_dir_new.replace("freesurfer_7p1", "freesurfer")
    if op.isdir(fs_scan_dir_old):
        if cp_freesurfer and not op.isdir(fs_scan_dir_new):
            # Find files in fs_scan_dir_old that are dirs
            subfiles = [
                f
                for f in os.listdir(fs_scan_dir_old)
                if not op.islink(op.join(fs_scan_dir_old, f))
            ]
            if len(subfiles) > 0:
                os.makedirs(fs_scan_dir_new, exist_ok=True)
                for f in subfiles:
                    shutil.copytree(
                        op.join(fs_scan_dir_old, f),
                        op.join(fs_scan_dir_new, f),
                        symlinks=True,
                    )

            # Add a symlink to the new freesurfer directory
            if op.islink(fs_scan_link_new):
                os.unlink(fs_scan_link_new)
            os.symlink(fs_scan_dir_new, fs_scan_link_new)

# Consolidate dirs in raw

In [67]:
subdirs = ["fbb", "fdg", "ftp", "mri"]
# for d in subdirs:
#     os.makedirs(op.join(PATHS["processed"], d), exist_ok=True)
PATHS["raw"]

'/mnt/coredata/processing/leads/data/raw'

In [24]:
# Move raw scan directories from raw/<scan_type>/<subj> to raw/<subj>
scan_type_dirs = [
    op.join(PATHS["raw"], f)
    for f in os.listdir(PATHS["raw"])
    if op.isdir(op.join(PATHS["raw"], f))
]
for scan_type_dir in scan_type_dirs:
    subj_dirs_old = [
        op.join(scan_type_dir, f)
        for f in os.listdir(scan_type_dir)
        if op.isdir(op.join(scan_type_dir, f))
    ]
    for subj_dir_old in subj_dirs_old:
        subj_dir_new = op.join(PATHS["raw"], op.basename(subj_dir_old))
        subj_scan_dirs_old = [
            op.join(subj_dir_old, f)
            for f in os.listdir(subj_dir_old)
            if op.isdir(op.join(subj_dir_old, f))
        ]
        # Make the new subject directory if it doesn't already exist
        if subj_scan_dirs_old:
            os.makedirs(subj_dir_new, exist_ok=True)
        for subj_scan_dir_old in subj_scan_dirs_old:
            # Move the scan directory to its new location
            shutil.move(subj_scan_dir_old, subj_dir_new)

In [64]:
def move_newdata_to_raw(
    newdata_dir, raw_dir, overwrite=False, cleanup=True, verbose=True
):
    """Move scans from newdata to raw, keeping file hierarchies intact.

    Parameters
    ----------
    newdata_dir : str
        The directory containing the new scan data. Must format like:
            <newdata_dir>/<subj>/<...>/<nifti_or_dicom_files>
    raw_dir : str
        The directory to move the new scan data to. Structure after
        moving will be:
            <raw_dir>/<subj>/<...>/<nifti_or_dicom_files>
    overwrite : bool
        If True, overwrite existing scan directories in raw_dir with
        directories from newdata_dir. If False, skip existing
        directories.
    cleanup : bool
        If True, remove all files and folders from newdata_dir after
        moving everything eligible to be moved to raw_dir.
    verbose : bool
        If True, print messages about what is happening as the function
        runs.
    """

    def do_cleanup():
        """Remove all files and folders from newdata."""
        if verbose:
            print(f"  Cleaning up {newdata_dir}")
        for file in os.listdir(newdata_dir):
            filepath = op.join(newdata_dir, file)
            if op.isdir(filepath):
                shutil.rmtree(filepath)
            else:
                os.remove(filepath)

    # Ensure the base directory paths are absolute and normalized
    newdata_dir = op.normpath(newdata_dir)
    raw_dir = op.normpath(raw_dir)

    # Find all nifti and dicom files in newdata
    check_exts = (".nii", ".nii.gz", ".IMA", ".dcm")
    glob_files = []
    for ext in check_exts:
        glob_files.extend(glob(op.join(newdata_dir, f"**/*{ext}"), recursive=True))

    if verbose:
        title = "Moving newdata to raw"
        print(title, "-" * len(title), sep="\n")
    if len(glob_files) == 0:
        if verbose:
            print(f"  No nifti or dicom files found in {newdata_dir}")
        do_cleanup()
        return

    # Find all unique nifti- or dicom-containing directories in newdata
    source_dirs = set([op.dirname(f) for f in glob_files])
    if verbose:
        print(
            f"  Found {len(source_dirs)} nifti- or dicom-containing directories in {newdata_dir}"
        )

    for source_dir in source_dirs:
        # Create a matching file hierarchy in raw as in newdata
        target_dir = op.join(raw_dir, op.relpath(source_dir, newdata_dir))

        # Check if the target directory exists
        if op.exists(target_dir):
            # If overwrite is True, remove the existing directory
            if overwrite:
                if verbose:
                    print(f"  Overwriting existing raw directory: {target_dir}")
                shutil.rmtree(target_dir)
            else:
                if verbose:
                    print(f"  Skipping existing raw directory: {target_dir}")
                continue

        # Create the necessary directory structure, then copy source to
        # target
        os.makedirs(op.dirname(target_dir), exist_ok=True)
        shutil.move(source_dir, target_dir)
        if verbose:
            print(f"  Moved {source_dir} to {target_dir}")

    # Clean up empty directories in newdata
    if cleanup:
        do_cleanup()

In [66]:
newdata_dir = "/home/mac/dschonhaut/tmp/leads/restructuring/newdata"
raw_dir = "/home/mac/dschonhaut/tmp/leads/restructuring/raw"

move_newdata_to_raw(
    newdata_dir=newdata_dir,
    raw_dir=raw_dir,
    overwrite=False,
    cleanup=True,
    verbose=True,
)

Moving newdata to raw
---------------------
  Found 7 nifti- or dicom-containing directories in /home/mac/dschonhaut/tmp/leads/restructuring/newdata
  Skipping existing raw directory: /home/mac/dschonhaut/tmp/leads/restructuring/raw/b/aa
  Skipping existing raw directory: /home/mac/dschonhaut/tmp/leads/restructuring/raw/a
  Skipping existing raw directory: /home/mac/dschonhaut/tmp/leads/restructuring/raw/c/aa/aaa
  Skipping existing raw directory: /home/mac/dschonhaut/tmp/leads/restructuring/raw/c/aa/aab
  Skipping existing raw directory: /home/mac/dschonhaut/tmp/leads/restructuring/raw/b/ab
  Skipping existing raw directory: /home/mac/dschonhaut/tmp/leads/restructuring/raw/c/ab/aab
  Skipping existing raw directory: /home/mac/dschonhaut/tmp/leads/restructuring/raw/c/ab/aaa
  Cleaning up /home/mac/dschonhaut/tmp/leads/restructuring/newdata


# Setup PET directories

In [None]:
import general.basic.helper_funcs as hf
import general.basic.str_methods as strm

In [24]:
raw_scansf = glob_sort_mtime(op.join(PATHS["metadata"], "log", "raw_pet_scans_*.csv"))[
    0
]
raw_scans = pd.read_csv(raw_scansf)

# Remove rows with missing data
raw_scans = raw_scans.dropna().reset_index(drop=True)
raw_scans = raw_scans.drop(935).reset_index(drop=True)

# Fix raw PET filepaths
replace_vals = {
    "raw/fbb": "raw",
    "raw/fdg": "raw",
    "raw/ftp": "raw",
}
raw_scans["raw_petf"] = raw_scans["raw_petf"].apply(
    lambda x: strm.str_replace(x, replace_vals)
)

print(f"reading {raw_scansf}")
print(f"raw_scans: {raw_scans.shape}")

reading /mnt/coredata/processing/leads/metadata/log/raw_pet_scans_2024-03-27-22-54-15.csv
raw_scans: (2065, 12)


In [49]:
# Fix raw PET filepaths
replace_vals = {
    "raw/fbb": "raw",
    "raw/fdg": "raw",
    "raw/ftp": "raw",
}
raw_scans["raw_petf"] = raw_scans["raw_petf"].apply(
    lambda x: strm.str_replace(x, replace_vals)
)

In [50]:
# Check that all expected files and directories exist
raw_scans["raw_petf_exists"] = raw_scans["raw_petf"].apply(lambda x: op.isfile(x))
raw_scans["mri_dir_exists"] = raw_scans["mri_dir"].apply(lambda x: op.isdir(x))
raw_scans["proc_pet_dir_exists"] = raw_scans["proc_pet_dir"].apply(
    lambda x: op.isdir(x)
)

In [68]:
raw_scans["scan_tag"] = raw_scans.apply(
    lambda x: "_".join([x["subj"], x["scan_type"], x["scan_date"]]), axis=1
)

In [69]:
raw_scans.head()

Unnamed: 0,subj,dx,scan_date,scan_type,visit,raw_petf,mri_date,days_to_mri,fs_dir,mri_dir,proc_pet_dir,need_to_process,raw_petf_exists,mri_dir_exists,proc_pet_dir_exists,scan_tag
0,LDS0070120,CN,2019-06-19,FBB,1,/mnt/coredata/processing/leads/data/raw/LDS007...,2019-06-20,1,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True,True,True,False,LDS0070120_FBB_2019-06-19
1,LDS0070120,CN,2021-07-14,FDG,1,/mnt/coredata/processing/leads/data/raw/LDS007...,2021-07-13,1,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True,True,True,False,LDS0070120_FDG_2021-07-14
2,LDS0070120,CN,2019-06-20,FTP,1,/mnt/coredata/processing/leads/data/raw/LDS007...,2019-06-20,0,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True,True,True,False,LDS0070120_FTP_2019-06-20
3,LDS0070166,EOAD,2019-08-22,FBB,1,/mnt/coredata/processing/leads/data/raw/LDS007...,2019-08-22,0,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True,True,True,False,LDS0070166_FBB_2019-08-22
4,LDS0070166,EOAD,2020-09-16,FBB,2,/mnt/coredata/processing/leads/data/raw/LDS007...,2020-09-16,0,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True,True,True,False,LDS0070166_FBB_2020-09-16


In [74]:
raw_scans.shape, raw_scans.query("(need_to_process==True)").shape

((2064, 16), (2064, 16))

In [76]:
import os
import os.path as op
import shutil


def setup_pet_proc_dirs(raw_scans=None, overwrite=False, verbose=True):
    """Create processed PET directories and link to associated MRIs.

    For each scan that needs to be processed, there must already be:
    1. A raw PET file in .nii format
    2. A processed MRI directory that will be linked to

    This function then does the following for each scan:
    1. Creates new processed PET directory
    2. Copies raw PET file to processed PET directory and renames it
    3. Creates symbolic link from processed PET directory to the
       associated MRI directory

    Parameters
    ----------
    raw_scans : DataFrame
        A pandas DataFrame with columns 'raw_petf', 'scan_tag',
        'mri_dir', and 'proc_pet_dir', which hold paths to raw PET .nii
        files, scan tags ("<subj>_<tracer>_<scan_date>"), processed MRI
        directories that will be used to process each PET scan, and
        target directories for processed PET data that will be created
        by this function, respectively.
    overwrite : bool, optional
        If True, overwrite existing processed PET directories if they
        exist. If False, skip scans with existing processed PET
        directories.
    verbose : bool, optional
        If True, output status messages during execution.

    Returns
    -------
    None
    """
    # Print the welcome message
    if verbose:
        title = f"\nCreating processed PET directories"
        print(title, "-" * len(title), sep="\n")

    # Load the most recently saved raw_scans spreadsheet if not provided
    if raw_scans is None:
        raw_scansf = glob_sort_mtime(
            op.join(PATHS["metadata"], "log", "raw_pet_scans_*.csv")
        )[0]
        if verbose:
            print(f"  Reading {raw_scansf}")
        raw_scans = pd.read_csv(raw_scansf)

    # Filter scans that need to be processed
    raw_scans = raw_scans.query("(need_to_process==True)").reset_index(drop=True)
    if verbose:
        print(f"  {raw_scans.shape[0]} scans to process")

    # Loop over each scan and do directory setup
    for idx, scan in raw_scans.iterrows():
        # Make sure the raw PET file exists
        if not op.isfile(scan["raw_petf"]):
            if verbose:
                print(
                    f"  Skipping {scan['scan_tag']} due to missing raw PET file: {scan['raw_petf']}"
                )
            continue
        elif not scan["raw_petf"].endswith(".nii"):
            if verbose:
                print(
                    f"  Skipping {scan['scan_tag']} as raw PET file does not end in .nii: {scan['raw_petf']}"
                )
            continue
        # Make sure the MRI directory exists
        if not op.isdir(scan["mri_dir"]):
            if verbose:
                print(
                    f"  Skipping {scan['scan_tag']} due to missing MRI directory: {scan['mri_dir']}"
                )
            continue
        # Remove existing processed PET directories if overwrite is True
        if op.isdir(scan["proc_pet_dir"]):
            if overwrite:
                if verbose:
                    print(
                        f"  Removing existing directory and its contents: {scan['proc_pet_dir']}"
                    )
                shutil.rmtree(scan["proc_pet_dir"])
            else:
                if verbose:
                    print(
                        f"  Skipping {scan['scan_tag']} due to existing processed PET directory: {scan['proc_pet_dir']}"
                    )
                continue

        # Create the processed PET directory
        os.makedirs(scan["proc_pet_dir"])

        # Copy the raw PET file to the processed PET directory
        infile = scan["raw_petf"]
        outfile = op.join(scan["proc_pet_dir"], f"{scan['scan_tag']}.nii")
        shutil.copy(infile, outfile)

        # Create a symlink to the processed MRI directory
        link_src = scan["mri_dir"]
        link_dst = op.join(scan["proc_pet_dir"], "mri")
        os.symlink(link_src, link_dst)

    if verbose:
        print("")

In [51]:
raw_scans.groupby(["scan_type", "visit"]).agg(
    {
        "raw_petf_exists": hf.count_pct,
        "mri_dir_exists": hf.count_pct,
        "proc_pet_dir_exists": hf.count_pct,
    }
)

Unnamed: 0_level_0,Unnamed: 1_level_0,raw_petf_exists,mri_dir_exists,proc_pet_dir_exists
scan_type,visit,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
FBB,1,614/614 (100.0%),614/614 (100.0%),0/614 (0.0%)
FBB,2,238/238 (100.0%),238/238 (100.0%),0/238 (0.0%)
FBB,3,94/94 (100.0%),94/94 (100.0%),0/94 (0.0%)
FBB,4,26/26 (100.0%),26/26 (100.0%),0/26 (0.0%)
FDG,1,156/156 (100.0%),156/156 (100.0%),0/156 (0.0%)
FTP,1,604/604 (100.0%),604/604 (100.0%),0/604 (0.0%)
FTP,2,219/219 (100.0%),219/219 (100.0%),0/219 (0.0%)
FTP,3,84/84 (100.0%),84/84 (100.0%),0/84 (0.0%)
FTP,4,29/29 (100.0%),29/29 (100.0%),0/29 (0.0%)


In [52]:
raw_scans.groupby(["need_to_process", "scan_type", "visit"]).size()

need_to_process  scan_type  visit
True             FBB        1        614
                            2        238
                            3         94
                            4         26
                 FDG        1        156
                 FTP        1        604
                            2        219
                            3         84
                            4         29
dtype: int64

In [56]:
raw_scans.iloc[0]["proc_pet_dir"]

'/mnt/coredata/processing/leads/data/processed/LDS0070120/FBB_2019-06-19'

# Do other stuff

In [33]:
file_renaming_map = {
    "FBB": {
        op.join(pet_dir_old, "compwm_ref_mask.nii"): op.join(mri_dir, ""),
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
    },
    "FTP": {
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
    },
    "MRI-T1": {},
}

# copy mask files from PET to MRI proc dirs
for idx, scan in (
    raw_scans.query("(scan_type==['FBB','FTP'])").sort_values("scan_date").iterrows()
):
    try:
        globstr = f"/mnt/coredata/Projects/LEADS/data_f7p1/processed/{scan['subj']}/Timepoint*/{scan['scan_type']}_{scan['scan_date']}"
        pet_dir_old = glob(globstr)[0]
    except IndexError:
        print(
            f"WARNING: {scan['subj']} {scan['scan_type']} scan on {scan['scan_date']} is missing a processed PET dir in {globstr}"
        )
        continue
    try:
        globstr = f"/mnt/coredata/Projects/LEADS/data_f7p1/processed/{scan['subj']}/Timepoint*/MRI_T1_{scan['mri_date']}"
        mri_dir_old = glob(globstr)[0]
    except IndexError:
        print(
            f"WARNING: {scan['subj']} MRI scan on {scan['scan_date']} is missing a processed MRI dir in {globstr}"
        )
        continue

    # copy mask files from PET to MRI proc dirs
    mri_dir = scan["mri_dir"]



In [13]:
raw_scans.query("(subj=='LDS0370008')")

Unnamed: 0,subj,dx,scan_date,scan_type,visit,raw_petf,mri_date,days_to_mri,fs_dir,mri_dir,proc_pet_dir,need_to_process
680,LDS0370008,EOAD,2018-08-15,FBB,1,/mnt/coredata/processing/leads/data/raw/fbb/LD...,2018-08-15,0,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
681,LDS0370008,EOAD,2019-09-19,FBB,2,/mnt/coredata/processing/leads/data/raw/fbb/LD...,2019-09-19,0,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
682,LDS0370008,EOAD,2020-10-28,FBB,3,/mnt/coredata/processing/leads/data/raw/fbb/LD...,2020-10-29,1,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
683,LDS0370008,EOAD,2021-11-16,FBB,4,/mnt/coredata/processing/leads/data/raw/fbb/LD...,2021-11-03,13,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
684,LDS0370008,EOAD,2018-08-27,FTP,1,/mnt/coredata/processing/leads/data/raw/ftp/LD...,2018-08-15,12,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
685,LDS0370008,EOAD,2019-10-03,FTP,2,/mnt/coredata/processing/leads/data/raw/ftp/LD...,2019-09-19,14,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
686,LDS0370008,EOAD,2020-10-29,FTP,3,/mnt/coredata/processing/leads/data/raw/ftp/LD...,2020-10-29,0,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
687,LDS0370008,EOAD,2021-12-09,FTP,4,/mnt/coredata/processing/leads/data/raw/ftp/LD...,2021-11-03,36,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True


In [157]:
raw_scans.drop_duplicates("subj")["dx"].value_counts(dropna=False)

EOAD       396
EOnonAD    122
CN          99
NaN          1
Name: dx, dtype: int64

raw_scans: (2067, 12)


0