Skip to content

Conversation

gaborgsomogyi
Copy link
Contributor

@gaborgsomogyi gaborgsomogyi commented Feb 8, 2022

What is the purpose of the change

It adds a pluggable delegation token framework to Flink. From high level perspective delegation token framework is loaded in all deployment modes when:

  • security.kerberos.fetch.delegation-token is true
  • hadoop-common dependency is on classpath

Please note that this PR is not adding the whole feature, there are several subtasks in FLINK-21232 which needs to be solved as well.

Brief change log

  • Added generic DelegationTokenManager which is loaded in ClusterEntrypoint
  • Added DelegationTokenProvider API
  • DelegationTokenProvider implementations are loaded by DelegationTokenManager which is covered in unit tests
  • Added delegation token serialization/deserialization functionality with unit tests
  • Added the new framework usage to YarnClusterDescriptor

Verifying this change

  • Existing + new unit tests
  • Manually
    • security.kerberos.fetch.delegation-token=true + hadoop-common dependency is on classpath
    • security.kerberos.fetch.delegation-token=false + hadoop-common dependency is on classpath
    • security.kerberos.fetch.delegation-token=true + hadoop-common dependency is NOT on classpath
    • security.kerberos.fetch.delegation-token=false + hadoop-common dependency is NOT on classpath

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? All documentation is intended to be added in FLINK-25911 when everything works as a whole

@flinkbot
Copy link
Collaborator

flinkbot commented Feb 8, 2022

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 20cf691 (Tue Feb 08 13:02:17 UTC 2022)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Feb 8, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@gaborgsomogyi gaborgsomogyi force-pushed the FLINK-25907 branch 5 times, most recently from ade4165 to 323538b Compare February 9, 2022 08:12
@gaborgsomogyi gaborgsomogyi changed the title [FLINK-25907][SECURITY] Add pluggable delegation token manager [FLINK-25907][runtime][security] Add pluggable delegation token manager Feb 9, 2022
@gaborgsomogyi
Copy link
Contributor Author

cc @dmvk @gyfora @mbalassi @zentol

Copy link
Member

@dmvk dmvk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @gaborgsomogyi, I think this goes in a right direction. My main concern is about how / where the DTM is instantiated. Also it would be good to hide the implementation behind an interface for easier testing.


private static final Logger LOG = LoggerFactory.getLogger(HadoopDependency.class);

public static boolean isHadoopCommonOnClasspath(ClassLoader classLoader) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: javadoc

I really like this, that class not found exception was confusing users for ages :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add javadoc to crystal clear functions but I think this is just waste of java compiler time.
If we have an agreement to add such then I would add it of course...

new ResourceManagerException("Error while shutting down resource manager", e);
}

delegationTokenManager.ifPresent(DelegationTokenManager::stop);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to handle exceptions here as with the other services?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding it but it would be good to understand that do heartbeat services need to do the same or they're somehow different?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My gut feeling would be that they should follow the same pattern.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I've added the suggestion and we can discuss whether we want to add the mentioned change in a separate PR for the old code.

Comment on lines 1266 to 1276
DelegationTokenManager delegationTokenManager =
new DelegationTokenManager(flinkConfiguration);
delegationTokenManager.obtainDelegationTokens(credentials);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This makes me think whether DTM should be Closeable instead of having stop() method. I know that start / stop methods are intended for the renewal, but it feels like the obtainDelegationTokens will also need to create a resource for pulling the token out of KDC that needs be closed 🤔

