diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java index b661658139981..5da4ca2da46af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java @@ -1533,5 +1533,51 @@ private void validatePeerClusterConflict(String clusterName, Set replica } } + @POST + @Path("/{property}/{cluster}/{namespace}/encryptionRequired") + @ApiOperation(value = "Message encryption is required or not for all topics in a namespace") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + }) + public void modifyEncryptionRequired(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, boolean encryptionRequired) { + validateAdminAccessOnProperty(property); + validatePoliciesReadOnlyAccess(); + + NamespaceName nsName = NamespaceName.get(property, cluster, namespace); + Entry policiesNode = null; + + try { + // Force to read the data s.t. the watch to the cache content is setup. + policiesNode = policiesCache().getWithStat(path(POLICIES, property, cluster, namespace)) + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist")); + policiesNode.getKey().encryption_required = encryptionRequired; + + // Write back the new policies into zookeeper + globalZk().setData(path(POLICIES, property, cluster, namespace), + jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion()); + policiesCache().invalidate(path(POLICIES, property, cluster, namespace)); + + log.info("[{}] Successfully {} on namespace {}/{}/{}", clientAppId(), + encryptionRequired ? "true" : "false", property, cluster, namespace); + } catch (KeeperException.NoNodeException e) { + log.warn("[{}] Failed to modify encryption required status for namespace {}/{}/{}: does not exist", clientAppId(), + property, cluster, namespace); + throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); + } catch (KeeperException.BadVersionException e) { + log.warn( + "[{}] Failed to modify encryption required status on namespace {}/{}/{} expected policy node version={} : concurrent modification", + clientAppId(), property, cluster, namespace, policiesNode.getValue().getVersion()); + + throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (Exception e) { + log.error("[{}] Failed to modify encryption required status on namespace {}/{}/{}", clientAppId(), property, + cluster, namespace, e); + throw new RestException(e); + } + } + private static final Logger log = LoggerFactory.getLogger(Namespaces.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 53df5b0da0b4d..4c7bf8297fb80 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -33,6 +33,7 @@ import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats; @@ -72,8 +73,9 @@ public class Producer { private final boolean isRemote; private final String remoteCluster; private final boolean isNonPersistentTopic; + private final boolean isEncrypted; - public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId) { + public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId, boolean isEncrypted) { this.topic = topic; this.cnx = cnx; this.producerId = producerId; @@ -93,6 +95,8 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName this.isRemote = producerName .startsWith(cnx.getBrokerService().pulsar().getConfiguration().getReplicatorPrefix()); this.remoteCluster = isRemote ? producerName.split("\\.")[2] : null; + + this.isEncrypted = isEncrypted; } @Override @@ -130,6 +134,24 @@ public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndP return; } + if (topic.isEncryptionRequired()) { + + headersAndPayload.markReaderIndex(); + MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); + headersAndPayload.resetReaderIndex(); + + // Check whether the message is encrypted or not + if (msgMetadata.getEncryptionKeysCount() < 1) { + log.warn("[{}] Messages must be encrypted", getTopic().getName()); + cnx.ctx().channel().eventLoop().execute(() -> { + cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError, + "Messages must be encrypted")); + cnx.completedSendOperation(isNonPersistentTopic); + }); + return; + } + } + startPublishOperation(); topic.publishMessage(headersAndPayload, MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize)); @@ -440,6 +462,14 @@ public void checkPermissions() { } } + public void checkEncryption() { + if (topic.isEncryptionRequired() && !isEncrypted) { + log.info("[{}] [{}] Unencrypted producer is not allowed to produce from destination [{}] anymore", + producerId, producerName, topic.getName()); + disconnect(); + } + } + private static final Logger log = LoggerFactory.getLogger(Producer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 2a407b125993d..aaf86a7eecc12 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -469,6 +469,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { final String topicName = cmdProducer.getTopic(); final long producerId = cmdProducer.getProducerId(); final long requestId = cmdProducer.getRequestId(); + final boolean isEncrypted = cmdProducer.getEncrypted(); authorizationFuture.thenApply(isAuthorized -> { if (isAuthorized) { if (log.isDebugEnabled()) { @@ -522,9 +523,17 @@ protected void handleProducer(final CommandProducer cmdProducer) { return; } + // Check whether the producer will publish encrypted messages or not + if (topic.isEncryptionRequired() && !isEncrypted) { + String msg = String.format("Encryption is required in %s", topicName); + log.warn("[{}] {}", remoteAddress, msg); + ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg)); + return; + } + disableTcpNoDelayIfNeeded(topicName, producerName); - Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole); + Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole, isEncrypted); try { topic.addProducer(producer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index b7b8badff0262..f3e0c605d0eca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -105,6 +105,8 @@ CompletableFuture subscribe(ServerCnx cnx, String subscriptionName, lo boolean isBacklogQuotaExceeded(String producerName); + boolean isEncryptionRequired(); + BacklogQuota getBacklogQuota(); void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index bd59e43730f74..7c0bb9484b3cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -129,6 +129,9 @@ protected TopicStats initialValue() { } }; + // Whether messages published must be encrypted or not in this topic + private volatile boolean isEncryptionRequired = false; + private static class TopicStats { public double averageMsgSize; public double aggMsgRateIn; @@ -164,6 +167,16 @@ public NonPersistentTopic(String topic, BrokerService brokerService) { USAGE_COUNT_UPDATER.set(this, 0); this.lastActive = System.nanoTime(); + + try { + Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace())) + .orElseThrow(() -> new KeeperException.NoNodeException()); + isEncryptionRequired = policies.encryption_required; + } catch (Exception e) { + log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage()); + isEncryptionRequired = false; + } } @Override @@ -845,7 +858,14 @@ public void checkGC(int gcIntervalInSeconds) { @Override public CompletableFuture onPoliciesUpdate(Policies data) { - producers.forEach(Producer::checkPermissions); + if (log.isDebugEnabled()) { + log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required); + } + isEncryptionRequired = data.encryption_required; + producers.forEach(producer -> { + producer.checkPermissions(); + producer.checkEncryption(); + }); subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions)); return checkReplicationAndRetryOnFailure(); } @@ -870,6 +890,11 @@ public boolean isBacklogQuotaExceeded(String producerName) { return false; } + @Override + public boolean isEncryptionRequired() { + return isEncryptionRequired; + } + @Override public CompletableFuture unsubscribe(String subName) { // No-op diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7297dacf16b71..08f8268c90905 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -155,6 +155,9 @@ public class PersistentTopic implements Topic, AddEntryCallback { private final MessageDeduplication messageDeduplication; + // Whether messages published must be encrypted or not in this topic + private volatile boolean isEncryptionRequired = false; + private static final FastThreadLocal threadLocalTopicStats = new FastThreadLocal() { @Override protected TopicStats initialValue() { @@ -218,6 +221,16 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS this.lastActive = System.nanoTime(); this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger); + + try { + Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace())) + .orElseThrow(() -> new KeeperException.NoNodeException()); + isEncryptionRequired = policies.encryption_required; + } catch (Exception e) { + log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage()); + isEncryptionRequired = false; + } } @Override @@ -1327,7 +1340,14 @@ private boolean shouldTopicBeRetained() { @Override public CompletableFuture onPoliciesUpdate(Policies data) { - producers.forEach(Producer::checkPermissions); + if (log.isDebugEnabled()) { + log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required); + } + isEncryptionRequired = data.encryption_required; + producers.forEach(producer -> { + producer.checkPermissions(); + producer.checkEncryption(); + }); subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions)); checkMessageExpiry(); CompletableFuture replicationFuture = checkReplicationAndRetryOnFailure(); @@ -1373,6 +1393,11 @@ public boolean isBacklogQuotaExceeded(String producerName) { return false; } + @Override + public boolean isEncryptionRequired() { + return isEncryptionRequired; + } + public CompletableFuture terminate() { CompletableFuture future = new CompletableFuture<>(); ledger.asyncTerminate(new TerminateCallback() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index fe264e967e9ef..bb2a0bcc261a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -322,7 +322,7 @@ public void testAddRemoveProducer() throws Exception { String role = "appid1"; // 1. simple add producer - Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role); + Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false); topic.addProducer(producer); assertEquals(topic.getProducers().size(), 1); @@ -337,7 +337,7 @@ public void testAddRemoveProducer() throws Exception { // 3. add producer for a different topic PersistentTopic failTopic = new PersistentTopic(failTopicName, ledgerMock, brokerService); - Producer failProducer = new Producer(failTopic, serverCnx, 2 /* producer id */, "prod-name", role); + Producer failProducer = new Producer(failTopic, serverCnx, 2 /* producer id */, "prod-name", role, false); try { topic.addProducer(failProducer); fail("should have failed"); @@ -480,7 +480,7 @@ public void testDeleteTopic() throws Exception { // 2. delete topic with producer topic = (PersistentTopic) brokerService.getTopic(successTopicName).get(); - Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role); + Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false); topic.addProducer(producer); assertTrue(topic.delete().isCompletedExceptionally()); @@ -635,7 +635,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { try { String role = "appid1"; Thread.sleep(10); /* delay to ensure that the delete gets executed first */ - Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role); + Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false); topic.addProducer(producer); fail("Should have failed"); } catch (BrokerServiceException e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 269aaf50d5b90..4a932b1338d83 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.matches; @@ -43,6 +44,8 @@ import javax.naming.AuthenticationException; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; @@ -56,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationManager; @@ -76,7 +80,9 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandError; import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess; +import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; @@ -121,6 +127,7 @@ public class ServerCnxTest { private final String successTopicName = "persistent://prop/use/ns-abc/successTopic"; private final String failTopicName = "persistent://prop/use/ns-abc/failTopic"; private final String nonOwnedTopicName = "persistent://prop/use/ns-abc/success-not-owned-topic"; + private final String encryptionRequiredTopicName = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic"; private final String successSubName = "successSub"; private final String nonExistentTopicName = "persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic"; private final String topicWithNonLocalCluster = "persistent://prop/usw/ns-abc/successTopic"; @@ -1195,6 +1202,133 @@ public void testFlowCommand() throws Exception { channel.finish(); } + @Test(timeOut = 30000) + public void testProducerSuccessOnEncryptionRequiredTopic() throws Exception { + resetChannel(); + setChannelConnected(); + + // Set encryption_required to true + ZooKeeperDataCache zkDataCache = mock(ZooKeeperDataCache.class); + Policies policies = mock(Policies.class); + policies.encryption_required = true; + policies.clusterDispatchRate = Maps.newHashMap(); + doReturn(Optional.of(policies)).when(zkDataCache).get(AdminResource.path(POLICIES, DestinationName.get(encryptionRequiredTopicName).getNamespace())); + doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(AdminResource.path(POLICIES, DestinationName.get(encryptionRequiredTopicName).getNamespace())); + doReturn(zkDataCache).when(configCacheService).policiesCache(); + + // test success case: encrypted producer can connect + ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* request id */, + "encrypted-producer", true); + channel.writeInbound(clientCommand); + + Object response = getResponse(); + assertEquals(response.getClass(), CommandProducerSuccess.class); + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(encryptionRequiredTopicName); + assertNotNull(topicRef); + assertEquals(topicRef.getProducers().size(), 1); + + channel.finish(); + } + + @Test(timeOut = 30000) + public void testProducerFailureOnEncryptionRequiredTopic() throws Exception { + resetChannel(); + setChannelConnected(); + + // Set encryption_required to true + ZooKeeperDataCache zkDataCache = mock(ZooKeeperDataCache.class); + Policies policies = mock(Policies.class); + policies.encryption_required = true; + policies.clusterDispatchRate = Maps.newHashMap(); + doReturn(Optional.of(policies)).when(zkDataCache).get(AdminResource.path(POLICIES, DestinationName.get(encryptionRequiredTopicName).getNamespace())); + doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(AdminResource.path(POLICIES, DestinationName.get(encryptionRequiredTopicName).getNamespace())); + doReturn(zkDataCache).when(configCacheService).policiesCache(); + + // test failure case: unencrypted producer cannot connect + ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 2 /* producer id */, 2 /* request id */, + "unencrypted-producer", false); + channel.writeInbound(clientCommand); + + Object response = getResponse(); + assertEquals(response.getClass(), CommandError.class); + CommandError errorResponse = (CommandError) response; + assertEquals(errorResponse.getError(), ServerError.MetadataError); + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(encryptionRequiredTopicName); + assertNotNull(topicRef); + assertEquals(topicRef.getProducers().size(), 0); + + channel.finish(); + } + + @Test(timeOut = 30000) + public void testSendSuccessOnEncryptionRequiredTopic() throws Exception { + resetChannel(); + setChannelConnected(); + + // Set encryption_required to true + ZooKeeperDataCache zkDataCache = mock(ZooKeeperDataCache.class); + Policies policies = mock(Policies.class); + policies.encryption_required = true; + policies.clusterDispatchRate = Maps.newHashMap(); + doReturn(Optional.of(policies)).when(zkDataCache).get(AdminResource.path(POLICIES, DestinationName.get(encryptionRequiredTopicName).getNamespace())); + doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(AdminResource.path(POLICIES, DestinationName.get(encryptionRequiredTopicName).getNamespace())); + doReturn(zkDataCache).when(configCacheService).policiesCache(); + + ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* request id */, + "prod-name", true); + channel.writeInbound(clientCommand); + assertTrue(getResponse() instanceof CommandProducerSuccess); + + // test success case: encrypted messages can be published + MessageMetadata messageMetadata = MessageMetadata.newBuilder() + .setPublishTime(System.currentTimeMillis()) + .setProducerName("prod-name") + .setSequenceId(0) + .addEncryptionKeys(EncryptionKeys.newBuilder().setKey("testKey").setValue(ByteString.copyFrom("testVal".getBytes()))) + .build(); + ByteBuf data = Unpooled.buffer(1024); + + clientCommand = Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data); + channel.writeInbound(Unpooled.copiedBuffer(clientCommand)); + clientCommand.release(); + assertTrue(getResponse() instanceof CommandSendReceipt); + channel.finish(); + } + + @Test(timeOut = 30000) + public void testSendFailureOnEncryptionRequiredTopic() throws Exception { + resetChannel(); + setChannelConnected(); + + // Set encryption_required to true + ZooKeeperDataCache zkDataCache = mock(ZooKeeperDataCache.class); + Policies policies = mock(Policies.class); + policies.encryption_required = true; + policies.clusterDispatchRate = Maps.newHashMap(); + doReturn(Optional.of(policies)).when(zkDataCache).get(AdminResource.path(POLICIES, DestinationName.get(encryptionRequiredTopicName).getNamespace())); + doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(AdminResource.path(POLICIES, DestinationName.get(encryptionRequiredTopicName).getNamespace())); + doReturn(zkDataCache).when(configCacheService).policiesCache(); + + ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* request id */, + "prod-name", true); + channel.writeInbound(clientCommand); + assertTrue(getResponse() instanceof CommandProducerSuccess); + + // test failure case: unencrypted messages cannot be published + MessageMetadata messageMetadata = MessageMetadata.newBuilder() + .setPublishTime(System.currentTimeMillis()) + .setProducerName("prod-name") + .setSequenceId(0) + .build(); + ByteBuf data = Unpooled.buffer(1024); + + clientCommand = Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data); + channel.writeInbound(Unpooled.copiedBuffer(clientCommand)); + clientCommand.release(); + assertTrue(getResponse() instanceof CommandSendError); + channel.finish(); + } + private void resetChannel() throws Exception { int MaxMessageSize = 5 * 1024 * 1024; if (channel != null && channel.isActive()) { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index afc7d1db1f241..edd8c5ce73ed0 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -810,4 +810,29 @@ void clearNamespaceBundleBacklogForSubscription(String namespace, String bundle, * @throws PulsarAdminException */ void unsubscribeNamespaceBundle(String namespace, String bundle, String subscription) throws PulsarAdminException; + + /** + * Set the encryption required status for all topics within a namespace. + *

+ * When encryption required is true, the broker will prevent to store unencrypted messages. + *

+ * Request example: + * + *

+     * true
+     * 
+ * + * @param namespace + * Namespace name + * @param encryptionRequired + * whether message encryption is required or not + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + void setEncryptionRequiredStatus(String namespace, boolean encryptionRequired) throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 9b7eac68e0786..9f6467950cf58 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -459,4 +459,15 @@ public void unsubscribeNamespaceBundle(String namespace, String bundle, String s throw getApiException(e); } } + + @Override + public void setEncryptionRequiredStatus(String namespace, boolean encryptionRequired) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()).path("encryptionRequired")) + .post(Entity.entity(encryptionRequired, MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 9977e635fdab9..9acf3badee377 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -512,6 +512,28 @@ void run() throws Exception { } + @Parameters(commandDescription = "Enable or disable message encryption required for a namespace") + private class SetEncryptionRequired extends CliCommand { + @Parameter(description = "property/cluster/namespace", required = true) + private java.util.List params; + + @Parameter(names = { "--enable", "-e" }, description = "Enable message encryption required") + private boolean enable = false; + + @Parameter(names = { "--disable", "-d" }, description = "Disable message encryption required") + private boolean disable = false; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + + if (enable == disable) { + throw new ParameterException("Need to specify either --enable or --disable"); + } + admin.namespaces().setEncryptionRequiredStatus(namespace, enable); + } + } + private static long validateSizeString(String s) { char last = s.charAt(s.length() - 1); String subStr = s.substring(0, s.length() - 1); @@ -599,5 +621,7 @@ public CmdNamespaces(PulsarAdmin admin) { jcommander.addCommand("clear-backlog", new ClearBacklog()); jcommander.addCommand("unsubscribe", new Unsubscribe()); + + jcommander.addCommand("set-encryption-required", new SetEncryptionRequired()); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index b26108c156c51..b0f45d9be5eef 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -800,7 +800,7 @@ void connectionOpened(final ClientCnx cnx) { long requestId = client.newRequestId(); - cnx.sendRequestWithId(Commands.newProducer(topic, producerId, requestId, producerName), requestId) + cnx.sendRequestWithId(Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled()), requestId) .thenAccept(pair -> { String producerName = pair.getLeft(); long lastSequenceId = pair.getRight(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 1ea16d7fbcbd9..aea9abbc6cc3c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -401,6 +401,10 @@ public static ByteBuf newCloseProducer(long producerId, long requestId) { } public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName) { + return newProducer(topic, producerId, requestId, producerName, false); + } + + public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName, boolean encrypted) { CommandProducer.Builder producerBuilder = CommandProducer.newBuilder(); producerBuilder.setTopic(topic); producerBuilder.setProducerId(producerId); @@ -408,6 +412,7 @@ public static ByteBuf newProducer(String topic, long producerId, long requestId, if (producerName != null) { producerBuilder.setProducerName(producerName); } + producerBuilder.setEncrypted(encrypted); CommandProducer producer = producerBuilder.build(); ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.PRODUCER).setProducer(producer)); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index a3ae02ac62e98..b7cf8a4e52a2a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -8665,6 +8665,10 @@ public interface CommandProducerOrBuilder // optional string producer_name = 4; boolean hasProducerName(); String getProducerName(); + + // optional bool encrypted = 5 [default = false]; + boolean hasEncrypted(); + boolean getEncrypted(); } public static final class CommandProducer extends com.google.protobuf.GeneratedMessageLite @@ -8787,11 +8791,22 @@ private com.google.protobuf.ByteString getProducerNameBytes() { } } + // optional bool encrypted = 5 [default = false]; + public static final int ENCRYPTED_FIELD_NUMBER = 5; + private boolean encrypted_; + public boolean hasEncrypted() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public boolean getEncrypted() { + return encrypted_; + } + private void initFields() { topic_ = ""; producerId_ = 0L; requestId_ = 0L; producerName_ = ""; + encrypted_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -8834,6 +8849,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeBytes(4, getProducerNameBytes()); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBool(5, encrypted_); + } } private int memoizedSerializedSize = -1; @@ -8858,6 +8876,10 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeBytesSize(4, getProducerNameBytes()); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(5, encrypted_); + } memoizedSerializedSize = size; return size; } @@ -8979,6 +9001,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000004); producerName_ = ""; bitField0_ = (bitField0_ & ~0x00000008); + encrypted_ = false; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -9028,6 +9052,10 @@ public org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer buildPartial to_bitField0_ |= 0x00000008; } result.producerName_ = producerName_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.encrypted_ = encrypted_; result.bitField0_ = to_bitField0_; return result; } @@ -9046,6 +9074,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandPro if (other.hasProducerName()) { setProducerName(other.getProducerName()); } + if (other.hasEncrypted()) { + setEncrypted(other.getEncrypted()); + } return this; } @@ -9107,6 +9138,11 @@ public Builder mergeFrom( producerName_ = input.readBytes(); break; } + case 40: { + bitField0_ |= 0x00000010; + encrypted_ = input.readBool(); + break; + } } } } @@ -9227,6 +9263,27 @@ void setProducerName(com.google.protobuf.ByteString value) { } + // optional bool encrypted = 5 [default = false]; + private boolean encrypted_ ; + public boolean hasEncrypted() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public boolean getEncrypted() { + return encrypted_; + } + public Builder setEncrypted(boolean value) { + bitField0_ |= 0x00000010; + encrypted_ = value; + + return this; + } + public Builder clearEncrypted() { + bitField0_ = (bitField0_ & ~0x00000010); + encrypted_ = false; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandProducer) } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 3a598d6d16137..e2b7b059f7c9b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -46,6 +46,8 @@ public class Policies { public static final String FIRST_BOUNDARY = "0x00000000"; public static final String LAST_BOUNDARY = "0xffffffff"; + public boolean encryption_required = false; + @Override public boolean equals(Object obj) { if (obj instanceof Policies) { @@ -58,7 +60,8 @@ public boolean equals(Object obj) { && Objects.equals(persistence, other.persistence) && Objects.equals(bundles, other.bundles) && Objects.equals(latency_stats_sample_rate, other.latency_stats_sample_rate) && message_ttl_in_seconds == other.message_ttl_in_seconds - && Objects.equals(retention_policies, other.retention_policies); + && Objects.equals(retention_policies, other.retention_policies) + && Objects.equals(encryption_required, other.encryption_required); } return false; @@ -82,7 +85,8 @@ public String toString() { .add("clusterDispatchRate", clusterDispatchRate) .add("latency_stats_sample_rate", latency_stats_sample_rate) .add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies) - .add("deleted", deleted).toString(); + .add("deleted", deleted) + .add("encryption_required", encryption_required).toString(); } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 5bc2f39fe5ea4..b94b83afa8770 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -230,6 +230,8 @@ message CommandProducer { /// If a producer name is specified, the name will be used, /// otherwise the broker will generate a unique name optional string producer_name = 4; + + optional bool encrypted = 5 [default = false]; } message CommandSend {