Fix race condition in concurrent schema deletion #11606

Merged
Changes from 3 commits
@@ -28,6 +28,7 @@
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -48,6 +49,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+ import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
@@ -372,6 +374,21 @@ public CompletableFuture<SchemaVersion> deleteSchema() {
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
return schemaRegistryService.getSchema(id)
+ .exceptionally(t -> {
+ if (t.getCause() != null
Contributor

This condition looks a little bit hacky.

  1. We only check t.getCause(). IMHO we should have a utility method that traverses the full exception chain, looking for a SchemaException.
  2. A "non recoverable" SchemaException is not enough to say that this is a "Schema does not exist" exception. Can we introduce a specific SchemaException subclass, or a method like isRecoverable()?
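
The chain traversal suggested in point 1 could look roughly like the sketch below. It is an illustration only, not code from this PR: the class name `SchemaExceptions`, the method name `hasUnrecoverableSchemaException`, and the depth cap are hypothetical, and the nested `SchemaException` is a reduced stand-in for Pulsar's class so that the example compiles on its own.

```java
/** Hypothetical helper, not part of the PR. */
final class SchemaExceptions {

    /** Stand-in for Pulsar's SchemaException, reduced to the one method used here. */
    static class SchemaException extends Exception {
        private final boolean recoverable;

        SchemaException(boolean recoverable, String message) {
            super(message);
            this.recoverable = recoverable;
        }

        boolean isRecoverable() {
            return recoverable;
        }
    }

    private SchemaExceptions() {
    }

    /**
     * Walks the cause chain starting at t and returns true if any link is a
     * SchemaException that reports itself as not recoverable.
     */
    static boolean hasUnrecoverableSchemaException(Throwable t) {
        int depth = 0;
        // The depth cap (an arbitrary assumption) guards against a cyclic cause chain.
        for (Throwable cur = t; cur != null && depth < 32; cur = cur.getCause(), depth++) {
            if (cur instanceof SchemaException && !((SchemaException) cur).isRecoverable()) {
                return true;
            }
        }
        return false;
    }
}
```

With a helper of this shape, both `exceptionally` callbacks in the diff could call one method instead of repeating the `t.getCause()` checks.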

Contributor Author

Thank you for your reply!

For question 1, the condition is inspired by SchemaRegistryServiceImpl#trimDeletedSchemaAndGetList(java.lang.String), which treats schemas that fail with unrecoverable exceptions as deleted schemas. I follow the same design here.

For question 2, I agree that an unrecoverable SchemaException is not equivalent to "Schema does not exist". However, in the schema storage deletion scenario, an unrecoverable exception, which includes NoSuchLedgerException and NoSuchEntryException, is enough to say that the schema has already been deleted by another operation, so we can quit the process.
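
The reasoning above (treat an "already deleted" read failure as a no-op, rethrow everything else) can be sketched in isolation as follows. This is a minimal sketch under stated assumptions: `AlreadyDeletedRecovery` and `nullIfAlreadyDeleted` are hypothetical names, and the predicate stands in for the "unrecoverable SchemaException" check in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Predicate;

/** Hypothetical helper, not part of the PR. */
final class AlreadyDeletedRecovery {

    private AlreadyDeletedRecovery() {
    }

    /**
     * Returns a future that completes with null when the read failed for a reason
     * that alreadyDeleted accepts, and fails with the original error otherwise.
     */
    static <T> CompletableFuture<T> nullIfAlreadyDeleted(
            CompletableFuture<T> read, Predicate<Throwable> alreadyDeleted) {
        return read.exceptionally(t -> {
            // A dependent stage usually sees the failure wrapped in a CompletionException,
            // while a directly failed future passes the raw exception; handle both.
            Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                    ? t.getCause() : t;
            if (alreadyDeleted.test(cause)) {
                return null; // nothing left to delete; the caller treats null as a no-op
            }
            // Rethrow other cases without double wrapping.
            throw t instanceof CompletionException ? (CompletionException) t : new CompletionException(t);
        });
    }
}
```

A caller would then continue with `.thenCompose(schema -> schema == null ? ... : ...)`, as the diff does.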

Contributor

Thanks for your clarification.

What about creating a utility method to reduce code duplication?

+ && (t.getCause() instanceof SchemaException)
+ && !((SchemaException) t.getCause()).isRecoverable()) {
+ // Meeting NoSuchLedgerExistsException or NoSuchEntryException when reading schemas in
+ // bookkeeper. This also means that the data has already been deleted by other operations
+ // in deleting schema.
+ if (log.isDebugEnabled()) {
+ log.debug("Schema:{}'s data in bookkeeper may be deleted by other operations", id);
+ }
+ return null;
+ }
+ // rethrow other cases
+ throw t instanceof CompletionException ? (CompletionException) t : new CompletionException(t);
+ })
.thenCompose(schema -> {
if (schema != null) {
// It's different from `SchemasResource.deleteSchema`
@@ -37,6 +37,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -58,6 +59,7 @@
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
+ import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -174,7 +176,12 @@ BookKeeper getBookKeeper() {

@Override
public CompletableFuture<SchemaVersion> delete(String key, boolean forcefully) {
- return deleteSchema(key, forcefully).thenApply(LongSchemaVersion::new);
+ return deleteSchema(key, forcefully).thenApply(version -> {
+ if (version == null) {
+ return null;
+ }
+ return new LongSchemaVersion(version);
+ });
}

@Override
@@ -370,7 +377,21 @@ private CompletableFuture<Long> createNewSchema(String schemaId, byte[] data, by

@NotNull
private CompletableFuture<Long> deleteSchema(String schemaId, boolean forceFully) {
- return (forceFully ? CompletableFuture.completedFuture(null) : getSchema(schemaId))
+ return (forceFully ? CompletableFuture.completedFuture(null) : getSchema(schemaId).exceptionally(t -> {
+ if (t.getCause() != null
+ && (t.getCause() instanceof SchemaException)
Contributor

Same as above: at least we should have one utility method to detect this case.

+ && !((SchemaException) t.getCause()).isRecoverable()) {
+ // Meeting NoSuchLedgerExistsException or NoSuchEntryException when reading schemas in
+ // bookkeeper. This also means that the data has already been deleted by other operations
+ // in deleting schema.
+ if (log.isDebugEnabled()) {
+ log.debug("Schema:{}'s data in bookkeeper may be deleted by other operations", schemaId);
+ }
+ return null;
+ }
+ // rethrow other cases
+ throw t instanceof CompletionException ? (CompletionException) t : new CompletionException(t);
+ }))
.thenCompose(schemaAndVersion -> {
if (!forceFully && isNull(schemaAndVersion)) {
return completedFuture(null);
@@ -405,9 +426,20 @@ private CompletableFuture<Long> deleteSchema(String schemaId, boolean forceFully
store.delete(path, Optional.empty())
.thenRun(() -> {
future.complete(version);
- }).exceptionally(ex1 -> {
- future.completeExceptionally(ex1);
- return null;
+ }).exceptionally(zkException -> {
+ if (zkException.getCause()
+ instanceof MetadataStoreException.NotFoundException) {
+ // The znode has been deleted by others.
+ // In some cases, the program may enter this logic.
+ // Since the znode is gone, we don’t need to deal with it.
+ if (log.isDebugEnabled()) {
+ log.debug("No node for schema path: {}", path);
+ }
+ future.complete(null);
+ } else {
+ future.completeExceptionally(zkException);
+ }
+ return null;
});
});
}
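
The last hunk makes the metadata delete idempotent: if another caller has already removed the node, the operation completes with null instead of failing. A self-contained sketch of that idea follows. It is not the PR's code: `IdempotentDelete`, `completeDelete`, and the nested `NotFoundException` (a stand-in for `MetadataStoreException.NotFoundException`) are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical helper, not part of the PR. */
final class IdempotentDelete {

    /** Stand-in for MetadataStoreException.NotFoundException. */
    static class NotFoundException extends Exception {
        NotFoundException(String path) {
            super("No node at " + path);
        }
    }

    private IdempotentDelete() {
    }

    /**
     * Maps the outcome of a store delete onto the deleted version:
     * success yields version, "node not found" yields null, anything else fails.
     */
    static CompletableFuture<Long> completeDelete(CompletableFuture<Void> storeDelete, long version) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        storeDelete.whenComplete((ignored, ex) -> {
            // Unwrap a CompletionException so the check works for wrapped and raw failures.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            if (ex == null) {
                result.complete(version);
            } else if (cause instanceof NotFoundException) {
                result.complete(null); // another caller already removed the node
            } else {
                result.completeExceptionally(cause);
            }
        });
        return result;
    }
}
```

Completing with null here matches the null check that the same PR adds in `delete(String key, boolean forcefully)` before constructing a `LongSchemaVersion`.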