From 92c7e3652828e0e7143c14d7932df6e5c7b13d0d Mon Sep 17 00:00:00 2001 From: Max West Date: Mon, 23 Oct 2023 13:37:43 -0700 Subject: [PATCH 01/11] checkpoint --- .../cross_match/macauff_arguments.py | 151 ++++++++++++++++++ .../cross_match/macauff_map_reduce.py | 75 +++++++++ src/hipscat_import/pipeline.py | 4 + 3 files changed, 230 insertions(+) create mode 100644 src/hipscat_import/cross_match/macauff_arguments.py create mode 100644 src/hipscat_import/cross_match/macauff_map_reduce.py diff --git a/src/hipscat_import/cross_match/macauff_arguments.py b/src/hipscat_import/cross_match/macauff_arguments.py new file mode 100644 index 00000000..1d2a5d3a --- /dev/null +++ b/src/hipscat_import/cross_match/macauff_arguments.py @@ -0,0 +1,151 @@ +from dataclasses import dataclass, field +from os import path + +from hipscat.catalog.association_catalog.association_catalog import AssociationCatalogInfo +from hipscat.catalog.catalog_type import CatalogType +from hipscat.io.validation import is_valid_catalog +from hipscat.io import FilePointer, file_io + +from hipscat_import.runtime_arguments import RuntimeArguments +from hipscat_import.catalog.arguments import check_healpix_order_range +from typing import List + +from hipscat_import.catalog.resume_plan import ResumePlan + +@dataclass +class MacauffArguments(RuntimeArguments): + """Data class for holding cross-match association arguments""" + ## Input - Cross-match data + input_path: FilePointer | None = None + """path to search for the input data""" + input_format: str = "" + """specifier of the input data format. this will be used to find an appropriate + InputReader type, and may be used to find input files, via a match like + ``/*`` """ + input_file_list: List[FilePointer] = field(default_factory=list) + """can be used instead of `input_format` to import only specified files""" + input_paths: List[FilePointer] = field(default_factory=list) + """resolved list of all files that will be used in the importer""" + add_hipscat_index: bool = True + """add the hipscat spatial index field alongside the data""" + constant_healpix_order: int = -1 + """healpix order to use when mapping. if this is + a positive number, this will be the order of all final pixels and we + will not combine pixels according to the threshold""" + highest_healpix_order: int = 10 + """healpix order to use when mapping. this will + not necessarily be the order used in the final catalog, as we may combine + pixels that don't meed the threshold""" + pixel_threshold: int = 1_000_000 + """maximum number of rows for a single resulting pixel. + we may combine hierarchically until we near the ``pixel_threshold``""" + mapping_healpix_order: int = -1 + """healpix order to use when mapping. will be + ``highest_healpix_order`` unless a positive value is provided for + ``constant_healpix_order``""" + + ## Input - Left catalog + left_catalog_dir: str = "" + left_id_column: str = "" + left_ra_column: str = "" + left_dec_column: str = "" + + ## Input - Right catalog + right_catalog_dir: str = "" + right_id_column: str = "" + right_ra_column: str = "" + right_dec_column: str = "" + + ## `macauff` specific attributes + metadata_file_path: str = "" + match_probability_column: str = "match_p" + column_names: List[str] = field(default_factory=list) + + resume: bool = True + resume_plan: ResumePlan | None = None + + def __post_init__(self): + self._check_arguments() + + def _check_arguments(self): + super()._check_arguments() + + if not self.input_format: + raise ValueError("input_format is required") + + if self.constant_healpix_order >= 0: + check_healpix_order_range(self.constant_healpix_order, "constant_healpix_order") + self.mapping_healpix_order = self.constant_healpix_order + else: + check_healpix_order_range(self.highest_healpix_order, "highest_healpix_order") + if not 100 <= self.pixel_threshold <= 1_000_000_000: + raise ValueError("pixel_threshold should be between 100 and 1,000,000,000") + self.mapping_healpix_order = self.highest_healpix_order + + # if not self.left_catalog_dir: + # raise ValueError("left_catalog_dir is required") + if not self.left_id_column: + raise ValueError("left_id_column is required") + # if not is_valid_catalog(self.left_catalog_dir): + # raise ValueError("left_catalog_dir not a valid catalog") + + # if not self.right_catalog_dir: + # raise ValueError("right_catalog_dir is required") + if not self.right_id_column: + raise ValueError("right_object_id_column is required") + # if not is_valid_catalog(self.right_catalog_dir): + # raise ValueError("right_catalog_dir not a valid catalog") + + if not self.metadata_file_path: + raise ValueError("column metadata file required for macauff crossmatch") + if not path.isfile(self.metadata_file_path): + raise ValueError("Macauff column metadata file must point to valid file path.") + + # Basic checks complete - make more checks and create directories where necessary + if self.input_path: + if not file_io.does_file_or_directory_exist(self.input_path): + raise FileNotFoundError("input_path not found on local storage") + self.input_paths = file_io.find_files_matching_path(self.input_path, f"*{self.input_format}") + elif self.input_file_list: + self.input_paths = self.input_file_list + if len(self.input_paths) == 0: + raise FileNotFoundError("No input files found") + + self.column_names = self.get_column_names() + + self.resume_plan = ResumePlan( + resume=self.resume, + progress_bar=True, + input_paths=self.input_paths, + tmp_path=self.tmp_path, + ) + + def get_column_names(self): + """Grab the macauff column names.""" + # TODO: Actually read in the metadata file once we get the example file from Tom. + + return [ + 'Gaia_designation', + 'Gaia_RA', + 'Gaia_Dec', + 'BP', + 'G', + 'RP', + 'CatWISE_Name', + 'CatWISE_RA', + 'CatWISE_Dec', + 'W1', + 'W2', + 'match_p', + 'Separation', + 'eta', + 'xi', + 'Gaia_avg_cont', + 'CatWISE_avg_cont', + 'Gaia_cont_f1', + 'Gaia_cont_f10', + 'CatWISE_cont_f1', + 'CatWISE_cont_f10', + 'CatWISE_fit_sig', + ] + diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py new file mode 100644 index 00000000..c83c7adf --- /dev/null +++ b/src/hipscat_import/cross_match/macauff_map_reduce.py @@ -0,0 +1,75 @@ +import hipscat_import.catalog.map_reduce as mr +from hipscat_import.cross_match.macauff_arguments import MacauffArguments +# import hipscat_import.catalog.run_import as ri +from hipscat_import.catalog.file_readers import CsvReader +from hipscat_import.pipeline_resume_plan import PipelineResumePlan +from hipscat import pixel_math +from tqdm import tqdm + +def _map_pixels(args, client): + """Generate a raw histogram of object counts in each healpix pixel""" + + if args.resume_plan.is_mapping_done(): + return + + reader = CsvReader(column_names=args.column_names) + + reader_future = client.scatter(reader) + futures = [] + for key, file_path in args.resume_plan.map_files: + futures.append( + client.submit( + mr.map_to_pixels, + key=key, + input_file=file_path, + resume_path=args.resume_plan.tmp_path, + file_reader=reader_future, + mapping_key=key, + highest_order=args.mapping_healpix_order, + ra_column=args.left_ra_column, + dec_column=args.left_dec_column, + ) + ) + args.resume_plan.wait_for_mapping(futures) + + +def run(args, client): + if not args: + raise ValueError("args is required and should be type MacauffArguments") + if not isinstance(args, MacauffArguments): + raise ValueError("args must be type ImportArguments") + _map_pixels(args, client) + + with tqdm( + total=2, desc=PipelineResumePlan.get_formatted_stage_name("Binning"), disable=not args.progress_bar + ) as step_progress: + raw_histogram = args.resume_plan.read_histogram(args.mapping_healpix_order) + step_progress.update(1) + if args.constant_healpix_order >= 0: + alignment = np.full(len(raw_histogram), None) + for pixel_num, pixel_sum in enumerate(raw_histogram): + alignment[pixel_num] = ( + args.constant_healpix_order, + pixel_num, + pixel_sum, + ) + + destination_pixel_map = pixel_math.generate_constant_pixel_map( + histogram=raw_histogram, + constant_healpix_order=args.constant_healpix_order, + ) + else: + alignment = pixel_math.generate_alignment( + raw_histogram, + highest_order=args.highest_healpix_order, + threshold=args.pixel_threshold, + ) + destination_pixel_map = pixel_math.compute_pixel_map( + raw_histogram, + highest_order=args.highest_healpix_order, + threshold=args.pixel_threshold, + ) + step_progress.update(1) + + + diff --git a/src/hipscat_import/pipeline.py b/src/hipscat_import/pipeline.py index d0482e44..db718da5 100644 --- a/src/hipscat_import/pipeline.py +++ b/src/hipscat_import/pipeline.py @@ -9,12 +9,14 @@ import hipscat_import.margin_cache.margin_cache as margin_runner import hipscat_import.soap.run_soap as soap_runner import hipscat_import.verification.run_verification as verification_runner +import hipscat_import.cross_match.macauff_map_reduce as macauff_runner from hipscat_import.catalog.arguments import ImportArguments from hipscat_import.index.arguments import IndexArguments from hipscat_import.margin_cache.margin_cache_arguments import MarginCacheArguments from hipscat_import.runtime_arguments import RuntimeArguments from hipscat_import.soap.arguments import SoapArguments from hipscat_import.verification.arguments import VerificationArguments +from hipscat_import.cross_match.macauff_arguments import MacauffArguments # pragma: no cover @@ -49,6 +51,8 @@ def pipeline_with_client(args: RuntimeArguments, client: Client): soap_runner.run(args, client) elif isinstance(args, VerificationArguments): verification_runner.run(args) + elif isinstance(args, MacauffArguments): + macauff_runner.run(args, client) else: raise ValueError("unknown args type") except Exception as exception: # pylint: disable=broad-exception-caught From 378c55023567c9be5adfc23c1a7be34f62ee0049 Mon Sep 17 00:00:00 2001 From: Max West Date: Mon, 23 Oct 2023 13:38:29 -0700 Subject: [PATCH 02/11] checkpoint --- src/hipscat_import/cross_match/macauff_arguments.py | 8 ++++---- src/hipscat_import/cross_match/macauff_map_reduce.py | 9 ++++++--- src/hipscat_import/pipeline.py | 4 ++-- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/hipscat_import/cross_match/macauff_arguments.py b/src/hipscat_import/cross_match/macauff_arguments.py index 1d2a5d3a..39b4bab3 100644 --- a/src/hipscat_import/cross_match/macauff_arguments.py +++ b/src/hipscat_import/cross_match/macauff_arguments.py @@ -1,16 +1,16 @@ from dataclasses import dataclass, field from os import path +from typing import List from hipscat.catalog.association_catalog.association_catalog import AssociationCatalogInfo from hipscat.catalog.catalog_type import CatalogType -from hipscat.io.validation import is_valid_catalog from hipscat.io import FilePointer, file_io +from hipscat.io.validation import is_valid_catalog -from hipscat_import.runtime_arguments import RuntimeArguments from hipscat_import.catalog.arguments import check_healpix_order_range -from typing import List - from hipscat_import.catalog.resume_plan import ResumePlan +from hipscat_import.runtime_arguments import RuntimeArguments + @dataclass class MacauffArguments(RuntimeArguments): diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py index c83c7adf..76791dd0 100644 --- a/src/hipscat_import/cross_match/macauff_map_reduce.py +++ b/src/hipscat_import/cross_match/macauff_map_reduce.py @@ -1,10 +1,13 @@ +from hipscat import pixel_math +from tqdm import tqdm + import hipscat_import.catalog.map_reduce as mr -from hipscat_import.cross_match.macauff_arguments import MacauffArguments + # import hipscat_import.catalog.run_import as ri from hipscat_import.catalog.file_readers import CsvReader +from hipscat_import.cross_match.macauff_arguments import MacauffArguments from hipscat_import.pipeline_resume_plan import PipelineResumePlan -from hipscat import pixel_math -from tqdm import tqdm + def _map_pixels(args, client): """Generate a raw histogram of object counts in each healpix pixel""" diff --git a/src/hipscat_import/pipeline.py b/src/hipscat_import/pipeline.py index db718da5..99ed497d 100644 --- a/src/hipscat_import/pipeline.py +++ b/src/hipscat_import/pipeline.py @@ -5,18 +5,18 @@ from dask.distributed import Client import hipscat_import.catalog.run_import as catalog_runner +import hipscat_import.cross_match.macauff_map_reduce as macauff_runner import hipscat_import.index.run_index as index_runner import hipscat_import.margin_cache.margin_cache as margin_runner import hipscat_import.soap.run_soap as soap_runner import hipscat_import.verification.run_verification as verification_runner -import hipscat_import.cross_match.macauff_map_reduce as macauff_runner from hipscat_import.catalog.arguments import ImportArguments +from hipscat_import.cross_match.macauff_arguments import MacauffArguments from hipscat_import.index.arguments import IndexArguments from hipscat_import.margin_cache.margin_cache_arguments import MarginCacheArguments from hipscat_import.runtime_arguments import RuntimeArguments from hipscat_import.soap.arguments import SoapArguments from hipscat_import.verification.arguments import VerificationArguments -from hipscat_import.cross_match.macauff_arguments import MacauffArguments # pragma: no cover From a198625681b43b6ce09958516cc81089cddb1304 Mon Sep 17 00:00:00 2001 From: Max West Date: Tue, 24 Oct 2023 10:49:28 -0700 Subject: [PATCH 03/11] testing for MacauffArguments --- .../cross_match/macauff_arguments.py | 40 +-- .../cross_match/macauff_map_reduce.py | 79 +---- tests/hipscat_import/conftest.py | 4 + .../cross_match/test_macauff_arguments.py | 305 ++++++++++++++++++ .../data/test_formats/macauff_metadata.yaml | 3 + 5 files changed, 329 insertions(+), 102 deletions(-) create mode 100644 tests/hipscat_import/cross_match/test_macauff_arguments.py create mode 100644 tests/hipscat_import/data/test_formats/macauff_metadata.yaml diff --git a/src/hipscat_import/cross_match/macauff_arguments.py b/src/hipscat_import/cross_match/macauff_arguments.py index 39b4bab3..6bef805d 100644 --- a/src/hipscat_import/cross_match/macauff_arguments.py +++ b/src/hipscat_import/cross_match/macauff_arguments.py @@ -2,15 +2,15 @@ from os import path from typing import List -from hipscat.catalog.association_catalog.association_catalog import AssociationCatalogInfo -from hipscat.catalog.catalog_type import CatalogType from hipscat.io import FilePointer, file_io from hipscat.io.validation import is_valid_catalog -from hipscat_import.catalog.arguments import check_healpix_order_range from hipscat_import.catalog.resume_plan import ResumePlan from hipscat_import.runtime_arguments import RuntimeArguments +# pylint: disable=too-many-instance-attributes +# pylint: disable=unsupported-binary-operation + @dataclass class MacauffArguments(RuntimeArguments): @@ -28,10 +28,6 @@ class MacauffArguments(RuntimeArguments): """resolved list of all files that will be used in the importer""" add_hipscat_index: bool = True """add the hipscat spatial index field alongside the data""" - constant_healpix_order: int = -1 - """healpix order to use when mapping. if this is - a positive number, this will be the order of all final pixels and we - will not combine pixels according to the threshold""" highest_healpix_order: int = 10 """healpix order to use when mapping. this will not necessarily be the order used in the final catalog, as we may combine @@ -70,31 +66,24 @@ def __post_init__(self): def _check_arguments(self): super()._check_arguments() + if not self.input_path and not self.input_file_list: + raise ValueError("input files/path not provided") if not self.input_format: raise ValueError("input_format is required") - if self.constant_healpix_order >= 0: - check_healpix_order_range(self.constant_healpix_order, "constant_healpix_order") - self.mapping_healpix_order = self.constant_healpix_order - else: - check_healpix_order_range(self.highest_healpix_order, "highest_healpix_order") - if not 100 <= self.pixel_threshold <= 1_000_000_000: - raise ValueError("pixel_threshold should be between 100 and 1,000,000,000") - self.mapping_healpix_order = self.highest_healpix_order - - # if not self.left_catalog_dir: - # raise ValueError("left_catalog_dir is required") + if not self.left_catalog_dir: + raise ValueError("left_catalog_dir is required") if not self.left_id_column: raise ValueError("left_id_column is required") - # if not is_valid_catalog(self.left_catalog_dir): - # raise ValueError("left_catalog_dir not a valid catalog") + if not is_valid_catalog(self.left_catalog_dir): + raise ValueError("left_catalog_dir not a valid catalog") - # if not self.right_catalog_dir: - # raise ValueError("right_catalog_dir is required") + if not self.right_catalog_dir: + raise ValueError("right_catalog_dir is required") if not self.right_id_column: - raise ValueError("right_object_id_column is required") - # if not is_valid_catalog(self.right_catalog_dir): - # raise ValueError("right_catalog_dir not a valid catalog") + raise ValueError("right_id_column is required") + if not is_valid_catalog(self.right_catalog_dir): + raise ValueError("right_catalog_dir not a valid catalog") if not self.metadata_file_path: raise ValueError("column metadata file required for macauff crossmatch") @@ -148,4 +137,3 @@ def get_column_names(self): 'CatWISE_cont_f10', 'CatWISE_fit_sig', ] - diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py index 76791dd0..a5f17dbc 100644 --- a/src/hipscat_import/cross_match/macauff_map_reduce.py +++ b/src/hipscat_import/cross_match/macauff_map_reduce.py @@ -1,78 +1,5 @@ -from hipscat import pixel_math -from tqdm import tqdm - -import hipscat_import.catalog.map_reduce as mr - -# import hipscat_import.catalog.run_import as ri -from hipscat_import.catalog.file_readers import CsvReader -from hipscat_import.cross_match.macauff_arguments import MacauffArguments -from hipscat_import.pipeline_resume_plan import PipelineResumePlan - - -def _map_pixels(args, client): - """Generate a raw histogram of object counts in each healpix pixel""" - - if args.resume_plan.is_mapping_done(): - return - - reader = CsvReader(column_names=args.column_names) - - reader_future = client.scatter(reader) - futures = [] - for key, file_path in args.resume_plan.map_files: - futures.append( - client.submit( - mr.map_to_pixels, - key=key, - input_file=file_path, - resume_path=args.resume_plan.tmp_path, - file_reader=reader_future, - mapping_key=key, - highest_order=args.mapping_healpix_order, - ra_column=args.left_ra_column, - dec_column=args.left_dec_column, - ) - ) - args.resume_plan.wait_for_mapping(futures) - +# pylint: disable=missing-function-docstring +# pylint: disable=unused-argument def run(args, client): - if not args: - raise ValueError("args is required and should be type MacauffArguments") - if not isinstance(args, MacauffArguments): - raise ValueError("args must be type ImportArguments") - _map_pixels(args, client) - - with tqdm( - total=2, desc=PipelineResumePlan.get_formatted_stage_name("Binning"), disable=not args.progress_bar - ) as step_progress: - raw_histogram = args.resume_plan.read_histogram(args.mapping_healpix_order) - step_progress.update(1) - if args.constant_healpix_order >= 0: - alignment = np.full(len(raw_histogram), None) - for pixel_num, pixel_sum in enumerate(raw_histogram): - alignment[pixel_num] = ( - args.constant_healpix_order, - pixel_num, - pixel_sum, - ) - - destination_pixel_map = pixel_math.generate_constant_pixel_map( - histogram=raw_histogram, - constant_healpix_order=args.constant_healpix_order, - ) - else: - alignment = pixel_math.generate_alignment( - raw_histogram, - highest_order=args.highest_healpix_order, - threshold=args.pixel_threshold, - ) - destination_pixel_map = pixel_math.compute_pixel_map( - raw_histogram, - highest_order=args.highest_healpix_order, - threshold=args.pixel_threshold, - ) - step_progress.update(1) - - - + pass diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index dea9ea78..d563963c 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -123,6 +123,10 @@ def formats_pandasindex(test_data_dir): def formats_multiindex(test_data_dir): return os.path.join(test_data_dir, "test_formats", "multiindex.parquet") +@pytest.fixture +def formats_yaml(test_data_dir): + return os.path.join(test_data_dir, "test_formats", "macauff_metadata.yaml") + @pytest.fixture def small_sky_parts_dir(test_data_dir): diff --git a/tests/hipscat_import/cross_match/test_macauff_arguments.py b/tests/hipscat_import/cross_match/test_macauff_arguments.py new file mode 100644 index 00000000..b33fb267 --- /dev/null +++ b/tests/hipscat_import/cross_match/test_macauff_arguments.py @@ -0,0 +1,305 @@ +"""Tests of macauff arguments""" + + +from os import path + +import pytest + +from hipscat_import.cross_match.macauff_arguments import MacauffArguments + +# pylint: disable=unused-variable + + +def test_macauff_arguments( + small_sky_object_catalog, + small_sky_source_catalog, + small_sky_dir, + formats_yaml, + tmp_path +): + """Test that we can create a MacauffArguments instance with two valid catalogs.""" + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + input_format="csv", + metadata_file_path=formats_yaml, + ) + + assert len(args.input_paths) > 0 + +def test_macauff_arguments_file_list( + small_sky_object_catalog, + small_sky_source_catalog, + small_sky_dir, + formats_yaml, + tmp_path +): + """Test that we can create a MacauffArguments instance with two valid catalogs.""" + files = [path.join(small_sky_dir, "catalog.csv")] + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_file_list=files, + input_format="csv", + metadata_file_path=formats_yaml, + ) + + assert len(args.input_paths) > 0 + + +def test_macauff_args_no_input_path( + small_sky_object_catalog, + small_sky_source_catalog, + formats_yaml, + tmp_path +): + with pytest.raises(ValueError, match="not provided"): + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_format="csv", + metadata_file_path=formats_yaml, + ) + +def test_macauff_args_no_input_format( + small_sky_object_catalog, + small_sky_source_catalog, + small_sky_dir, + formats_yaml, + tmp_path +): + with pytest.raises(ValueError, match="input_format"): + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + metadata_file_path=formats_yaml, + ) + +def test_macauff_args_no_left_catalog( + small_sky_source_catalog, + small_sky_dir, + formats_yaml, + tmp_path +): + with pytest.raises(ValueError, match="left_catalog_dir"): + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + input_format="csv", + metadata_file_path=formats_yaml, + ) + +def test_macauff_args_no_left_id( + small_sky_object_catalog, + small_sky_source_catalog, + small_sky_dir, + formats_yaml, + tmp_path +): + with pytest.raises(ValueError, match="left_id_column"): + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + input_format="csv", + metadata_file_path=formats_yaml, + ) + +def test_macauff_args_invalid_catalog( + small_sky_source_catalog, + small_sky_dir, + formats_yaml, + tmp_path +): + with pytest.raises(ValueError, match="left_catalog_dir"): + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_dir, # valid path, but not a catalog + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + input_format="csv", + metadata_file_path=formats_yaml, + ) + + +def test_macauff_args_no_right_catalog( + small_sky_object_catalog, + small_sky_dir, + formats_yaml, + tmp_path +): + with pytest.raises(ValueError, match="right_catalog_dir"): + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + input_format="csv", + metadata_file_path=formats_yaml, + ) + +def test_macauff_args_no_right_catalog_id( + small_sky_object_catalog, + small_sky_source_catalog, + small_sky_dir, + formats_yaml, + tmp_path +): + with pytest.raises(ValueError, match="right_id_column"): + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + input_path=small_sky_dir, + input_format="csv", + metadata_file_path=formats_yaml, + ) + +def test_macauff_args_right_invalid_catalog( + small_sky_object_catalog, + small_sky_dir, + formats_yaml, + tmp_path +): + with pytest.raises(ValueError, match="right_catalog_dir"): + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_dir, # valid directory with files, not a catalog + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + input_format="csv", + metadata_file_path=formats_yaml, + ) + +def test_macauff_args_no_metadata( + small_sky_object_catalog, + small_sky_source_catalog, + small_sky_dir, + tmp_path +): + with pytest.raises(ValueError, match="column metadata file"): + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + input_format="csv", + ) + +# def test_macauff_args_no_left_catalog( +# small_sky_object_catalog, +# small_sky_source_catalog, +# small_sky_dir, +# formats_yaml, +# tmp_path +# ): +# with pytest.raises(ValueError, match="left_catalog_dir"): +# args = MacauffArguments( +# output_path=tmp_path, +# output_catalog_name="object_to_source", +# tmp_dir=tmp_path, +# left_catalog_dir=small_sky_object_catalog, +# left_ra_column="ra", +# left_dec_column="dec", +# left_id_column="id", +# right_catalog_dir=small_sky_source_catalog, +# right_ra_column="source_ra", +# right_dec_column="source_dec", +# right_id_column="source_id", +# input_path=small_sky_dir, +# input_format="csv", +# metadata_file_path=formats_yaml, +# ) diff --git a/tests/hipscat_import/data/test_formats/macauff_metadata.yaml b/tests/hipscat_import/data/test_formats/macauff_metadata.yaml new file mode 100644 index 00000000..e41a8357 --- /dev/null +++ b/tests/hipscat_import/data/test_formats/macauff_metadata.yaml @@ -0,0 +1,3 @@ +#placeholder file while we wait for the full metadata example file. +left_catalog_name: small_sky_object_catalog +right_catalog_name: small_sky_source_catalog \ No newline at end of file From 626c39eda51060c8552155a528ffc1e4c95c49ef Mon Sep 17 00:00:00 2001 From: Max West Date: Wed, 25 Oct 2023 10:28:19 -0400 Subject: [PATCH 04/11] create boilerplate for macauff runner + tests --- .../cross_match/macauff_map_reduce.py | 5 --- .../cross_match/run_macauff_import.py | 12 +++++ src/hipscat_import/pipeline.py | 2 +- .../cross_match/test_macauff_runner.py | 44 +++++++++++++++++++ 4 files changed, 57 insertions(+), 6 deletions(-) delete mode 100644 src/hipscat_import/cross_match/macauff_map_reduce.py create mode 100644 src/hipscat_import/cross_match/run_macauff_import.py create mode 100644 tests/hipscat_import/cross_match/test_macauff_runner.py diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py deleted file mode 100644 index a5f17dbc..00000000 --- a/src/hipscat_import/cross_match/macauff_map_reduce.py +++ /dev/null @@ -1,5 +0,0 @@ -# pylint: disable=missing-function-docstring -# pylint: disable=unused-argument - -def run(args, client): - pass diff --git a/src/hipscat_import/cross_match/run_macauff_import.py b/src/hipscat_import/cross_match/run_macauff_import.py new file mode 100644 index 00000000..f133c8bc --- /dev/null +++ b/src/hipscat_import/cross_match/run_macauff_import.py @@ -0,0 +1,12 @@ +from hipscat_import.cross_match.macauff_arguments import MacauffArguments + +# pylint: disable=unused-argument + +def run(args, client): + """run macauff cross-match import pipeline""" + if not args: + raise TypeError("args is required and should be type MacauffArguments") + if not isinstance(args, MacauffArguments): + raise TypeError("args must be type MacauffArguments") + + raise NotImplementedError("macauff pipeline not implemented yet.") diff --git a/src/hipscat_import/pipeline.py b/src/hipscat_import/pipeline.py index 99ed497d..ec253a9d 100644 --- a/src/hipscat_import/pipeline.py +++ b/src/hipscat_import/pipeline.py @@ -5,7 +5,7 @@ from dask.distributed import Client import hipscat_import.catalog.run_import as catalog_runner -import hipscat_import.cross_match.macauff_map_reduce as macauff_runner +import hipscat_import.cross_match.run_macauff_import as macauff_runner import hipscat_import.index.run_index as index_runner import hipscat_import.margin_cache.margin_cache as margin_runner import hipscat_import.soap.run_soap as soap_runner diff --git a/tests/hipscat_import/cross_match/test_macauff_runner.py b/tests/hipscat_import/cross_match/test_macauff_runner.py new file mode 100644 index 00000000..bc0f9073 --- /dev/null +++ b/tests/hipscat_import/cross_match/test_macauff_runner.py @@ -0,0 +1,44 @@ +import pytest + +import hipscat_import.cross_match.run_macauff_import as runner +from hipscat_import.cross_match.macauff_arguments import MacauffArguments + + +def test_bad_args(dask_client): + """Runner should fail with empty or mis-typed arguments""" + with pytest.raises(TypeError, match="MacauffArguments"): + runner.run(None, dask_client) + + args = {"output_catalog_name": "bad_arg_type"} + with pytest.raises(TypeError, match="MacauffArguments"): + runner.run(args, dask_client) + + +def test_no_implementation( + small_sky_object_catalog, + small_sky_source_catalog, + small_sky_dir, + formats_yaml, + tmp_path, + dask_client, +): + """Test that we can create a MacauffArguments instance with two valid catalogs.""" + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + input_format="csv", + metadata_file_path=formats_yaml, + ) + + with pytest.raises(NotImplementedError, match="not implemented yet."): + runner.run(args, dask_client) From 4cec8a6474be79c5bfbeccb0b2bfa8db2ea2e3fd Mon Sep 17 00:00:00 2001 From: Max West Date: Wed, 25 Oct 2023 10:31:28 -0400 Subject: [PATCH 05/11] remove commented out test --- .../cross_match/test_macauff_arguments.py | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/tests/hipscat_import/cross_match/test_macauff_arguments.py b/tests/hipscat_import/cross_match/test_macauff_arguments.py index b33fb267..dfa19b51 100644 --- a/tests/hipscat_import/cross_match/test_macauff_arguments.py +++ b/tests/hipscat_import/cross_match/test_macauff_arguments.py @@ -278,28 +278,3 @@ def test_macauff_args_no_metadata( input_path=small_sky_dir, input_format="csv", ) - -# def test_macauff_args_no_left_catalog( -# small_sky_object_catalog, -# small_sky_source_catalog, -# small_sky_dir, -# formats_yaml, -# tmp_path -# ): -# with pytest.raises(ValueError, match="left_catalog_dir"): -# args = MacauffArguments( -# output_path=tmp_path, -# output_catalog_name="object_to_source", -# tmp_dir=tmp_path, -# left_catalog_dir=small_sky_object_catalog, -# left_ra_column="ra", -# left_dec_column="dec", -# left_id_column="id", -# right_catalog_dir=small_sky_source_catalog, -# right_ra_column="source_ra", -# right_dec_column="source_dec", -# right_id_column="source_id", -# input_path=small_sky_dir, -# input_format="csv", -# metadata_file_path=formats_yaml, -# ) From 7c86ca1648d64787a294e6bedab1b4086d8178ee Mon Sep 17 00:00:00 2001 From: Max West Date: Wed, 25 Oct 2023 13:30:10 -0400 Subject: [PATCH 06/11] add __future__.annotations --- src/hipscat_import/cross_match/macauff_arguments.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/hipscat_import/cross_match/macauff_arguments.py b/src/hipscat_import/cross_match/macauff_arguments.py index 6bef805d..09b0fceb 100644 --- a/src/hipscat_import/cross_match/macauff_arguments.py +++ b/src/hipscat_import/cross_match/macauff_arguments.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dataclasses import dataclass, field from os import path from typing import List From 84c845b3026cd53252e2d41e41dfff84260237af Mon Sep 17 00:00:00 2001 From: Max West Date: Wed, 25 Oct 2023 13:51:22 -0400 Subject: [PATCH 07/11] linter problems --- tests/hipscat_import/cross_match/test_macauff_arguments.py | 1 + tests/hipscat_import/cross_match/test_macauff_runner.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/tests/hipscat_import/cross_match/test_macauff_arguments.py b/tests/hipscat_import/cross_match/test_macauff_arguments.py index dfa19b51..1d8c0907 100644 --- a/tests/hipscat_import/cross_match/test_macauff_arguments.py +++ b/tests/hipscat_import/cross_match/test_macauff_arguments.py @@ -8,6 +8,7 @@ from hipscat_import.cross_match.macauff_arguments import MacauffArguments # pylint: disable=unused-variable +# pylint: disable=duplicate-code def test_macauff_arguments( diff --git a/tests/hipscat_import/cross_match/test_macauff_runner.py b/tests/hipscat_import/cross_match/test_macauff_runner.py index bc0f9073..cac03d29 100644 --- a/tests/hipscat_import/cross_match/test_macauff_runner.py +++ b/tests/hipscat_import/cross_match/test_macauff_runner.py @@ -3,6 +3,8 @@ import hipscat_import.cross_match.run_macauff_import as runner from hipscat_import.cross_match.macauff_arguments import MacauffArguments +# pylint: disable=too-many-instance-attributes +# pylint: disable=duplicate-code def test_bad_args(dask_client): """Runner should fail with empty or mis-typed arguments""" From 87e484e8699e7152186192139693acb33bdd25cb Mon Sep 17 00:00:00 2001 From: Max West Date: Wed, 25 Oct 2023 15:55:05 -0400 Subject: [PATCH 08/11] add more tests for missing coverage --- .../cross_match/test_macauff_arguments.py | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/tests/hipscat_import/cross_match/test_macauff_arguments.py b/tests/hipscat_import/cross_match/test_macauff_arguments.py index 1d8c0907..6dd8ae9a 100644 --- a/tests/hipscat_import/cross_match/test_macauff_arguments.py +++ b/tests/hipscat_import/cross_match/test_macauff_arguments.py @@ -279,3 +279,76 @@ def test_macauff_args_no_metadata( input_path=small_sky_dir, input_format="csv", ) + +def test_macauff_args_invalid_metadata_file( + small_sky_object_catalog, + small_sky_source_catalog, + small_sky_dir, + tmp_path +): + with pytest.raises(ValueError, match="column metadata file must"): + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + input_format="csv", + metadata_file_path="ceci_n_est_pas_un_fichier.xml", + ) + +def test_macauff_args_invalid_input_directory( + small_sky_object_catalog, + small_sky_source_catalog, + formats_yaml, + tmp_path +): + with pytest.raises(FileNotFoundError, match="input_path not found"): + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path="ceci_n_est_pas_un_directoire/", + input_format="csv", + metadata_file_path=formats_yaml, + ) + +def test_macauff_args_no_files( + small_sky_object_catalog, + small_sky_source_catalog, + small_sky_dir, + formats_yaml, + tmp_path +): + with pytest.raises(FileNotFoundError, match="No input files found"): + args = MacauffArguments( + output_path=tmp_path, + output_catalog_name="object_to_source", + tmp_dir=tmp_path, + left_catalog_dir=small_sky_object_catalog, + left_ra_column="ra", + left_dec_column="dec", + left_id_column="id", + right_catalog_dir=small_sky_source_catalog, + right_ra_column="source_ra", + right_dec_column="source_dec", + right_id_column="source_id", + input_path=small_sky_dir, + input_format="parquet", # no files of this format will be found + metadata_file_path=formats_yaml, + ) From c6c2b2faf3354945130b7aa48430d4decea3ea84 Mon Sep 17 00:00:00 2001 From: Max West Date: Thu, 26 Oct 2023 10:54:57 -0400 Subject: [PATCH 09/11] refactor MacauffArguments required parameter tests --- .../cross_match/macauff_arguments.py | 12 +- .../cross_match/test_macauff_arguments.py | 235 +++++------------- 2 files changed, 73 insertions(+), 174 deletions(-) diff --git a/src/hipscat_import/cross_match/macauff_arguments.py b/src/hipscat_import/cross_match/macauff_arguments.py index 09b0fceb..948fedf5 100644 --- a/src/hipscat_import/cross_match/macauff_arguments.py +++ b/src/hipscat_import/cross_match/macauff_arguments.py @@ -69,7 +69,7 @@ def _check_arguments(self): super()._check_arguments() if not self.input_path and not self.input_file_list: - raise ValueError("input files/path not provided") + raise ValueError("input_path nor input_file_list not provided") if not self.input_format: raise ValueError("input_format is required") @@ -77,6 +77,10 @@ def _check_arguments(self): raise ValueError("left_catalog_dir is required") if not self.left_id_column: raise ValueError("left_id_column is required") + if not self.left_ra_column: + raise ValueError("left_ra_column is required") + if not self.left_dec_column: + raise ValueError("left_dec_column is required") if not is_valid_catalog(self.left_catalog_dir): raise ValueError("left_catalog_dir not a valid catalog") @@ -84,11 +88,15 @@ def _check_arguments(self): raise ValueError("right_catalog_dir is required") if not self.right_id_column: raise ValueError("right_id_column is required") + if not self.right_ra_column: + raise ValueError("right_ra_column is required") + if not self.right_dec_column: + raise ValueError("right_dec_column is required") if not is_valid_catalog(self.right_catalog_dir): raise ValueError("right_catalog_dir not a valid catalog") if not self.metadata_file_path: - raise ValueError("column metadata file required for macauff crossmatch") + raise ValueError("metadata_file_path required for macauff crossmatch") if not path.isfile(self.metadata_file_path): raise ValueError("Macauff column metadata file must point to valid file path.") diff --git a/tests/hipscat_import/cross_match/test_macauff_arguments.py b/tests/hipscat_import/cross_match/test_macauff_arguments.py index 6dd8ae9a..c0ab2b49 100644 --- a/tests/hipscat_import/cross_match/test_macauff_arguments.py +++ b/tests/hipscat_import/cross_match/test_macauff_arguments.py @@ -7,7 +7,6 @@ from hipscat_import.cross_match.macauff_arguments import MacauffArguments -# pylint: disable=unused-variable # pylint: disable=duplicate-code @@ -38,6 +37,64 @@ def test_macauff_arguments( assert len(args.input_paths) > 0 +def test_empty_required( + small_sky_object_catalog, + small_sky_source_catalog, + small_sky_dir, + formats_yaml, + tmp_path +): + """All non-runtime arguments are required.""" + + ## List of required args: + ## - match expression that should be found when missing + ## - default value + required_args = [ + ["output_path", tmp_path], + ["output_catalog_name", "object_to_source"], + ["left_catalog_dir", small_sky_object_catalog], + ["left_ra_column", "ra"], + ["left_dec_column", "dec"], + ["left_id_column", "id"], + ["right_catalog_dir", small_sky_source_catalog], + ["right_ra_column", "source_ra"], + ["right_dec_column", "source_dec"], + ["right_id_column", "source_id"], + ["input_path", small_sky_dir], + ["input_format", "csv"], + ["metadata_file_path", formats_yaml] + ] + + ## For each required argument, check that a ValueError is raised that matches the + ## expected name of the missing param. + for index, args in enumerate(required_args): + test_args = [ + list_args[1] if list_index != index else None + for list_index, list_args in enumerate(required_args) + ] + + print(f"testing required arg #{index}") + + with pytest.raises(ValueError, match=args[0]): + MacauffArguments( + output_path=test_args[0], + output_catalog_name=test_args[1], + tmp_dir=tmp_path, + left_catalog_dir=test_args[2], + left_ra_column=test_args[3], + left_dec_column=test_args[4], + left_id_column=test_args[5], + right_catalog_dir=test_args[6], + right_ra_column=test_args[7], + right_dec_column=test_args[8], + right_id_column=test_args[9], + input_path=test_args[10], + input_format=test_args[11], + metadata_file_path=test_args[12], + overwrite=True, + ) + + def test_macauff_arguments_file_list( small_sky_object_catalog, small_sky_source_catalog, @@ -66,101 +123,6 @@ def test_macauff_arguments_file_list( assert len(args.input_paths) > 0 - -def test_macauff_args_no_input_path( - small_sky_object_catalog, - small_sky_source_catalog, - formats_yaml, - tmp_path -): - with pytest.raises(ValueError, match="not provided"): - args = MacauffArguments( - output_path=tmp_path, - output_catalog_name="object_to_source", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_object_catalog, - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - right_catalog_dir=small_sky_source_catalog, - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_format="csv", - metadata_file_path=formats_yaml, - ) - -def test_macauff_args_no_input_format( - small_sky_object_catalog, - small_sky_source_catalog, - small_sky_dir, - formats_yaml, - tmp_path -): - with pytest.raises(ValueError, match="input_format"): - args = MacauffArguments( - output_path=tmp_path, - output_catalog_name="object_to_source", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_object_catalog, - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - right_catalog_dir=small_sky_source_catalog, - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_path=small_sky_dir, - metadata_file_path=formats_yaml, - ) - -def test_macauff_args_no_left_catalog( - small_sky_source_catalog, - small_sky_dir, - formats_yaml, - tmp_path -): - with pytest.raises(ValueError, match="left_catalog_dir"): - args = MacauffArguments( - output_path=tmp_path, - output_catalog_name="object_to_source", - tmp_dir=tmp_path, - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - right_catalog_dir=small_sky_source_catalog, - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_path=small_sky_dir, - input_format="csv", - metadata_file_path=formats_yaml, - ) - -def test_macauff_args_no_left_id( - small_sky_object_catalog, - small_sky_source_catalog, - small_sky_dir, - formats_yaml, - tmp_path -): - with pytest.raises(ValueError, match="left_id_column"): - args = MacauffArguments( - output_path=tmp_path, - output_catalog_name="object_to_source", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_object_catalog, - left_ra_column="ra", - left_dec_column="dec", - right_catalog_dir=small_sky_source_catalog, - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_path=small_sky_dir, - input_format="csv", - metadata_file_path=formats_yaml, - ) - def test_macauff_args_invalid_catalog( small_sky_source_catalog, small_sky_dir, @@ -168,7 +130,7 @@ def test_macauff_args_invalid_catalog( tmp_path ): with pytest.raises(ValueError, match="left_catalog_dir"): - args = MacauffArguments( + MacauffArguments( output_path=tmp_path, output_catalog_name="object_to_source", tmp_dir=tmp_path, @@ -185,54 +147,6 @@ def test_macauff_args_invalid_catalog( metadata_file_path=formats_yaml, ) - -def test_macauff_args_no_right_catalog( - small_sky_object_catalog, - small_sky_dir, - formats_yaml, - tmp_path -): - with pytest.raises(ValueError, match="right_catalog_dir"): - args = MacauffArguments( - output_path=tmp_path, - output_catalog_name="object_to_source", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_object_catalog, - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_path=small_sky_dir, - input_format="csv", - metadata_file_path=formats_yaml, - ) - -def test_macauff_args_no_right_catalog_id( - small_sky_object_catalog, - small_sky_source_catalog, - small_sky_dir, - formats_yaml, - tmp_path -): - with pytest.raises(ValueError, match="right_id_column"): - args = MacauffArguments( - output_path=tmp_path, - output_catalog_name="object_to_source", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_object_catalog, - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - right_catalog_dir=small_sky_source_catalog, - right_ra_column="source_ra", - right_dec_column="source_dec", - input_path=small_sky_dir, - input_format="csv", - metadata_file_path=formats_yaml, - ) - def test_macauff_args_right_invalid_catalog( small_sky_object_catalog, small_sky_dir, @@ -240,7 +154,7 @@ def test_macauff_args_right_invalid_catalog( tmp_path ): with pytest.raises(ValueError, match="right_catalog_dir"): - args = MacauffArguments( + MacauffArguments( output_path=tmp_path, output_catalog_name="object_to_source", tmp_dir=tmp_path, @@ -257,29 +171,6 @@ def test_macauff_args_right_invalid_catalog( metadata_file_path=formats_yaml, ) -def test_macauff_args_no_metadata( - small_sky_object_catalog, - small_sky_source_catalog, - small_sky_dir, - tmp_path -): - with pytest.raises(ValueError, match="column metadata file"): - args = MacauffArguments( - output_path=tmp_path, - output_catalog_name="object_to_source", - tmp_dir=tmp_path, - left_catalog_dir=small_sky_object_catalog, - left_ra_column="ra", - left_dec_column="dec", - left_id_column="id", - right_catalog_dir=small_sky_source_catalog, - right_ra_column="source_ra", - right_dec_column="source_dec", - right_id_column="source_id", - input_path=small_sky_dir, - input_format="csv", - ) - def test_macauff_args_invalid_metadata_file( small_sky_object_catalog, small_sky_source_catalog, @@ -287,7 +178,7 @@ def test_macauff_args_invalid_metadata_file( tmp_path ): with pytest.raises(ValueError, match="column metadata file must"): - args = MacauffArguments( + MacauffArguments( output_path=tmp_path, output_catalog_name="object_to_source", tmp_dir=tmp_path, @@ -311,7 +202,7 @@ def test_macauff_args_invalid_input_directory( tmp_path ): with pytest.raises(FileNotFoundError, match="input_path not found"): - args = MacauffArguments( + MacauffArguments( output_path=tmp_path, output_catalog_name="object_to_source", tmp_dir=tmp_path, @@ -336,7 +227,7 @@ def test_macauff_args_no_files( tmp_path ): with pytest.raises(FileNotFoundError, match="No input files found"): - args = MacauffArguments( + MacauffArguments( output_path=tmp_path, output_catalog_name="object_to_source", tmp_dir=tmp_path, From b95de6d8efd1aefe85ef0143bf0b5ab9b00e0a6d Mon Sep 17 00:00:00 2001 From: Max West Date: Thu, 26 Oct 2023 14:40:50 -0400 Subject: [PATCH 10/11] address more comments from pr #152 --- .../cross_match/macauff_arguments.py | 24 +------------------ 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/src/hipscat_import/cross_match/macauff_arguments.py b/src/hipscat_import/cross_match/macauff_arguments.py index 948fedf5..53e23553 100644 --- a/src/hipscat_import/cross_match/macauff_arguments.py +++ b/src/hipscat_import/cross_match/macauff_arguments.py @@ -7,7 +7,6 @@ from hipscat.io import FilePointer, file_io from hipscat.io.validation import is_valid_catalog -from hipscat_import.catalog.resume_plan import ResumePlan from hipscat_import.runtime_arguments import RuntimeArguments # pylint: disable=too-many-instance-attributes @@ -30,17 +29,6 @@ class MacauffArguments(RuntimeArguments): """resolved list of all files that will be used in the importer""" add_hipscat_index: bool = True """add the hipscat spatial index field alongside the data""" - highest_healpix_order: int = 10 - """healpix order to use when mapping. this will - not necessarily be the order used in the final catalog, as we may combine - pixels that don't meed the threshold""" - pixel_threshold: int = 1_000_000 - """maximum number of rows for a single resulting pixel. - we may combine hierarchically until we near the ``pixel_threshold``""" - mapping_healpix_order: int = -1 - """healpix order to use when mapping. will be - ``highest_healpix_order`` unless a positive value is provided for - ``constant_healpix_order``""" ## Input - Left catalog left_catalog_dir: str = "" @@ -56,12 +44,9 @@ class MacauffArguments(RuntimeArguments): ## `macauff` specific attributes metadata_file_path: str = "" - match_probability_column: str = "match_p" + match_probability_columns: List[str] = field(default_factory=list) column_names: List[str] = field(default_factory=list) - resume: bool = True - resume_plan: ResumePlan | None = None - def __post_init__(self): self._check_arguments() @@ -112,13 +97,6 @@ def _check_arguments(self): self.column_names = self.get_column_names() - self.resume_plan = ResumePlan( - resume=self.resume, - progress_bar=True, - input_paths=self.input_paths, - tmp_path=self.tmp_path, - ) - def get_column_names(self): """Grab the macauff column names.""" # TODO: Actually read in the metadata file once we get the example file from Tom. From 49c931044842673848762ceff1f25c82570580ec Mon Sep 17 00:00:00 2001 From: Max West Date: Mon, 30 Oct 2023 13:38:38 -0700 Subject: [PATCH 11/11] add dask pytest mark + black formatting --- .../cross_match/macauff_arguments.py | 45 +++++++------- .../cross_match/run_macauff_import.py | 1 + tests/hipscat_import/conftest.py | 1 + .../cross_match/test_macauff_arguments.py | 62 ++++++------------- .../cross_match/test_macauff_runner.py | 15 +++-- 5 files changed, 52 insertions(+), 72 deletions(-) diff --git a/src/hipscat_import/cross_match/macauff_arguments.py b/src/hipscat_import/cross_match/macauff_arguments.py index 53e23553..93f3bca6 100644 --- a/src/hipscat_import/cross_match/macauff_arguments.py +++ b/src/hipscat_import/cross_match/macauff_arguments.py @@ -16,6 +16,7 @@ @dataclass class MacauffArguments(RuntimeArguments): """Data class for holding cross-match association arguments""" + ## Input - Cross-match data input_path: FilePointer | None = None """path to search for the input data""" @@ -102,26 +103,26 @@ def get_column_names(self): # TODO: Actually read in the metadata file once we get the example file from Tom. return [ - 'Gaia_designation', - 'Gaia_RA', - 'Gaia_Dec', - 'BP', - 'G', - 'RP', - 'CatWISE_Name', - 'CatWISE_RA', - 'CatWISE_Dec', - 'W1', - 'W2', - 'match_p', - 'Separation', - 'eta', - 'xi', - 'Gaia_avg_cont', - 'CatWISE_avg_cont', - 'Gaia_cont_f1', - 'Gaia_cont_f10', - 'CatWISE_cont_f1', - 'CatWISE_cont_f10', - 'CatWISE_fit_sig', + "Gaia_designation", + "Gaia_RA", + "Gaia_Dec", + "BP", + "G", + "RP", + "CatWISE_Name", + "CatWISE_RA", + "CatWISE_Dec", + "W1", + "W2", + "match_p", + "Separation", + "eta", + "xi", + "Gaia_avg_cont", + "CatWISE_avg_cont", + "Gaia_cont_f1", + "Gaia_cont_f10", + "CatWISE_cont_f1", + "CatWISE_cont_f10", + "CatWISE_fit_sig", ] diff --git a/src/hipscat_import/cross_match/run_macauff_import.py b/src/hipscat_import/cross_match/run_macauff_import.py index f133c8bc..aad49b51 100644 --- a/src/hipscat_import/cross_match/run_macauff_import.py +++ b/src/hipscat_import/cross_match/run_macauff_import.py @@ -2,6 +2,7 @@ # pylint: disable=unused-argument + def run(args, client): """run macauff cross-match import pipeline""" if not args: diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index d563963c..1d6f2d77 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -123,6 +123,7 @@ def formats_pandasindex(test_data_dir): def formats_multiindex(test_data_dir): return os.path.join(test_data_dir, "test_formats", "multiindex.parquet") + @pytest.fixture def formats_yaml(test_data_dir): return os.path.join(test_data_dir, "test_formats", "macauff_metadata.yaml") diff --git a/tests/hipscat_import/cross_match/test_macauff_arguments.py b/tests/hipscat_import/cross_match/test_macauff_arguments.py index c0ab2b49..ab981ae0 100644 --- a/tests/hipscat_import/cross_match/test_macauff_arguments.py +++ b/tests/hipscat_import/cross_match/test_macauff_arguments.py @@ -11,11 +11,7 @@ def test_macauff_arguments( - small_sky_object_catalog, - small_sky_source_catalog, - small_sky_dir, - formats_yaml, - tmp_path + small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path ): """Test that we can create a MacauffArguments instance with two valid catalogs.""" args = MacauffArguments( @@ -37,12 +33,9 @@ def test_macauff_arguments( assert len(args.input_paths) > 0 + def test_empty_required( - small_sky_object_catalog, - small_sky_source_catalog, - small_sky_dir, - formats_yaml, - tmp_path + small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path ): """All non-runtime arguments are required.""" @@ -62,7 +55,7 @@ def test_empty_required( ["right_id_column", "source_id"], ["input_path", small_sky_dir], ["input_format", "csv"], - ["metadata_file_path", formats_yaml] + ["metadata_file_path", formats_yaml], ] ## For each required argument, check that a ValueError is raised that matches the @@ -96,11 +89,7 @@ def test_empty_required( def test_macauff_arguments_file_list( - small_sky_object_catalog, - small_sky_source_catalog, - small_sky_dir, - formats_yaml, - tmp_path + small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path ): """Test that we can create a MacauffArguments instance with two valid catalogs.""" files = [path.join(small_sky_dir, "catalog.csv")] @@ -123,18 +112,14 @@ def test_macauff_arguments_file_list( assert len(args.input_paths) > 0 -def test_macauff_args_invalid_catalog( - small_sky_source_catalog, - small_sky_dir, - formats_yaml, - tmp_path -): + +def test_macauff_args_invalid_catalog(small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path): with pytest.raises(ValueError, match="left_catalog_dir"): MacauffArguments( output_path=tmp_path, output_catalog_name="object_to_source", tmp_dir=tmp_path, - left_catalog_dir=small_sky_dir, # valid path, but not a catalog + left_catalog_dir=small_sky_dir, # valid path, but not a catalog left_ra_column="ra", left_dec_column="dec", left_id_column="id", @@ -147,12 +132,8 @@ def test_macauff_args_invalid_catalog( metadata_file_path=formats_yaml, ) -def test_macauff_args_right_invalid_catalog( - small_sky_object_catalog, - small_sky_dir, - formats_yaml, - tmp_path -): + +def test_macauff_args_right_invalid_catalog(small_sky_object_catalog, small_sky_dir, formats_yaml, tmp_path): with pytest.raises(ValueError, match="right_catalog_dir"): MacauffArguments( output_path=tmp_path, @@ -162,7 +143,7 @@ def test_macauff_args_right_invalid_catalog( left_ra_column="ra", left_dec_column="dec", left_id_column="id", - right_catalog_dir=small_sky_dir, # valid directory with files, not a catalog + right_catalog_dir=small_sky_dir, # valid directory with files, not a catalog right_ra_column="source_ra", right_dec_column="source_dec", right_id_column="source_id", @@ -171,11 +152,9 @@ def test_macauff_args_right_invalid_catalog( metadata_file_path=formats_yaml, ) + def test_macauff_args_invalid_metadata_file( - small_sky_object_catalog, - small_sky_source_catalog, - small_sky_dir, - tmp_path + small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, tmp_path ): with pytest.raises(ValueError, match="column metadata file must"): MacauffArguments( @@ -195,11 +174,9 @@ def test_macauff_args_invalid_metadata_file( metadata_file_path="ceci_n_est_pas_un_fichier.xml", ) + def test_macauff_args_invalid_input_directory( - small_sky_object_catalog, - small_sky_source_catalog, - formats_yaml, - tmp_path + small_sky_object_catalog, small_sky_source_catalog, formats_yaml, tmp_path ): with pytest.raises(FileNotFoundError, match="input_path not found"): MacauffArguments( @@ -219,12 +196,9 @@ def test_macauff_args_invalid_input_directory( metadata_file_path=formats_yaml, ) + def test_macauff_args_no_files( - small_sky_object_catalog, - small_sky_source_catalog, - small_sky_dir, - formats_yaml, - tmp_path + small_sky_object_catalog, small_sky_source_catalog, small_sky_dir, formats_yaml, tmp_path ): with pytest.raises(FileNotFoundError, match="No input files found"): MacauffArguments( @@ -240,6 +214,6 @@ def test_macauff_args_no_files( right_dec_column="source_dec", right_id_column="source_id", input_path=small_sky_dir, - input_format="parquet", # no files of this format will be found + input_format="parquet", # no files of this format will be found metadata_file_path=formats_yaml, ) diff --git a/tests/hipscat_import/cross_match/test_macauff_runner.py b/tests/hipscat_import/cross_match/test_macauff_runner.py index cac03d29..1431b60d 100644 --- a/tests/hipscat_import/cross_match/test_macauff_runner.py +++ b/tests/hipscat_import/cross_match/test_macauff_runner.py @@ -6,6 +6,8 @@ # pylint: disable=too-many-instance-attributes # pylint: disable=duplicate-code + +@pytest.mark.dask def test_bad_args(dask_client): """Runner should fail with empty or mis-typed arguments""" with pytest.raises(TypeError, match="MacauffArguments"): @@ -16,13 +18,14 @@ def test_bad_args(dask_client): runner.run(args, dask_client) +@pytest.mark.dask def test_no_implementation( - small_sky_object_catalog, - small_sky_source_catalog, - small_sky_dir, - formats_yaml, - tmp_path, - dask_client, + small_sky_object_catalog, + small_sky_source_catalog, + small_sky_dir, + formats_yaml, + tmp_path, + dask_client, ): """Test that we can create a MacauffArguments instance with two valid catalogs.""" args = MacauffArguments(