From 5c5d0c5e80234951087020084fbe3cce12cd6d56 Mon Sep 17 00:00:00 2001
From: mart-r
Date: Fri, 19 Sep 2025 17:02:35 +0100
Subject: [PATCH 01/10] CU-869ahw0mw: Add argument to control data flow when
 saving results.

When `save_dir_path` is provided, the user (probably) expects the data
to be saved on disk upon method call. But the current implementation
forced the user to iterate over the results to force the annotation to
actually happen. So this change allows the method to materialise the
list internally to force the annotation to happen and the results to be
saved on disk.

Additionally, it adds 2 other options:
1. The lazy iteration (what happens when no `save_dir_path` is
   provided) where the iteration of data is left to the user.
2. The combined save-and-return option where the results are
   materialised, but also yielded. Notably, this will take up a lot of
   memory if/when used with large data sets.
---
 medcat-v2/medcat/cat.py | 44 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 44 insertions(+)

diff --git a/medcat-v2/medcat/cat.py b/medcat-v2/medcat/cat.py
index 43c31f975..19dadd4a7 100644
--- a/medcat-v2/medcat/cat.py
+++ b/medcat-v2/medcat/cat.py
@@ -6,6 +6,7 @@
 from concurrent.futures import ProcessPoolExecutor, as_completed, Future
 import itertools
 from contextlib import contextmanager
+from collections import deque
 
 import shutil
 import zipfile
@@ -327,6 +328,9 @@ def get_entities_multi_texts(
             batch_size_chars: int = 1_000_000,
             save_dir_path: Optional[str] = None,
             batches_per_save: int = 20,
+            entity_consume_mode_on_save: Union[
+                Literal["save"], Literal["save_and_return"],
+                Literal["lazy"]] = "save",
     ) -> Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]:
         """Get entities from multiple texts (potentially in parallel).
 
@@ -363,6 +367,24 @@ def get_entities_multi_texts(
             batches_per_save (int):
                 The number of patches to save (if `save_dir_path` is
                 specified) at once. Defaults to 20.
+            entity_consume_mode_on_save (Union[
+                Literal["save"], Literal["save_and_return"],
+                Literal["lazy"]]):
+                Controls how results are handled when `save_dir_path` is
+                provided:
+                - "save":
+                    Iterate through results internally, writing them to disk.
+                    Nothing is yielded/returned. This avoids storing all
+                    results in memory and is suitable for large data sets.
+                - "save_and_return":
+                    As above, but also return a fully materialised list of all
+                    results. **Warning**: this may require large amounts of
+                    memory and is not safe for large amounts of data.
+                - "lazy":
+                    Do not consume results internally. Results are both
+                    yielded and written to disk as the caller iterates over
+                    them. This preserves lazy evaluation but requires the
+                    caller to drive the iteration.
 
Yields: Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]: @@ -376,6 +398,28 @@ def get_entities_multi_texts( saver = BatchAnnotationSaver(save_dir_path, batches_per_save) else: saver = None + out_iter = self._get_entities_multi_texts( + n_process=n_process, batch_iter=batch_iter, saver=saver) + if saver: + if entity_consume_mode_on_save == "save": + # this materialises the iterator and forces the + # output to be saved on disk, nothing is yielded + deque(out_iter, maxlen=0) + elif entity_consume_mode_on_save == "lazy": + # do the lazy iteration - force the user to drive iteration + yield from out_iter + else: + # force materialising of output to save on disk + out_list = list(out_iter) + # but yield from the list as well + yield from out_list + + def _get_entities_multi_texts( + self, + n_process: int, + batch_iter: Iterator[list[tuple[str, str, bool]]], + saver: Optional[BatchAnnotationSaver], + ) -> Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]: if n_process == 1: # just do in series for batch in batch_iter: From 9a709804291102ba55c42f41273883088c09d31d Mon Sep 17 00:00:00 2001 From: mart-r Date: Fri, 19 Sep 2025 17:04:47 +0100 Subject: [PATCH 02/10] CU-869ahw0mw: Make tests run without materialising the output for multiprocessing --- medcat-v2/tests/test_cat.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/medcat-v2/tests/test_cat.py b/medcat-v2/tests/test_cat.py index cb37b648a..16a6162ae 100644 --- a/medcat-v2/tests/test_cat.py +++ b/medcat-v2/tests/test_cat.py @@ -534,13 +534,13 @@ def _do_mp_run_with_save( for name in self.cdb.name2info for negname in self.cdb.name2info if name != negname ] - out_data = list(self.cat.get_entities_multi_texts( + out_data = self.cat.get_entities_multi_texts( in_data, save_dir_path=save_to, batch_size_chars=chars_per_batch, batches_per_save=batches_per_save, n_process=n_process, - )) + ) out_dict_all = { key: cdata for key, cdata in out_data } From a701bd0d13539471b3ce110c2aba1d120b6aac27 Mon Sep 17 00:00:00 2001 From: mart-r Date: Mon, 22 Sep 2025 09:03:38 +0100 Subject: [PATCH 03/10] CU-869ahw0mw: Move DeID tests to non-deprecated method --- medcat-v2/tests/utils/ner/test_deid.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/medcat-v2/tests/utils/ner/test_deid.py b/medcat-v2/tests/utils/ner/test_deid.py index c776f229c..2e88fb6d8 100644 --- a/medcat-v2/tests/utils/ner/test_deid.py +++ b/medcat-v2/tests/utils/ner/test_deid.py @@ -213,13 +213,13 @@ def test_model_works_deid_text_redact(self): self.assert_deid_redact(anon_text) def test_model_works_deid_multi_text_single_threaded(self): - processed = self.deid_model.deid_multi_text([input_text, input_text], n_process=1) + processed = self.deid_model.deid_multi_texts([input_text, input_text], n_process=1) self.assertEqual(len(processed), 2) for anon_text in processed: self.assert_deid_annotations(anon_text) def test_model_works_deid_multi_text_single_threaded_redact(self): - processed = self.deid_model.deid_multi_text([input_text, input_text], + processed = self.deid_model.deid_multi_texts([input_text, input_text], n_process=1, redact=True) self.assertEqual(len(processed), 2) for anon_text in processed: @@ -229,7 +229,7 @@ def test_model_works_deid_multi_text_single_threaded_redact(self): @unittest.skip("Deid Multiprocess is broken. 
Exits the process, no errors shown")
     def test_model_can_multiprocess_no_redact(self):
-        processed = self.deid_model.deid_multi_text(
+        processed = self.deid_model.deid_multi_texts(
             [input_text, input_text], n_process=2)
         self.assertEqual(len(processed), 2)
         for tid, new_text in enumerate(processed):
@@ -245,7 +245,7 @@ def test_model_can_multiprocess_redact(self):
         """
         try:
             print("Calling test_model_can_multiprocess_redact")
