Mechanical autoformatting. #101

Merged 2 commits on Jul 21, 2023
Changes from all commits
4 changes: 1 addition & 3 deletions docs/conf.py
@@ -35,9 +35,7 @@
exclude_patterns = ["_build", "**.ipynb_checkpoints"]

master_doc = "index" # This assumes that sphinx-build is called from the root directory
-html_show_sourcelink = (
-False # Remove 'view source code' from top of page (for html, not python)
-)
+html_show_sourcelink = False # Remove 'view source code' from top of page (for html, not python)
add_module_names = False # Remove namespaces from class/method signatures

autoapi_type = "python"
408 changes: 204 additions & 204 deletions docs/notebooks/estimate_pixel_threshold.ipynb

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion pyproject.toml
@@ -73,4 +73,12 @@ markers = [
omit = [
"src/hipscat_import/_version.py", # auto-generated
"src/hipscat_import/pipeline.py", # too annoying to test
-]
+]
+
+[tool.black]
+line-length = 110
+target-version = ["py38"]
+
+[tool.isort]
+profile = "black"
+line_length = 110
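Note: with the [tool.black] and [tool.isort] sections added above, both formatters read their settings from pyproject.toml when run from the repository root. A minimal sketch of applying them (not part of this diff; the exact commands used for this PR are not shown, and the target path "." is illustrative):

import subprocess

# Run isort first, then black, so import ordering is settled before black
# normalizes the overall layout; check=True surfaces any formatter failure.
for cmd in (["isort", "."], ["black", "."]):
    subprocess.run(cmd, check=True)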
2 changes: 1 addition & 1 deletion src/.pylintrc
@@ -329,7 +329,7 @@ indent-after-paren=4
indent-string=' '

# Maximum number of characters on a single line.
-max-line-length=100
+max-line-length=110

# Maximum number of lines in a module.
max-module-lines=500
4 changes: 1 addition & 3 deletions src/hipscat_import/association/arguments.py
@@ -2,9 +2,7 @@

from dataclasses import dataclass

-from hipscat.catalog.association_catalog.association_catalog import (
-AssociationCatalogInfo,
-)
+from hipscat.catalog.association_catalog.association_catalog import AssociationCatalogInfo

from hipscat_import.runtime_arguments import RuntimeArguments

40 changes: 9 additions & 31 deletions src/hipscat_import/association/map_reduce.py
@@ -47,11 +47,7 @@ def map_association(args):
}
if not single_primary_column:
rename_columns[args.primary_id_column] = "primary_id"
-primary_index = (
-primary_index.reset_index()
-.rename(columns=rename_columns)
-.set_index("primary_join")
-)
+primary_index = primary_index.reset_index().rename(columns=rename_columns).set_index("primary_join")

## Read and massage join input data
single_join_column = args.join_id_column == args.join_foreign_key
@@ -82,16 +78,10 @@ def map_association(args):
}
if not single_join_column:
rename_columns[args.join_id_column] = "join_id"
-join_index = (
-join_index.reset_index()
-.rename(columns=rename_columns)
-.set_index("join_to_primary")
-)
+join_index = join_index.reset_index().rename(columns=rename_columns).set_index("join_to_primary")

## Join the two data sets on the shared join predicate.
-join_data = primary_index.merge(
-join_index, how="inner", left_index=True, right_index=True
-)
+join_data = primary_index.merge(join_index, how="inner", left_index=True, right_index=True)

## Write out a summary of each partition join
groups = (
@@ -102,12 +92,8 @@
.count()
.compute()
)
-intermediate_partitions_file = file_io.append_paths_to_pointer(
-args.tmp_path, "partitions.csv"
-)
-file_io.write_dataframe_to_csv(
-dataframe=groups, file_pointer=intermediate_partitions_file
-)
+intermediate_partitions_file = file_io.append_paths_to_pointer(args.tmp_path, "partitions.csv")
+file_io.write_dataframe_to_csv(dataframe=groups, file_pointer=intermediate_partitions_file)

## Drop join predicate columns
join_data = join_data[
@@ -137,23 +123,17 @@

def reduce_association(input_path, output_path):
"""Collate sharded parquet files into a single parquet file per partition"""
-intermediate_partitions_file = file_io.append_paths_to_pointer(
-input_path, "partitions.csv"
-)
+intermediate_partitions_file = file_io.append_paths_to_pointer(input_path, "partitions.csv")
data_frame = file_io.load_csv_to_pandas(intermediate_partitions_file)

## Clean up the dataframe and write out as our new partition join info file.
data_frame = data_frame[data_frame["primary_hipscat_index"] != 0]
data_frame["num_rows"] = data_frame["primary_hipscat_index"]
-data_frame = data_frame[
-["Norder", "Dir", "Npix", "join_Norder", "join_Dir", "join_Npix", "num_rows"]
-]
+data_frame = data_frame[["Norder", "Dir", "Npix", "join_Norder", "join_Dir", "join_Npix", "num_rows"]]
data_frame = data_frame.sort_values(["Norder", "Npix", "join_Norder", "join_Npix"])
file_io.write_dataframe_to_csv(
dataframe=data_frame,
-file_pointer=file_io.append_paths_to_pointer(
-output_path, "partition_join_info.csv"
-),
+file_pointer=file_io.append_paths_to_pointer(output_path, "partition_join_info.csv"),
index=False,
)

@@ -195,8 +175,6 @@ def reduce_association(input_path, output_path):
f" Expected {partition['num_rows']}, wrote {rows_written}",
)

-table.to_pandas().set_index("primary_hipscat_index").sort_index().to_parquet(
-output_file
-)
+table.to_pandas().set_index("primary_hipscat_index").sort_index().to_parquet(output_file)

return data_frame["num_rows"].sum()
12 changes: 3 additions & 9 deletions src/hipscat_import/association/run_association.py
@@ -24,16 +24,12 @@ def run(args):
step_progress.update(1)

rows_written = 0
-with tqdm(
-total=1, desc="Reducing ", disable=not args.progress_bar
-) as step_progress:
+with tqdm(total=1, desc="Reducing ", disable=not args.progress_bar) as step_progress:
rows_written = reduce_association(args.tmp_path, args.catalog_path)
step_progress.update(1)

# All done - write out the metadata
-with tqdm(
-total=4, desc="Finishing", disable=not args.progress_bar
-) as step_progress:
+with tqdm(total=4, desc="Finishing", disable=not args.progress_bar) as step_progress:
# pylint: disable=duplicate-code
# Very similar to /index/run_index.py
catalog_info = args.to_catalog_info(int(rows_written))
@@ -44,9 +40,7 @@
)
step_progress.update(1)
catalog_info = args.to_catalog_info(total_rows=int(rows_written))
-write_metadata.write_catalog_info(
-dataset_info=catalog_info, catalog_base_dir=args.catalog_path
-)
+write_metadata.write_catalog_info(dataset_info=catalog_info, catalog_base_dir=args.catalog_path)
step_progress.update(1)
write_metadata.write_parquet_metadata(args.catalog_path)
step_progress.update(1)
32 changes: 8 additions & 24 deletions src/hipscat_import/catalog/arguments.py
@@ -85,26 +85,18 @@ def _check_arguments(self):
raise ValueError("input_format is required")

if self.constant_healpix_order >= 0:
-check_healpix_order_range(
-self.constant_healpix_order, "constant_healpix_order"
-)
+check_healpix_order_range(self.constant_healpix_order, "constant_healpix_order")
self.mapping_healpix_order = self.constant_healpix_order
else:
-check_healpix_order_range(
-self.highest_healpix_order, "highest_healpix_order"
-)
+check_healpix_order_range(self.highest_healpix_order, "highest_healpix_order")
if not 100 <= self.pixel_threshold <= 1_000_000_000:
-raise ValueError(
-"pixel_threshold should be between 100 and 1,000,000,000"
-)
+raise ValueError("pixel_threshold should be between 100 and 1,000,000,000")
self.mapping_healpix_order = self.highest_healpix_order

if self.catalog_type not in ("source", "object"):
raise ValueError("catalog_type should be one of `source` or `object`")

-if (not self.input_path and not self.input_file_list) or (
-self.input_path and self.input_file_list
-):
+if (not self.input_path and not self.input_file_list) or (self.input_path and self.input_file_list):
raise ValueError("exactly one of input_path or input_file_list is required")
if not self.file_reader:
self.file_reader = get_file_reader(self.input_format)
@@ -113,9 +105,7 @@ def _check_arguments(self):
if self.input_path:
if not file_io.does_file_or_directory_exist(self.input_path):
raise FileNotFoundError("input_path not found on local storage")
-self.input_paths = file_io.find_files_matching_path(
-self.input_path, f"*{self.input_format}"
-)
+self.input_paths = file_io.find_files_matching_path(self.input_path, f"*{self.input_format}")
elif self.input_file_list:
self.input_paths = self.input_file_list
if len(self.input_paths) == 0:
@@ -156,9 +146,7 @@ def additional_runtime_provenance_info(self) -> dict:
"pixel_threshold": self.pixel_threshold,
"mapping_healpix_order": self.mapping_healpix_order,
"debug_stats_only": self.debug_stats_only,
"file_reader_info": self.file_reader.provenance_info()
if self.file_reader is not None
else {},
"file_reader_info": self.file_reader.provenance_info() if self.file_reader is not None else {},
}


@@ -180,10 +168,6 @@ def check_healpix_order_range(
if lower_bound < 0:
raise ValueError("healpix orders must be positive")
if upper_bound > hipscat_id.HIPSCAT_ID_HEALPIX_ORDER:
-raise ValueError(
-f"healpix order should be <= {hipscat_id.HIPSCAT_ID_HEALPIX_ORDER}"
-)
+raise ValueError(f"healpix order should be <= {hipscat_id.HIPSCAT_ID_HEALPIX_ORDER}")
if not lower_bound <= order <= upper_bound:
-raise ValueError(
-f"{field_name} should be between {lower_bound} and {upper_bound}"
-)
+raise ValueError(f"{field_name} should be between {lower_bound} and {upper_bound}")
12 changes: 3 additions & 9 deletions src/hipscat_import/catalog/file_readers.py
@@ -97,9 +97,7 @@ def regular_file_exists(self, input_file):
if not file_io.does_file_or_directory_exist(input_file):
raise FileNotFoundError(f"File not found at path: {input_file}")
if not file_io.is_regular_file(input_file):
-raise FileNotFoundError(
-f"Directory found at path - requires regular file: {input_file}"
-)
+raise FileNotFoundError(f"Directory found at path - requires regular file: {input_file}")


class CsvReader(InputReader):
@@ -143,9 +141,7 @@ def read(self, input_file):
self.regular_file_exists(input_file)

if self.schema_file:
-schema_parquet = pd.read_parquet(
-self.schema_file, dtype_backend="numpy_nullable"
-)
+schema_parquet = pd.read_parquet(self.schema_file, dtype_backend="numpy_nullable")

use_column_names = None
if self.column_names:
@@ -214,9 +210,7 @@ class FitsReader(InputReader):
one of `column_names` or `skip_column_names`
"""

-def __init__(
-self, chunksize=500_000, column_names=None, skip_column_names=None, **kwargs
-):
+def __init__(self, chunksize=500_000, column_names=None, skip_column_names=None, **kwargs):
self.chunksize = chunksize
self.column_names = column_names
self.skip_column_names = skip_column_names
28 changes: 7 additions & 21 deletions src/hipscat_import/catalog/map_reduce.py
@@ -42,9 +42,7 @@ def _has_named_index(dataframe):
if dataframe.index.name is not None:
## Single index with a given name.
return True
-if len(dataframe.index.names) == 0 or all(
-name is None for name in dataframe.index.names
-):
+if len(dataframe.index.names) == 0 or all(name is None for name in dataframe.index.names):
return False
return True

@@ -112,9 +110,7 @@ def map_to_pixels(
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
histo[mapped_pixel] += count_at_pixel.astype(np.int64)
-ResumePlan.write_partial_histogram(
-tmp_path=cache_path, mapping_key=mapping_key, histogram=histo
-)
+ResumePlan.write_partial_histogram(tmp_path=cache_path, mapping_key=mapping_key, histogram=histo)


def split_pixels(
@@ -221,9 +217,7 @@ def reduce_pixel_shards(
ValueError: if the number of rows written doesn't equal provided
`destination_pixel_size`
"""
-destination_dir = paths.pixel_directory(
-output_path, destination_pixel_order, destination_pixel_number
-)
+destination_dir = paths.pixel_directory(output_path, destination_pixel_order, destination_pixel_number)
file_io.make_directory(destination_dir, exist_ok=True)

destination_file = paths.pixel_catalog_file(
@@ -235,9 +229,7 @@
schema = file_io.read_parquet_metadata(use_schema_file).schema.to_arrow_schema()

tables = []
-pixel_dir = _get_pixel_directory(
-cache_path, destination_pixel_order, destination_pixel_number
-)
+pixel_dir = _get_pixel_directory(cache_path, destination_pixel_order, destination_pixel_number)

if schema:
tables.append(pq.read_table(pixel_dir, schema=schema))
@@ -264,17 +256,13 @@
dataframe[dec_column].values,
)

dataframe["Norder"] = np.full(
rows_written, fill_value=destination_pixel_order, dtype=np.int32
)
dataframe["Norder"] = np.full(rows_written, fill_value=destination_pixel_order, dtype=np.int32)
dataframe["Dir"] = np.full(
rows_written,
fill_value=int(destination_pixel_number / 10_000) * 10_000,
dtype=np.int32,
)
dataframe["Npix"] = np.full(
rows_written, fill_value=destination_pixel_number, dtype=np.int32
)
dataframe["Npix"] = np.full(rows_written, fill_value=destination_pixel_number, dtype=np.int32)

if add_hipscat_index:
## If we had a meaningful index before, preserve it as a column.
@@ -286,8 +274,6 @@
del dataframe, merged_table, tables

if delete_input_files:
-pixel_dir = _get_pixel_directory(
-cache_path, destination_pixel_order, destination_pixel_number
-)
+pixel_dir = _get_pixel_directory(cache_path, destination_pixel_order, destination_pixel_number)

file_io.remove_directory(pixel_dir, ignore_errors=True)