Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polars JSON Reader/Writer, Unit Test, Materialization. #440

Merged
merged 6 commits into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified examples/polars/materialization/dag.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
10 changes: 10 additions & 0 deletions examples/polars/materialization/my_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,20 @@
file="./df.parquet",
combine=df_builder,
),
# materialize the dataframe to a feather file
to.feather(
dependencies=output_columns,
id="df_to_feather",
file="./df.feather",
combine=df_builder,
),
# materialize the dataframe to a json file
to.json(
dependencies=output_columns,
id="df_to_json",
file="./df.json",
combine=df_builder,
),
]
# Visualize what is happening
dr.visualize_materialization(
Expand All @@ -68,9 +76,11 @@
additional_vars=[
"df_to_parquet_build_result",
"df_to_feather_build_result",
"df_to_json_build_result",
], # because combine is used, we can get that result here.
inputs=initial_columns,
)
print(materialization_results)
print(additional_outputs["df_to_parquet_build_result"])
print(additional_outputs["df_to_feather_build_result"])
print(additional_outputs["df_to_json_build_result"])
417 changes: 100 additions & 317 deletions examples/polars/materialization/notebook.ipynb

Large diffs are not rendered by default.

71 changes: 69 additions & 2 deletions hamilton/plugins/polars_extensions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dataclasses
import sys
from io import BytesIO, TextIOWrapper
from io import BytesIO, IOBase, TextIOWrapper
from pathlib import Path
from typing import (
Any,
Expand Down Expand Up @@ -31,7 +31,7 @@

# for polars 0.18.0 we need to check what to do.
if has_alias and hasattr(pl.type_aliases, "CsvEncoding"):
from polars.type_aliases import CsvEncoding
from polars.type_aliases import CsvEncoding, SchemaDefinition
else:
CsvEncoding = Type
if has_alias and hasattr(pl.type_aliases, "CsvQuoteStyle"):
Expand Down Expand Up @@ -434,6 +434,71 @@ def name(cls) -> str:
return "feather"


@dataclasses.dataclass
class PolarsJSONReader(DataLoader):
    """Data loader that reads JSON files into a Polars DataFrame.

    Mirrors https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.read_json.html
    """

    source: Union[str, Path, IOBase, bytes]
    schema: SchemaDefinition = None
    schema_overrides: SchemaDefinition = None

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        # This loader only produces Polars DataFrames.
        return [DATAFRAME_TYPE]

    def _get_loading_kwargs(self):
        # Forward only options the user explicitly set, so that
        # pl.read_json falls back to its own defaults otherwise.
        loading_kwargs = {}
        if self.schema is not None:
            loading_kwargs["schema"] = self.schema
        if self.schema_overrides is not None:
            loading_kwargs["schema_overrides"] = self.schema_overrides
        return loading_kwargs

    def load_data(self, type_: Type) -> Tuple[DATAFRAME_TYPE, Dict[str, Any]]:
        loaded_df = pl.read_json(self.source, **self._get_loading_kwargs())
        # NOTE(review): get_file_metadata looks path-oriented — confirm behavior
        # when `source` is raw bytes or a file-like object.
        file_metadata = utils.get_file_metadata(self.source)
        return loaded_df, file_metadata

    @classmethod
    def name(cls) -> str:
        return "json"


@dataclasses.dataclass
class PolarsJSONWriter(DataSaver):
    """Data saver that writes a Polars DataFrame to a JSON file.

    Should map to https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.write_json.html
    """

    # Destination for the JSON output: a path or a writable file-like object.
    # Fix: this was annotated `file: None`, which declared the field as only
    # ever holding None; the annotation now reflects the values actually passed
    # (see the test, which passes a pathlib.Path).
    file: Union[str, Path, IOBase, None] = None
    # Whether to pretty-print the JSON output.
    pretty: bool = False
    # Whether to write row-oriented (list of objects) instead of column-oriented JSON.
    row_oriented: bool = False

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        # This saver only consumes Polars DataFrames.
        return [DATAFRAME_TYPE]

    def _get_saving_kwargs(self):
        """Build the kwargs dict forwarded to ``DataFrame.write_json``."""
        kwargs = {}
        # These guards are always true for bool fields (defaults are False,
        # not None); kept for consistency with the sibling loader/saver classes.
        if self.pretty is not None:
            kwargs["pretty"] = self.pretty
        if self.row_oriented is not None:
            kwargs["row_oriented"] = self.row_oriented
        return kwargs

    def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
        """Write ``data`` to ``self.file`` as JSON and return file metadata."""
        data.write_json(self.file, **self._get_saving_kwargs())
        return utils.get_file_metadata(self.file)

    @classmethod
    def name(cls) -> str:
        return "json"


def register_data_loaders():
"""Function to register the data loaders for this extension."""
for loader in [
Expand All @@ -443,6 +508,8 @@ def register_data_loaders():
PolarsParquetWriter,
PolarsFeatherReader,
PolarsFeatherWriter,
PolarsJSONReader,
PolarsJSONWriter,
]:
registry.register_adapter(loader)

Expand Down
20 changes: 20 additions & 0 deletions tests/plugins/test_polars_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
PolarsCSVWriter,
PolarsFeatherReader,
PolarsFeatherWriter,
PolarsJSONReader,
PolarsJSONWriter,
PolarsParquetReader,
PolarsParquetWriter,
)
Expand Down Expand Up @@ -73,3 +75,21 @@ def test_polars_feather(tmp_path: pathlib.Path) -> None:
assert "compression" in write_kwargs
assert file_path.exists()
assert metadata["path"] == file_path


def test_polars_json(df: pl.DataFrame, tmp_path: pathlib.Path) -> None:
    """Round-trip a DataFrame through the Polars JSON writer and reader."""
    json_path = tmp_path / "test.json"

    writer = PolarsJSONWriter(file=json_path, pretty=True)
    saving_kwargs = writer._get_saving_kwargs()
    writer.save_data(df)

    reader = PolarsJSONReader(source=json_path)
    loading_kwargs = reader._get_loading_kwargs()
    loaded_df, metadata = reader.load_data(pl.DataFrame)

    # Both adapters should advertise the Polars DataFrame type.
    assert PolarsJSONWriter.applicable_types() == [pl.DataFrame]
    assert PolarsJSONReader.applicable_types() == [pl.DataFrame]
    # The explicitly-set option is forwarded; unset reader options are not.
    assert saving_kwargs["pretty"]
    assert "schema" not in loading_kwargs
    # What we read back matches what we wrote.
    assert loaded_df.shape == (2, 2)
    assert df.frame_equal(loaded_df)