[FLINK-31109][yarn] Support Hadoop proxy user when delegation token f… #22009

Merged
merged 3 commits into from
Feb 26, 2023

Conversation

venkata91
Contributor

…etch is disabled

What is the purpose of the change

FLINK-28330 removed the old delegation token framework code and, as part of that, removed the existing support for delegation tokens that are managed outside of Flink.

Brief change log

  • As part of YarnClusterDescriptor#setTokensFor, load the delegation tokens available on the client machine, and then load the tokens obtained through DelegationTokenManager if security.delegation.tokens.enabled is set to true.
  • HadoopModule should throw an exception if UserGroupInformation.currentUser is a Hadoop proxy user and security.delegation.tokens.enabled is also set to true.
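
The two changes above can be illustrated with a simplified, stdlib-only model. This is only a sketch and not the code in this PR: `TokenSetupSketch`, its method names, the exception message, and the use of plain maps in place of Hadoop credentials are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model of the behaviour described in the change log (hypothetical names). */
final class TokenSetupSketch {

    /**
     * Mirrors the described order: start from the tokens already present on the
     * client, then add manager-obtained tokens only if token fetch is enabled.
     */
    static Map<String, byte[]> collectTokens(
            Map<String, byte[]> clientTokens,
            Map<String, byte[]> managerTokens,
            boolean delegationTokensEnabled) {
        Map<String, byte[]> result = new LinkedHashMap<>(clientTokens);
        if (delegationTokensEnabled) {
            // Assumption: manager-obtained tokens overwrite client tokens with the same key.
            result.putAll(managerTokens);
        }
        return result;
    }

    /** Mirrors the described guard: a proxy user combined with enabled token fetch is rejected. */
    static void checkProxyUser(boolean isProxyUser, boolean delegationTokensEnabled) {
        if (isProxyUser && delegationTokensEnabled) {
            throw new IllegalStateException(
                    "Proxy user is not supported while delegation token fetch is enabled");
        }
    }
}
```

With token fetch disabled, only the externally managed client tokens are forwarded, which is the case this PR restores.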

Verifying this change

  • Added tests in the HadoopModuleTest and in KerberosLoginProviderITCase.
  • Manually tested in our env where delegation token fetch is managed.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@venkata91
Contributor Author

Please review. cc @MartijnVisser @gaborgsomogyi @becketqin

@flinkbot
Collaborator

flinkbot commented Feb 24, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@venkata91
Contributor Author

Looking into the test failure; somehow it worked locally but is failing in the pipeline.

Contributor

@gaborgsomogyi gaborgsomogyi left a comment


Thanks for fixing it! I found some minor issues, but once they are fixed it's mergeable.

@gaborgsomogyi
Contributor

The e2e tests are still running, but most probably they are going to pass.

@MartijnVisser
Contributor

@gaborgsomogyi So all that's left are the minor review comments, and then it's ready to merge from your perspective?

@gaborgsomogyi
Contributor

@MartijnVisser once the mentioned suggestions are fixed, it can go. I'm planning to track the changes and approve it when I think it's flawless.

@gaborgsomogyi
Contributor

@MartijnVisser everything is green, so it's good to go from my side.

@MartijnVisser MartijnVisser merged commit 7813613 into apache:master Feb 26, 2023
@MartijnVisser
Contributor

@venkata91 Can you also create a back port towards release-1.17?

MartijnVisser pushed a commit to MartijnVisser/flink that referenced this pull request Feb 27, 2023
…etch is disabled. This closes apache#22009

* [FLINK-31109][yarn] Support Hadoop proxy user when delegation token fetch is disabled

(cherry picked from commit 7813613)
MartijnVisser pushed a commit that referenced this pull request Feb 27, 2023
…etch is disabled. This closes #22009

* [FLINK-31109][yarn] Support Hadoop proxy user when delegation token fetch is disabled

(cherry picked from commit 7813613)
@venkata91
Contributor Author

@venkata91 Can you also create a back port towards release-1.17?

@MartijnVisser Looks like you already backported it. I was about to get to it. Thanks for backporting it to release-1.17 as well.

@wForget
Member

wForget commented May 21, 2024

I've tried specifying proxyUser using HADOOP_PROXY_USER and maintaining the delegation tokens with an external service and sending them into jobmanager, but I can't seem to update delegation tokens of taskmanager.

Flink seems to use DefaultDelegationTokenManager#listener to update the delegation tokens of taskmanager. Can we provide a custom DelegationTokenManager?

@venkata91 @gaborgsomogyi Could you please take a look?

@gaborgsomogyi
Contributor

gaborgsomogyi commented May 21, 2024

I've tried specifying proxyUser using HADOOP_PROXY_USER and maintaining the delegation tokens with an external service and sending them into jobmanager, but I can't seem to update delegation tokens of taskmanager.

Not sure what you mean here. If you update the tokens externally, then it's the external code's responsibility to do its job properly, right?

Flink seems to use DefaultDelegationTokenManager#listener to update the delegation tokens of taskmanager. Can we provide a custom DelegationTokenManager?

In your own distribution you can do anything you can imagine. There is no configuration infra for this so you need to change the source (namely DefaultDelegationTokenManagerFactory).

In general, what is the use case you would like to achieve here? If I understand correctly, the goal is to have 2 users in a single cluster (the proxy user + the other one who is creating tokens)?

@wForget
Member

wForget commented May 21, 2024

@gaborgsomogyi Thank you for your reply.

I want to implement impersonation mode for the Flink engine in Apache Kyuubi. Flink's proxy user mode relies on external services to maintain delegation tokens, and the Kyuubi server can do this well.

The Kyuubi server uses a superuser that proxies as the sessionUser to obtain delegation tokens, and passes them to the Flink jobmanager when starting the Flink engine. It regularly updates the delegation tokens and sends them to the jobmanager; at the same time I want to update the taskmanager tokens (but I can't find a suitable way).

Related issues:
apache/kyuubi#6367
apache/kyuubi#6368

@wForget
Member

wForget commented May 21, 2024

In your own distribution you can do anything you can imagine. There is no configuration infra for this so you need to change the source (namely DefaultDelegationTokenManagerFactory).

Is this improvement reasonable? Can we introduce it upstream?

@gaborgsomogyi
Contributor

Just to give some historical background: Flink (just like Spark) can't really guarantee that an externally given proxy user TGT will be renewed. This may end up in stopped streaming workloads, which would be bad UX for users. In conclusion, we decided to say: please manage it externally. If some internal Flink constructs can be reused to do that, then that's fine, but it was not an explicit aim.

Is this improvement reasonable? Can we introduce it upstream?

Adding a config for which manager implementation should be used is a no-brainer, but the default manager has some requirements in its constructor:

public DefaultDelegationTokenManager(
        Configuration configuration,
        @Nullable PluginManager pluginManager,
        @Nullable ScheduledExecutor scheduledExecutor,
        @Nullable ExecutorService ioExecutor) {

If we said that the manager can be changed, this constructor parameter set would have to become a user-facing API, which I wouldn't do, because these parameters are subject to change.

In conclusion, I can imagine the following:

  • update the UserGroupInformation directly (maybe with side car) and turn off Flink delegation token manager
  • create a custom Flink distro where custom manager is included (I think this can't be really an option for you)

Scalable multi-user handling for Flink and Spark session clusters is something that can be enhanced, but that work is pending.

@wForget
Member

wForget commented May 21, 2024

update the UserGroupInformation directly (maybe with side car) and turn off Flink delegation token manager

Yes, I want to update UserGroupInformation directly, but this update is difficult to pass on to the taskmanagers once the delegation token manager is turned off. If there are any exposed interfaces, that would be great.

public void onNewTokensObtained(byte[] tokens) throws Exception {
    latestTokens.set(tokens);
    log.info("Updating delegation tokens for {} task manager(s).", taskExecutors.size());
    if (!taskExecutors.isEmpty()) {
        final List<CompletableFuture<Acknowledge>> futures =
                new ArrayList<>(taskExecutors.size());
        for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry :
                taskExecutors.entrySet()) {
            WorkerRegistration<WorkerType> registration = workerRegistrationEntry.getValue();
            log.info("Updating delegation tokens for node {}.", registration.getNodeId());
            final TaskExecutorGateway taskExecutorGateway =
                    registration.getTaskExecutorGateway();
            futures.add(taskExecutorGateway.updateDelegationTokens(getFencingToken(), tokens));
        }
        FutureUtils.combineAll(futures).get();
    }
}
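
The fan-out pattern in the quoted method (send the same token bytes to every registered task executor, then wait for all acknowledgements) can be sketched with the JDK alone. This is an illustrative model, not Flink code: `TokenBroadcastSketch`, `broadcast`, and the `send` callback are hypothetical stand-ins for the gateway call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

/** Stdlib-only model of broadcasting token bytes to a set of workers (hypothetical names). */
final class TokenBroadcastSketch {

    /**
     * Sends the same token bytes to every worker through the given callback and
     * blocks until every returned future has completed. Returns the number of sends.
     */
    static <W> int broadcast(
            List<W> workers,
            byte[] tokens,
            BiFunction<W, byte[], CompletableFuture<Void>> send) {
        List<CompletableFuture<Void>> futures = new ArrayList<>(workers.size());
        for (W worker : workers) {
            futures.add(send.apply(worker, tokens));
        }
        // join() rethrows (wrapped) if any individual send failed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.size();
    }
}
```

With the delegation token manager turned off, nothing triggers this kind of broadcast, which is why an externally updated token does not reach the taskmanagers on its own.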

@gaborgsomogyi
Contributor

The framework is so flexible that one can do anything with it 🙂
One thing which just came into my mind:

  • Remove HadoopModuleFactory from security.module.factory.classes
  • Turn off all existing delegation token providers security.delegation.token.provider.X.enabled=false
  • Create your own DelegationTokenProvider + DelegationTokenReceiver implementation(s)
  • Implement any custom logic what you can imagine there
  • Add them to flink/libs as external jar
  • Enjoy
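
A rough sketch of how a paired provider and receiver could fit together is given here. The two interfaces are simplified stand-ins declared locally so the example compiles on its own; they are not Flink's actual DelegationTokenProvider and DelegationTokenReceiver interfaces, whose exact signatures should be checked against the Flink version in use. `ExternalTokenProvider`, `InMemoryTokenReceiver`, and the service name `external` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Stand-in for a token provider: obtains serialized tokens for one named service. */
interface SketchTokenProvider {
    String serviceName();

    byte[] obtainTokens() throws Exception;
}

/** Stand-in for a token receiver: applies serialized tokens on the receiving side. */
interface SketchTokenReceiver {
    String serviceName();

    void onNewTokensObtained(byte[] tokens) throws Exception;
}

/** Provider that delegates to an external source, such as a service that manages tokens. */
final class ExternalTokenProvider implements SketchTokenProvider {
    private final Supplier<byte[]> externalSource;

    ExternalTokenProvider(Supplier<byte[]> externalSource) {
        this.externalSource = externalSource;
    }

    @Override
    public String serviceName() {
        return "external";
    }

    @Override
    public byte[] obtainTokens() {
        return externalSource.get();
    }
}

/** Receiver that only stores the latest tokens; a real one would update the user's credentials. */
final class InMemoryTokenReceiver implements SketchTokenReceiver {
    private final AtomicReference<byte[]> latest = new AtomicReference<>();

    @Override
    public String serviceName() {
        // Must match the provider's service name so the two can be paired.
        return "external";
    }

    @Override
    public void onNewTokensObtained(byte[] tokens) {
        latest.set(tokens.clone());
    }

    byte[] latestTokens() {
        return latest.get();
    }
}
```

The point of the pairing is that the provider side decides where tokens come from, while the receiver side decides what to do with them on each process that gets them.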

@wForget
Member

wForget commented May 21, 2024

The framework is so flexible that one can do anything with it 🙂 One thing which just came into my mind:

  • Remove HadoopModuleFactory from security.module.factory.classes
  • Turn off all existing delegation token providers security.delegation.token.provider.X.enabled=false
  • Create your own DelegationTokenProvider + DelegationTokenReceiver implementation(s)
  • Implement any custom logic what you can imagine there
  • Add them to flink/libs as external jar
  • Enjoy

Sounds great, thanks, I will try this way.

@wForget
Member

wForget commented May 27, 2024

The framework is so flexible that one can do anything with it 🙂 One thing which just came into my mind:

  • Remove HadoopModuleFactory from security.module.factory.classes
  • Turn off all existing delegation token providers security.delegation.token.provider.X.enabled=false
  • Create your own DelegationTokenProvider + DelegationTokenReceiver implementation(s)
  • Implement any custom logic what you can imagine there
  • Add them to flink/libs as external jar
  • Enjoy

Thanks @gaborgsomogyi, this is an effective way; I've tried it in Kyuubi and it mostly works well. Just one point: since we disabled security.delegation.token.provider.hadoopfs.enabled, the HDFS token will not be fetched and sent to the YARN AM container.

for (Map.Entry<String, byte[]> e : container.getTokens().entrySet()) {
    if (e.getKey().equals("hadoopfs")) {
        credentials.addAll(HadoopDelegationTokenConverter.deserialize(e.getValue()));
    }
}

Shall we use DelegationTokenReceiverRepository here to receive tokens obtained by DelegationTokenManager?
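
To make the concern concrete, here is a small stdlib-only sketch contrasting the hard-coded key check with a lookup against the set of services that have a receiver. It does not model how DelegationTokenReceiverRepository actually works; `ContainerTokenFilterSketch`, both method names, and the service name `external` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Contrasts two ways of choosing which token entries get forwarded (hypothetical names). */
final class ContainerTokenFilterSketch {

    /** Mirrors the quoted loop: only the entry keyed "hadoopfs" is forwarded. */
    static List<String> forwardedWithHardCodedKey(Map<String, byte[]> tokens) {
        List<String> forwarded = new ArrayList<>();
        for (Map.Entry<String, byte[]> e : tokens.entrySet()) {
            if (e.getKey().equals("hadoopfs")) {
                forwarded.add(e.getKey());
            }
        }
        return forwarded;
    }

    /** Alternative: forward every entry whose service name has a registered receiver. */
    static List<String> forwardedWithReceivers(
            Map<String, byte[]> tokens, Set<String> receiverServices) {
        List<String> forwarded = new ArrayList<>();
        for (Map.Entry<String, byte[]> e : tokens.entrySet()) {
            if (receiverServices.contains(e.getKey())) {
                forwarded.add(e.getKey());
            }
        }
        return forwarded;
    }
}
```

With only a custom provider enabled, the first method forwards nothing, while the second forwards the custom provider's tokens as long as a matching receiver is registered.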

5 participants