Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use type-specific catalog info #84

Merged
merged 3 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions src/hipscat_import/association/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

from dataclasses import dataclass

from hipscat.catalog import CatalogParameters
from hipscat.catalog.association_catalog.association_catalog import (
AssociationCatalogInfo,
)

from hipscat_import.runtime_arguments import RuntimeArguments

Expand Down Expand Up @@ -51,19 +53,20 @@ def _check_arguments(self):
if self.compute_partition_size < 100_000:
raise ValueError("compute_partition_size must be at least 100_000")

def to_catalog_parameters(self) -> CatalogParameters:
"""Convert importing arguments into hipscat catalog parameters.

Returns:
CatalogParameters for catalog being created.
"""
return CatalogParameters(
catalog_name=self.output_catalog_name,
catalog_type="association",
output_path=self.output_path,
)
def to_catalog_info(self, total_rows) -> AssociationCatalogInfo:
    """Build the association-table dataset info from these arguments.

    Args:
        total_rows: number of rows in the association table being created.

    Returns:
        AssociationCatalogInfo describing the catalog under construction.
    """
    return AssociationCatalogInfo(
        catalog_name=self.output_catalog_name,
        catalog_type="association",
        total_rows=total_rows,
        primary_column=self.primary_id_column,
        primary_catalog=str(self.primary_input_catalog_path),
        join_column=self.join_id_column,
        join_catalog=str(self.join_input_catalog_path),
    )

def additional_runtime_provenance_info(self):
def additional_runtime_provenance_info(self) -> dict:
return {
"primary_input_catalog_path": str(self.primary_input_catalog_path),
"primary_id_column": self.primary_id_column,
Expand Down
17 changes: 12 additions & 5 deletions src/hipscat_import/association/run_association.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from tqdm import tqdm

from hipscat_import.association.arguments import AssociationArguments
from hipscat_import.association.map_reduce import map_association, reduce_association
from hipscat_import.association.map_reduce import (map_association,
reduce_association)


def _validate_args(args):
Expand Down Expand Up @@ -40,11 +41,17 @@ def run(args):
) as step_progress:
# pylint: disable=duplicate-code
# Very similar to /index/run_index.py
catalog_params = args.to_catalog_parameters()
catalog_params.total_rows = int(rows_written)
write_metadata.write_provenance_info(catalog_params, args.provenance_info())
catalog_info = args.to_catalog_info(int(rows_written))
write_metadata.write_provenance_info(
catalog_base_dir=args.catalog_path,
dataset_info=catalog_info,
tool_args=args.provenance_info(),
)
step_progress.update(1)
write_metadata.write_catalog_info(catalog_params)
catalog_info = args.to_catalog_info(total_rows=int(rows_written))
write_metadata.write_catalog_info(
dataset_info=catalog_info, catalog_base_dir=args.catalog_path
)
step_progress.update(1)
write_metadata.write_parquet_metadata(args.catalog_path)
step_progress.update(1)
Expand Down
33 changes: 17 additions & 16 deletions src/hipscat_import/catalog/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Callable, List

import pandas as pd
from hipscat.catalog import CatalogParameters
from hipscat.catalog.catalog import CatalogInfo
from hipscat.io import FilePointer, file_io
from hipscat.pixel_math import hipscat_id

Expand Down Expand Up @@ -140,21 +140,19 @@ def _check_arguments(self):
if not self.filter_function:
self.filter_function = passthrough_filter_function

def to_catalog_parameters(self) -> CatalogParameters:
"""Convert importing arguments into hipscat catalog parameters.
Returns:
CatalogParameters for catalog being created.
"""
return CatalogParameters(
catalog_name=self.output_catalog_name,
catalog_type=self.catalog_type,
output_path=self.output_path,
epoch=self.epoch,
ra_column=self.ra_column,
dec_column=self.dec_column,
)
def to_catalog_info(self, total_rows) -> CatalogInfo:
    """Build the object/source catalog dataset info from these arguments.

    Args:
        total_rows: number of rows in the catalog being created.

    Returns:
        CatalogInfo describing the catalog under construction.
    """
    return CatalogInfo(
        catalog_name=self.output_catalog_name,
        catalog_type=self.catalog_type,
        total_rows=total_rows,
        epoch=self.epoch,
        ra_column=self.ra_column,
        dec_column=self.dec_column,
    )

def additional_runtime_provenance_info(self):
def additional_runtime_provenance_info(self) -> dict:
return {
"catalog_name": self.output_catalog_name,
"epoch": self.epoch,
Expand All @@ -171,7 +169,9 @@ def additional_runtime_provenance_info(self):
"pixel_threshold": self.pixel_threshold,
"mapping_healpix_order": self.mapping_healpix_order,
"debug_stats_only": self.debug_stats_only,
"file_reader_info": self.file_reader.provenance_info(),
"file_reader_info": self.file_reader.provenance_info()
if self.file_reader is not None
else {},
}


