From c90109f5673327fd5ff5f2508421cd22937b1b0c Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 3 Nov 2021 22:10:39 -0700 Subject: [PATCH] Allow to configure schema compatibility policy for system topics (#12598) --- conf/broker.conf | 3 +++ .../pulsar/broker/ServiceConfiguration.java | 7 ++++++ .../pulsar/broker/service/AbstractTopic.java | 8 +++++- ...NamespaceEventsSystemTopicServiceTest.java | 25 +++++++++++++++++++ 4 files changed, 42 insertions(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index bd37eb9f9a1134..13e955bdee53da 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -534,6 +534,9 @@ zookeeperSessionExpiredPolicy=shutdown # Enable or disable system topic systemTopicEnabled=false +# The schema compatibility strategy to use for system topics +systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE + # Enable or disable topic level policies, topic level policies depends on the system topic # Please enable the system topic first. topicLevelPoliciesEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a6205c3c580bd5..832a389e58bb4c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1041,6 +1041,13 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Enable or disable system topic.") private boolean systemTopicEnabled = false; + @FieldContext( + category = CATEGORY_SCHEMA, + doc = "The schema compatibility strategy to use for system topics" + ) + private SchemaCompatibilityStrategy systemTopicSchemaCompatibilityStrategy = + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE; + @FieldContext( category = CATEGORY_SERVER, doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 951d6025eb51b3..edf922e3fe7e53 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -48,6 +48,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; +import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; @@ -93,6 +94,8 @@ public abstract class AbstractTopic implements Topic { // Whether messages published must be encrypted or not in this topic protected volatile boolean isEncryptionRequired = false; + + @Getter protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL; protected volatile boolean isAllowAutoUpdateSchema = true; @@ -521,7 +524,10 @@ public void recordAddLatency(long latency, TimeUnit unit) { } protected void setSchemaCompatibilityStrategy(Policies policies) { - if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { + if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) { + schemaCompatibilityStrategy = + brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy(); + } else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { schemaCompatibilityStrategy = brokerService.pulsar() .getConfig().getSchemaCompatibilityStrategy(); if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java index b524e1a2148c6c..2daca675203207 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java @@ -19,9 +19,14 @@ package org.apache.pulsar.broker.systopic; import com.google.common.collect.Sets; +import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.events.ActionType; import org.apache.pulsar.common.events.EventType; import org.apache.pulsar.common.events.EventsTopicNames; @@ -31,6 +36,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterDataImpl; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.slf4j.Logger; @@ -64,6 +70,25 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Test + public void testSchemaCompatibility() throws Exception { + TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory + .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1)); + String topicName = systemTopicClientForNamespace1.getTopicName().toString(); + @Cleanup + Reader reader = pulsarClient.newReader(Schema.BYTES) + .topic(topicName) + .startMessageId(MessageId.earliest) + .create(); + + PersistentTopic topic = + (PersistentTopic) pulsar.getBrokerService() + .getTopic(topicName, false) + .join().get(); + + Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy()); + } + @Test public void testSendAndReceiveNamespaceEvents() throws Exception { TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory