Skip to content

Commit

Permalink
FEAT: SPE plugin: Support new SDT-control metadata (#989)
Browse files Browse the repository at this point in the history
  • Loading branch information
lschr committed May 25, 2023
1 parent f201d0a commit c93536c
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 69 deletions.
156 changes: 100 additions & 56 deletions imageio/plugins/spe.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,38 +253,43 @@ def __init__(
self.cvt = cvt
self.scale = scale

comments = {
"sdt_major_version": CommentDesc(4, slice(66, 68), int),
"sdt_minor_version": CommentDesc(4, slice(68, 70), int),
"sdt_controller_name": CommentDesc(4, slice(0, 6), str),
"exposure_time": CommentDesc(1, slice(64, 73), float, 10**-6),
"color_code": CommentDesc(4, slice(10, 14), str),
"detection_channels": CommentDesc(4, slice(15, 16), int),
"background_subtraction": CommentDesc(4, 14, lambda x: x == "B"),
"em_active": CommentDesc(4, 32, lambda x: x == "E"),
"em_gain": CommentDesc(4, slice(28, 32), int),
"modulation_active": CommentDesc(4, 33, lambda x: x == "A"),
"pixel_size": CommentDesc(4, slice(25, 28), float, 0.1),
"sequence_type": CommentDesc(
4, slice(6, 10), lambda x: __class__.sequence_types[x]
),
"grid": CommentDesc(4, slice(16, 25), float, 10**-6),
"n_macro": CommentDesc(1, slice(0, 4), int),
"delay_macro": CommentDesc(1, slice(10, 19), float, 10**-3),
"n_mini": CommentDesc(1, slice(4, 7), int),
"delay_mini": CommentDesc(1, slice(19, 28), float, 10**-6),
"n_micro": CommentDesc(1, slice(7, 10), int),
"delay_micro": CommentDesc(1, slice(28, 37), float, 10**-6),
"n_subpics": CommentDesc(1, slice(7, 10), int),
"delay_shutter": CommentDesc(1, slice(73, 79), float, 10**-6),
"delay_prebleach": CommentDesc(1, slice(37, 46), float, 10**-6),
"bleach_time": CommentDesc(1, slice(46, 55), float, 10**-6),
"recovery_time": CommentDesc(1, slice(55, 64), float, 10**-6),
comment_fields = {
(5, 0): {
"sdt_major_version": CommentDesc(4, slice(66, 68), int),
"sdt_minor_version": CommentDesc(4, slice(68, 70), int),
"sdt_controller_name": CommentDesc(4, slice(0, 6), str),
"exposure_time": CommentDesc(1, slice(64, 73), float, 10**-6),
"color_code": CommentDesc(4, slice(10, 14), str),
"detection_channels": CommentDesc(4, slice(15, 16), int),
"background_subtraction": CommentDesc(4, 14, lambda x: x == "B"),
"em_active": CommentDesc(4, 32, lambda x: x == "E"),
"em_gain": CommentDesc(4, slice(28, 32), int),
"modulation_active": CommentDesc(4, 33, lambda x: x == "A"),
"pixel_size": CommentDesc(4, slice(25, 28), float, 0.1),
"sequence_type": CommentDesc(
4, slice(6, 10), lambda x: __class__.sequence_types[x]
),
"grid": CommentDesc(4, slice(16, 25), float, 10**-6),
"n_macro": CommentDesc(1, slice(0, 4), int),
"delay_macro": CommentDesc(1, slice(10, 19), float, 10**-3),
"n_mini": CommentDesc(1, slice(4, 7), int),
"delay_mini": CommentDesc(1, slice(19, 28), float, 10**-6),
"n_micro": CommentDesc(1, slice(7, 10), int),
"delay_micro": CommentDesc(1, slice(28, 37), float, 10**-6),
"n_subpics": CommentDesc(1, slice(7, 10), int),
"delay_shutter": CommentDesc(1, slice(73, 79), float, 10**-6),
"delay_prebleach": CommentDesc(1, slice(37, 46), float, 10**-6),
"bleach_time": CommentDesc(1, slice(46, 55), float, 10**-6),
"recovery_time": CommentDesc(1, slice(55, 64), float, 10**-6),
},
(5, 1): {
"bleach_piezo_active": CommentDesc(4, slice(34, 35), lambda x: x == "z")
},
}

@staticmethod
def parse_comments(comments: Sequence[str]) -> Union[Dict, None]:
"""Extract SDT-control metadata from comments
def get_comment_version(comments: Sequence[str]) -> Tuple[int, int]:
"""Get the version of SDT-control metadata encoded in the comments
Parameters
----------
Expand All @@ -293,26 +298,62 @@ def parse_comments(comments: Sequence[str]) -> Union[Dict, None]:
Returns
-------
If SDT-control comments were detected, return a dict of metadata, else
`None`.
Major and minor version. ``-1, -1`` if detection failed.
"""
sdt_md = {}
if comments[4][70:] != "COMVER0500":
logger.debug("SDT-control comments not found.")
return None
if comments[4][70:76] != "COMVER":
return -1, -1
try:
return int(comments[4][76:78]), int(comments[4][78:80])
except ValueError:
return -1, -1

@staticmethod
def parse_comments(
comments: Sequence[str], version: Tuple[int, int]
) -> Dict[str, Any]:
"""Extract SDT-control metadata from comments
Parameters
----------
comments
List of SPE file comments, typically ``metadata["comments"]``.
version
Major and minor version of SDT-control metadata format
Returns
-------
Dict of metadata
"""
sdt_md = {}
for name, spec in SDTControlSpec.comments.items():
for minor in range(version[1] + 1):
# Metadata with same major version is backwards compatible.
# Fields are specified incrementally in `comment_fields`.
# E.g. if the file has version 5.01, `comment_fields[5, 0]` and
# `comment_fields[5, 1]` need to be decoded.
try:
v = spec.cvt(comments[spec.n][spec.slice])
if spec.scale is not None:
v *= spec.scale
sdt_md[name] = v
except Exception as e:
logger.debug(
f"Failed to decode SDT-control metadata field `{name}`: {e}"
)
sdt_md[name] = None
cmt = __class__.comment_fields[version[0], minor]
except KeyError:
continue
for name, spec in cmt.items():
try:
v = spec.cvt(comments[spec.n][spec.slice])
if spec.scale is not None:
v *= spec.scale
sdt_md[name] = v
except Exception as e:
warnings.warn(
f"Failed to decode SDT-control metadata field `{name}`: {e}"
)
sdt_md[name] = None
if version not in __class__.comment_fields:
supported_ver = ", ".join(
map(lambda x: f"{x[0]}.{x[1]:02}", __class__.comment_fields)
)
warnings.warn(
f"Unsupported SDT-control metadata version {version[0]}.{version[1]:02}. "
f"Only versions {supported_ver} are supported. "
"Some or all SDT-control metadata may be missing."
)
comment = comments[0] + comments[2]
sdt_md["comment"] = comment.strip()
return sdt_md
Expand Down Expand Up @@ -359,10 +400,13 @@ def extract_metadata(meta: Mapping, char_encoding: str = "latin1"):
char_encoding
Character encoding used to decode strings in the metadata.
"""
sdt_meta = __class__.parse_comments(meta["comments"])
if not sdt_meta:
comver = __class__.get_comment_version(meta["comments"])
if any(c < 0 for c in comver):
# This file most likely was not created by SDT-control
logger.debug("SDT-control comments not found.")
return
# This file has SDT-control metadata

sdt_meta = __class__.parse_comments(meta["comments"], comver)
meta.pop("comments")
meta.update(sdt_meta)

Expand All @@ -378,7 +422,7 @@ def extract_metadata(meta: Mapping, char_encoding: str = "latin1"):
meta["modulation_script"] = sp4.decode(char_encoding)
meta.pop("spare_4")
except UnicodeDecodeError:
logger.warning(
warnings.warn(
"Failed to decode SDT-control laser "
"modulation script. Bad char_encoding?"
)
Expand Down Expand Up @@ -460,12 +504,9 @@ def __init__(
line = data_end - Spec.data_start
line //= self._shape[0] * self._shape[1] * self._dtype.itemsize
if line != self._len:
logger.warning(
"The file header of %s claims there are %s frames, "
"but there are actually %s frames.",
self.request.filename,
self._len,
line,
warnings.warn(
f"The file header of {self.request.filename} claims there are "
f"{self._len} frames, but there are actually {line} frames."
)
self._len = min(line, self._len)
self._file.seek(Spec.data_start)
Expand Down Expand Up @@ -718,6 +759,9 @@ def metadata(
modulation_script : str
(only for files created by SDT-control)
Laser modulation script. Replaces the "spare_4" key.
bleach_piezo_active : bool
(only for files created by SDT-control)
Whether piezo for bleaching was enabled
"""

if self._file_header_ver < 3:
Expand Down Expand Up @@ -869,7 +913,7 @@ def _parse_header(
try:
v = decode(v)
except Exception:
logger.warning(
warnings.warn(
f'Failed to decode "{name}" metadata '
"string. Check `char_encoding` parameter."
)
Expand Down
80 changes: 67 additions & 13 deletions tests/test_spe.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ def test_metadata(test_images, tmp_path):
assert md.get("sdt_minor_version") == 18
assert isinstance(md.get("modulation_script"), str)
assert md.get("sequence_type") == "standard"
assert "bleach_piezo_active" not in md

patched = tmp_path / fname.name
shutil.copy(fname, patched)
with patched.open("r+b") as f:
# Set SDT-control comment version to 0400. In this case, comments
# should not be parsed.
f.seek(597)
f.write(b"4")
# Scramble "COMVER" in the comments. In this case, SDT-control metadata
# should not be extracted.
f.seek(595)
f.write(b"x")
# Add something to `geometric`
f.seek(600)
f.write(struct.pack("<H", 7))
Expand All @@ -107,7 +108,7 @@ def test_metadata(test_images, tmp_path):
f.write(struct.pack("<H", 2))

patched_cmt = cmt.copy()
patched_cmt[4] = patched_cmt[4].replace("COMVER0500", "COMVER0400")
patched_cmt[4] = patched_cmt[4].replace("COMVER0500", "COMVEx0500")

patched_meta = iio.immeta(patched, sdt_control=True)
# Parsing any SDT-control metadata should fail
Expand All @@ -132,7 +133,8 @@ def test_metadata(test_images, tmp_path):
f.seek(690)
f.write(b"\xff")

patched_meta = iio.immeta(patched, sdt_control=True, char_encoding="ascii")
with pytest.warns(UserWarning):
patched_meta = iio.immeta(patched, sdt_control=True, char_encoding="ascii")
# Decoding `sequence_type` should fail
assert patched_meta.get("sequence_type", "") is None
# Decoding `date` and `time_local` into `datetime` should fail
Expand All @@ -150,6 +152,48 @@ def test_metadata(test_images, tmp_path):
).metadata()
assert patched_meta2.get("sequence_type", "") is None

shutil.copy(fname, patched)
with patched.open("r+b") as f:
# Scramble version numbers following "COMVER" in the comments. In this
# case, SDT-control metadata should not be extracted.
f.seek(597)
f.write(b"x")

patched_cmt = cmt.copy()
patched_cmt[4] = patched_cmt[4].replace("COMVER0500", "COMVER0x00")

patched_meta = iio.immeta(patched, sdt_control=True)
# Parsing any SDT-control metadata should fail
assert patched_meta.get("comments") == patched_cmt
assert "delay_shutter" not in patched_meta

shutil.copy(fname, patched)
with patched.open("r+b") as f:
# Set SDT-control metadata version to 6.00. No metadata should be
# extracted and there should be a warning.
f.seek(597)
f.write(b"6")

with pytest.warns(UserWarning):
patched_meta = iio.immeta(patched, sdt_control=True)
# Parsing any SDT-control metadata should fail, only comment should be there
assert patched_meta.get("comment") == "OD 1.0 in r, g"
assert "delay_shutter" not in patched_meta

shutil.copy(fname, patched)
with patched.open("r+b") as f:
# Set SDT-control metadata version to 5.99. Metadata should still be
# extracted, but there should be a warning
f.seek(599)
f.write(b"99")

with pytest.warns(UserWarning):
patched_meta = iio.immeta(patched, sdt_control=True)
assert patched_meta.get("delay_shutter") == pytest.approx(0.001)
assert patched_meta.get("delay_macro") == pytest.approx(0.048)
assert patched_meta.get("exposure_time") == pytest.approx(0.002)
assert patched_meta.get("comment") == "OD 1.0 in r, g"

shutil.copy(fname, patched)
with patched.open("r+b") as f:
# Create a fake SPE v3 file
Expand All @@ -165,6 +209,15 @@ def test_metadata(test_images, tmp_path):
assert patched_meta == {"__xml": b"Here could be your XML."}


def test_sdtcontrol_v0501(test_images):
fname = test_images / "test_v0501_000_.SPE"
md = iio.immeta(fname)
assert md.get("bleach_piezo_active") is False
assert md.get("sdt_major_version") == 3
assert md.get("sdt_minor_version") == 4
assert md.get("datetime") == datetime(2023, 5, 3, 15, 13, 16)


def test_properties(test_images, tmp_path):
fname = test_images / "test_000_.SPE"
props0 = iio.improps(fname, index=0)
Expand Down Expand Up @@ -194,10 +247,11 @@ def test_properties(test_images, tmp_path):
).n_images
== 3
)
with iio.imopen(patched, "r", check_filesize=True) as img:
assert (
img.properties(
index=...,
).n_images
== 2
)
with pytest.warns():
with iio.imopen(patched, "r", check_filesize=True) as img:
assert (
img.properties(
index=...,
).n_images
== 2
)

0 comments on commit c93536c

Please sign in to comment.