[BEAM-744] A runner should be able to override KafkaIO max wait prope… #1125

Closed
wants to merge 3 commits into from

Conversation

amitsela
Member

@amitsela amitsela commented Oct 18, 2016

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

…rties.

Add KafkaOptions for the UnboundedKafkaReader.

@amitsela
Member Author

R: @rangadi and @dhalperi (committer).
CC: @aljoscha, @tgroh (Flink/Direct runners).

@amitsela
Member Author

amitsela commented Oct 18, 2016

Jenkins failures seem to be related to the Dataflow IT tests (NoSuchMethodError). Could it be a dirty classpath in the container running the job?

@dhalperi
Contributor

dhalperi commented Oct 18, 2016

I think that this is generally on the wrong path. Runners should not need to override temporal constants in specific transforms to get sane behavior. I believe the simple rule of thumb should be "readers should return as soon as they are able" + "runners may poll advance() in a loop for a certain period of time if it returned too fast" + "runners must tolerate sources that take a long time to start or advance, because real systems operate that way".

I think we're violating all of these in various places, but that combined these principles add up to a good solution. Thoughts?

(Also, if we reach agreement we should probably summarize to dev@ list?)
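
As an illustration of the second rule above ("runners may poll advance() in a loop for a certain period of time if it returned too fast"), here is a minimal sketch of what such a runner-side loop might look like. The `Reader` interface, `readFor`, and `flaky` are hypothetical names invented for this example; they are not Beam's `UnboundedReader` API or any runner's actual code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class PollingLoopSketch {
  /** Hypothetical stand-in for an unbounded reader; NOT Beam's actual interface. */
  interface Reader<T> {
    boolean advance(); // returns quickly; false means "nothing available right now"
    T getCurrent();
  }

  /** Runner-side loop: keep polling a fast-returning reader until a deadline or a batch limit. */
  static <T> List<T> readFor(Reader<T> reader, long maxMillis, int maxElements) {
    List<T> out = new ArrayList<>();
    long deadline = System.nanoTime() + maxMillis * 1_000_000L;
    while (out.size() < maxElements && System.nanoTime() - deadline < 0) {
      if (reader.advance()) {
        out.add(reader.getCurrent());
      } else {
        try {
          Thread.sleep(1); // brief back-off so an idle source is not busy-polled
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt and stop reading
          break;
        }
      }
    }
    return out;
  }

  /** Example reader that alternates between returning the next element and "no data yet". */
  static <T> Reader<T> flaky(Iterator<T> source) {
    return new Reader<T>() {
      private boolean skip = true;
      private T current;

      @Override
      public boolean advance() {
        skip = !skip;
        if (skip || !source.hasNext()) {
          return false;
        }
        current = source.next();
        return true;
      }

      @Override
      public T getCurrent() {
        return current;
      }
    };
  }
}
```

With this shape the reader never has to block for long: the runner decides how long it is willing to keep asking, and a `false` from `advance()` is simply retried until the deadline passes.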

@amitsela
Member Author

I'd be happy to summarize this once we have something, and I agree with what you wrote, Dan, but it seems that this was an issue for the DirectRunner, as explained in this conversation.
If I follow your suggestion, we should set the two "wait" properties to something like 10 ms, correct?
If the DirectRunner can handle it, I'd be happy to change the ticket, PR, and commit.

@dhalperi
Contributor

Per conversation with @tgroh, I believe we can and should do this now.

@tgroh
Member

tgroh commented Oct 18, 2016

The DirectRunner should reuse readers for some amount of time, and indefinitely until they produce elements. #1128 fixes the current behavior so that the runner actually continues to read from a shard that has ever returned false in an initial evaluation, but it is not directly related to this change.

@@ -757,7 +757,7 @@ public void validate() {

  private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
  // how long to wait for new records from kafka consumer inside start()
- private static final Duration START_NEW_RECORDS_POLL_TIMEOUT = Duration.standardSeconds(5);
+ private static final Duration START_NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
Contributor

Better to remove this then.

Recently when I modified KafkaIOTest, I removed a bit of extra code that handled 'false' from start(). I need to put that back. I can send a separate PR for that.

Member Author

So we'll only have NEW_RECORDS_POLL_TIMEOUT. Sure, why not.

@rangadi
Contributor

rangadi commented Oct 18, 2016

> I believe the simple rule of thumb should be "readers should return as soon as they are able" + "runners may poll advance() in a loop for a certain period of time if it returned too fast" + "runners must tolerate sources that take a long time to start or advance, because real systems operate that way".

I like these. 👍

Contributor

@rangadi rangadi left a comment

LGTM.
Suggested a minor improvement to comment.
I will send another CL with a fix to KafkaIOTest (otherwise it would occasionally flake).

// how long to wait for new records from kafka consumer inside start()
private static final Duration START_NEW_RECORDS_POLL_TIMEOUT = Duration.standardSeconds(5);
// how long to wait for new records from kafka consumer inside advance()
// how long to wait for new records from kafka consumer.
Contributor

Can you add 'inside advance()' or 'inside advance()/start()' to this comment? It would make it clearer where the timeout applies.

@rangadi
Contributor

rangadi commented Oct 19, 2016

Fixed KafkaIOTest in #1133

@@ -968,7 +966,7 @@ public void run() {

// Wait for longer than normal when fetching a batch to improve chances a record is available
// when start() returns.
nextBatch(START_NEW_RECORDS_POLL_TIMEOUT);
Contributor

Actually, can you remove this arg from nextBatch() and use NEW_RECORDS_POLL_TIMEOUT directly inside nextBatch()?

@@ -968,7 +966,7 @@ public void run() {

// Wait for longer than normal when fetching a batch to improve chances a record is available
Contributor

Remove this comment.

@amitsela
Member Author

@rangadi I've addressed your comments, PTAL.

TimeUnit.MILLISECONDS);
// poll available records, wait (if necessary) up to the specified timeout.
records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
TimeUnit.MILLISECONDS);
Contributor

Optional/minor: align the args?

Member Author

I really don't mind, but it seems we don't have a consensus on arg alignment.
I'll align and commit, thanks!
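
For readers unfamiliar with the timed poll under review in this thread, the following is a rough sketch of the pattern, assuming a `BlockingQueue` that some background consumer thread fills with batches. The class name, the `offerBatch` helper, the `String` record type, and the choice of `LinkedBlockingQueue` are assumptions made for the example and are not taken from KafkaIO; a plain `long` stands in for the `Duration` constant.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedPollSketch {
  // Mirrors the 10 ms value from the diff; a plain long replaces the Duration constant.
  static final long NEW_RECORDS_POLL_TIMEOUT_MILLIS = 10;

  // Hypothetical queue that a background consumer thread would fill with record batches.
  private final BlockingQueue<List<String>> availableRecordsQueue = new LinkedBlockingQueue<>();

  /** Stands in for the background thread handing over a batch. */
  void offerBatch(List<String> batch) {
    availableRecordsQueue.add(batch);
  }

  /** Returns the next batch, or an empty list if none arrives within the timeout. */
  List<String> nextBatch() {
    try {
      // poll available records, wait (if necessary) up to the specified timeout.
      List<String> records =
          availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
      return records == null ? Collections.<String>emptyList() : records;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return Collections.<String>emptyList();
    }
  }
}
```

Because `poll` returns `null` on timeout, a short timeout lets `start()` and `advance()` report "no data yet" almost immediately instead of holding the calling thread for seconds.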

Contributor

@rangadi rangadi left a comment

👍 LGTM.
Thanks for the updates.

@asfgit asfgit closed this in b0cb2e8 Oct 19, 2016
@amitsela amitsela deleted the BEAM-744 branch October 19, 2016 18:45
4 participants