Skip to content

Commit

Permalink
[improve][sec] Support for Elliptic Curve Cryptography (EC, ECC) (cer…
Browse files Browse the repository at this point in the history
…tificates/private keys) (apache#21621)
  • Loading branch information
mattisonchao committed Nov 27, 2023
1 parent 8dac8a5 commit e1d06b5
Show file tree
Hide file tree
Showing 36 changed files with 736 additions and 6 deletions.
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1786,6 +1786,8 @@ flexible messaging model and an intuitive client API.</description>
<exclude>**/*.crt</exclude>
<exclude>**/*.key</exclude>
<exclude>**/*.csr</exclude>
<exclude>**/*.srl</exclude>
<exclude>**/*.txt</exclude>
<exclude>**/*.pem</exclude>
<exclude>**/*.json</exclude>
<exclude>**/*.htpasswd</exclude>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.security.tls;

import static org.apache.pulsar.utils.ResourceUtils.getAbsolutePath;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;


public abstract class MockedPulsarStandalone implements AutoCloseable {

@Getter
private final ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
private PulsarTestContext pulsarTestContext;

@Getter
private PulsarService pulsarService;
private PulsarAdmin serviceInternalAdmin;


{
serviceConfiguration.setClusterName(TEST_CLUSTER_NAME);
serviceConfiguration.setBrokerShutdownTimeoutMs(0L);
serviceConfiguration.setBrokerServicePort(Optional.of(0));
serviceConfiguration.setBrokerServicePortTls(Optional.of(0));
serviceConfiguration.setAdvertisedAddress("localhost");
serviceConfiguration.setWebServicePort(Optional.of(0));
serviceConfiguration.setWebServicePortTls(Optional.of(0));
serviceConfiguration.setNumExecutorThreadPoolSize(5);
serviceConfiguration.setExposeBundlesMetricsInPrometheus(true);
}

@SneakyThrows
protected void loadECTlsCertificateWithFile() {
serviceConfiguration.setTlsEnabled(true);
serviceConfiguration.setBrokerServicePort(Optional.empty());
serviceConfiguration.setWebServicePort(Optional.empty());
serviceConfiguration.setTlsTrustCertsFilePath(TLS_EC_TRUSTED_CERT_PATH);
serviceConfiguration.setTlsCertificateFilePath(TLS_EC_SERVER_CERT_PATH);
serviceConfiguration.setTlsKeyFilePath(TLS_EC_SERVER_KEY_PATH);
serviceConfiguration.setBrokerClientTlsEnabled(true);
serviceConfiguration.setBrokerClientTrustCertsFilePath(TLS_EC_TRUSTED_CERT_PATH);
serviceConfiguration.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
final Map<String, String> brokerClientAuthParams = new HashMap<>();
brokerClientAuthParams.put("tlsCertFile", TLS_EC_BROKER_CLIENT_CERT_PATH);
brokerClientAuthParams.put("tlsKeyFile", TLS_EC_BROKER_CLIENT_KEY_PATH);
serviceConfiguration.setBrokerClientAuthenticationParameters(mapper.writeValueAsString(brokerClientAuthParams));
}

@SneakyThrows
protected void loadECTlsCertificateWithKeyStore() {
serviceConfiguration.setTlsEnabled(true);
serviceConfiguration.setBrokerServicePort(Optional.empty());
serviceConfiguration.setWebServicePort(Optional.empty());
serviceConfiguration.setTlsEnabledWithKeyStore(true);
serviceConfiguration.setTlsKeyStore(TLS_EC_KS_SERVER_STORE);
serviceConfiguration.setTlsKeyStorePassword(TLS_EC_KS_SERVER_PASS);
serviceConfiguration.setTlsTrustStore(TLS_EC_KS_TRUSTED_STORE);
serviceConfiguration.setTlsTrustStorePassword(TLS_EC_KS_TRUSTED_STORE_PASS);
serviceConfiguration.setTlsRequireTrustedClientCertOnConnect(true);
serviceConfiguration.setBrokerClientTlsEnabled(true);
serviceConfiguration.setBrokerClientTlsEnabledWithKeyStore(true);
serviceConfiguration.setBrokerClientTlsKeyStore(TLS_EC_KS_BROKER_CLIENT_STORE);
serviceConfiguration.setBrokerClientTlsKeyStorePassword(TLS_EC_KS_BROKER_CLIENT_PASS);
serviceConfiguration.setBrokerClientTlsTrustStore(TLS_EC_KS_TRUSTED_STORE);
serviceConfiguration.setBrokerClientTlsTrustStorePassword(TLS_EC_KS_TRUSTED_STORE_PASS);
serviceConfiguration.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName());
final Map<String, String> brokerClientAuthParams = new HashMap<>();
brokerClientAuthParams.put("keyStorePath", TLS_EC_KS_BROKER_CLIENT_STORE);
brokerClientAuthParams.put("keyStorePassword", TLS_EC_KS_BROKER_CLIENT_PASS);
serviceConfiguration.setBrokerClientAuthenticationParameters(mapper.writeValueAsString(brokerClientAuthParams));
}

protected void enableTlsAuthentication() {
serviceConfiguration.setAuthenticationEnabled(true);
serviceConfiguration.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderTls.class.getName()));
}

@SneakyThrows
protected void start() {
this.pulsarTestContext = PulsarTestContext.builder()
.spyByDefault()
.config(serviceConfiguration)
.withMockZookeeper(false)
.build();
this.pulsarService = pulsarTestContext.getPulsarService();
this.serviceInternalAdmin = pulsarService.getAdminClient();
setupDefaultTenantAndNamespace();
}

