Skip to content

Commit

Permalink
Check message encryption when producer connect and publish (#904)
Browse files Browse the repository at this point in the history
  • Loading branch information
nkurihar authored and merlimat committed Jan 2, 2018
1 parent 039932a commit 663e8f4
Show file tree
Hide file tree
Showing 16 changed files with 410 additions and 11 deletions.
Expand Up @@ -1533,5 +1533,51 @@ private void validatePeerClusterConflict(String clusterName, Set<String> replica
}
}

@POST
@Path("/{property}/{cluster}/{namespace}/encryptionRequired")
@ApiOperation(value = "Message encryption is required or not for all topics in a namespace")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
})
public void modifyEncryptionRequired(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, boolean encryptionRequired) {
validateAdminAccessOnProperty(property);
validatePoliciesReadOnlyAccess();

NamespaceName nsName = NamespaceName.get(property, cluster, namespace);
Entry<Policies, Stat> policiesNode = null;

try {
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path(POLICIES, property, cluster, namespace))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
policiesNode.getKey().encryption_required = encryptionRequired;

// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, property, cluster, namespace),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
policiesCache().invalidate(path(POLICIES, property, cluster, namespace));

log.info("[{}] Successfully {} on namespace {}/{}/{}", clientAppId(),
encryptionRequired ? "true" : "false", property, cluster, namespace);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to modify encryption required status for namespace {}/{}/{}: does not exist", clientAppId(),
property, cluster, namespace);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to modify encryption required status on namespace {}/{}/{} expected policy node version={} : concurrent modification",
clientAppId(), property, cluster, namespace, policiesNode.getValue().getVersion());

throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to modify encryption required status on namespace {}/{}/{}", clientAppId(), property,
cluster, namespace, e);
throw new RestException(e);
}
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
Expand Down Expand Up @@ -72,8 +73,9 @@ public class Producer {
private final boolean isRemote;
private final String remoteCluster;
private final boolean isNonPersistentTopic;
private final boolean isEncrypted;

public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId) {
public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId, boolean isEncrypted) {
this.topic = topic;
this.cnx = cnx;
this.producerId = producerId;
Expand All @@ -93,6 +95,8 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName
this.isRemote = producerName
.startsWith(cnx.getBrokerService().pulsar().getConfiguration().getReplicatorPrefix());
this.remoteCluster = isRemote ? producerName.split("\\.")[2] : null;

this.isEncrypted = isEncrypted;
}

@Override
Expand Down Expand Up @@ -130,6 +134,24 @@ public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndP
return;
}

if (topic.isEncryptionRequired()) {

headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();

// Check whether the message is encrypted or not
if (msgMetadata.getEncryptionKeysCount() < 1) {
log.warn("[{}] Messages must be encrypted", getTopic().getName());
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,
"Messages must be encrypted"));
cnx.completedSendOperation(isNonPersistentTopic);
});
return;
}
}

startPublishOperation();
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize));
Expand Down Expand Up @@ -440,6 +462,14 @@ public void checkPermissions() {
}
}

public void checkEncryption() {
if (topic.isEncryptionRequired() && !isEncrypted) {
log.info("[{}] [{}] Unencrypted producer is not allowed to produce from destination [{}] anymore",
producerId, producerName, topic.getName());
disconnect();
}
}

private static final Logger log = LoggerFactory.getLogger(Producer.class);

}
Expand Up @@ -469,6 +469,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
final String topicName = cmdProducer.getTopic();
final long producerId = cmdProducer.getProducerId();
final long requestId = cmdProducer.getRequestId();
final boolean isEncrypted = cmdProducer.getEncrypted();
authorizationFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -522,9 +523,17 @@ protected void handleProducer(final CommandProducer cmdProducer) {
return;
}

// Check whether the producer will publish encrypted messages or not
if (topic.isEncryptionRequired() && !isEncrypted) {
String msg = String.format("Encryption is required in %s", topicName);
log.warn("[{}] {}", remoteAddress, msg);
ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
return;
}

disableTcpNoDelayIfNeeded(topicName, producerName);

Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole);
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole, isEncrypted);

