Skip to content

Commit

Permalink
Map metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
pietsjoh committed Mar 7, 2023
1 parent 6742be8 commit b1eb88d
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 10 deletions.
220 changes: 210 additions & 10 deletions rsciio/hamamatsu/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
# and https://ami.scripps.edu/software/mrctools/mrc_specification.php

import logging
import importlib.util
from pathlib import Path
from copy import deepcopy
from enum import IntEnum
Expand All @@ -32,6 +33,37 @@
_logger = logging.getLogger(__name__)
_logger.setLevel(10)


def _str2numeric(input, type):
"""Handle None-values when converting strings to float."""
if input is None:
return None
if type == "float":
return float(input)
elif type == "int":
return int(input)
else:
return None


def _str2bool(input):
if input == "-1":
return True
elif input == "0":
return False
else:
return None


def _remove_none_from_dict(dict_in):
"""Recursive removal of None-values from a dictionary."""
for key, value in list(dict_in.items()):
if isinstance(value, dict):
_remove_none_from_dict(value)
elif value is None:
del dict_in[key]


## < specifies little endian
TypeNames = {
"int8": "<i1", # byte int
Expand All @@ -46,7 +78,11 @@
"double": "<f8", # double (64)
}


## contradicting info in documentation
## 0: not used vs 8bit
## 1: 8bit vs compressed(not used)
## 3: 32bit, but claimed to be unused, however testfiles use 32bit
## rgb not used
class FileType(IntEnum):
bit8_itex = 0
compressed = 1
Expand All @@ -57,6 +93,7 @@ class FileType(IntEnum):
bit96_rgb = 13


## exists in testfiles
class ApplicationType(IntEnum):
application_hipic = 1
application_ta = 2
Expand Down Expand Up @@ -86,7 +123,7 @@ class CameraType(IntEnum):
C8800 = 27


## not in testfile
## not in testfiles
class CameraSubType(IntEnum):
C4880_00 = 1
C4880_60 = 2
Expand Down Expand Up @@ -128,13 +165,15 @@ class CameraSubType(IntEnum):
C8800_21 = 39


## exists in testfiles
class AcqMode(IntEnum):
live = 1
acquire = 2
photon_counting = 3
analog_integration = 4


## exists in file, doubled with FileType?
class DatType(IntEnum):
dat8 = 1
dat10 = 2
Expand All @@ -146,6 +185,7 @@ class DatType(IntEnum):
dat32 = 8


## GrabberType, exists in some files
class FrameGrabber(IntEnum):
grbNone = 0
grbAFG = 1
Expand All @@ -155,12 +195,14 @@ class FrameGrabber(IntEnum):
grbDCam = 5


## Grabber SubType, exists in testfiles, but has value 0 -> not in here
class AcquisitionMode(IntEnum):
amdig = 1
amvs = 2
cam_link = 3


## exists in testfiles
class LUTSize(IntEnum):
lut_size_8 = 1
lut_size_10 = 2
Expand All @@ -171,6 +213,7 @@ class LUTSize(IntEnum):
lut_size_16x = 9


## exists in testfiles
class LUTColor(IntEnum):
lut_color_bw = 1
lut_color_rainbow = 2
Expand All @@ -184,6 +227,7 @@ class LUTType(IntEnum):
lut_type_sigmoid = 3


## exists in testfiles
class Scaling_Type(IntEnum):
scaling_linear = 1
scaling_table = 2
Expand All @@ -195,13 +239,13 @@ def __init__(self, file, filesize, filename):
self._filesize = filesize
self._original_filename = filename

self.metadata = {}
self.original_metadata = {}

self.data, comment = self.parse_file()
self._map_comment(comment)
self.axes = self._read_calibration()
self._reshape_data()
self.metadata = self.map_metadata()

def __read_numeric(self, type, size=1, ret_array=False, convert=True):
if type not in TypeNames.keys():
Expand All @@ -216,8 +260,6 @@ def __read_numeric(self, type, size=1, ret_array=False, convert=True):
if type in ["uint8", "uint16", "uint32", "uint64"] and convert:
dt = "<i8"
data = data.astype(np.dtype(dt))
elif type == "char" and convert:
data = list(map(chr, data))
if size == 1 and not ret_array:
return data[0]
else:
Expand All @@ -233,22 +275,30 @@ def parse_file(self):
com_len = int(self.__read_numeric("int16"))
## IMPORTANT to convert int16 to int
## as int16 leads to problems when defining sizes of numpy arrays
## here data is read incorrectly
## -> data is read incorrectly
self.w_px = int(self.__read_numeric("int16"))
self.h_lines = int(self.__read_numeric("int16"))
header["offset_x"] = int(self.__read_numeric("int16"))
header["offset_y"] = int(self.__read_numeric("int16"))
file_type = self.__read_numeric("int16")
header["file_type"] = FileType(file_type).name
file_type = FileType(int(self.__read_numeric("int16"))).name
header["file_type"] = file_type
header["num_img"] = int.from_bytes(self._file_obj.read(3), "little")
header["num_channels"] = int(self.__read_numeric("int16"))
header["channel_number"] = int(self.__read_numeric("int16"))
header["timestamp"] = self.__read_numeric("double")
header["marker"] = self.__read_utf8(3)
header["additional_info"] = self.__read_utf8(29)
comment = self.__read_utf8(com_len)
## TODO: check dtype for different filetypes
data = self.__read_numeric("uint32", size=self.w_px * self.h_lines)
## TODO: implement rgb filetypes
if file_type == "bit8_itex":
dtype = "uint8"
elif file_type == "bit16":
dtype = "uint16"
elif file_type == "bit32":
dtype = "uint32"
else:
raise NotImplementedError(f"reading dtype: {dtype} not implemented")
data = self.__read_numeric(dtype, size=self.w_px * self.h_lines)
header["image_width_px"] = self.w_px
header["image_height_lines"] = self.h_lines
header["com_len"] = com_len
Expand Down Expand Up @@ -295,6 +345,7 @@ def _read_calibration(self):
x_cal_address, y_cal_address = self._get_scaling_entry(
scaling_md, "ScalingFile"
)
## TODO: unit us == µs or ns?
x_unit, y_unit = self._get_scaling_entry(scaling_md, "Unit")
x_type, y_type = map(int, self._get_scaling_entry(scaling_md, "Type"))
x_scale, y_scale = map(float, self._get_scaling_entry(scaling_md, "Scale"))
Expand Down Expand Up @@ -376,6 +427,155 @@ def _map_comment(self, comment):
result[k] = self._extract_entries_from_comment(v)
self.original_metadata.update({"Comment": result})

