In [1]:
from __future__ import annotations

from pathlib import Path
from rich import print
import pandas as pd
import re
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, List, Union, NamedTuple
from collections import namedtuple

from readii.io.utils.pattern_resolver import PatternResolver
from readii.utils import logger

import logging
# Only let messages of level ERROR and above through the "radiomics" logger
pyradiomics_logger = logging.getLogger("radiomics")
pyradiomics_logger.setLevel(logging.ERROR)

# Setup and Configuration

In [2]:
# Save data to a local directory
DATA_DIR = Path('data')

# If you choose a different collection in the setup notebook, you will need to change this value
COLLECTION_ID = "nsclc_radiomics"

# Directory of NIfTI files for the chosen collection (used as the reader's root directory below)
NIFTI_OUTPUT_DIR = DATA_DIR / "images" / COLLECTION_ID / "niftis"

# PyRadiomics settings file, looked up one level above the current working directory
PYRADIOMICS_CONFIG = Path().cwd().parent / "pyradiomics.yaml"

# Path template, relative to the reader's root directory; each {placeholder} names a
# metadata field that is parsed out of the matching part of a file path
filename_format =  "SubjectID-{SubjectID}/StudyUID-{StudyInstanceUID}/{Modality}_SeriesUID-{SeriesInstanceUID}/{IMAGE_ID}.nii.gz"

In [3]:
# Define custom exceptions
class DirectoryScannerError(Exception):
	"""Root of the exception hierarchy raised while scanning directories."""

@dataclass
class DirectoryScanner:
	"""Collects the files and/or sub-directories found beneath a root directory.

	Attributes
	----------
	root_directory : Path
		Directory in which the search starts.
	include_files : bool
		Keep regular files in the result. Default is True.
	include_directories : bool
		Keep directories in the result. Default is False.
	glob_pattern : str
		Glob expression every candidate has to match. Default is "*".
	recursive : bool
		Search all nested levels (``rglob``) rather than only the top level
		(``glob``). Default is True.
	"""

	root_directory: Path
	include_files: bool = True
	include_directories: bool = False
	glob_pattern: str = "*"
	recursive: bool = True

	def scan(self) -> List[Path]:
		"""
		Walk the root directory and return the entries selected by the settings.

		Returns
		-------
		List[Path]
			Every matching path. Empty when neither files nor directories
			were requested.

		Raises
		------
		DirectoryScannerError
			If the root directory is missing or is not a directory.
		"""
		root = self.root_directory
		if not root.exists():
			msg = f"Root directory {self.root_directory} does not exist."
			raise DirectoryScannerError(msg)
		if not root.is_dir():
			msg = f"Root directory {self.root_directory} is not a directory."
			raise DirectoryScannerError(msg)

		# Pick the bound search method once, then apply the pattern to it.
		search = root.rglob if self.recursive else root.glob
		candidates = search(self.glob_pattern)

		want_files = self.include_files
		want_directories = self.include_directories

		if want_files and want_directories:
			# No type restriction: everything the glob produced is returned.
			return list(candidates)
		if want_files:
			return list(filter(Path.is_file, candidates))
		if want_directories:
			return list(filter(Path.is_dir, candidates))
		return []

# Type alias for one file record: string keys mapped to either a Path or a string value.
FileDict = Dict[str, Union[Path, str]]

# Define custom exceptions
class FileFilterError(Exception):
	"""Root of the exception hierarchy raised while filtering files."""


