Skip to content

Commit

Permalink
MINOR: Avoid dividing by zero (#7143)
Browse files Browse the repository at this point in the history
Reviews: A. Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bill@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>, Guozhang Wang <guozhang@confluent.io>
  • Loading branch information
mjsax committed Aug 3, 2019
1 parent e33f1b3 commit d00e741
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1038,15 +1038,15 @@ private boolean maybePunctuate() {
* or if the task producer got fenced (EOS)
*/
boolean maybeCommit() {
int committed = 0;
final int committed;

if (commitTimeMs >= 0 && now - lastCommitMs > commitTimeMs) {
if (log.isTraceEnabled()) {
log.trace("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)",
taskManager.activeTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs);
}

committed += taskManager.commitAll();
committed = taskManager.commitAll();
if (committed > 0) {
final long intervalCommitLatency = advanceNowAndComputeLatency();
streamsMetrics.commitTimeSensor.record(intervalCommitLatency / (double) committed, now);
Expand All @@ -1063,11 +1063,10 @@ boolean maybeCommit() {
lastCommitMs = now;
processStandbyRecords = true;
} else {
final int commitPerRequested = taskManager.maybeCommitActiveTasksPerUserRequested();
if (commitPerRequested > 0) {
committed = taskManager.maybeCommitActiveTasksPerUserRequested();
if (committed > 0) {
final long requestCommitLatency = advanceNowAndComputeLatency();
streamsMetrics.commitTimeSensor.record(requestCommitLatency / (double) committed, now);
committed += commitPerRequested;
}
}

Expand Down

0 comments on commit d00e741

Please sign in to comment.