In [2]:
from typing import  Iterator,  MutableSequence

def tokenize(stem: str) -> tuple[MutableSequence[str | None], MutableSequence[str]]:
    tokens = stem.split("_")
    keys: MutableSequence[str | None] = list()
    values: MutableSequence[str] = list()
    for token in tokens:
        if "-" in token:  # A bids tag
            key: str | None = token.split("-")[0]
            if key is None:
                continue
            keys.append(key)
            values.append(token[len(key) + 1 :])

        else:  # A suffix
            keys.append(None)
            values.append(token)
    return keys, values

def parse(phenotype: str) -> Iterator[tuple[str, str]]:
    """Yield the ``(key, value)`` tag pairs contained in ``phenotype``.

    The string is tokenized with ``tokenize``; tokens that have no key
    (suffixes) are left out, so only ``key-value`` tags are produced, in the
    order in which they appear.
    """
    tag_names, tag_values = tokenize(phenotype)
    yield from (
        (name, content)
        for name, content in zip(tag_names, tag_values, strict=False)
        if name is not None
    )

In [3]:
from collections import defaultdict
import zipfile
from pathlib import Path
from typing import Iterator

# Directory that is searched for the input ``*.zip`` archives and that also
# receives the per-subject ``.npz`` files written further down.
# NOTE(review): absolute, machine-specific path — adjust when running elsewhere.
base_path = Path("/scratch/imaging/consistency-check/data")


def find_seed_corr(path: zipfile.Path) -> Iterator[zipfile.Path]:
    """Walk ``path`` recursively and yield every file whose name contains ``SeedCorr``.

    Directories are never yielded themselves; they are only queued so that
    their contents get visited. The traversal is iterative (explicit stack),
    so the most recently discovered directory is expanded next.
    """
    pending = [path]
    while pending:
        directory = pending.pop()
        for entry in directory.iterdir():
            if entry.is_dir():
                pending.append(entry)
            elif "SeedCorr" in entry.name:
                yield entry


# Index of every SeedCorr file found in the archives:
# image_paths_by_sub_and_feature[sub][feature] -> list of zipfile.Path entries.
# Both levels are defaultdicts, so merely indexing a missing key inserts it.
image_paths_by_sub_and_feature = defaultdict(lambda: defaultdict(list))
for zip_file_path in base_path.glob("*.zip"):
    try:
        zip_path = zipfile.Path(zip_file_path)
    except zipfile.BadZipFile:
        # Archives that cannot be opened as zip files are skipped silently.
        continue
    for image_path in find_seed_corr(zip_path):
        # Decode the ``key-value`` tags from the file name.
        # NOTE(review): ``stem`` drops only the last extension, so for a double
        # extension such as ``.nii.gz`` the final token would keep ``.nii`` —
        # confirm that the ``feature`` tag is never the last token.
        tags = dict(parse(image_path.stem))
        # Direct indexing: a file name without a ``sub`` or ``feature`` tag
        # raises KeyError here.
        sub = tags["sub"]
        feature = tags["feature"]
        image_paths_by_sub_and_feature[sub][feature].append(image_path)

In [23]:
from itertools import chain


# Subject labels: a live view onto the outer keys of the index.
subjects = image_paths_by_sub_and_feature.keys()
# Union of the feature names recorded for any subject.
features = {
    feature_name
    for paths_by_feature in image_paths_by_sub_and_feature.values()
    for feature_name in paths_by_feature
}

In [5]:
# Spot-check: how many images were indexed for one subject/feature pair.
sub = "9040"
feature = "FalseComb2SeedCorr"

# Look the list up explicitly so this cell also works on a fresh kernel
# (previously `image_paths` was only defined by a later cell). `.get` is used
# instead of indexing so that a missing subject or feature does not insert an
# empty entry into the defaultdict index.
image_paths = image_paths_by_sub_and_feature.get(sub, {}).get(feature, [])
len(image_paths)

100

In [30]:
import nibabel as nib
import numpy as np
import gzip
from tqdm.auto import tqdm
import warnings

# For every subject, stack all SeedCorr images into one 5-D array
# (three image axes, then image index, then feature index — this assumes
# every image is 3-D) and save it as ``sub-<sub>_seed.npz`` in ``base_path``.

# `features` is a set, whose iteration order may differ between kernel
# sessions; sorting makes the order of the feature axis reproducible.
feature_names = sorted(features)

for sub in tqdm(subjects, desc="subjects"):
    feature_arrays = list()
    for feature in feature_names:
        fdata = list()
        # `.get` avoids inserting an empty list into the defaultdict index
        # when this subject has no images for the feature.
        image_paths = image_paths_by_sub_and_feature[sub].get(feature, [])
        for image_path in image_paths:
            with (
                image_path.open("rb") as compressed_file_handle,
                gzip.open(compressed_file_handle) as file_handle,
            ):
                try:
                    image = nib.nifti1.Nifti1Image.from_stream(file_handle)
                    fdata.append(image.get_fdata()[..., np.newaxis])
                except Exception as error:
                    # Still best-effort, but no longer silent: a dropped image
                    # changes the length of the image axis for this feature.
                    warnings.warn(
                        f"Skipping unreadable image {image_path}: {error!r}"
                    )
        if not fdata:
            # np.concatenate would raise an opaque ValueError on an empty
            # list; raise the same exception type with a useful message.
            raise ValueError(
                f"No readable images for sub-{sub}, feature {feature}"
            )
        feature_array = np.concatenate(fdata, axis=3)[..., np.newaxis]
        feature_arrays.append(feature_array)
    subject_array = np.concatenate(feature_arrays, axis=4)
    # Actually store the data (the array was previously not passed to savez),
    # together with the feature names that label the last axis.
    np.savez(
        base_path / f"sub-{sub}_seed.npz",
        subject_array=subject_array,
        features=np.array(feature_names),
    )

(9700, 115, 97)