Expand All @@ -180,6 +180,7 @@ def check_healpix_order_range(
):
"""Helper method to check if the `order` is within the range determined by the
`lower_bound` and `upper_bound`, inclusive.

Args:
order (int): healpix order to check
field_name (str): field name to use in the error message
Expand Down
28 changes: 17 additions & 11 deletions src/hipscat_import/catalog/run_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ def _map_pixels(args, client):
total=len(futures),
disable=(not args.progress_bar),
):
if future.status == "error": # pragma: no cover
if future.status == "error": # pragma: no cover
some_error = True
raw_histogram = np.add(raw_histogram, result)
resume.write_mapping_start_key(args.tmp_path, future.key)
resume.write_histogram(args.tmp_path, raw_histogram)
resume.write_mapping_done_key(args.tmp_path, future.key)
if some_error: # pragma: no cover
if some_error: # pragma: no cover
raise RuntimeError("Some mapping stages failed. See logs for details.")
resume.set_mapping_done(args.tmp_path)
return raw_histogram
Expand Down Expand Up @@ -98,10 +98,10 @@ def _split_pixels(args, alignment_future, client):
total=len(futures),
disable=(not args.progress_bar),
):
if future.status == "error": # pragma: no cover
if future.status == "error": # pragma: no cover
some_error = True
resume.write_splitting_done_key(args.tmp_path, future.key)
if some_error: # pragma: no cover
if some_error: # pragma: no cover
raise RuntimeError("Some splitting stages failed. See logs for details.")
resume.set_splitting_done(args.tmp_path)

Expand Down Expand Up @@ -143,10 +143,10 @@ def _reduce_pixels(args, destination_pixel_map, client):
total=len(futures),
disable=(not args.progress_bar),
):
if future.status == "error": # pragma: no cover
if future.status == "error": # pragma: no cover
some_error = True
resume.write_reducing_key(args.tmp_path, future.key)
if some_error: # pragma: no cover
if some_error: # pragma: no cover
raise RuntimeError("Some reducing stages failed. See logs for details.")
resume.set_reducing_done(args.tmp_path)

Expand Down Expand Up @@ -215,20 +215,26 @@ def run_with_client(args, client):
with tqdm(
total=6, desc="Finishing", disable=not args.progress_bar
) as step_progress:
catalog_parameters = args.to_catalog_parameters()
catalog_parameters.total_rows = int(raw_histogram.sum())
io.write_provenance_info(catalog_parameters, args.provenance_info())
catalog_info = args.to_catalog_info(int(raw_histogram.sum()))
io.write_provenance_info(
catalog_base_dir=args.catalog_path,
dataset_info=catalog_info,
tool_args=args.provenance_info(),
)
step_progress.update(1)

io.write_catalog_info(catalog_parameters)
io.write_catalog_info(
catalog_base_dir=args.catalog_path, dataset_info=catalog_info
)
step_progress.update(1)
if not args.debug_stats_only:
io.write_parquet_metadata(args.catalog_path)
step_progress.update(1)
io.write_fits_map(args.catalog_path, raw_histogram)
step_progress.update(1)
io.write_partition_info(
catalog_parameters, destination_healpix_pixel_map=destination_pixel_map
catalog_base_dir=args.catalog_path,
destination_healpix_pixel_map=destination_pixel_map,
)
step_progress.update(1)
resume.clean_resume_files(args.tmp_path)
Expand Down
27 changes: 14 additions & 13 deletions src/hipscat_import/index/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from dataclasses import dataclass, field
from typing import List, Optional

from hipscat.catalog import Catalog, CatalogParameters
from hipscat.catalog import Catalog
from hipscat.catalog.index.index_catalog_info import IndexCatalogInfo

from hipscat_import.runtime_arguments import RuntimeArguments

Expand Down Expand Up @@ -46,19 +47,19 @@ def _check_arguments(self):
if self.compute_partition_size < 100_000:
raise ValueError("compute_partition_size must be at least 100_000")

def to_catalog_parameters(self) -> CatalogParameters:
"""Convert importing arguments into hipscat catalog parameters.