private void setupDefaultTenantAndNamespace() throws Exception {
if (!serviceInternalAdmin.clusters().getClusters().contains(TEST_CLUSTER_NAME)) {
serviceInternalAdmin.clusters().createCluster(TEST_CLUSTER_NAME,
ClusterData.builder().serviceUrl(pulsarService.getWebServiceAddress()).build());
}
if (!serviceInternalAdmin.tenants().getTenants().contains(DEFAULT_TENANT)) {
serviceInternalAdmin.tenants().createTenant(DEFAULT_TENANT, TenantInfo.builder().allowedClusters(
Sets.newHashSet(TEST_CLUSTER_NAME)).build());
}
if (!serviceInternalAdmin.namespaces().getNamespaces(DEFAULT_TENANT).contains(DEFAULT_NAMESPACE)) {
serviceInternalAdmin.namespaces().createNamespace(DEFAULT_NAMESPACE);
}
}


@Override
public void close() throws Exception {
if (pulsarTestContext != null) {
pulsarTestContext.close();
}
}

// Utils
protected static final ObjectMapper mapper = new ObjectMapper();

// Static name
private static final String DEFAULT_TENANT = "public";
private static final String DEFAULT_NAMESPACE = "public/default";
private static final String TEST_CLUSTER_NAME = "test-standalone";

// EC certificate
protected static final String TLS_EC_TRUSTED_CERT_PATH =
getAbsolutePath("certificate-authority/ec/ca.cert.pem");
private static final String TLS_EC_SERVER_KEY_PATH =
getAbsolutePath("certificate-authority/ec/server.key-pk8.pem");
private static final String TLS_EC_SERVER_CERT_PATH =
getAbsolutePath("certificate-authority/ec/server.cert.pem");
private static final String TLS_EC_BROKER_CLIENT_KEY_PATH =
getAbsolutePath("certificate-authority/ec/broker_client.key-pk8.pem");
private static final String TLS_EC_BROKER_CLIENT_CERT_PATH =
getAbsolutePath("certificate-authority/ec/broker_client.cert.pem");
protected static final String TLS_EC_CLIENT_KEY_PATH =
getAbsolutePath("certificate-authority/ec/client.key-pk8.pem");
protected static final String TLS_EC_CLIENT_CERT_PATH =
getAbsolutePath("certificate-authority/ec/client.cert.pem");

// EC KeyStore
private static final String TLS_EC_KS_SERVER_STORE =
getAbsolutePath("certificate-authority/ec/jks/server.keystore.jks");
private static final String TLS_EC_KS_SERVER_PASS = "serverpw";
private static final String TLS_EC_KS_BROKER_CLIENT_STORE =
getAbsolutePath("certificate-authority/ec/jks/broker_client.keystore.jks");
private static final String TLS_EC_KS_BROKER_CLIENT_PASS = "brokerclientpw";
protected static final String TLS_EC_KS_CLIENT_STORE =
getAbsolutePath("certificate-authority/ec/jks/client.keystore.jks");
protected static final String TLS_EC_KS_CLIENT_PASS = "clientpw";
protected static final String TLS_EC_KS_TRUSTED_STORE =
getAbsolutePath("certificate-authority/ec/jks/ca.truststore.jks");
protected static final String TLS_EC_KS_TRUSTED_STORE_PASS = "rootpw";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.security.tls.ec;


import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.security.tls.MockedPulsarStandalone;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


@Test
public class TlsWithECCertificateFileTest extends MockedPulsarStandalone {

@BeforeClass(alwaysRun = true)
public void suitSetup() {
loadECTlsCertificateWithFile();
enableTlsAuthentication();
super.start(); // start standalone service
}

@SneakyThrows
@AfterClass(alwaysRun = true)
public void suitShutdown() {
super.close(); // close standalone service
}


@Test(expectedExceptions = PulsarClientException.class)
@SneakyThrows
public void testConnectionFailWithoutCertificate() {
@Cleanup final PulsarClient client = PulsarClient.builder()
.serviceUrl(getPulsarService().getBrokerServiceUrlTls())
.build();
@Cleanup final Producer<byte[]> producer = client.newProducer()
.topic("should_be_failed")
.create();
}


@Test
@SneakyThrows
public void testConnectionSuccessWithCertificate() {
final AuthenticationTls authentication = new AuthenticationTls(TLS_EC_CLIENT_CERT_PATH, TLS_EC_CLIENT_KEY_PATH);
final String topicName = "persistent://public/default/" + UUID.randomUUID();
final int testMsgNum = 10;
@Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
.authentication(authentication)
.serviceHttpUrl(getPulsarService().getWebServiceAddressTls())
.tlsTrustCertsFilePath(TLS_EC_TRUSTED_CERT_PATH)
.build();
admin.topics().createNonPartitionedTopic(topicName);
admin.topics().createSubscription(topicName, "sub-1", MessageId.earliest);
@Cleanup final PulsarClient client = PulsarClient.builder()
.serviceUrl(getPulsarService().getBrokerServiceUrlTls())
.authentication(authentication)
.tlsTrustCertsFilePath(TLS_EC_TRUSTED_CERT_PATH)
.build();
@Cleanup final Producer<byte[]> producer = client.newProducer()
.topic(topicName)
.create();
@Cleanup final Consumer<byte[]> consumer = client.newConsumer()
.topic(topicName)
.subscriptionName("sub-1")
.consumerName("cons-1")
.subscribe();
for (int i = 0; i < testMsgNum; i++) {
producer.send((i + "").getBytes(StandardCharsets.UTF_8));
}

for (int i = 0; i < testMsgNum; i++) {
final Message<byte[]> message = consumer.receive();
assertNotNull(message);
final byte[] b = message.getValue();
final String s = new String(b, StandardCharsets.UTF_8);
assertEquals(s, i + "");
}
}
}
Loading

0 comments on commit e1d06b5

Please sign in to comment.