From b1eb88d2ffa0bd56bbf883350fc11d3357011469 Mon Sep 17 00:00:00 2001
From: pietsjoh
Date: Tue, 7 Mar 2023 12:19:12 +0100
Subject: [PATCH] Map metadata

---
 rsciio/hamamatsu/_api.py       | 220 +++++++++++++++++++++++++++++++--
 rsciio/tests/test_hamamatsu.py |  39 ++++++
 2 files changed, 249 insertions(+), 10 deletions(-)

diff --git a/rsciio/hamamatsu/_api.py b/rsciio/hamamatsu/_api.py
index d58f41d81..bde6a93dc 100644
--- a/rsciio/hamamatsu/_api.py
+++ b/rsciio/hamamatsu/_api.py
@@ -21,6 +21,7 @@
 # and https://ami.scripps.edu/software/mrctools/mrc_specification.php
 
 import logging
+import importlib.util
 from pathlib import Path
 from copy import deepcopy
 from enum import IntEnum
@@ -32,6 +33,37 @@
 
 _logger = logging.getLogger(__name__)
 _logger.setLevel(10)
+
+def _str2numeric(input, type):
+    """Handle None-values when converting strings to int or float."""
+    if input is None:
+        return None
+    if type == "float":
+        return float(input)
+    elif type == "int":
+        return int(input)
+    else:
+        return None
+
+
+def _str2bool(input):
+    if input == "-1":
+        return True
+    elif input == "0":
+        return False
+    else:
+        return None
+
+
+def _remove_none_from_dict(dict_in):
+    """Recursive removal of None-values from a dictionary."""
+    for key, value in list(dict_in.items()):
+        if isinstance(value, dict):
+            _remove_none_from_dict(value)
+        elif value is None:
+            del dict_in[key]
+
+
 ## < specifies little endian
 TypeNames = {
     "int8": "<i1",
 
+## -> not in here
 class AcquisitionMode(IntEnum):
     amdig = 1
     amvs = 2
     cam_link = 3
 
+## exists in testfiles
 class LUTSize(IntEnum):
     lut_size_8 = 1
     lut_size_10 = 2
@@ -171,6 +213,7 @@ class LUTSize(IntEnum):
     lut_size_16x = 9
+
+## exists in testfiles
 class LUTColor(IntEnum):
     lut_color_bw = 1
     lut_color_rainbow = 2
@@ -184,6 +227,7 @@ class LUTType(IntEnum):
     lut_type_sigmoid = 3
+
+## exists in testfiles
 class Scaling_Type(IntEnum):
     scaling_linear = 1
     scaling_table = 2
@@ -195,13 +239,13 @@ def __init__(self, file, filesize, filename):
         self._file_obj = file
         self._filesize = filesize
         self._original_filename = filename
-        self.metadata = {}
         self.original_metadata = {}
 
         self.data, comment = self.parse_file()
         self._map_comment(comment)
 
         self.axes = self._read_calibration()
         self._reshape_data()
+        self.metadata = self.map_metadata()
 
     def __read_numeric(self, type, size=1, ret_array=False, convert=True):
         if type not in TypeNames.keys():
@@ -216,8 +260,6 @@ def __read_numeric(self, type, size=1, ret_array=False, convert=True):
         if type in ["uint8", "uint16", "uint32", "uint64"] and convert:
             dt = "<i8"
         ## -> data is read incorrectly
         self.w_px = int(self.__read_numeric("int16"))
         self.h_lines = int(self.__read_numeric("int16"))
         header["offset_x"] = int(self.__read_numeric("int16"))
         header["offset_y"] = int(self.__read_numeric("int16"))
-        file_type = self.__read_numeric("int16")
-        header["file_type"] = FileType(file_type).name
+        file_type = FileType(int(self.__read_numeric("int16"))).name
+        header["file_type"] = file_type
         header["num_img"] = int.from_bytes(self._file_obj.read(3), "little")
         header["num_channels"] = int(self.__read_numeric("int16"))
         header["channel_number"] = int(self.__read_numeric("int16"))
@@ -247,8 +289,16 @@ def parse_file(self):
         header["marker"] = self.__read_utf8(3)
         header["additional_info"] = self.__read_utf8(29)
         comment = self.__read_utf8(com_len)
-        ## TODO: check dtype for different filetypes
-        data = self.__read_numeric("uint32", size=self.w_px * self.h_lines)
+        ## TODO: implement rgb filetypes
+        if file_type == "bit8_itex":
+            dtype = "uint8"
+        elif file_type == "bit16":
+            dtype = "uint16"
+        elif file_type == "bit32":
+            dtype = "uint32"
+        else:
+            raise NotImplementedError(f"reading file_type: {file_type} not implemented")
+        data = self.__read_numeric(dtype, size=self.w_px * self.h_lines)
         header["image_width_px"] = self.w_px
         header["image_height_lines"] = self.h_lines
         header["com_len"] = com_len
@@ -295,6 +345,7 @@ def _read_calibration(self):
         x_cal_address, y_cal_address = self._get_scaling_entry(
             scaling_md, "ScalingFile"
         )
+        ## TODO: unit us == µs or ns?
         x_unit, y_unit = self._get_scaling_entry(scaling_md, "Unit")
         x_type, y_type = map(int, self._get_scaling_entry(scaling_md, "Type"))
         x_scale, y_scale = map(float, self._get_scaling_entry(scaling_md, "Scale"))
@@ -376,6 +427,155 @@ def _map_comment(self, comment):
             result[k] = self._extract_entries_from_comment(v)
         self.original_metadata.update({"Comment": result})
 
+    def _map_general_md(self):
+        general = {}
+        general["title"] = self._original_filename.split(".")[0]
+        general["original_filename"] = self._original_filename
+        try:
+            date = self.original_metadata["Comment"]["Application"]["Date"]
+            time = self.original_metadata["Comment"]["Application"]["Time"]
+        except KeyError:  # pragma: no cover
+            pass  # pragma: no cover
+        else:
+            date_split = date.split("/")
+            general["date"] = date_split[-1] + "-" + date_split[1] + "-" + date_split[0]
+            general["time"] = time.split(".")[0]
+        return general
+
+    def _map_signal_md(self):
+        signal = {}
+
+        if importlib.util.find_spec("lumispy") is None:
+            _logger.warning(
+                "Cannot find package lumispy, using BaseSignal as signal_type."
+            )
+            signal["signal_type"] = ""
+        else:
+            signal["signal_type"] = "Luminescence"  # pragma: no cover
+
+        try:
+            quantity = self.original_metadata["Comment"]["Acquisition"]["ZAxisLabel"]
+            quantity_unit = self.original_metadata["Comment"]["Acquisition"][
+                "ZAxisUnit"
+            ]
+        except KeyError:  # pragma: no cover
+            pass  # pragma: no cover
+        else:
+            if quantity_unit == "Count":
+                quantity_unit = "Counts"
+            signal["quantity"] = f"{quantity} ({quantity_unit})"
+        return signal
+
+    def _map_detector_md(self):
+        detector = {}
+        acq_dict = self.original_metadata.get("Comment", {}).get("Acquisition", {})
+
+        try:
+            exp_time_str = acq_dict["ExposureTime"]
+        except KeyError:
+            pass
+        else:
+            exp_time, exp_time_units = exp_time_str.split(" ")
+            exp_time = float(exp_time)
+            if exp_time_units == "s":
+                pass
+            elif exp_time_units == "ms":
+                exp_time /= 1000
+            else:
+                _logger.warning(f"integration_time is given in {exp_time_units}.")
+            detector["integration_time"] = exp_time
+
+        try:
+            binning_str = acq_dict["pntBinning"]
+        except KeyError:
+            pass
+        else:
+            detector["binning"] = tuple(map(int, binning_str.split(",")))
+
+        ## TODO: areSource or areGRBScan for roi
+        try:
+            roi_str = acq_dict["areSource"]
+        except KeyError:
+            pass
+        else:
+            detector["sensor_roi"] = tuple(map(int, roi_str.split(",")))
+
+        detector["processing"] = {
+            "shading_correction": _str2bool(acq_dict.get("ShadingCorr")),
+            "background_correction": _str2bool(acq_dict.get("BacksubCorr")),
+            "curvature_correction": _str2bool(acq_dict.get("CurveCorr")),
+            "defect_correction": _str2bool(acq_dict.get("DefectCorrection")),
+        }
+        ## TODO: NrExposure == frames?
+        detector["frames"] = _str2numeric(acq_dict.get("NrExposure"), "int")
+        detector["detector_type"] = "StreakCamera"
+        detector["model"] = (
+            self.original_metadata.get("Comment", {})
+            .get("Streak camera", {})
+            .get("DeviceName")
+        )
+        return detector
+
+    def _map_laser_md(self):
+        ## TODO: is there any information on this?
+        pass
+
+    def _map_spectrometer_md(self):
+        spectrometer = {}
+        spectro_dict = self.original_metadata.get("Comment", {}).get("Spectrograph", {})
+        ## TODO: use Ruling as an alternative?
+        try:
+            groove_density_str = spectro_dict["Grating"]
+        except KeyError:
+            groove_density = None
+        else:
+            groove_density, groove_density_units = groove_density_str.split(" ")
+            groove_density = _str2numeric(groove_density, "int")
+            if groove_density_units != "g/mm":
+                _logger.warning(f"groove_density is given in {groove_density_units}")
+        spectrometer["Grating"] = {
+            "blazing_wavelength": _str2numeric(spectro_dict.get("Blaze"), "int"),
+            "groove_density": groove_density,
+        }
+        spectrometer["model"] = spectro_dict.get("DeviceName")
+        try:
+            entrance_slit_width = _str2numeric(
+                spectro_dict["Side Ent. Slitw."], "float"
+            )
+        except KeyError:
+            pass
+        else:
+            spectrometer["entrance_slit_width"] = entrance_slit_width
+        ## TODO: units?, side entry iris?
+        ## TODO: wavelength -> laser_wavelength, central_wavelength?
+        spectrometer["central_wavelength"] = _str2numeric(
+            spectro_dict.get("Wavelength"), "float"
+        )
+        return spectrometer
+
+    def map_metadata(self):
+        """Maps original_metadata to metadata."""
+        general = self._map_general_md()
+        signal = self._map_signal_md()
+        detector = self._map_detector_md()
+        laser = self._map_laser_md()
+        spectrometer = self._map_spectrometer_md()
+
+        acquisition_instrument = {
+            "Detector": detector,
+            "Laser": laser,
+            "Spectrometer": spectrometer,
+        }
+
+        metadata = {
+            "Acquisition_instrument": acquisition_instrument,
+            "General": general,
+            "Signal": signal,
+        }
+        _remove_none_from_dict(metadata)
+        ## TODO: any other important metadata (MCP Gain, Time Range?)
+        return metadata
+
 
 def file_reader(filename, lazy=False, **kwds):
     """Reads Hamamatsu's ``.img`` file.
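Note (illustration, not part of the patch): for the operate_mode.img test file, map_metadata() above is expected to assemble a nested tree roughly like the sketch below. The sketch is assembled from the assertions in the test added in the next file; entries whose source keys are missing in a given file are dropped by _remove_none_from_dict (for example the empty Laser node), so the exact set of keys varies per file.

    expected_metadata = {
        "General": {
            "title": "operate_mode",
            "original_filename": "operate_mode.img",
            "date": "2023-01-20",
            "time": "11:34:55",
        },
        "Signal": {
            "signal_type": "",  # "Luminescence" when lumispy is installed
            "quantity": "Intensity (Counts)",
        },
        "Acquisition_instrument": {
            "Detector": {
                "detector_type": "StreakCamera",
                "model": "C5680",
                "frames": 60,
                "integration_time": 5.0,  # seconds
                "binning": (2, 2),
                "sensor_roi": (0, 0, 672, 512),
                "processing": {
                    "shading_correction": False,
                    "background_correction": True,
                    "curvature_correction": False,
                    "defect_correction": False,
                },
            },
            "Spectrometer": {
                "model": "Andor SG",
                "entrance_slit_width": 10.0,
                "central_wavelength": 500.0,
                "Grating": {"blazing_wavelength": 500, "groove_density": 300},
            },
        },
    }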
diff --git a/rsciio/tests/test_hamamatsu.py b/rsciio/tests/test_hamamatsu.py
index 6a4910514..4c45c902a 100644
--- a/rsciio/tests/test_hamamatsu.py
+++ b/rsciio/tests/test_hamamatsu.py
@@ -271,6 +271,45 @@ def test_original_metadata_comment(self):
 
         assert expected_metadata == original_metadata
 
+    def test_metadata(self):
+        metadata = self.s.metadata
+
+        detector = self.s.metadata.Acquisition_instrument.Detector
+        spectrometer = self.s.metadata.Acquisition_instrument.Spectrometer
+
+        assert metadata.General.date == "2023-01-20"
+        assert metadata.General.time == "11:34:55"
+        assert metadata.General.original_filename == "operate_mode.img"
+        assert metadata.General.title == metadata.General.original_filename[:-4]
+
+        assert metadata.Signal.quantity == "Intensity (Counts)"
+        assert metadata.Signal.signal_type == ""
+
+        assert isinstance(detector.binning, tuple)
+        assert len(detector.binning) == 2
+        assert detector.binning[0] == 2
+        assert detector.binning[1] == 2
+        assert detector.detector_type == "StreakCamera"
+        assert detector.model == "C5680"
+        assert detector.frames == 60
+        np.testing.assert_allclose(detector.integration_time, 5)
+        assert isinstance(detector.sensor_roi, tuple)
+        assert len(detector.sensor_roi) == 4
+        assert detector.sensor_roi[0] == 0
+        assert detector.sensor_roi[1] == 0
+        assert detector.sensor_roi[2] == 672
+        assert detector.sensor_roi[3] == 512
+        assert detector.processing.background_correction == True
+        assert detector.processing.curvature_correction == False
+        assert detector.processing.defect_correction == False
+        assert detector.processing.shading_correction == False
+
+        np.testing.assert_allclose(spectrometer.entrance_slit_width, 10)
+        assert spectrometer.model == "Andor SG"
+        assert spectrometer.Grating.blazing_wavelength == 500
+        assert spectrometer.Grating.groove_density == 300
+        np.testing.assert_allclose(spectrometer.central_wavelength, 500)
+
 
 class TestFocus:
     @classmethod
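Usage sketch, not part of the patch: a minimal example of reading a Hamamatsu .img file with the updated reader, assuming the plugin re-exports file_reader from rsciio.hamamatsu as the other RosettaSciIO plugins do; the filename is a placeholder for a local streak camera file such as the operate_mode.img test file.

    from rsciio.hamamatsu import file_reader

    # file_reader returns a list with one dictionary per dataset; "metadata"
    # now holds the mapped tree, "original_metadata" the raw header/comment.
    img = file_reader("operate_mode.img")[0]
    print(img["metadata"]["Acquisition_instrument"]["Detector"]["detector_type"])
    print(img["metadata"]["Signal"]["quantity"])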