
Polars Lazyframe Support #775

Merged: 30 commits merged into DAGWorks-Inc:main, Mar 28, 2024

Conversation

@buggtb (Contributor) commented Mar 22, 2024

This PR adds support for Polars LazyFrames in Hamilton.

Changes

I've currently stubbed out the CSV reader/writer to work in both eager and lazy mode in Polars.

How I tested this

There is an accompanying test that mimics the eager test but uses LazyFrames instead.

Notes

I've also had to update get_dataframe_metadata in utils.py so that it works with LazyFrames, which don't have a row count. I abstracted the individual metadata lookups so that each one returns what it can; if readers/writers for other types are added in the future, whichever lookups pass or fail, the function still returns whatever is available.
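
Roughly, the shape of that abstraction looks like this (a sketch of the idea only; the key names and lookups are illustrative, not the exact utils.py code):

    from typing import Any, Dict

    def get_dataframe_metadata(df: Any) -> Dict[str, Any]:
        """Gather whatever metadata the frame supports; lookups that fail return None."""
        lookups = {
            "rows": lambda d: len(d),          # raises for LazyFrames -- no row count yet
            "columns": lambda d: len(d.columns),
            "column_names": lambda d: list(d.columns),
            "datatype": lambda d: type(d).__name__,
        }
        metadata = {}
        for key, fn in lookups.items():
            try:
                metadata[key] = fn(df)
            except Exception:
                metadata[key] = None  # return what we can for this frame type
        return metadata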

Checklist

  • PR has an informative and human-readable title (this will be pulled into the release notes)
  • Changes are limited to a single goal (no scope creep)
  • Code passed the pre-commit check & code is left cleaner/nicer than when first encountered.
  • Any change in functionality is tested
  • New functions are documented (with a description, list of inputs, and expected output)
  • Placeholder code is flagged / future TODOs are captured in comments
  • Project documentation has been updated if adding/changing functionality.

@buggtb buggtb marked this pull request as draft March 22, 2024 11:04
@skrawcz (Collaborator) left a comment

Thanks @buggtb for driving this!

I think we can simplify a few things, actually:

  1. polars_extensions.py: all existing polars DataSavers can have the following applicable_types():

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [DATAFRAME_TYPE, pl.LazyFrame]

Then in the save_data() method, add a union type annotation with pl.LazyFrame, and in the body:

    if isinstance(data, pl.LazyFrame):
        data = data.collect()
    # continue with the rest of the function

(See the sketch after this list for how a full saver might end up looking.)

  2. h_polars.py should be modified to:
    (a) include a PolarsLazyFrameResult -- this won't call collect, it will just return a LazyFrame (though I'm not sure it makes sense);
    (b) have the existing PolarsDataFrameResult do an isinstance check for LazyFrame and, if one is found, collect and move on.
    This means we can delete h_polars_lazyframe.py.

  3. polars_lazyframe_extensions.py: I would use spark_extensions.py as the template. No need to copy all of what polars_extensions.py has. Otherwise it should house the scan_* and sink_* DataLoaders and DataSavers, e.g. scan_csv. Let me know if you need more guidance here.

  4. Remove polars_shared.py. I think this is a little over-engineered; I don't see the value in the abstraction right now, and a little duplication is more straightforward at this time.

  5. io/utils.py: I think the changes here are okay. @elijahbenizzy and I can think about whether there's a nicer way to do this.
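
For reference, a minimal sketch of point 1 applied to a single saver. The import paths, the `file` attribute, and the metadata helper are my assumptions by analogy with the loader code discussed below, not necessarily the exact Hamilton API:

    import dataclasses
    from typing import Any, Collection, Dict, Type, Union

    import polars as pl

    from hamilton.io import utils
    from hamilton.io.data_adapters import DataSaver

    DATAFRAME_TYPE = pl.DataFrame

    @dataclasses.dataclass
    class PolarsCSVWriter(DataSaver):
        file: str  # destination path (assumed attribute)

        @classmethod
        def applicable_types(cls) -> Collection[Type]:
            return [DATAFRAME_TYPE, pl.LazyFrame]

        def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
            if isinstance(data, pl.LazyFrame):
                data = data.collect()  # materialize the lazy query before writing
            data.write_csv(self.file)
            return utils.get_file_metadata(self.file)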

@buggtb (Contributor, Author) commented Mar 25, 2024

Thanks for the review @skrawcz

A couple of follow-up questions:

https://github.com/DAGWorks-Inc/hamilton/pull/775/files#diff-fbd89fcde3b2c949a6f81da8ecfd52a163490f1645f1eca8f9451047cf7785faR179

The save function is fair enough; the load is more problematic in terms of design. If I want to load into a LazyFrame instead of a DataFrame, there's currently nothing in there to allow that. I could extend that function to default to a DataFrame and let a user request a LazyFrame, I guess. I'm unsure what you'd want that pattern to look like -- perhaps something like this, or something else:

    def load_data(self, type_: Type) -> Tuple[Union[DATAFRAME_TYPE, pl.LazyFrame], Dict[str, Any]]:
        # type_ is the requested class, so compare types rather than using isinstance
        if type_ == pl.LazyFrame:
            df = pl.scan_csv(self.file, **self._get_loading_kwargs())
        else:
            df = pl.read_csv(self.file, **self._get_loading_kwargs())

        metadata = utils.get_file_and_dataframe_metadata(self.file, df)
        return df, metadata

Also, there are some minor differences in kwargs between the LazyFrame and DataFrame readers, so I'd have to add some conditional logic to the get_kwargs() functions to return the valid sets there.

I'm not entirely sure how the spark extensions example applies here, as I suspect we'll need loaders and savers for each type. Unless we can overload an existing class? That doesn't appear to be what the spark_extensions example does, though, so I'm probably missing something.

@skrawcz (Collaborator) commented Mar 25, 2024

> The save function is fair enough; the load is more problematic in terms of design. If I want to load into a LazyFrame instead of a DataFrame, there's currently nothing in there to allow that. [...] Also, there are some minor differences in kwargs between the LazyFrame and DataFrame readers, so I'd have to add some conditional logic to the get_kwargs() functions to return the valid sets there.
Yep -- I'm thinking we don't couple them at all, precisely for the reasons you mentioned. So polars_lazyframe_extensions.py would contain the "load" implementations specific to creating LazyFrames: scan_csv, scan_parquet, etc. Does that clarify it more?
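
For concreteness, a decoupled scan-based loader could look roughly like this -- a sketch assuming the same `file` convention as the eager CSV reader (kwargs plumbing omitted; import paths and the name() hook are my best guess at the Hamilton interfaces):

    import dataclasses
    from typing import Any, Collection, Dict, Tuple, Type

    import polars as pl

    from hamilton.io import utils
    from hamilton.io.data_adapters import DataLoader

    @dataclasses.dataclass
    class PolarsScanCSVReader(DataLoader):
        file: str

        @classmethod
        def applicable_types(cls) -> Collection[Type]:
            return [pl.LazyFrame]  # only ever produces LazyFrames -- no coupling to pl.DataFrame

        @classmethod
        def name(cls) -> str:
            return "csv"  # so the csv loader key resolves to this class for LazyFrame targets

        def load_data(self, type_: Type) -> Tuple[pl.LazyFrame, Dict[str, Any]]:
            df = pl.scan_csv(self.file)  # lazy: nothing is read from disk yet
            metadata = utils.get_file_and_dataframe_metadata(self.file, df)
            return df, metadata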

> I'm not entirely sure how the spark extensions example applies here, as I suspect we'll need loaders and savers for each type [...]

I just meant it in terms of structure: since pyspark doesn't make use of the column-handling machinery, LazyFrames wouldn't make use of it either.

@buggtb (Contributor, Author) commented Mar 26, 2024

Okay, so I removed polars_shared.py, then removed most of polars_lazyframe_extensions.py but kept the loader. Can you check that it's as you imagined before I go fill out the other savers/loaders, etc.? I see now how the save_to and load_from annotations work; it makes sense. I wasn't sure before how you could blend loaders and savers across classes. All good.

In h_polars.py I took the same logic I had in the LazyFrame version: I do run collect and return a DataFrame. When I was running it, it made sense (to me at least) to collect at the end and return a DataFrame, since the pipeline is done. But now that I think about it, there are times -- perhaps if you're running chained pipelines, subdags, or whatever -- when you may not want to. I don't know; the jury is out!

@skrawcz (Collaborator) left a comment

Yep looking good. Added code for what I would change.

Inline review comments (resolved): hamilton/plugins/polars_extensions.py, hamilton/plugins/polars_lazyframe_extensions.py (2)
@skrawcz (Collaborator) commented Mar 26, 2024

> [...] but now that I think about it, there are times -- perhaps if you're running chained pipelines, subdags, or whatever -- when you may not want to [collect]. The jury is out!

Yep! So that's a vote for a second one specific to LazyFrames IMO.
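
In that spirit, a LazyFrame-specific result builder might look something like this (a sketch; it assumes Hamilton's base.ResultMixin interface, and the multi-output assembly is illustrative and naive):

    from typing import Any

    import polars as pl

    from hamilton import base

    class PolarsLazyFrameResult(base.ResultMixin):
        """Hands back a LazyFrame as-is -- no .collect(), so chained pipelines stay lazy."""

        def build_result(self, **outputs: Any) -> pl.LazyFrame:
            if len(outputs) == 1:
                (value,) = outputs.values()
                if isinstance(value, pl.LazyFrame):
                    return value  # return the lazy query plan untouched
            # multiple outputs: naively assemble into a frame, then go lazy again
            return pl.DataFrame(outputs).lazy()

The eager PolarsDataFrameResult would do the opposite: isinstance-check for pl.LazyFrame and call .collect() before building its DataFrame.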

The applicable types for PolarsCSVWriter, PolarsParquetWriter, and PolarsFeatherWriter have been extended to include pl.LazyFrame in addition to the existing pl.DataFrame. This change allows these writer classes to handle both eager and lazy data frames from the polars library.
@skrawcz (Collaborator) left a comment

Looking great. A few minor things.

We should also have a LazyFrame test for each of the savers. If code duplication is the simplest approach here, I think that's fine; I'm not sure there's a DRY way that would be worth the time/effort, since this is a one-off.

Inline review comments: hamilton/plugins/h_polars_lazyframe.py, hamilton/plugins/polars_extensions.py (3)
The PolarsLazyFrameResult class is now used instead of the PolarsDataFrameResult. The logging statement in register_types() has been removed. DataSaver classes have been updated to handle both DATAFRAME_TYPE and pl.LazyFrame types, with a check added to collect data if it's a LazyFrame before saving. Tests have been updated and expanded to cover these changes, including checks for applicable types and correct handling of LazyFrames.
The applicable_types method in the PolarsSpreadsheetWriter class and corresponding test assertions have been updated to include pl.LazyFrame, along with the existing DATAFRAME_TYPE. This change extends the functionality of our Polars extensions to handle LazyFrames as well as DataFrames.
@buggtb buggtb marked this pull request as ready for review March 27, 2024 18:12
@buggtb buggtb changed the title DRAFT: Polars Lazyframe Support Polars Lazyframe Support Mar 27, 2024
@skrawcz (Collaborator) left a comment

Thanks @buggtb for the work! Looking good. I think we just need an example to show people how to use the new functionality you added!

So two things:

  1. Can you add an example exercising the scan variants? E.g. add something to examples/polars/lazyframe? I think we should add this to complete this PR.
  2. There are two minor things (see suggested commits), I can do the commit before merging, or you can. Let me know.

Otherwise, a question: should I create an issue for the sink_* variants (for LazyFrame writing to CSV, etc.) to track that? No need to add it in this PR, but I can see it being a natural progression if we have scan_* support.

Thanks!

Inline review comments (resolved): tests/plugins/test_polars_lazyframe_extensions.py (2)
This update introduces a new example demonstrating the use of Polars LazyFrame. The changes include:
- Creation of two new Python scripts: one defining functions for loading data and calculating spend per signup, and another script to execute these functions.
- Addition of a README file explaining how to run the example, visualize execution, and detailing some caveats with Polars.
- Inclusion of a requirements.txt file specifying necessary dependencies.
- Addition of sample CSV data for testing purposes.
The test methods for PolarsScanParquetReader and PolarsScanFeatherReader have been updated. Instead of using pl.DataFrame to load data, they now use pl.LazyFrame. This change aligns with the applicable types for these readers.
@buggtb (Contributor, Author) commented Mar 28, 2024

Added a test which loads a CSV using the annotation, does a simple sum, and dumps out the result as a LazyFrame. I also added a comment in my_script.py explaining how you could use either a LazyFrame or a DataFrame as the result set, depending on what you need.
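
Something in the spirit of that example, end to end (a hypothetical sketch -- file names, column names, and the @load_from.csv spelling are illustrative, not the PR's actual files):

    # my_functions.py
    import polars as pl
    from hamilton.function_modifiers import load_from, value

    @load_from.csv(file=value("sample_data.csv"))  # resolves to the LazyFrame CSV loader
    def spend_data(data: pl.LazyFrame) -> pl.LazyFrame:
        return data

    def total_spend(spend_data: pl.LazyFrame) -> pl.LazyFrame:
        # still lazy: polars only extends the query plan here, nothing executes
        return spend_data.select(pl.col("spend").sum())

    # my_script.py
    from hamilton import driver
    from hamilton.plugins import h_polars_lazyframe
    import my_functions

    dr = driver.Driver({}, my_functions, adapter=h_polars_lazyframe.PolarsLazyFrameResult())
    result = dr.execute(["total_spend"])  # returns a pl.LazyFrame, not a DataFrame
    print(result.collect())               # materialize only at the very end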

Added #792 and #791 to the issues list for some missing sources and sinks.

@skrawcz (Collaborator) commented Mar 28, 2024

Looks great -- let's 🚢 it.

But, argh, I see there's a rebase that's needed. Mind doing that? Then we'll be able to squash merge.

@buggtb (Contributor, Author) commented Mar 28, 2024

Pulled the latest main branch into my fork, should be good, I think.

@skrawcz (Collaborator) commented Mar 28, 2024

hmm -- 🤔

[screenshot: "Screen Shot 2024-03-28 at 10 02 42 AM"]

@buggtb (Contributor, Author) commented Mar 28, 2024

[screenshot: "Screenshot 2024-03-28 at 17 05 19"]

I'm not sure why I don't see the same thing.

@skrawcz skrawcz merged commit 39ce9e0 into DAGWorks-Inc:main Mar 28, 2024
23 checks passed
@skrawcz (Collaborator) commented Mar 28, 2024

Done. Thanks @buggtb 🍾 . This will go out next week.