class FileFilter:
	"""Selects the records of a list of dictionaries that satisfy the given criteria."""

	@staticmethod
	def filter(files: List[FileDict], filters: List[Dict[str, Any]] | None = None, **kwargs: Any) -> List[FileDict]: # noqa
		"""
		Keep only the records that satisfy every criterion.

		A criterion maps a key to an expected value, which is interpreted as:

		* a list     -> the record's value must be one of its items,
		* a callable -> it is called with the record's value and must return
		  something truthy,
		* anything else -> the record's value must be equal to it.

		A key missing from a record is treated as the value ``None``.

		Parameters
		----------
		files : List[Dict[str, Any]]
			The records to select from.
		filters : List[Dict[str, Any]], optional
			Criteria dictionaries, applied one after another.
		kwargs : Any
			One more criteria dictionary, applied after ``filters``.

		Returns
		-------
		List[Dict[str, Any]]
			The records that passed every criterion. When no criteria are
			given at all, the input list itself is returned.
		"""

		def value_accepted(actual: Any, expected: Any) -> bool:
			# Lists are checked before callables, then plain equality.
			if isinstance(expected, list):
				return actual in expected
			if callable(expected):
				return bool(expected(actual))
			if actual != expected:
				return False
			return True

		def record_accepted(record: Dict[str, Any], criteria: Dict[str, Any]) -> bool:
			# all() stops at the first failing key, so later callables are not invoked.
			return all(value_accepted(record.get(name), wanted) for name, wanted in criteria.items())

		# Explicit filters come first, keyword criteria last.
		stages = [*(filters or [])]
		if kwargs:
			stages.append(kwargs)

		remaining = files
		for criteria in stages:
			remaining = [record for record in remaining if record_accepted(record, criteria)]
		return remaining

@dataclass
class FilteredFiles:
	"""A container for file records that can be filtered, grouped and tabulated.

	Attributes
	----------
	files : List[FileDict]
		One dictionary per file.
	index : str | None
		Key the records are grouped by. It is always derived in ``__post_init__``
		from the first key shared by every record (a value passed by the caller is
		overwritten); ``None`` when the collection is empty.
	FileTuple : type[NamedTuple] | None
		namedtuple class built from the keys of the first record; ``None`` when the
		collection is empty.
	"""

	files: List[FileDict]
	index: str | None = None
	FileTuple: type[NamedTuple] | None = None  # Dynamically assigned NamedTuple class

	def __post_init__(self) -> None:
		"""Validate the input and create a reusable NamedTuple class.

		Raises
		------
		ValueError
			If the records do not all start with the same key.
		"""
		if not self.files:
			logger.warning("No files found in the collection.")
			self.index = None
			return

		# The first key of every record acts as the index, so it must be the same everywhere
		all_first_keys = {list(file)[0] for file in self.files}
		if len(all_first_keys) != 1:
			errmsg = f'All dictionaries must have the same first key. Found: {all_first_keys}'
			raise ValueError(errmsg)
		self.index = next(iter(all_first_keys))

		# Dynamically create the reusable NamedTuple class based on the fields of the first file
		fields = list(self.files[0].keys())
		self.FileTuple = namedtuple("FileTuple", fields)

	def filter(self, filters: List[Dict[str, Any]] | None = None, **kwargs: Any) -> FilteredFiles:  # noqa
		"""Return a new collection with the records that match the criteria (see ``FileFilter.filter``)."""
		filtered_files = FileFilter.filter(self.files, filters=filters, **kwargs)
		return FilteredFiles(files=filtered_files)

	def to_df(self, **kwargs: Any) -> pd.DataFrame:
		"""Convert the records to a pandas DataFrame indexed by ``self.index``.

		Parameters
		----------
		**kwargs : Any
			Passed through to the ``pd.DataFrame`` constructor.

		Raises
		------
		ValueError
			If the resulting frame is empty.
		"""
		df = pd.DataFrame(self.files, **kwargs)
		if df.empty:
			raise ValueError("No files found in the collection.")
		if self.index:
			# Non-inplace form: same result, but no mutation of an intermediate frame
			df = df.set_index(self.index)
		return df

	@property
	def keys(self) -> List[str]:
		"""Return the keys of the first record, or an empty list for an empty collection."""
		if not self.files:
			return []
		return list(self.files[0].keys())

	def __getattr__(self, attr: str) -> List[Any]:
		"""
		Return the values stored under ``attr`` in the records of ``self.files``.

		Only invoked when normal attribute lookup fails. Records that do not
		contain the key are skipped.

		Parameters
		----------
		attr : str
			The key to retrieve values for.

		Returns
		-------
		List[Any]
			The value of ``attr`` from every record that has that key.

		Raises
		------
		AttributeError
			If the key is not present in any of the dictionaries.
		"""
		# Read `files` straight from the instance dict: going through `self.files`
		# would re-enter __getattr__ forever when `files` has not been set yet
		# (e.g. on an instance created with __new__).
		files = self.__dict__.get("files")
		if files is None:
			raise AttributeError(attr)

		values = [file[attr] for file in files if attr in file]
		if not values:
			# Unknown names (including protocol probes such as `_repr_html_`) must
			# fail like a normal missing attribute instead of yielding [].
			raise AttributeError(f"Attribute '{attr}' not found in any file dictionaries.")
		return values

	def itertuples(self) -> Iterator[NamedTuple]:
		"""
		Iterate over the files as named tuples.

		Returns
		-------
		Iterator[NamedTuple]
			An iterator of named tuples representing the file attributes.
		"""
		if not self.files:
			return iter([])  # Return an empty iterator if no files exist

		return (self.FileTuple(**file) for file in self.files)

	def __iter__(self) -> Iterator[FilteredFiles]:
		"""Group by the index and yield a FilteredFiles instance for each group.

		Raises
		------
		ValueError
			If the collection has no index (raised when iteration starts).
		"""
		if not self.index:
			raise ValueError("No index found in the files.")
		grouped_files = pd.DataFrame(self.files).groupby(self.index)
		for _, group in grouped_files:
			yield FilteredFiles(files=group.to_dict(orient="records"))

	def __len__(self) -> int:
		"""Return the number of files in the collection."""
		return len(self.files)

	def __next__(self) -> FileDict:
		"""Return the next file record in the collection.

		Keeps a private cursor that is independent of ``__iter__`` (which yields
		per-index groups). When the records are exhausted ``StopIteration`` is
		raised and the cursor is reset, so a later call starts over.
		"""
		cursor = self.__dict__.get("_cursor")
		if cursor is None:
			cursor = iter(self.files)
			self.__dict__["_cursor"] = cursor
		try:
			return next(cursor)
		except StopIteration:
			self.__dict__.pop("_cursor", None)
			raise

	def first(self) -> NamedTuple:
		"""Return the first file in the collection as a named tuple.

		Raises
		------
		ValueError
			If the collection is empty.
		"""
		if not self.files:
			raise ValueError("No files found in the collection.")
		return self.FileTuple(**self.files[0])

@dataclass
class BaseReader:
	"""Base class for reading files based on a pattern and extracting metadata.

	Parameters
	----------
	root_directory : str | Path
			Directory to scan for files.
	filename_pattern : str
			Pattern to match filenames.
	show_warnings : bool, optional
			Whether to show warnings when a file is not matched. Default is False.
	**kwargs : Any
			Additional keyword arguments to pass to DirectoryScanner.
	"""

	root_directory: str | Path  # Directory to scan for files
	filename_pattern: str  # Pattern to match filenames
	pattern_resolver: PatternResolver
	directory_scanner: DirectoryScanner
	file_filter: FileFilter
	show_warnings: bool = False

	mapped_files: List[FileDict] = field(default_factory=list)

	def __init__(self, root_directory: str | Path, filename_pattern: str, **kwargs: Any) -> None:  # noqa: ANN401
		"""Set up the pattern resolver and directory scanner; no scanning happens here."""
		self.root_directory = Path(root_directory)
		assert self.root_directory.exists(), f"Root directory {self.root_directory} does not exist."

		self.filename_pattern = filename_pattern
		self.pattern_resolver = PatternResolver(self.filename_pattern)

		# `show_warnings` is ours; everything left in kwargs goes to the scanner
		self.show_warnings = kwargs.pop("show_warnings", False)

		self.directory_scanner = DirectoryScanner(self.root_directory, **kwargs)
		# Every declared dataclass field must exist on the instance, otherwise the
		# generated __repr__/__eq__ raise AttributeError.
		self.file_filter = FileFilter()
		self.mapped_files = []  # Initialize mapped_files

	def _locate_files(self) -> List[Path]:
		"""Use the directory scanner to locate files in the root directory."""
		return self.directory_scanner.scan()

	def extract_metadata(self, file_path: Path) -> Dict[str, Any]:
		"""Extract metadata from the file path based on the pattern.

		The resolver's ``formatted_pattern`` is read as literal text with
		``%(key)s`` placeholders. Literal text is matched verbatim, each
		placeholder becomes a named group, and the whole path has to match.

		Parameters
		----------
		file_path : Path
				The file path to extract metadata from.

		Returns
		-------
		Dict[str, Any]
				Dictionary containing extracted metadata.

		Raises
		------
		ValueError
				If the filename does not match the pattern.
		"""
		formatted_pattern = self.pattern_resolver.formatted_pattern

		# re.split with one capture group alternates: literal, key, literal, key, ...
		pieces = re.split(r"%\(([A-Za-z_]\w*)\)s", formatted_pattern)
		regex_pattern = "".join(
			# Escape literals so that e.g. the dots in ".nii.gz" are not wildcards
			re.escape(piece) if position % 2 == 0 else f"(?P<{piece}>.*?)"
			for position, piece in enumerate(pieces)
		)

		# as_posix(): the pattern uses "/" separators, also on Windows.
		# fullmatch(): rejects trailing text and keeps a trailing lazy group
		# from capturing the empty string.
		matcher = re.fullmatch(regex_pattern, Path(file_path).as_posix())

		if matcher:
			return matcher.groupdict()
		msg = f"Filename '{file_path}' does not match the expected pattern: {self.pattern_resolver.formatted_pattern}"
		raise ValueError(msg)

	def _map_files(self) -> None:
		"""Scan the root directory once and fill ``self.mapped_files`` with one record per matching file."""
		unmatched = []
		for file_path in self._locate_files():
			try:
				metadata = self.extract_metadata(file_path.relative_to(self.root_directory))
				metadata["path"] = file_path
				self.mapped_files.append(metadata)
			except ValueError as ve:
				unmatched.append(file_path)
				if self.show_warnings:
					logger.warning(
						f"Skipping file {file_path}, as it does not match the pattern.", 
						error=ve, 
						valid_keys=self.pattern_resolver.keys
					)
		if unmatched:
			logger.debug(f"Unmatched files: {len(unmatched)}", unmatched=unmatched)

	def files(self, **kwargs) -> FilteredFiles:
		"""Map files in the root directory to their extracted metadata.

		The directory is scanned on the first call; later calls reuse
		``self.mapped_files`` as long as it is non-empty.

		Parameters
		----------
		**kwargs : Any, optional
			Keyword arguments to filter the files by.

		Returns
		-------
		FilteredFiles
				An instance of FilteredFiles containing the mapped files.
		"""
		if not self.mapped_files:
			self._map_files()

		collection = FilteredFiles(files=self.mapped_files)
		# Filtering is a FilteredFiles operation; the reader has no `filter` method
		if kwargs:
			return collection.filter(**kwargs)
		return collection



