NIFI-4715: ListS3 produces duplicates in frequently updated buckets#2361
NIFI-4715: ListS3 produces duplicates in frequently updated buckets#2361adamlamar wants to merge 2 commits intoapache:masterfrom
Conversation
|
|
||
| if (maxTimestamp > currentTimestamp) { | ||
| currentTimestamp = maxTimestamp; | ||
| } |
There was a problem hiding this comment.
maxTimestamp should always be greater than currentTimestamp, but this adds a sanity check.
| if (!commit(context, session, listCount)) { | ||
| if (currentTimestamp > 0) { | ||
| persistState(context); | ||
| } |
There was a problem hiding this comment.
Since currentTimestamp is never overwritten by a maxTimestamp with a value of zero, this check shouldn't be necessary anymore.
| } | ||
|
|
||
| // Persist all state, including any currentKeys | ||
| persistState(context); |
There was a problem hiding this comment.
Both exit paths already perform persistState - this just makes that more clear.
There was a problem hiding this comment.
Do we still need this? Isn't updating state within commit() enough? We should minimize the number of status updates as some state storage is not designed for frequently updates, e.g. Zookeeper. I think if the processor didn't find any new file to list, then it does not have to update state, does it?
I might be missing something as the original reason is not clear to me, to call persistState() when there was nothing to commit.
There was a problem hiding this comment.
@ijokarumawak Its important to persistState when currentKeys has been modified, even if the currentTimestamp hasn't been modified, to avoid producing duplicates when multiple files are listed during the same millisecond. This is a related but distinct issue. Its a rare condition though.
You're correct that this change will cause more load on the state manager. How about setting a flag like dirtyState that would avoid calling setState if it has not been modified?
There was a problem hiding this comment.
@adamlamar Well, if I'm not overlooking anything, currentKeys is only modified in onTrigger method when only new entry is found. Would you show me an example? It still confounds me. If we can set a flag like dirtyState, then the condition should be clarify though. Thanks!
There was a problem hiding this comment.
@ijokarumawak I started writing an example, but then realized you are correct - there is no need to manually call persistState because any addition to currentKeys will also increment listCount, and the normal update mechanism will take over from there. We shouldn't need a dirtyState flag.
ijokarumawak
left a comment
There was a problem hiding this comment.
@adamlamar Thanks for your contribution, the changes mostly good to me. I just posted a comment regarding when to persist state. Would you check that out?
| } | ||
|
|
||
| // Persist all state, including any currentKeys | ||
| persistState(context); |
There was a problem hiding this comment.
Do we still need this? Isn't updating state within commit() enough? We should minimize the number of status updates as some state storage is not designed for frequently updates, e.g. Zookeeper. I think if the processor didn't find any new file to list, then it does not have to update state, does it?
I might be missing something as the original reason is not clear to me, to call persistState() when there was nothing to commit.
|
@ijokarumawak I did as you suggested and pulled Instead, I took a slightly different approach with the change just pushed. Since I did a bunch of manual testing with concurrent I also introduced There's a lot going on in this one - let me know if you have any other questions! |
| if (!commit(context, session, listCount)) { | ||
| if (currentTimestamp > 0) { | ||
| persistState(context); | ||
| } |
There was a problem hiding this comment.
Note that this commit isn't required, since the last part of the main do/while loop already does a commit. Further, it sets listCount to zero, so this branch would always be taken.
ijokarumawak
left a comment
There was a problem hiding this comment.
@adamlamar Thanks for the updates and descriptions. I confirmed that I got the same unit test failure if the persistState call after the main do/while loop is removed, and as you pointed out that's because currentTimestamp is not updated within do/while loop.
As you did in the latest commit, calling persistState once after the main do/while loop is one possible approach.
However, I prefer having both updating the currentTimestamp and persistState actions within the commit method. Because when commit is executed, the processor has already pushed some created FlowFiles to downstream flow. If something, such as IOException happens when a subsequent API call is made, those FlowFiles can be reproduced when the processor resumes from the persisted state. I think we should keep the relationship between NiFi session and processor state as tight as possible to avoid having these inconsistency.
Current ListS3 implementation assumes that S3 API returns versions in last modified timestamp ascending order. The same assumption can guarantee that it is safe to update currentTimestamp within the do/while loop.
Thanks again for having this ongoing discussion patiently. Let me know how you think, thanks!
|
|
||
| // Update stateManger with the most recent timestamp | ||
| currentTimestamp = maxTimestamp; | ||
| persistState(context); |
There was a problem hiding this comment.
These two lines of code can be embedded in commit method.
// Update stateManger with the most recent timestamp
currentTimestamp = maxTimestamp;
persistState(context);
| if (currentTimestamp > 0) { | ||
| persistState(context); | ||
| } | ||
| if (totalListCount == 0) { |
|
@ijokarumawak From the AWS S3 API documentation (see the
I really wish we could take the approach you suggested (would certainly make things easier), but since the entries are in lexicographical/alphabetical order, we must iterate over the entire listing before updating Unfortunately this does also mean that duplicates are possible when a listing fails, like the I appreciate your help getting this reviewed! :) |
|
@adamlamar I tried to find the API documentation but couldn't find the exact statement. Thanks, now it does makes sense to defer updating Actually that makes me want to ask another related question. Do we risk making duplication by updating SimulationFor example, ListS3 listed following entities at the 1st onTrigger. The 1st onTrigger simulation at t1This should track
The 2nd onTrigger simulation at t2If S3 is being updated at the same time, there may be additional entities having the same t1 timestamp, but were not listed at the 1st onTrigger. In such case, those will be listed at the next onTrigger. Following table shows the expected effect of tracking
The 2nd onTrigger simulation at t2 with newer timestamp entry preceding in lexicographical orderBased on the above scenario, let's think about an edge case. New entries having later lastModified timestamp can be added at the same time at the time of the 2nd onTrigger ran. This might break the current implementation that updates
With current implementation, Suggestion
Above approach would reflect background better, and also provide cleaner easily understandable code. I may be overly concerning details, but am feeling this can be better a bit more. Thanks for your patience! |
Yes, I think we do! I identified a similar (possibly the same) bug, and I agree with all of your suggestions. The question in my mind is whether we should fix all of these issues in this JIRA or defer to another. As far as the original JIRA goes, I believe the current commit will address the issue. I also did a fair bit of manual testing so I would be comfortable moving forward with this change as-is. Before refactoring, I'd like to put some additional unit tests in place for safety. Its clear from the discussion that there is some meat here and I'd really like to enumerate a few cases I've seen while testing in unit tests. So its up to you - would you prefer that I start the unit tests and address (potentially) multiple bugs in this PR, or should we merge this and create another JIRA? |
|
@adamlamar How long do you think you need to address multiple bugs you are aware of? If those can be addressed in the same ListS3 processor, then I'd prefer to have all (as much as we can) in this JIRA/PR, as we can reduce testing effort and overall review cycle. If it's too complicated to be done at once, then please submit different JIRAs to beak those into smaller pieces. Thank you! |
|
@ijokarumawak Roger, I will expand the scope of this JIRA to include those other fixes. Should be doable in a single JIRA, was just unsure how you all prefer to move forward. Thanks again for all your help - will likely be several days before I'm able to push new commits. |
|
@adamlamar How is it going? Looking forward to review the updated PR. Just wanted to check if you have any issues. Thanks! |
|
@ijokarumawak I've made some progress, but unfortunately just trying to find time! No tech questions at this point, but thanks for checking in. |
|
Thank you, @adamlamar I understand that. |
Keep totalListCount, reduce unnecessary persistState This closes apache#2361. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
|
Hi @adamlamar , I hope this message finds you well. Since some other users asked about this issue, I went ahead and took over the remaining concerns around updating When it gets merged, this PR will be closed automatically. If you have any comments, please keep discussing on the new PR. Thanks again for your contribution! |
|
Hi @ijokarumawak, I'm sorry I wasn't able to take this across the finish line. Thanks a bunch for continuing the effort! |
ListS3 used to update currentKeys within listing loop, that causes
duplicates. Because S3 returns object list in lexicographic order, if we
clear currentKeys during the loop, we cannot tell if the object has been
listed or not, in a case where newer object has a lexicographically
former name.
Signed-off-by: James Wing <jvwing@gmail.com>
This closes #3116, closes #2361.
|
Thanks, @adamlamar! |
Thank you for submitting a contribution to Apache NiFi.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
[Y] Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
[Y] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
[Y] Has your PR been rebased against the latest commit within the target branch (typically master)?
[Y] Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.