Suggested change
DelegationTokenManager delegationTokenManager =
new DelegationTokenManager(flinkConfiguration);
delegationTokenManager.obtainDelegationTokens(credentials);
try (final DelegationTokenManager delegationTokenManager =
new DelegationTokenManager(flinkConfiguration) {
delegationTokenManager.obtainDelegationTokens(credentials);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

obtainDelegationTokens obtains tokens but these are mustn't be invalidated if you mean that.
start/stop just starts and stops automatic re-obtain and propagation. Not yet see why stopping this needs to be called close. In this case close would be no-op because manual obtain is used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant, that it probably needs to open some kind of kerberos client underneath that needs to be closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, UGI is used which doesn't need to be closed.

@gaborgsomogyi
Copy link
Contributor Author

Oh gosh, resolving the conflict...

@gaborgsomogyi
Copy link
Contributor Author

@flinkbot run azure

@gaborgsomogyi
Copy link
Contributor Author

@dmvk Thanks for the quick review. Hope this round of azure is going to pass.
I've solved all the suggestion or left a comment where I would like to discuss things further.
If you can have another round I would appreciate...

@dmvk
Copy link
Member

dmvk commented Feb 11, 2022

@gaborgsomogyi Thanks for updating the PR. Sure will do, but right now the team is really busy with the feature freeze, so I can get back to this after that.

In general I think it will be better to start merging DTM related PRs into 1.16, so we don't release something that's not yet complete that might confuse users.

@gaborgsomogyi
Copy link
Contributor Author

right now the team is really busy with the feature freeze

Sure, I know OSP is not lightning fast at some points, just wanted to say I'm not touching the code until further discussion.

In general I think it will be better to start merging DTM related PRs into 1.16

I've planned this feature to be done there, so same side.

@gaborgsomogyi
Copy link
Contributor Author

@flinkbot run azure

@gaborgsomogyi
Copy link
Contributor Author

@dmvk since there was a branch cut may I ask to have another round please?

@dmvk
Copy link
Member

dmvk commented Mar 16, 2022

Hi @gaborgsomogyi, I'm on vacation until 22nd; maybe @zentol or @XComp could take a look instead?

@gaborgsomogyi
Copy link
Contributor Author

Have fun then :)

@gaborgsomogyi
Copy link
Contributor Author

@dmvk hope you've had fun, recharged and I would like to ask then to have another round please.

Copy link
Member

@dmvk dmvk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall, thanks for the contribution Gabor 👍 I've added few cosmetic comments, PTAL. Could you please fix the commit history, so it's ready for merge (I guess this could be done in a single commit)?


@Override
public void obtainDelegationTokens(Credentials credentials) {
LOG.debug("obtainDelegationTokens");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to keep this debug messages? Personally I'd be in favor of removing them as this could cause a confusion when user enables the debug logging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously I was trying to debug such a framework w/o such messages and it was complete nightmare. Nobody knew what started/happened when. Maybe these can be more descriptive but I'm pretty sure that if the system is not doing what we want then something is needed.

*/
public class ExceptionThrowingDelegationTokenProvider implements DelegationTokenProvider {

public static volatile boolean enabled = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a big fan of this approach as it's fairly fragile and the tests that are using the class can not be parallelized. 🤔

It's probably ok-ish as long as this is used only by a single test class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fully agree that it's ugly but I haven't had better idea which is not an overkill. If this would be used from multiple places then something different is needed for sure.

@gaborgsomogyi
Copy link
Contributor Author

gaborgsomogyi commented Apr 1, 2022

Could you please fix the commit history, so it's ready for merge (I guess this could be done in a single commit)?

Do you mean squash all commits in the branch into a single one?

Seems like there are things to learn here for me. When I've squashed commits on github then the comments are blown up all the time. I'm basically fine with that just wanted to ask before I do something.

@gaborgsomogyi gaborgsomogyi force-pushed the FLINK-25907 branch 2 times, most recently from eb0db2a to 3f85e9c Compare April 1, 2022 08:57
@gaborgsomogyi
Copy link
Contributor Author

I've squashed everything into a single commit.

@gaborgsomogyi
Copy link
Contributor Author

Unrelated issue:
[ERROR] Failed to execute goal on project flink-connector-hbase-1.4: Could not resolve dependencies for project org.apache.flink:flink-connector-hbase-1.4:jar:1.16-SNAPSHOT: Failed to collect dependencies at org.apache.hadoop:hadoop-minicluster:jar:2.8.5 -> org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.8.5: Failed to read artifact descriptor for org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.8.5: Could not transfer artifact org.apache.hadoop:hadoop-mapreduce-client-app:pom:2.8.5 from/to google-maven-central (https://maven-central-eu.storage-download.googleapis.com/maven2/): Connect to maven-central-eu.storage-download.googleapis.com:443 [maven-central-eu.storage-download.googleapis.com/172.217.168.208] failed: Connection refused (Connection refused) -> [Help 1]

@gaborgsomogyi
Copy link
Contributor Author

Finally passed.

@dmvk dmvk merged commit 26aa543 into apache:master Apr 4, 2022
@gaborgsomogyi
Copy link
Contributor Author

@dmvk thanks for the review and taking care! New PRs are on the way...

@gaborgsomogyi gaborgsomogyi deleted the FLINK-25907 branch September 13, 2023 08:38
@dpengpeng
Copy link

@gaborgsomogyi Hello, I am currently using Flink version 1.15.2 and have encountered an issue with the HDFS delegation token expiring after 7 days in a Kerberos scenario.
I have seen your new delegation token framework (https://issues.apache.org/jira/browse/FLINK-21232) and have merged the code commits from 1 to 12 (Sub-Tasks 1-12) in the link into my Flink version 1.15.2.
Now, it is possible to refresh the delegation token periodically. However, after 7 days, if the JobManager or TaskManager needs to be restarted due to an exception [example:token (HDFS_DELEGATION_TOKEN token 31615466 for xx) can't be found in cache)],
I found that the Yarn container used to start JM/TM still uses the HDFS_DELEGATION_KIND that was generated the first time the job was submitted.
That is, the token in the container environment variable HADOOP_TOKEN_FILE_LOCATION file content has not been updated.
I would like to ask if your new framework supports refreshing the token corresponding to the HADOOP_TOKEN_FILE_LOCATION file.
Please let me know where the code logic is, and I would be very grateful.

@gaborgsomogyi
Copy link
Contributor Author

The token framework stores everything in-memory and it's not compatible with HADOOP_TOKEN_FILE_LOCATION. Such case the new container will load the file content with old tokens (YARN can keep them valid for 7 days). When the TM container then registers itself at the JM then it receives a new initial token set. When there is an HDFS access between these 2 steps then it will blow up.

My suggestion is to either use Flink delegation token framework or use external token management which is for example HADOOP_TOKEN_FILE_LOCATION.

I encourage you to either use the mailing list or slack.

@dpengpeng
Copy link

The token framework stores everything in-memory and it's not compatible with HADOOP_TOKEN_FILE_LOCATION. Such case the new container will load the file content with old tokens (YARN can keep them valid for 7 days). When the TM container then registers itself at the JM then it receives a new initial token set. When there is an HDFS access between these 2 steps then it will blow up.

My suggestion is to either use Flink delegation token framework or use external token management which is for example HADOOP_TOKEN_FILE_LOCATION.

I encourage you to either use the mailing list or slack.

@gaborgsomogyi Hello,Thank you for your response.
I have subscribed and sent an email to user@flink.apache.org, but there has been no response for several days.
I would like to consult whether your new delegation framework can solve scenarios where YARN containers fail to start when the HDFS delegation token expiring after 7 days.
The phenomenon indicates that the delegation token is used when starting the container.
I am using Flink's default Kerberos feature (security.kerberos.fetch.delegation-token=true),
and I have not specified the use of external token management, such as HADOOP_TOKEN_FILE_LOCATION.
I have now merged your code from the link provided(https://issues.apache.org/jira/browse/FLINK-21232), but I have not seen any effect.
Could you please explain how the new framework takes effect during container startup?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants