OAK-9576: Multithreaded download synchronization issues #383
* Fixing a problem with test
* Fixing synchronization issues
* Fixing OOM issue
* Adding delay between download retries
    "if memory drop below {} GB (max {})", pool.getName(), minMemoryBytes/ONE_GB, humanReadableByteCount(maxMemory));
    pool.setCollectionUsageThreshold(minMemoryBytes);
    checkMemory(usage);
    // todo - should we check and block in the beginning? This creates problem in case of download resume.
Why does it create problems?
I don't see the purpose of checking memory at the beginning, because there is nothing we can do about low memory at that point. The only option would be not to proceed further, but that kind of check could be added even before this stage.
This check creates a problem on resume when memory is low because of the previous run. No registered clients remain that could dump data (the task objects from the previous run are unreachable), yet checkMemory waits for clients to dump data. So the process blocks.
Ok, in this case could you remove the TODO and write e.g. "We don't check memory here, as there is no good way here to free up memory in case it is low."?
    .from(cd);
    checkMemory(info.getUsage());
    synchronized (sufficientMemory) {
    if (sufficientMemory.get()) {
sufficientMemory is an AtomicBoolean. I don't see why you would want to synchronize on it. Is the problem that you want to protect against concurrent calls to checkMemory? If so, checkMemory should be synchronized instead.
In the current code, we don't want to call checkMemory again if sufficientMemory is already false, hence this approach.
Hm, this is not clear... I still don't see an answer to the question "why would you want to synchronize on it?"
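To make the reviewer's point concrete: a synchronized block on an AtomicBoolean only excludes other blocks that lock the same object, while the atomic itself already offers a race-free way to act once per state change. The following is a minimal sketch under assumed names (LowMemoryGate, onUsage, onMemoryRecovered and eventCount are hypothetical and not taken from the PR); it illustrates the idea behind the later commit "Replacing explicit synchronization with atomic operations" and is not the committed code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the memory manager under review; all names are illustrative.
class LowMemoryGate {
    private final AtomicBoolean sufficientMemory = new AtomicBoolean(true);
    private final AtomicInteger handledEvents = new AtomicInteger();

    // Returns true only for the caller that flips the flag from true to false.
    boolean onUsage(long availableBytes, long minMemoryBytes) {
        if (availableBytes >= minMemoryBytes) {
            return false; // enough memory, nothing to do
        }
        // compareAndSet succeeds for exactly one caller per true -> false transition,
        // so no synchronized block is needed to avoid handling the same drop twice.
        if (sufficientMemory.compareAndSet(true, false)) {
            handledEvents.incrementAndGet();
            return true;
        }
        return false; // already in the low-memory state
    }

    // Assumed to be called once clients have dumped their data.
    void onMemoryRecovered() {
        sufficientMemory.set(true);
    }

    int eventCount() {
        return handledEvents.get();
    }
}
```

With this shape, repeated low-memory signals while the flag is already false are ignored without any lock, which matches the stated intent of not calling checkMemory again in that state.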
    File storeFile = writeToStore(storeDir, getStoreFileName());
    return sortStoreFile(storeFile);
    } finally {
    nodeStates.close();
It sounds weird that createSortedStoreFile has a side effect of closing the nodeStates. Why?
Yes, this looks weird, since this class is not the owner of nodeStates. It was done to prevent OOM during multithreaded download: we need to close each nodeStates as its download task finishes. This is mainly for TraverseAndSortTask (the tasks created for parallel download), but because of the code flow the same pattern had to be followed here.
We could improve this, but it would need some more refactoring.
Oh, I see, you mean that particular method, createSortedStoreFile.
I will see if we can add a close method as you suggested.
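The suggestion of a separate close method could look roughly like the sketch below. It assumes hypothetical stand-in classes (TraverserStub, SortStrategySketch) rather than the real Oak types, and only shows the shape: the method that produces the sorted file has no closing side effect, and the caller releases the traverser through try-with-resources.

```java
// Hypothetical stand-ins; the real classes in the PR have different members.
class TraverserStub implements AutoCloseable {
    boolean closed;

    @Override
    public void close() {
        closed = true;
    }
}

class SortStrategySketch implements AutoCloseable {
    private final TraverserStub nodeStates;

    SortStrategySketch(TraverserStub nodeStates) {
        this.nodeStates = nodeStates;
    }

    // Only produces the result; it does not close nodeStates as a side effect.
    String createSortedStoreFile() {
        return "sorted-store-file"; // placeholder for the real sorting work
    }

    // Releasing the traverser is an explicit, separately named step.
    @Override
    public void close() {
        nodeStates.close();
    }
}
```

A caller would then write `try (SortStrategySketch s = new SortStrategySketch(traverser)) { s.createSortedStoreFile(); }`, so the traverser is still closed as soon as the task finishes, which is what the OOM fix needs.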
...main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java (resolved)
    memoryManager.deregisterClient(registrationID);
    }
    try {
    nodeStates.close();
Here again we close the nodeStates.... why here? What about having a separate close() method in this class?
see #383 (comment)
    try (BufferedWriter writer = FlatFileStoreUtils.createWriter(newtmpfile, compressionEnabled)) {
    // no concurrency issue with this traversal because addition to this list is only done in #addEntry which, for
    // a given TraverseAndSortTask object will only be called from same thread
    for (NodeStateHolder h : entryBatch) {
Ah, no, here we don't synchronize on entryBatch! That's inconsistent synchronization, and can cause big problems I think. (The way around it would be to clone entryBatch... but again, I would prefer if we don't need any synchronization).
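The "clone entryBatch" workaround mentioned above can be sketched as follows. This is an illustration with assumed names (EntryBatchSketch, addEntry, drain) and plain strings instead of NodeStateHolder; it shows consistent locking around both the write and the copy, not the approach the PR finally took.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: every access to entryBatch goes through the same lock.
class EntryBatchSketch {
    private final Object lock = new Object();
    private final List<String> entryBatch = new ArrayList<>();

    void addEntry(String entry) {
        synchronized (lock) {
            entryBatch.add(entry);
        }
    }

    // Copies and clears the batch under the lock; the caller can then iterate
    // over the returned snapshot (for example to write it out) without locking.
    List<String> drain() {
        synchronized (lock) {
            List<String> snapshot = new ArrayList<>(entryBatch);
            entryBatch.clear();
            return snapshot;
        }
    }
}
```

If each task really is confined to a single thread, as the code comment claims, neither the lock nor the copy is needed, which is the reviewer's preferred outcome.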
...java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java (resolved)
    writeToSortedFiles();
    return sortStoreFile();
    } finally {
    nodeStates.close();
Here again, closing the nodeStates as a side effect of another method... I find it weird.
...c/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java (resolved)
...c/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java (resolved)
* Using linkedlist in tasks for freeing memory early
* Dumping if data is greater than one MB
* Closing node state entry traversors using try with
...src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java (outdated, resolved)
...src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java (outdated, resolved)
...java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java (resolved)
...he/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java (resolved)
...main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java (resolved)
* Incorporating some feedback from review comments
    try {
    nodeStates.close();
    } catch (IOException e) {
    log.error("{} could not close NodeStateEntryTraverser", taskID);
Could you log the stack trace as well? Just add ", e" to the log.error call.
    //Holder line consist only of json and not 'path|json'
    NodeStateHolder h = new StateInBytesHolder(e.getPath(), jsonText);
    entryBatch.add(h);
    synchronized (this) {
I don't understand the exact use for synchronization... which data structure needs to be protected from concurrent operations?
...main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java (resolved)
...main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java (resolved)
* Replacing explicit synchronization with atomic operations
* Using same memory manager across retries
* Moving retry delay to exception block
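The retry-related commits above ("Adding delay between download retries", "Moving retry delay to exception block") describe a common pattern that can be sketched like this. The helper below is hypothetical (RetrySketch and withRetries are not names from the PR, and the real code retries a download rather than a generic Supplier); it only shows the delay being applied inside the catch block, so a successful attempt never waits.

```java
import java.util.function.Supplier;

// Hypothetical helper illustrating "delay in the exception block".
class RetrySketch {
    static <T> T withRetries(Supplier<T> task, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get(); // success: return immediately, no delay
            } catch (RuntimeException e) {
                last = e;
                // Wait only after a failure, and only if another attempt follows.
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("interrupted while waiting to retry", ie);
                    }
                }
            }
        }
        throw last; // all attempts failed
    }
}
```

Any state that must survive across attempts, such as the shared memory manager mentioned in "Using same memory manager across retries", would be created outside the loop and captured by the task.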
No description provided.