[FLINK-25445] No need to create local recovery dirs when disabled loc… #18306
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit 7d972d4 (Sun Jan 09 05:35:41 UTC 2022)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
Could you help review? @dmvk Thanks
Thanks for the PR @zuston! This already looks pretty good ;) I went through the usages of the LocalRecoveryDirectoryProvider and the change seems safe.
My only concern is around proper handling of @Nullable providers in LocalRecoveryConfig, PTAL
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
Thanks for your review @dmvk. By the way, why is task local recovery disabled by default? Is it only because of the incompatibility with unaligned checkpoints?
The behavior of local recovery depends heavily on the state backend. AFAIK the heap state backend still doesn't support it. For RocksDB you can find some context in https://issues.apache.org/jira/browse/FLINK-15507
Thanks very much @dmvk. I'll take a look.
Updated @dmvk. Could you help review again? Thanks
@dmvk Gentle ping. If you have time, could you help review it? On weekends, I will have more time to deal with it.
Hi @zuston, I've added a few more minor comments. I think this is mostly good to go.
My primary concern would be that we should get rid of the SavepointLocalRecoveryProvider from the state processor API, as it's no longer needed after this patch.
Thanks for working on this! 👍
        return localStateDirectories == null
                ? Optional.empty()
                : Optional.of(localStateDirectories);
-        return localStateDirectories == null
-                ? Optional.empty()
-                : Optional.of(localStateDirectories);
+        return Optional.ofNullable(localStateDirectories);
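The suggestion above relies on `Optional.ofNullable` performing the same null check as the ternary. A minimal sketch of that equivalence follows; `NullableHolder` and its `String` field are hypothetical stand-ins for illustration, not Flink's actual `LocalRecoveryConfig`.

```java
import java.util.Optional;

/** Hypothetical stand-in for a class holding a nullable field; not Flink code. */
final class NullableHolder {
    private final String value; // may be null

    NullableHolder(String value) {
        this.value = value;
    }

    /** Verbose form: explicit null check, as in the original patch. */
    Optional<String> viaTernary() {
        return value == null ? Optional.empty() : Optional.of(value);
    }

    /** Compact form: Optional.ofNullable does the null check internally. */
    Optional<String> viaOfNullable() {
        return Optional.ofNullable(value);
    }
}
```

Both methods return an empty `Optional` for a null field and a populated one otherwise, so the shorter form should be a behavior-preserving simplification.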
@@ -31,11 +35,11 @@
     private final boolean localRecoveryEnabled;

     /** Encapsulates the root directories and the subtask-specific path. */
-    @Nonnull private final LocalRecoveryDirectoryProvider localStateDirectories;
+    @Nullable private final LocalRecoveryDirectoryProvider localStateDirectories;

     public LocalRecoveryConfig(
             boolean localRecoveryEnabled,
I think we might want to get rid of this parameter completely. Having two parameters that mean the same thing just leads to some weird implementations.
For example, we can now easily get rid of the SavepointLocalRecoveryProvider from the state processor API.
@@ -44,9 +48,10 @@ public boolean isLocalRecoveryEnabled() {
         return localRecoveryEnabled;
-        return localRecoveryEnabled;
+        return localStateDirectories != null;
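The two review comments above point toward keeping a single source of truth: the enabled flag is derived from whether a provider is present instead of being stored next to it. A rough sketch of that shape, assuming simplified hypothetical types (`DirProvider` and `SketchRecoveryConfig` are illustrative names, not Flink's classes):

```java
import java.util.Optional;

/** Hypothetical placeholder for the directory provider type. */
interface DirProvider {}

/** Simplified sketch: the "enabled" state is derived from the field, not stored twice. */
final class SketchRecoveryConfig {
    private final DirProvider provider; // null means local recovery is disabled

    SketchRecoveryConfig(DirProvider provider) {
        this.provider = provider;
    }

    /** Enabled exactly when a provider was supplied. */
    boolean isLocalRecoveryEnabled() {
        return provider != null;
    }

    /** Exposes the nullable field without leaking null to callers. */
    Optional<DirProvider> getProvider() {
        return Optional.ofNullable(provider);
    }
}
```

With only one field, the flag and the provider cannot disagree, which is the inconsistency a separate boolean parameter would allow.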

    /**
     * An {@link NullOfDirProviderException} is thrown when the {@link
     * LocalRecoveryDirectoryProvider} is null.
     */
    @Internal
    public static class NullOfDirProviderException extends RuntimeException {
        public NullOfDirProviderException(String message, Throwable cause) {
            super(message, cause);
        }

        public NullOfDirProviderException(String message) {
            super(message);
        }

        public NullOfDirProviderException() {
            this(
                    "When task local recovery is enabled, the NullPointerException of LocalRecoveryDirectoryProvider should not happen.");
        }
    }
-    /**
-     * An {@link NullOfDirProviderException} is thrown when the {@link
-     * LocalRecoveryDirectoryProvider} is null.
-     */
-    @Internal
-    public static class NullOfDirProviderException extends RuntimeException {
-        public NullOfDirProviderException(String message, Throwable cause) {
-            super(message, cause);
-        }
-        public NullOfDirProviderException(String message) {
-            super(message);
-        }
-        public NullOfDirProviderException() {
-            this(
-                    "When task local recovery is enabled, the NullPointerException of LocalRecoveryDirectoryProvider should not happen.");
-        }
-    }
+    public static Supplier<IllegalStateException> localRecoveryNotEnabled() {
+        return () ->
+                new IllegalStateException(
+                        "Getting a LocalRecoveryDirectoryProvider is only supported with the local recovery enabled. This is a bug and should be reported.");
+    }
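A supplier-returning factory like the one suggested above pairs naturally with `Optional.orElseThrow`, which only invokes the supplier when the value is absent. The sketch below shows one way a call site might look; `ProviderAccess` and `requireProvider` are hypothetical names for illustration, and the real call sites in Flink may differ.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class ProviderAccess {
    /** Mirrors the suggested factory: the exception is built lazily, only when needed. */
    static Supplier<IllegalStateException> localRecoveryNotEnabled() {
        return () ->
                new IllegalStateException(
                        "Getting a LocalRecoveryDirectoryProvider is only supported with the local recovery enabled. This is a bug and should be reported.");
    }

    /** Hypothetical caller: unwraps the provider or fails with the shared exception. */
    static <T> T requireProvider(Optional<T> maybeProvider) {
        return maybeProvider.orElseThrow(localRecoveryNotEnabled());
    }
}
```

Compared with a dedicated exception class, this reuses a standard exception type and keeps the error message in one place.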
👍
Updated @dmvk. Could you help check again?
Gentle ping @dmvk. Thanks :)
…al-recovery in TaskExecutorLocalStateStoresManager
Rebased onto the latest master and squashed all commits. @dmvk If you have time, could you help review again? Thanks :)
@flinkbot run azure
@dmvk Gentle ping. Could you help review it? Thanks
LGTM 👍 Thanks for the contribution
What is the purpose of the change
TaskExecutor creates files, checks localRecoveryEnabled, and loads the local state store for each task submission in the method localStateStoreForSubtask. For batch jobs, localRecoveryEnabled is always false, so there is no need to create these local files for tasks in TaskExecutor.
Brief change log
When task local recovery is disabled, the subtask state directories are not created.
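The change described above amounts to guarding directory creation behind the enabled check. A minimal sketch of that idea using plain `java.nio.file` follows; `SubtaskDirs`, `prepare`, and the directory naming are assumptions for illustration and do not reflect the actual code in TaskExecutorLocalStateStoresManager.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

final class SubtaskDirs {
    /**
     * Returns the subtask directory when local recovery is enabled, creating it on demand.
     * When it is disabled, returns empty and touches nothing on disk.
     */
    static Optional<Path> prepare(boolean localRecoveryEnabled, Path root, String subtaskName) {
        if (!localRecoveryEnabled) {
            return Optional.empty(); // skip all file system work for batch-style jobs
        }
        Path dir = root.resolve(subtaskName);
        try {
            Files.createDirectories(dir); // also creates missing parents; no-op if it exists
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Optional.of(dir);
    }
}
```

Returning an `Optional` lets callers distinguish "disabled" from "enabled with a directory" without a separate flag.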
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation