# Overview

Interactive development for the data loading function/class.

In [1]:
# Load the Panel notebook extension up front.
# NOTE(review): nothing else in the visible notebook references `pn`; presumably this is
#               only here to enable rich output while developing -- confirm before removing.
import panel as pn
pn.extension()

In [2]:
import re
import logging
import param
import dxchange
import multiprocessing
import numpy as np
from pathlib import Path
from functools import partial
from typing import Optional, Tuple, List
from dxchange.reader import read_tiff
from multiprocessing.managers import SharedMemoryManager
from tqdm.contrib.concurrent import process_map

olefile module not found


In [3]:
# setup module logger
# NOTE(review): called without a name, param.get_logger() is expected to return param's own
#               shared logger, so the handler/name/level changes applied to it in the next
#               cells are not private to this module -- verify against the installed param.
logger = param.get_logger()

In [4]:
# Layout for every record: timestamp, logger name (padded to 12), level (padded to 8), message.
formatter = logging.Formatter("%(asctime)s %(name)-12s %(levelname)-8s %(message)s")
# Stream handler that renders records with the layout above.
handler = logging.StreamHandler()
handler.setFormatter(formatter)
# Drop whatever handlers were attached before and use ours exclusively.
logger.handlers = [handler]
# Present the records under the dotted path of the function being developed.
logger.name = "imars3d.backend.io.data.load_data"

In [5]:
# Show everything from DEBUG upwards so the dispatch decisions inside load_data are visible.
logger.setLevel(logging.DEBUG)

