-
Notifications
You must be signed in to change notification settings - Fork 1.9k
IGNITE-11970 CQ buffers cleanup after acknowledge receiving implemented #7934
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
|
||
| qry.setRemoteTransformerFactory(factory); | ||
|
|
||
| qry.setLocalListener((evts) -> evts.forEach(e -> System.out.println("val=" + e))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we eliminate System.out.println usage in test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it seems reasonable. I'll modify local listener and add additional check for updates count in the tests.
…es count added to ContinuousQueryBufferCleanupAbstractTest. codestyle fixed.
| for (int i = next; i < entries.length; i++) { | ||
| CacheContinuousQueryEntry e = entries[i]; | ||
|
|
||
| if (e != null && e.updateCounter() <= updateCntr) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need checking e for non-null value? I think we need to clean up entries that have update counter less than acked counter each time from the beginning of entries array.
We can have out of order events on backup due to cache messages reordering, that's why an entry which comes after the acked counter (this entry has less counter than acked counter from the client) will never be cleaned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clean up from the beginning of entries array was implemented.
| if (e != null && e.updateCounter() <= updateCntr) { | ||
| entries[i] = null; | ||
|
|
||
| lastCleaned = i; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Due to my comment above we can remove lastCleaned position.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
| /** | ||
| * @param updateCntr Acknowledged counter. | ||
| */ | ||
| synchronized void cleanupEntries(Long updateCntr) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use long primitive type here to avoid unboxing each time comparing update counters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed
| sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx); | ||
| sendBackupAcknowledge(ackBufBackup.onAcknowledged(batch), routineId, ctx); | ||
|
|
||
| cleanupBuffers(ackBuf.onAcknowledged(batch)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we achieve the same condition by calling lsnr.cleanupBackupQueue here with an acknowledged collection of counters? Why do we need a separate buffer for clearing partition buffers?
ackedTup = ackBufBackup.onAcknowledged(batch);
sendBackupAcknowledge(ackedTup, routineId, ctx);
lsnr.cleanupBackupQueue(ackedTup.get1());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The separate buffer for backups is not required and was removed.
| } | ||
|
|
||
| @Override public void cleanupBackupQueue(Map<Integer, Long> updateCntrs) { | ||
| for (Map.Entry<Integer, Long> e : updateCntrs.entrySet()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems the cleanupBackupQueue becomes not the right name for this case. Let's rename it to something like cleanupPartitionBuffers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed to "cleanupOnAck"
| srvs[i] = startGrid("srv" + i); | ||
|
|
||
| Ignite qryNode = useClient | ||
| ? startGrid(optimize(getConfiguration("client").setClientMode(true))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use startClientGrid().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
startClientGrid() used
| private void checkBuffer(int srvCnt, int backupsCnt, boolean useClient) throws Exception { | ||
| System.setProperty("IGNITE_CONTINUOUS_QUERY_ACK_THRESHOLD", Integer.toString(ACK_THRESHOLD)); | ||
|
|
||
| IgniteEx[] srvs = new IgniteEx[srvCnt]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems this array is not needed. Let's use grid(0) instead of srv[0].
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| for (int i = 0; i < RECORDS_CNT; i++) | ||
| cache.put(i, Integer.toString(i)); | ||
|
|
||
| Thread.sleep(2000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need sleep here? I think we should use the waitForCondition here that all of the entries are processed by CQ.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
waitForCondition used instead of sleep.
| private static final int RECORDS_CNT = 10000; | ||
|
|
||
| /** */ | ||
| private static final int ACK_THRESHOLD = 100; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the constant from the CacheContinuousQueryHandler should be used here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
|
||
| IgniteCache<Integer, String> cache = qryNode.getOrCreateCache(cacheCfg); | ||
|
|
||
| AbstractContinuousQuery<Integer, String> qry = getContinuousQuery(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems the @Parameterized as a continuous query can also be used here. So we can reduce the number of tests added to IgniteCacheQuerySelfTestSuite6. Can you, please, take a look?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implemented
# Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java # modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
…anup execution changed. Parameterized tests implemented.
lobanovdmitry
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello, trying to test this patch, found that one small change in
src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java still needed.
lines 551-553
@Override public void acknowledgeBackupOnTimeout(GridKernalContext ctx) {
sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
}
This callback is called by BackupCleaner and in some circumstances it releases AcknowledgeBuffer without calling
cleanupBuffers(acknowledged);
Please, consider this.
And thank you very much for the changes.
Thank you for submitting the pull request to the Apache Ignite.
In order to streamline the review of the contribution
we ask you to ensure the following steps have been taken:
The Contribution Checklist
The description explains WHAT and WHY was made instead of HOW.
The following pattern must be used:
IGNITE-XXXX Change summarywhereXXXX- number of JIRA issue.(see the Maintainers list)
the
green visaattached to the JIRA ticket (see TC.Bot: Check PR)Notes
If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com #ignite channel.