diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index dc85a7146e84d..b17052cfbef72 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -23,8 +23,6 @@ import static java.util.Objects.isNull; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry; -import static org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; -import static org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.nio.ByteBuffer; @@ -49,6 +47,7 @@ import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.LocatorEntry; import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; import org.apache.pulsar.common.protocol.schema.SchemaStorage; import org.apache.pulsar.common.protocol.schema.SchemaVersion; @@ -63,7 +62,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BookkeeperSchemaStorage implements SchemaStorage { +public class BookkeeperSchemaStorage implements SchemaStorage { private static final Logger log = LoggerFactory.getLogger(BookkeeperSchemaStorage.class); private static final String SchemaPath = "/schemas"; @@ -114,6 +113,10 @@ public CompletableFuture put(String key, byte[] value, byte[] has return putSchema(key, value, hash).thenApply(LongSchemaVersion::new); } + public CompletableFuture put(String key, byte[] value, byte[] hash, LocatorEntry locatorEntry) { + return putSchema(key, value, hash, locatorEntry).thenApply(LongSchemaVersion::new); + } + @Override public CompletableFuture get(String key, SchemaVersion version) { if (version == SchemaVersion.Latest) { @@ -126,33 +129,33 @@ public CompletableFuture get(String key, SchemaVersion version) { @Override public CompletableFuture>> getAll(String key) { - CompletableFuture>> result = new CompletableFuture<>(); - getLocator(key).thenAccept(locator -> { - if (log.isDebugEnabled()) { - log.debug("[{}] Get all schemas - locator: {}", key, locator); - } + return getAll(key, null); + } - if (!locator.isPresent()) { + public CompletableFuture>> getAll(String key, LocatorEntry locator) { + CompletableFuture>> result = new CompletableFuture<>(); + if (log.isDebugEnabled()) { + log.debug("[{}] Get all schemas - locator: {}", key, locator); + } + CompletableFuture> locatorFuture = locator == null + ? getLocator(key) : CompletableFuture.completedFuture(Optional.of(locator)); + List> list = new ArrayList<>(); + locatorFuture.thenAccept(locatorEntry -> { + if (locatorEntry.isPresent()) { + locatorEntry.get().locator.getIndexList().forEach(indexEntry -> + list.add(readSchemaEntry(indexEntry.getPosition()).thenApply(entry -> new StoredSchema( + entry.getSchemaData().toByteArray(), + new LongSchemaVersion(indexEntry.getVersion()))))); + result.complete(list); + } else { result.complete(Collections.emptyList()); - return; } - - SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator; - List> list = new ArrayList<>(); - schemaLocator.getIndexList().forEach(indexEntry -> list.add(readSchemaEntry(indexEntry.getPosition()) - .thenApply(entry -> new StoredSchema - ( - entry.getSchemaData().toByteArray(), - new LongSchemaVersion(indexEntry.getVersion()) - ) - ) - )); - result.complete(list); }); return result; } - CompletableFuture> getLocator(String key) { + @Override + public CompletableFuture> getLocator(String key) { return getSchemaLocator(getSchemaPath(key)); } @@ -284,60 +287,26 @@ private CompletableFuture putSchema(String schemaId, byte[] data, byte[] h return readSchemaEntry(locator.getIndexList().get(0).getPosition()) .thenCompose(schemaEntry -> addNewSchemaEntryToStore(schemaId, locator.getIndexList(), data).thenCompose( - position -> { - CompletableFuture future = new CompletableFuture<>(); - updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash) - .thenAccept(future::complete) - .exceptionally(ex -> { - if (ex.getCause() instanceof BadVersionException) { - // There was a race condition on the schema creation. - // Since it has now been created, - // retry the whole operation so that we have a chance to - // recover without bubbling error - putSchema(schemaId, data, hash) - .thenAccept(future::complete) - .exceptionally(ex2 -> { - future.completeExceptionally(ex2); - return null; - }); - } else { - // For other errors, just fail the operation - future.completeExceptionally(ex); - } - return null; - }); - return future; - }) + position -> updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash)) ); } else { // No schema was defined yet - CompletableFuture future = new CompletableFuture<>(); - createNewSchema(schemaId, data, hash) - .thenAccept(future::complete) - .exceptionally(ex -> { - if (ex.getCause() instanceof AlreadyExistsException - || ex.getCause() instanceof BadVersionException) { - // There was a race condition on the schema creation. Since it has now been created, - // retry the whole operation so that we have a chance to recover without bubbling error - // back to producer/consumer - putSchema(schemaId, data, hash) - .thenAccept(future::complete) - .exceptionally(ex2 -> { - future.completeExceptionally(ex2); - return null; - }); - } else { - // For other errors, just fail the operation - future.completeExceptionally(ex); - } - return null; - }); - - return future; + return createNewSchema(schemaId, data, hash); } }); } + @NotNull + private CompletableFuture putSchema(String schemaId, byte[] data, byte[] hash, LocatorEntry locatorEntry) { + if (locatorEntry == null) { + return createNewSchema(schemaId, data, hash); + } else { + return addNewSchemaEntryToStore(schemaId, + locatorEntry.locator.getIndexList(), data).thenCompose( + position -> updateSchemaLocator(schemaId, locatorEntry, position, hash)); + } + } + private CompletableFuture createNewSchema(String schemaId, byte[] data, byte[] hash) { SchemaStorageFormat.IndexEntry emptyIndex = SchemaStorageFormat.IndexEntry.newBuilder() .setVersion(0) @@ -673,7 +642,7 @@ static SchemaStorageFormat.PositionInfo newPositionInfo(long ledgerId, long entr } } - static class LocatorEntry { + public static class LocatorEntry { final SchemaStorageFormat.SchemaLocator locator; final long version; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java index 2c84805584dae..06e28d670304a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java @@ -20,13 +20,13 @@ import javax.validation.constraints.NotNull; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.LocatorEntry; import org.apache.pulsar.common.protocol.schema.SchemaStorage; @SuppressWarnings("unused") public class BookkeeperSchemaStorageFactory implements SchemaStorageFactory { @Override - @NotNull - public SchemaStorage create(PulsarService pulsar) { + public @NotNull SchemaStorage create(PulsarService pulsar) { return new BookkeeperSchemaStorage(pulsar); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index c819d96114d81..d3917856911a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; @@ -57,9 +58,11 @@ import org.apache.pulsar.common.schema.LongSchemaVersion; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; +import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; @Slf4j -public class SchemaRegistryServiceImpl implements SchemaRegistryService { +public class SchemaRegistryServiceImpl implements SchemaRegistryService { private static HashFunction hashFunction = Hashing.sha256(); private final Map compatibilityChecks; private final SchemaStorage schemaStorage; @@ -150,10 +153,15 @@ public CompletableFuture getSchema(String schemaId, SchemaVer @Override public CompletableFuture>> getAllSchemas(String schemaId) { - long start = this.clock.millis(); + return getAllSchemas(schemaId, null); + } - return schemaStorage.getAll(schemaId) - .thenApply(schemas -> { + public CompletableFuture>> getAllSchemas(String schemaId, + T locatorEntry) { + long start = this.clock.millis(); + CompletableFuture>> storedSchemaFuture = locatorEntry == null + ? schemaStorage.getAll(schemaId) : schemaStorage.getAll(schemaId, locatorEntry); + return storedSchemaFuture.thenApply(schemas -> { List> futures = schemas.stream() .map(future -> future.thenCompose(stored -> Functions.bytesToSchemaInfo(stored.data) @@ -180,57 +188,108 @@ public CompletableFuture>> getAllSchem @Override @NotNull public CompletableFuture putSchemaIfAbsent(String schemaId, SchemaData schema, - SchemaCompatibilityStrategy strategy) { - return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList -> - getSchemaVersionBySchemaData(schemaAndMetadataList, schema).thenCompose(schemaVersion -> { - if (schemaVersion != null) { - if (log.isDebugEnabled()) { - log.debug("[{}] Schema is already exists", schemaId); - } - return CompletableFuture.completedFuture(schemaVersion); - } - CompletableFuture checkCompatibilityFuture = new CompletableFuture<>(); - if (schemaAndMetadataList.size() != 0) { - if (isTransitiveStrategy(strategy)) { - checkCompatibilityFuture = - checkCompatibilityWithAll(schemaId, schema, strategy, schemaAndMetadataList); - } else { - checkCompatibilityFuture = checkCompatibilityWithLatest(schemaId, schema, strategy); - } + SchemaCompatibilityStrategy strategy) { + + return putSchemaIfAbsent(schemaId, schema, strategy, 0, 0); + } + + @NotNull + public CompletableFuture putSchemaIfAbsent(String schemaId, SchemaData schema, + SchemaCompatibilityStrategy strategy, long startTime, int retry) { + + //1. read metastore and get SchemaLocator(with version) + //2. check if already exists, if yes, return; if no, do next step + //3. build schema entry, create new ledger, write schema entry to ledger + //4. update schema locator with new version and new index(added with new schema entry position) + //5. update schema locator to metastore + //6. if update metastore failed with AlreadyExistException or BadVersionException, retry from step 1 + return schemaStorage.getLocator(schemaId).thenCompose(locator -> { + Optional locatorEntry = locator == null ? Optional.empty() : (Optional) locator; + CompletableFuture> future; + // for compatibility, user-defined SchemaStorage may not support getLocator() + if (locator == null) { + future = trimDeletedSchemaAndGetList(schemaId); } else { - checkCompatibilityFuture.complete(null); + future = locatorEntry.isPresent() + ? trimDeletedSchemaAndGetList(schemaId, locatorEntry.get()) + : CompletableFuture.completedFuture(Collections.emptyList()); } - return checkCompatibilityFuture.thenCompose(v -> { - byte[] context = hashFunction.hashBytes(schema.getData()).asBytes(); - SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder() - .setType(Functions.convertFromDomainType(schema.getType())) - .setSchema(ByteString.copyFrom(schema.getData())) - .setSchemaId(schemaId) - .setUser(schema.getUser()) - .setDeleted(false) - .setTimestamp(clock.millis()) - .addAllProps(toPairs(schema.getProps())) - .build(); - - long start = this.clock.millis(); - - return schemaStorage - .put(schemaId, info.toByteArray(), context) - .whenComplete((__, t) -> { - if (t != null) { - log.error("[{}] Put schema failed", schemaId); - this.stats.recordPutFailed(schemaId); - } else { - if (log.isDebugEnabled()) { - log.debug("[{}] Put schema finished", schemaId); - } - this.stats.recordPutLatency(schemaId, this.clock.millis() - start); - } - }); - + return future.thenCompose(schemaAndMetadataList -> { + return getSchemaVersionBySchemaData(schemaAndMetadataList, schema).thenCompose(schemaVersion -> { + if (schemaVersion != null) { + if (log.isDebugEnabled()) { + log.debug("[{}] Schema is already exists", schemaId); + } + return CompletableFuture.completedFuture(schemaVersion); + } + CompletableFuture checkCompatibilityFuture = new CompletableFuture<>(); + if (schemaAndMetadataList.size() != 0) { + if (isTransitiveStrategy(strategy)) { + checkCompatibilityFuture = checkCompatibilityWithAll(schemaId, schema, + strategy, schemaAndMetadataList); + } else { + checkCompatibilityFuture = checkCompatibilityWithLatest(schemaId, + schema, strategy); + } + } else { + checkCompatibilityFuture.complete(null); + } + return checkCompatibilityFuture.thenCompose(v -> { + CompletableFuture persistentFuture = new CompletableFuture<>(); + byte[] hashBytes = hashFunction.hashBytes(schema.getData()).asBytes(); + SchemaRegistryFormat.SchemaInfo info = buildSchemaInfo(schemaId, schema); + long start = startTime > 0 ? startTime : this.clock.millis(); + schemaStorage + .put(schemaId, info.toByteArray(), hashBytes, + locatorEntry.isEmpty() ? null : locatorEntry.get()) + .thenAccept(sv -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Schema is successfully added", schemaId); + } + this.stats.recordPutLatency(schemaId, this.clock.millis() - start); + persistentFuture.complete((SchemaVersion) sv); + }) + .exceptionally(e -> { + Throwable ex = (Throwable) e; + if (ex.getCause() instanceof AlreadyExistsException + || ex.getCause() instanceof BadVersionException) { + // retry if put schemaLocator to zk failed caused by race condition + if (log.isDebugEnabled()) { + log.debug("[{}] Put schema failed because of {}, retry {} times", + ex.getCause().getMessage(), schemaId, retry + 1); + } + putSchemaIfAbsent(schemaId, schema, strategy, start, retry + 1) + .thenAccept(persistentFuture::complete) + .exceptionally(ex2 -> { + persistentFuture.completeExceptionally(ex2); + return null; + }); + + } else { + log.error("[{}] Put schema failed", schemaId, ex.getCause()); + this.stats.recordPutFailed(schemaId); + persistentFuture.completeExceptionally(ex); + } + return null; + }); + return persistentFuture; + }); + }); }); + }); + } - })); + private SchemaRegistryFormat.SchemaInfo buildSchemaInfo(String schemaId, SchemaData schema) { + return SchemaRegistryFormat.SchemaInfo + .newBuilder() + .setType(Functions.convertFromDomainType(schema.getType())) + .setSchema(ByteString.copyFrom(schema.getData())) + .setSchemaId(schemaId) + .setUser(schema.getUser()) + .setDeleted(false) + .setTimestamp(clock.millis()) + .addAllProps(toPairs(schema.getProps())) + .build(); } @Override @@ -525,15 +584,21 @@ private CompletableFuture checkCompatibilityWithAll(String schemaId, Schem } public CompletableFuture> trimDeletedSchemaAndGetList(String schemaId) { + return trimDeletedSchemaAndGetList(schemaId, null); + } + + public CompletableFuture> trimDeletedSchemaAndGetList(String schemaId, + T locatorEntry) { CompletableFuture> schemaResult = new CompletableFuture<>(); - CompletableFuture>> schemaFutureList = getAllSchemas(schemaId); + CompletableFuture>> schemaFutureList = + locatorEntry == null ? getAllSchemas(schemaId) : getAllSchemas(schemaId, locatorEntry); schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList, ex) -> { List list = ex != null ? new ArrayList<>() : schemaList; if (ex != null) { - boolean recoverable = ex.getCause() != null && (ex.getCause() instanceof SchemaException) - ? ((SchemaException) ex.getCause()).isRecoverable() - : true; + boolean recoverable = ex.getCause() == null + || (!(ex.getCause() instanceof SchemaException)) + || ((SchemaException) ex.getCause()).isRecoverable(); // if error is recoverable then fail the request. if (recoverable) { schemaResult.completeExceptionally(ex.getCause()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index a2bd506a8f939..e044ec3f1f794 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -154,10 +154,22 @@ public void writeReadBackDeleteSchemaEntry() throws Exception { assertNull(schemaRegistryService.getSchema(schemaId1).get()); } + @Test + public void testConcurrentPutSchema() throws Exception { + String schemaId = "tenant/ns/topic_test_concurrent_put_schema" + UUID.randomUUID(); + CompletableFuture.allOf( + putSchema(schemaId, schemaData1), + putSchema(schemaId, schemaData1), + putSchema(schemaId, schemaData1), + putSchema(schemaId, schemaData1)) + .get(); + assertEquals(getAllSchemas(schemaId).size(), 1); + } + @Test public void findSchemaVersionTest() throws Exception { putSchema(schemaId1, schemaData1, version(0)); - assertEquals(0, schemaRegistryService.findSchemaVersion(schemaId1, schemaData1).get().longValue()); + assertEquals(0, ((CompletableFuture)schemaRegistryService.findSchemaVersion(schemaId1, schemaData1)).get().longValue()); } @Test @@ -243,7 +255,7 @@ public void addLotsOfEntriesThenDelete() throws Exception { deleteSchema(schemaId1, version(3)); - SchemaRegistry.SchemaAndMetadata version3 = schemaRegistryService.getSchema(schemaId1, version(3)).get(); + SchemaRegistry.SchemaAndMetadata version3 = (SchemaAndMetadata) schemaRegistryService.getSchema(schemaId1, version(3)).get(); assertNull(version3); } @@ -279,7 +291,7 @@ public void trimDeletedSchemaAndGetListTest() throws Exception { schemaId1, schemaData2, SchemaCompatibilityStrategy.FULL); newVersion = put.get(); list.add(new SchemaAndMetadata(schemaId1, schemaData2, newVersion)); - List list1 = schemaRegistryService.trimDeletedSchemaAndGetList(schemaId1).get(); + List list1 = (List) schemaRegistryService.trimDeletedSchemaAndGetList(schemaId1).get(); assertEquals(list.size(), list1.size()); HashFunction hashFunction = Hashing.sha256(); for (int i = 0; i < list.size(); i++) { @@ -306,9 +318,9 @@ public void checkIsCompatible() throws Exception { putSchema(schemaId1, schemaData1, version(0), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE); putSchema(schemaId1, schemaData2, version(1), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE); - assertTrue(schemaRegistryService.isCompatible(schemaId1, schemaData3, + assertTrue((Boolean) schemaRegistryService.isCompatible(schemaId1, schemaData3, SchemaCompatibilityStrategy.BACKWARD).get()); - assertFalse(schemaRegistryService.isCompatible(schemaId1, schemaData3, + assertFalse((Boolean) schemaRegistryService.isCompatible(schemaId1, schemaData3, SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE).get()); putSchema(schemaId1, schemaData3, version(2), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE); } @@ -319,6 +331,10 @@ public void testSchemaStorageFailed() throws Exception { restartBroker(); } + private CompletableFuture putSchema(String schemaId, SchemaData schema) { + return schemaRegistryService.putSchemaIfAbsent(schemaId, schema, SchemaCompatibilityStrategy.FULL); + } + private void putSchema(String schemaId, SchemaData schema, SchemaVersion expectedVersion) throws Exception { putSchema(schemaId, schema, expectedVersion, SchemaCompatibilityStrategy.FULL); } @@ -332,14 +348,14 @@ private void putSchema(String schemaId, SchemaData schema, SchemaVersion expecte } private SchemaData getLatestSchema(String schemaId, SchemaVersion expectedVersion) throws Exception { - SchemaRegistry.SchemaAndMetadata schemaAndVersion = schemaRegistryService.getSchema(schemaId).get(); + SchemaRegistry.SchemaAndMetadata schemaAndVersion = (SchemaAndMetadata) schemaRegistryService.getSchema(schemaId).get(); assertEquals(expectedVersion, schemaAndVersion.version); assertEquals(schemaId, schemaAndVersion.id); return schemaAndVersion.schema; } private SchemaData getSchema(String schemaId, SchemaVersion version) throws Exception { - SchemaRegistry.SchemaAndMetadata schemaAndVersion = schemaRegistryService.getSchema(schemaId, version).get(); + SchemaRegistry.SchemaAndMetadata schemaAndVersion = (SchemaAndMetadata) schemaRegistryService.getSchema(schemaId, version).get(); assertEquals(version, schemaAndVersion.version); assertEquals(schemaId, schemaAndVersion.id); return schemaAndVersion.schema; @@ -348,14 +364,14 @@ private SchemaData getSchema(String schemaId, SchemaVersion version) throws Exce private List getAllSchemas(String schemaId) throws Exception { List result = new ArrayList<>(); for (CompletableFuture schema : - schemaRegistryService.getAllSchemas(schemaId).get()) { + (List>)schemaRegistryService.getAllSchemas(schemaId).get()) { result.add(schema.get().schema); } return result; } private void deleteSchema(String schemaId, SchemaVersion expectedVersion) throws Exception { - SchemaVersion version = schemaRegistryService.deleteSchema(schemaId, userId, false).get(); + SchemaVersion version = (SchemaVersion) schemaRegistryService.deleteSchema(schemaId, userId, false).get(); assertEquals(expectedVersion, version); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java index ecfe4b2e769d6..adf936c0b4085 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java @@ -19,19 +19,32 @@ package org.apache.pulsar.common.protocol.schema; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; /** * Schema storage. */ -public interface SchemaStorage { +public interface SchemaStorage { CompletableFuture put(String key, byte[] value, byte[] hash); + default CompletableFuture put(String key, byte[] value, byte[] hash, T locator) { + return put(key, value, hash); + } + CompletableFuture get(String key, SchemaVersion version); + default CompletableFuture> getLocator(String key) { + return null; + } + CompletableFuture>> getAll(String key); + default CompletableFuture>> getAll(String key, T locator) { + return getAll(key); + } + CompletableFuture delete(String key, boolean forcefully); CompletableFuture delete(String key);