Skip to content

Commit

Permalink
Fix creating lots of versions for the same schema
Browse files Browse the repository at this point in the history
  • Loading branch information
aloyszhang committed Nov 29, 2022
1 parent 49ff0f8 commit 9fd555a
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 138 deletions.
Expand Up @@ -23,8 +23,6 @@
import static java.util.Objects.isNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
import static org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import static org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -49,6 +47,7 @@
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.LocatorEntry;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
Expand All @@ -63,7 +62,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookkeeperSchemaStorage implements SchemaStorage {
public class BookkeeperSchemaStorage implements SchemaStorage<LocatorEntry> {
private static final Logger log = LoggerFactory.getLogger(BookkeeperSchemaStorage.class);

private static final String SchemaPath = "/schemas";
Expand Down Expand Up @@ -114,6 +113,10 @@ public CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] has
return putSchema(key, value, hash).thenApply(LongSchemaVersion::new);
}

public CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash, LocatorEntry locatorEntry) {
return putSchema(key, value, hash, locatorEntry).thenApply(LongSchemaVersion::new);
}

@Override
public CompletableFuture<StoredSchema> get(String key, SchemaVersion version) {
if (version == SchemaVersion.Latest) {
Expand All @@ -126,33 +129,33 @@ public CompletableFuture<StoredSchema> get(String key, SchemaVersion version) {

@Override
public CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key) {
CompletableFuture<List<CompletableFuture<StoredSchema>>> result = new CompletableFuture<>();
getLocator(key).thenAccept(locator -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Get all schemas - locator: {}", key, locator);
}
return getAll(key, null);
}

if (!locator.isPresent()) {
public CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key, LocatorEntry locator) {
CompletableFuture<List<CompletableFuture<StoredSchema>>> result = new CompletableFuture<>();
if (log.isDebugEnabled()) {
log.debug("[{}] Get all schemas - locator: {}", key, locator);
}
CompletableFuture<Optional<LocatorEntry>> locatorFuture = locator == null
? getLocator(key) : CompletableFuture.completedFuture(Optional.of(locator));
List<CompletableFuture<StoredSchema>> list = new ArrayList<>();
locatorFuture.thenAccept(locatorEntry -> {
if (locatorEntry.isPresent()) {
locatorEntry.get().locator.getIndexList().forEach(indexEntry ->
list.add(readSchemaEntry(indexEntry.getPosition()).thenApply(entry -> new StoredSchema(
entry.getSchemaData().toByteArray(),
new LongSchemaVersion(indexEntry.getVersion())))));
result.complete(list);
} else {
result.complete(Collections.emptyList());
return;
}

SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator;
List<CompletableFuture<StoredSchema>> list = new ArrayList<>();
schemaLocator.getIndexList().forEach(indexEntry -> list.add(readSchemaEntry(indexEntry.getPosition())
.thenApply(entry -> new StoredSchema
(
entry.getSchemaData().toByteArray(),
new LongSchemaVersion(indexEntry.getVersion())
)
)
));
result.complete(list);
});
return result;
}

CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
@Override
public CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
return getSchemaLocator(getSchemaPath(key));
}

Expand Down Expand Up @@ -284,60 +287,26 @@ private CompletableFuture<Long> putSchema(String schemaId, byte[] data, byte[] h
return readSchemaEntry(locator.getIndexList().get(0).getPosition())
.thenCompose(schemaEntry -> addNewSchemaEntryToStore(schemaId,
locator.getIndexList(), data).thenCompose(
position -> {
CompletableFuture<Long> future = new CompletableFuture<>();
updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash)
.thenAccept(future::complete)
.exceptionally(ex -> {
if (ex.getCause() instanceof BadVersionException) {
// There was a race condition on the schema creation.
// Since it has now been created,
// retry the whole operation so that we have a chance to
// recover without bubbling error
putSchema(schemaId, data, hash)
.thenAccept(future::complete)
.exceptionally(ex2 -> {
future.completeExceptionally(ex2);
return null;
});
} else {
// For other errors, just fail the operation
future.completeExceptionally(ex);
}
return null;
});
return future;
})
position -> updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash))
);
} else {
// No schema was defined yet
CompletableFuture<Long> future = new CompletableFuture<>();
createNewSchema(schemaId, data, hash)
.thenAccept(future::complete)
.exceptionally(ex -> {
if (ex.getCause() instanceof AlreadyExistsException
|| ex.getCause() instanceof BadVersionException) {
// There was a race condition on the schema creation. Since it has now been created,
// retry the whole operation so that we have a chance to recover without bubbling error
// back to producer/consumer
putSchema(schemaId, data, hash)
.thenAccept(future::complete)
.exceptionally(ex2 -> {
future.completeExceptionally(ex2);
return null;
});
} else {
// For other errors, just fail the operation
future.completeExceptionally(ex);
}
return null;
});

return future;
return createNewSchema(schemaId, data, hash);
}
});
}

@NotNull
private CompletableFuture<Long> putSchema(String schemaId, byte[] data, byte[] hash, LocatorEntry locatorEntry) {
if (locatorEntry == null) {
return createNewSchema(schemaId, data, hash);
} else {
return addNewSchemaEntryToStore(schemaId,
locatorEntry.locator.getIndexList(), data).thenCompose(
position -> updateSchemaLocator(schemaId, locatorEntry, position, hash));
}
}

private CompletableFuture<Long> createNewSchema(String schemaId, byte[] data, byte[] hash) {
SchemaStorageFormat.IndexEntry emptyIndex = SchemaStorageFormat.IndexEntry.newBuilder()
.setVersion(0)
Expand Down Expand Up @@ -673,7 +642,7 @@ static SchemaStorageFormat.PositionInfo newPositionInfo(long ledgerId, long entr
}
}

static class LocatorEntry {
public static class LocatorEntry {
final SchemaStorageFormat.SchemaLocator locator;
final long version;

Expand Down
Expand Up @@ -20,13 +20,13 @@

import javax.validation.constraints.NotNull;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.LocatorEntry;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;

@SuppressWarnings("unused")
public class BookkeeperSchemaStorageFactory implements SchemaStorageFactory {
@Override
@NotNull
public SchemaStorage create(PulsarService pulsar) {
public @NotNull SchemaStorage<LocatorEntry> create(PulsarService pulsar) {
return new BookkeeperSchemaStorage(pulsar);
}
}

0 comments on commit 9fd555a

Please sign in to comment.