-            processed = self.deid_model.deid_multi_text(
+            processed = self.deid_model.deid_multi_texts(
                 [input_text, input_text], n_process=2, redact=True
             )
             print("Finished processing")

From 60465267d18c3b197b34eccc1486ccef14061bcc Mon Sep 17 00:00:00 2001
From: mart-r
Date: Mon, 22 Sep 2025 09:04:31 +0100
Subject: [PATCH 04/10] CU-869ahw0mw: Some whitespace fixes

---
 medcat-v2/tests/utils/ner/test_deid.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/medcat-v2/tests/utils/ner/test_deid.py b/medcat-v2/tests/utils/ner/test_deid.py
index 2e88fb6d8..f11cc2dec 100644
--- a/medcat-v2/tests/utils/ner/test_deid.py
+++ b/medcat-v2/tests/utils/ner/test_deid.py
@@ -213,14 +213,15 @@ def test_model_works_deid_text_redact(self):
         self.assert_deid_redact(anon_text)
 
     def test_model_works_deid_multi_text_single_threaded(self):
-        processed = self.deid_model.deid_multi_texts([input_text, input_text], n_process=1)
+        processed = self.deid_model.deid_multi_texts([input_text, input_text],
+                                                     n_process=1)
         self.assertEqual(len(processed), 2)
         for anon_text in processed:
             self.assert_deid_annotations(anon_text)
 
     def test_model_works_deid_multi_text_single_threaded_redact(self):
         processed = self.deid_model.deid_multi_texts([input_text, input_text],
-                                                    n_process=1, redact=True)
+                                                     n_process=1, redact=True)
         self.assertEqual(len(processed), 2)
         for anon_text in processed:
             self.assert_deid_redact(anon_text)

From 317fada3cd640623c317523ebf1d141355dcc903 Mon Sep 17 00:00:00 2001
From: mart-r
Date: Mon, 22 Sep 2025 09:31:38 +0100
Subject: [PATCH 05/10] CU-869ahw0mw: Fix issue with multiprocessing.

The previous implementation would always consider the method a
generator. And as such, the work would never be done at call time,
regardless of whether or not the `save_dir_path` was provided.

This commit fixes that by making the wrapper method a regular method
that (sometimes) returns the iterator and other times just a
(potentially empty) list.
---
 medcat-v2/medcat/cat.py | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/medcat-v2/medcat/cat.py b/medcat-v2/medcat/cat.py
