Skip to content

Commit

Permalink
Migrated BookkeeperSchemaStorage to use MetadataStore (#10545)
Browse files Browse the repository at this point in the history
* Migrated BookkeeperSchemaStorage to use MetadataStore

* Fixed test

* Fixed checkstyle

* Fixed tests
  • Loading branch information
merlimat authored May 12, 2021
1 parent 283f541 commit 55af397
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static java.util.Objects.nonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
import static org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import static org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -46,7 +48,6 @@
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
Expand All @@ -55,27 +56,22 @@
import org.apache.pulsar.common.protocol.schema.StoredSchema;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookkeeperSchemaStorage implements SchemaStorage {
private static final Logger log = LoggerFactory.getLogger(BookkeeperSchemaStorage.class);

private static final String SchemaPath = "/schemas";
private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
private static final byte[] LedgerPassword = "".getBytes();

private final MetadataStore store;
private final PulsarService pulsar;
private final ZooKeeper zooKeeper;
private final ZooKeeperCache localZkCache;
private final MetadataCache<SchemaStorageFormat.SchemaLocator> locatorEntryCache;

private final ServiceConfiguration config;
private BookKeeper bookKeeper;

Expand All @@ -85,20 +81,19 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
@VisibleForTesting
BookkeeperSchemaStorage(PulsarService pulsar) {
this.pulsar = pulsar;
this.localZkCache = pulsar.getLocalZkCache();
this.zooKeeper = localZkCache.getZooKeeper();
this.store = pulsar.getLocalMetadataStore();
this.config = pulsar.getConfiguration();
}
this.locatorEntryCache = store.getMetadataCache(new MetadataSerde<SchemaStorageFormat.SchemaLocator>() {
@Override
public byte[] serialize(SchemaStorageFormat.SchemaLocator value) {
return value.toByteArray();
}

@VisibleForTesting
public void init() throws KeeperException, InterruptedException {
try {
if (zooKeeper.exists(SchemaPath, false) == null) {
zooKeeper.create(SchemaPath, new byte[]{}, Acl, CreateMode.PERSISTENT);
@Override
public SchemaStorageFormat.SchemaLocator deserialize(byte[] content) throws IOException {
return SchemaStorageFormat.SchemaLocator.parseFrom(content);
}
} catch (KeeperException.NodeExistsException error) {
// race on startup, ignore.
}
});
}

@Override
Expand Down Expand Up @@ -158,10 +153,6 @@ CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
return getSchemaLocator(getSchemaPath(key));
}

public void clearLocatorCache(String key) {
localZkCache.invalidate(getSchemaPath(key));
}

public List<Long> getSchemaLedgerList(String key) throws IOException {
Optional<LocatorEntry> locatorEntry = null;
try {
Expand Down Expand Up @@ -302,7 +293,7 @@ private CompletableFuture<Long> putSchema(String schemaId, byte[] data, byte[] h
updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash)
.thenAccept(future::complete)
.exceptionally(ex -> {
if (ex.getCause() instanceof KeeperException.BadVersionException) {
if (ex.getCause() instanceof BadVersionException) {
// There was a race condition on the schema creation.
// Since it has now been created,
// retry the whole operation so that we have a chance to
Expand All @@ -328,8 +319,8 @@ private CompletableFuture<Long> putSchema(String schemaId, byte[] data, byte[] h
createNewSchema(schemaId, data, hash)
.thenAccept(future::complete)
.exceptionally(ex -> {
if (ex.getCause() instanceof NodeExistsException
|| ex.getCause() instanceof KeeperException.BadVersionException) {
if (ex.getCause() instanceof AlreadyExistsException
|| ex.getCause() instanceof BadVersionException) {
// There was a race condition on the schema creation. Since it has now been created,
// retry the whole operation so that we have a chance to recover without bubbling error
// back to producer/consumer
Expand Down Expand Up @@ -411,15 +402,13 @@ private CompletableFuture<Long> deleteSchema(String schemaId, boolean forceFully
});
FutureUtil.waitForAll(deleteFutures).whenComplete((v, e) -> {
final String path = getSchemaPath(schemaId);
ZkUtils.asyncDeleteFullPathOptimistic(zooKeeper, path, -1, (rc, path1, ctx) -> {
if (rc != Code.OK.intValue()) {
future.completeExceptionally(KeeperException.create(Code.get(rc)));
} else {
clearLocatorCache(getSchemaPath(schemaId));
future.complete(version);
}
}, path);

store.delete(path, Optional.empty())
.thenRun(() -> {
future.complete(version);
}).exceptionally(ex1 -> {
future.completeExceptionally(ex1);
return null;
});
});
}
});
Expand Down Expand Up @@ -468,7 +457,7 @@ private CompletableFuture<Long> updateSchemaLocator(
.setInfo(info)
.addAllIndex(
concat(locator.getIndexList(), newArrayList(info))
).build(), locatorEntry.zkZnodeVersion
).build(), locatorEntry.version
).thenApply(ignore -> nextVersion);
}

Expand Down Expand Up @@ -518,42 +507,21 @@ private CompletableFuture<SchemaStorageFormat.SchemaEntry> readSchemaEntry(

@NotNull
private CompletableFuture<Void> updateSchemaLocator(String id,
SchemaStorageFormat.SchemaLocator schema, int version) {
CompletableFuture<Void> future = new CompletableFuture<>();
zooKeeper.setData(id, schema.toByteArray(), version, (rc, path, ctx, stat) -> {
Code code = Code.get(rc);
if (code != Code.OK) {
future.completeExceptionally(KeeperException.create(code));
} else {
future.complete(null);
}
}, null);
return future;
SchemaStorageFormat.SchemaLocator schema, long version) {
return store.put(id, schema.toByteArray(), Optional.of(version)).thenApply(__ -> null);
}

@NotNull
private CompletableFuture<LocatorEntry> createSchemaLocator(String id, SchemaStorageFormat.SchemaLocator locator) {
CompletableFuture<LocatorEntry> future = new CompletableFuture<>();

ZkUtils.asyncCreateFullPathOptimistic(zooKeeper, id, locator.toByteArray(), Acl,
CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
Code code = Code.get(rc);
if (code != Code.OK) {
future.completeExceptionally(KeeperException.create(code));
} else {
// Newly created z-node will have version 0
future.complete(new LocatorEntry(locator, 0));
}
}, null);

return future;
return store.put(id, locator.toByteArray(), Optional.of(-1L))
.thenApply(stat -> new LocatorEntry(locator, stat.getVersion()));
}

@NotNull
private CompletableFuture<Optional<LocatorEntry>> getSchemaLocator(String schema) {
return localZkCache.getEntryAsync(schema, new SchemaLocatorDeserializer()).thenApply(optional ->
optional.map(entry -> new LocatorEntry(entry.getKey(), entry.getValue().getVersion()))
);
return locatorEntryCache.getWithStats(schema)
.thenApply(o ->
o.map(r -> new LocatorEntry(r.getValue(), r.getStat().getVersion())));
}

@NotNull
Expand Down Expand Up @@ -692,20 +660,13 @@ static SchemaStorageFormat.PositionInfo newPositionInfo(long ledgerId, long entr
}
}

static class SchemaLocatorDeserializer implements ZooKeeperCache.Deserializer<SchemaStorageFormat.SchemaLocator> {
@Override
public SchemaStorageFormat.SchemaLocator deserialize(String key, byte[] content) throws Exception {
return SchemaStorageFormat.SchemaLocator.parseFrom(content);
}
}

static class LocatorEntry {
final SchemaStorageFormat.SchemaLocator locator;
final Integer zkZnodeVersion;
final long version;

LocatorEntry(SchemaStorageFormat.SchemaLocator locator, Integer zkZnodeVersion) {
LocatorEntry(SchemaStorageFormat.SchemaLocator locator, long version) {
this.locator = locator;
this.zkZnodeVersion = zkZnodeVersion;
this.version = version;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
public class BookkeeperSchemaStorageFactory implements SchemaStorageFactory {
@Override
@NotNull
public SchemaStorage create(PulsarService pulsar) throws Exception {
BookkeeperSchemaStorage service = new BookkeeperSchemaStorage(pulsar);
service.init();
return service;
public SchemaStorage create(PulsarService pulsar) {
return new BookkeeperSchemaStorage(pulsar);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.bookkeeper.client.api.BKException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.annotations.Test;

import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.bkException;
Expand Down Expand Up @@ -61,7 +61,7 @@ public void testVersionFromBytes() {
byte[] versionBytesPost240 = bbPost240.array();

PulsarService mockPulsarService = mock(PulsarService.class);
when(mockPulsarService.getLocalZkCache()).thenReturn(mock(LocalZooKeeperCache.class));
when(mockPulsarService.getLocalMetadataStore()).thenReturn(mock(MetadataStoreExtended.class));
BookkeeperSchemaStorage schemaStorage = new BookkeeperSchemaStorage(mockPulsarService);
assertEquals(new LongSchemaVersion(version), schemaStorage.versionFromBytes(versionBytesPre240));
assertEquals(new LongSchemaVersion(version), schemaStorage.versionFromBytes(versionBytesPost240));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ protected void setup() throws Exception {
conf.setSchemaRegistryStorageClassName("org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory");
super.internalSetup();
BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar);
storage.init();
storage.start();
Map<SchemaType, SchemaCompatibilityCheck> checkMap = new HashMap<>();
checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck());
Expand Down

0 comments on commit 55af397

Please sign in to comment.