
[BEAM-2879] Support writing data to BigQuery via avro #9665

Merged
merged 1 commit into apache:master on Nov 8, 2019

Conversation

Contributor

steveniemitz commented Sep 25, 2019

This change enhances BigQueryIO.Write to support writing Avro files rather than JSON when using FILE_LOADS (STREAMING_INSERTS is unchanged).

Benchmarks

Preliminary results look good: the more CPU-constrained a job is, the greater Avro's advantage.

My test dataset is a typical workload of ours: around 2 billion records (~130 GB serialized) representing the result of a combine. My tests read these records from GCS and wrote them to BigQuery. The jobs ran on Dataflow with 150 n1-standard-2 workers.

| format | time to start load job | bytes written | BQ slot time (ms) |
| --- | --- | --- | --- |
| avro | 6 m 30 s | 126 GB | 35,189,679 |
| json | 8 m 5 s | 712 GB | 96,006,088 |
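The bytes-written gap in the table comes from JSON's text encoding versus Avro's compact binary encoding. A toy, pure-JDK illustration of the effect (this is not Avro's actual wire format, which uses zig-zag varints and is often smaller still):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Toy comparison: the same (long, double) record encoded as JSON text
// versus a fixed binary layout.
class RowSize {
    static int jsonBytes(long id, double score) {
        // Every digit costs a byte in the text encoding, plus field names and punctuation.
        String json = "{\"id\":" + id + ",\"score\":" + score + "}";
        return json.getBytes(StandardCharsets.UTF_8).length;
    }

    static int binaryBytes(long id, double score) {
        // Fixed-width binary: 8 bytes for the long, 8 for the double,
        // regardless of how many digits the values have, and no field names.
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Double.BYTES);
        buf.putLong(id).putDouble(score);
        return buf.position();
    }
}
```

At BigQuery-load scale, that per-row overhead compounds into the ~5x difference in bytes written above.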

@lukecwik @chamikaramj looks like you guys are the OWNERS of the gcp IO module.


  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

Post-Commit Tests Status (on master branch)

[Build status badge matrix: Go, Java, Python, and XLang SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners]

Pre-Commit Tests Status (on master branch)

[Build status badges for Java, Python, Go, and Website, non-portable and portable]

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@steveniemitz steveniemitz force-pushed the tc-dc:bq-write-avro branch from 08f8b2e to 633c395 Sep 25, 2019
@pabloem pabloem requested review from pabloem and chamikaramj Oct 2, 2019
@steveniemitz steveniemitz force-pushed the tc-dc:bq-write-avro branch from d4daa95 to 1449844 Oct 4, 2019
Member

pabloem left a comment

Thanks Steve! I've mostly gone through the changes to internal functionality, and they make sense to me. FWIW, I think you're on the right track.

Now I'm just trying to think about the user-facing API change. I'm also wondering how to make it easier to support with Beam schemas.

"Translated Avro Schema for scion",
"org.apache.beam.sdk.io.gcp.bigquery",

pabloem Oct 5, 2019

Member

superfluous change? : )

steveniemitz Oct 5, 2019

Author Contributor

oh no actually, this is pretty important! The previous version flipped the namespace and description parameters, resulting in an invalid namespace. See the corresponding change up in BigQueryAvroUtils.

Really avro's fault for making a function with 3 string parameters...
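The pitfall generalizes: adjacent same-typed parameters mean a swapped call still compiles. A self-contained stand-in for a createRecord(name, doc, namespace, ...) style signature (hypothetical method, not the actual Avro API):

```java
// Three adjacent String parameters: the compiler cannot catch a call that
// swaps doc and namespace. The mistake only surfaces in the produced value,
// here a bogus fully-qualified record name.
class RecordNames {
    static String fullName(String name, String doc, String namespace) {
        // Like Avro's doc field, the doc parameter never appears in the name.
        return namespace + "." + name;
    }
}
```

Passing the description where the namespace belongs yields a "namespace" like `Translated Avro Schema for scion`, which is exactly the invalid-namespace failure mode described above.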

Member

pabloem commented Oct 5, 2019

This is just a brain dump of what I'm thinking...

I wonder whether we need the AvroWriteRequest, and the Avro schema. I guess we do, as the InputElement (whatever it is) + the Avro schema are all one needs to build the GenericRecord. Having the AvroWriteRequest may help make the formatting function as concise as possible....

As for supporting Beam schemas + avro files, one could have a useBeamSchemaForAvroFiles()... though it's a little strange....

Another option is to have useBeamSchema, and a pre-coded avro formatting function called something like ... BigQueryIOUtils.beamRowToAvroRecord(). Though this is a little awkward too.


Overall, I like using AvroWriteRequest as input for the avro format function... and for supporting Beam schemas, it may be that useBeamSchemaForAvroFiles (or some better name) is the more reasonable option.
WDYT?

Contributor Author

steveniemitz commented Oct 5, 2019

Thanks for the thoughts! My comments inline

This is just a brain dump of what I'm thinking...

I wonder whether we need the AvroWriteRequest, and the Avro schema. I guess we do, as the InputElement (whatever it is) + the Avro schema are all one needs to build the GenericRecord. Having the AvroWriteRequest may help make the formatting function as concise as possible....

I really went back and forth on this a few times. We could use SerializableBiFunction here, but if in the future we ever wanted to add another parameter, it'd be a breaking change. This way we can just add a field to the class. This follows the same pattern as read, which takes a SchemaAndRecord as input. You do need both the avro schema and the element, though, in order to support more advanced cases w/ DynamicDestinations, etc. Plus, avro schemas themselves aren't easily serializable (until avro 1.9), so users can't simply create a closure over them.

I do hate the name though, if you can think of anything better I'd love to rename this!
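The evolvability argument for the wrapper can be sketched with a minimal stand-in (illustrative names only; the real AvroWriteRequest in the Beam SDK carries an actual Avro Schema):

```java
import java.io.Serializable;
import java.util.function.Function;

// Request-object pattern sketch: the format function takes a single wrapper
// instead of (element, schema) as two arguments, so a field can be added to
// the wrapper later without breaking existing Function implementations.
final class AvroWriteRequestSketch<T> implements Serializable {
    private final T element;
    private final String schemaJson; // stand-in for the (non-serializable) avro Schema

    AvroWriteRequestSketch(T element, String schemaJson) {
        this.element = element;
        this.schemaJson = schemaJson;
    }

    T getElement() { return element; }
    String getSchemaJson() { return schemaJson; }
}

class FormatFunctions {
    // A user-supplied format function depends only on the wrapper type, not on
    // its field list, which is what makes future additions non-breaking.
    static final Function<AvroWriteRequestSketch<String>, String> UPPERCASE =
        req -> req.getElement().toUpperCase();
}
```

A SerializableBiFunction<T, Schema, R> would lock the arity into every user's code; the wrapper keeps the signature stable.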

As for supporting Beam schemas + avro files, one could have a useBeamSchemaForAvroFiles()... though it's a little strange....

Another option is to have useBeamSchema, and a pre-coded avro formatting function called something like ... BigQueryIOUtils.beamRowToAvroRecord(). Though this is a little awkward too.

Yeah I struggled with this as well. The only thing stopping us from having a version that supports beam schemas is the interface. useBeamSchemaForAvroFiles is a pretty reasonable name.

Overall, I like using AvroWriteRequest as input for the avro format function... and for supporting Beam schemas, it may be that useBeamSchemaForAvroFiles (or some better name) is the more reasonable option.
WDYT?

I'd be up for adding that in a follow-up PR. I also have some ideas around writeGenericRecords() I want to play around with (that would also use beam schemas, similar to AvroIO.writeGenericRecords() )

import org.slf4j.LoggerFactory;

/** Writes {@link TableRow} objects out to a file. Used when doing batch load jobs into BigQuery. */
abstract class AbstractRowWriter<T> implements AutoCloseable {

steveniemitz Oct 5, 2019

Author Contributor

nit: I don't usually write java, is the style convention AbstractDerp for base classes like this? or should this just be RowWriter<T>?

pabloem Oct 11, 2019

Member

In my experience, the Abstract is usually not added to the abstract class. I am okay with either.

chamikaramj Oct 30, 2019

Contributor

+1 for just RowWriter or BigQueryRowWriter to be more specific.

Member

pabloem commented Oct 23, 2019

LMK if you'd like me to take another look.

Contributor Author

steveniemitz commented Oct 23, 2019

LMK if you'd like me to take another look.

oh, yeah please do. I don't have much more from my end other than renaming the class above.

Member

pabloem commented Oct 24, 2019

Run Java PreCommit

Member

pabloem commented Oct 28, 2019

ok this LGTM. @chamikaramj - thoughts?

Contributor Author

steveniemitz commented Oct 28, 2019

Thanks! I'll do the class rename asap too.

Contributor Author

steveniemitz commented Oct 29, 2019

Thanks! You can merge this now and I'll do the rename in another PR, or I can get to it tomorrow, up to you!

Member

pabloem commented Oct 29, 2019

I don't think we strictly need the rename. But I do want to wait for @chamikaramj to take a look : ) - I don't think he'll have objections, but just in case he thinks of any improvements.

Contributor

chamikaramj commented Oct 30, 2019

Sorry about the delay. Taking a look.

Contributor

chamikaramj left a comment

Thanks. Looks great to me.

To make sure I understood correctly, this will not be enabled for existing users by default and to enable this users have to specify withAvroFormatFunction(), correct?

Also, can we add a version of BigQueryIOIT so that we can continue to monitor both Avro and JSON based BQ write transforms?

import org.slf4j.LoggerFactory;

/** Writes {@link TableRow} objects out to a file. Used when doing batch load jobs into BigQuery. */
abstract class AbstractRowWriter<T> implements AutoCloseable {

This comment has been minimized.

Copy link
@chamikaramj

chamikaramj Oct 30, 2019

Contributor

+1 for just RowWriter or BigQueryRowWriter to be more specific.

Contributor Author

steveniemitz commented Nov 7, 2019

Thanks. Looks great to me.

To make sure I understood correctly, this will not be enabled for existing users by default and to enable this users have to specify withAvroFormatFunction(), correct?

Correct. With schemas I think we could make this enabled transparently, but for now it's opt-in only.

Also, can we add a version of BigQueryIOIT so that we can continue to monitor both Avro and JSON based BQ write transforms?

Yeah, I can add that in there.

Contributor Author

steveniemitz commented Nov 7, 2019

@chamikaramj are there up-to-date docs anywhere on how to run the BigQueryIOIT tests? The example args in the javadoc are super out of date.

Contributor Author

steveniemitz commented Nov 7, 2019

@chamikaramj are there up-to-date docs anywhere on how to run the BigQueryIOIT tests? The example args in the javadoc are super out of date.

nm I hacked it up enough to get it to run.

Contributor

chamikaramj commented Nov 8, 2019

Thanks.

Do the TODOs in the PR description still apply?

Contributor

chamikaramj commented Nov 8, 2019

Run Java PostCommit

Contributor

chamikaramj commented Nov 8, 2019

Also please fixup commits before merging.

Contributor Author

steveniemitz commented Nov 8, 2019

Thanks.

Do the TODOs in the PR description still apply?

nope, I just edited the message

Contributor Author

steveniemitz commented Nov 8, 2019

Also please fixup commits before merging.

Was this for me or whoever ends up merging it?

Contributor

chamikaramj commented Nov 8, 2019

It's for you :)

@steveniemitz steveniemitz force-pushed the tc-dc:bq-write-avro branch from b561236 to ad3fdf2 Nov 8, 2019
Contributor Author

steveniemitz commented Nov 8, 2019

coolio, fwiw, the contribution guide is ambiguous wrt who should do the squashing.

Beam committers can squash all commits in the PR during merge, however if a PR has a mixture of independent changes that should not be squashed, and fixup commits, then the PR author should help squashing fixup commits to maintain a clean commit history.

Contributor

chamikaramj commented Nov 8, 2019

Thanks. LGTM.

Yeah, committer can squash and commit if you just need one commit.

Contributor

chamikaramj commented Nov 8, 2019

Run Java PostCommit

Contributor

chamikaramj commented Nov 8, 2019

Will merge after post-commit tests pass.

Contributor

chamikaramj commented Nov 8, 2019

Unfortunately, looks like BigQueryIOIT is a recently added test that is currently not captured by any of the test suites.

@steveniemitz Will you be able to add a test that writes to BQ using Avro to the BigQueryTornadoesIT that is captured by the Beam Java PostCommit test suite?
https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
https://builds.apache.org/view/A-D/view/Beam/view/PostCommit/job/beam_PostCommit_Java/lastCompletedBuild/testReport/org.apache.beam.examples.cookbook/BigQueryTornadoesIT/

@mwalenia can you comment on the status of BigQueryIOIT ?

cc: @pabloem

Contributor Author

steveniemitz commented Nov 8, 2019

@steveniemitz Will you be able to add a test that writes to BQ using Avro to the BigQueryTornadoesIT that is captured by the Beam Java PostCommit test suite?

honestly at this point I'm not going to prioritize doing so.

Contributor

chamikaramj commented Nov 8, 2019

Ok, fair enough.

I'll go ahead and merge this. But please consider adding a regularly running integration test in a follow up PR to make sure that this codepath does not become stale/broken.

@chamikaramj chamikaramj merged commit 35da90a into apache:master Nov 8, 2019
6 checks passed

Java ("Run Java PreCommit") SUCCESS
Java SDK Post Commit Tests SUCCESS
JavaPortabilityApi ("Run JavaPortabilityApi PreCommit") SUCCESS
Java_Examples_Dataflow ("Run Java_Examples_Dataflow PreCommit") SUCCESS
RAT ("Run RAT PreCommit") SUCCESS
Spotless ("Run Spotless PreCommit") SUCCESS
Contributor Author

steveniemitz commented Nov 8, 2019

maybe we should just make BigQueryIOIT actually run ;)

@steveniemitz steveniemitz deleted the tc-dc:bq-write-avro branch Nov 8, 2019