Skip to content

Commit

Permalink
[fix][schema] Error checking schema compatibility on a schema-less to…
Browse files Browse the repository at this point in the history
…pic via REST API (apache#22720)
  • Loading branch information
shibd committed May 16, 2024
1 parent f3e52b5 commit 101aee4
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ public void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaComp
log.warn("Error during schema parsing: {}", e.getMessage());
throw new IncompatibleSchemaException(e);
} catch (SchemaValidationException e) {
log.warn("Error during schema compatibility check: {}", e.getMessage());
throw new IncompatibleSchemaException(e);
String msg = String.format("Error during schema compatibility check with strategy %s: %s: %s",
strategy, e.getClass().getName(), e.getMessage());
log.warn(msg);
throw new IncompatibleSchemaException(msg, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaComp
private void checkRootMessageChange(Descriptor fromDescriptor, Descriptor toDescriptor,
SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException {
if (!fromDescriptor.getFullName().equals(toDescriptor.getFullName())) {
throw new IncompatibleSchemaException("Protobuf root message isn't allow change!");
throw new IncompatibleSchemaException("Protobuf root message change is not allowed under the '"
+ strategy + "' strategy. Original message name: '" + fromDescriptor.getFullName()
+ "', new message name: '" + toDescriptor.getFullName() + "'.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ private CompletableFuture<Void> checkCompatibilityWithLatest(String schemaId, Sc
}
return result;
} else {
return FutureUtils.exception(new IncompatibleSchemaException("Do not have existing schema."));
return CompletableFuture.completedFuture(null);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public IncompatibleSchemaException(String message) {
super(message);
}

public IncompatibleSchemaException(String message, Throwable e) {
super(message, e);
}

public IncompatibleSchemaException(Throwable e) {
super(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,4 +467,34 @@ public void testCompatibility() throws Exception {
assertTrue(e.getMessage().contains("Incompatible schema: exists schema type STRING, new schema type INT8"));
}
}

@Test
public void testCompatibilityWithEmpty() throws Exception {
List<Schema<?>> checkSchemas = List.of(
Schema.STRING,
Schema.JSON(SchemaDefinition.builder().withPojo(Foo.class).withProperties(PROPS).build()),
Schema.AVRO(SchemaDefinition.builder().withPojo(Foo.class).withProperties(PROPS).build()),
Schema.KeyValue(Schema.STRING, Schema.STRING)
);
for (Schema<?> schema : checkSchemas) {
SchemaInfo schemaInfo = schema.getSchemaInfo();
String topicName = schemaCompatibilityNamespace + "/testCompatibilityWithEmpty";
PostSchemaPayload postSchemaPayload = new PostSchemaPayload(schemaInfo.getType().toString(),
schemaInfo.getSchemaDefinition(), new HashMap<>());

// check compatibility with empty schema
IsCompatibilityResponse isCompatibilityResponse =
admin.schemas().testCompatibility(topicName, postSchemaPayload);
assertTrue(isCompatibilityResponse.isCompatibility());
assertEquals(isCompatibilityResponse.getSchemaCompatibilityStrategy(), SchemaCompatibilityStrategy.FULL.name());

// set schema compatibility strategy is FULL_TRANSITIVE to cover checkCompatibilityWithAll
admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace, SchemaCompatibilityStrategy.FULL_TRANSITIVE);
isCompatibilityResponse = admin.schemas().testCompatibility(topicName, postSchemaPayload);
assertTrue(isCompatibilityResponse.isCompatibility());
assertEquals(isCompatibilityResponse.getSchemaCompatibilityStrategy(), SchemaCompatibilityStrategy.FULL_TRANSITIVE.name());
// set back to FULL
admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace, SchemaCompatibilityStrategy.FULL);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
Expand Down Expand Up @@ -48,7 +47,6 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
Expand Down Expand Up @@ -407,7 +405,7 @@ public void testKeyValueSchema() throws Exception {
.build(),
SchemaInfo.builder().type(SchemaType.BOOLEAN).schema(new byte[0])
.build(), KeyValueEncodingType.SEPARATED);
assertThrows(PulsarAdminException.ServerSideErrorException.class, () -> admin.schemas().testCompatibility(topicName, schemaInfo));
Assert.assertTrue(admin.schemas().testCompatibility(topicName, schemaInfo).isCompatibility());
admin.schemas().createSchema(topicName, schemaInfo);

final IsCompatibilityResponse isCompatibilityResponse = admin.schemas().testCompatibility(topicName, schemaInfo);
Expand Down

0 comments on commit 101aee4

Please sign in to comment.