diff --git a/.gitignore b/.gitignore
index f96cfe995..d5008ac6a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -299,3 +299,4 @@ test.xml
/docs/source/05_reference/_autosummary
/docs/source/05_reference/_autosummary
codex.md
+AGENTS.md
diff --git a/src/navigate/model/data_sources/bdv_data_source.py b/src/navigate/model/data_sources/bdv_data_source.py
index 8618678bb..2fceb0a4a 100644
--- a/src/navigate/model/data_sources/bdv_data_source.py
+++ b/src/navigate/model/data_sources/bdv_data_source.py
@@ -70,9 +70,11 @@ def __init__(self, file_name: str = None, mode: str = "w") -> None:
Parameters
----------
file_name : str
- The name of the file to write to.
- mode : str
- The mode to open the file in. Must be "w" for write or "r" for read.
+            Path to the output. For BDV/HDF5 use a ".h5" file; for BDV/N5 use an
+            ".n5" file; for TIFF-based filelist export use a ".tif" or ".tiff"
+            file path.
+ mode : {'w', 'r'}
+ Mode to open the file in. Must be 'w' for write (export) or 'r' for read.
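+
+        Examples
+        --------
+        A minimal sketch, assuming the enclosing class is
+        ``BigDataViewerDataSource``; file names are hypothetical::
+
+            ds = BigDataViewerDataSource("scan.h5", mode="w")    # BDV/HDF5
+            ds = BigDataViewerDataSource("scan.n5", mode="w")    # BDV/N5
+            ds = BigDataViewerDataSource("scan.tiff", mode="w")  # TIFF filelist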
"""
#: np.array: The image.
self.image = None
@@ -86,7 +88,8 @@ def __init__(self, file_name: str = None, mode: str = "w") -> None:
#: str: The file type.
self.__file_type = os.path.splitext(os.path.basename(file_name))[-1][1:].lower()
- if self.__file_type not in ["h5", "n5"]:
+ # Allow HDF5, N5, and TIFF (filelist) outputs for BigDataViewer metadata
+ if self.__file_type not in ["h5", "n5", "tif", "tiff"]:
error_statement = f"Unknown file type {self.__file_type}."
logger.error(error_statement)
raise ValueError(error_statement)
diff --git a/src/navigate/model/data_sources/tiff_data_source.py b/src/navigate/model/data_sources/tiff_data_source.py
index 30b812295..312e9fa88 100644
--- a/src/navigate/model/data_sources/tiff_data_source.py
+++ b/src/navigate/model/data_sources/tiff_data_source.py
@@ -35,6 +35,7 @@
import uuid
from pathlib import Path
import logging
+from typing import Dict, Any, Optional
# Third Party Imports
import tifffile
@@ -45,6 +46,7 @@
from .data_source import DataSource, DataReader
from ..metadata_sources.metadata import Metadata
from ..metadata_sources.ome_tiff_metadata import OMETIFFMetadata
+from ..metadata_sources.bdv_metadata import BigDataViewerMetadata
# Logger Setup
@@ -74,6 +76,8 @@ def __init__(
"""
#: np.ndarray: Image data
self.image = None
+
+ #: list: List of positions on a per-slice basis.
self._views = []
super().__init__(file_name=file_name, mode=mode)
@@ -81,19 +85,27 @@ def __init__(
#: str: Directory to save the data to.
self.save_directory = Path(self.file_name).parent
- # Is this an OME-TIFF?
- # TODO: check the header, rather than use the file extension
+ # Check if the file is OME-TIFF and create the appropriate metadata object
if self.file_name.endswith(".ome.tiff") or self.file_name.endswith(".ome.tif"):
+ #: bool: Is this an OME-TIFF file?
self._is_ome = True
+
#: Metadata: Metadata object
self.metadata = OMETIFFMetadata()
else:
+ #: bool: Is this an OME-TIFF file?
self._is_ome = False
+
+            #: Metadata: Metadata object
self.metadata = Metadata()
+ #: BigDataViewerMetadata: Metadata for BigDataViewer
+ self.bdv_metadata = BigDataViewerMetadata()
+
+ #: bool: Is this a bigtiff file?
self._is_bigtiff = is_bigtiff
- # For file writing, do we assume all files end with tiff or tif?
+        #: bool: Whether written files end in ".tiff" (True) or ".tif" (False).
self.__double_f = self.file_name.endswith("tiff")
# Keep track of z, time, channel indices
@@ -170,7 +182,7 @@ def get_data(
channel: int = 0,
z: int = -1,
resolution: int = 1,
- ) -> npt.ArrayLike:
+    ) -> Optional[npt.ArrayLike]:
"""Get data according to timepoint, position, channel and z-axis id
Parameters
@@ -226,13 +238,18 @@ def write(self, data: npt.ArrayLike, **kw) -> None:
data : npt.ArrayLike
Data to write to file.
kw : dict
- Keyword arguments to pass to tifffile.imsave.
+            Keyword arguments to pass to tifffile.imsave. Includes stage
+            coordinates in the format
+            {'x': 11259.4, 'y': 11759.4, 'z': 68.0, 'theta': 0.0, 'f': 100.0}.
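+
+        Examples
+        --------
+        A minimal sketch; ``ds`` and ``frame`` are hypothetical::
+
+            ds.write(frame, x=11259.4, y=11759.4, z=68.0, theta=0.0, f=100.0)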
"""
self.mode = "w"
+ # Get the current frame and position
c, z, self._current_time, self._current_position = self._cztp_indices(
self._current_frame, self.metadata.per_stack
- ) # find current channel
+ )
+
+ # If it is the first frame of the stack, create a new image file.
+ ome_xml = None
if z == 0:
if c == 0:
# Make sure we're set up for writing
@@ -241,10 +258,10 @@ def write(self, data: npt.ArrayLike, **kw) -> None:
ome_xml = self.metadata.to_xml(
c=c, t=self._current_time, file_name=self.file_name, uid=self.uid
).encode()
- else:
- ome_xml = None
if len(kw) > 0:
+            # Store the stage coordinates passed with each frame; the list
+            # resets for each stack.
self._views.append(kw)
if self.is_ome:
@@ -269,8 +286,7 @@ def write(self, data: npt.ArrayLike, **kw) -> None:
self._current_frame += 1
# Check if this was the last frame to write
- # print("Switch")
- c, z, _, _ = self._cztp_indices(self._current_frame, self.metadata.per_stack)
+ c, z, t, p = self._cztp_indices(self._current_frame, self.metadata.per_stack)
if (z == 0) and (c == 0):
self.close(True)
@@ -369,9 +385,49 @@ def close(self, internal=False) -> None:
)
else:
self.image.close()
+
if not internal:
self._closed = True
+            # Write the BigDataViewer metadata to a sidecar dataset.xml
+            if len(self._views) > 0:
+                self.bdv_metadata.write_xml(
+                    str(self.save_directory / "dataset.xml"), self._views
+                )
+
+ def set_metadata_from_configuration_experiment(
+ self, configuration: Dict[str, Any], microscope_name: str = None
+ ) -> None:
+ """Sets the metadata from according to the microscope configuration.
+
+ Child method also provides information to the BigDataViewerMetadata.
+
+ Parameters
+ ----------
+ configuration : Dict[str, Any]
+ Configuration experiment.
+ microscope_name : str
+ The microscope name
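+
+        Examples
+        --------
+        A minimal sketch; ``ds``, ``configuration``, and the microscope
+        name are hypothetical::
+
+            ds.set_metadata_from_configuration_experiment(
+                configuration, microscope_name="Mesoscale"
+            )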
+ """
+ self.metadata.active_microscope = microscope_name
+ self.metadata.configuration = configuration
+ self.get_shape_from_metadata()
+
+ self.bdv_metadata.active_microscope = microscope_name
+ self.bdv_metadata.configuration = configuration
+
+ def set_metadata(self, metadata_config: dict) -> None:
+ """Sets the metadata
+
+ Parameters
+ ----------
+ metadata_config : dict
+            Shape configuration with keys "c", "z", "t", "p", "is_dynamic",
+            and "per_stack".
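+
+        Examples
+        --------
+        A minimal sketch with hypothetical shape values::
+
+            ds.set_metadata(
+                {"c": 2, "z": 200, "t": 1, "p": 4,
+                 "is_dynamic": True, "per_stack": True}
+            )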
+ """
+ self.metadata.set_from_dict(metadata_config)
+ self.bdv_metadata.set_from_dict(metadata_config)
+ self.get_shape_from_metadata()
+
class TiffReader(DataReader):
def __init__(self, tiff_file: tifffile.TiffFile):
diff --git a/src/navigate/model/features/image_writer.py b/src/navigate/model/features/image_writer.py
index a593bd4a4..9b41778d6 100644
--- a/src/navigate/model/features/image_writer.py
+++ b/src/navigate/model/features/image_writer.py
@@ -139,6 +139,7 @@ def __init__(
# camera flip flags
if self.microscope_name is None:
self.microscope_name = self.model.active_microscope_name
+
camera_config = self.model.configuration["configuration"]["microscopes"][
self.microscope_name
]["camera"]
@@ -151,13 +152,14 @@ def __init__(
self.disk_space_check_interval = 60
#: int: Minimum disk space required in bytes.
- self.min_disk_space = 10 * 1024 * 1024 * 1024 # 10 GB
+ self.min_disk_space = 10 * 1024 * 1024 * 1024 # 10 GB
#: float: Time of last disk space check
self.last_disk_space_check = 0
#: bool: Flag to indicate if initialized before
self.initialized = False
+
# initialize saving
self.initialize_saving(sub_dir, image_name)
@@ -177,7 +179,10 @@ def save_image(self, frame_ids):
continue
# Check disk space at regular intervals to prevent running out of space
- if time.time() - self.last_disk_space_check > self.disk_space_check_interval:
+ if (
+ time.time() - self.last_disk_space_check
+ > self.disk_space_check_interval
+ ):
_, _, free = shutil.disk_usage(self.save_directory)
logger.info(f"Free Disk Space: {free / 1024 / 1024 / 1024} GB")
if free < self.min_disk_space:
@@ -349,9 +354,7 @@ def get_saving_file_name(self, sub_dir="", image_name=None):
os.makedirs(self.save_directory)
logger.debug(f"Save Directory Created - {self.save_directory}")
except (PermissionError, OSError, FileNotFoundError):
- logger.debug(
- f"Unable to Create Save Directory - {self.save_directory}"
- )
+ logger.debug(f"Unable to Create Save Directory - {self.save_directory}")
self.model.stop_acquisition = True
self.model.event_queue.put(
"warning",
@@ -378,6 +381,7 @@ def get_saving_file_name(self, sub_dir="", image_name=None):
def initialize_saving(self, sub_dir="", image_name=None):
+        """Initialize the data source used for saving images."""
+        # Close any previously initialized data source
if self.data_source is not None:
self.data_source.close()
self.data_source = None
@@ -393,9 +397,7 @@ def initialize_saving(self, sub_dir="", image_name=None):
os.makedirs(self.mip_directory)
logger.debug(f"MIP Directory Created - {self.mip_directory}")
except (PermissionError, OSError, FileNotFoundError):
- logger.debug(
- f"Unable to Create MIP Directory - {self.mip_directory}"
- )
+ logger.debug(f"Unable to Create MIP Directory - {self.mip_directory}")
self.model.stop_acquisition = True
self.model.event_queue.put(
"warning",
diff --git a/src/navigate/model/metadata_sources/bdv_metadata.py b/src/navigate/model/metadata_sources/bdv_metadata.py
index b8cba543d..6af0f3b2d 100644
--- a/src/navigate/model/metadata_sources/bdv_metadata.py
+++ b/src/navigate/model/metadata_sources/bdv_metadata.py
@@ -51,23 +51,18 @@
class BigDataViewerMetadata(XMLMetadata):
"""Metadata for BigDataViewer files.
+    Supports HDF5 (.h5), N5 (.n5), and TIFF filelists (.tif/.tiff) written
+    via the BigStitcher file-mapping loader (format="spimreconstruction.filemap2").
+
Note
----
- XML spec in section 2.3 of https://arxiv.org/abs/1412.0488.
-
+ XML spec in section 2.3 of https://arxiv.org/abs/1412.0488.
"""
def __init__(self) -> None:
- """Initialize the BigDataViewer metadata object.
-
- Parameters
- ----------
- configuration : Optional[Dict[str, Any]]
- Configuration dictionary.
- """
+ """Initialize the BigDataViewer metadata object."""
super().__init__()
- # Affine Transform Parameters
#: bool: Shear the data.
self.shear_data = False
@@ -80,7 +75,6 @@ def __init__(self) -> None:
#: npt.NDArray: Shear transform matrix.
self.shear_transform = np.eye(3, 4)
- # Rotation Transform Parameters
#: bool: Rotate the data.
self.rotate_data = False
@@ -124,31 +118,32 @@ def get_affine_parameters(self, configuration):
def bdv_xml_dict(
self, file_name: Union[str, list, None], views: list, **kw
) -> dict:
- """Create a BigDataViewer XML dictionary from a list of views.
+ """Create a BigDataViewer XML metadata dictionary from a list of views.
Parameters
----------
- file_name : str
- The file name of the file to be written.
- views : list
- A list of dictionaries containing metadata for each view.
+ file_name : str or list of str
+ For HDF5/N5, the path to the dataset file (.h5 or .n5).
+            Any other extension is treated as a TIFF filelist dataset; the
+            .tif/.tiff files are referenced relative to the XML.
+ views : list of dict
+ A list of dictionaries containing stage/transform metadata for each view.
**kw
- Additional keyword arguments.
+ Additional keyword arguments (not used directly).
Returns
-------
dict
- A dictionary containing the XML metadata.
-
+ Nested dictionary representing the BigDataViewer XML structure.
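+
+        Examples
+        --------
+        A minimal sketch; the stage coordinates are hypothetical::
+
+            views = [{"x": 11259.4, "y": 11759.4, "z": 68.0,
+                      "theta": 0.0, "f": 100.0}]
+            xml_dict = metadata.bdv_xml_dict("dataset.h5", views)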
"""
# Header
bdv_dict = {
"version": 0.2,
"BasePath": {"type": "relative", "text": "."},
"SequenceDescription": {},
}
-
- # File path
ext = os.path.basename(file_name).split(".")[-1]
if ext == "h5":
"""
@@ -162,25 +157,6 @@ def bdv_xml_dict(
"text": file_name,
}
- # TODO: Consider adding support for tiff/tif files. Needs evaluation.
- # elif ext == "tiff" or ext == "tif":
- # """
- # Need to iterate through the time points, etc.
- #
- # ArrayImgFactory
- # false
- #
- #
- # 1_CH00_000000.tif
- #
- #
- # 1_CH01_000000.tif
- #
- #
- #
- # """
- # pass
-
elif ext == "n5":
"""
@@ -193,6 +169,51 @@ def bdv_xml_dict(
"text": file_name,
}
+ else:
+            # Anything other than h5/n5 is treated as a TIFF filelist
+ ext = "tif"
+ """
+
+ false
+
+
+ Position0/CH00_000000.tiff
+
+ ...
+
+ Position11/CH00_000000.tiff
+
+
+
+ """
+ # Iterate through FileMapping and populate the file paths
+ file_mapping = []
+ view_id = 0
+
+ for c in range(self.shape_c):
+ for pos in range(self.positions - 1):
+ file_mapping.append(
+ {
+ "view_setup": str(view_id),
+ "timepoint": "0",
+ "series": "0",
+ "channel": str(c),
+ "file": {
+ "type": "relative",
+ "text": f"Position{pos}/CH{c:02d}_000000.tiff",
+ },
+ }
+ )
+ view_id += 1
+
+ loader = {
+ "format": "spimreconstruction.filemap2",
+ "ZGrouped": {"text": "false"},
+ "files": {"FileMapping": file_mapping},
+ }
+
+ bdv_dict["SequenceDescription"]["ImageLoader"] = loader
+
# Calculate shear and rotation transforms
self.bdv_shear_transform()
self.bdv_rotate_transform()
@@ -200,6 +221,7 @@ def bdv_xml_dict(
# Populate ViewSetups
bdv_dict["SequenceDescription"]["ViewSetups"] = {}
bdv_dict["SequenceDescription"]["ViewSetups"]["ViewSetup"] = []
+
# Attributes are necessary for BigStitcher
bdv_dict["SequenceDescription"]["ViewSetups"]["Attributes"] = [
{
@@ -210,6 +232,7 @@ def bdv_xml_dict(
{"name": "tile", "Tile": []},
{"name": "angle", "Angle": {"id": {"text": 0}, "name": {"text": 0}}},
]
+
# The actual loop that populates ViewSetup
view_id = 0
for c in range(self.shape_c):
@@ -219,13 +242,13 @@ def bdv_xml_dict(
"Channel"
].append(ch)
- for pos in range(self.positions):
+ for pos in range(self.positions - 1):
d = {
"id": {"text": view_id},
"name": {"text": view_id},
"size": {"text": f"{self.shape_x} {self.shape_y} {self.shape_z}"},
"voxelSize": {
- "unit": {"text": "um"},
+ "unit": {"text": "µm"},
"size": {"text": f"{self.dx} {self.dy} {self.dz}"},
},
"attributes": {
@@ -238,9 +261,10 @@ def bdv_xml_dict(
bdv_dict["SequenceDescription"]["ViewSetups"]["ViewSetup"].append(d)
view_id += 1
- # Finish up the Tile Attributes outside of the channels loop so we have
+
+ # Finish up the Tile Attributes outside the channels loop so we have
# one per tile
- for pos in range(self.positions):
+ for pos in range(self.positions - 1):
tile = {"id": {"text": str(pos)}, "name": {"text": str(pos)}}
bdv_dict["SequenceDescription"]["ViewSetups"]["Attributes"][2][
"Tile"
@@ -252,14 +276,18 @@ def bdv_xml_dict(
bdv_dict["SequenceDescription"]["Timepoints"]["last"] = {
"text": self.shape_t - 1
}
+ bdv_dict["SequenceDescription"]["MissingViews "] = {}
+ bdv_dict["SequenceDescription"]["BoundingBoxes "] = {}
# View registrations
bdv_dict["ViewRegistrations"] = {"ViewRegistration": []}
for t in range(self.shape_t):
- for p in range(self.positions):
+ for p in range(self.positions - 1):
for c in range(self.shape_c):
-                    view_id = c * self.positions + p
+                    view_id = c * (self.positions - 1) + p
- mat = np.zeros((3, 4), dtype=float)
+ mat = np.zeros(shape=(3, 4), dtype=float)
+
for z in range(self.shape_z):
matrix_id = (
z
@@ -269,7 +297,6 @@ def bdv_xml_dict(
)
# Construct centroid of volume matrix
- # print(matrix_id, views[matrix_id])
try:
mat += (
self.stage_positions_to_affine_matrix(
@@ -282,58 +309,65 @@ def bdv_xml_dict(
# an acquisition.
pass
- view_transforms = [
- {
- "type": "affine",
- "Name": "Translation to Regular Grid",
- "affine": {
- "text": " ".join([f"{x:.6f}" for x in mat.ravel()])
- },
- }
- ]
-
- if self.shear_data:
- view_transforms.append(
+ view_transforms = [
{
"type": "affine",
- "Name": "Shearing Transform",
+ "Name": {"text": "Translation to Regular Grid"},
"affine": {
- "text": " ".join(
- [
- f"{x:.6f}"
- for x in self.shear_transform.ravel()
- ]
- )
+ "text": " ".join([f"{x:.6f}" for x in mat.ravel()])
},
}
- )
+ ]
- if self.rotate_data:
- view_transforms.append(
- {
- "type": "affine",
- "Name": "Rotation Transform",
- "affine": {
- "text": " ".join(
- [
- f"{x:.6f}"
- for x in self.rotate_transform.ravel()
- ]
- )
- },
- }
- )
+ d = self.shear_and_rotate_transform(t, view_id, view_transforms)
- d = dict(timepoint=t, setup=view_id, ViewTransform=view_transforms)
+ if ext == "n5" or ext == "h5":
+ bdv_dict["ViewRegistrations"]["ViewRegistration"].append(d)
- bdv_dict["ViewRegistrations"]["ViewRegistration"].append(d)
+ else:
+                            # For TIFF filelists, add one registration per timepoint.
+ if view_id == 0:
+ bdv_dict["ViewRegistrations"][
+ "ViewRegistration"
+ ].append(d)
- bdv_dict["Misc"] = {
- "Entry": {"Key": "Note", "text": self.misc}
- }
+ # Add housekeeping metadata
+ bdv_dict["ViewInterestPoints "] = {}
+ bdv_dict["PointSpreadFunctions "] = {}
+ bdv_dict["StitchingResults "] = {}
+ bdv_dict["IntensityAdjustments "] = {}
+ bdv_dict["Misc"] = {"Entry": {"Key": "Note", "text": self.misc}}
return bdv_dict
+    def shear_and_rotate_transform(self, t, view_id, view_transforms):
+        """Append optional shear/rotation affines and build the
+        ViewRegistration entry for timepoint ``t`` and setup ``view_id``.
+
+        Returns
+        -------
+        dict
+            Entry with keys ``timepoint``, ``setup``, and ``ViewTransform``.
+        """
+ if self.shear_data:
+ view_transforms.append(
+ {
+ "type": "affine",
+ "Name": "Shearing Transform",
+ "affine": {
+ "text": " ".join(
+ [f"{x:.6f}" for x in self.shear_transform.ravel()]
+ )
+ },
+ }
+ )
+ if self.rotate_data:
+ view_transforms.append(
+ {
+ "type": "affine",
+ "Name": "Rotation Transform",
+ "affine": {
+ "text": " ".join(
+ [f"{x:.6f}" for x in self.rotate_transform.ravel()]
+ )
+ },
+ }
+ )
+ d = dict(timepoint=t, setup=view_id, ViewTransform=view_transforms)
+ return d
+
def stage_positions_to_affine_matrix(
self, x: float, y: float, z: float, theta: float, f: Optional[float] = None
) -> npt.ArrayLike:
@@ -445,13 +479,18 @@ def parse_xml(self, root: Union[str, ET.Element]) -> tuple:
Parameters
----------
- root : Union[str, ET.Element]
- The root of the XML tree.
+ root : str or xml.etree.ElementTree.Element
+ Path to a BigDataViewer XML file or an ElementTree root element.
Returns
-------
- tuple
- A tuple containing the file path, setups, and transforms.
+ file_path : str or list of str
+ HDF5/N5 dataset path, or list of TIFF file paths if using the
+ SPIM-reconstruction filelist loader.
+ setups : list of str
+ View setup identifiers as strings.
+ transforms : list of array-like
+ List of affine transform matrices corresponding to each view.
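+
+        Examples
+        --------
+        A minimal sketch; the file name is hypothetical::
+
+            file_path, setups, transforms = metadata.parse_xml("dataset.xml")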
"""
# Open the file, if present
@@ -466,16 +505,24 @@ def parse_xml(self, root: Union[str, ET.Element]) -> tuple:
# Check if we are loading a BigDataViewer hdf5
image_loader = root.find("SequenceDescription/ImageLoader")
- if image_loader.attrib["format"] not in ["bdv.hdf5", "bdv.n5"]:
- logger.error(f"Unknown Format: {image_loader.attrib['format']}.")
- raise NotImplementedError(
- f"Unknown format {image_loader.attrib['format']} failed to load."
- )
-
- # Parse the file path
- base_path = root.find("BasePath")
- file = root.find("SequenceDescription/ImageLoader/hdf5")
- file_path = os.path.join(base_path.text, file.text)
+ fmt = image_loader.attrib.get("format", "")
+ if fmt in ("bdv.hdf5", "bdv.n5"):
+ # HDF5 or N5 dataset loader
+ tag = "hdf5" if fmt == "bdv.hdf5" else "n5"
+ base_path = root.find("BasePath").text or "."
+ file = root.find(f"SequenceDescription/ImageLoader/{tag}")
+ file_path = os.path.join(base_path, file.text)
+        elif fmt in ("spimreconstruction.filelist", "spimreconstruction.filemap2"):
+            # SPIM-reconstruction / BigStitcher TIFF filelist loaders
+ base = root.find("BasePath").text or "."
+ files = []
+ for fm in root.findall("SequenceDescription/ImageLoader/files/FileMapping"):
+ fnode = fm.find("file")
+ files.append(os.path.join(base, fnode.text))
+ file_path = files
+ else:
+ logger.error(f"Unknown Format: {fmt}.")
+ raise NotImplementedError(f"Unknown format {fmt} failed to load.")
# Get setups. Each setup represents a visualisation data source in the viewer
# that provides one image volume per timepoint