From 1aecf92ea2cbe1b8e15785257af8d6ee84499489 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 3 Nov 2021 22:10:39 -0700 Subject: [PATCH] Allow to configure schema compatibility policy for system topics (#12598) (cherry picked from commit 7aea58d293ba2ca29e0acbf4cfd5733d84846120) --- conf/broker.conf | 3 +++ .../pulsar/broker/ServiceConfiguration.java | 7 ++++++ .../pulsar/broker/service/AbstractTopic.java | 8 +++++- ...NamespaceEventsSystemTopicServiceTest.java | 25 +++++++++++++++++++ 4 files changed, 42 insertions(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index 2d7df90c611ef..8e518abc28cfd 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -534,6 +534,9 @@ zookeeperSessionExpiredPolicy=shutdown # Enable or disable system topic systemTopicEnabled=false +# The schema compatibility strategy to use for system topics +systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE + # Enable or disable topic level policies, topic level policies depends on the system topic # Please enable the system topic first. topicLevelPoliciesEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 0d578f4da369c..ce8ab98c7be67 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1043,6 +1043,13 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Enable or disable system topic.") private boolean systemTopicEnabled = false; + @FieldContext( + category = CATEGORY_SCHEMA, + doc = "The schema compatibility strategy to use for system topics" + ) + private SchemaCompatibilityStrategy systemTopicSchemaCompatibilityStrategy = + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE; + @FieldContext( category = CATEGORY_SERVER, doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 183fb05acb2d8..be4f904c135d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -47,6 +47,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; +import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; @@ -92,6 +93,8 @@ public abstract class AbstractTopic implements Topic { // Whether messages published must be encrypted or not in this topic protected volatile boolean isEncryptionRequired = false; + + @Getter protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL; protected volatile Boolean isAllowAutoUpdateSchema; @@ -537,7 +540,10 @@ public void recordAddLatency(long latency, TimeUnit unit) { } protected void setSchemaCompatibilityStrategy(Policies policies) { - if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { + if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) { + schemaCompatibilityStrategy = + brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy(); + } else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { schemaCompatibilityStrategy = brokerService.pulsar() .getConfig().getSchemaCompatibilityStrategy(); if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java index b524e1a2148c6..2daca67520320 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java @@ -19,9 +19,14 @@ package org.apache.pulsar.broker.systopic; import com.google.common.collect.Sets; +import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.events.ActionType; import org.apache.pulsar.common.events.EventType; import org.apache.pulsar.common.events.EventsTopicNames; @@ -31,6 +36,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterDataImpl; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.slf4j.Logger; @@ -64,6 +70,25 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Test + public void testSchemaCompatibility() throws Exception { + TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory + .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1)); + String topicName = systemTopicClientForNamespace1.getTopicName().toString(); + @Cleanup + Reader reader = pulsarClient.newReader(Schema.BYTES) + .topic(topicName) + .startMessageId(MessageId.earliest) + .create(); + + PersistentTopic topic = + (PersistentTopic) pulsar.getBrokerService() + .getTopic(topicName, false) + .join().get(); + + Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy()); + } + @Test public void testSendAndReceiveNamespaceEvents() throws Exception { TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory