[FLINK-30425][runtime][security] Generalize token receive side #21604
Conversation
FYI, the manual test gist has been updated to use the operator. This way we're not bound to a local machine at all.
I've tested with the keytab and TGT cache scenarios and both worked. I've added expected exception throwing on the receiver side as well (just like it exists on the provider side) to simulate temporary failures.
Force-pushed de99236 to 051bf28
Found a couple of nits on a cursory look; will do manual testing and a deeper code review tomorrow.
-provider.serviceName(),
-e);
+provider.serviceName());
+LOG.debug(e.toString());
Add some explanation to this log message, since given the multi-threaded nature of Flink it might not directly follow the previous line.
This part has been changed significantly, so I think it's worth a second look.
@@ -120,8 +135,8 @@ private Map<String, DelegationTokenProvider> loadProviders() {
} catch (Exception | NoClassDefFoundError e) {
    LOG.warn(
@XComp This is the part I meant. No more stack traces are going to be printed at WARN level, though we still print out the failure for debugging purposes.
Well, this part has been reworked, but the main statement is the same: we're not writing out the stack trace unless we stop the workload.
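The "WARN without stack trace, full details only at debug level" pattern discussed above can be sketched as follows. This is a minimal, self-contained sketch using `java.util.logging` rather than SLF4J; the class and method names are hypothetical, not Flink's actual code.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper illustrating the logging pattern: a temporary failure
// is logged at WARN without a stack trace (since the obtain is retried),
// while the full exception only appears at a debug-ish level.
class ObtainFailureLogging {
    private static final Logger LOG = Logger.getLogger(ObtainFailureLogging.class.getName());

    static void logObtainFailure(String serviceName, Exception e) {
        // WARN: message only, no stack trace.
        LOG.warning("Failed to obtain delegation token for provider " + serviceName);
        // FINE (roughly DEBUG): full exception details for troubleshooting.
        LOG.log(Level.FINE, "Obtain failure details", e);
    }
}
```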
Force-pushed f95df6c to 328e2f6
/**
 * The general rule for how a provider/receiver must behave is the following: the provider and
 * the receiver must be added to the classpath together with all the additionally required
 * dependencies.
 *
 * <p>This null check is required because the HBase provider is always on the classpath but
 * the HBase jars are optional. In such a case the configuration cannot be loaded. This construct
 * is intended to be removed when the HBase provider/receiver pair can be externalized (namely,
 * if a provider/receiver throws an exception then the workload must be stopped).
 */
if (hbaseConf == null) {
    LOG.debug(
            "HBase is not available (not packaged with this application), hence no "
                    + "tokens will be acquired.");
    return false;
}
This is a temporary solution, as mentioned before.
/**
 * The general rule for how a provider/receiver must behave is the following: the provider and
 * the receiver must be added to the classpath together with all the additionally required
 * dependencies.
 *
 * <p>This null check is required because the Hadoop FS provider is always on the classpath but
 * the Hadoop FS jars are optional. In such a case the configuration cannot be loaded. This
 * construct is intended to be removed when the Hadoop FS provider/receiver pair can be
 * externalized (namely, if a provider/receiver throws an exception then the workload must be
 * stopped).
 */
if (hadoopConfiguration == null) {
    LOG.debug(
            "Hadoop FS is not available (not packaged with this application), hence no "
                    + "tokens will be acquired.");
    return false;
}
This is a temporary solution, as mentioned before.
// The intentional general rule is that if a provider's init method throws an exception
// then stop the workload
LOG.error(
        "Failed to initialize delegation token provider {}",
        provider.serviceName(),
        e);
throw new RuntimeException(e);
I think this will be the cleanest statement for later development. The actually existing HBase + HadoopFS providers are exceptional because they're not externalized. The problem with them is that the provider/receiver is always on the classpath but the HadoopFS/HBase dependency is optional.
- The temporary solution is to catch NoClassDefFoundError within the provider and ignore the provider in delegationTokensRequired if the conf is null.
- The long-term solution is to externalize both of them and expect the provider/receiver plus all dependencies together on the classpath.
try {
    delegationTokenReceivers.get(serviceName).onNewTokensObtained(tokens);
} catch (Exception e) {
    LOG.warn("Failed to send tokens to delegation token receiver {}", serviceName, e);
}
We handle receiver failures as temporary issues and keep going to increase resiliency. This way maybe not all of the tokens get stored, but we're still increasing the availability of the overall system.
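The resiliency behavior described above can be sketched like this. The types are hypothetical simplifications of the receiver-repository loop, not the actual Flink classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch: propagate new tokens to all receivers, treating a single
// receiver failure as temporary instead of failing the whole operation.
class ReceiverPropagationSketch {
    interface Receiver {
        void onNewTokensObtained(byte[] tokens) throws Exception;
    }

    static int onNewTokensObtained(Map<String, Receiver> receivers, byte[] tokens) {
        int delivered = 0;
        for (Map.Entry<String, Receiver> entry : receivers.entrySet()) {
            try {
                entry.getValue().onNewTokensObtained(tokens);
                delivered++;
            } catch (Exception e) {
                // Temporary issue: warn and continue so the remaining
                // receivers still get their tokens.
                System.err.println(
                        "Failed to send tokens to delegation token receiver "
                                + entry.getKey() + ": " + e);
            }
        }
        return delivered;
    }
}
```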
-@AfterAll
-public static void afterAll() {
+@AfterEach
+public void afterEach() {
This is just a minor improvement which is not related. We need to do the reset after each test, not after the class.
Force-pushed 5b8cbcb to 1e225ce
I didn't do a proper check since @mbalassi is already helping out and he's probably the better person to judge. I have two comments from flying over some parts of the code.
@@ -58,6 +59,13 @@
@Internal
public class DefaultDelegationTokenManager implements DelegationTokenManager {

    private static final String PROVIDER_RECEIVER_INCONSISTENCY_ERROR =
            " There is an inconsistency between loaded delegation token providers and receivers. "
-" There is an inconsistency between loaded delegation token providers and receivers. "
+"There is an inconsistency between loaded delegation token providers and receivers. "
Fixed.
"Failed to initialize delegation token provider {}",
provider.serviceName(),
e);
throw new RuntimeException(e);
Shouldn't we do either logging or throwing the exception? And if we decide to throw the exception, adding the log message as the error message might be useful. 🤔 FlinkRuntimeException might be useful since it indicates that the exception was actually initiated by Flink code. ...just as an idea.
Makes sense to use FlinkRuntimeException, changed in both places.
Waiting with the next push to see a full run and whether there are test failures...
Force-pushed d98b162 to 4fce8e2
Do we have test cases for the specific providers/receivers, like HBase/HadoopFS?
There is an end-to-end test for HDFS, but for HBase we don't have anything. Honestly, it's horribly complex to set up everything within a single process (KDC, HDFS, HBase, Flink app). It's definitely doable, but I personally think it must be included within the connector when externalized.
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-/** Delegation token updater functionality. */
+/** Hadoop delegation token receiver base class. */
@Internal
Could I use this class for the HMSDelegationTokenReceiver implementation?
You could, but since it's @Internal it's subject to change.
Force-pushed 4fce8e2 to d74db13
Force-pushed d74db13 to 3f19253
Tested the changes locally on minikube following @gaborgsomogyi's instructions. Looks good, will wait for the CI to complete after the recent rebase to master.
Thanks, it seems like nowadays we have fewer and fewer flaky tests, and it passed 🙂
What is the purpose of the change
The Flink delegation token obtain side is generic already, but on the receiver side there is still an assumption that only Hadoop tokens can come. In this PR I've introduced the DelegationTokenReceiver interface, which can be implemented by anyone.
Brief change log
- Added the DelegationTokenReceiver interface
- Added HadoopFSDelegationTokenReceiver for Hadoop FS
- Added HBaseDelegationTokenReceiver for HBase
- Added DelegationTokenReceiverRepository, which loads all receivers and propagates new tokens to them
Verifying this change
Existing/new unit/integration tests + manually on minikube.
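The receiver abstraction introduced in this PR can be sketched roughly like this. The signatures below are simplified illustrations, not the actual Flink API; consult the real DelegationTokenReceiver and DelegationTokenReceiverRepository sources for the exact contracts.

```java
// Simplified sketch of the new receiver abstraction: one receiver per
// external service; the repository (not shown here) loads all
// implementations and calls onNewTokensObtained when fresh tokens arrive.
class ReceiverSketch {
    interface DelegationTokenReceiver {
        /** Name of the service this receiver handles, e.g. "hadoopfs" or "hbase". */
        String serviceName();

        /** Called by the repository with the newly obtained serialized tokens. */
        void onNewTokensObtained(byte[] tokens) throws Exception;
    }

    // Hypothetical trivial implementation that just stores the last tokens.
    static class InMemoryReceiver implements DelegationTokenReceiver {
        byte[] lastTokens;

        public String serviceName() {
            return "in-memory";
        }

        public void onNewTokensObtained(byte[] tokens) {
            lastTokens = tokens;
        }
    }
}
```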
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): no
Documentation