[WIP] Read deltatable #8123

Closed
wants to merge 14 commits

Conversation

rajagurunath
Contributor

Support for different filesystem backends like S3, GCP, Azure, etc. in the library we discussed earlier, delta-rs, is still a WIP, so I thought of writing the delta reading logic from scratch. Currently this PR has no dependency other than pyarrow; I referred to another delta reading library, DeltaLakeReader, for developing the utilities related to schema evolution.

This PR is still a work in progress; I need some help/suggestions on the following points.

  1. Initially I started with the plan of supporting both the pyarrow and fastparquet engines, similar to read_parquet. I tried a good number of iterations but was not able to figure out how to implement features like schema evolution in fastparquet. In pyarrow this is achieved using pyarrow.dataset, which assigns NaN to columns that are not present in a given parquet file, since it has access to the pyarrow.Schema and therefore knows each column's datatype - implemented here (see the sketch just after this list).

    So shall I remove the engine parameter altogether and support only the pyarrow engine, or is there another way/workaround in fastparquet that supports schema evolution (i.e. reading a list of parquet files with different schemas into the same DataFrame)?

  2. I tried achieving parallel reading of the parquet files in a few different ways and settled on one of the Dask approaches using delayed. Is there a better/optimized/recommended way to do the same?

  3. Currently the test datasets are stored in one of my repositories; can we move them to a more stable, publicly available storage location? Alternatively, we could create the delta table dynamically with PySpark inside the fixture, but that would add a PySpark dependency :)

  4. I have added some test cases, but need some help/suggestions to improve them or remove redundant ones. Requesting @MrPowers' help here.

  5. I have tested against a private S3 bucket; do we need to add test cases that read from and write to AWS as well?
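
For point 1, here is a minimal sketch of the pyarrow.dataset behaviour described above (the file names and schema are made up): reading files with differing schemas against one explicit schema fills the missing columns with nulls.

```python
import pyarrow as pa
import pyarrow.dataset as pa_ds

# explicit schema for the newest table version (file names here are made up)
schema = pa.schema(
    [("id", pa.int64()), ("name", pa.string()), ("added_later", pa.float64())]
)

# older files that lack "added_later" still load; the missing column comes
# back as null/NaN because the dataset knows the full schema and dtypes
dataset = pa_ds.dataset(
    ["part-0.parquet", "part-1.parquet"], schema=schema, format="parquet"
)
df = dataset.to_table().to_pandas()
```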

@GPUtester
Collaborator

Can one of the admins verify this patch?

@jsignell
Member

jsignell commented Sep 7, 2021

add to allowlist

@mrocklin
Member

mrocklin commented Sep 7, 2021

cc @dask/io and @fjetter

@martindurant
Member

is there any other way/workaround in fast parquet which supports schema Evolution

This doesn't currently exist, and there are no plans as such. I don't suppose it would take too much effort...

@martindurant
Member

can try to create this delta table using Pyspark dynamically inside the fixture, but we need to add Pyspark dependency

No, don't do that! :). Fastparquet has spark as a test dep, and it causes pretty weird behaviour at times, not to mention slowing everything down a lot.

@martindurant
Member

do we need to add test cases to read and write from AWS also?

You can use moto to host files locally in an S3-compatible service; there are already tests in dask that do this (or look at how it's done in s3fs). However, if it works with local files via fsspec, it really should just work for any storage.
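
A hypothetical sketch of the moto approach (port, bucket name, and credentials are made up; assumes moto is installed with its server extra so the moto_server command is available):

```python
import subprocess
import time

import boto3
import dask.dataframe as dd

# start a local, S3-compatible moto server
proc = subprocess.Popen(["moto_server", "s3", "-p", "5555"])
time.sleep(2)  # crude wait for the server to come up

endpoint = "http://127.0.0.1:5555"
s3 = boto3.client(
    "s3",
    endpoint_url=endpoint,
    aws_access_key_id="testing",
    aws_secret_access_key="testing",
)
s3.create_bucket(Bucket="test-bucket")
# ... upload fixture files here ...

# point fsspec/s3fs at the fake endpoint via storage_options
storage_options = {
    "key": "testing",
    "secret": "testing",
    "client_kwargs": {"endpoint_url": endpoint},
}
ddf = dd.read_parquet("s3://test-bucket/table/*.parquet", storage_options=storage_options)

proc.terminate()
```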

Member

@martindurant left a comment

The PR looks OK at a first run-through. I have some comments, but not much.

Coding this up using delayed is OK as a first start, but delayed does not benefit from the optimisations possible for dataframes based on high-level-graphs. For example, in the long run we would like to be able to use selections on the index to pick files to load, or that column selection on the output dataframe should result in only those columns being loaded (pushdown).

Actually, is there a concept of an index here?

Probably, given we have an explicit schema, the call to from_delayed should specify a meta=, to prevent having to load the first data file eagerly.
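
A minimal sketch of what specifying meta= might look like (schema, pq_files, and read_one_file are placeholders; assumes the pyarrow.Schema has already been built from the delta log):

```python
import dask.dataframe as dd
from dask import delayed

# `schema` is the pyarrow.Schema from the delta log, `pq_files` the active
# parquet files, and `read_one_file` the per-file loader (all hypothetical)
meta = schema.empty_table().to_pandas()  # empty frame with the right dtypes

parts = [delayed(read_one_file)(f, schema=schema) for f in pq_files]
ddf = dd.from_delayed(parts, meta=meta)  # avoids eagerly loading the first file
```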

from ..parquet.core import get_engine, read_parquet
from .utils import schema_from_string

__all__ = "read_delta_table"
Member

__all__ = ["read_delta_table"]

Contributor Author

Got it, changed.

self.version = version
self.pq_files = set()
self.delta_log_path = f"{self.path}/_delta_log"
self.fs, _, _ = get_fs_token_paths(path, storage_options=storage_options)
Member

You probably do want this token, to name your tasks below.

Contributor Author

Got it, now added the name parameter in the delayed fn.
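
A rough sketch of what that could look like, using the fsspec token for deterministic task names (read_file, pq_files, path, and storage_options are placeholders):

```python
from dask import delayed
from dask.base import tokenize
from fsspec.core import get_fs_token_paths

fs, fs_token, _ = get_fs_token_paths(path, storage_options=storage_options)

parts = [
    # dask_key_name gives each task a stable, descriptive key in the graph
    delayed(read_file)(f, dask_key_name=f"read-delta-{tokenize(fs_token, f)}")
    for f in pq_files
]
```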

if _last_checkpoint file exists, returns checkpoint_id else zero
"""
try:
with self.fs.open(
Member

May be faster with json.loads(self.fs.cat(fn))

Contributor Author

Thanks for the suggestion; now changed it to fs.cat.
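
Roughly what that looks like (paths are placeholders; assumes the standard _last_checkpoint layout with a "version" field):

```python
import json

import fsspec

fs = fsspec.filesystem("file")  # or whichever backend the table lives on
delta_log_path = "/path/to/table/_delta_log"  # placeholder

# _last_checkpoint is tiny, so one fs.cat call fetches it in a single request
last_checkpoint = json.loads(fs.cat(f"{delta_log_path}/_last_checkpoint"))
checkpoint_id = last_checkpoint["version"]
```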

f"File {checkpoint_path} not found"
)

parquet_checkpoint = read_parquet(
Member

This is a single small file, right? Probably better, then, to load directly with the engine rather than via dask, which adds overhead.

Contributor Author

Now reading with the help of the engine, using engine.read_partition. I needed to pass the columns of checkpoint.parquet, so I added those columns in utils.py.

checkpoint_path, engine=self.engine, storage_options=self.storage_options
).compute()

# reason for this `if condition` was that FastParquetEngine seems to normalize the json present in each column
Member

You mean it's a structure-type column (not JSON).

Contributor Author

@rajagurunath, Sep 11, 2021

Yeah, that's right. fastparquet was flattening the nested column: let's say col1 holds {"a": 1, "b": 2}; pyarrow reads it as a single column, but fastparquet reads it as separate columns like col1.a and col1.b.

So we get different dataframes from the two engines, which is handled with an if condition based on the engine class.

)
for log_file_name, log_version in zip(log_files, log_versions):
with self.fs.open(log_file_name) as log:
for line in log:
Member

Again, fs.cat might well be faster than opening the file and relying on readahead buffering.

Contributor Author

Got it, thanks for the suggestion once again; changed from fs.open to fs.cat.
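
For context, a small sketch of reading a JSON log file via fs.cat (paths are placeholders; delta log files are newline-delimited JSON actions):

```python
import json

import fsspec

fs = fsspec.filesystem("file")  # placeholder filesystem
log_file_name = "/path/to/table/_delta_log/00000000000000000000.json"  # placeholder

# one request for the whole file instead of buffered line-by-line reads
raw = fs.cat(log_file_name)
actions = [json.loads(line) for line in raw.splitlines() if line.strip()]
```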

return (
pa_ds.dataset(
source=f,
schema=schema,
Member

Is providing a schema here the critical thing, to enable evolution?

Contributor Author

I think yes; different versions of delta files may have different schemas, and we control that via the schema parameter of the pyarrow dataset. The user can also explicitly pass a pyarrow.Schema. Does anything need to be changed here?

delta protocol stores the schema string in the json log files, which is converted
into pyarrow.Schema and used for schema evolution (refer to delta/utils.py),
i.e. based on the particular version, some columns may or may not be present.
filter: pyarrow.dataset.Expression
Member

This is not the kind of filter that is used for the normal parquet read functions

Contributor Author

Got it, thanks for letting me know about this; the code now accepts filters like [("x", "==", 1)], similar to read_parquet.
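
A sketch of one way such filters could be translated into a pyarrow.dataset expression (conjunctive lists only; the helper name is made up and is not necessarily how the PR does it):

```python
import operator

import pyarrow.dataset as pa_ds

_OPS = {"==": operator.eq, "!=": operator.ne, "<": operator.lt,
        "<=": operator.le, ">": operator.gt, ">=": operator.ge}


def filters_to_expression(filters):
    """Convert e.g. [("x", "==", 1)] into an AND-ed pyarrow.dataset expression."""
    expr = None
    for col, op, val in filters:
        piece = _OPS[op](pa_ds.field(col), val)
        expr = piece if expr is None else expr & piece
    return expr

# usage: dataset.to_table(filter=filters_to_expression([("x", "==", 1)]))
```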

import json
from typing import Union

import pyarrow as pa
Member

I see that everything here is pyarrow specific

Contributor Author

Yeah, I am also a bit worried here, because currently reading delta files is only achieved through the pyarrow dataset API; I was not able to achieve the same using fastparquet. That's why I am wondering whether we can fix the engine as pyarrow-only for reading delta files for now.


# nested field needs special handling
else:
if input_type["type"] == "array":
Member

Do we expect pyarrow to handle the full set of nested list/map/struct types?

Contributor Author

I also initially handled only primitive data types, but after referring to the DeltaLakeReader package code I added the nested fields as well. What do you think, shall I remove those nested types?
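
For reference, a rough sketch of the kind of mapping under discussion, based on the Delta schema-string JSON layout (not the PR's actual implementation; decimal and some other types are omitted):

```python
import pyarrow as pa

_PRIMITIVES = {
    "string": pa.string(), "long": pa.int64(), "integer": pa.int32(),
    "short": pa.int16(), "byte": pa.int8(), "double": pa.float64(),
    "float": pa.float32(), "boolean": pa.bool_(), "binary": pa.binary(),
    "date": pa.date32(), "timestamp": pa.timestamp("us"),
}


def delta_type_to_pyarrow(t):
    """Map a Delta schema-string type (str or dict) to a pyarrow DataType."""
    if isinstance(t, str):
        return _PRIMITIVES[t]
    if t["type"] == "array":
        return pa.list_(delta_type_to_pyarrow(t["elementType"]))
    if t["type"] == "map":
        return pa.map_(delta_type_to_pyarrow(t["keyType"]),
                       delta_type_to_pyarrow(t["valueType"]))
    if t["type"] == "struct":
        return pa.struct([(f["name"], delta_type_to_pyarrow(f["type"]))
                          for f in t["fields"]])
    raise NotImplementedError(t["type"])
```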

@rajagurunath
Contributor Author

Hi @martindurant,

Thanks for reviewing the code and for the detailed feedback; it was really helpful and I learned a lot.

I have the following doubts though:

  1. We have pinned pyarrow==1.0 in environment-3.7.yaml; is there any reason behind this? Because of it, the continuous_integration test for the 3.7 environment was failing with the error: E pyarrow.lib.ArrowNotImplementedError: Reading lists of structs from Parquet files not yet supported: key_value: list<key_value: struct<key: string not null, value: string> not null> not null
  2. Since fastparquet does not support schema evolution, reading different versions of delta tables with different columns does not seem feasible, so shall I keep only the pyarrow engine for now?
  3. Can I keep the test delta dataset in one of my GitHub repositories?

Member

@jrbourbeau left a comment

Thank you for your work here @rajagurunath and for reviewing @martindurant!

This PR is proposing we add read_delta_table directly to Dask. I wanted to check in to see if this is intentional, or if this PR is to just work through the read_delta_table implementation before moving read_delta_table to a separate dask-deltatable project?

While delta table support is certainly interesting, it's not something I've seen commonly requested. Given this, the fact that pandas doesn't have a read_delta_table function, and the added maintenance burden associated with adding support for a new data format, I'd prefer not to add read_delta_table directly to Dask right now, but instead include this in a separate dask-deltatable package (which could live in https://github.com/dask-contrib) so we can get a better understanding of how much demand there is for delta table support, what additional features delta table users will want, and what the corresponding maintenance burden might be. For reference, this was discussed previously in #6001 / #8046 (comment)

Also cc @fjetter who may have more thoughts on delta table specifically

@rajagurunath
Contributor Author

Hi @jrbourbeau,

Thanks for looking into the PR and the suggestion.

Yes, initially we started with the intention of merging directly into the Dask repo. I definitely see your concern here, and it makes perfect sense to me. 💯

So, as suggested above, I have created a new repo and package, dask_deltatable, here: https://github.com/rajagurunath/dask_deltatable. It is at an initial (alpha) stage; can someone please review it and give suggestions so that it can be moved to dask-contrib?

@MrPowers
Contributor

@rajagurunath - this looks like great work. Thanks for the contribution.

I just created a dask-interop repo for some integration tests for how Dask works with other systems. Right now the repo only has some simple tests showing how PySpark written Parquet files are readable by Dask, but I plan on adding a lot more.

I'm thinking about adding your lib as a dependency and adding some Delta Lake unit tests. The tests you've written that rely on zip files are great, but I'd like to augment them with tests that actually create Delta Lakes, perform a bunch of operations, and then make assertions. This'll protect us when Delta Lake changes (because those zip files are static).

Let me know your thoughts! Thanks again for this great work!

@rajagurunath
Contributor Author

First of all, thanks a lot @MrPowers for having a look at the dask-deltatable lib; really glad.

I'm thinking about adding your lib as a dependency and adding some Delta Lake unit tests. The tests you've written that rely on zip files are great, but I'd like to augment them with tests that actually create Delta Lakes, perform a bunch of operations, and then make assertions. This'll protect us when Delta Lake changes (because those zip files are static).

That completely makes sense. Yes, I prepared those zip datasets with the io.delta:delta-core_2.12:0.7.0 jar, and I think some changes have since been added to the Delta protocol that add more fields to the delta JSON logs.

I faced some issues while setting up the delta lake environment in CI in the past, which is why I went with the zipped datasets. Now I feel we can make use of the delta-spark library, which makes setting up Spark with the delta lake dependency much easier (see the sketch at the end of this comment).

Please let me know if anything needs to be done from my side.
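
A rough sketch of what a delta-spark based test fixture could look like (paths and table contents are made up; the configuration follows the delta-spark quickstart):

```python
import pyspark
from delta import configure_spark_with_delta_pip

builder = (
    pyspark.sql.SparkSession.builder.appName("delta-test-fixture")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# write a small table, then append a second version with an extra column
spark.range(0, 5).write.format("delta").save("/tmp/test-delta-table")
(
    spark.range(5, 10)
    .withColumn("extra", pyspark.sql.functions.lit("x"))
    .write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("/tmp/test-delta-table")
)
```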

@MrPowers
Contributor

Updated status of this work:

@martindurant
Member

So we should close this, I think. Ping us on the other repo if you want reviews.

@MrPowers
Contributor

@martindurant - Yea, agree we can close this.

@rajagurunath - I tried delta-rs and that'll probably be the easiest way to add Dask support for Delta Lake. Here's how we can get all the files that need to be read into the Dask DataFrame.

from deltalake import DeltaTable

dt = DeltaTable("tmp/some-delta-pyspark")
dt.files() # list of files

This to_pyarrow_dataset function may be useful. There is also a to_pyarrow_table function. Anyone know if there's an easy way to convert a PyArrow Table to a Dask DataFrame?
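
A sketch of how that could be wired up (assuming dt.files() returns paths relative to the table root; the npartitions value is arbitrary):

```python
import os

import dask.dataframe as dd
from deltalake import DeltaTable

path = "tmp/some-delta-pyspark"
dt = DeltaTable(path)

# join the relative file paths back onto the table root and hand them to
# dask's existing parquet reader
files = [os.path.join(path, f) for f in dt.files()]
ddf = dd.read_parquet(files)

# a pyarrow Table can also be converted to a Dask DataFrame via pandas:
# ddf = dd.from_pandas(dt.to_pyarrow_table().to_pandas(), npartitions=4)
```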

@jrbourbeau
Member

Closing. Thanks for all your work @rajagurunath! Looking forward to watching dask-deltatable's progress.
