
vaex structured dataset and native types implementation #1230

Merged
9 commits merged into flyteorg:master on Oct 28, 2022

Conversation

ryankarlos
Contributor

@ryankarlos ryankarlos commented Oct 9, 2022

Vaex has great performance on a single machine, which is enough for most datasets. This PR adds support for Vaex as a pandas alternative for the StructuredDataset object type.
We extend StructuredDatasetDecoder and StructuredDatasetEncoder for Vaex, as described in https://docs.flyte.org/projects/cookbook/en/latest/auto/core/type_system/structured_dataset.html

This PR implements automatic serialization and deserialization between consecutive tasks using Parquet, but it could be extended to Arrow, HDF5, or the other binary formats supported by Vaex: https://vaex.readthedocs.io/en/latest/guides/io.html
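The parquet hand-off between tasks that the encoder and decoder implement can be sketched independently of flytekit. The helper below is a hypothetical illustration, not plugin code: the `opener` parameter and the `00000.parquet` filename are assumptions, and `df` only needs to expose vaex's `export_parquet` method.

```python
import os


def roundtrip_via_parquet(df, local_dir, opener=None):
    """Hypothetical helper mirroring the encoder/decoder hand-off.

    `df` is assumed to expose vaex's export_parquet(path); `opener`
    defaults to vaex.open (imported lazily, since vaex is a plugin
    dependency).
    """
    if opener is None:
        import vaex
        opener = vaex.open
    path = os.path.join(local_dir, "00000.parquet")
    df.export_parquet(path)  # encoder side: serialise the frame to disk
    return opener(path)      # decoder side: load it back for the next task
```

Separating the `opener` this way keeps the write/read convention (one agreed-upon path) in a single place, which is essentially what the transformer engine enforces between tasks.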

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

Added support for the Vaex DataFrame as a type
Added a Vaex StructuredDataset encoder and decoder for serialisation and deserialisation

Tracking Issue

Fixes flyteorg/flyte#701

Follow-up issue

NA

@welcome

welcome bot commented Oct 9, 2022

Thank you for opening this pull request! 🙌

These tips will help get your PR across the finish line:

  • Most of the repos have a PR template; if yours does, fill it out to the best of your knowledge.
  • Sign off your commits (Reference: DCO Guide).

@samhita-alla
Contributor

@ryankarlos, thanks for creating the PR! We'll review it shortly. :)

@ryankarlos
Contributor Author

@samhita-alla thanks - I'm new to Flyte, so it's quite possible I've missed a few things.

Member

@pingsutw pingsutw left a comment


@ryankarlos Thanks for your contribution.
Could you create a new flytekit-plugin for the Vaex dataframe? Here is an example.

@ryankarlos
Contributor Author

ryankarlos commented Oct 15, 2022

@pingsutw Thanks, I have now added a plugin for Vaex.

However, when I try to verify this works by running a simple workflow locally, I get an error and am not sure how to fix it:

import pandas as pd
import vaex
from typing_extensions import Annotated

from flytekit import kwtypes, task, workflow
from flytekit.types.structured.structured_dataset import (
    PARQUET,
    StructuredDataset,
    StructuredDatasetTransformerEngine,
)

# Handler and renderer classes are the ones defined in this plugin
StructuredDatasetTransformerEngine.register(VaexDataFrameToParquetEncodingHandlers())
StructuredDatasetTransformerEngine.register(ParquetToVaexDataFrameDecodingHandler())
StructuredDatasetTransformerEngine.register_renderer(vaex.DataFrame, VaexDataFrameRenderer())

subset_schema = Annotated[StructuredDataset, kwtypes(col2=str), PARQUET]

@task
def generate() -> subset_schema:
    pd_df = pd.DataFrame({"col1": [1, 3, 2], "col2": list("abc")})
    vaex_df = vaex.from_pandas(pd_df)
    return StructuredDataset(dataframe=vaex_df)

@task
def consume(df: subset_schema) -> subset_schema:
    df = df.open(vaex.DataFrame).all()
    assert df["col2"][0] == "a"
    assert df["col2"][1] == "b"
    assert df["col2"][2] == "c"
    return StructuredDataset(dataframe=df)

@workflow
def wf():
    consume(df=generate())

if __name__ == "__main__":
    wf()

I have already registered the encoding and decoding handlers, so I'm not sure why it is complaining:

TypeError: Failed to convert return value for var o0 for function generate with error 
<class 'ValueError'>: Failed to find a handler for <class 'vaex.dataframe.DataFrameLocal'>, 
protocol file, fmt parquet
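One way to see why this error occurs: handler registries of this kind are typically keyed by the concrete dataframe class. The sketch below loosely mimics such a lookup (it is an illustration, not flytekit's actual implementation); a handler registered on something that is not in the class hierarchy of `DataFrameLocal` never matches, while one registered on `DataFrameLocal` itself, or a base class of it, would.

```python
def find_handler(df_type, registry):
    """Loose sketch of a structured-dataset handler lookup (illustration
    only, not flytekit's code). Handlers are keyed by the dataframe
    class; walking the MRO lets a handler registered on a base class
    still match subclasses such as vaex's DataFrameLocal."""
    for cls in df_type.__mro__:
        if cls in registry:
            return registry[cls]
    raise ValueError(f"Failed to find a handler for {df_type}")
```

Registering the encoder against the wrong key therefore produces exactly a "Failed to find a handler" error at return-value conversion time.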

@ryankarlos ryankarlos force-pushed the flyte-vaex-plugin branch 4 times, most recently from f2c3003 to a2cfbed on October 16, 2022
@codecov

codecov bot commented Oct 16, 2022

Codecov Report

Merging #1230 (9d174f4) into master (63ad4fc) will increase coverage by 0.07%.
The diff coverage is n/a.

@@            Coverage Diff             @@
##           master    #1230      +/-   ##
==========================================
+ Coverage   68.57%   68.65%   +0.07%     
==========================================
  Files         288      288              
  Lines       26224    26351     +127     
  Branches     2929     2489     -440     
==========================================
+ Hits        17984    18092     +108     
- Misses       7762     7779      +17     
- Partials      478      480       +2     
Impacted Files Coverage Δ
flytekit/deck/deck.py 34.04% <0.00%> (-4.26%) ⬇️
flytekit/clis/sdk_in_container/register.py 79.68% <0.00%> (-3.08%) ⬇️
flytekit/types/structured/structured_dataset.py 60.74% <0.00%> (-2.58%) ⬇️
flytekit/types/directory/types.py 54.16% <0.00%> (-0.84%) ⬇️
...ctured_dataset/test_structured_dataset_workflow.py 99.24% <0.00%> (-0.76%) ⬇️
flytekit/core/type_engine.py 58.89% <0.00%> (-0.50%) ⬇️
flytekit/core/local_cache.py 46.66% <0.00%> (-0.40%) ⬇️
flytekit/clis/sdk_in_container/helpers.py 92.59% <0.00%> (-0.27%) ⬇️
flytekit/clis/sdk_in_container/run.py 84.15% <0.00%> (-0.04%) ⬇️
plugins/setup.py 0.00% <0.00%> (ø)
... and 22 more


@samhita-alla
Contributor

@ryankarlos, it seems like the Vaex dataframe type is vaex.dataframe.DataFrameLocal rather than vaex.dataframe. Your transformer handles the latter, not the former. Can you re-verify what the type of a Vaex dataframe is?

@ryankarlos
Contributor Author

> @ryankarlos, it seems like the Vaex dataframe type is vaex.dataframe.DataFrameLocal rather than vaex.dataframe. Your transformer handles the latter, not the former. Can you re-verify what the type of a Vaex dataframe is?

Ah yes, thanks - I've fixed it now.

@pingsutw
Member

Thank you @ryankarlos. LGTM

Member

@pingsutw pingsutw left a comment


nit: The test failed because the plugin name is inconsistent


PLUGIN_NAME = "vaex"

microlib_name = f"plugins-{PLUGIN_NAME}"
Member

Suggested change
microlib_name = f"plugins-{PLUGIN_NAME}"
microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

Contributor Author

@ryankarlos ryankarlos Oct 19, 2022

thanks, updated now

Signed-off-by: Ryan Nazareth <ryankarlos@gmail.com>
from typing_extensions import Annotated

from flytekit import kwtypes, task, workflow
from flytekit.types.structured.structured_dataset import PARQUET, StructuredDataset

full_schema = Annotated[StructuredDataset, kwtypes(x=int, y=str), PARQUET]
Contributor Author

@ryankarlos ryankarlos Oct 19, 2022

Just an observation:
I initially assumed that if the column-types metadata was skipped it would still be OK, since there are still two arguments to Annotated:

full_schema = Annotated[StructuredDataset, PARQUET]

but if I do that, I get the following error when running the test:

(screenshot of the error, 2022-10-19)

Contributor

cc: @pingsutw

Member

Hmm, I fetched your commit and reran the test (updated to Annotated[StructuredDataset, PARQUET]) but didn't get the error. Let's wait and see if CI passes.

Contributor Author

> Hmm, I fetched your commit and reran the test (updated to Annotated[StructuredDataset, PARQUET]) but didn't get the error. Let's wait and see if CI passes.

Thanks - the test that failed in CI is a different one, unrelated to this PR.

pingsutw
pingsutw previously approved these changes Oct 24, 2022
@ryankarlos
Contributor Author

@samhita-alla I've pushed the requested changes now.

@samhita-alla
Contributor

@pingsutw, +1 again, please?

path = ctx.file_access.get_random_remote_directory()
local_dir = ctx.file_access.get_random_local_directory()
local_path = os.path.join(local_dir, f"{0:05}")
df.export_parquet(local_path)
Contributor

Apologies for reviewing it late! As per their docs, HDF5 is the most suitable when the data is huge: https://vaex.readthedocs.io/en/docs/example_io.html#id1. We can go with Parquet, just want to give a heads-up.

Member

Yup, we can register another handler (VaexDataFrameToHDF5EncodingHandler) so people can use Annotated to update the default format. We can add it in a separate PR:

def t1() -> Annotated[StructuredDataset, HDF5]: ...

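A minimal sketch of what that HDF5 encoder's body might look like, assuming the handler name suggested above and vaex's `export_hdf5` method; the flytekit wiring (base class, literal construction) is omitted, and the zero-padded part name mirrors the parquet handler in this PR.

```python
import os


def encode_vaex_to_hdf5(df, local_dir):
    """Sketch of the body an HDF5 encoder might have; the surrounding
    flytekit wiring is left out, and `df` is assumed to expose vaex's
    export_hdf5(path)."""
    # Same zero-padded part naming as the parquet handler in this PR
    local_path = os.path.join(local_dir, f"{0:05}")
    df.export_hdf5(local_path)  # vaex writes the frame as HDF5
    return local_path
```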
Contributor Author

@ryankarlos ryankarlos Oct 28, 2022

Arrow may also be useful to add support for (https://vaex.readthedocs.io/en/latest/faq.html) - what are your thoughts? I'm happy to implement and register extra handlers in a separate PR if that's OK.

Contributor

That's awesome, works for me! Please feel free to create issues accordingly and let me know! :)

) -> vaex.dataframe.DataFrameLocal:
local_dir = ctx.file_access.get_random_local_directory()
ctx.file_access.get_data(flyte_value.uri, local_dir, is_multipart=True)
path = f"{local_dir}/00000"
Contributor

I'm wondering if it's okay to consider only the first partition if the dataframe is huge. How about we consider all files present under the parquet directory using vaex.open or vaex.open_many? I think you can use a glob pattern.
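A sketch of that suggestion, assuming vaex.open accepts a glob pattern (per the vaex I/O docs) and that the part files sit directly under the downloaded directory; the `opener` parameter is an illustrative assumption, not plugin code.

```python
import os


def open_all_parts(local_dir, opener=None):
    """Sketch of reading every part under the downloaded directory
    instead of hard-coding "00000". `opener` defaults to vaex.open,
    which accepts glob patterns per the vaex I/O docs."""
    if opener is None:
        import vaex  # lazy import; vaex is a plugin dependency
        opener = vaex.open
    return opener(os.path.join(local_dir, "*"))
```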

Contributor Author

@ryankarlos ryankarlos Oct 28, 2022

@samhita-alla @pingsutw Thanks for spotting this. Actually, looking at this more closely, I think I may need to use df.export_many (https://vaex.io/docs/guides/io.html#Export-to-multiple-files-in-parallel) if we want to serialise chunks to multiple parts in parallel.

From the docs (https://vaex.io/docs/guides/io.html#Export-to-multiple-files-in-parallel):

> With the export_many method one can export a DataFrame to multiple files of the same type in parallel. This is likely to be more performant when exporting very large DataFrames to the cloud compared to writing a single large Arrow or Parquet file, where each chunk is written in succession.

What I implemented writes chunks serially to a single parquet file, it seems, according to the docs (default chunk size 1048576). Quoting from https://vaex.io/docs/guides/io.html#Exporting-binary-file-formats:

> When exporting to the Apache Arrow and Apache Parquet file formats, the data is written in chunks, thus enabling export of data that does not fit in RAM all at once. A custom chunk size can be specified via the chunk_size argument, the default value of which is 1048576.

Do we want to support one or both options, and do we want to give the user the option to override the chunk size? Accordingly, we could use vaex.open or vaex.open_many for single or multiple parts, as you suggested, in the decoding step. Also, do you think it's worth adding an extra workflow test for a dataframe larger than the chunk-size limit, to ascertain this behaviour for whichever options we implement?
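A sketch of what that export_many path could look like; the `part-{i:05}.parquet` filename template is an assumption, relying only on the template-expansion convention described in the vaex I/O docs (a format field such as `{i}` becomes the chunk index).

```python
import os


def export_in_parts(df, local_dir):
    """Sketch of the parallel multi-file export discussed above.
    export_many expands a format field like {i:05} into one file per
    chunk; the exact filename template here is an assumption."""
    template = os.path.join(local_dir, "part-{i:05}.parquet")
    df.export_many(template)  # vaex writes the chunks to separate files
    return template
```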

Contributor Author

@ryankarlos ryankarlos Oct 28, 2022

Also, I've been trying to mimic the polars implementation - any idea why there's a 00000 suffix in the path, and how this is being split into multiple parts? (The polars implementation seems to write a single parquet file: https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.write_parquet.html)

ctx.file_access.get_data(flyte_value.uri, local_dir, is_multipart=True)
path = f"{local_dir}/00000"

Contributor

@ryankarlos, thanks for doing the research! I think 00000 isn't a partition after all. It comes straight from your code, where you create the local path with local_path = os.path.join(local_dir, f"{0:05}"). So it's just a file. 😅

Would you mind creating an issue to support writing large dataframes using export_many? This isn't required now but you or someone else can implement it later.
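The zero-padding in question can be checked directly: Python's format spec `05` pads the integer 0 to a width of five with leading zeros.

```python
# f"{0:05}" zero-pads the integer 0 to a width of 5, so the encoder's
# os.path.join(local_dir, f"{0:05}") and the decoder's
# f"{local_dir}/00000" point at the same single file.
part_name = f"{0:05}"
print(part_name)  # 00000
```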

@samhita-alla samhita-alla merged commit b1ff43e into flyteorg:master Oct 28, 2022
@welcome

welcome bot commented Oct 28, 2022

Congrats on merging your first pull request! 🎉

VPraharsha03 pushed a commit to VPraharsha03/flytekit that referenced this pull request Oct 29, 2022
* vaex structured dataset and native types implementation
* Create new plugin for vaex
* fix vaex type to DataFrameLocal and add reqs
* fix tests
* add flytekit-vaex to python build
* correct microlib name
* fix test
* run pip-compile again
* small fixes

Signed-off-by: Ryan Nazareth <ryankarlos@gmail.com>
Signed-off-by: Vivek Praharsha <vpraharsha@outlook.com>
Development

Successfully merging this pull request may close these issues.

[Feature][Flytekit Schema type extension] Vaex Dataframe plugin
3 participants