From 6232a140f13ce2e8259d654a68c8f9656746b183 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 16 May 2024 17:36:57 +0800 Subject: [PATCH] [fix][schema] Error checking schema compatibility on a schema-less topic via REST API (#22720) (cherry picked from commit 101aee4543fb66035165d8744def630f9a9c3a59) --- .../AvroSchemaBasedCompatibilityCheck.java | 6 ++-- ...rotobufNativeSchemaCompatibilityCheck.java | 4 ++- .../schema/SchemaRegistryServiceImpl.java | 2 +- .../IncompatibleSchemaException.java | 4 +++ .../broker/admin/AdminApiSchemaTest.java | 30 +++++++++++++++++++ .../service/schema/SchemaServiceTest.java | 4 +-- 6 files changed, 43 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java index 1e75834a12988..e5fc7800c5170 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java @@ -64,8 +64,10 @@ public void checkCompatible(Iterable from, SchemaData to, SchemaComp log.warn("Error during schema parsing: {}", e.getMessage()); throw new IncompatibleSchemaException(e); } catch (SchemaValidationException e) { - log.warn("Error during schema compatibility check: {}", e.getMessage()); - throw new IncompatibleSchemaException(e); + String msg = String.format("Error during schema compatibility check with strategy %s: %s: %s", + strategy, e.getClass().getName(), e.getMessage()); + log.warn(msg); + throw new IncompatibleSchemaException(msg, e); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java index 16b3b33ec7894..fc935e80dca36 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java @@ -67,7 +67,9 @@ public void checkCompatible(Iterable from, SchemaData to, SchemaComp private void checkRootMessageChange(Descriptor fromDescriptor, Descriptor toDescriptor, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException { if (!fromDescriptor.getFullName().equals(toDescriptor.getFullName())) { - throw new IncompatibleSchemaException("Protobuf root message isn't allow change!"); + throw new IncompatibleSchemaException("Protobuf root message change is not allowed under the '" + + strategy + "' strategy. Original message name: '" + fromDescriptor.getFullName() + + "', new message name: '" + toDescriptor.getFullName() + "'."); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index ae56df248d85d..903f57cb7803a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -473,7 +473,7 @@ private CompletableFuture checkCompatibilityWithLatest(String schemaId, Sc } return result; } else { - return FutureUtils.exception(new IncompatibleSchemaException("Do not have existing schema.")); + return CompletableFuture.completedFuture(null); } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java index c1a2d9fd703fd..bbe2f4111d759 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java @@ -33,6 +33,10 @@ public IncompatibleSchemaException(String message) { super(message); } + public IncompatibleSchemaException(String message, Throwable e) { + super(message, e); + } + public IncompatibleSchemaException(Throwable e) { super(e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java index f67bd6fcfce5b..34d7dbeb8183c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java @@ -467,4 +467,34 @@ public void testCompatibility() throws Exception { assertTrue(e.getMessage().contains("Incompatible schema: exists schema type STRING, new schema type INT8")); } } + + @Test + public void testCompatibilityWithEmpty() throws Exception { + List> checkSchemas = List.of( + Schema.STRING, + Schema.JSON(SchemaDefinition.builder().withPojo(Foo.class).withProperties(PROPS).build()), + Schema.AVRO(SchemaDefinition.builder().withPojo(Foo.class).withProperties(PROPS).build()), + Schema.KeyValue(Schema.STRING, Schema.STRING) + ); + for (Schema schema : checkSchemas) { + SchemaInfo schemaInfo = schema.getSchemaInfo(); + String topicName = schemaCompatibilityNamespace + "/testCompatibilityWithEmpty"; + PostSchemaPayload postSchemaPayload = new PostSchemaPayload(schemaInfo.getType().toString(), + schemaInfo.getSchemaDefinition(), new HashMap<>()); + + // check compatibility with empty schema + IsCompatibilityResponse isCompatibilityResponse = + admin.schemas().testCompatibility(topicName, postSchemaPayload); + assertTrue(isCompatibilityResponse.isCompatibility()); + assertEquals(isCompatibilityResponse.getSchemaCompatibilityStrategy(), SchemaCompatibilityStrategy.FULL.name()); + + // set schema compatibility strategy is FULL_TRANSITIVE to cover checkCompatibilityWithAll + admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace, SchemaCompatibilityStrategy.FULL_TRANSITIVE); + isCompatibilityResponse = admin.schemas().testCompatibility(topicName, postSchemaPayload); + assertTrue(isCompatibilityResponse.isCompatibility()); + assertEquals(isCompatibilityResponse.getSchemaCompatibilityStrategy(), SchemaCompatibilityStrategy.FULL_TRANSITIVE.name()); + // set back to FULL + admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace, SchemaCompatibilityStrategy.FULL); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index 2d8b610e04d58..3954920b9d001 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.service.schema; -import static org.testng.Assert.assertThrows; import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertFalse; @@ -47,7 +46,6 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; @@ -406,7 +404,7 @@ public void testKeyValueSchema() throws Exception { .build(), SchemaInfo.builder().type(SchemaType.BOOLEAN).schema(new byte[0]) .build(), KeyValueEncodingType.SEPARATED); - assertThrows(PulsarAdminException.ServerSideErrorException.class, () -> admin.schemas().testCompatibility(topicName, schemaInfo)); + Assert.assertTrue(admin.schemas().testCompatibility(topicName, schemaInfo).isCompatibility()); admin.schemas().createSchema(topicName, schemaInfo); final IsCompatibilityResponse isCompatibilityResponse = admin.schemas().testCompatibility(topicName, schemaInfo);