try {
topic.addProducer(producer);
Expand Down
Expand Up @@ -105,6 +105,8 @@ CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, lo

boolean isBacklogQuotaExceeded(String producerName);

boolean isEncryptionRequired();

BacklogQuota getBacklogQuota();

void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats,
Expand Down
Expand Up @@ -129,6 +129,9 @@ protected TopicStats initialValue() {
}
};

// Whether messages published must be encrypted or not in this topic
private volatile boolean isEncryptionRequired = false;

private static class TopicStats {
public double averageMsgSize;
public double aggMsgRateIn;
Expand Down Expand Up @@ -164,6 +167,16 @@ public NonPersistentTopic(String topic, BrokerService brokerService) {
USAGE_COUNT_UPDATER.set(this, 0);

this.lastActive = System.nanoTime();

try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
isEncryptionRequired = policies.encryption_required;
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
isEncryptionRequired = false;
}
}

@Override
Expand Down Expand Up @@ -845,7 +858,14 @@ public void checkGC(int gcIntervalInSeconds) {

@Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
producers.forEach(Producer::checkPermissions);
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
}
isEncryptionRequired = data.encryption_required;
producers.forEach(producer -> {
producer.checkPermissions();
producer.checkEncryption();
});
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
return checkReplicationAndRetryOnFailure();
}
Expand All @@ -870,6 +890,11 @@ public boolean isBacklogQuotaExceeded(String producerName) {
return false;
}

@Override
public boolean isEncryptionRequired() {
return isEncryptionRequired;
}

@Override
public CompletableFuture<Void> unsubscribe(String subName) {
// No-op
Expand Down
Expand Up @@ -155,6 +155,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {

private final MessageDeduplication messageDeduplication;

// Whether messages published must be encrypted or not in this topic
private volatile boolean isEncryptionRequired = false;

private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
@Override
protected TopicStats initialValue() {
Expand Down Expand Up @@ -218,6 +221,16 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
this.lastActive = System.nanoTime();

this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger);

try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
isEncryptionRequired = policies.encryption_required;
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
isEncryptionRequired = false;
}
}

@Override
Expand Down Expand Up @@ -1327,7 +1340,14 @@ private boolean shouldTopicBeRetained() {

@Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
producers.forEach(Producer::checkPermissions);
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
}
isEncryptionRequired = data.encryption_required;
producers.forEach(producer -> {
producer.checkPermissions();
producer.checkEncryption();
});
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
Expand Down Expand Up @@ -1373,6 +1393,11 @@ public boolean isBacklogQuotaExceeded(String producerName) {
return false;
}

@Override
public boolean isEncryptionRequired() {
return isEncryptionRequired;
}

public CompletableFuture<MessageId> terminate() {
CompletableFuture<MessageId> future = new CompletableFuture<>();
ledger.asyncTerminate(new TerminateCallback() {
Expand Down
Expand Up @@ -322,7 +322,7 @@ public void testAddRemoveProducer() throws Exception {

String role = "appid1";
// 1. simple add producer
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role);
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false);
topic.addProducer(producer);
assertEquals(topic.getProducers().size(), 1);

Expand All @@ -337,7 +337,7 @@ public void testAddRemoveProducer() throws Exception {

// 3. add producer for a different topic
PersistentTopic failTopic = new PersistentTopic(failTopicName, ledgerMock, brokerService);
Producer failProducer = new Producer(failTopic, serverCnx, 2 /* producer id */, "prod-name", role);
Producer failProducer = new Producer(failTopic, serverCnx, 2 /* producer id */, "prod-name", role, false);
try {
topic.addProducer(failProducer);
fail("should have failed");
Expand Down Expand Up @@ -480,7 +480,7 @@ public void testDeleteTopic() throws Exception {

// 2. delete topic with producer
topic = (PersistentTopic) brokerService.getTopic(successTopicName).get();
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role);
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false);
topic.addProducer(producer);

assertTrue(topic.delete().isCompletedExceptionally());
Expand Down Expand Up @@ -635,7 +635,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
try {
String role = "appid1";
Thread.sleep(10); /* delay to ensure that the delete gets executed first */
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role);
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false);
topic.addProducer(producer);
fail("Should have failed");
} catch (BrokerServiceException e) {
Expand Down

0 comments on commit 663e8f4

Please sign in to comment.