From 63626180b29b57d2e9cfa1c8aca7cdc6cb9636bb Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Wed, 28 Feb 2024 13:28:32 -0500 Subject: [PATCH] If not resuming, remove intermediate dir. --- src/hipscat_import/pipeline_resume_plan.py | 9 +-- .../catalog/test_resume_plan.py | 60 +++++++++++-------- .../hipscat_import/catalog/test_run_import.py | 27 +++++---- .../margin_cache/test_margin_cache.py | 17 +++--- .../soap/test_soap_resume_plan.py | 9 ++- .../test_pipeline_resume_plan.py | 3 +- 6 files changed, 69 insertions(+), 56 deletions(-) diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py index 2bb94963..f816d7a6 100644 --- a/src/hipscat_import/pipeline_resume_plan.py +++ b/src/hipscat_import/pipeline_resume_plan.py @@ -3,6 +3,7 @@ from __future__ import annotations import re +import warnings from dataclasses import dataclass from pathlib import Path @@ -35,11 +36,11 @@ def safe_to_resume(self): """ if file_io.directory_has_contents(self.tmp_path): if not self.resume: - raise ValueError( - f"tmp_path ({self.tmp_path}) contains intermediate files." - " choose a different directory or use --resume flag" + self.clean_resume_files() + else: + warnings.warn( + f"tmp_path ({self.tmp_path}) contains intermediate files; resuming prior progress." ) - print(f"tmp_path ({self.tmp_path}) contains intermediate files. resuming prior progress.") else: file_io.make_directory(self.tmp_path, exist_ok=True) diff --git a/tests/hipscat_import/catalog/test_resume_plan.py b/tests/hipscat_import/catalog/test_resume_plan.py index be6781d6..eabaabab 100644 --- a/tests/hipscat_import/catalog/test_resume_plan.py +++ b/tests/hipscat_import/catalog/test_resume_plan.py @@ -37,19 +37,22 @@ def test_done_checks(tmp_path): plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, resume=True) plan.touch_stage_done_file(ResumePlan.REDUCING_STAGE) - with pytest.raises(ValueError, match="before reducing"): - plan.gather_plan() + with pytest.warns(UserWarning, match="resuming prior progress"): + with pytest.raises(ValueError, match="before reducing"): + plan.gather_plan() plan.touch_stage_done_file(ResumePlan.SPLITTING_STAGE) - with pytest.raises(ValueError, match="before reducing"): - plan.gather_plan() + with pytest.warns(UserWarning, match="resuming prior progress"): + with pytest.raises(ValueError, match="before reducing"): + plan.gather_plan() plan.clean_resume_files() plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, resume=True) plan.touch_stage_done_file(ResumePlan.SPLITTING_STAGE) - with pytest.raises(ValueError, match="before splitting"): - plan.gather_plan() + with pytest.warns(UserWarning, match="resuming prior progress"): + with pytest.raises(ValueError, match="before splitting"): + plan.gather_plan() def test_same_input_paths(tmp_path, small_sky_single_file, formats_headers_csv): @@ -63,30 +66,33 @@ def test_same_input_paths(tmp_path, small_sky_single_file, formats_headers_csv): map_files = plan.map_files assert len(map_files) == 2 - with pytest.raises(ValueError, match="Different file set"): - ResumePlan( - tmp_path=tmp_path, - progress_bar=False, - resume=True, - input_paths=[small_sky_single_file], - ) + with pytest.warns(UserWarning, match="resuming prior progress"): + with pytest.raises(ValueError, match="Different file set"): + ResumePlan( + tmp_path=tmp_path, + progress_bar=False, + resume=True, + input_paths=[small_sky_single_file], + ) ## List is the same length, but includes a duplicate - with pytest.raises(ValueError, match="Different file set"): - ResumePlan( + with pytest.warns(UserWarning, match="resuming prior progress"): + with pytest.raises(ValueError, match="Different file set"): + ResumePlan( + tmp_path=tmp_path, + progress_bar=False, + resume=True, + input_paths=[small_sky_single_file, small_sky_single_file], + ) + + ## Includes a duplicate file, but that's ok. + with pytest.warns(UserWarning, match="resuming prior progress"): + plan = ResumePlan( tmp_path=tmp_path, progress_bar=False, resume=True, - input_paths=[small_sky_single_file, small_sky_single_file], + input_paths=[small_sky_single_file, small_sky_single_file, formats_headers_csv], ) - - ## Includes a duplicate file, but that's ok. - plan = ResumePlan( - tmp_path=tmp_path, - progress_bar=False, - resume=True, - input_paths=[small_sky_single_file, small_sky_single_file, formats_headers_csv], - ) map_files = plan.map_files assert len(map_files) == 2 @@ -148,13 +154,15 @@ def test_read_write_splitting_keys(tmp_path, small_sky_single_file, formats_head ResumePlan.touch_key_done_file(tmp_path, ResumePlan.SPLITTING_STAGE, "split_0") - plan.gather_plan() + with pytest.warns(UserWarning, match="resuming prior progress"): + plan.gather_plan() split_keys = plan.split_keys assert len(split_keys) == 1 assert split_keys[0][0] == "split_1" ResumePlan.touch_key_done_file(tmp_path, ResumePlan.SPLITTING_STAGE, "split_1") - plan.gather_plan() + with pytest.warns(UserWarning, match="resuming prior progress"): + plan.gather_plan() split_keys = plan.split_keys assert len(split_keys) == 0 diff --git a/tests/hipscat_import/catalog/test_run_import.py b/tests/hipscat_import/catalog/test_run_import.py index 17d5ad53..712503b1 100644 --- a/tests/hipscat_import/catalog/test_run_import.py +++ b/tests/hipscat_import/catalog/test_run_import.py @@ -67,19 +67,20 @@ def test_resume_dask_runner( os.path.join(tmp_path, "resume_catalog", "Norder=0"), ) - args = ImportArguments( - output_artifact_name="resume_catalog", - input_path=small_sky_parts_dir, - file_reader="csv", - output_path=tmp_path, - dask_tmp=tmp_path, - tmp_dir=tmp_path, - resume_tmp=os.path.join(tmp_path, "tmp"), - overwrite=True, - highest_healpix_order=0, - pixel_threshold=1000, - progress_bar=False, - ) + with pytest.warns(UserWarning, match="resuming prior progress"): + args = ImportArguments( + output_artifact_name="resume_catalog", + input_path=small_sky_parts_dir, + file_reader="csv", + output_path=tmp_path, + dask_tmp=tmp_path, + tmp_dir=tmp_path, + resume_tmp=os.path.join(tmp_path, "tmp"), + overwrite=True, + highest_healpix_order=0, + pixel_threshold=1000, + progress_bar=False, + ) runner.run(args, dask_client) diff --git a/tests/hipscat_import/margin_cache/test_margin_cache.py b/tests/hipscat_import/margin_cache/test_margin_cache.py index 729aece5..2b7a667c 100644 --- a/tests/hipscat_import/margin_cache/test_margin_cache.py +++ b/tests/hipscat_import/margin_cache/test_margin_cache.py @@ -77,14 +77,15 @@ def test_margin_cache_gen(small_sky_source_catalog, tmp_path, dask_client): @pytest.mark.dask(timeout=150) def test_margin_cache_gen_negative_pixels(small_sky_source_catalog, tmp_path, dask_client): """Test that margin cache generation can generate a file for a negative pixel.""" - args = MarginCacheArguments( - margin_threshold=36000.0, - input_catalog_path=small_sky_source_catalog, - output_path=tmp_path, - output_artifact_name="catalog_cache", - margin_order=4, - progress_bar=False, - ) + with pytest.warns(UserWarning, match="smaller resolution"): + args = MarginCacheArguments( + margin_threshold=36000.0, + input_catalog_path=small_sky_source_catalog, + output_path=tmp_path, + output_artifact_name="catalog_cache", + margin_order=4, + progress_bar=False, + ) assert args.catalog.catalog_info.ra_column == "source_ra" diff --git a/tests/hipscat_import/soap/test_soap_resume_plan.py b/tests/hipscat_import/soap/test_soap_resume_plan.py index 29e56e58..daa7ab55 100644 --- a/tests/hipscat_import/soap/test_soap_resume_plan.py +++ b/tests/hipscat_import/soap/test_soap_resume_plan.py @@ -94,13 +94,15 @@ def test_count_keys(small_sky_soap_args): ## Mark one done and check that there's one less key to count later. Path(small_sky_soap_args.tmp_path, "2_187.csv").touch() - plan.gather_plan(small_sky_soap_args) + with pytest.warns(UserWarning, match="resuming prior progress"): + plan.gather_plan(small_sky_soap_args) assert len(plan.count_keys) == 13 ## Mark them ALL done and check that there are on keys later. plan.touch_stage_done_file(SoapPlan.COUNTING_STAGE) - plan.gather_plan(small_sky_soap_args) + with pytest.warns(UserWarning, match="resuming prior progress"): + plan.gather_plan(small_sky_soap_args) assert len(plan.count_keys) == 0 @@ -115,7 +117,8 @@ def test_cached_map_file(small_sky_soap_args): cache_map_file = os.path.join(small_sky_soap_args.tmp_path, SoapPlan.SOURCE_MAP_FILE) assert os.path.exists(cache_map_file) - plan = SoapPlan(small_sky_soap_args) + with pytest.warns(UserWarning, match="resuming prior progress"): + plan = SoapPlan(small_sky_soap_args) assert len(plan.count_keys) == 14 diff --git a/tests/hipscat_import/test_pipeline_resume_plan.py b/tests/hipscat_import/test_pipeline_resume_plan.py index 50a1bbdc..50e9ef15 100644 --- a/tests/hipscat_import/test_pipeline_resume_plan.py +++ b/tests/hipscat_import/test_pipeline_resume_plan.py @@ -85,8 +85,7 @@ def test_safe_to_resume(tmp_path): ## explicitly setting resume=True done_file = "action_done" plan.touch_stage_done_file(done_file) - with pytest.raises(ValueError, match="contains intermediate"): - plan.safe_to_resume() + plan.safe_to_resume() ## If we mark as a resuming pipeline, we're safe to resume. resuming_plan = PipelineResumePlan(tmp_path=tmp_path, progress_bar=False, resume=True)