index 19dadd4a7..0e3cb46d6 100644
--- a/medcat-v2/medcat/cat.py
+++ b/medcat-v2/medcat/cat.py
@@ -331,7 +331,9 @@ def get_entities_multi_texts(
             entity_consume_mode_on_save: Union[
                 Literal["save"], Literal["save_and_return"],
                 Literal["lazy"]] = "save",
-    ) -> Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]:
+    ) -> Union[
+            Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]],
+            list[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]]:
         """Get entities from multiple texts (potentially in parallel).
 
         If `n_process` > 1, `n_process - 1` new processes will be created
@@ -387,8 +389,10 @@ def get_entities_multi_texts(
                     caller to drive the iteration.
 
         Yields:
-            Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]:
-                The results in the format of (text_index, entities).
+            Union[
+                Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]],
+                list[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]]:
+                The results in the format of (text_index, entities).
""" text_iter = cast( Union[Iterator[str], Iterator[tuple[str, str]]], iter(texts)) @@ -405,14 +409,17 @@ def get_entities_multi_texts( # this materialises the iterator and forces the # output to be saved on disk, nothing is yielded deque(out_iter, maxlen=0) + return [] elif entity_consume_mode_on_save == "lazy": # do the lazy iteration - force the user to drive iteration - yield from out_iter + return out_iter else: - # force materialising of output to save on disk + # force materialising of output to save on dis out_list = list(out_iter) # but yield from the list as well - yield from out_list + return out_list + else: + return out_iter def _get_entities_multi_texts( self, From 1986016e59e6599fec7bf0e6c21f13e8a9decf92 Mon Sep 17 00:00:00 2001 From: mart-r Date: Mon, 22 Sep 2025 09:32:47 +0100 Subject: [PATCH 06/10] CU-869ahw0mw: Add further tests to new functionality --- medcat-v2/tests/test_cat.py | 41 +++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/medcat-v2/tests/test_cat.py b/medcat-v2/tests/test_cat.py index 16a6162ae..bac35051e 100644 --- a/medcat-v2/tests/test_cat.py +++ b/medcat-v2/tests/test_cat.py @@ -658,6 +658,47 @@ def test_mp_saves_correct_data_with_3_proc(self): self.assert_correct_loaded_output( in_data, out_dict_all, all_loaded_output) + def test_get_entities_multi_texts_with_save_dir_lazy(self): + texts = ["text1", "text2"] + with tempfile.TemporaryDirectory() as tmp_dir: + out = self.cat.get_entities_multi_texts( + texts, + save_dir_path=tmp_dir, + entity_consume_mode_on_save='lazy') + # nothing before manual iter + self.assertFalse(os.listdir(tmp_dir)) + out_list = list(out) + # something was saved + self.assertTrue(os.listdir(tmp_dir)) + # and something was yielded + self.assertEqual(len(out_list), len(texts)) + + def test_get_entities_multi_texts_with_save_dir_save(self): + texts = ["text1", "text2"] + with tempfile.TemporaryDirectory() as tmp_dir: + out = self.cat.get_entities_multi_texts( + texts, + save_dir_path=tmp_dir, + entity_consume_mode_on_save='save') + # stuff was already saved + self.assertTrue(os.listdir(tmp_dir)) + out_list = list(out) + # nothing was yielded + self.assertFalse(out_list) + + def test_get_entities_multi_texts_with_save_dir_save_and_return(self): + texts = ["text1", "text2"] + with tempfile.TemporaryDirectory() as tmp_dir: + out = self.cat.get_entities_multi_texts( + texts, + save_dir_path=tmp_dir, + entity_consume_mode_on_save='save_and_return') + # stuff was already saved + self.assertTrue(os.listdir(tmp_dir)) + out_list = list(out) + # and something was yielded + self.assertEqual(len(out_list), len(texts)) + class CATWithDocAddonTests(CATIncludingTests): EXAMPLE_TEXT = "Example text to tokenize" From 051cff2a05f50006a94981fca562b663400eaa76 Mon Sep 17 00:00:00 2001 From: mart-r Date: Mon, 22 Sep 2025 09:57:22 +0100 Subject: [PATCH 07/10] CU-869ahw0mw: Fix behaviour (so it remains the same) in old test --- medcat-v2/tests/test_cat.py | 1 + 1 file changed, 1 insertion(+) diff --git a/medcat-v2/tests/test_cat.py b/medcat-v2/tests/test_cat.py index bac35051e..ebd3c2140 100644 --- a/medcat-v2/tests/test_cat.py +++ b/medcat-v2/tests/test_cat.py @@ -540,6 +540,7 @@ def _do_mp_run_with_save( batch_size_chars=chars_per_batch, batches_per_save=batches_per_save, n_process=n_process, + entity_consume_mode_on_save='lazy' ) out_dict_all = { key: cdata for key, cdata in out_data From eebba3545ed8a5f381fc2fef541ac91611655994 Mon Sep 17 00:00:00 2001 From: mart-r Date: Mon, 22 Sep 2025 10:53:51 +0100 Subject: [PATCH 
08/10] CU-869ahw0mw: Fix test regarding generator issue

