
Merge pull request #101 from astronomy-commons/issue/85/black
Mechanical autoformatting.
delucchi-cmu committed Jul 21, 2023
2 parents 370098a + 8e3be51 commit 2ccf71e
Showing 32 changed files with 334 additions and 550 deletions.
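
For context, black's change here is purely mechanical: once the configured width allows it, expressions that were wrapped across several lines are joined back onto one line, and nothing else about the code changes. The toy sketch below uses hypothetical names and is not part of this commit; it only shows the pattern that repeats throughout the diff.

def combine(first_value, second_value):
    """Toy helper used only for illustration."""
    return first_value + second_value

# Before: wrapped to satisfy a shorter line limit.
total = combine(
    1, 2
)

# After black with line-length = 110: the call fits on one line, so the wrap is removed.
total = combine(1, 2)

print(total)  # 3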
4 changes: 1 addition & 3 deletions docs/conf.py
@@ -35,9 +35,7 @@
exclude_patterns = ["_build", "**.ipynb_checkpoints"]

master_doc = "index" # This assumes that sphinx-build is called from the root directory
html_show_sourcelink = (
    False # Remove 'view source code' from top of page (for html, not python)
)
html_show_sourcelink = False # Remove 'view source code' from top of page (for html, not python)
add_module_names = False # Remove namespaces from class/method signatures

autoapi_type = "python"
408 changes: 204 additions & 204 deletions docs/notebooks/estimate_pixel_threshold.ipynb

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion pyproject.toml
@@ -73,4 +73,12 @@ markers = [
omit = [
"src/hipscat_import/_version.py", # auto-generated
"src/hipscat_import/pipeline.py", # too annoying to test
]
]

[tool.black]
line-length = 110
target-version = ["py38"]

[tool.isort]
profile = "black"
line_length = 110
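
The new [tool.black] and [tool.isort] tables are the settings both tools read from pyproject.toml when run on the repository. A minimal sketch of the same settings applied through the tools' Python APIs follows; it reuses the docs/conf.py line from the first hunk and is illustrative only, not part of this commit.

import black
import isort

# The wrapped form that docs/conf.py had before this commit.
SOURCE = (
    "html_show_sourcelink = (\n"
    "    False  # Remove 'view source code' from top of page (for html, not python)\n"
    ")\n"
)

# Mirror [tool.black]: line-length = 110, target-version = ["py38"].
mode = black.Mode(target_versions={black.TargetVersion.PY38}, line_length=110)
formatted = black.format_str(SOURCE, mode=mode)

# Mirror [tool.isort]: profile = "black", line_length = 110 (a no-op here, since there are no imports).
formatted = isort.code(formatted, profile="black", line_length=110)

print(formatted)
# Expected: the assignment fits within 110 characters, so it comes back as the
# single line that the docs/conf.py hunk above adds.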
2 changes: 1 addition & 1 deletion src/.pylintrc
@@ -329,7 +329,7 @@ indent-after-paren=4
indent-string='    '

# Maximum number of characters on a single line.
max-line-length=100
max-line-length=110

# Maximum number of lines in a module.
max-module-lines=500
4 changes: 1 addition & 3 deletions src/hipscat_import/association/arguments.py
@@ -2,9 +2,7 @@

from dataclasses import dataclass

from hipscat.catalog.association_catalog.association_catalog import (
    AssociationCatalogInfo,
)
from hipscat.catalog.association_catalog.association_catalog import AssociationCatalogInfo

from hipscat_import.runtime_arguments import RuntimeArguments

40 changes: 9 additions & 31 deletions src/hipscat_import/association/map_reduce.py
@@ -47,11 +47,7 @@ def map_association(args):
}
if not single_primary_column:
rename_columns[args.primary_id_column] = "primary_id"
primary_index = (
primary_index.reset_index()
.rename(columns=rename_columns)
.set_index("primary_join")
)
primary_index = primary_index.reset_index().rename(columns=rename_columns).set_index("primary_join")

## Read and massage join input data
single_join_column = args.join_id_column == args.join_foreign_key
@@ -82,16 +78,10 @@ def map_association(args):
}
if not single_join_column:
rename_columns[args.join_id_column] = "join_id"
join_index = (
join_index.reset_index()
.rename(columns=rename_columns)
.set_index("join_to_primary")
)
join_index = join_index.reset_index().rename(columns=rename_columns).set_index("join_to_primary")

## Join the two data sets on the shared join predicate.
join_data = primary_index.merge(
join_index, how="inner", left_index=True, right_index=True
)
join_data = primary_index.merge(join_index, how="inner", left_index=True, right_index=True)

## Write out a summary of each partition join
groups = (
@@ -102,12 +92,8 @@
.count()
.compute()
)
intermediate_partitions_file = file_io.append_paths_to_pointer(
args.tmp_path, "partitions.csv"
)
file_io.write_dataframe_to_csv(
dataframe=groups, file_pointer=intermediate_partitions_file
)
intermediate_partitions_file = file_io.append_paths_to_pointer(args.tmp_path, "partitions.csv")
file_io.write_dataframe_to_csv(dataframe=groups, file_pointer=intermediate_partitions_file)

## Drop join predicate columns
join_data = join_data[
@@ -137,23 +123,17 @@

def reduce_association(input_path, output_path):
"""Collate sharded parquet files into a single parquet file per partition"""
intermediate_partitions_file = file_io.append_paths_to_pointer(
input_path, "partitions.csv"
)
intermediate_partitions_file = file_io.append_paths_to_pointer(input_path, "partitions.csv")
data_frame = file_io.load_csv_to_pandas(intermediate_partitions_file)

## Clean up the dataframe and write out as our new partition join info file.
data_frame = data_frame[data_frame["primary_hipscat_index"] != 0]
data_frame["num_rows"] = data_frame["primary_hipscat_index"]
data_frame = data_frame[
["Norder", "Dir", "Npix", "join_Norder", "join_Dir", "join_Npix", "num_rows"]
]
data_frame = data_frame[["Norder", "Dir", "Npix", "join_Norder", "join_Dir", "join_Npix", "num_rows"]]
data_frame = data_frame.sort_values(["Norder", "Npix", "join_Norder", "join_Npix"])
file_io.write_dataframe_to_csv(
dataframe=data_frame,
file_pointer=file_io.append_paths_to_pointer(
output_path, "partition_join_info.csv"
),
file_pointer=file_io.append_paths_to_pointer(output_path, "partition_join_info.csv"),
index=False,
)

@@ -195,8 +175,6 @@ def reduce_association(input_path, output_path):
f" Expected {partition['num_rows']}, wrote {rows_written}",
)

table.to_pandas().set_index("primary_hipscat_index").sort_index().to_parquet(
output_file
)
table.to_pandas().set_index("primary_hipscat_index").sort_index().to_parquet(output_file)

return data_frame["num_rows"].sum()
12 changes: 3 additions & 9 deletions src/hipscat_import/association/run_association.py
@@ -24,16 +24,12 @@ def run(args):
step_progress.update(1)

rows_written = 0
with tqdm(
total=1, desc="Reducing ", disable=not args.progress_bar
) as step_progress:
with tqdm(total=1, desc="Reducing ", disable=not args.progress_bar) as step_progress:
rows_written = reduce_association(args.tmp_path, args.catalog_path)
step_progress.update(1)

# All done - write out the metadata
with tqdm(
total=4, desc="Finishing", disable=not args.progress_bar
) as step_progress:
with tqdm(total=4, desc="Finishing", disable=not args.progress_bar) as step_progress:
# pylint: disable=duplicate-code
# Very similar to /index/run_index.py
catalog_info = args.to_catalog_info(int(rows_written))
@@ -44,9 +40,7 @@
)
step_progress.update(1)
catalog_info = args.to_catalog_info(total_rows=int(rows_written))
write_metadata.write_catalog_info(
dataset_info=catalog_info, catalog_base_dir=args.catalog_path
)
write_metadata.write_catalog_info(dataset_info=catalog_info, catalog_base_dir=args.catalog_path)
step_progress.update(1)
write_metadata.write_parquet_metadata(args.catalog_path)
step_progress.update(1)
32 changes: 8 additions & 24 deletions src/hipscat_import/catalog/arguments.py
@@ -85,26 +85,18 @@ def _check_arguments(self):
raise ValueError("input_format is required")

if self.constant_healpix_order >= 0:
check_healpix_order_range(
self.constant_healpix_order, "constant_healpix_order"
)
check_healpix_order_range(self.constant_healpix_order, "constant_healpix_order")
self.mapping_healpix_order = self.constant_healpix_order
else:
check_healpix_order_range(
self.highest_healpix_order, "highest_healpix_order"
)
check_healpix_order_range(self.highest_healpix_order, "highest_healpix_order")
if not 100 <= self.pixel_threshold <= 1_000_000_000:
raise ValueError(
"pixel_threshold should be between 100 and 1,000,000,000"
)
raise ValueError("pixel_threshold should be between 100 and 1,000,000,000")
self.mapping_healpix_order = self.highest_healpix_order

if self.catalog_type not in ("source", "object"):
raise ValueError("catalog_type should be one of `source` or `object`")

if (not self.input_path and not self.input_file_list) or (
self.input_path and self.input_file_list
):
if (not self.input_path and not self.input_file_list) or (self.input_path and self.input_file_list):
raise ValueError("exactly one of input_path or input_file_list is required")
if not self.file_reader:
self.file_reader = get_file_reader(self.input_format)
@@ -113,9 +105,7 @@ def _check_arguments(self):
if self.input_path:
if not file_io.does_file_or_directory_exist(self.input_path):
raise FileNotFoundError("input_path not found on local storage")
self.input_paths = file_io.find_files_matching_path(
self.input_path, f"*{self.input_format}"
)
self.input_paths = file_io.find_files_matching_path(self.input_path, f"*{self.input_format}")
elif self.input_file_list:
self.input_paths = self.input_file_list
if len(self.input_paths) == 0:
@@ -156,9 +146,7 @@ def additional_runtime_provenance_info(self) -> dict:
"pixel_threshold": self.pixel_threshold,
"mapping_healpix_order": self.mapping_healpix_order,
"debug_stats_only": self.debug_stats_only,
"file_reader_info": self.file_reader.provenance_info()
if self.file_reader is not None
else {},
"file_reader_info": self.file_reader.provenance_info() if self.file_reader is not None else {},
}


@@ -180,10 +168,6 @@ def check_healpix_order_range(
if lower_bound < 0:
raise ValueError("healpix orders must be positive")
if upper_bound > hipscat_id.HIPSCAT_ID_HEALPIX_ORDER:
raise ValueError(
f"healpix order should be <= {hipscat_id.HIPSCAT_ID_HEALPIX_ORDER}"
)
raise ValueError(f"healpix order should be <= {hipscat_id.HIPSCAT_ID_HEALPIX_ORDER}")
if not lower_bound <= order <= upper_bound:
raise ValueError(
f"{field_name} should be between {lower_bound} and {upper_bound}"
)
raise ValueError(f"{field_name} should be between {lower_bound} and {upper_bound}")
12 changes: 3 additions & 9 deletions src/hipscat_import/catalog/file_readers.py
@@ -97,9 +97,7 @@ def regular_file_exists(self, input_file):
if not file_io.does_file_or_directory_exist(input_file):
raise FileNotFoundError(f"File not found at path: {input_file}")
if not file_io.is_regular_file(input_file):
raise FileNotFoundError(
f"Directory found at path - requires regular file: {input_file}"
)
raise FileNotFoundError(f"Directory found at path - requires regular file: {input_file}")


class CsvReader(InputReader):
@@ -143,9 +141,7 @@ def read(self, input_file):
self.regular_file_exists(input_file)

if self.schema_file:
schema_parquet = pd.read_parquet(
self.schema_file, dtype_backend="numpy_nullable"
)
schema_parquet = pd.read_parquet(self.schema_file, dtype_backend="numpy_nullable")

use_column_names = None
if self.column_names:
@@ -214,9 +210,7 @@ class FitsReader(InputReader):
one of `column_names` or `skip_column_names`
"""

def __init__(
self, chunksize=500_000, column_names=None, skip_column_names=None, **kwargs
):
def __init__(self, chunksize=500_000, column_names=None, skip_column_names=None, **kwargs):
self.chunksize = chunksize
self.column_names = column_names
self.skip_column_names = skip_column_names
28 changes: 7 additions & 21 deletions src/hipscat_import/catalog/map_reduce.py
@@ -42,9 +42,7 @@ def _has_named_index(dataframe):
if dataframe.index.name is not None:
## Single index with a given name.
return True
if len(dataframe.index.names) == 0 or all(
name is None for name in dataframe.index.names
):
if len(dataframe.index.names) == 0 or all(name is None for name in dataframe.index.names):
return False
return True

@@ -112,9 +110,7 @@ def map_to_pixels(
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
histo[mapped_pixel] += count_at_pixel.astype(np.int64)
ResumePlan.write_partial_histogram(
tmp_path=cache_path, mapping_key=mapping_key, histogram=histo
)
ResumePlan.write_partial_histogram(tmp_path=cache_path, mapping_key=mapping_key, histogram=histo)


def split_pixels(
@@ -221,9 +217,7 @@ def reduce_pixel_shards(
ValueError: if the number of rows written doesn't equal provided
`destination_pixel_size`
"""
destination_dir = paths.pixel_directory(
output_path, destination_pixel_order, destination_pixel_number
)
destination_dir = paths.pixel_directory(output_path, destination_pixel_order, destination_pixel_number)
file_io.make_directory(destination_dir, exist_ok=True)

destination_file = paths.pixel_catalog_file(
@@ -235,9 +229,7 @@
schema = file_io.read_parquet_metadata(use_schema_file).schema.to_arrow_schema()

tables = []
pixel_dir = _get_pixel_directory(
cache_path, destination_pixel_order, destination_pixel_number
)
pixel_dir = _get_pixel_directory(cache_path, destination_pixel_order, destination_pixel_number)

if schema:
tables.append(pq.read_table(pixel_dir, schema=schema))
@@ -264,17 +256,13 @@
dataframe[dec_column].values,
)

dataframe["Norder"] = np.full(
rows_written, fill_value=destination_pixel_order, dtype=np.int32
)
dataframe["Norder"] = np.full(rows_written, fill_value=destination_pixel_order, dtype=np.int32)
dataframe["Dir"] = np.full(
rows_written,
fill_value=int(destination_pixel_number / 10_000) * 10_000,
dtype=np.int32,
)
dataframe["Npix"] = np.full(
rows_written, fill_value=destination_pixel_number, dtype=np.int32
)
dataframe["Npix"] = np.full(rows_written, fill_value=destination_pixel_number, dtype=np.int32)

if add_hipscat_index:
## If we had a meaningful index before, preserve it as a column.
@@ -286,8 +274,6 @@
del dataframe, merged_table, tables

if delete_input_files:
pixel_dir = _get_pixel_directory(
cache_path, destination_pixel_order, destination_pixel_number
)
pixel_dir = _get_pixel_directory(cache_path, destination_pixel_order, destination_pixel_number)

file_io.remove_directory(pixel_dir, ignore_errors=True)
Diffs for the remaining changed files are not rendered here.
