From 525dd2f3b541d8bb8fe166b28b79597aae45ef4f Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Thu, 11 May 2023 17:29:22 -0500 Subject: [PATCH] [fix][fn] Correct TLS cert config translation from broker to fn worker (#20297) ### Motivation The `initializeWorkerConfigFromBrokerConfig` method converts a `ServiceConfiguration` object to a `WorkerConfig`. This is used when the function worker is running within the broker and when pulsar is running in standalone mode. The TLS certificates that are trusted by the broker should also be trusted by the function worker, and the TLS certificates that are trusted by the broker client should be trusted by the function worker's client. ### Modifications * Improve the `PulsarFunctionTlsTest` test by adding awaitility. Before this change, the test was flaky on my machine. * Fix the mapping of broker config to function worker config. ### Verifying this change A test is added. Note that the old test doesn't technically fail due to the misconfiguration at the moment. The error is in the logs. Here is one of the stack traces that is gone after this change: ``` 2023-05-10T23:19:27,087 - WARN - [pulsar-client-io-253-3:ClientCnx@344] - [localhost/127.0.0.1:59715] Got exception io.netty.handler.codec.DecoderException: javax.net.ssl.SSLHandshakeException: General OpenSslEngine problem at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:499) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: javax.net.ssl.SSLHandshakeException: General OpenSslEngine problem at io.netty.handler.ssl.ReferenceCountedOpenSslEngine.handshakeException(ReferenceCountedOpenSslEngine.java:1907) at io.netty.handler.ssl.ReferenceCountedOpenSslEngine.wrap(ReferenceCountedOpenSslEngine.java:834) at java.base/javax.net.ssl.SSLEngine.wrap(SSLEngine.java:564) at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:1042) at io.netty.handler.ssl.SslHandler.wrapNonAppData(SslHandler.java:928) at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1418) at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1256) at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1296) at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ... 17 more Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439) at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306) at java.base/sun.security.validator.Validator.validate(Validator.java:264) at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285) at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144) at io.netty.handler.ssl.ReferenceCountedOpenSslClientContext$ExtendedTrustManagerVerifyCallback.verify(ReferenceCountedOpenSslClientContext.java:234) at io.netty.handler.ssl.ReferenceCountedOpenSslContext$AbstractCertificateVerifier.verify(ReferenceCountedOpenSslContext.java:779) at io.netty.internal.tcnative.CertificateVerifierTask.runTask(CertificateVerifierTask.java:36) at io.netty.internal.tcnative.SSLTask.run(SSLTask.java:48) at io.netty.internal.tcnative.SSLTask.run(SSLTask.java:42) at io.netty.handler.ssl.ReferenceCountedOpenSslEngine.runAndResetNeedTask(ReferenceCountedOpenSslEngine.java:1496) at io.netty.handler.ssl.ReferenceCountedOpenSslEngine.access$700(ReferenceCountedOpenSslEngine.java:94) at io.netty.handler.ssl.ReferenceCountedOpenSslEngine$TaskDecorator.run(ReferenceCountedOpenSslEngine.java:1471) at io.netty.handler.ssl.SslHandler.runDelegatedTasks(SslHandler.java:1558) at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1404) ... 21 more Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141) at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126) at java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297) at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434) ... 35 more ``` ### Does this pull request potentially affect one of the following parts: This affects the configuration, but I think it should be classified as fixing a bug, so it is not a breaking change. ### Documentation - [x] `doc-not-needed` ### Matching PR in forked repository PR in forked repository: the modified test passes locally, so skipping forked test. --- .../apache/pulsar/broker/PulsarService.java | 3 ++- .../worker/PulsarFunctionTlsTest.java | 23 +++++++++++++++---- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 5fc9920d0f215..f833674ceeb0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1859,7 +1859,8 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu workerConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection()); workerConfig.setTlsEnabled(brokerConfig.isTlsEnabled()); workerConfig.setTlsEnableHostnameVerification(brokerConfig.isTlsHostnameVerificationEnabled()); - workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath()); + workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getBrokerClientTrustCertsFilePath()); + workerConfig.setTlsTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath()); // client in worker will use this config to authenticate with broker workerConfig.setBrokerClientAuthenticationPlugin(brokerConfig.getBrokerClientAuthenticationPlugin()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index 00db6a65b16ea..246d980d6178c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -51,6 +51,7 @@ import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.worker.service.WorkerServiceLoader; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -65,6 +66,7 @@ public class PulsarFunctionTlsTest { private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem"; private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem"; private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem"; + private static final String CA_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem"; LocalBookkeeperEnsemble bkEnsemble; protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT]; @@ -111,9 +113,9 @@ void setup() throws Exception { config.setAuthenticationProviders(providers); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); - config.setTlsAllowInsecureConnection(true); + config.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH); config.setBrokerClientTlsEnabled(true); - config.setBrokerClientTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH); + config.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH); config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); config.setBrokerClientAuthenticationParameters( "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH); @@ -141,7 +143,6 @@ void setup() throws Exception { workerConfig.setUseTls(true); workerConfig.setTlsEnableHostnameVerification(true); workerConfig.setTlsAllowInsecureConnection(false); - workerConfig.setBrokerClientTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); fnWorkerServices[i] = WorkerServiceLoader.load(workerConfig); configurations[i] = config; @@ -163,8 +164,7 @@ void setup() throws Exception { pulsarAdmins[i] = PulsarAdmin.builder() .serviceHttpUrl(pulsarServices[i].getWebServiceAddressTls()) - .tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH) - .allowTlsInsecureConnection(true) + .tlsTrustCertsFilePath(CA_CERT_FILE_PATH) .authentication(authTls) .build(); } @@ -216,6 +216,13 @@ void tearDown() throws Exception { } } + @Test + public void testTLSTrustCertsConfigMapping() throws Exception { + WorkerConfig workerConfig = fnWorkerServices[0].getWorkerConfig(); + assertEquals(workerConfig.getTlsTrustCertsFilePath(), CA_CERT_FILE_PATH); + assertEquals(workerConfig.getBrokerClientTrustCertsFilePath(), CA_CERT_FILE_PATH); + } + @Test public void testFunctionsCreation() throws Exception { @@ -233,6 +240,12 @@ public void testFunctionsCreation() throws Exception { functionConfig, jarFilePathUrl ); + // Function creation is not strongly consistent, so this test can fail with a get that is too eager and + // does not have retries. + final PulsarAdmin admin = pulsarAdmins[i]; + Awaitility.await().ignoreExceptions() + .untilAsserted(() -> admin.functions().getFunction(testTenant, "my-ns", functionName)); + FunctionConfig config = pulsarAdmins[i].functions().getFunction(testTenant, "my-ns", functionName); assertEquals(config.getTenant(), testTenant); assertEquals(config.getNamespace(), "my-ns");