In [4]:
! tree -F $NIFTI_OUTPUT_DIR.parent -I "*.dcm"

[01;34mdata/images/nsclc_radiomics[0m/
├── [01;34mdicoms[0m/
│   ├── [01;34mPatient-LUNG1-005[0m/
│   │   └── [01;34mStudyUID-93819[0m/
│   │       ├── [01;34mCT_SeriesUID-68747[0m/
│   │       └── [01;34mRTSTRUCT_SeriesUID-99068[0m/
│   ├── [01;34mPatient-LUNG1-027[0m/
│   │   └── [01;34mStudyUID-35913[0m/
│   │       ├── [01;34mCT_SeriesUID-45865[0m/
│   │       └── [01;34mRTSTRUCT_SeriesUID-63878[0m/
│   ├── [01;34mPatient-LUNG1-101[0m/
│   │   └── [01;34mStudyUID-27911[0m/
│   │       ├── [01;34mCT_SeriesUID-55665[0m/
│   │       └── [01;34mRTSTRUCT_SeriesUID-25865[0m/
│   ├── [01;34mPatient-LUNG1-108[0m/
│   │   └── [01;34mStudyUID-62453[0m/
│   │       ├── [01;34mCT_SeriesUID-81484[0m/
│   │       └── [01;34mRTSTRUCT_SeriesUID-99496[0m/
│   ├── [01;34mPatient-LUNG1-162[0m/
│   │   └── [01;34mStudyUID-21249[0m/
│   │       ├── [01;34mCT_SeriesUID-72433[0m/
│   │       └── [01;34mRTSTRUCT_SeriesUID-38612[0m/
│   ├── [01;34mPatient-LUNG1-

In [5]:
# Build a reader rooted at the NIfTI directory; file paths are parsed against `filename_format`
neg_nifti_reader = BaseReader(
  root_directory=NIFTI_OUTPUT_DIR,
  filename_pattern=filename_format
)
# Scan the directory and collect one metadata record per file whose path matches the pattern
files = neg_nifti_reader.files()


In [6]:
# Peek at the first record only: itertuples() yields one named tuple per file
for f in files.itertuples():
  print(f)
  break

In [7]:
# Tabulate all records; to_df() uses the collection's index key as the DataFrame index
files.to_df()

Unnamed: 0_level_0,StudyInstanceUID,Modality,SeriesInstanceUID,IMAGE_ID,path
SubjectID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
7_LUNG1-342,33638,CT,01555,randomized_sampled_non_roi,data/images/nsclc_radiomics/niftis/SubjectID-7...
7_LUNG1-342,33638,CT,01555,shuffled_full,data/images/nsclc_radiomics/niftis/SubjectID-7...
7_LUNG1-342,33638,CT,01555,randomized_sampled_roi,data/images/nsclc_radiomics/niftis/SubjectID-7...
7_LUNG1-342,33638,CT,01555,original,data/images/nsclc_radiomics/niftis/SubjectID-7...
7_LUNG1-342,33638,CT,01555,randomized_sampled_full,data/images/nsclc_radiomics/niftis/SubjectID-7...
...,...,...,...,...,...
1_LUNG1-229,22809,CT,29880,original,data/images/nsclc_radiomics/niftis/SubjectID-1...
1_LUNG1-229,22809,CT,29880,randomized_sampled_full,data/images/nsclc_radiomics/niftis/SubjectID-1...
1_LUNG1-229,22809,CT,29880,shuffled_non_roi,data/images/nsclc_radiomics/niftis/SubjectID-1...
1_LUNG1-229,22809,CT,29880,shuffled_roi,data/images/nsclc_radiomics/niftis/SubjectID-1...


In [8]:
# Select every row that carries a single index label (one subject)
files.to_df().loc['7_LUNG1-342']

Unnamed: 0_level_0,StudyInstanceUID,Modality,SeriesInstanceUID,IMAGE_ID,path
SubjectID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
7_LUNG1-342,33638,CT,1555,randomized_sampled_non_roi,data/images/nsclc_radiomics/niftis/SubjectID-7...
7_LUNG1-342,33638,CT,1555,shuffled_full,data/images/nsclc_radiomics/niftis/SubjectID-7...
7_LUNG1-342,33638,CT,1555,randomized_sampled_roi,data/images/nsclc_radiomics/niftis/SubjectID-7...
7_LUNG1-342,33638,CT,1555,original,data/images/nsclc_radiomics/niftis/SubjectID-7...
7_LUNG1-342,33638,CT,1555,randomized_sampled_full,data/images/nsclc_radiomics/niftis/SubjectID-7...
7_LUNG1-342,33638,CT,1555,shuffled_non_roi,data/images/nsclc_radiomics/niftis/SubjectID-7...
7_LUNG1-342,33638,CT,1555,shuffled_roi,data/images/nsclc_radiomics/niftis/SubjectID-7...
7_LUNG1-342,33638,RTSTRUCT,9318,GTV,data/images/nsclc_radiomics/niftis/SubjectID-7...


In [9]:
# Iterating the collection yields one FilteredFiles per index value (here: per subject)
for subject in files:
  # first() raises ValueError when the subject has no RTSTRUCT record
  mask = subject.filter(Modality="RTSTRUCT").first()
  print(mask)
  print(mask.path)

  # Show just the first CT record of this subject
  for ct in subject.filter(Modality="CT").itertuples():
    print(ct)
    print(ct.path)
    break
  # Demo only: stop after the first subject
  break