Skip to content

Commit

Permalink
[fix] [broker] Part-2: Replicator can not created successfully due to…
Browse files Browse the repository at this point in the history
… an orphan replicator in the previous topic owner (apache#21948)

(cherry picked from commit b774666)
  • Loading branch information
poorbarcode committed May 7, 2024
1 parent 076b55e commit 6038bbf
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) {
}
startProducer();
}).exceptionally(ex -> {
log.warn("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and"
log.error("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and"
+ " trigger a terminate. Replicator state: {}",
localTopicName, replicatorId, STATE_UPDATER.get(this), ex);
terminate();
Expand Down Expand Up @@ -376,9 +376,13 @@ public CompletableFuture<Void> terminate() {
this.producer = null;
// set the cursor as inactive.
disableReplicatorRead();
// release resources.
doReleaseResources();
});
}

protected void doReleaseResources() {}

protected boolean tryChangeStatusToTerminating() {
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating)){
return true;
Expand Down Expand Up @@ -467,4 +471,8 @@ protected ImmutablePair<Boolean, State> compareSetAndGetState(State expect, Stat
}
return compareSetAndGetState(expect, update);
}

public boolean isTerminated() {
return state == State.Terminating || state == State.Terminated;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ default Optional<DispatchRateLimiter> getRateLimiter() {
boolean isConnected();

long getNumberOfEntriesInBacklog();

boolean isTerminated();
}
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
long waitTimeMillis = readFailureBackoff.next();

if (exception instanceof CursorAlreadyClosedException) {
log.error("[{}] Error reading entries because replicator is"
log.warn("[{}] Error reading entries because replicator is"
+ " already deleted and cursor is already closed {}, ({})",
replicatorId, ctx, exception.getMessage(), exception);
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
Expand Down Expand Up @@ -569,7 +569,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx,
exception.getMessage(), exception);
if (exception instanceof CursorAlreadyClosedException) {
log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
log.warn("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
+ " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception);
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
terminate();
Expand Down Expand Up @@ -694,6 +694,11 @@ public boolean isConnected() {
return producer != null && producer.isConnected();
}

@Override
protected void doReleaseResources() {
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
}

private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1630,6 +1630,7 @@ public CompletableFuture<Void> checkReplication() {
return deleteForcefully();
}

removeTerminatedReplicators(replicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();

// Check for missing replicators
Expand Down Expand Up @@ -1668,6 +1669,8 @@ private CompletableFuture<Void> checkShadowReplication() {
if (log.isDebugEnabled()) {
log.debug("[{}] Checking shadow replication status, shadowTopics={}", topic, configuredShadowTopics);
}

removeTerminatedReplicators(shadowReplicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();

// Check for missing replicators
Expand Down Expand Up @@ -1818,19 +1821,30 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, Ma
if (replicationClient == null) {
return;
}
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
} catch (PulsarServerException e) {
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
lock.readLock().lock();
try {
if (isClosingOrDeleting) {
// Whether is "transferring" or not, do not create new replicator.
log.info("[{}] Skip to create replicator because this topic is closing."
+ " remote cluster: {}. State of transferring : {}",
topic, remoteCluster, transferring);
return;
}
return null;
});

// clean up replicator if startup is failed
if (replicator == null) {
replicators.removeNullValue(remoteCluster);
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
} catch (PulsarServerException e) {
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
}
return null;
});
// clean up replicator if startup is failed
if (replicator == null) {
replicators.removeNullValue(remoteCluster);
}
} finally {
lock.readLock().unlock();
}
});
}
Expand Down Expand Up @@ -3484,9 +3498,27 @@ private void fenceTopicToCloseOrDelete() {
}

private void unfenceTopicToResume() {
subscriptions.values().forEach(sub -> sub.resumeAfterFence());
isFenced = false;
isClosingOrDeleting = false;
subscriptions.values().forEach(sub -> sub.resumeAfterFence());
unfenceReplicatorsToResume();
}

private void unfenceReplicatorsToResume() {
checkReplication();
checkShadowReplication();
}

private void removeTerminatedReplicators(ConcurrentOpenHashMap<String, Replicator> replicators) {
Map<String, Replicator> terminatedReplicators = new HashMap<>();
replicators.forEach((cluster, replicator) -> {
if (replicator.isTerminated()) {
terminatedReplicators.put(cluster, replicator);
}
});
terminatedReplicators.entrySet().forEach(entry -> {
replicators.remove(entry.getKey(), entry.getValue());
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -48,6 +51,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -492,4 +496,166 @@ public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throw
admin1.topics().deletePartitionedTopic(topicName);
admin2.topics().deletePartitionedTopic(topicName);
}

/**
* See the description and execution flow: https://github.com/apache/pulsar/pull/21948.
* Steps:
* 1.Create topic, does not enable replication now.
* - The topic will be loaded in the memory.
* 2.Enable namespace level replication.
* - Broker creates a replicator, and the internal producer of replicator is starting.
* - We inject an error to make the internal producer fail to connect,after few seconds, it will retry to start.
* 3.Unload bundle.
* - Starting to close the topic.
* - The replicator will be closed, but it will not close the internal producer, because the producer has not
* been created successfully.
* - We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck. So the topic is still
* in the process of being closed now.
* 4.Internal producer retry to connect.
* - At the next retry, it connected successful. Since the state of "repl.cursor" is not "Closed", this producer
* will not be closed now.
* 5.Topic closed.
* - Cancel the stuck of closing the "repl.cursor".
* - The topic is wholly closed.
* 6.Verify: the delayed created internal producer will be closed. In other words, there is no producer is connected
* to the remote cluster.
*/
@Test
public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception {
final String namespaceName = defaultTenant + "/" + UUID.randomUUID().toString().replaceAll("-", "");
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespaceName + "/tp_");
// 1.Create topic, does not enable replication now.
admin1.namespaces().createNamespace(namespaceName);
admin2.namespaces().createNamespace(namespaceName);
admin1.topics().createNonPartitionedTopic(topicName);
PersistentTopic persistentTopic =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();

// We inject an error to make the internal producer fail to connect.
// The delay time of next retry to create producer is below:
// 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
// If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
final AtomicInteger createProducerCounter = new AtomicInteger();
final int failTimes = 6;
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
if (topicName.equals(producerCnf.getTopicName())) {
// There is a switch to determine create producer successfully or not.
if (createProducerCounter.incrementAndGet() > failTimes) {
return originalProducer;
}
log.info("Retry create replicator.producer count: {}", createProducerCounter);
// Release producer and fail callback.
originalProducer.closeAsync();
throw new RuntimeException("mock error");
}
return originalProducer;
});

