[FLINK-9940] [API/DataStream][File source] out-of-order files were missed in continuous monitoring #6613
Conversation
…rder files were missed
Rebasing to apache/flink
Revert "Rebasing to apache/flink"
Thanks for opening this PR. I have some minor comments below. In general, I would like to know whether it is possible to reduce the mutable variables in the ContinuousFileMonitoringFunction, to reduce some of its complexity.
For the test part, would you like to rebase onto master first? Thanks
 */
private <OUT> ContinuousFileMonitoringFunction<OUT> createTestContinuousFileMonitoringFunction(FileInputFormat<OUT> format, FileProcessingMode fileProcessingMode) {
	ContinuousFileMonitoringFunction<OUT> monitoringFunction =
-		new ContinuousFileMonitoringFunction<>(format, fileProcessingMode, 1, INTERVAL);
+		new ContinuousFileMonitoringFunction<>(format, fileProcessingMode, 1, INTERVAL, READ_CONSISTENCY_OFFSET_INTERVAL);
	monitoringFunction.setRuntimeContext(Mockito.mock(RuntimeContext.class));
It is best not to rely on mockito when writing tests. You could follow the coding guide https://flink.apache.org/contributing/code-style-and-quality-common.html
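To illustrate the reviewer's suggestion in isolation: instead of `Mockito.mock(...)`, a small hand-rolled stub implements only what the test needs. This is a generic sketch with made-up names (`SomeContext` is not Flink's RuntimeContext, which is a much larger interface); it only shows the pattern being recommended.

```java
// Hypothetical minimal interface standing in for a runtime context in a test.
interface SomeContext {
    String taskName();
    int indexOfSubtask();
}

// Hand-written stub: explicit, debuggable, and free of mocking-framework magic.
class StubContext implements SomeContext {
    public String taskName() { return "test-task"; }
    public int indexOfSubtask() { return 0; }
}

public class StubExample {
    public static void main(String[] args) {
        SomeContext ctx = new StubContext();
        System.out.println(ctx.taskName() + "/" + ctx.indexOfSubtask());
    }
}
```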
private volatile long globalModificationTime = Long.MIN_VALUE;

/** The maximum file modification time seen so far. */
private volatile long maxProcessedTime = Long.MIN_VALUE;
Construction already initializes the value.
There has been a recent change which added the initialization for globalModificationTime in the constructor, so I will make the same change for this variable as well.
/** The list of processed files having modification time within the period from globalModificationTime
 * to maxProcessedTime, in the form of a Map<filePath, lastModificationTime>. */
private volatile Map<String, Long> processedFiles;

private transient Object checkpointLock;

private volatile boolean isRunning = true;

private transient ListState<Long> checkpointedState;
Do we still need this state in the current implementation?
@@ -172,6 +236,26 @@ public void initializeState(FunctionInitializationContext context) throws Exception
	LOG.debug("{} retrieved a global mod time of {}.",
		getClass().getSimpleName(), globalModificationTime);
}
if (retrievedStates2.size() == 1 && processedFiles.size() != 0) {
Maybe I miss something, but I want to know why we need this check? Currently, I find that processedFiles.size() is always 0 when initializeState is called.
This logic was copied from the existing code (13 lines earlier in the same file), where the check was done:
if (retrievedStates.size() == 1 && globalModificationTime != Long.MIN_VALUE) {
I thought there might be some edge cases which might lead to the situation where RestoreStates is called when the states have already been initialised.
Might need @kl0u 's help.
 */
private final long readConsistencyOffset;

/** The current modification time watermark. */
private volatile long globalModificationTime = Long.MIN_VALUE;
Is it possible that we could avoid depending on this variable? I think this would reduce the complexity of the restore logic.
As I explained in the corresponding JIRA ticket https://issues.apache.org/jira/browse/FLINK-9940, yes, this is needed.
@@ -376,9 +479,12 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {

	this.checkpointedState.clear();
	this.checkpointedState.add(this.globalModificationTime);
see above
// This check ensures that globalModificationTime will not go backward
// even if readConsistencyOffset is changed to a large value after a restore from checkpoint,
// which would otherwise cause files to be processed twice
globalModificationTime = Math.max(maxProcessedTime - readConsistencyOffset, globalModificationTime);
Maybe we could remove this logic if we do not use the globalModificationTime.
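The watermark rule under discussion can be modeled in isolation. Below is a minimal standalone sketch (not the PR's actual class; the method name `advance` is made up) showing how the `Math.max` guard keeps the watermark from regressing when `readConsistencyOffset` is enlarged after a restore:

```java
public class WatermarkSketch {

    // The watermark trails the max processed timestamp by the offset,
    // but Math.max ensures it never moves backward.
    static long advance(long globalModificationTime, long maxProcessedTime, long readConsistencyOffset) {
        return Math.max(maxProcessedTime - readConsistencyOffset, globalModificationTime);
    }

    public static void main(String[] args) {
        long watermark = Long.MIN_VALUE;
        watermark = advance(watermark, 1_000L, 100L);  // 1000 - 100 = 900
        System.out.println(watermark);
        // Restore with a larger offset: 1000 - 500 = 500, but the
        // watermark stays at 900 instead of regressing.
        watermark = advance(watermark, 1_000L, 500L);
        System.out.println(watermark);
    }
}
```

Without the guard, the restored watermark would drop to 500 and every file in the (500, 900] window would be listed and processed a second time.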
I'm closing this as "Abandoned", since there is no more activity.
Sorry @lvhuyen for closing this, I am reopening it because I just noticed there is discussion on the related JIRA.
Thank you Kostas. I have been waiting for Guowei's reply on the JIRA ticket, as it might render my PR redundant.
BTW, may I ask how you monitor the activeness of a PR? My PR was created almost 2 years ago, but most of the time it stayed in the "waiting for review" state.
Thanks. Averell Huyen Levan
# Conflicts:
#	flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
Sorry for the late reply. Thanks @lvhuyen very much for resolving the comments. What concerns me is why we need both variables. Of course, maybe I miss some scenario which needs the two states. So would you like to enlighten me why we still need these two variables at the same time? Thanks again for your patience.
Thanks @guoweiM.
Do not worry. Take your time.
Hi @guoweiM
As said on the PR, this problem does not exist in the new FileSource. So I close the PR.
Thank you @guoweiM
[FLINK-9940] Fix - File-source continuous monitoring mode - out-of-order files were missed
Fix the issue with ContinuousFileMonitoringFunction where out-of-order files were missed in continuous directory-scanning mode.
Cause: In the existing directory-monitoring mechanism, Flink maintains the maximum last-modified timestamp of all identified files (globalModificationTime), so that in the next scan, all files with a last-modified timestamp equal to or earlier than globalModificationTime are ignored.
Fix: This fix adds a parameter when creating a ContinuousFileMonitoringFunction: readConsistencyOffset. Every scan now starts from the maximum last-modified timestamp minus this offset. A new map, processedFiles, is also maintained, consisting of all known files whose modification timestamp falls within that offset period.
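The mechanism described above can be sketched as a small self-contained model. This is not the PR's actual implementation; the class, method names, and pruning step are simplified illustrations of the idea (watermark plus a `processedFiles` map covering the offset window):

```java
import java.util.HashMap;
import java.util.Map;

public class ScanFilterSketch {
    final Map<String, Long> processedFiles = new HashMap<>();
    long maxProcessedTime = Long.MIN_VALUE;
    long globalModificationTime = Long.MIN_VALUE;
    final long readConsistencyOffset;

    ScanFilterSketch(long readConsistencyOffset) {
        this.readConsistencyOffset = readConsistencyOffset;
    }

    // A file is processed only if it is newer than the watermark AND not
    // already recorded in processedFiles with the same (or newer) timestamp.
    boolean shouldProcess(String path, long modTime) {
        if (modTime <= globalModificationTime) {
            return false; // at or below the watermark: ignore
        }
        Long seen = processedFiles.get(path);
        return seen == null || seen < modTime;
    }

    void markProcessed(String path, long modTime) {
        processedFiles.put(path, modTime);
        maxProcessedTime = Math.max(maxProcessedTime, modTime);
        // Watermark trails the max processed time by the offset, never regressing.
        globalModificationTime = Math.max(maxProcessedTime - readConsistencyOffset, globalModificationTime);
        // Entries at or below the watermark are filtered by the first check anyway,
        // so they can be pruned from the map.
        processedFiles.values().removeIf(t -> t <= globalModificationTime);
    }

    public static void main(String[] args) {
        ScanFilterSketch s = new ScanFilterSketch(100L);
        System.out.println(s.shouldProcess("a", 1000L)); // new file: true
        s.markProcessed("a", 1000L);                     // watermark becomes 900
        System.out.println(s.shouldProcess("a", 1000L)); // already processed: false
        System.out.println(s.shouldProcess("b", 950L));  // late file inside window: true
    }
}
```

The key point is the last call: a file with timestamp 950 arrives after a file with timestamp 1000 was already seen, and it is still picked up because the watermark only reaches 900.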
For testing this fix, a change to flink-fs-tests has also been made: the collection of seenFiles is changed from a TreeSet to a sorted list. This change verifies exactly-once file scanning, instead of at-least-once.
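A small sketch of why that test change matters (illustrative names, not the actual flink-fs-tests code): a TreeSet silently deduplicates, so a file emitted twice would go unnoticed (at-least-once), while a sorted list keeps duplicates, so an assertion can detect double-processing (exactly-once).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

public class SeenFilesSketch {
    public static void main(String[] args) {
        TreeSet<String> seenSet = new TreeSet<>();
        List<String> seenList = new ArrayList<>();
        // "file1" is emitted twice, simulating a file processed two times.
        for (String f : new String[]{"file1", "file2", "file1"}) {
            seenSet.add(f);
            seenList.add(f);
        }
        Collections.sort(seenList);
        System.out.println(seenSet.size());  // duplicate hidden by the set
        System.out.println(seenList.size()); // duplicate visible in the list
    }
}
```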
Verifying this change
This change is already covered by existing tests, with slight updates:
ContinuousFileProcessingMigrationTest.testMonitoringSourceRestore.
ContinuousFileProcessingTest.{testFunctionRestore, testProcessContinuously}
This change also added test:
ContinuousFileProcessingTest.testProcessContinuouslyWithNoteTooLateFile
Does this pull request potentially affect one of the following parts:
@Public(Evolving): yes
Documentation