Skip to content

Commit

Permalink
Allow to configure schema compatibility policy for system topics (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored and eolivelli committed Nov 29, 2021
1 parent 25ff807 commit c90109f
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 1 deletion.
3 changes: 3 additions & 0 deletions conf/broker.conf
Expand Up @@ -534,6 +534,9 @@ zookeeperSessionExpiredPolicy=shutdown
# Enable or disable system topic
systemTopicEnabled=false

# The schema compatibility strategy to use for system topics
systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE

# Enable or disable topic level policies, topic level policies depends on the system topic
# Please enable the system topic first.
topicLevelPoliciesEnabled=false
Expand Down
Expand Up @@ -1041,6 +1041,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Enable or disable system topic.")
private boolean systemTopicEnabled = false;

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "The schema compatibility strategy to use for system topics"
)
private SchemaCompatibilityStrategy systemTopicSchemaCompatibilityStrategy =
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " +
Expand Down
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
Expand Down Expand Up @@ -93,6 +94,8 @@ public abstract class AbstractTopic implements Topic {

// Whether messages published must be encrypted or not in this topic
protected volatile boolean isEncryptionRequired = false;

@Getter
protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
protected volatile boolean isAllowAutoUpdateSchema = true;
Expand Down Expand Up @@ -521,7 +524,10 @@ public void recordAddLatency(long latency, TimeUnit unit) {
}

protected void setSchemaCompatibilityStrategy(Policies policies) {
if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) {
schemaCompatibilityStrategy =
brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
} else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = brokerService.pulsar()
.getConfig().getSchemaCompatibilityStrategy();
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
Expand Down
Expand Up @@ -19,9 +19,14 @@
package org.apache.pulsar.broker.systopic;

import com.google.common.collect.Sets;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.EventsTopicNames;
Expand All @@ -31,6 +36,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.slf4j.Logger;
Expand Down Expand Up @@ -64,6 +70,25 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testSchemaCompatibility() throws Exception {
TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
.createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));
String topicName = systemTopicClientForNamespace1.getTopicName().toString();
@Cleanup
Reader<byte[]> reader = pulsarClient.newReader(Schema.BYTES)
.topic(topicName)
.startMessageId(MessageId.earliest)
.create();

PersistentTopic topic =
(PersistentTopic) pulsar.getBrokerService()
.getTopic(topicName, false)
.join().get();

Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());
}

@Test
public void testSendAndReceiveNamespaceEvents() throws Exception {
TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
Expand Down

0 comments on commit c90109f

Please sign in to comment.