New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NIFI-6636: Fixed ListGCSBucket file duplication error #3702
Conversation
ListGCSBucket duplicated files if they arrived not in alphabetical order. The set storing the name of the latest blob (which was loaded with the highest timestamp during the previous run of the processor) was cleared too early. Also changed the state persisting logic: it is now saved only once at the end of onTrigger() (similar to ListS3). Some inconsistent state (only blob names without the timestamp) was also saved earlier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @turcsanyip ! I had not noticed this bug before, but I was easily able to replicate the problem after reading the Jira. I was also able to verify that the problem was addressed by this PR. However, I do think we need to add back in the saving of the state. Removing that can result in huge amounts of duplication in certain situations.
session.commit(); | ||
persistState(context); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think removing this is a good idea. Consider performing a listing of a bucket that has 1 million entries. The processor runs and performs a listing of 800,000 elements, committing the session several times. Now NiFi is restarted. The state was not persisted. So now, upon restart, NiFi will duplicate each of those 800,000 elements. The call to persistState
there makes sense to avoid huge amounts of duplication when performing a listing on a huge bucket.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@markap14 Thanks for the quick feedback!
I agree with you that there should be some checkpointing / recovery mechanism but the previous solution did not work either.
persistState()
saves currentTimestamp
and currentKeys
. currentTimestamp
gets updated only at the end, after the while cycle over the blob pages, so it is useless to save it within the cycle. On the other hand, we can't update currentTimestamp
in the cycle because the blobs are not sorted by time (that is the next cycle / blob page can contain items that haven't been loaded yet but which are older than the max timestamp from the previous page).
Previously, only currentKeys
were updated in the cycle but it was an inconsistent state without the timestamp, furthermore caused the current bug. So I removed it and also the useless persistState()
call.
Implementing a proper solution to this problem could be the scope of a separate issue I think.
I could not find a way to parameterize the GCS call to return the blobs sorted by last modification time (only alphabetical order).
We could fetch all the blobs, sort them on our side and then create the flowfiles in a cycle with saving the current state. However, its performance impact (memory considerations) must be investigated beforehand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, OK, I didn't realize that was the case. I was able to verify this by performing a listing on a bucket with nearly 100,000 items. After about 40-50 thousand had been listed, i restarted NiFi and upon restart, sure enough it re-listed everything. I do think that we need to find a way to handle this better. But for now, this PR does fix a bug and doesn't make anything worse, so it makes sense to go ahead and merge as-is.
getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount}); | ||
private void commit(final ProcessContext context, final ProcessSession session, int loadCount) { | ||
if (loadCount > 0) { | ||
getLogger().info("Successfully loaded {} new files from GCS; routing to success", new Object[] {loadCount}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think 'listing' here is actually more accurate - saying that they were 'loaded' might imply that the contents were transferred from GCS to NiFi.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Absolutely right, I will rename it back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, looks good to me.
Thanks for the update @turcsanyip . All looks good at this point so +1 merged to master! |
ListGCSBucket duplicated files if they arrived not in alphabetical order. The set storing the name of the latest blob (which was loaded with the highest timestamp during the previous run of the processor) was cleared too early. Also changed the state persisting logic: it is now saved only once at the end of onTrigger() (similar to ListS3). Some inconsistent state (only blob names without the timestamp) was also saved earlier. This closes apache#3702. Signed-off-by: Mark Payne <markap14@hotmail.com>
ListGCSBucket duplicated files if they arrived not in alphabetical order. The set storing the name of the latest blob (which was loaded with the highest timestamp during the previous run of the processor) was cleared too early. Also changed the state persisting logic: it is now saved only once at the end of onTrigger() (similar to ListS3). Some inconsistent state (only blob names without the timestamp) was also saved earlier. This closes apache#3702. Signed-off-by: Mark Payne <markap14@hotmail.com>
ListGCSBucket duplicated files if they arrived not in alphabetical order.
The set storing the name of the latest blob (which was loaded with the highest
timestamp during the previous run of the processor) was cleared too early.
Also changed the state persisting logic: it is now saved only once at the end
of onTrigger() (similar to ListS3). Some inconsistent state (only blob names
without the timestamp) was also saved earlier.
Thank you for submitting a contribution to Apache NiFi.
Please provide a short description of the PR here:
Description of PR
Enables X functionality; fixes bug NIFI-YYYY.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically
master
)?Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not
squash
or use--force
when pushing to allow for clean monitoring of changes.For code changes:
mvn -Pcontrib-check clean install
at the rootnifi
folder?LICENSE
file, including the mainLICENSE
file undernifi-assembly
?NOTICE
file, including the mainNOTICE
file found undernifi-assembly
?.displayName
in addition to .name (programmatic access) for each of the new properties?For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.