-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-4652] Allow PubsubIO to read public data #5788
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chamikaramj @lgajowy do you know anything about how I should set this up?
PCollection<PubsubMessage> messages = | ||
pipeline.apply( | ||
PubsubIO.readMessages() | ||
.fromTopic("projects/pubsub-public-data/topics/taxirides-realtime")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rangadi do you know about this?
"waitForAnyMessage", signal.signalSuccessWhen(messages.getCoder(), anyMessage -> true)); | ||
|
||
Duration timeout = | ||
pipeline.getOptions().getRunner().getSimpleName().endsWith("DataflowRunner") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hack so we don't have to it forever waiting for a DirectRunner to fail.
? Duration.standardMinutes(5) | ||
: Duration.standardSeconds(30); | ||
signal.waitForSuccess(timeout); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know much about Pubsub and testing it (we tested batch scenarios only yet), but doesn't this test require pipeline.run()
invocation (sorry if this is my ignorance)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lol. Yes. And I was sprinkling debugging throughout, asking myself "why isn't createSubscription
ever begin called?!"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OTOH the TestPipeline
is supposed to complain if run()
is not called. But I guess it never reaches that error because of the signal timeout.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I also think this is the case. I'm glad this helped a little bit. :)
18d0ef5
to
daecb5c
Compare
OK, I have now confirmed that if I run the pipeline the test fails before my fix and succeeds after. So it is a real problem. |
@@ -1297,7 +1298,9 @@ private SubscriptionPath createRandomSubscription(PipelineOptions options) { | |||
timestampAttribute, idAttribute, options.as(PubsubOptions.class))) { | |||
SubscriptionPath subscriptionPath = | |||
pubsubClient.createRandomSubscription( | |||
project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC); | |||
PubsubClient.projectPathFromId(options.as(GcpOptions.class).getProject()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But this is not quite right - it should respect the project
ValueProvider if one was provided, yet default it to providing this.
daecb5c
to
e451161
Compare
run java postcommit |
e451161
to
5eca626
Compare
run java postcommit |
@@ -1235,7 +1235,7 @@ public PubsubUnboundedSource( | |||
/** Get the project path. */ | |||
@Nullable | |||
public ProjectPath getProject() { | |||
return project == null ? null : project.get(); | |||
return project != null ? null : project.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -693,9 +693,9 @@ public String toString() { | |||
|
|||
@Nullable | |||
ValueProvider<ProjectPath> projectPath = | |||
getTopicProvider() == null | |||
getSubscriptionProvider() == null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this require user to provide subscription rather than topic for read? The integration test test added here specifies topic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is what I can tell:
PubsubUnboundedSource
is made so that you can customize the project for the subscription.- But
PubsubIO
is not made to customize the project.
Before my change:
PubsubIO
always uses the project off the topic if it isread().fromTopic()
and leaves it null if it isread().fromSubscription()
PubsubUnboundedSource
always requires aproject
if it is given atopic
because it needs it to create the random subscription
After my change:
PubsubIO
always uses the project from the subscription and leaves it null forread().fromTopic()
PubsubUnboundedSource
never requires a project, because it defaults to getting it from PipelineOptions
So I guess actually since PubsubIO
never actually provides a useful project, it could always be left null. Or probably better to refactor PubsubUnboundedSource
to have two variants with some shared internals so there are zero nullable fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. That makes sense. I was also wondering why createReader()
also creates a random subscription if required (that does not sound correct). As you mentioned, may be best to always require project to be present in GcpOptions.
5eca626
to
c6bc82c
Compare
|
||
StatefulPredicateCheck(Coder<T> coder, SerializableFunction<Set<T>, Boolean> successPredicate) { | ||
this.seenEvents = StateSpecs.set(coder); | ||
this.seenEvents = StateSpecs.bag(coder); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI @akedin the only reason to use SetState
is if you want to do efficient membership checking. Since the code here reads the whole thing, it doesn't add any functionality. And more runners support BagState
so I switched it.
Precommit failed due to formatting and Postcommit failed because Dataflow doesn't support SetState. Should both be fixed. I'll run them both again. |
run java postcommit |
PubsubIO/PubsubUnboundedSource changes LGTM. We can even just require --project option in GcpOptions to be always present, which is quite normal for many GCP components. |
c6bc82c
to
5112419
Compare
run java postcommit |
1 similar comment
run java postcommit |
de5f515
to
93ea4b4
Compare
run java postcommit |
3de0308
to
6ff715e
Compare
run java postcommit |
6ff715e
to
85a683e
Compare
run java postcommit |
85a683e
to
5b9f073
Compare
run java postcommit |
This test does not pass because of idiosyncrasies of the TestDataflowRunner. It is clearly observable that the condition under test. Getting a DirectRunner integration test in place provides a regression test for our ability to read public data.
5b9f073
to
b606d8a
Compare
run java postcommit |
It passed! The Python failure is on master, and fixed separately. |
} | ||
} | ||
}); | ||
Future<List<String>> queryResult = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI @amaliujia
This adds an integration test run for GCP IOs and then adds an integration test for reading a read public data set.
It does not pass, but it also does not fail in the same way that my local testing fails. When I write a pipeline to read this dataset I get a 403 on creating a subscription. The logs that gradle captures are a bit different.
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.It will help us expedite review of your Pull Request if you tag someone (e.g.
@username
) to look at it.Post-Commit Tests Status (on master branch)