Skip to content

Commit

Permalink
Add Client auth plugin and tls support for function to connect with b…
Browse files Browse the repository at this point in the history
…roker (#1935)

* Add Client auth plugin and tls support for function to connect with broker

* add authConfig builder

* add hostnameverification and tlsCertPath

* add broker-tls url on worker

* take string type for boolean data-type
  • Loading branch information
rdhabalia committed Jun 8, 2018
1 parent 1950538 commit 6e336b4
Show file tree
Hide file tree
Showing 18 changed files with 375 additions and 63 deletions.
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io;

import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.Mockito.spy;

Expand All @@ -27,6 +28,8 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -40,14 +43,17 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -87,7 +93,7 @@ public class PulsarSinkE2ETest {

ServiceConfiguration config;
WorkerConfig workerConfig;
URL url;
URL urlTls;
PulsarService pulsar;
PulsarAdmin admin;
PulsarClient pulsarClient;
Expand All @@ -102,8 +108,16 @@ public class PulsarSinkE2ETest {

private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
private final int brokerWebServicePort = PortManager.nextFreePort();
private final int brokerWebServiceTlsPort = PortManager.nextFreePort();
private final int brokerServicePort = PortManager.nextFreePort();
private final int brokerServiceTlsPort = PortManager.nextFreePort();
private final int workerServicePort = PortManager.nextFreePort();

private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";

private static final Logger log = LoggerFactory.getLogger(PulsarSinkE2ETest.class);

@BeforeMethod
Expand All @@ -118,31 +132,67 @@ void setup(Method method) throws Exception {
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
bkEnsemble.start();

String hostHttpUrl = "http://127.0.0.1" + ":";
String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;

config = spy(new ServiceConfiguration());
config.setClusterName("use");
Set<String> superUsers = Sets.newHashSet("superUser");
config.setSuperUserRoles(superUsers);
config.setWebServicePort(brokerWebServicePort);
config.setWebServicePortTls(brokerWebServiceTlsPort);
config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config.setBrokerServicePort(brokerServicePort);
config.setBrokerServicePortTls(brokerServiceTlsPort);
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());


Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
config.setAuthenticationEnabled(true);
config.setAuthenticationProviders(providers);
config.setTlsEnabled(true);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(true);


functionsWorkerService = createPulsarFunctionWorker(config);
url = new URL(hostHttpUrl + brokerWebServicePort);
urlTls = new URL(brokerServiceUrl);
boolean isFunctionWebServerRequired = method.getName()
.equals("testExternalReplicatorRedirectionToWorkerService");
Optional<WorkerService> functionWorkerService = isFunctionWebServerRequired ? Optional.ofNullable(null)
: Optional.of(functionsWorkerService);
pulsar = new PulsarService(config, functionWorkerService);
pulsar.start();
admin = new PulsarAdmin(url, (Authentication) null);

Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
Authentication authTls = new AuthenticationTls();
authTls.configure(authParams);

admin = spy(
PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH)
.allowTlsInsecureConnection(true).authentication(authTls).build());

brokerStatsClient = admin.brokerStats();
primaryHost = String.format("http://%s:%d", InetAddress.getLocalHost().getHostName(), brokerWebServicePort);

// update cluster metadata
ClusterData clusterData = new ClusterData(url.toString());
ClusterData clusterData = new ClusterData(urlTls.toString());
admin.clusters().updateCluster(config.getClusterName(), clusterData);

pulsarClient = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build();
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
&& isNotBlank(workerConfig.getClientAuthenticationParameters())) {
clientBuilder.enableTls(workerConfig.isUseTls());
clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
workerConfig.getClientAuthenticationParameters());
}
pulsarClient = clientBuilder.build();
// pulsarClient = PulsarClient.builder().serviceUrl(urlTls.toString()).statsInterval(0,
// TimeUnit.SECONDS).build();

TenantInfo propAdmin = new TenantInfo();
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
Expand All @@ -166,6 +216,7 @@ void shutdown() throws Exception {
if (workerExecutor != null) {
workerExecutor.shutdown();
}
pulsarClient.close();
admin.close();
pulsar.close();
functionsWorkerService.stop();
Expand All @@ -179,8 +230,8 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
// worker talks to local broker
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort());
workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort());
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls());
workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls());
workerConfig.setFailureCheckFreqMs(100);
workerConfig.setNumFunctionPackageReplicas(1);
workerConfig.setClusterCoordinationTopicName("coordinate");
Expand All @@ -193,11 +244,19 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
workerConfig.setWorkerHostname(hostname);
workerConfig
.setWorkerId("c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort());

workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setClientAuthenticationParameters(
String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
workerConfig.setUseTls(true);
workerConfig.setTlsAllowInsecureConnection(true);
workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);

return new WorkerService(workerConfig);
}

/**
* Validates pulsar sink e2e functionality on functions.
* Validates pulsar sink e2e functionality on functions.
*
* @throws Exception
*/
Expand Down Expand Up @@ -236,17 +295,17 @@ public void testE2EPulsarSink() throws Exception {
}
retryStrategically((test) -> {
try {
SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values()
.iterator().next();
SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
.next();
return subStats.unackedMessages == 0;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);
// validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked messages
// due to publish failure
Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
.next().unackedMessages, 0);
Assert.assertEquals(
admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, 0);

}

Expand Down Expand Up @@ -281,7 +340,7 @@ protected FunctionDetails createSinkConfig(String jarFile, String tenant, String

// set up sink spec
SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
//sinkSpecBuilder.setClassName(PulsarSink.class.getName());
// sinkSpecBuilder.setClassName(PulsarSink.class.getName());
sinkSpecBuilder.setTopic(String.format("persistent://%s/%s/%s", tenant, namespace, "output"));
Map<String, Object> sinkConfigMap = Maps.newHashMap();
sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
Expand All @@ -290,4 +349,4 @@ protected FunctionDetails createSinkConfig(String jarFile, String tenant, String

return functionDetailsBuilder.build();
}
}
}
Expand Up @@ -51,6 +51,8 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Resources;
Expand Down Expand Up @@ -642,11 +644,34 @@ class LocalRunner extends FunctionDetailsCommand {

@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
protected String brokerServiceUrl;

@Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker")
protected String clientAuthPlugin;

@Parameter(names = "--clientAuthParams", description = "Client authentication param")
protected String clientAuthParams;

@Parameter(names = "--use_tls", description = "Use tls connection\n")
protected boolean useTls;

@Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
protected boolean tlsAllowInsecureConnection;

@Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification")
protected boolean tlsHostNameVerificationEnabled;

@Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path")
protected String tlsTrustCertFilePath;

@Override
void runCmd() throws Exception {
CmdFunctions.startLocalRun(convertProto2(functionConfig),
functionConfig.getParallelism(), brokerServiceUrl, userCodeFile, admin);
CmdFunctions.startLocalRun(convertProto2(functionConfig), functionConfig.getParallelism(), brokerServiceUrl,
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
.clientAuthenticationParameters(clientAuthParams).useTls(useTls)
.tlsAllowInsecureConnection(tlsAllowInsecureConnection)
.tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
userCodeFile, admin);
}
}

Expand Down Expand Up @@ -911,7 +936,8 @@ private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functio
}

protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
int parallelism, String brokerServiceUrl, String userCodeFile, PulsarAdmin admin)
int parallelism, String brokerServiceUrl, AuthenticationConfig authConfig,
String userCodeFile, PulsarAdmin admin)
throws Exception {

String serviceUrl = admin.getServiceUrl();
Expand All @@ -921,8 +947,8 @@ protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.F
if (serviceUrl == null) {
serviceUrl = DEFAULT_SERVICE_URL;
}
try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(
serviceUrl, null, null, null)) {
try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, authConfig, null, null,
null)) {
List<RuntimeSpawner> spawners = new LinkedList<>();
for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Resources;
Expand Down Expand Up @@ -99,11 +100,35 @@ class LocalSinkRunner extends CreateSink {

@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
protected String brokerServiceUrl;

@Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker")
protected String clientAuthPlugin;

@Parameter(names = "--clientAuthParams", description = "Client authentication param")
protected String clientAuthParams;

@Parameter(names = "--use_tls", description = "Use tls connection\n")
protected boolean useTls;

@Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
protected boolean tlsAllowInsecureConnection;

@Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification")
protected boolean tlsHostNameVerificationEnabled;

@Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path")
protected String tlsTrustCertFilePath;

@Override
void runCmd() throws Exception {
CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig),
sinkConfig.getParallelism(), brokerServiceUrl, jarFile, admin);
CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig), sinkConfig.getParallelism(),
brokerServiceUrl,
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
.clientAuthenticationParameters(clientAuthParams).useTls(useTls)
.tlsAllowInsecureConnection(tlsAllowInsecureConnection)
.tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
jarFile, admin);
}
}

Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Resources;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
Expand Down Expand Up @@ -95,11 +96,35 @@ class LocalSourceRunner extends CreateSource {

@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
protected String brokerServiceUrl;

@Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker")
protected String clientAuthPlugin;

@Parameter(names = "--clientAuthParams", description = "Client authentication param")
protected String clientAuthParams;

@Parameter(names = "--use_tls", description = "Use tls connection\n")
protected boolean useTls;

@Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
protected boolean tlsAllowInsecureConnection;

@Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification")
protected boolean tlsHostNameVerificationEnabled;

@Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path")
protected String tlsTrustCertFilePath;

@Override
void runCmd() throws Exception {
CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig),
sourceConfig.getParallelism(), brokerServiceUrl, jarFile, admin);
CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), sourceConfig.getParallelism(),
brokerServiceUrl,
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
.clientAuthenticationParameters(clientAuthParams).useTls(useTls)
.tlsAllowInsecureConnection(tlsAllowInsecureConnection)
.tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
jarFile, admin);
}
}

Expand Down
Expand Up @@ -55,7 +55,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private int maxLookupRequest = 50000;
private int maxNumberOfRejectedRequestPerConnection = 50;
private int keepAliveIntervalSeconds = 30;

public ClientConfigurationData clone() {
try {
return (ClientConfigurationData) super.clone();
Expand Down

0 comments on commit 6e336b4

Please sign in to comment.