-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-25329][runtime] Support memory execution graph store in session cluster #18360
[FLINK-25329][runtime] Support memory execution graph store in session cluster #18360
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit ff1d101 (Fri Jan 14 05:19:58 UTC 2022) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
ff1d101
to
b5d9b58
Compare
@flinkbot run azure |
@rmetzger Can you please help to review this PR when you're free, thanks |
Thanks for the PR, @zjureel . I think it is a valid improvement. Regarding the implementation, I think we can simply introduce a config option "jobstore.flush-to-disk" to |
93ceba0
to
b10b4c3
Compare
Thanks @KarmaGYZ I have updated the codes and add a config option |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @zjureel . It LGTM except for some minor comments. Please address them and squash your commits.
...k-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java
Outdated
Show resolved
Hide resolved
...k-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java
Outdated
Show resolved
Hide resolved
...ntime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java
Outdated
Show resolved
Hide resolved
Thanks @KarmaGYZ , I have updated the doc and comments in |
65220a8
to
56fee29
Compare
@zjureel I think we need also ignore the |
6915706
to
5d6e58a
Compare
Thanks @KarmaGYZ , it doesn't matter and I have used |
"'Memory': the memory job store keeps the archived execution graphs in memory and " | ||
+ " it may cause FullGC or OOM when there are too many graphs")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"'Memory': the memory job store keeps the archived execution graphs in memory and " | |
+ " it may cause FullGC or OOM when there are too many graphs")) | |
"'Memory': the memory job store keeps the archived execution graphs in memory. You " | |
+ "may need to limit the %s to mitigate FullGC or OOM when there are too many graphs", | |
code(JOB_STORE_MAX_CAPACITY.key()))) |
*/ | ||
public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore { | ||
|
||
private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos = new HashMap<>(4); | ||
private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos = | ||
new ConcurrentHashMap<>(4); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new ConcurrentHashMap<>(4); | |
new ConcurrentHashMap<>(); |
/** Capacity of the memory store, 0 means unlimited. */ | ||
private final int maximumCapacity; | ||
|
||
private final ScheduledExecutor scheduledExecutor; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private final ScheduledExecutor scheduledExecutor; | |
@Nullable private final ScheduledExecutor scheduledExecutor; |
private final int maximumCapacity; | ||
|
||
private final ScheduledExecutor scheduledExecutor; | ||
private ScheduledFuture<?> cleanupFuture; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private ScheduledFuture<?> cleanupFuture; | |
@Nullable private ScheduledFuture<?> cleanupFuture; |
new ConcurrentHashMap<>(4); | ||
|
||
/** Job id with creation timestamp. */ | ||
private final Queue<JobGraphTimestamp> jobGraphQueue = new ConcurrentLinkedQueue<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can leverage the guava cache to handle the cleanup, expiration and concurrent access. Please refer to how we use it in FileExecutionGraphInfoStore
. We can maintain a Cache<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I address it myself in https://github.com/KarmaGYZ/flink/tree/FLINK_25329_support_memory_store_in_session . You can take a look and fix the test issue.
* Base test case of execution graph store for {@link FileExecutionGraphInfoStore} and {@link | ||
* MemoryExecutionGraphInfoStore}. | ||
*/ | ||
public abstract class ExecutionGraphInfoStoreTest extends TestLogger { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can make it a utility class.
Thanks @KarmaGYZ , use cache in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update. The PR is now LGTM. Please address the left minor comments and squash the commits. I'll merge it once CI passed.
|
||
@Override | ||
public int size() { | ||
return serializableExecutionGraphInfos.size(); | ||
return (int) serializableExecutionGraphInfos.size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return (int) serializableExecutionGraphInfos.size(); | |
return Math.toIntExact(serializableExecutionGraphInfos.size()); |
* Test utils class for {@link FileExecutionGraphInfoStore} and {@link | ||
* MemoryExecutionGraphInfoStore}. | ||
*/ | ||
public class ExecutionGraphInfoStoreTestUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the idea of extracting the shared utilities for memory and file store tests. I think this change should belong to a separate hotfix commit.
executionGraphInfoStore.get(executionGraphInfo.getJobId()), | ||
Matchers.nullValue()); | ||
|
||
// check that the persisted file has been deleted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// check that the persisted file has been deleted | |
// check that the store is empty |
|
||
/** Tests that all persisted files are cleaned up after closing the store. */ | ||
@Test | ||
public void testCloseCleansUp() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to use try
clause here in case there is IOException
from Store#put
.
72130f1
to
8c7f47b
Compare
...time/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java
Show resolved
Hide resolved
8c7f47b
to
8e4a673
Compare
…onGraphInfoStoreTestUtils
…mory graph store in session cluster This closes apache#18360.
8e4a673
to
c6b8f50
Compare
…mory graph store in session cluster This closes apache#18360.
What is the purpose of the change
This PR aims to support memory execution graph store in session cluster
Brief change log
Verifying this change
This change added tests and can be verified as follows:
MemoryExecutionGraphInfoStoreTest
that test the methods inMemoryExecutionGraphInfoStore
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no) noDocumentation