Skip to content

Commit

Permalink
add additional error handling in auto partition update task MultiTopi…
Browse files Browse the repository at this point in the history
…csConsumerImpl (#12620)

Co-authored-by: Jerry Peng <jerryp@splunk.com>
(cherry picked from commit 11cfbe4)
  • Loading branch information
jerrypeng authored and eolivelli committed Nov 9, 2021
1 parent f99ff9c commit c89e1c6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1324,28 +1324,35 @@ private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicNa
topicName, oldPartitionNumber, currentPartitionNumber);
return FutureUtil.failedFuture(new NotSupportedException("not support shrink topic partitions"));
}
}).exceptionally(throwable -> {
log.warn("[{}] Failed to get partitions for topic to determine if new partitions are added", throwable);
return null;
});
}

private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled() || getState() != State.Ready) {
return;
}
try {
if (timeout.isCancelled() || getState() != State.Ready) {
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] run partitionsAutoUpdateTimerTask", topic);
}
if (log.isDebugEnabled()) {
log.debug("[{}] run partitionsAutoUpdateTimerTask", topic);
}

// if last auto update not completed yet, do nothing.
if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet());
// if last auto update not completed yet, do nothing.
if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet());
}
} catch (Throwable th) {
log.warn("Encountered error in partition auto update timer task for multi-topic consumer. Another task will be scheduled.", th);
} finally {
// schedule the next re-check task
partitionsAutoUpdateTimeout = client.timer()
.newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}

// schedule the next re-check task
partitionsAutoUpdateTimeout = client.timer()
.newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,22 +410,26 @@ public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtende
private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled() || getState() != State.Ready) {
return;
}
try {
if (timeout.isCancelled() || getState() != State.Ready) {
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] run partitionsAutoUpdateTimerTask for partitioned producer", topic);
}
if (log.isDebugEnabled()) {
log.debug("[{}] run partitionsAutoUpdateTimerTask for partitioned producer", topic);
}

// if last auto update not completed yet, do nothing.
if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic));
// if last auto update not completed yet, do nothing.
if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic));
}
} catch (Throwable th) {
log.warn("Encountered error in partition auto update timer task for partition producer. Another task will be scheduled.", th);
} finally {
// schedule the next re-check task
partitionsAutoUpdateTimeout = client.timer()
.newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}

// schedule the next re-check task
partitionsAutoUpdateTimeout = client.timer()
.newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
};

Expand Down

0 comments on commit c89e1c6

Please sign in to comment.