// 2.Enable namespace level replication.
admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1, cluster2));
AtomicReference<PersistentReplicator> replicator = new AtomicReference<PersistentReplicator>();
Awaitility.await().untilAsserted(() -> {
assertFalse(persistentTopic.getReplicators().isEmpty());
replicator.set(
(PersistentReplicator) persistentTopic.getReplicators().values().iterator().next());
// Since we inject a producer creation error, the replicator can not start successfully.
assertFalse(replicator.get().isConnected());
});

// We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck, until the internal
// producer of the replicator started.
SpyCursor spyCursor =
spyCursor(persistentTopic, "pulsar.repl." + pulsar2.getConfig().getClusterName());
CursorCloseSignal cursorCloseSignal = makeCursorClosingDelay(spyCursor);

// 3.Unload bundle: call "topic.close(false)".
// Stuck start new producer, until the state of replicator change to Stopped.
// The next once of "createProducerSuccessAfterFailTimes" to create producer will be successfully.
Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
assertTrue(createProducerCounter.get() >= failTimes);
});
CompletableFuture<Void> topicCloseFuture = persistentTopic.close(true);
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
String state = String.valueOf(replicator.get().getState());
log.error("replicator state: {}", state);
assertTrue(state.equals("Disconnected") || state.equals("Terminated"));
});

// 5.Delay close cursor, until "replicator.producer" create successfully.
// The next once retry time of create "replicator.producer" will be 3.2s.
Thread.sleep(4 * 1000);
log.info("Replicator.state: {}", replicator.get().getState());
cursorCloseSignal.startClose();
cursorCloseSignal.startCallback();
// Wait for topic close successfully.
topicCloseFuture.join();

// 6. Verify there is no orphan producer on the remote cluster.
Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
PersistentTopic persistentTopic2 =
(PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
assertEquals(persistentTopic2.getProducers().size(), 0);
Assert.assertFalse(replicator.get().isConnected());
});

// cleanup.
cleanupTopics(namespaceName, () -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
});
admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1));
admin1.namespaces().deleteNamespace(namespaceName);
admin2.namespaces().deleteNamespace(namespaceName);
}

@Test
public void testUnFenceTopicToReuse() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
// Wait for replicator started.
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();
waitReplicatorStarted(topicName);

// Inject an error to make topic close fails.
final String mockProducerName = UUID.randomUUID().toString();
final org.apache.pulsar.broker.service.Producer mockProducer =
mock(org.apache.pulsar.broker.service.Producer.class);
doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
.when(mockProducer).disconnect(any());
doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
.when(mockProducer).disconnect();
PersistentTopic persistentTopic =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
persistentTopic.getProducers().put(mockProducerName, mockProducer);

// Do close.
GeoPersistentReplicator replicator1 =
(GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
try {
persistentTopic.close(true, false).join();
fail("Expected close fails due to a producer close fails");
} catch (Exception ex) {
log.info("Expected error: {}", ex.getMessage());
}

// Broker will call `topic.unfenceTopicToResume` if close clients fails.
// Verify: the replicator will be re-created.
Awaitility.await().untilAsserted(() -> {
assertTrue(producer1.isConnected());
GeoPersistentReplicator replicator2 =
(GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
assertNotEquals(replicator1, replicator2);
assertFalse(replicator1.isConnected());
assertFalse(replicator1.producer != null && replicator1.producer.isConnected());
assertTrue(replicator2.isConnected());
assertTrue(replicator2.producer != null && replicator2.producer.isConnected());
});

// cleanup.
persistentTopic.getProducers().remove(mockProducerName, mockProducer);
producer1.close();
cleanupTopics(() -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,16 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
}

protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception {
waitChangeEventsInit(replicatedNamespace);
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Collections.singleton(cluster1));
admin1.namespaces().unload(replicatedNamespace);
cleanupTopics(replicatedNamespace, cleanupTopicAction);
}

protected void cleanupTopics(String namespace, CleanupTopicAction cleanupTopicAction) throws Exception {
waitChangeEventsInit(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Collections.singleton(cluster1));
admin1.namespaces().unload(namespace);
cleanupTopicAction.run();
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1, cluster2));
waitChangeEventsInit(replicatedNamespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2));
waitChangeEventsInit(namespace);
}

protected void waitChangeEventsInit(String namespace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public Object[][] partitionedTopicProvider() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}

@Test
@Test(priority = Integer.MAX_VALUE)
public void testConfigChange() throws Exception {
log.info("--- Starting ReplicatorTest::testConfigChange ---");
// This test is to verify that the config change on global namespace is successfully applied in broker during
Expand Down

0 comments on commit 6038bbf

Please sign in to comment.