Skip to content

Commit

Permalink
[fix][fn] Correct TLS cert config translation from broker to fn worker (
Browse files Browse the repository at this point in the history
apache#20297)

### Motivation

The `initializeWorkerConfigFromBrokerConfig` method converts a `ServiceConfiguration` object to a `WorkerConfig`. This is used when the function worker is running within the broker and when pulsar is running in standalone mode. The TLS certificates that are trusted by the broker should also be trusted by the function worker, and the TLS certificates that are trusted by the broker client should be trusted by the function worker's client.

### Modifications

* Improve the `PulsarFunctionTlsTest` test by adding awaitility. Before this change, the test was flaky on my machine.
* Fix the mapping of broker config to function worker config.

### Verifying this change

A test is added. Note that the old test doesn't technically fail due to the misconfiguration at the moment. The error is in the logs. Here is one of the stack traces that is gone after this change:

```
2023-05-10T23:19:27,087 - WARN  - [pulsar-client-io-253-3:ClientCnx@344] - [localhost/127.0.0.1:59715] Got exception io.netty.handler.codec.DecoderException: javax.net.ssl.SSLHandshakeException: General OpenSslEngine problem
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:499)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: javax.net.ssl.SSLHandshakeException: General OpenSslEngine problem
	at io.netty.handler.ssl.ReferenceCountedOpenSslEngine.handshakeException(ReferenceCountedOpenSslEngine.java:1907)
	at io.netty.handler.ssl.ReferenceCountedOpenSslEngine.wrap(ReferenceCountedOpenSslEngine.java:834)
	at java.base/javax.net.ssl.SSLEngine.wrap(SSLEngine.java:564)
	at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:1042)
	at io.netty.handler.ssl.SslHandler.wrapNonAppData(SslHandler.java:928)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1418)
	at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1256)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1296)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
	... 17 more
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
	at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
	at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
	at java.base/sun.security.validator.Validator.validate(Validator.java:264)
	at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285)
	at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)
	at io.netty.handler.ssl.ReferenceCountedOpenSslClientContext$ExtendedTrustManagerVerifyCallback.verify(ReferenceCountedOpenSslClientContext.java:234)
	at io.netty.handler.ssl.ReferenceCountedOpenSslContext$AbstractCertificateVerifier.verify(ReferenceCountedOpenSslContext.java:779)
	at io.netty.internal.tcnative.CertificateVerifierTask.runTask(CertificateVerifierTask.java:36)
	at io.netty.internal.tcnative.SSLTask.run(SSLTask.java:48)
	at io.netty.internal.tcnative.SSLTask.run(SSLTask.java:42)
	at io.netty.handler.ssl.ReferenceCountedOpenSslEngine.runAndResetNeedTask(ReferenceCountedOpenSslEngine.java:1496)
	at io.netty.handler.ssl.ReferenceCountedOpenSslEngine.access$700(ReferenceCountedOpenSslEngine.java:94)
	at io.netty.handler.ssl.ReferenceCountedOpenSslEngine$TaskDecorator.run(ReferenceCountedOpenSslEngine.java:1471)
	at io.netty.handler.ssl.SslHandler.runDelegatedTasks(SslHandler.java:1558)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1404)
	... 21 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
	at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
	at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
	at java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297)
	at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434)
	... 35 more
```

### Does this pull request potentially affect one of the following parts:

This affects the configuration, but I think it should be classified as fixing a bug, so it is not a breaking change.

### Documentation

- [x] `doc-not-needed` 
### Matching PR in forked repository

PR in forked repository: the modified test passes locally, so skipping forked test.
  • Loading branch information
michaeljmarshall committed May 11, 2023
1 parent 4b24c9e commit 525dd2f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1859,7 +1859,8 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu
workerConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
workerConfig.setTlsEnabled(brokerConfig.isTlsEnabled());
workerConfig.setTlsEnableHostnameVerification(brokerConfig.isTlsHostnameVerificationEnabled());
workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());
workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getBrokerClientTrustCertsFilePath());
workerConfig.setTlsTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());

// client in worker will use this config to authenticate with broker
workerConfig.setBrokerClientAuthenticationPlugin(brokerConfig.getBrokerClientAuthenticationPlugin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -65,6 +66,7 @@ public class PulsarFunctionTlsTest {
private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
private static final String CA_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";

LocalBookkeeperEnsemble bkEnsemble;
protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
Expand Down Expand Up @@ -111,9 +113,9 @@ void setup() throws Exception {
config.setAuthenticationProviders(providers);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(true);
config.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
config.setBrokerClientTlsEnabled(true);
config.setBrokerClientTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
config.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
config.setBrokerClientAuthenticationParameters(
"tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
Expand Down Expand Up @@ -141,7 +143,6 @@ void setup() throws Exception {
workerConfig.setUseTls(true);
workerConfig.setTlsEnableHostnameVerification(true);
workerConfig.setTlsAllowInsecureConnection(false);
workerConfig.setBrokerClientTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
fnWorkerServices[i] = WorkerServiceLoader.load(workerConfig);

configurations[i] = config;
Expand All @@ -163,8 +164,7 @@ void setup() throws Exception {

pulsarAdmins[i] = PulsarAdmin.builder()
.serviceHttpUrl(pulsarServices[i].getWebServiceAddressTls())
.tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH)
.allowTlsInsecureConnection(true)
.tlsTrustCertsFilePath(CA_CERT_FILE_PATH)
.authentication(authTls)
.build();
}
Expand Down Expand Up @@ -216,6 +216,13 @@ void tearDown() throws Exception {
}
}

@Test
public void testTLSTrustCertsConfigMapping() throws Exception {
WorkerConfig workerConfig = fnWorkerServices[0].getWorkerConfig();
assertEquals(workerConfig.getTlsTrustCertsFilePath(), CA_CERT_FILE_PATH);
assertEquals(workerConfig.getBrokerClientTrustCertsFilePath(), CA_CERT_FILE_PATH);
}

@Test
public void testFunctionsCreation() throws Exception {

Expand All @@ -233,6 +240,12 @@ public void testFunctionsCreation() throws Exception {
functionConfig, jarFilePathUrl
);

// Function creation is not strongly consistent, so this test can fail with a get that is too eager and
// does not have retries.
final PulsarAdmin admin = pulsarAdmins[i];
Awaitility.await().ignoreExceptions()
.untilAsserted(() -> admin.functions().getFunction(testTenant, "my-ns", functionName));

FunctionConfig config = pulsarAdmins[i].functions().getFunction(testTenant, "my-ns", functionName);
assertEquals(config.getTenant(), testTenant);
assertEquals(config.getNamespace(), "my-ns");
Expand Down

0 comments on commit 525dd2f

Please sign in to comment.