diff --git a/inselect/tests/test_document.py b/inselect/tests/test_document.py index 66e57d3..94ccc38 100644 --- a/inselect/tests/test_document.py +++ b/inselect/tests/test_document.py @@ -1,7 +1,7 @@ import json -import unittest import shutil import tempfile +import unittest from itertools import izip from pathlib import Path @@ -14,6 +14,7 @@ from inselect.lib.document import validate_normalised from inselect.lib.inselect_error import InselectError from inselect.lib.rect import Rect +from inselect.lib.utils import make_readonly TESTDATA = Path(__file__).parent / 'test_data' @@ -101,13 +102,31 @@ def test_save_crops(self): self.assertTrue(np.all(i.array==InselectImage(p).array)) # Subsection of image + # Make sure that existing file is overwritten p = Path(temp) / 'partial.png' + p.open('w') # File just needs to exist i.save_crops([Rect(0.1, 0.2, 0.4, 0.3)], [p]) expected = i.array[87:218, 45:228] self.assertTrue(np.all(expected==InselectImage(p).array)) finally: shutil.rmtree(temp) + def test_save_crops_read_only(self): + # Try to save to existing read-only file + temp = tempfile.mkdtemp() + try: + i = InselectImage(TESTDATA / 'test_segment.png') + + p = Path(temp) / 'readonly.png' + p.open('w') # File just needs to exist + make_readonly(p) + + # Entire image + with self.assertRaises(InselectError): + i.save_crops([Rect(0, 0, 1, 1)], [p]) + finally: + shutil.rmtree(temp) + class TestDocument(unittest.TestCase): def test_load(self): @@ -142,21 +161,21 @@ def test_load_images(self): temp = tempfile.mkdtemp() try: doc_temp = Path(temp) / 'test_segment.inselect' - open(str(doc_temp), 'w').write(source.open().read()) + doc_temp.open('w').write(source.open().read()) # Document load with no scanned image file self.assertRaises(InselectError, InselectDocument.load, doc_temp) # Document load with scanned image file present scanned_temp = Path(temp) / 'test_segment.png' - open(str(scanned_temp), 'w') # File only needs to exist + scanned_temp.open('w') # File only needs to 
exist actual = InselectDocument.load(doc_temp) self.assertEqual(InselectDocument.load(source).items, actual.items) self.assertFalse(actual.thumbnail) # Document load with scanned and thumbnail files present thumbnail_temp = Path(temp) / 'test_segment_thumbnail.jpg' - open(str(thumbnail_temp), 'w') # File only needs to exist + thumbnail_temp.open('w') # File only needs to exist actual = InselectDocument.load(doc_temp) self.assertEqual(InselectDocument.load(source).items, actual.items) self.assertTrue(actual.thumbnail) @@ -168,10 +187,10 @@ def test_save(self): temp = tempfile.mkdtemp() try: doc_temp = Path(temp) / 'test_segment.inselect' - open(str(doc_temp), 'w').write(source.open().read()) + doc_temp.open('w').write(source.open().read()) scanned_temp = Path(temp) / 'test_segment.png' - open(str(scanned_temp), 'w') # File only needs to exist + scanned_temp.open('w') # File only needs to exist items = [ {'rect': Rect(0.1, 0.2, 0.5, 0.5) }, ] @@ -241,7 +260,7 @@ def test_new_from_scan(self): try: temp = Path(temp) img = temp / 'test.jpg' - open(str(img), 'w') # File only needs to exist + img.open('w') # File only needs to exist doc = InselectDocument.new_from_scan(img) self.assertTrue(doc.document_path.is_file()) @@ -263,10 +282,10 @@ def test_ensure_thumbnail(self): temp = tempfile.mkdtemp() try: doc_temp = Path(temp) / 'test_segment.inselect' - open(str(doc_temp), 'w').write(source_doc.open().read()) + doc_temp.open('w').write(source_doc.open().read()) scan_tmp = Path(temp) / 'test_segment.png' - open(str(scan_tmp), 'wb').write(source_img.open('rb').read()) + scan_tmp.open('wb').write(source_img.open('rb').read()) # Document load with no scanned image file doc = InselectDocument.load(doc_temp) @@ -276,23 +295,25 @@ def test_ensure_thumbnail(self): finally: shutil.rmtree(str(temp)) - def test_ensure_thumbnail_bad_width(self): + def test_ensure_thumbnail_failures(self): source_doc = TESTDATA / 'test_segment.inselect' source_img = TESTDATA / 'test_segment.png' temp = 
tempfile.mkdtemp() try: doc_temp = Path(temp) / 'test_segment.inselect' - open(str(doc_temp), 'w').write(source_doc.open().read()) + doc_temp.open('w').write(source_doc.open().read()) scan_tmp = Path(temp) / 'test_segment.png' - open(str(scan_tmp), 'wb').write(source_img.open('rb').read()) + scan_tmp.open('wb').write(source_img.open('rb').read()) doc = InselectDocument.load(doc_temp) self.assertRaises(InselectError, doc.ensure_thumbnail, 50) self.assertRaises(InselectError, doc.ensure_thumbnail, 20000) + + # TODO LH Assert that failure to create thumbnail raises finally: - shutil.rmtree(str(temp)) + shutil.rmtree(str(temp)) if __name__=='__main__': diff --git a/inselect/tests/test_workflow.py b/inselect/tests/test_workflow.py index 43c803c..d07b5b8 100644 --- a/inselect/tests/test_workflow.py +++ b/inselect/tests/test_workflow.py @@ -11,9 +11,9 @@ from inselect.lib.document import InselectDocument from inselect.lib.inselect_error import InselectError -from inselect.workflow.ingest import ingest -from inselect.workflow.segment import segment_pending -from inselect.workflow.post_process import post_process +from inselect.workflow.ingest import ingest, ingest_image +from inselect.workflow.segment import segment +from inselect.workflow.read_barcodes import read_barcodes TESTDATA = Path(__file__).parent / 'test_data' @@ -30,25 +30,57 @@ def tearDown(self): finally: shutil.rmtree(self.docs) +class TestIngest(TestWorkflow): + def test_ingest_fail(self): + # Inbox does not exist + self.assertRaises(InselectError, ingest, Path('I am not a directory'), + Path(self.docs)) + + def test_ingest_create_docs(self): + # Document dir should be created + docs = Path(self.docs) / 'I do not yet exist' + self.assertFalse(docs.is_dir()) + + img = cv2.imread(str(TESTDATA / 'test_segment.png')) + + inbox_img = Path(self.inbox) / 'x.tiff' + + cv2.imwrite(str(inbox_img), img) + ingest(self.inbox, docs) + + self.assertTrue(docs.is_dir()) + def test_ingest(self): # Ingest from tiff + inbox_img 
= Path(self.inbox) / 'x.tiff' + docs_img = Path(self.docs) / 'x.tiff' + img = cv2.imread(str(TESTDATA / 'test_segment.png')) - cv2.imwrite(str(Path(self.inbox) / 'x.tiff'), img) + cv2.imwrite(str(inbox_img), img) ingest(self.inbox, self.docs) # Document, scan and thumbnail should all exists - doc = InselectDocument.load(Path(self.docs) / 'x.inselect') + self.assertTrue((Path(self.docs) / 'x.inselect').is_file()) + self.assertTrue(docs_img.is_file()) + self.assertTrue((Path(self.docs) / 'x_thumbnail.jpg').is_file()) + + # Scan should have been removed from inbox + self.assertFalse(inbox_img.is_file()) # Scan is as expected? + doc = InselectDocument.load(Path(self.docs) / 'x.inselect') self.assertTrue(np.all(img==doc.scanned.array)) - self.assertTrue(doc.thumbnail.array.shape[1], 4096) # TODO LH Assert images are read-only - # TODO LH Assert import of inbox/x.tiff should fail because - # it exists in docs + # Call ingest_image() because ingest() swallows errors + cv2.imwrite(str(inbox_img), img) + self.assertRaises(InselectError, ingest_image, inbox_img, Path(self.docs)) + + +class TestSegment(TestWorkflow): def test_segment(self): # Ingest from tiff img = cv2.imread(str(TESTDATA / 'test_segment.png')) @@ -59,12 +91,15 @@ def test_segment(self): doc = InselectDocument.load(Path(self.docs) / 'x.inselect') self.assertEqual(0, len(doc.items)) - segment_pending(self.docs) + segment(self.docs) doc = InselectDocument.load(Path(self.docs) / 'x.inselect') self.assertEqual(5, len(doc.items)) - # TODO LH test post_process + # TODO LH assert that segment again does not touch this document + + +# TODO LH test read_barcodes if __name__=='__main__': diff --git a/inselect/workflow/ingest.py b/inselect/workflow/ingest.py index 7ed49ee..d0d5af8 100644 --- a/inselect/workflow/ingest.py +++ b/inselect/workflow/ingest.py @@ -7,22 +7,26 @@ from pathlib import Path +# Import numpy here to prevent PyInstaller build from breaking +# TODO LH find a better solution +import numpy import 
inselect import inselect.lib.utils -from inselect.lib import config from inselect.lib.document import InselectDocument, InselectImage from inselect.lib.inselect_error import InselectError from inselect.lib.utils import debug_print, make_readonly -def ingest_image(source, dest): +def ingest_image(source, dest_dir): + dest = dest_dir / source.name if source!=dest and dest.is_file(): raise InselectError('Destination image [{0}] exists'.format(dest)) else: debug_print('Ingesting [{0}] to [{1}]'.format(source, dest)) - source.rename(dest) + if source!=dest: + source.rename(dest) # Raises if the document already exists doc = InselectDocument.new_from_scan(dest) @@ -50,22 +54,23 @@ def ingest(inbox, docs): for source in inbox.glob('*tiff'): try: - dest = docs / source.name - ingest_image(source, dest) + ingest_image(source, docs) except Exception: print('Error ingesting [{0}]'.format(source)) traceback.print_exc() def main(): parser = argparse.ArgumentParser(description='Ingests images into inselect') + parser.add_argument("inbox", help='Source directory containing scanned images') + parser.add_argument("docs", help='Destination directory') parser.add_argument('--verbose', action='store_true') - parser.add_argument('-v', '--version', action='version', + parser.add_argument('-v', '--version', action='version', version='%(prog)s ' + inselect.__version__) args = parser.parse_args() inselect.lib.utils.DEBUG_PRINT = args.verbose - ingest(config.inbox, config.inselect) + ingest(args.inbox, args.docs) if __name__=='__main__': main() diff --git a/inselect/workflow/read_barcodes.py b/inselect/workflow/read_barcodes.py index 62be00e..219bbf2 100644 --- a/inselect/workflow/read_barcodes.py +++ b/inselect/workflow/read_barcodes.py @@ -6,36 +6,61 @@ from itertools import izip from pathlib import Path +# Import numpy here to prevent PyInstaller build from breaking +# TODO LH find a better solution +import numpy import
inselect.lib.utils -from inselect.lib import config from inselect.lib.utils import debug_print from inselect.lib.document import InselectDocument - - -from gouda.bin.decode_barcode import decode_barcodes -from gouda.decode import ZbarDecoder, SoftekDecoder - - -def post_process(dir): - dir = Path(dir) - - # TODO LH Workers from metadata config? - workers = [ZbarDecoder(), - SoftekDecoder(), - ] - +from inselect.lib.inselect_error import InselectError + +from gouda.strategies import roi, resize +from gouda.engines import (AccusoftEngine, DataSymbolEngine, + InliteEngine, LibDMTXEngine, StecosEngine, + SoftekEngine, ZbarEngine, ZxingEngine) + + +def create_datamatrix_engine(): + # Preferred Data Matrix decoders + if InliteEngine.available(): + return InliteEngine(datamatrix=True) + elif AccusoftEngine.available(): + return AccusoftEngine(datamatrix=True) + elif SoftekEngine.available(): + return SoftekEngine(datamatrix=True) + elif LibDMTXEngine.available(): + return LibDMTXEngine() + else: + raise InselectError('No engine for Data Matrix') + +def read_barcodes(dir): + # TODO LH Engines from metadata config + engine = create_datamatrix_engine() for p in dir.glob('*' + InselectDocument.EXTENSION): - # TODO LH Do not do this for documents that have been post-processed - metadata_from_barcodes(InselectDocument.load(p), workers) + # TODO LH Do not overwrite existing specimen numbers or whatever field + # it is that barcodes are written to + print(p) + try: + read_barcodes_in_document(InselectDocument.load(p), engine) + except Exception: + print('Error reading barcodes in [{0}]'.format(p)) + traceback.print_exc() + +def decode_barcodes(crop, engines): + for strategy in (resize, roi): + barcodes = strategy(crop, engines) + if barcodes: + return barcodes + return [] -def metadata_from_barcodes(doc, workers): +def read_barcodes_in_document(doc, engines): items = doc.items for item, crop in izip(items, doc.crops): - barcodes =
decode_barcodes(crop, engines) if barcodes: - barcodes = u' '.join(barcodes) + barcodes = u' '.join([b.data for b in barcodes]) debug_print('Found barcodes [{0}]'.format(barcodes)) # TODO LH This mapping from metadata config? item['fields']['Specimen Number'] = barcodes @@ -44,15 +69,18 @@ def metadata_from_barcodes(doc, workers): doc.save() def main(): - parser = argparse.ArgumentParser(description='Post-processes pending documents') + parser = argparse.ArgumentParser(description='Read barcodes in cropped specimens') + parser.add_argument("dir", help='Directory containing inselect documents') parser.add_argument('--verbose', action='store_true') - parser.add_argument('-v', '--version', action='version', + parser.add_argument('--debug-barcodes', action='store_true') + parser.add_argument('-v', '--version', action='version', version='%(prog)s ' + inselect.__version__) args = parser.parse_args() inselect.lib.utils.DEBUG_PRINT = args.verbose + gouda.util.DEBUG_PRINT = args.debug_barcodes - post_process(config.inselect) + read_barcodes(Path(args.dir)) if __name__=='__main__': main() diff --git a/inselect/workflow/segment.py b/inselect/workflow/segment.py index 155cd10..09f2f3d 100644 --- a/inselect/workflow/segment.py +++ b/inselect/workflow/segment.py @@ -3,20 +3,21 @@ import argparse import traceback - from pathlib import Path +# Import numpy here to prevent PyInstaller build from breaking +# TODO LH find a better solution +import numpy import inselect.lib.utils -from inselect.lib import config from inselect.lib.document import InselectDocument from inselect.lib.segment import segment_edges from inselect.lib.utils import debug_print from inselect.lib.rect import Rect -def segment_pending(dir): +def segment(dir): dir = Path(dir) for p in dir.glob('*' + InselectDocument.EXTENSION): doc = InselectDocument.load(p) @@ -48,15 +49,16 @@ def segment_pending(dir): print('Skipping [{0}] as it already contains items'.format(p)) def main(): - parser = 
argparse.ArgumentParser(description='Segments pending documents') + parser = argparse.ArgumentParser(description='Segments inselect documents') + parser.add_argument("dir", help='Directory containing inselect documents') parser.add_argument('--verbose', action='store_true') - parser.add_argument('-v', '--version', action='version', + parser.add_argument('-v', '--version', action='version', version='%(prog)s ' + inselect.__version__) args = parser.parse_args() inselect.lib.utils.DEBUG_PRINT = args.verbose - segment_pending(config.inselect) + segment(args.dir) if __name__=='__main__': main()