Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing authentication for Pulsar Functions #3735

Merged
merged 24 commits into from Mar 19, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -35,7 +35,7 @@
public class AuthenticationProviderToken implements AuthenticationProvider {

final static String HTTP_HEADER_NAME = "Authorization";
final static String HTTP_HEADER_VALUE_PREFIX = "Bearer ";
public final static String HTTP_HEADER_VALUE_PREFIX = "Bearer ";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than making this public, you should make getToken(AuthenticationDataSource) static and call it from the other locations.


// When symmetric key is configured
final static String CONF_TOKEN_SECRET_KEY = "tokenSecretKey";
Expand Down
Expand Up @@ -82,7 +82,7 @@ public void registerFunction(final @PathParam("tenant") String tenant,
final @FormDataParam("functionConfig") String functionConfigJson) {

functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, null, functionConfigJson, clientAppId());
functionPkgUrl, null, functionConfigJson, clientAppId(), clientAuthData());
}

@PUT
Expand Down
Expand Up @@ -79,7 +79,7 @@ public void registerSink(final @PathParam("tenant") String tenant,
final @FormDataParam("sinkConfig") String sinkConfigJson) {

sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
functionPkgUrl, null, sinkConfigJson, clientAppId());
functionPkgUrl, null, sinkConfigJson, clientAppId(), clientAuthData());
}

@PUT
Expand Down
Expand Up @@ -79,7 +79,7 @@ public void registerSource(final @PathParam("tenant") String tenant,
final @FormDataParam("sourceConfig") String sourceConfigJson) {

source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
functionPkgUrl, null, sourceConfigJson, clientAppId());
functionPkgUrl, null, sourceConfigJson, clientAppId(), clientAuthData());
}

@PUT
Expand Down
Expand Up @@ -23,6 +23,7 @@
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;

/**
Expand All @@ -40,6 +41,7 @@ public class InstanceConfig {
private String functionVersion;
private FunctionDetails functionDetails;
private int maxBufferedTuples;
private Function.FunctionAuthenticationSpec functionAuthenticationSpec;
private int port;
private String clusterName;

Expand Down
5 changes: 5 additions & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Expand Up @@ -141,6 +141,11 @@ message FunctionMetaData {
uint64 version = 3;
uint64 createTime = 4;
map<int32, FunctionState> instanceStates = 5;
FunctionAuthenticationSpec functionAuthSpec = 6;
}

message FunctionAuthenticationSpec {
bytes data = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also include method, which the function runner can use to figure out which function auth provider to use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function auth provider should be a cluster wide setting and not on a per function basis right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comment to clarify the content and usage of "data" field, to summarize the below discussion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat i have add comments

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does data includes the credential? Or a pointer to the credentials? Can you add comment to clarify the field meaning?
If it includes credentials, should this be included in the function metadata?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it can include the actual credentials (not secure) or a pointer to the credentials. This all depends on the underlying implementation of the interface. I left this field open ended so that there can be flexibility of what can be stored in here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should always include the credentials. If you want to make it secure, make it secure using the caller of cacheAuthData, not within cacheAuthData

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ivankelly I don't quite follow what you are proposing. This protobuf message will be stored in the function metadata topic. This message was designed with a generic purpose to store some data for the function authentication provider so that it can distribute it to all the workers. What that data is is up to the function authentication provider implementation. This gives us flexibility in the types of authentication we can support in the future.

}

message Instance {
Expand Down
6 changes: 6 additions & 0 deletions pulsar-functions/runtime/pom.xml
Expand Up @@ -63,6 +63,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-broker-common</artifactId>
<version>${project.version}</version>
</dependency>

merlimat marked this conversation as resolved.
Show resolved Hide resolved
</dependencies>

</project>
@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;

import javax.naming.AuthenticationException;

import static org.apache.pulsar.broker.authentication.AuthenticationProviderToken.HTTP_HEADER_VALUE_PREFIX;
import static org.apache.pulsar.client.impl.auth.AuthenticationDataToken.HTTP_HEADER_NAME;

public class ClearTextFunctionTokenAuthProvider implements FunctionAuthProvider {
@Override
public void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData) {
authConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
authConfig.setClientAuthenticationParameters("token://" + functionAuthData.getData());
}

@Override
public FunctionAuthData cacheAuthData(String tenant, String namespace, String name, AuthenticationDataSource authenticationDataSource) throws Exception {
String token = null;
try {
token = getToken(authenticationDataSource);
} catch (Exception e) {
throw new RuntimeException(e);
}

if (token != null) {
return FunctionAuthData.builder().data(token.getBytes()).build();
}
return null;
}

@Override
public void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception {
//no-op
}

private String getToken(AuthenticationDataSource authData) throws AuthenticationException {
if (authData.hasDataFromCommand()) {
// Authenticate Pulsar binary connection
return authData.getCommandData();
} else if (authData.hasDataFromHttp()) {
// Authentication HTTP request. The format here should be compliant to RFC-6750
// (https://tools.ietf.org/html/rfc6750#section-2.1). Eg: Authorization: Bearer xxxxxxxxxxxxx
String httpHeaderValue = authData.getHttpHeader(HTTP_HEADER_NAME);
if (httpHeaderValue == null || !httpHeaderValue.startsWith(HTTP_HEADER_VALUE_PREFIX)) {
throw new AuthenticationException("Invalid HTTP Authorization header");
}

// Remove prefix
String token = httpHeaderValue.substring(HTTP_HEADER_VALUE_PREFIX.length());
return validateToken(token);
} else {
return null;
}
}

private String validateToken(final String token) throws AuthenticationException {
if (StringUtils.isNotBlank(token)) {
return token;
} else {
throw new AuthenticationException("Blank token found");
}
}
}
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
/**
* A wrapper for authentication data for functions
*/
public class FunctionAuthData {
private byte[] data;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a string member here to hold which functions auth provider created the auth data. this can be used by the k8s runtime provider at least validate the auth data is of the type expected. I know this will be configured at a cluster level, but people screw up configuration all the time. Better to be defensive.

}
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;

import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;

/**
* This is a generic interface that functions can use to cache and distribute appropriate authentication
* data that is needed to configure the runtime of functions to support appropriate authentication of function instances
*/
public interface FunctionAuthProvider {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please add javadoc comments in the interface? otherwise other people will have difficulties on understanding how to implement an Auth provider.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a very high level question here - why do you need a separate AuthProvider? why can you use the AuthenticationProvider interface at the broker? what are the real differences between these two interfaces?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the AuthenticationProvider is interface to simply do authentication. The interface here for functions is a little bit more involved. It needs to have to ability to:

  1. We need to be able to cache some data in the function meta data topic. This could be the auth data itself or a pointer the auth data.
  2. Based on the auth data of the function, we need to be able to manipulate the function runtime somehow (depends on runtime)

This interface doesn't actually authenticate a user but facilitates the authentication process for functions. Whether the name of the interface is appropriate can be up for discussion

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. A better name or a javadoc for this class is good to have for this class.


/**
* Set authentication configs for function instance based on the data in FunctionAuthenticationSpec
* @param authConfig authentication configs passed to the function instance
* @param functionAuthData function authentication data that is provider specific
*/
void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

configureAuthenticationConfig sounds weird, why not just configureAuth?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure I can do that but the reason I named the method configureAuthenticationConfig is because we are literally configuring AuthenticationConfig based on FunctionAuthData


/**
* Cache auth data in as part of function metadata for function that runtime may need to configure authentication
* @param tenant tenant that the function is running under
* @param namespace namespace that is the function is running under
* @param name name of the function
* @param authenticationDataSource auth data
* @return
* @throws Exception
*/
FunctionAuthData cacheAuthData(String tenant, String namespace, String name, AuthenticationDataSource authenticationDataSource) throws Exception;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of caching auth data? If there are credentials being submitted, we should immediately store them in the secrets backend.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a mechanism to distribute auth credentials or pointers to auth credentials based on implementtion to workers that need to run an instance of the function. Since in the function architecture, submitting the function and running the function are decoupled and might not happen on the same machine we need a mechanism to distribute some information to workers about how to configure authentication for individual function instances


/**
* Clean up operation for auth when function is terminated
* @param tenant tenant that the function is running under
* @param namespace namespace that is the function is running under
* @param name name of the function
* @param functionAuthData function auth data
* @throws Exception
*/
void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception;
}
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;

import org.apache.pulsar.functions.proto.Function;

public final class FunctionAuthUtils {

public static final FunctionAuthData getFunctionAuthData(Function.FunctionAuthenticationSpec functionAuthenticationSpec) {
return FunctionAuthData.builder().data(functionAuthenticationSpec.getData().toByteArray()).build();
}
}
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;

import io.kubernetes.client.models.V1ServiceAccount;
import io.kubernetes.client.models.V1StatefulSet;

/**
* Kubernetes runtime specific functions authentication provider
*/
public interface KubernetesFunctionAuthProvider extends FunctionAuthProvider {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bad sign if for the first serious use of an abstraction you need to create another abstraction to work work around shortcomings in the first abstraction. However, I don't think it's even needed in this case.

The functionAuthData returned by cacheAuthData should contain the actual token. The caller of cacheAuthData can then create a secret, and then mount that secret on each of the instance pods. When the instance runs, it can always check for this secret and if it is mounted, the data can be read in and passed to configureAuthenticationConfig.

I don't see why service accounts are needed at all here.

Copy link
Contributor Author

@jerrypeng jerrypeng Mar 14, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bad sign if for the first serious use of an abstraction you need to create another abstraction to work work around shortcomings in the first abstraction. However, I don't think it's even needed in this case.

Why is that bad? I designed this so that it is cleaner compared to AuthenticationDataSource where you have a mixture of interfaces to support different authentication methods. I imagine in the future, different runtimes will have different requirements / interfaces needed to support authentication. It doesn't make sense to clutter them all together.

The functionAuthData returned by cacheAuthData should contain the actual token.

Why should functionAuthData contain the actual token? This is implementation specific data. Different implementations of FunctionAuthenticationProvider should have the flexibility to use it in a way that makes sense for the implementation.

The caller of cacheAuthData can then create a secret, and then mount that secret on each of the instance pods.

I think there is some misunderstanding here. The architecture of Pulsar Function decouples submitting functions and running functions. The worker that the user actually submits the function do is not necessarily going to be the same worker that is going to run the function. However, the auth data will only be passed to the worker that the user at first submits his or her function to. That is why that worker needs to be able to distribute that auth data or data based on that auth data to the rest of the workers that will potentially need to run a function instance. The interface cacheAuthData returns the data that needs to be distributed to the other workers via the function metadata topic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is some misunderstanding here.

Ya, I needed to map out the interactions. See my top level comment.


/**
* Configure service account spec generated for function based on function auth data
* @param serviceAccount service account spec for the function
* @param functionAuthData function auth data
*/
void configureAuthDataKubernetesServiceAccount(V1ServiceAccount serviceAccount, FunctionAuthData functionAuthData);

/**
* Configure function statefulset spec based on function auth data
* @param statefulSet statefulset spec for function
* @param functionAuthData function auth data
*/
void configureAuthDataStatefulSet(V1StatefulSet statefulSet, FunctionAuthData functionAuthData);
}