Skip to content

Commit

Permalink
[functions] Distributed the CA for KubernetesSecretsTokenAuthProvider (
Browse files Browse the repository at this point in the history
…#5398)

Currently, if a user has TLS enabled and is using a custom CA that isn't
baked into the image, when the functions worker starts, it won't have
the CA in order to validate the cert presented by the broker.

This adds support to have  the `KubernetesSecretsTokenAuthProvider`
also distribute the CA via the same kubernetes secret used for the
token.
  • Loading branch information
addisonj authored and sijie committed Oct 28, 2019
1 parent 2a6d2c6 commit 28b0c3a
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*/
public interface KubernetesFunctionAuthProvider extends FunctionAuthProvider {

void initialize(CoreV1Api coreClient, String kubeNamespace);
void initialize(CoreV1Api coreClient, String kubeNamespace, byte[] caBytes);

/**
* Configure function statefulset spec based on function auth data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.auth;

import com.google.common.annotations.VisibleForTesting;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1DeleteOptions;
Expand All @@ -39,6 +40,8 @@

import javax.naming.AuthenticationException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -55,14 +58,18 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut
private static final String SECRET_NAME = "function-auth";
private static final String DEFAULT_SECRET_MOUNT_DIR = "/etc/auth";
private static final String FUNCTION_AUTH_TOKEN = "token";
private static final String FUNCTION_CA_CERT = "ca.pem";


private CoreV1Api coreClient;
private String kubeNamespace;
private byte[] caBytes;

@Override
public void initialize(CoreV1Api coreClient, String kubeNamespace) {
public void initialize(CoreV1Api coreClient, String kubeNamespace, byte[] caBytes) {
this.coreClient = coreClient;
this.kubeNamespace = kubeNamespace;
this.caBytes = caBytes;
}

@Override
Expand Down Expand Up @@ -99,6 +106,10 @@ public void configureAuthenticationConfig(AuthenticationConfig authConfig, Optio
} else {
authConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
authConfig.setClientAuthenticationParameters(String.format("file://%s/%s", DEFAULT_SECRET_MOUNT_DIR, FUNCTION_AUTH_TOKEN));
// if we have ca bytes, update the new path for the CA
if (this.caBytes != null) {
authConfig.setTlsTrustCertsFilePath(String.format("%s/%s", DEFAULT_SECRET_MOUNT_DIR, FUNCTION_CA_CERT));
}
}
}

Expand Down Expand Up @@ -252,6 +263,16 @@ public Optional<FunctionAuthData> updateAuthData(String tenant, String namespace
return existingFunctionAuthData;
}

@VisibleForTesting
Map<String, byte[]> buildSecretMap(String token) {
Map<String, byte[]> valueMap = new HashMap<>();
valueMap.put(FUNCTION_AUTH_TOKEN, token.getBytes());
if (caBytes != null) {
valueMap.put(FUNCTION_CA_CERT, caBytes);
}
return valueMap;
}

private void upsertSecret(String token, String tenant, String namespace, String name, String secretName) throws InterruptedException {

Actions.Action createAuthSecret = Actions.Action.builder()
Expand All @@ -262,7 +283,7 @@ private void upsertSecret(String token, String tenant, String namespace, String
String id = RandomStringUtils.random(5, true, true).toLowerCase();
V1Secret v1Secret = new V1Secret()
.metadata(new V1ObjectMeta().name(secretName))
.data(Collections.singletonMap(FUNCTION_AUTH_TOKEN, token.getBytes()));
.data(buildSecretMap(token));

try {
coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null);
Expand Down Expand Up @@ -315,7 +336,7 @@ private String createSecret(String token, String tenant, String namespace, Strin
String id = RandomStringUtils.random(5, true, true).toLowerCase();
V1Secret v1Secret = new V1Secret()
.metadata(new V1ObjectMeta().name(getSecretName(id)))
.data(Collections.singletonMap(FUNCTION_AUTH_TOKEN, token.getBytes()));
.data(buildSecretMap(token));
try {
coreClient.createNamespacedSecret(kubeNamespace, v1Secret, "true");
} catch (ApiException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class KubernetesInfo {
private Resources functionInstanceMinResources;
private final boolean authenticationEnabled;
private Optional<KubernetesFunctionAuthProvider> authProvider;
private final byte[] serverCaBytes;

@VisibleForTesting
public KubernetesRuntimeFactory(String k8Uri,
Expand All @@ -117,6 +118,7 @@ public KubernetesRuntimeFactory(String k8Uri,
String pulsarAdminUri,
String stateStorageServiceUri,
AuthenticationConfig authConfig,
byte[] serverCaBytes,
Integer expectedMetricsCollectionInterval,
String changeConfigMap,
String changeConfigMapNamespace,
Expand Down Expand Up @@ -171,6 +173,7 @@ public KubernetesRuntimeFactory(String k8Uri,
this.customLabels = customLabels;
this.stateStorageServiceUri = stateStorageServiceUri;
this.authConfig = authConfig;
this.serverCaBytes = serverCaBytes;
this.javaInstanceJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/java-instance.jar";
this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/python-instance/python_instance_main.py";
this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval;
Expand All @@ -190,7 +193,7 @@ public KubernetesRuntimeFactory(String k8Uri,
+ functionAuthProvider.get().getClass().getName() + " must implement KubernetesFunctionAuthProvider");
} else {
KubernetesFunctionAuthProvider kubernetesFunctionAuthProvider = (KubernetesFunctionAuthProvider) functionAuthProvider.get();
kubernetesFunctionAuthProvider.initialize(coreClient, jobNamespace);
kubernetesFunctionAuthProvider.initialize(coreClient, jobNamespace, serverCaBytes);
this.authProvider = Optional.of(kubernetesFunctionAuthProvider);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,37 @@ public class KubernetesSecretsTokenAuthProviderTest {

@Test
public void testConfigureAuthDataStatefulSet() {
byte[] testBytes = new byte[]{0, 1, 2, 3, 4};

CoreV1Api coreV1Api = mock(CoreV1Api.class);
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider();
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default");
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default", testBytes);


V1StatefulSet statefulSet = new V1StatefulSet();
statefulSet.setSpec(
new V1StatefulSetSpec().template(
new V1PodTemplateSpec().spec(
new V1PodSpec().containers(
Collections.singletonList(new V1Container())))));
FunctionAuthData functionAuthData = FunctionAuthData.builder().data("foo".getBytes()).build();
kubernetesSecretsTokenAuthProvider.configureAuthDataStatefulSet(statefulSet, Optional.of(functionAuthData));

Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getVolumes().size(), 1);
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getVolumes().get(0).getName(), "function-auth");
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getVolumes().get(0).getSecret().getSecretName(), "pf-secret-foo");

Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().size(), 1);
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().size(), 1);
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(0).getName(), "function-auth");
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(0).getMountPath(), "/etc/auth");
}

@Test
public void testConfigureAuthDataStatefulSetNoCa() {
CoreV1Api coreV1Api = mock(CoreV1Api.class);
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider();
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default", null);


V1StatefulSet statefulSet = new V1StatefulSet();
Expand All @@ -77,7 +104,7 @@ public void testCacheAuthData() throws ApiException {
CoreV1Api coreV1Api = mock(CoreV1Api.class);
doReturn(new V1Secret()).when(coreV1Api).createNamespacedSecret(anyString(), any(), anyString());
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider();
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default");
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default", null);
Optional<FunctionAuthData> functionAuthData = kubernetesSecretsTokenAuthProvider.cacheAuthData("test-tenant",
"test-ns", "test-func", new AuthenticationDataSource() {
@Override
Expand All @@ -97,22 +124,39 @@ public String getCommandData() {

@Test
public void configureAuthenticationConfig() {
byte[] testBytes = new byte[]{0, 1, 2, 3, 4};
CoreV1Api coreV1Api = mock(CoreV1Api.class);
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider();
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default", testBytes);
AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build();
FunctionAuthData functionAuthData = FunctionAuthData.builder().data("foo".getBytes()).build();
kubernetesSecretsTokenAuthProvider.configureAuthenticationConfig(authenticationConfig, Optional.of(functionAuthData));

Assert.assertEquals(authenticationConfig.getClientAuthenticationPlugin(), AuthenticationToken.class.getName());
Assert.assertEquals(authenticationConfig.getClientAuthenticationParameters(), "file:///etc/auth/token");
Assert.assertEquals(authenticationConfig.getTlsTrustCertsFilePath(), "/etc/auth/ca.pem");
}

@Test
public void configureAuthenticationConfigNoCa() {
CoreV1Api coreV1Api = mock(CoreV1Api.class);
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider();
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default");
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default", null);
AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build();
FunctionAuthData functionAuthData = FunctionAuthData.builder().data("foo".getBytes()).build();
kubernetesSecretsTokenAuthProvider.configureAuthenticationConfig(authenticationConfig, Optional.of(functionAuthData));

Assert.assertEquals(authenticationConfig.getClientAuthenticationPlugin(), AuthenticationToken.class.getName());
Assert.assertEquals(authenticationConfig.getClientAuthenticationParameters(), "file:///etc/auth/token");
Assert.assertEquals(authenticationConfig.getTlsTrustCertsFilePath(), null);
}


@Test
public void testUpdateAuthData() throws Exception {
CoreV1Api coreV1Api = mock(CoreV1Api.class);
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider();
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default");
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, "default", null);
// test when existingFunctionAuthData is empty
Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
Optional<FunctionAuthData> functionAuthData = kubernetesSecretsTokenAuthProvider.updateAuthData("test-tenant",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir,
null,
null,
null,
null,
null,
minResources,
new TestSecretProviderConfigurator(), false, functionAuthProvider));
Expand Down Expand Up @@ -274,7 +275,7 @@ public void cleanUpAuthData(String tenant, String namespace, String name,
}

@Override
public void initialize(CoreV1Api coreClient, String kubeNamespace) {
public void initialize(CoreV1Api coreClient, String kubeNamespace, byte[] caBytes) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int
null,
null,
null,
null,
null, new TestSecretProviderConfigurator(), false,
Optional.empty()));
doNothing().when(factory).setupClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
workerConfig.getStateStorageServiceUrl(),
authConfig,
workerConfig.getTlsTrustChainBytes(),
workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval(),
workerConfig.getKubernetesContainerFactory().getChangeConfigMap(),
workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -539,6 +541,18 @@ public String getWorkerHostname() {
return this.workerHostname;
}

public byte[] getTlsTrustChainBytes() {
if (StringUtils.isNotEmpty(getTlsTrustCertsFilePath()) && Files.exists(Paths.get(getTlsTrustCertsFilePath()))) {
try {
return Files.readAllBytes(Paths.get(getTlsTrustCertsFilePath()));
} catch (IOException e) {
throw new IllegalStateException("Failed to read CA bytes", e);
}
} else {
return null;
}
}

public String getWorkerWebAddress() {
return String.format("http://%s:%d", this.getWorkerHostname(), this.getWorkerPort());
}
Expand Down

0 comments on commit 28b0c3a

Please sign in to comment.