Refactor based on hipscat v0.1.2 #117

Merged 1 commit on Aug 15, 2023
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -19,7 +19,7 @@ dependencies = [
    "dask[distributed]",
    "deprecated",
    "healpy",
-   "hipscat >= 0.1.1",
+   "hipscat >= 0.1.2",
    "ipykernel", # Support for Jupyter notebooks
    "pandas",
    "pyarrow",
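
The only change to pyproject.toml is raising the hipscat floor. A quick way to check that an installed environment satisfies the new pin, using only the standard library (the numeric parse below assumes a plain X.Y.Z version string):

    from importlib.metadata import version

    # Read the installed hipscat version and compare it to the new minimum.
    installed = version("hipscat")
    parts = tuple(int(p) for p in installed.split(".")[:3])  # assumes plain X.Y.Z
    assert parts >= (0, 1, 2), f"hipscat {installed} is older than the 0.1.2 pin"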
11 changes: 6 additions & 5 deletions src/hipscat_import/catalog/file_readers.py
@@ -2,11 +2,10 @@

import abc

-import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from astropy.table import Table
-from hipscat.io import file_io
+from hipscat.io import FilePointer, file_io

# pylint: disable=too-few-public-methods,too-many-arguments
@@ -141,7 +140,9 @@ def read(self, input_file):
        self.regular_file_exists(input_file)

        if self.schema_file:
-            schema_parquet = pd.read_parquet(self.schema_file, dtype_backend="numpy_nullable")
+            schema_parquet = file_io.load_parquet_to_pandas(
+                FilePointer(self.schema_file), dtype_backend="numpy_nullable"
+            )

        use_column_names = None
        if self.column_names:
@@ -155,8 +156,8 @@
        elif self.schema_file:
            use_type_map = schema_parquet.dtypes.to_dict()

-        with pd.read_csv(
-            input_file,
+        with file_io.load_csv_to_pandas(
+            FilePointer(input_file),
            chunksize=self.chunksize,
            sep=self.separator,
            header=self.header,
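
Taken together, the file_readers.py changes swap direct pandas calls for the hipscat.io wrappers, which take a FilePointer and forward reader keyword arguments through to pandas. A minimal sketch of the new call shape outside the reader class (the paths and chunk parameters here are hypothetical placeholders):

    from hipscat.io import FilePointer, file_io

    schema_file = "input/schema.parquet"  # hypothetical path
    input_file = "input/catalog.csv"      # hypothetical path

    # Parquet schema load, mirroring the change above: the wrapper replaces
    # pd.read_parquet and forwards dtype_backend through to pandas.
    schema_parquet = file_io.load_parquet_to_pandas(
        FilePointer(schema_file), dtype_backend="numpy_nullable"
    )

    # CSV load, mirroring the chunked reader above; with chunksize set, the
    # wrapper yields the same chunk iterator pd.read_csv would.
    with file_io.load_csv_to_pandas(
        FilePointer(input_file),
        chunksize=100_000,
        sep=",",
        header=0,
    ) as reader:
        for chunk in reader:
            print(len(chunk))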
3 changes: 1 addition & 2 deletions src/hipscat_import/margin_cache/margin_cache_map_reduce.py
@@ -1,5 +1,4 @@
import healpy as hp
-import pandas as pd
import pyarrow.dataset as ds
from hipscat import pixel_math
from hipscat.io import file_io, paths

@@ -17,7 +16,7 @@ def map_pixel_shards(
    dec_column,
):
    """Creates margin cache shards from a source partition file."""
-    data = pd.read_parquet(partition_file)
+    data = file_io.load_parquet_to_pandas(partition_file)

    data["margin_pixel"] = hp.ang2pix(
        2**margin_order,
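
For context on the changed line: map_pixel_shards loads one partition file and tags each row with its HEALPix pixel at the margin order. A sketch of that flow (the column names and the nest/lonlat flags are assumptions for illustration, not taken from this diff):

    import healpy as hp
    from hipscat.io import file_io

    partition_file = "catalog/Norder=0/Dir=0/Npix=4.parquet"  # hypothetical path
    margin_order = 3

    # The changed line: load the partition through the file_io wrapper.
    data = file_io.load_parquet_to_pandas(partition_file)

    # Tag each row with its pixel at the finer margin order; flag values assumed.
    data["margin_pixel"] = hp.ang2pix(
        2**margin_order,
        data["ra"].values,   # ra_column assumed to be "ra"
        data["dec"].values,  # dec_column assumed to be "dec"
        lonlat=True,
        nest=True,
    )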
10 changes: 6 additions & 4 deletions src/hipscat_import/soap/map_reduce.py
@@ -17,7 +17,9 @@ def _count_joins_for_object(source_data, object_catalog_dir, object_id_column, o
        pixel_order=object_pixel.order,
        pixel_number=object_pixel.pixel,
    )
-    object_data = pd.read_parquet(path=object_path, columns=[object_id_column]).set_index(object_id_column)
+    object_data = file_io.load_parquet_to_pandas(object_path, columns=[object_id_column]).set_index(
+        object_id_column
+    )

    joined_data = source_data.merge(object_data, how="inner", left_index=True, right_index=True)

@@ -64,9 +66,9 @@ def count_joins(
        pixel_order=source_pixel.order,
        pixel_number=source_pixel.pixel,
    )
-    source_data = pd.read_parquet(path=source_path, columns=[soap_args.source_object_id_column]).set_index(
-        soap_args.source_object_id_column
-    )
+    source_data = file_io.load_parquet_to_pandas(
+        source_path, columns=[soap_args.source_object_id_column]
+    ).set_index(soap_args.source_object_id_column)

    remaining_sources = len(source_data)
    results = []
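
Both changed call sites in soap/map_reduce.py follow the same pattern: read only the join-id column from a pixel's parquet file, index on it, and inner-join on the index. A self-contained sketch of that pattern (the paths, column name, and toy source frame are hypothetical):

    import pandas as pd
    from hipscat.io import file_io

    object_path = "object/Norder=1/Dir=0/Npix=16.parquet"  # hypothetical path
    object_id_column = "object_id"                         # hypothetical column

    # Load only the id column and index on it, so the join below is an
    # index-to-index merge instead of a full-column comparison.
    object_data = file_io.load_parquet_to_pandas(
        object_path, columns=[object_id_column]
    ).set_index(object_id_column)

    # Toy stand-in for source_data, already indexed by the same id.
    source_data = pd.DataFrame(
        {"source_ra": [310.5, 311.2]},
        index=pd.Index([7001, 7002], name=object_id_column),
    )

    # The unchanged merge from the diff: keep sources whose id appears in objects.
    joined_data = source_data.merge(object_data, how="inner", left_index=True, right_index=True)
    print(len(joined_data))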
19 changes: 10 additions & 9 deletions tests/hipscat_import/soap/test_soap_resume_plan.py
@@ -50,16 +50,17 @@ def test_object_to_source_map(small_sky_object_catalog, small_sky_source_catalog

def test_mismatch_order_map(catalog_info_data, source_catalog_info):
    """Create some catalogs that will exercise edge case behavior of map-generation."""
-    catalog_pixels = pd.DataFrame(
-        data=[[1, 0, 16], [2, 0, 68], [2, 0, 69], [2, 0, 70], [2, 0, 71]],
-        columns=["Norder", "Dir", "Npix"],
-    )
-    object_catalog = Catalog(CatalogInfo(**catalog_info_data), catalog_pixels)
-    catalog_pixels = pd.DataFrame(
-        data=[[1, 0, 16]],
-        columns=["Norder", "Dir", "Npix"],
+    object_catalog = Catalog(
+        CatalogInfo(**catalog_info_data),
+        [
+            HealpixPixel(1, 16),
+            HealpixPixel(2, 68),
+            HealpixPixel(2, 69),
+            HealpixPixel(2, 70),
+            HealpixPixel(2, 71),
+        ],
    )
-    source_catalog = Catalog(CatalogInfo(**source_catalog_info), catalog_pixels)
+    source_catalog = Catalog(CatalogInfo(**source_catalog_info), [HealpixPixel(1, 16)])

    expected = {
        HealpixPixel(1, 16): [
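
The test change tracks an API shift in hipscat v0.1.2: Catalog now takes a list of HealpixPixel objects instead of a DataFrame of Norder/Dir/Npix rows. A rough sketch of the new construction (the import paths and CatalogInfo fields shown are assumptions; the real test gets its info dicts from fixtures):

    from hipscat.catalog import Catalog
    from hipscat.catalog.catalog_info import CatalogInfo  # import path assumed
    from hipscat.pixel_math import HealpixPixel           # import path assumed

    # Hypothetical minimal metadata; the test fixtures supply the real dicts.
    catalog_info_data = {"catalog_name": "object", "catalog_type": "object", "total_rows": 5}

    # Pixels are now HealpixPixel(order, pixel) objects rather than DataFrame rows.
    object_catalog = Catalog(
        CatalogInfo(**catalog_info_data),
        [HealpixPixel(1, 16), HealpixPixel(2, 68)],
    )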