
Add bag.read_avro #4000

Merged
martindurant merged 3 commits into dask:master from martindurant:bag_read_avro
Sep 22, 2018

Conversation

@martindurant
Member

Involves a little copying of code from uavro, but stands alone.

  • Tests added / passed
  • Passes flake8 dask

@martindurant
Member Author

I would not advise trying to add a to_avro counterpart to bag, since in general we cannot know whether the items in a bag all share the same schema.

dask/bag/avro.py Outdated
files = open_files(urlpath, **storage_options)
with copy.copy(files[0]) as f:
    # we assume the same header for all files
    head = read_header(f)

Please forgive the naive question, but what exactly does this header represent?

Mainly wondering if it could change if the avro files in a given urlpath had different schemas..

Member Author

The header has a name, description, namespace, schema (fields and types) and a block sync marker. You are right that, in the case of bags, it may be OK to read files where each has a different schema; I actually think the code comment is wrong and this would work fine as things stand. There is no test for different schemas, though; I hadn't considered it.
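For reference, a minimal sketch of what parsing such a header involves, written against the Avro container-file layout rather than the actual uavro/dask code (`read_long` and `read_header` here are illustrative names, not the PR's API): the file starts with the magic `Obj\x01`, then a metadata map (which carries `avro.schema` and friends), then the 16-byte sync marker.

```python
import io

MAGIC = b"Obj\x01"

def read_long(fo):
    # Decode one Avro zig-zag varint long from a file-like object.
    b = fo.read(1)[0]
    n = b & 0x7F
    shift = 7
    while b & 0x80:
        b = fo.read(1)[0]
        n |= (b & 0x7F) << shift
        shift += 7
    return (n >> 1) ^ -(n & 1)

def read_header(fo):
    # Return (metadata dict, 16-byte sync marker) for one container file.
    if fo.read(4) != MAGIC:
        raise ValueError("not an Avro container file")
    meta = {}
    count = read_long(fo)
    while count != 0:
        if count < 0:           # negative block count: a byte-size long follows
            read_long(fo)
            count = -count
        for _ in range(count):
            key = fo.read(read_long(fo)).decode()
            meta[key] = fo.read(read_long(fo))
        count = read_long(fo)
    return meta, fo.read(16)    # metadata map, then the sync marker
```

Because the sync marker sits right after the metadata map, any reader that wants to split a file into blocks has to parse at least this much of it first.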

Member Author

Ah, actually I've made a mistake: we need the sync marker up front, or else the read_bytes call above goes wrong. This will take a little bit of work.


I see what you are saying, a bit of a circular dependency. In the dask-avro library @rmax used the fastavro reader to grab the sync marker before calling read_bytes, if that helps.

Either way, if we can get by with just reading the header from one file, that would be great; I think doing it for each file significantly slowed things down in dask-avro.

Member Author

(See my comment below.) In general, the sync markers are all different. I think it is possible to have identical headers for every file, but that is not the normal way to do things.

@martindurant
Member Author

@mrocklin, do you think it's reasonable to touch every file in the client part of the code? Each will have a different sync marker, and there's no way to load in chunks without it. The amount loaded from each file would be small, but this is exactly the situation we are trying to avoid in the parquet case. The reads could be parallelised, at least.

@mrocklin
Member

> @mrocklin, do you think it's reasonable to touch every file in the client part of the code

Obviously it would be nice to avoid this. From what you say, though, it sounds like that's not possible? Seems like an easy decision in that case :)

@rmax

rmax commented Sep 20, 2018

I already commented, but want to add that according to the Avro spec (and the reference implementation), every file will have a different sync marker.

@mrocklin
Member

Out of curiosity, how expensive would it be to also write avro?

@martindurant
Member Author

> how expensive would it be to also write avro?

Using fastavro, with one file per partition and no checks on the consistency of the items, it would be easy. The hardest part would be inferring the schema; I don't think fastavro has a way to do that automatically (in the test it is hard-coded).

@mrocklin
Member

mrocklin commented Sep 21, 2018 via email

@martindurant
Member Author

> Is it reasonable to ask users to provide the schema?

I think that @rmax and @ian-whitestone might have a better feeling for that.

@ian-whitestone

Full disclosure: I haven't done any avro writing before.

I think it's reasonable, and I feel like it enforces good practices. That said, schema inference could be a nice separate feature, or an option on the write function, for those looking to prototype pipelines quickly. I know Spark lets you dump a dataframe to Avro without specifying a schema.

@martindurant
Member Author

@mrocklin, I'd be happy to attempt both provided-schema and inferred-schema writing. For the latter I would imagine using the first N records, where N is user-specified. Things like "enum" (category) would not be inferable.

Maybe that should be in a new PR, and maybe lower priority than dataframe.read_avro. I have already put a bit of work into uavro to incorporate the things learned in this PR so far.
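The first-N-records inference could look something like this sketch (`infer_schema` is a hypothetical name, not dask or fastavro API; it maps plain Python types to Avro primitives and, as noted above, cannot recover refinements like "enum"):

```python
def infer_schema(records, name="Record", n=100):
    # Map the Python types seen in the first n records to Avro primitives.
    avro_types = {bool: "boolean", int: "long", float: "double",
                  str: "string", bytes: "bytes", type(None): "null"}
    fields = {}
    for rec in records[:n]:
        for key, value in rec.items():
            fields.setdefault(key, set()).add(avro_types[type(value)])
    return {
        "type": "record",
        "name": name,
        "fields": [
            # A field seen with several types becomes an Avro union.
            {"name": k, "type": ts.pop() if len(ts) == 1 else sorted(ts)}
            for k, ts in fields.items()
        ],
    }
```

Sampling only the first N records keeps inference cheap, at the cost of missing types that first appear later in the bag.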

@mrocklin
Member

mrocklin commented Sep 21, 2018 via email

@martindurant
Member Author

I think this is pretty simple and good like this, and I can come back for the other features in the future.

@mrocklin
Member

mrocklin commented Sep 22, 2018 via email

@martindurant martindurant merged commit 15eb83f into dask:master Sep 22, 2018
@martindurant martindurant deleted the bag_read_avro branch September 22, 2018 13:26