def _map_general_md(self):
general = {}
general["title"] = self._original_filename.split(".")[0]
general["original_filename"] = self._original_filename
try:
date = self.original_metadata["Comment"]["Application"]["Date"]
time = self.original_metadata["Comment"]["Application"]["Time"]
except KeyError: # pragma: no cover
pass # pragma: no cover
else:
date_split = date.split("/")
general["date"] = date_split[-1] + "-" + date_split[1] + "-" + date_split[0]
general["time"] = time.split(".")[0]
return general

def _map_signal_md(self):
signal = {}

if importlib.util.find_spec("lumispy") is None:
_logger.warning(
"Cannot find package lumispy, using BaseSignal as signal_type."
)
signal["signal_type"] = ""
else:
signal["signal_type"] = "Luminescence" # pragma: no cover

try:
quantity = self.original_metadata["Comment"]["Acquisition"]["ZAxisLabel"]
quantity_unit = self.original_metadata["Comment"]["Acquisition"][
"ZAxisUnit"
]
except KeyError: # pragma: no cover
pass # pragma: no cover
else:
if quantity_unit == "Count":
quantity_unit = "Counts"
signal["quantity"] = f"{quantity} ({quantity_unit})"
return signal

def _map_detector_md(self):
detector = {}
acq_dict = self.original_metadata.get("Comment", {}).get("Acquisition", {})

try:
exp_time_str = acq_dict["ExposureTime"]
except KeyError:
pass
else:
exp_time, exp_time_units = exp_time_str.split(" ")
exp_time = float(exp_time)
if exp_time_units == "s":
pass
elif exp_time_units == "ms":
exp_time /= 1000
else:
_logger.warning(f"integration_time is given in {exp_time_units}.")
detector["integration_time"] = exp_time

try:
binning_str = acq_dict["pntBinning"]
except KeyError:
pass
else:
detector["binning"] = tuple(map(int, binning_str.split(",")))

## TODO: areSource or areGRBScan for roi
try:
roi_str = acq_dict["areSource"]
except KeyError:
pass
else:
detector["sensor_roi"] = tuple(map(int, roi_str.split(",")))

detector["processing"] = {
"shading_correction": _str2bool(acq_dict.get("ShadingCorr")),
"background_correction": _str2bool(acq_dict.get("BacksubCorr")),
"curvature_correction": _str2bool(acq_dict.get("CurveCorr")),
"defect_correction": _str2bool(acq_dict.get("DefectCorrection")),
}
## TODO: NrExposure == frames?
detector["frames"] = _str2numeric(acq_dict.get("NrExposure"), "int")
detector["detector_type"] = "StreakCamera"
detector["model"] = (
self.original_metadata.get("Comment", {})
.get("Streak camera", {})
.get("DeviceName")
)
return detector

def _map_laser_md(self):
## TODO: is there any information on this?
pass

def _map_spectrometer_md(self):
spectrometer = {}
spectro_dict = self.original_metadata.get("Comment", {}).get("Spectrograph", {})
## TODO: use Ruling as an alternative?
try:
groove_density_str = spectro_dict["Grating"]
except KeyError:
groove_density = None
else:
groove_density, groove_density_units = groove_density_str.split(" ")
groove_density = _str2numeric(groove_density, "int")
if groove_density_units != "g/mm":
_logger.warning(f"groove_density is given in {groove_density_units}")
spectrometer["Grating"] = {
"blazing_wavelength": _str2numeric(spectro_dict.get("Blaze"), "int"),
"groove_density": groove_density,
}
spectrometer["model"] = spectro_dict.get("DeviceName")
try:
entrance_slit_width = _str2numeric(
spectro_dict["Side Ent. Slitw."], "float"
)
except KeyError:
pass
else:
spectrometer["entrance_slit_width"] = entrance_slit_width
## TODO: units?, side entry iris?
## TODO: wavelength -> laser_wavelength, central_wavelength?
spectrometer["central_wavelength"] = _str2numeric(
spectro_dict.get("Wavelength"), "float"
)
return spectrometer

def map_metadata(self):
"""Maps original_metadata to metadata."""
general = self._map_general_md()
signal = self._map_signal_md()
detector = self._map_detector_md()
laser = self._map_laser_md()
spectrometer = self._map_spectrometer_md()

acquisition_instrument = {
"Detector": detector,
"Laser": laser,
"Spectrometer": spectrometer,
}

metadata = {
"Acquisition_instrument": acquisition_instrument,
"General": general,
"Signal": signal,
}
_remove_none_from_dict(metadata)
## TODO: any other important metadata (MCP Gain, Time Range?)
return metadata


def file_reader(filename, lazy=False, **kwds):
"""Reads Hamamatsu's ``.img`` file.
Expand Down

0 comments on commit b1eb88d

Please sign in to comment.