Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-broker] add broker config to enforce producer to publish encrypted message #8055

Merged
merged 1 commit into from Oct 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Expand Up @@ -356,6 +356,9 @@ enableRunBookieAutoRecoveryTogether=false
# Using a value of 0, is disabling maxProducersPerTopic-limit check.
maxProducersPerTopic=0

# Enforce producer to publish encrypted messages.(default disable).
encryptionRequireOnProducer=false

# Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers
# until the number of connected consumers decrease.
# Using a value of 0, is disabling maxConsumersPerTopic-limit check.
Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Expand Up @@ -237,6 +237,9 @@ enableNonPersistentTopics=true
# Using a value of 0, is disabling maxProducersPerTopic-limit check.
maxProducersPerTopic=0

# Enforce producer to publish encrypted messages.(default disable).
encryptionRequireOnProducer=false

# Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers
# until the number of connected consumers decrease.
# Using a value of 0, is disabling maxConsumersPerTopic-limit check.
Expand Down
Expand Up @@ -690,6 +690,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " Using a value of 0, is disabling maxProducersPerTopic-limit check.")
private int maxProducersPerTopic = 0;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enforce producer to publish encrypted messages.(default disable).")
private boolean encryptionRequireOnProducer = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Max number of consumers allowed to connect to topic. \n\nOnce this limit reaches,"
Expand Down
Expand Up @@ -168,6 +168,7 @@ public class ServerCnx extends PulsarHandler {
private boolean preciseDispatcherFlowControl;

private boolean preciseTopicPublishRateLimitingEnable;
private boolean encryptionRequireOnProducer;

// Flag to manage throttling-rate by atomically enable/disable read-channel.
private volatile boolean autoReadDisabledRateLimiting = false;
Expand Down Expand Up @@ -203,6 +204,7 @@ public ServerCnx(PulsarService pulsar) {
this.resumeReadsThreshold = maxPendingSendRequests / 2;
this.preciseDispatcherFlowControl = pulsar.getConfiguration().isPreciseDispatcherFlowControl();
this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
this.encryptionRequireOnProducer = pulsar.getConfiguration().isEncryptionRequireOnProducer();
}

@Override
Expand Down Expand Up @@ -1103,7 +1105,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}

// Check whether the producer will publish encrypted messages or not
if (topic.isEncryptionRequired() && !isEncrypted) {
if ((topic.isEncryptionRequired() || encryptionRequireOnProducer) && !isEncrypted) {
String msg = String.format("Encryption is required in %s", topicName);
log.warn("[{}] {}", remoteAddress, msg);
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
Expand Down
Expand Up @@ -1347,6 +1347,41 @@ public void testProducerFailureOnEncryptionRequiredTopic() throws Exception {
channel.finish();
}

@Test(timeOut = 30000)
public void testProducerFailureOnEncryptionRequiredOnBroker() throws Exception {
// (a) Set encryption-required at broker level
pulsar.getConfig().setEncryptionRequireOnProducer(true);
resetChannel();
setChannelConnected();

// (b) Set encryption_required to false on policy
ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
Policies policies = mock(Policies.class);
// Namespace policy doesn't require encryption
policies.encryption_required = false;
policies.topicDispatchRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
policies.clusterDispatchRate = Maps.newHashMap();
doReturn(Optional.of(policies)).when(zkDataCache).get(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace()));
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace()));
doReturn(zkDataCache).when(configCacheService).policiesCache();

// test failure case: unencrypted producer cannot connect
ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 2 /* producer id */, 2 /* request id */,
"unencrypted-producer", false, null);
channel.writeInbound(clientCommand);

Object response = getResponse();
assertEquals(response.getClass(), CommandError.class);
CommandError errorResponse = (CommandError) response;
assertEquals(errorResponse.getError(), ServerError.MetadataError);
PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(encryptionRequiredTopicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 0);

channel.finish();
}

@Test(timeOut = 30000)
public void testSendSuccessOnEncryptionRequiredTopic() throws Exception {
resetChannel();
Expand Down