Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to configure schema compatibility policy for system topics #12598

Merged
merged 2 commits into from Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Expand Up @@ -534,6 +534,9 @@ zookeeperSessionExpiredPolicy=shutdown
# Enable or disable system topic
systemTopicEnabled=false

# The schema compatibility strategy to use for system topics
systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE

# Enable or disable topic level policies, topic level policies depends on the system topic
# Please enable the system topic first.
topicLevelPoliciesEnabled=false
Expand Down
Expand Up @@ -1041,6 +1041,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Enable or disable system topic.")
private boolean systemTopicEnabled = false;

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "The schema compatibility strategy to use for system topics"
)
private SchemaCompatibilityStrategy systemTopicSchemaCompatibilityStrategy =
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " +
Expand Down
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
Expand Down Expand Up @@ -93,6 +94,8 @@ public abstract class AbstractTopic implements Topic {

// Whether messages published must be encrypted or not in this topic
protected volatile boolean isEncryptionRequired = false;

@Getter
protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
protected volatile boolean isAllowAutoUpdateSchema = true;
Expand Down Expand Up @@ -521,7 +524,10 @@ public void recordAddLatency(long latency, TimeUnit unit) {
}

protected void setSchemaCompatibilityStrategy(Policies policies) {
if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) {
schemaCompatibilityStrategy =
brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
} else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = brokerService.pulsar()
.getConfig().getSchemaCompatibilityStrategy();
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
Expand Down
Expand Up @@ -19,9 +19,14 @@
package org.apache.pulsar.broker.systopic;

import com.google.common.collect.Sets;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.EventsTopicNames;
Expand All @@ -31,6 +36,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.slf4j.Logger;
Expand Down Expand Up @@ -64,6 +70,25 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testSchemaCompatibility() throws Exception {
TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
.createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));
String topicName = systemTopicClientForNamespace1.getTopicName().toString();
@Cleanup
Reader<byte[]> reader = pulsarClient.newReader(Schema.BYTES)
.topic(topicName)
.startMessageId(MessageId.earliest)
.create();

PersistentTopic topic =
(PersistentTopic) pulsar.getBrokerService()
.getTopic(topicName, false)
.join().get();

Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());
}

@Test
public void testSendAndReceiveNamespaceEvents() throws Exception {
TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
Expand Down