Returns:
CatalogParameters for catalog being created.
"""
return CatalogParameters(
catalog_name=self.output_catalog_name,
catalog_type="index",
output_path=self.output_path,
)
def to_catalog_info(self, total_rows) -> IndexCatalogInfo:
    """Build the index-catalog dataset info from these arguments.

    Args:
        total_rows: number of rows in the index table being created.

    Returns:
        IndexCatalogInfo describing the catalog under construction.
    """
    return IndexCatalogInfo(
        catalog_name=self.output_catalog_name,
        total_rows=total_rows,
        catalog_type="index",
        primary_catalog=str(self.input_catalog_path),
        indexing_column=self.indexing_column,
        extra_columns=self.extra_columns,
    )

def additional_runtime_provenance_info(self):
def additional_runtime_provenance_info(self) -> dict:
return {
"input_catalog_path": str(self.input_catalog_path),
"indexing_column": self.indexing_column,
Expand Down
13 changes: 9 additions & 4 deletions src/hipscat_import/index/run_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@ def run(args):
) as step_progress:
# pylint: disable=duplicate-code
# Very similar to /association/run_association.py
catalog_params = args.to_catalog_parameters()
catalog_params.total_rows = int(rows_written)
write_metadata.write_provenance_info(catalog_params, args.provenance_info())
catalog_info = args.to_catalog_info(int(rows_written))
write_metadata.write_provenance_info(
catalog_base_dir=args.catalog_path,
dataset_info=catalog_info,
tool_args=args.provenance_info(),
)
step_progress.update(1)
write_metadata.write_catalog_info(catalog_params)
write_metadata.write_catalog_info(
catalog_base_dir=args.catalog_path, dataset_info=catalog_info
)
step_progress.update(1)
file_io.remove_directory(args.tmp_path, ignore_errors=True)
step_progress.update(1)
Expand Down
26 changes: 25 additions & 1 deletion src/hipscat_import/margin_cache/margin_cache_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import healpy as hp
import numpy as np
from hipscat.catalog import Catalog
from hipscat.catalog.margin_cache.margin_cache_catalog_info import (
MarginCacheCatalogInfo,
)
delucchi-cmu marked this conversation as resolved.
Show resolved Hide resolved
from hipscat.io import file_io

from hipscat_import.runtime_arguments import RuntimeArguments
Expand Down Expand Up @@ -53,9 +56,30 @@ def _check_arguments(self):

margin_pixel_nside = hp.order2nside(self.margin_order)

if hp.nside2resol(margin_pixel_nside, arcmin=True) * 60. < self.margin_threshold:
if (
hp.nside2resol(margin_pixel_nside, arcmin=True) * 60.0
< self.margin_threshold
):
# pylint: disable=line-too-long
warnings.warn(
"Warning: margin pixels have a smaller resolution than margin_threshold; this may lead to data loss in the margin cache."
)
# pylint: enable=line-too-long

def to_catalog_info(self, total_rows) -> MarginCacheCatalogInfo:
    """Build the margin-cache dataset info from these arguments.

    Args:
        total_rows: number of rows in the margin cache being created.

    Returns:
        MarginCacheCatalogInfo describing the catalog under construction.
    """
    info = {
        "catalog_name": self.output_catalog_name,
        "total_rows": total_rows,
        "catalog_type": "margin",
        # Stringify the path for consistency with the association/index
        # catalog-info builders: input_catalog_path may be a non-string
        # path object, which would not serialize cleanly into catalog info.
        "primary_catalog": str(self.input_catalog_path),
        "margin_threshold": self.margin_threshold,
    }
    return MarginCacheCatalogInfo(**info)

def additional_runtime_provenance_info(self) -> dict:
    """Margin-cache-specific arguments, recorded for provenance tracking.

    Returns:
        dict mapping argument names to the values used for this run.
    """
    provenance = {}
    provenance["input_catalog_path"] = str(self.input_catalog_path)
    provenance["margin_threshold"] = self.margin_threshold
    provenance["margin_order"] = self.margin_order
    return provenance
9 changes: 5 additions & 4 deletions tests/hipscat_import/association/test_association_argument.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ def test_all_required_args(tmp_path, small_sky_object_catalog):
)


def test_to_catalog_parameters(small_sky_object_catalog, tmp_path):
"""Verify creation of catalog parameters for index to be created."""
def test_to_catalog_info(small_sky_object_catalog, tmp_path):
"""Verify creation of catalog parameters for association table to be created."""
args = AssociationArguments(
primary_input_catalog_path=small_sky_object_catalog,
primary_id_column="id",
Expand All @@ -193,8 +193,9 @@ def test_to_catalog_parameters(small_sky_object_catalog, tmp_path):
output_path=tmp_path,
output_catalog_name="small_sky_self_join",
)
catalog_parameters = args.to_catalog_parameters()
assert catalog_parameters.catalog_name == args.output_catalog_name
catalog_info = args.to_catalog_info(total_rows=10)
assert catalog_info.catalog_name == args.output_catalog_name
assert catalog_info.total_rows == 10


def test_provenance_info(small_sky_object_catalog, tmp_path):
Expand Down
Loading