In [6]:
class load_data(param.ParameterizedFunction):
    """
    Load data with given input

    Parameters
    ----------
    ct_files: List[str]
        explicit list of radiographs
    ob_files: List[str]
        explicit list of open beams
    dc_files: Optional[List[str]]
        explicit list of dark current
    ct_dir: str
        directory contains radiographs
    ob_dir: str
        directory contains open beams
    dc_dir: Optional[str]
        directory contains dark currents
    ct_regex: Optional[str]
        regular expression for down selecting radiographs
    ob_regex: Optional[str]
        regular expression for down selecting open beams
    dc_regex: Optional[str]
        regular expression for down selecting dark current
    max_workers: Optional[int]
        maximum number of processes allowed during loading, default to use as many as possible.

    Returns
    -------
        radiograph stacks, obs, dcs and omegas as numpy.ndarray
        (dcs is None when no dark current files are given)

    Raises
    ------
    ValueError
        if the two call signatures are mixed, if neither is used, if ct_files or
        ob_files is empty, or if the file type is not supported.
    NotImplementedError
        for the directory based signature and for the rotation angle extraction,
        both of which are still stubs.

    Notes
    -----
        There are two main signatures to load the data:
        1. load_data(ct_files=ctfs, ob_files=obfs, dc_files=dcfs)
        2. load_data(ct_dir=ctdir, ob_dir=obdir, dc_dir=dcdir)

        The two signatures are mutually exclusive, and dc_files and dc_dir are optional
        in both cases as some experiments do not have dark current measurements.

        The regex selectors are applicable in both signature, which help to downselect
        files if needed. Default is set to ".*", which selects everything. The patterns
        are applied with re.match, i.e. anchored at the start of each file name.
        Also, if ob_regex and dc_regex are set to "None" in the second signature call, the
        data loader will attempt to read the metadata embedded in the ct file to find obs
        and dcs with similar metadata.

        Currently, we are using a forgiving reader to load the image where a corrupted file
        will not block reading other data.
    """
    #
    ct_files = param.List(doc="list of all ct files to load")
    ob_files = param.List(doc="list of all ob files to load")
    dc_files = param.List(doc="list of all dc files to load")
    #
    ct_dir = param.Foldername(doc="radiograph directory")
    ob_dir = param.Foldername(doc="open beam directory")
    dc_dir = param.Foldername(doc="dark current directory")
    # NOTE: we need to provide a default value here as param.String default to "", which will
    #       not trigger dict.get(key, value) to get the value as "" is not None.
    # NOTE: the default used to be the non-raw literal "\b*", which Python reads as
    #       "zero or more backspace characters" and which only selected everything because
    #       it can match the empty string. r".*" selects everything on purpose.
    ct_regex = param.String(default=r".*", doc="regex for selecting ct files from ct_dir")
    ob_regex = param.String(default=r".*", doc="regex for selecting ob files from ob_dir")
    dc_regex = param.String(default=r".*", doc="regex for selecting dc files from dc_dir")
    # NOTE: 0 means use as many as possible
    max_workers = param.Integer(default=0, bounds=(0, None), doc="Maximum number of processes allowed during loading")

    def __call__(self, **params):
        """
        This makes the class behaves like a function.
        """
        # type & bounds check via Parameter
        _ = self.instance(**params)
        # sanitize arguments
        params = param.ParamOverrides(self, params)
        # type validation is done, now replacing max_worker with an actual integer
        if params.max_workers == 0:
            # leave two cores for the rest of the system, but never go below one worker
            self.max_workers = max(1, multiprocessing.cpu_count() - 2)
        else:
            self.max_workers = params.max_workers
        logger.debug(f"max_worker={self.max_workers}")

        # multiple dispatch
        # NOTE:
        #    use set to simplify call signature checking; only the *_files / *_dir
        #    arguments decide the signature, so options such as max_workers or the
        #    regex selectors must not take part in it.
        sigs = {k.rsplit("_", 1)[-1] for k in params.keys() if k.endswith(("_files", "_dir"))}
        if sigs == {"files", "dir"}:
            logger.error("Files and dir cannot be used at the same time")
            raise ValueError("Mix usage of allowed signature.")
        elif sigs == {"files"}:
            logger.debug("Load by file list")
            ct_files = params.get("ct_files")
            ct, ob, dc = self._load_by_file_list(
                ct_files=ct_files,
                ob_files=params.get("ob_files"),
                dc_files=params.get("dc_files", []),  # it is okay to skip dc
                ct_regex=params.get("ct_regex", r".*"),  # incase None got leaked here
                ob_regex=params.get("ob_regex", r".*"),
                dc_regex=params.get("dc_regex", r".*"),
            )
        elif sigs == {"dir"}:
            logger.debug("Load by directory")
            ct, ob, dc = self._load_by_dir(
                ct_dir=params.get("ct_dir"),
                ob_dir=params.get("ob_dir"),
                dc_dir=params.get("dc_dir"),  # it is okay to skip dc
                ct_regex=params.get("ct_regex", r".*"),  # incase None got leaked here
                ob_regex=params.get("ob_regex", r".*"),
                dc_regex=params.get("dc_regex", r".*"),
            )
            # TODO: _load_by_dir has to report which radiographs it actually read
            ct_files = []
        else:
            # without any *_files or *_dir argument there is nothing to load
            logger.error("Neither files nor dir were provided.")
            raise ValueError("No valid input provided, use either *_files or *_dir.")

        # extracting omegas from
        # 1. filename
        # 2. metadata (only possible for Tiff)
        # NOTE: the list handed over is the requested one; files removed by ct_regex or
        #       skipped as corrupted are not accounted for yet.
        rot_angles = self._extract_rotation_angles(ct_files)

        # return everything
        return ct, ob, dc, rot_angles

    # use _func to avoid sphinx pulling it into docs
    def _forgiving_reader(self, filename, reader=None):
        """
        Skip corrupted file, but inform the user about the issue.

        Returns whatever reader(filename) returns, or None if the reader raised.
        """
        try:
            return reader(filename)
        except Exception:
            # best effort on purpose, but let KeyboardInterrupt/SystemExit through
            logger.error(f"Cannot read {filename}, skipping.")
            return None

    # use _func to avoid sphinx pulling it into docs
    def _load_images(self, filelist, desc):
        """
        Load data via dxchange.

        The reader is selected from the extension of the first file in filelist;
        files that cannot be read are dropped from the returned array.
        """
        # nothing to inspect, fail with a clear message instead of an IndexError
        if not filelist:
            logger.error(f"No file to load for {desc}.")
            raise ValueError("filelist cannot be empty.")
        # figure out the file type and select corresponding reader from dxchange
        file_ext = Path(filelist[0]).suffix.lower()
        if file_ext in (".tif", ".tiff"):
            reader = dxchange.read_tiff
        elif file_ext == ".fits":
            reader = dxchange.read_fits
        else:
            logger.error(f"Unsupported file type: {file_ext}")
            raise ValueError("Unsupported file type.")
        # read the data into numpy array via map_process
        rst = process_map(
            partial(self._forgiving_reader, reader=reader),
            filelist,
            max_workers=self.max_workers,
            desc=desc,
        )
        # return the results
        return np.array([me for me in rst if me is not None])

    # use _func to avoid sphinx pulling it into docs
    def _load_by_file_list(
        self,
        ct_files: List[str],
        ob_files: List[str],
        dc_files: Optional[List[str]],
        ct_regex: Optional[str],
        ob_regex: Optional[str],
        dc_regex: Optional[str],
    ) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray]]:
        """
        Use provided list of files to load images into memory.

        Returns the ct, ob and dc stacks; dc is None when dc_files is empty.
        """
        # empty list is not allowed
        if not ct_files:
            logger.error("ct_files is [].")
            raise ValueError("ct_files cannot be empty list.")
        if not ob_files:
            logger.error("ob_files is [].")
            raise ValueError("ob_files cannot be empty list.")
        if not dc_files:
            logger.warning("dc_files is [].")

        # explicit list is the most straight forward solution
        # -- radiograph
        re_ct = re.compile(ct_regex)
        ct = self._load_images(
            filelist=[ctf for ctf in ct_files if re_ct.match(ctf)],
            desc="ct",
        )
        # -- open beam
        re_ob = re.compile(ob_regex)
        ob = self._load_images(
            filelist=[obf for obf in ob_files if re_ob.match(obf)],
            desc="ob",
        )
        # -- dark current
        if not dc_files:
            dc = None
        else:
            re_dc = re.compile(dc_regex)
            dc = self._load_images(
                filelist=[dcf for dcf in dc_files if re_dc.match(dcf)],
                desc="dc",
            )
        #
        return ct, ob, dc

    # use _func to avoid sphinx pulling it into docs
    def _load_by_dir(
        self,
        ct_dir: str,
        ob_dir: str,
        dc_dir: Optional[str],
        ct_regex: Optional[str],
        ob_regex: Optional[str],
        dc_regex: Optional[str],
    ) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray]]:
        """
        Use provided directory to load images into memory.

        Not implemented yet.
        """
        # happy path, all things provided and no need to check metadata
        # hard path, have to check the metadata to build the list of ob and dc
        # Fail loudly: returning None here would only surface later as an
        # unrelated unpacking error in __call__.
        raise NotImplementedError("Loading by directory is not implemented yet.")

    # use _func to avoid sphinx pulling it into docs
    def _extract_rotation_angles(self, ct_files):
        """
        Return the rotation angles as a numpy array.

        Not implemented yet.
        """
        # from filename
        # from metadata
        raise NotImplementedError


