Skip to content

Commit

Permalink
--bug=97395019 pulsar client扩分区不感知问题修复 (merge request !62)
Browse files Browse the repository at this point in the history
Squash merge branch 'fix_add_partiton_error' into '2.8.1'
### Motivation
修复pulsar client,扩分区不感知问题
chery pick from apache#12620
  • Loading branch information
jerrypeng authored and mayozhang committed Mar 9, 2022
1 parent 1d00b2d commit 6eaaaf3
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1318,28 +1318,35 @@ private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicNa
topicName, oldPartitionNumber, currentPartitionNumber);
return FutureUtil.failedFuture(new NotSupportedException("not support shrink topic partitions"));
}
}).exceptionally(throwable -> {
log.warn("[{}] Failed to get partitions for topic to determine if new partitions are added", throwable);
return null;
});
}

private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled() || getState() != State.Ready) {
return;
}
try {
if (timeout.isCancelled() || getState() != State.Ready) {
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] run partitionsAutoUpdateTimerTask", topic);
}
if (log.isDebugEnabled()) {
log.debug("[{}] run partitionsAutoUpdateTimerTask", topic);
}

// if last auto update not completed yet, do nothing.
if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet());
// if last auto update not completed yet, do nothing.
if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet());
}
} catch (Throwable th) {
log.warn("Encountered error in partition auto update timer task for multi-topic consumer. Another task will be scheduled.", th);
} finally {
// schedule the next re-check task
partitionsAutoUpdateTimeout = client.timer()
.newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}

// schedule the next re-check task
partitionsAutoUpdateTimeout = client.timer()
.newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,22 +356,26 @@ public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtende
private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled() || getState() != State.Ready) {
return;
}
try {
if (timeout.isCancelled() || getState() != State.Ready) {
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] run partitionsAutoUpdateTimerTask for partitioned producer", topic);
}
if (log.isDebugEnabled()) {
log.debug("[{}] run partitionsAutoUpdateTimerTask for partitioned producer", topic);
}

// if last auto update not completed yet, do nothing.
if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic));
// if last auto update not completed yet, do nothing.
if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic));
}
} catch (Throwable th) {
log.warn("Encountered error in partition auto update timer task for partition producer. Another task will be scheduled.", th);
} finally {
// schedule the next re-check task
partitionsAutoUpdateTimeout = client.timer()
.newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}

// schedule the next re-check task
partitionsAutoUpdateTimeout = client.timer()
.newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
};

Expand Down

0 comments on commit 6eaaaf3

Please sign in to comment.