Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-2058] Generate BigQuery load job at runtime #2657

Closed
wants to merge 7 commits into from

Conversation

reuvenlax
Copy link
Contributor

@reuvenlax reuvenlax commented Apr 23, 2017

Generate the load job at run time instead of submission time. This allows a job with a BQ sink to be reexecuted without failing.

R: @bjchambers

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.01%) to 70.036% when pulling 01b3a0f on reuvenlax:repeatable_bq_sink into a670197 on apache:master.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.01%) to 70.178% when pulling 5954e22 on reuvenlax:repeatable_bq_sink into 15bd3a3 on apache:master.

Copy link
Contributor

@bjchambers bjchambers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we should be able to test this somehow. For instance, running the same pipeline twice in a test, and making sure it uses different prefixes?

@@ -189,7 +193,9 @@ public void getTempFilePrefix(ProcessContext c) {
new SimpleFunction<String, String>() {
@Override
public String apply(String input) {
return stepUuid;
String jobId = BigQueryHelpers.randomUUIDString();
LOG.info("BigQuery export job id {}", jobId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is logged by the layer that actually sends the export job. Is there value to logging it twice? If not, should we just verify that layer logs it and then leave this out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

@reuvenlax
Copy link
Contributor Author

@bjchambers Testing isn't so easy, because running the pipeline twice would result in different job ids even before. What we'd need to do is simulate what templates do - generate the graph once and run it twice - and I don't know how to do that in a test.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.6%) to 70.745% when pulling a767a9d on reuvenlax:repeatable_bq_sink into 15bd3a3 on apache:master.

c.getPipelineOptions().getTempLocation(),
"BigQueryWriteTemp",
BigQueryHelpers.randomUUIDString());
LOG.info("BigQuery temporary file location {}", tempLocation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Writing BigQuery temporary files to {} before importing them"

(or something similar that describes what we're doing?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@coveralls
Copy link

Coverage Status

Coverage increased (+0.6%) to 70.775% when pulling 2a9a5e8 on reuvenlax:repeatable_bq_sink into 15bd3a3 on apache:master.

@reuvenlax
Copy link
Contributor Author

@bjchambers anything more, or can this be merged?

@davorbonaci
Copy link
Member

R: @jkff

String tempLocation = resolveTempLocation(
c.getPipelineOptions().getTempLocation(),
"BigQueryWriteTemp",
BigQueryHelpers.randomUUIDString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is no longer equal to the one at line 197 - is this expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - no reason for it to be except for visibility, and we are logging the tempLocation so that shouldn't be an issue.

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'll do the final fixup myself and merge.

tempLocation);
c.output(tempLocation);
}
}).withSideInputs(jobIdTokenView))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can just pass it as main input ;)


// Create a singleton job ID token at execution time. This will be used as the base for all
// load jobs issued from this instance of the transform.
PCollectionView<String> jobIdTokenView =
final PCollectionView<String> jobIdTokenView =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outside scope of this PR: such singleton stuff starts being a recurring pattern, wonder if we should have a Create.of(SerializableFunction<Void, T>) overload.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants