Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Schema] Schema compatibility strategy in broker level. #11856

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,13 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
# if you enable this setting, it will cause non-java clients failed to produce.
isSchemaValidationEnforced=false

# The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`,
# broker will use it in broker level. If schemaCompatibilityStrategy is `UNDEFINED` will use `FULL`.
# SchemaCompatibilityStrategy : UNDEFINED, ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD,
# FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
# default : UNDEFINED
schemaCompatibilityStrategy=

### --- Ledger Offloading --- ###

# The directory for all the offloader implementations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.sasl.SaslConstants;
Expand Down Expand Up @@ -1940,11 +1941,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " if you enable this setting, it will cause non-java clients failed to produce."
)
private boolean isSchemaValidationEnforced = false;

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "The schema storage implementation used by this broker"
)
private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "The list compatibility checkers to be used in schema registry"
Expand All @@ -1955,6 +1958,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
"org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck"
);

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`"
+ ", schema compatibility strategy check will use it in broker level."
)
private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.UNDEFINED;

/**** --- WebSocket --- ****/
@FieldContext(
category = CATEGORY_WEBSOCKET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,11 @@ public void recordAddLatency(long latency, TimeUnit unit) {

protected void setSchemaCompatibilityStrategy (Policies policies) {
if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
schemaCompatibilityStrategy = brokerService.pulsar()
.getConfig().getSchemaCompatibilityStrategy();
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
gaoran10 marked this conversation as resolved.
Show resolved Hide resolved
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL;
}
} else {
schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -78,6 +79,30 @@ public void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testSchemaCompatibilityStrategyInBrokerLevel() throws PulsarClientException {
conf.setSchemaCompatibilityStrategy(SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);

String topicName = TopicName.get(
TopicDomain.persistent.value(),
PUBLIC_TENANT,
namespace,
"testSchemaCompatibilityStrategyInBrokerLevel"
).toString();

pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonOne>builder().
withAlwaysAllowNull(true).withPojo(Schemas.PersonOne.class).build()))
.topic(topicName)
.create();

ProducerBuilder<Schemas.PersonThree> producerBuilder = pulsarClient.newProducer(Schema.AVRO(SchemaDefinition
.<Schemas.PersonThree>builder().withAlwaysAllowNull(true).withPojo(Schemas.PersonThree.class).build()))
.topic(topicName);

Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create);
assertTrue(t.getMessage().contains("org.apache.avro.SchemaValidationException: Unable to read schema"));
}

@Test
public void structTypeProducerProducerUndefinedCompatible() throws Exception {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
Expand Down