---
 medcat-v2/tests/test_cat.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/medcat-v2/tests/test_cat.py b/medcat-v2/tests/test_cat.py
index ebd3c2140..909cbe06d 100644
--- a/medcat-v2/tests/test_cat.py
+++ b/medcat-v2/tests/test_cat.py
@@ -542,6 +542,7 @@ def _do_mp_run_with_save(
             n_process=n_process,
             entity_consume_mode_on_save='lazy'
         )
+        out_data = list(out_data)
         out_dict_all = {
             key: cdata for key, cdata in out_data
         }

From a4bebe1bbb78ee5de363eafbad348cf57f7f776a Mon Sep 17 00:00:00 2001
From: mart-r
Date: Tue, 23 Sep 2025 13:45:17 +0100
Subject: [PATCH 09/10] CU-869ahw0mw: Move saving (and not returning data) to
 a separate method

---
 medcat-v2/medcat/cat.py | 100 ++++++++++++++++++++++++++++-----------------------
 1 file changed, 55 insertions(+), 45 deletions(-)

diff --git a/medcat-v2/medcat/cat.py b/medcat-v2/medcat/cat.py
index 0e3cb46d6..d89eafcac 100644
--- a/medcat-v2/medcat/cat.py
+++ b/medcat-v2/medcat/cat.py
@@ -319,6 +319,57 @@ def _mp_one_batch_per_process(
             # Yield all results from this batch
             yield from cur_results
 
+    def save_entities_multi_texts(
+            self,
+            texts: Union[Iterable[str], Iterable[tuple[str, str]]],
+            save_dir_path: str,
+            only_cui: bool = False,
+            n_process: int = 1,
+            batch_size: int = -1,
+            batch_size_chars: int = 1_000_000,
+            batches_per_save: int = 20,
+    ) -> None:
+        """Saves the resulting entities on disk and allows multiprocessing.
+
+        This uses `get_entities_multi_texts` under the hood, but it is
+        designed to save the data on disk as it comes through.
+
+        Args:
+            texts (Union[Iterable[str], Iterable[tuple[str, str]]]):
+                The input text. Either an iterable of raw text or one
+                in the format of `(text_index, text)`.
+            save_dir_path (str):
+                The path where the results are saved. The directory will have
+                an `annotated_ids.pickle` file containing the
+                `tuple[list[str], int]` with a list of indices already saved
+                and the number of parts already saved. In addition there will
+                be (usually multiple) files in the `part_.pickle` format
+                with the partial outputs.
+            only_cui (bool):
+                Whether to only return CUIs rather than other information
+                like start/end and annotated value. Defaults to False.
+            n_process (int):
+                Number of processes to use. Defaults to 1.
+                The number of texts to batch at a time. A batch of the
+                specified size will be given to each worker process.
+                Defaults to -1 and in this case the character count will
+                be used instead.
+            batch_size_chars (int):
+                The maximum number of characters to process in a batch.
+                Each process will be given a batch of texts with a total
+                number of characters not exceeding this value. Defaults
+                to 1,000,000 characters. Set to -1 to disable.
+ """ + if save_dir_path is None: + raise ValueError("Need to specify a save path (`save_dir_path`), " + f"got {save_dir_path}") + out_iter = self.get_entities_multi_texts( + texts, only_cui=only_cui, n_process=n_process, + batch_size=batch_size, batch_size_chars=batch_size_chars, + save_dir_path=save_dir_path, batches_per_save=batches_per_save) + # NOTE: not keeping anything since it'll be saved on disk + deque(out_iter, maxlen=0) + def get_entities_multi_texts( self, texts: Union[Iterable[str], Iterable[tuple[str, str]]], @@ -328,12 +379,7 @@ def get_entities_multi_texts( batch_size_chars: int = 1_000_000, save_dir_path: Optional[str] = None, batches_per_save: int = 20, - entity_consume_mode_on_save: Union[ - Literal["save"], Literal["save_and_return"], - Literal["lazy"]] = "save", - ) -> Union[ - Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]], - list[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]]: + ) -> Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]: """Get entities from multiple texts (potentially in parallel). If `n_process` > 1, `n_process - 1` new processes will be created @@ -369,30 +415,10 @@ def get_entities_multi_texts( batches_per_save (int): The number of patches to save (if `save_dir_path` is specified) at once. Defaults to 20. - entity_consume_mode_on_save (Union[ - Literal["save"], Literal["save_and_return"], - Literal["lazy"]]): - Controls how results are handled when `save_dir_path` is - provided: - - "save": - Iterate through results internally, writing them to disk. - Nothing is yielded/returned. This avoids storing all - results in memory and is suitable for large data sets. - - "save_and_return": - As above, but also return a fully materialised list of all - results. **Warning**: this may require large amounts of - memory and is not safe for large amounts of data. - - "lazy": - Do not consume results internally. Results are both - yielded and written to disk as the caller iterates over - them. This preserves lazy evaluation but requires the - caller to drive the iteration. Yields: - Union[ - Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]], - list[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]]: - The results in the format of (text_index, entities). + Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]: + The results in the format of (text_index, entities). 
""" text_iter = cast( Union[Iterator[str], Iterator[tuple[str, str]]], iter(texts)) @@ -402,24 +428,8 @@ def get_entities_multi_texts( saver = BatchAnnotationSaver(save_dir_path, batches_per_save) else: saver = None - out_iter = self._get_entities_multi_texts( + yield from self._get_entities_multi_texts( n_process=n_process, batch_iter=batch_iter, saver=saver) - if saver: - if entity_consume_mode_on_save == "save": - # this materialises the iterator and forces the - # output to be saved on disk, nothing is yielded - deque(out_iter, maxlen=0) - return [] - elif entity_consume_mode_on_save == "lazy": - # do the lazy iteration - force the user to drive iteration - return out_iter - else: - # force materialising of output to save on dis - out_list = list(out_iter) - # but yield from the list as well - return out_list - else: - return out_iter def _get_entities_multi_texts( self, From d90b77411aa8194421f8fcaf6193c69244df8a8b Mon Sep 17 00:00:00 2001 From: mart-r Date: Tue, 23 Sep 2025 13:45:42 +0100 Subject: [PATCH 10/10] CU-869ahw0mw: Update tests accordingly as per last change --- medcat-v2/tests/test_cat.py | 27 ++++----------------------- 1 file changed, 4 insertions(+), 23 deletions(-) diff --git a/medcat-v2/tests/test_cat.py b/medcat-v2/tests/test_cat.py index 909cbe06d..7a44e42b9 100644 --- a/medcat-v2/tests/test_cat.py +++ b/medcat-v2/tests/test_cat.py @@ -540,7 +540,6 @@ def _do_mp_run_with_save( batch_size_chars=chars_per_batch, batches_per_save=batches_per_save, n_process=n_process, - entity_consume_mode_on_save='lazy' ) out_data = list(out_data) out_dict_all = { @@ -665,8 +664,7 @@ def test_get_entities_multi_texts_with_save_dir_lazy(self): with tempfile.TemporaryDirectory() as tmp_dir: out = self.cat.get_entities_multi_texts( texts, - save_dir_path=tmp_dir, - entity_consume_mode_on_save='lazy') + save_dir_path=tmp_dir) # nothing before manual iter self.assertFalse(os.listdir(tmp_dir)) out_list = list(out) @@ -675,31 +673,14 @@ def test_get_entities_multi_texts_with_save_dir_lazy(self): # and something was yielded self.assertEqual(len(out_list), len(texts)) - def test_get_entities_multi_texts_with_save_dir_save(self): + def test_save_entities_multi_texts(self): texts = ["text1", "text2"] with tempfile.TemporaryDirectory() as tmp_dir: - out = self.cat.get_entities_multi_texts( - texts, - save_dir_path=tmp_dir, - entity_consume_mode_on_save='save') - # stuff was already saved - self.assertTrue(os.listdir(tmp_dir)) - out_list = list(out) - # nothing was yielded - self.assertFalse(out_list) - - def test_get_entities_multi_texts_with_save_dir_save_and_return(self): - texts = ["text1", "text2"] - with tempfile.TemporaryDirectory() as tmp_dir: - out = self.cat.get_entities_multi_texts( + self.cat.save_entities_multi_texts( texts, - save_dir_path=tmp_dir, - entity_consume_mode_on_save='save_and_return') + save_dir_path=tmp_dir) # stuff was already saved self.assertTrue(os.listdir(tmp_dir)) - out_list = list(out) - # and something was yielded - self.assertEqual(len(out_list), len(texts)) class CATWithDocAddonTests(CATIncludingTests):