Skip to content

[BEAM-4347] Enforce ErrorProne analysis in kafka IO#5422

Merged
iemejia merged 4 commits intoapache:masterfrom
timrobertson100:BEAM-4347
May 23, 2018
Merged

[BEAM-4347] Enforce ErrorProne analysis in kafka IO#5422
iemejia merged 4 commits intoapache:masterfrom
timrobertson100:BEAM-4347

Conversation

@timrobertson100
Copy link
Contributor

A simple PR to enforce error prone and fail the build on warnings raised. Current errorprone warnings are fixed (but I did not do an IDEA code analysis in this editing round).

CC @iemejia @swegner - could you please assign a reviewer?

@iemejia
Copy link
Member

iemejia commented May 18, 2018

R: @rangadi Can you please take a look at this one since it is Kafka related.

@iemejia iemejia self-requested a review May 18, 2018 14:07
Copy link
Contributor

@rangadi rangadi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for doing this @timrobertson100. Left a few comments (TODO comments and checking if switch to 'ArrayList' is required)

@iemejia, LGTM overall.

ProducerSpEL.beginTransaction(producer);
}

@SuppressWarnings("FutureReturnValueIgnored")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please leave a TODO for me.
// TODO : rangadi : add explanation for why this is ok.
Can we make this send() return future and ignore it at the call site?

Copy link
Contributor Author

@timrobertson100 timrobertson100 May 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I'm not much in favour of leaving TODOs within code (looks cluttered, doesn't help with tracking outstanding work etc).
How about I open a jira task to review future handling in KafkaIO, informing that we've suppressed warnings (doesn't change current behaviour) but would like to review if we may drop messages on exceptional cases? I'll be happy to dig deeper with you on that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Please file a jira. I wanted to add the comment soon after this gets merged. I know why it is ok to ignore here (it will be waited on inside commitTxn()). You can add the explanation in PR itself. If you are adding the comment, I suggest changing sendRecord() return the future and let the call ignore it with a comment saying they are flushed in commitTxn().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make the code change - new commit later

* closed if it is stays in cache for more than 1 minute, i.e. not used inside
* KafkaExactlyOnceSink DoFn for a minute.
*/
@SuppressWarnings("FutureReturnValueIgnored")
Copy link
Contributor

@rangadi rangadi May 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where a Future is ignored. Please leave a comment :
// TODO : rangadi : review if this is even required.

Copy link
Contributor Author

@timrobertson100 timrobertson100 May 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: It's needed because of the SCHEDULED_CLEAN_UP_THREAD.scheduleAtFixedRate(...)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment to that effect. Will be useful for other readers.

}

@ProcessElement
@SuppressWarnings("FutureReturnValueIgnored")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 // TODO : rangadi : explain why this is ok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above - can I include this in a Review future handling in KafkaIO Jira instead please?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, either jira, or better, you can add a comment. Reason it is ok to ignore here is that SendCallback() keeps track of failures, which are checked inside finishBundle().


// cycle through the partitions in order to interleave records from each.
curBatch = Iterators.cycle(new LinkedList<>(partitionStates));
curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this also recommended by ErrorProne?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

LinkedList almost never out-performs ArrayList or ArrayDeque. If you are using LinkedList as a list, prefer ArrayList. If you are using LinkedList as a stack or queue/deque, prefer ArrayDeque.

Because ArrayDeque rejects nulls I erred on the side of caution

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using LinkedList because of elements deleted in random order (in advance()). There are no nulls. Was it an ErrorProne error?
Overall I don't think the difference matters at all either way. I would be surprised if ErrorProne insisted we change it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error prone reports it as warning but we now fail builds on warning (unless suppressed of course). Thanks for confirming no nulls.

(record -> new Instant(TimeUnit.SECONDS.toMillis(record.getKV().getValue())
+ customTimestampStartMillis)),
Duration.millis(0),
Duration.ZERO,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: ErrorProne suggested this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.
It offers some nice suggestions for readability I've found (e.g. seconds(180) -> minutes(3))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice!

@timrobertson100
Copy link
Contributor Author

Thanks for the review @rangadi

If you're ok with the proposal I create a Jira for a review of Future handling in KafkaIO I don't think there are further changes. Alternatively, we can do that exploration now - if we find changes are needed they may be more invasive and warrant a specific jira for that anyway though.

@rangadi
Copy link
Contributor

rangadi commented May 20, 2018

Commented. Since you are interested, simpler thing might be to add the explanations for ignoring futures in this PR itself. JIRA is also fine.

@timrobertson100
Copy link
Contributor Author

Thanks @rangadi. I'll make a few changes and provide another commit (probably tomorrow or Tuesday)

@timrobertson100
Copy link
Contributor Author

Please can you take a look @rangadi ?

Copy link
Contributor

@rangadi rangadi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Suggested two minor changes : removing 'Note:' in comments and reverting ArrayDeque change.

KafkaExactlyOnceSink.ensureEOSSupport();
}

// Note: Futures ignored as exceptions will be flushed out in the commitTxn
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A short comment is already a note :). We could remove them.


// cycle through the partitions in order to interleave records from each.
curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
curBatch = Iterators.cycle(new ArrayDeque<>(partitionStates));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This list is neither used as a stack or nor as a queue. I would say you can either leave it as ArrayList or LinkedList. ArrayDeque incorrectly implies usage.

@timrobertson100
Copy link
Contributor Author

Thanks for review @rangadi - those changes now applied.

}

// Note: Suppression since rrrors are tracked in SendCallback(), and checked on finishBundle()
// Suppression since rrrors are tracked in SendCallback(), and checked on finishBundle()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo in 'errors'. Also 'in finishBundle()'?.

@rangadi
Copy link
Contributor

rangadi commented May 23, 2018

LGTM. Thanks @timrobertson100.
@iemejia : ready to merge (couple of typos are optional, which might already be fixed by the time you merge).

@iemejia iemejia merged commit e31f27f into apache:master May 23, 2018
@iemejia
Copy link
Member

iemejia commented May 23, 2018

Oups I screwed the correct title by double clicking merge by mistake. Thanks @timrobertson100 and @rangadi for the proper review.

@iemejia
Copy link
Member

iemejia commented May 23, 2018

LGTM in postfix mode due to my sloppy fingers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants