New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implementing authentication for Pulsar Functions #3735
Changes from 7 commits
594bcc8
1fabe0b
313bcba
027e9f7
3c7e5a2
3ebaf54
804d178
903aa81
6a1a194
61414aa
814e50d
17f14c1
59b437c
9e71238
a163eaa
7df641b
d9f42fc
5a3a621
9b390c5
91aff80
510a23f
51f33ff
29885af
3a4e66f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -141,6 +141,11 @@ message FunctionMetaData { | |
uint64 version = 3; | ||
uint64 createTime = 4; | ||
map<int32, FunctionState> instanceStates = 5; | ||
FunctionAuthenticationSpec functionAuthSpec = 6; | ||
} | ||
|
||
message FunctionAuthenticationSpec { | ||
string data = 1; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am wondering why are you using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't know what kind of params/data would need to be stored so I just based this off of the authParams String format we already have in Pulsar. We can change this to bytes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also you can always base64 encode a binary to be stored as a string There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jerrypeng bytes is a more natural representation :-) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sijie i have changed |
||
} | ||
|
||
message Instance { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.functions.auth; | ||
|
||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.pulsar.broker.authentication.AuthenticationDataSource; | ||
import org.apache.pulsar.client.impl.auth.AuthenticationToken; | ||
import org.apache.pulsar.functions.instance.AuthenticationConfig; | ||
import org.apache.pulsar.functions.proto.Function; | ||
|
||
import javax.naming.AuthenticationException; | ||
|
||
import static org.apache.pulsar.broker.authentication.AuthenticationProviderToken.HTTP_HEADER_VALUE_PREFIX; | ||
import static org.apache.pulsar.client.impl.auth.AuthenticationDataToken.HTTP_HEADER_NAME; | ||
|
||
public class ClearTextFunctionTokenAuthProvider implements FunctionAuthProvider { | ||
@Override | ||
public void configureAuthenticationConfig(AuthenticationConfig authConfig, Function.FunctionAuthenticationSpec functionAuthenticationSpec) { | ||
authConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName()); | ||
authConfig.setClientAuthenticationParameters("token://" + functionAuthenticationSpec.getData()); | ||
} | ||
|
||
@Override | ||
public Function.FunctionAuthenticationSpec cacheAuthData(String tenant, String namespace, String name, AuthenticationDataSource authenticationDataSource) throws Exception { | ||
String token = null; | ||
try { | ||
token = getToken(authenticationDataSource); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
||
if (token != null) { | ||
return Function.FunctionAuthenticationSpec.newBuilder().setData(token).build(); | ||
} | ||
return null; | ||
} | ||
|
||
@Override | ||
public void cleanUpAuthData(String tenant, String namespace, String name, Function.FunctionAuthenticationSpec | ||
functionAuthenticationSpec) throws Exception { | ||
//no-op | ||
} | ||
|
||
private String getToken(AuthenticationDataSource authData) throws AuthenticationException { | ||
if (authData.hasDataFromCommand()) { | ||
// Authenticate Pulsar binary connection | ||
return authData.getCommandData(); | ||
} else if (authData.hasDataFromHttp()) { | ||
// Authentication HTTP request. The format here should be compliant to RFC-6750 | ||
// (https://tools.ietf.org/html/rfc6750#section-2.1). Eg: Authorization: Bearer xxxxxxxxxxxxx | ||
String httpHeaderValue = authData.getHttpHeader(HTTP_HEADER_NAME); | ||
if (httpHeaderValue == null || !httpHeaderValue.startsWith(HTTP_HEADER_VALUE_PREFIX)) { | ||
throw new AuthenticationException("Invalid HTTP Authorization header"); | ||
} | ||
|
||
// Remove prefix | ||
String token = httpHeaderValue.substring(HTTP_HEADER_VALUE_PREFIX.length()); | ||
return validateToken(token); | ||
} else { | ||
return null; | ||
} | ||
} | ||
|
||
private String validateToken(final String token) throws AuthenticationException { | ||
if (StringUtils.isNotBlank(token)) { | ||
return token; | ||
} else { | ||
throw new AuthenticationException("Blank token found"); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.functions.auth; | ||
|
||
import org.apache.pulsar.broker.authentication.AuthenticationDataSource; | ||
import org.apache.pulsar.functions.instance.AuthenticationConfig; | ||
import org.apache.pulsar.functions.proto.Function; | ||
|
||
public interface FunctionAuthProvider { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you please add javadoc comments in the interface? otherwise other people will have difficulties on understanding how to implement an Auth provider. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. a very high level question here - why do you need a separate AuthProvider? why can you use the AuthenticationProvider interface at the broker? what are the real differences between these two interfaces? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So the AuthenticationProvider is interface to simply do authentication. The interface here for functions is a little bit more involved. It needs to have to ability to:
This interface doesn't actually authenticate a user but facilitates the authentication process for functions. Whether the name of the interface is appropriate can be up for discussion There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok. A better name or a javadoc for this class is good to have for this class. |
||
|
||
void configureAuthenticationConfig(AuthenticationConfig authConfig, Function.FunctionAuthenticationSpec functionAuthenticationSpec); | ||
|
||
Function.FunctionAuthenticationSpec cacheAuthData(String tenant, String namespace, String name, AuthenticationDataSource authenticationDataSource) throws Exception; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we avoid exposing protobuf structures in the interface? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup we can have a wrapper for this |
||
|
||
void cleanUpAuthData(String tenant, String namespace, String name, Function.FunctionAuthenticationSpec functionAuthenticationSpec) throws Exception; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.functions.auth; | ||
|
||
import io.kubernetes.client.models.V1ServiceAccount; | ||
import io.kubernetes.client.models.V1StatefulSet; | ||
import org.apache.pulsar.functions.proto.Function; | ||
|
||
public interface KubernetesFunctionAuthProvider extends FunctionAuthProvider { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a bad sign if for the first serious use of an abstraction you need to create another abstraction to work work around shortcomings in the first abstraction. However, I don't think it's even needed in this case. The functionAuthData returned by cacheAuthData should contain the actual token. The caller of cacheAuthData can then create a secret, and then mount that secret on each of the instance pods. When the instance runs, it can always check for this secret and if it is mounted, the data can be read in and passed to configureAuthenticationConfig. I don't see why service accounts are needed at all here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Why is that bad? I designed this so that it is cleaner compared to AuthenticationDataSource where you have a mixture of interfaces to support different authentication methods. I imagine in the future, different runtimes will have different requirements / interfaces needed to support authentication. It doesn't make sense to clutter them all together.
Why should functionAuthData contain the actual token? This is implementation specific data. Different implementations of FunctionAuthenticationProvider should have the flexibility to use it in a way that makes sense for the implementation.
I think there is some misunderstanding here. The architecture of Pulsar Function decouples submitting functions and running functions. The worker that the user actually submits the function do is not necessarily going to be the same worker that is going to run the function. However, the auth data will only be passed to the worker that the user at first submits his or her function to. That is why that worker needs to be able to distribute that auth data or data based on that auth data to the rest of the workers that will potentially need to run a function instance. The interface cacheAuthData returns the data that needs to be distributed to the other workers via the function metadata topic. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Ya, I needed to map out the interactions. See my top level comment. |
||
|
||
void configureAuthDataKubernetesServiceAccount(Function.FunctionAuthenticationSpec functionAuthenticationSpec, V1ServiceAccount sa); | ||
|
||
void configureAuthDataStatefulSet(Function.FunctionAuthenticationSpec functionAuthenticationSpec, V1StatefulSet statefulSet); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than making this public, you should make getToken(AuthenticationDataSource) static and call it from the other locations.