Skip to content

Commit

Permalink
Fix race condition in concurrent schema deletion (apache#11606)
Browse files Browse the repository at this point in the history
This PR fixes apache#11605

### Motivation

Concurrently deleting topics with the same schema may cause race condition in broker side. If we do not handle these scenarios correctly we will get unexpected exceptions in broker logs.

### Modifications

1. Add existence checks before schema deletion in `AbstractTopic#deleteSchema`.
2. Add existence checks before actually performing schema storage deletion in `BookkeeperSchemaStorage#deleteSchema`.
3. Ignore `NoNodeException` in `BookkeeperSchemaStorage#deleteSchema`.
  • Loading branch information
wuzhanpeng committed Aug 17, 2021
1 parent 1d1f633 commit 43ded59
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 9 deletions.
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
Expand Down Expand Up @@ -371,7 +372,7 @@ public CompletableFuture<SchemaVersion> deleteSchema() {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
return schemaRegistryService.getSchema(id)
return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
.thenCompose(schema -> {
if (schema != null) {
// It's different from `SchemasResource.deleteSchema`
Expand Down
Expand Up @@ -37,6 +37,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
Expand All @@ -58,6 +59,7 @@
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -174,7 +176,12 @@ BookKeeper getBookKeeper() {

@Override
public CompletableFuture<SchemaVersion> delete(String key, boolean forcefully) {
return deleteSchema(key, forcefully).thenApply(LongSchemaVersion::new);
return deleteSchema(key, forcefully).thenApply(version -> {
if (version == null) {
return null;
}
return new LongSchemaVersion(version);
});
}

@Override
Expand Down Expand Up @@ -369,10 +376,10 @@ private CompletableFuture<Long> createNewSchema(String schemaId, byte[] data, by
}

@NotNull
private CompletableFuture<Long> deleteSchema(String schemaId, boolean forceFully) {
return (forceFully ? CompletableFuture.completedFuture(null) : getSchema(schemaId))
.thenCompose(schemaAndVersion -> {
if (!forceFully && isNull(schemaAndVersion)) {
private CompletableFuture<Long> deleteSchema(String schemaId, boolean forcefully) {
return (forcefully ? CompletableFuture.completedFuture(null)
: ignoreUnrecoverableBKException(getSchema(schemaId))).thenCompose(schemaAndVersion -> {
if (!forcefully && isNull(schemaAndVersion)) {
return completedFuture(null);
} else {
// The version is only for the compatibility of the current interface
Expand Down Expand Up @@ -405,9 +412,20 @@ private CompletableFuture<Long> deleteSchema(String schemaId, boolean forceFully
store.delete(path, Optional.empty())
.thenRun(() -> {
future.complete(version);
}).exceptionally(ex1 -> {
future.completeExceptionally(ex1);
return null;
}).exceptionally(zkException -> {
if (zkException.getCause()
instanceof MetadataStoreException.NotFoundException) {
// The znode has been deleted by others.
// In some cases, the program may enter this logic.
// Since the znode is gone, we don’t need to deal with it.
if (log.isDebugEnabled()) {
log.debug("No node for schema path: {}", path);
}
future.complete(null);
} else {
future.completeExceptionally(zkException);
}
return null;
});
});
}
Expand Down Expand Up @@ -681,4 +699,22 @@ public static Exception bkException(String operation, int rc, long ledgerId, lon
&& rc != BKException.Code.NoSuchEntryException;
return new SchemaException(recoverable, message);
}

public static <T> CompletableFuture<T> ignoreUnrecoverableBKException(CompletableFuture<T> source) {
return source.exceptionally(t -> {
if (t.getCause() != null
&& (t.getCause() instanceof SchemaException)
&& !((SchemaException) t.getCause()).isRecoverable()) {
// Meeting NoSuchLedgerExistsException or NoSuchEntryException when reading schemas in
// bookkeeper. This also means that the data has already been deleted by other operations
// in deleting schema.
if (log.isDebugEnabled()) {
log.debug("Schema data in bookkeeper may be deleted by other operations.", t);
}
return null;
}
// rethrow other cases
throw t instanceof CompletionException ? (CompletionException) t : new CompletionException(t);
});
}
}
Expand Up @@ -81,6 +81,7 @@
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -731,6 +732,32 @@ public void testDeleteSchema() throws Exception {
assertFalse(topicHasSchema(topicName));
}

@Test
public void testConcurrentlyDeleteSchema() throws Exception {
String topic = "persistent://prop/ns-delete-schema/concurrently-delete-schema-test";
int partitions = 50;
admin.namespaces().createNamespace("prop/ns-delete-schema", 3);
admin.topics().createPartitionedTopic(topic, partitions);

Producer producer = pulsarClient
.newProducer(Schema.JSON(Schemas.BytesRecord.class))
.topic(topic)
.create();
producer.close();

CompletableFuture[] asyncFutures = new CompletableFuture[partitions];
for (int i = 0; i < partitions; i++) {
asyncFutures[i] = getTopic(TopicName.get(topic).getPartition(i).toString()).get().deleteSchema();
}

try {
// delete the schema concurrently, and wait for the end of all operations
CompletableFuture.allOf(asyncFutures).join();
} catch (Exception e) {
fail("Should not fail");
}
}

/**
* A topic that has retention policy set to non-0, should not be GCed until it has been inactive for at least the
* retention time.
Expand Down

0 comments on commit 43ded59

Please sign in to comment.