In [None]:
import glob

# Collect the TIFF files for radiographs (ct), open beams (ob) and dark current (dc).
# NOTE: the paths are absolute and specific to one machine; glob.glob returns an empty list
#       when nothing matches and does not guarantee any particular ordering of the results.
ct_list = glob.glob("/home/8cz/tmp/HFIR/CG1D/IPTS-25777/raw/ct_scans/iron_man/*.tiff")
ob_list = glob.glob("/home/8cz/tmp/HFIR/CG1D/IPTS-25777/raw/ob/*.tiff")
dc_list = glob.glob("/home/8cz/tmp/HFIR/CG1D/IPTS-25777/raw/df/*.tiff")

In [None]:
# try to access folder that does not exist
# NOTE(review): this call uses the explicit file-list signature with all three lists.
#               load_data returns four values (ct, ob, dc, rotation angles), so all four
#               have to be unpacked here.

ct, ob, dc, rot_angles = load_data(
    ct_files=ct_list,
    ob_files=ob_list,
    dc_files=dc_list,
)

In [None]:
# Inspect the shape of the loaded radiograph stack.
ct.shape

In [None]:
# Inspect the shape of the loaded open beam stack.
ob.shape

In [None]:
# Inspect the shape of the loaded dark current stack (dc is None if no dc files were loaded).
dc.shape

