diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index b03feb37b6b64..10c83245c50dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -1052,7 +1052,7 @@ public CompletableFuture isSchemaCompatible(SchemaData schema) { String id = TopicName.get(base).getSchemaName(); return brokerService.pulsar() .getSchemaRegistryService() - .isCompatibleWithLatestVersion(id, schema, schemaCompatibilityStrategy); + .isCompatible(id, schema, schemaCompatibilityStrategy); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index b0392f9106af5..ed59c37c32880 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1942,7 +1942,7 @@ public CompletableFuture isSchemaCompatible(SchemaData schema) { String id = TopicName.get(base).getSchemaName(); return brokerService.pulsar() .getSchemaRegistryService() - .isCompatibleWithLatestVersion(id, schema, schemaCompatibilityStrategy); + .isCompatible(id, schema, schemaCompatibilityStrategy); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java index a91edab065f72..99594e0db5957 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java @@ -20,7 +20,9 @@ import static java.nio.charset.StandardCharsets.UTF_8; -import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; + import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; import org.apache.avro.SchemaValidationException; @@ -34,45 +36,49 @@ */ @Slf4j abstract class AvroSchemaBasedCompatibilityCheck implements SchemaCompatibilityCheck { + @Override - public boolean isWellFormed(SchemaData to) { - try { - Schema.Parser toParser = new Schema.Parser(); - Schema toSchema = toParser.parse(new String(to.getData(), UTF_8)); - } catch (SchemaParseException e) { - log.error("Error during schema parsing: {}", e.getMessage(), e); - return false; - } - return true; + public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) { + return isCompatible(Collections.singletonList(from), to, strategy); } @Override - public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) { + public boolean isCompatible(Iterable from, SchemaData to, SchemaCompatibilityStrategy strategy) { + LinkedList fromList = new LinkedList<>(); try { - Schema.Parser fromParser = new Schema.Parser(); - Schema fromSchema = fromParser.parse(new String(from.getData(), UTF_8)); - Schema.Parser toParser = new Schema.Parser(); - Schema toSchema = toParser.parse(new String(to.getData(), UTF_8)); - - SchemaValidator schemaValidator = createSchemaValidator(strategy, true); - schemaValidator.validate(toSchema, Arrays.asList(fromSchema)); - } catch (SchemaParseException | SchemaValidationException e) { + for (SchemaData schemaData : from) { + Schema.Parser parser = new Schema.Parser(); + fromList.addFirst(parser.parse(new String(schemaData.getData(), UTF_8))); + } + Schema.Parser parser = new Schema.Parser(); + Schema toSchema = parser.parse(new String(to.getData(), UTF_8)); + SchemaValidator schemaValidator = createSchemaValidator(strategy); + schemaValidator.validate(toSchema, fromList); + } catch (SchemaParseException e) { + log.error("Error during schema parsing: {}", e.getMessage(), e); + return false; + } catch (SchemaValidationException e) { log.error("Error during schema compatibility check: {}", e.getMessage(), e); return false; } return true; } - static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy, - boolean onlyLatestValidator) { + static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy) { final SchemaValidatorBuilder validatorBuilder = new SchemaValidatorBuilder(); switch (compatibilityStrategy) { + case BACKWARD_TRANSITIVE: + return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), false); case BACKWARD: - return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), onlyLatestValidator); + return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), true); + case FORWARD_TRANSITIVE: + return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), false); case FORWARD: - return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator); + return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), true); + case FULL_TRANSITIVE: + return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), false); case FULL: - return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator); + return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), true); case ALWAYS_COMPATIBLE: return AlwaysSchemaValidator.INSTANCE; default: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 82c7156bdbdd3..b99f27c91bb17 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -38,6 +39,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; import javax.validation.constraints.NotNull; @@ -118,6 +120,33 @@ public CompletableFuture get(String key, SchemaVersion version) { } } + @Override + public CompletableFuture>> getAll(String key) { + CompletableFuture>> result = new CompletableFuture<>(); + getSchemaLocator(getSchemaPath(key)).thenAccept(locator -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Get all schemas - locator: {}", key, locator); + } + + if (!locator.isPresent()) { + result.complete(Collections.emptyList()); + } + + SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator; + List> list = new ArrayList<>(); + schemaLocator.getIndexList().forEach(indexEntry -> list.add(readSchemaEntry(indexEntry.getPosition()) + .thenApply(entry -> new StoredSchema + ( + entry.getSchemaData().toByteArray(), + new LongSchemaVersion(schemaLocator.getInfo().getVersion()) + ) + ) + )); + result.complete(list); + }); + return result; + } + @Override public CompletableFuture delete(String key) { return deleteSchema(key).thenApply(LongSchemaVersion::new); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java index 2988079660dd9..38e09979fcd0e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java @@ -20,6 +20,8 @@ import static java.util.concurrent.CompletableFuture.completedFuture; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.common.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaVersion; @@ -35,6 +37,12 @@ public CompletableFuture getSchema(String schemaId, SchemaVer return completedFuture(null); } + @Override + public CompletableFuture>> getAllSchemas(String schemaId) { + return completedFuture(Collections.emptyList()); + } + + @Override public CompletableFuture putSchemaIfAbsent(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy) { @@ -47,14 +55,13 @@ public CompletableFuture deleteSchema(String schemaId, String use } @Override - public SchemaVersion versionFromBytes(byte[] version) { - return null; + public CompletableFuture isCompatible(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy) { + return completedFuture(false); } @Override - public CompletableFuture isCompatibleWithLatestVersion(String schemaId, SchemaData schema, - SchemaCompatibilityStrategy strategy) { - return CompletableFuture.completedFuture(true); + public SchemaVersion versionFromBytes(byte[] version) { + return null; } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java index 136f41fb15ea1..b50d7ad98a6a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityCheck.java @@ -22,42 +22,51 @@ import org.apache.pulsar.common.schema.SchemaType; public interface SchemaCompatibilityCheck { + SchemaType getSchemaType(); /** * + * @param from the current schema i.e. schema that the broker has * @param to the future schema i.e. the schema sent by the producer - * @return whether the schemas are well-formed + * @param strategy the strategy to use when comparing schemas + * @return whether the schemas are compatible */ - boolean isWellFormed(SchemaData to); + boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy); /** * - * @param from the current schema i.e. schema that the broker has + * @param from the current schemas i.e. schemas that the broker has * @param to the future schema i.e. the schema sent by the producer * @param strategy the strategy to use when comparing schemas * @return whether the schemas are compatible */ - boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy); + boolean isCompatible(Iterable from, SchemaData to, SchemaCompatibilityStrategy strategy); SchemaCompatibilityCheck DEFAULT = new SchemaCompatibilityCheck() { + @Override public SchemaType getSchemaType() { return SchemaType.NONE; } @Override - public boolean isWellFormed(SchemaData to) { - return true; + public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) { + if (strategy == SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE) { + return false; + } else { + return true; + } } @Override - public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) { + public boolean isCompatible(Iterable from, SchemaData to, SchemaCompatibilityStrategy strategy) { if (strategy == SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE) { return false; } else { return true; } } + }; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java index c53642ef94fa2..b15568d6f20c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaCompatibilityStrategy.java @@ -44,24 +44,49 @@ public enum SchemaCompatibilityStrategy { /** * Equivalent to both FORWARD and BACKWARD */ - FULL; + FULL, + + /** + * Be similar to BACKWARD, BACKWARD_TRANSITIVE ensure all previous version schema can + * be read by the new schema. + */ + BACKWARD_TRANSITIVE, + + /** + * Be similar to FORWARD, FORWARD_TRANSITIVE ensure new schema can be ready by all previous + * version schema. + */ + FORWARD_TRANSITIVE, + + /** + * Equivalent to both FORWARD_TRANSITIVE and BACKWARD_TRANSITIVE + */ + FULL_TRANSITIVE; + + public static SchemaCompatibilityStrategy fromAutoUpdatePolicy(SchemaAutoUpdateCompatibilityStrategy strategy) { if (strategy == null) { return SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE; } switch (strategy) { - case Backward: - return BACKWARD; - case Forward: - return FORWARD; - case Full: - return FULL; - case AlwaysCompatible: - return ALWAYS_COMPATIBLE; - case AutoUpdateDisabled: - default: - return ALWAYS_INCOMPATIBLE; + case Backward: + return BACKWARD; + case Forward: + return FORWARD; + case Full: + return FULL; + case AlwaysCompatible: + return ALWAYS_COMPATIBLE; + case ForwardTransitive: + return FORWARD_TRANSITIVE; + case BackwardTransitive: + return BACKWARD_TRANSITIVE; + case FullTransitive: + return FULL_TRANSITIVE; + case AutoUpdateDisabled: + default: + return ALWAYS_INCOMPATIBLE; } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java index 6b8ea24797f3d..12ab192435a29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.service.schema; import com.google.common.base.MoreObjects; + +import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.common.schema.SchemaData; @@ -30,12 +32,14 @@ public interface SchemaRegistry extends AutoCloseable { CompletableFuture getSchema(String schemaId, SchemaVersion version); + CompletableFuture>> getAllSchemas(String schemaId); + CompletableFuture putSchemaIfAbsent(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy); CompletableFuture deleteSchema(String schemaId, String user); - CompletableFuture isCompatibleWithLatestVersion(String schemaId, SchemaData schema, + CompletableFuture isCompatible(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy); SchemaVersion versionFromBytes(byte[] version); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index fe18fa7649508..43f79c8e77d09 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -20,6 +20,7 @@ import static java.util.Objects.isNull; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toMap; import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs; import com.google.common.annotations.VisibleForTesting; @@ -35,7 +36,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import javax.validation.constraints.NotNull; + +import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat; import org.apache.pulsar.common.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; @@ -81,19 +85,33 @@ public CompletableFuture getSchema(String schemaId, SchemaVer ); } + @Override + public CompletableFuture>> getAllSchemas(String schemaId) { + return schemaStorage.getAll(schemaId).thenApply(schemas -> + schemas.stream().map(future -> future.thenCompose(stored -> + Functions.bytesToSchemaInfo(stored.data) + .thenApply(Functions::schemaInfoToSchema) + .thenApply(schema -> new SchemaAndMetadata(schemaId, schema, stored.version)) + )).collect(Collectors.toList())); + } + @Override @NotNull public CompletableFuture putSchemaIfAbsent(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy) { return getSchema(schemaId) - .thenApply( + .thenCompose( (existingSchema) -> - existingSchema == null || - existingSchema.schema.isDeleted() || - (isWellFormed(schema) && - isCompatible(existingSchema, schema, strategy))) - .thenCompose(isValid -> { - if (isValid) { + { + if (existingSchema == null || existingSchema.schema.isDeleted()) { + return completedFuture(true); + } else { + return isCompatible(schemaId, schema, strategy); + } + } + ) + .thenCompose(isCompatible -> { + if (isCompatible) { byte[] context = hashFunction.hashBytes(schema.getData()).asBytes(); SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder() .setType(Functions.convertFromDomainType(schema.getType())) @@ -119,9 +137,16 @@ public CompletableFuture deleteSchema(String schemaId, String use } @Override - public CompletableFuture isCompatibleWithLatestVersion(String schemaId, SchemaData schema, + public CompletableFuture isCompatible(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy) { - return checkCompatibilityWithLatest(schemaId, schema, strategy); + switch (strategy) { + case FORWARD_TRANSITIVE: + case BACKWARD_TRANSITIVE: + case FULL_TRANSITIVE: + return checkCompatibilityWithAll(schemaId, schema, strategy); + default: + return checkCompatibilityWithLatest(schemaId, schema, strategy); + } } @Override @@ -145,11 +170,6 @@ private SchemaRegistryFormat.SchemaInfo deleted(String schemaId, String user) { .build(); } - private boolean isWellFormed(SchemaData schema) { - return compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT) - .isWellFormed(schema); - } - private boolean isCompatible(SchemaAndMetadata existingSchema, SchemaData newSchema, SchemaCompatibilityStrategy strategy) { HashCode existingHash = hashFunction.hashBytes(existingSchema.schema.getData()); @@ -168,6 +188,18 @@ private CompletableFuture checkCompatibilityWithLatest(String schemaId, && isCompatible(existingSchema, schema, strategy)); } + private CompletableFuture checkCompatibilityWithAll(String schemaId, SchemaData schema, + SchemaCompatibilityStrategy strategy) { + return getAllSchemas(schemaId) + .thenCompose(FutureUtils::collect) + .thenApply(schemaAndMetadataList -> schemaAndMetadataList + .stream() + .map(schemaAndMetadata -> schemaAndMetadata.schema) + .collect(Collectors.toList())) + .thenApply(schemas -> compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT) + .isCompatible(schemas, schema, strategy)); + } + interface Functions { static SchemaType convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType type) { if (type.getNumber() < 0) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java index c5c7f832ece64..5a3b44ccf9bd8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.schema; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.common.schema.SchemaVersion; @@ -27,6 +28,8 @@ public interface SchemaStorage { CompletableFuture get(String key, SchemaVersion version); + CompletableFuture>> getAll(String key); + CompletableFuture delete(String key); SchemaVersion versionFromBytes(byte[] version); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BaseAvroSchemaCompatibilityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BaseAvroSchemaCompatibilityTest.java index 67d8432532df5..c9bf39f905d86 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BaseAvroSchemaCompatibilityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BaseAvroSchemaCompatibilityTest.java @@ -23,6 +23,9 @@ import org.testng.Assert; import org.testng.annotations.Test; +import java.util.Arrays; +import java.util.Collections; + public abstract class BaseAvroSchemaCompatibilityTest { SchemaCompatibilityCheck schemaCompatibilityCheck; @@ -69,6 +72,11 @@ public abstract class BaseAvroSchemaCompatibilityTest { "\"type\":\"string\",\"default\":\"bar\"}]}"; private static final SchemaData schemaData7 = getSchemaData(schemaJson7); + private static final String schemaJson8 = + "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" + + ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," + + "{\"name\":\"field2\",\"type\":\"string\"}]}"; + private static final SchemaData schemaData8 = getSchemaData(schemaJson8); public abstract SchemaCompatibilityCheck getSchemaCheck(); @@ -158,6 +166,45 @@ public void testFullCompatibility() { } + @Test + public void testBackwardTransitive() { + SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCheck(); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(schemaData1, schemaData2), schemaData5, + SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE)); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(schemaData1, schemaData2, schemaData5), + schemaData6, SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE)); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Collections.singletonList(schemaData2), schemaData8, + SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE)); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(schemaData1, schemaData2), schemaData8, + SchemaCompatibilityStrategy.BACKWARD)); + Assert.assertFalse(schemaCompatibilityCheck.isCompatible(Arrays.asList(schemaData1, schemaData2), schemaData8, + SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE)); + Assert.assertFalse(schemaCompatibilityCheck.isCompatible(Arrays.asList(schemaData1, schemaData2, schemaData5), + schemaData8, SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE)); + } + + @Test + public void testForwardTransitive() { + SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCheck(); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(schemaData1, schemaData2), schemaData3, + SchemaCompatibilityStrategy.FORWARD_TRANSITIVE)); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(schemaData1, schemaData2, schemaData3), + schemaData7, SchemaCompatibilityStrategy.FORWARD_TRANSITIVE)); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(schemaData3, schemaData2), schemaData1, + SchemaCompatibilityStrategy.FORWARD)); + Assert.assertFalse(schemaCompatibilityCheck.isCompatible(Arrays.asList(schemaData3, schemaData2), schemaData1, + SchemaCompatibilityStrategy.FORWARD_TRANSITIVE)); + } + + @Test + public void testFullTransitive() { + SchemaCompatibilityCheck schemaCompatibilityCheck = getSchemaCheck(); + Assert.assertTrue(schemaCompatibilityCheck.isCompatible(Arrays.asList(schemaData1, schemaData2), schemaData3, + SchemaCompatibilityStrategy.FULL)); + Assert.assertFalse(schemaCompatibilityCheck.isCompatible(Arrays.asList(schemaData1, schemaData2), schemaData3, + SchemaCompatibilityStrategy.FULL_TRANSITIVE)); + } + private static SchemaData getSchemaData(String schemaJson) { return SchemaData.builder().data(schemaJson.getBytes()).type(SchemaType.AVRO).build(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index 9ccdbf03d5fc5..baa9542fa48ae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -19,15 +19,21 @@ package org.apache.pulsar.broker.service.schema; import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; -import com.google.common.collect.Maps; import java.time.Clock; import java.time.Instant; import java.time.ZoneId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.common.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; @@ -79,7 +85,9 @@ protected void setup() throws Exception { BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar); storage.init(); storage.start(); - schemaRegistryService = new SchemaRegistryServiceImpl(storage, Maps.newHashMap(), MockClock); + Map checkMap = new HashMap<>(); + checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck()); + schemaRegistryService = new SchemaRegistryServiceImpl(storage, checkMap, MockClock); } @AfterMethod @@ -139,6 +147,18 @@ public void getByVersionReturnsTheCorrectEntry3() throws Exception { assertEquals(schema1, version1); } + @Test + public void getAllVersionSchema() throws Exception { + putSchema(schemaId1, schema1, version(0)); + putSchema(schemaId1, schema2, version(1)); + putSchema(schemaId1, schema3, version(2)); + + List allSchemas = getAllSchemas(schemaId1); + assertEquals(schema1, allSchemas.get(0)); + assertEquals(schema2, allSchemas.get(1)); + assertEquals(schema3, allSchemas.get(2)); + } + @Test public void addLotsOfEntriesThenDelete() throws Exception { SchemaData randomSchema1 = randomSchema(); @@ -218,9 +238,43 @@ public void dontReAddExistingSchemaInMiddle() throws Exception { putSchema(schemaId1, schema2, version(1)); } + @Test(expectedExceptions = ExecutionException.class) + public void checkIsCompatible() throws Exception { + String schemaJson1 = + "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" + + ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}"; + SchemaData schemaData1 = getSchemaData(schemaJson1); + + String schemaJson2 = + "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" + + ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," + + "{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}"; + SchemaData schemaData2 = getSchemaData(schemaJson2); + + String schemaJson3 = + "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" + + ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," + + "{\"name\":\"field2\",\"type\":\"string\"}]}"; + SchemaData schemaData3 = getSchemaData(schemaJson3); + + putSchema(schemaId1, schemaData1, version(0), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE); + putSchema(schemaId1, schemaData2, version(1), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE); + + assertTrue(schemaRegistryService.isCompatible(schemaId1, schemaData3, + SchemaCompatibilityStrategy.BACKWARD).get()); + assertFalse(schemaRegistryService.isCompatible(schemaId1, schemaData3, + SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE).get()); + putSchema(schemaId1, schemaData3, version(2), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE); + } + private void putSchema(String schemaId, SchemaData schema, SchemaVersion expectedVersion) throws Exception { + putSchema(schemaId, schema, expectedVersion, SchemaCompatibilityStrategy.FULL); + } + + private void putSchema(String schemaId, SchemaData schema, SchemaVersion expectedVersion, + SchemaCompatibilityStrategy strategy) throws ExecutionException, InterruptedException { CompletableFuture put = schemaRegistryService.putSchemaIfAbsent( - schemaId, schema, SchemaCompatibilityStrategy.FULL); + schemaId, schema, strategy); SchemaVersion newVersion = put.get(); assertEquals(expectedVersion, newVersion); } @@ -239,6 +293,15 @@ private SchemaData getSchema(String schemaId, SchemaVersion version) throws Exce return schemaAndVersion.schema; } + private List getAllSchemas(String schemaId) throws Exception { + List result = new ArrayList<>(); + for (CompletableFuture schema : + schemaRegistryService.getAllSchemas(schemaId).get()) { + result.add(schema.get().schema); + } + return result; + } + private void deleteSchema(String schemaId, SchemaVersion expectedVersion) throws Exception { SchemaVersion version = schemaRegistryService.deleteSchema(schemaId, userId).get(); assertEquals(expectedVersion, version); @@ -256,6 +319,10 @@ private SchemaData randomSchema() { .build(); } + private SchemaData getSchemaData(String schemaJson) { + return SchemaData.builder().data(schemaJson.getBytes()).type(SchemaType.AVRO).user(userId).build(); + } + private SchemaVersion version(long version) { return new LongSchemaVersion(version); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java index bc437ed651a07..7a26c65e5b519 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SchemaAutoUpdateCompatibilityStrategy.java @@ -51,5 +51,22 @@ public enum SchemaAutoUpdateCompatibilityStrategy { * Always Compatible - The new schema will not be checked for compatibility against * old schemas. In other words, new schemas will always be marked assumed compatible. */ - AlwaysCompatible + AlwaysCompatible, + + /** + * Be similar to Backward. BackwardTransitive ensure all previous version schema can + * be read by the new schema. + */ + BackwardTransitive, + + /** + * Be similar to Forward, ForwardTransitive ensure new schema can be ready by all previous + * version schema. + */ + ForwardTransitive, + + /** + * BackwardTransitive and ForwardTransitive + */ + FullTransitive }