-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add phasics zip tif group file format
- Loading branch information
1 parent
c283a42
commit 8bb88e6
Showing
5 changed files
with
151 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
import io | ||
import functools | ||
import zipfile | ||
|
||
from .dataset import DataSet | ||
from .single_tif_phasics import SingleTifPhasics | ||
|
||
|
||
class GroupZipTifPhasics(DataSet): | ||
def __init__(self, *args, **kwargs): | ||
super(GroupZipTifPhasics, self).__init__(*args, **kwargs) | ||
self._files = None | ||
self._dataset = None | ||
|
||
def __len__(self): | ||
return len(self.files) | ||
|
||
def _get_dataset(self, idx): | ||
if self._dataset is None: | ||
self._dataset = [None] * len(self) | ||
if self._dataset[idx] is None: | ||
# Use ``zipfile.ZipFile.open`` to return an open file | ||
zf = zipfile.ZipFile(self.path) | ||
pt = zf.open(self.files[idx]) | ||
fd = io.BytesIO(pt.read()) | ||
self._dataset[idx] = SingleTifPhasics(path=fd, | ||
meta_data=self.meta_data) | ||
assert len(self._dataset[idx]) == 1, "unknown phasics tif file" | ||
return self._dataset[idx] | ||
|
||
@staticmethod | ||
@functools.lru_cache(maxsize=32) | ||
def _index_files(path): | ||
"""Search zip file for SID PHA files""" | ||
with zipfile.ZipFile(path) as zf: | ||
names = sorted(zf.namelist()) | ||
names = [nn for nn in names if nn.endswith(".tif")] | ||
names = [nn for nn in names if nn.startswith("SID PHA")] | ||
phasefiles = [] | ||
for name in names: | ||
with zf.open(name) as pt: | ||
fd = io.BytesIO(pt.read()) | ||
if SingleTifPhasics.verify(fd): | ||
phasefiles.append(name) | ||
return phasefiles | ||
|
||
@property | ||
def files(self): | ||
if self._files is None: | ||
self._files = GroupZipTifPhasics._index_files(self.path) | ||
return self._files | ||
|
||
def get_qpimage_raw(self, idx=0): | ||
"""Return QPImage without background correction""" | ||
ds = self._get_dataset(idx) | ||
return ds.get_qpimage_raw(idx=0) | ||
|
||
def get_time(self, idx=0): | ||
ds = self._get_dataset(idx) | ||
return ds.get_time(idx=0) | ||
|
||
@staticmethod | ||
def verify(path): | ||
"""Verify phasics zip tif file format""" | ||
valid = False | ||
try: | ||
zf = zipfile.ZipFile(path) | ||
except: | ||
pass | ||
else: | ||
names = sorted(zf.namelist()) | ||
names = [nn for nn in names if nn.endswith(".tif")] | ||
names = [nn for nn in names if nn.startswith("SID PHA")] | ||
for name in names: | ||
with zf.open(name) as pt: | ||
fd = io.BytesIO(pt.read()) | ||
if SingleTifPhasics.verify(fd): | ||
valid = True | ||
break | ||
zf.close() | ||
return valid |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
""" | ||
The test file "group_phasics.zip" was created from an original | ||
Phasics zip file. The "ConfigStack.txt" was adapted to reflect | ||
only three measured images. All data files that are not relevant | ||
for qpformat are included as empty files to reflect the original | ||
zip file structure. The "SID PHA*.tif" files were created with | ||
the script given in the doc string of "test_single_tif_phasics.py". | ||
""" | ||
from os.path import abspath, dirname, join | ||
import sys | ||
|
||
import numpy as np | ||
|
||
# Add parent directory to beginning of path variable | ||
sys.path.insert(0, dirname(dirname(abspath(__file__)))) | ||
import qpformat # noqa: E402 | ||
|
||
|
||
def test_load_data(): | ||
path = join(dirname(abspath(__file__)), "data/group_phasics.zip") | ||
ds = qpformat.load_data(path) | ||
assert ds.path == path | ||
assert "GroupZipTifPhasics" in ds.__repr__() | ||
|
||
|
||
def test_data_content(): | ||
path = join(dirname(abspath(__file__)), "data/group_phasics.zip") | ||
ds = qpformat.load_data(path) | ||
assert len(ds) == 3 | ||
assert ds.get_time(0) == 1461949418.29027 | ||
assert ds.get_time(1) == 1461949418.62727 | ||
assert ds.get_time(2) == 1461949419.11427 | ||
qpi = ds.get_qpimage(0) | ||
assert qpi.meta["wavelength"] == 550e-9 | ||
assert np.allclose(qpi.amp.max(), 183.96992660669972) | ||
assert np.allclose(qpi.pha.max() - qpi.pha.min(), 0.189023369599572) | ||
|
||
|
||
if __name__ == "__main__": | ||
# Run all tests | ||
loc = locals() | ||
for key in list(loc.keys()): | ||
if key.startswith("test_") and hasattr(loc[key], "__call__"): | ||
loc[key]() |