In [None]:
# Same call without dark current files; load_data returns four values
# (ct, ob, dc, rotation angles), so all four are unpacked.
ct, ob, dc, rot_angles = load_data(
    ct_files=ct_list,
    ob_files=ob_list,
)

In [None]:
# dc is shown as-is rather than via .shape: the file-list loader sets it to None
# when no dark current files are given.
ct.shape, ob.shape, dc

## Unit test

In [7]:
import pytest

In [8]:
def test_load_data_signature_error():
    """Calls with an invalid argument type or with mixed signatures must raise ValueError."""
    rejected_calls = (
        # incorrect input type
        dict(ct_files=1, ob_files=[], dc_files=[]),
        # mix usage of signature
        dict(ct_files=[], ob_files=[], dc_files=[], ct_dir="/tmp", ob_dir="/tmp"),
    )
    for kwargs in rejected_calls:
        with pytest.raises(ValueError):
            load_data(**kwargs)

# run right away, there is no test runner inside the notebook
test_load_data_signature_error()

2022-09-21 12:22:01,609 imars3d.backend.io.data.load_data DEBUG    max_worker=14
2022-09-21 12:22:01,610 imars3d.backend.io.data.load_data ERROR    Files and dir cannot be used at the same time


In [10]:
@pytest.fixture
def create_data_filelist():
    """Placeholder fixture: currently provides nothing (returns None)."""
    # TODO: create the data files needed by the file-list loading test cases
    pass

def test_load_data_by_filelist(create_data_filelist):
    """Error paths of the file-list signature; each listed call must raise ValueError."""
    rejected_calls = [
        # error1: ct empty
        {"ct_files": [], "ob_files": []},
        # error2: ob empty
        {"ct_files": ["dummy"], "ob_files": []},
        # error3: unsupported file type
        {"ct_files": ["a.unsupport_ext"], "ob_files": ["a.unsupport_ext"]},
    ]
    for kwargs in rejected_calls:
        with pytest.raises(ValueError):
            load_data(**kwargs)
    # still to be written:
    # case1: load all three
    # case2: load only ct and ob
    # case3: loading tiff
    # case4: loading fits

# the fixture value is not used by the test body yet, so any placeholder works here
test_load_data_by_filelist(1)

2022-09-21 12:25:35,864 imars3d.backend.io.data.load_data DEBUG    max_worker=14
2022-09-21 12:25:35,864 imars3d.backend.io.data.load_data DEBUG    Load by file list
2022-09-21 12:25:35,865 imars3d.backend.io.data.load_data ERROR    ct_files is [].
2022-09-21 12:25:35,865 imars3d.backend.io.data.load_data DEBUG    max_worker=14
2022-09-21 12:25:35,866 imars3d.backend.io.data.load_data DEBUG    Load by file list
2022-09-21 12:25:35,866 imars3d.backend.io.data.load_data ERROR    ob_files is [].
2022-09-21 12:25:35,866 imars3d.backend.io.data.load_data DEBUG    max_worker=14
2022-09-21 12:25:35,867 imars3d.backend.io.data.load_data DEBUG    Load by file list
2022-09-21 12:25:35,867 imars3d.backend.io.data.load_data ERROR    Unsupported file type: .unsupport_ext


In [None]:
def test_load_data_by_dir():
    """Placeholder for the directory based signature; no checks are performed yet."""
    pass

# run right away, there is no test runner inside the notebook
test_load_data_by_dir()

In [None]:
# Experiment with the pattern used as the default regex selector in load_data.
# NOTE: the literal below is not a raw string, so Python turns "\b" into the backspace
#       character (0x08) before the regex engine sees it. The compiled pattern therefore
#       means "zero or more backspaces", not a word boundary; since it can match the empty
#       string, .match() succeeds for every input.
re_test = re.compile("\b*")

# sample names to run the pattern against
test_list = ["data1", "d2", "what"]

In [None]:
# One result per entry: a match object on success, None otherwise.
list(map(re_test.match, test_list))

In [None]:
# Keep only the entries the pattern matches -- the same filtering _load_by_file_list applies.
[me for me in test_list if re_test.match(me)]