Prevent duplicate SharedShardContext.readerId #15520
Conversation
```diff
@@ -102,9 +101,8 @@ public SharedShardContext getOrCreateContext(ShardId shardId) throws IndexNotFoundException {
         SharedShardContext sharedShardContext = allocatedShards.get(shardId);
         if (sharedShardContext == null) {
             IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
-            sharedShardContext = new SharedShardContext(indexService, shardId, readerId, wrapSearcher);
+            sharedShardContext = new SharedShardContext(indexService, shardId, shardId.hashCode(), wrapSearcher);
```
If these are always tied to a shard, and there is only a single context per shard for a given job, then do we even need the separate reader ID?
Thank you. As you said, it looks to me like we can eliminate the reader ID completely. @mfussenegger could you clarify if this makes sense?
In the fetch case, the `readerId` is used to encode into a `FetchId` which shard is used, so that we can look up a document from the correct shard. See:

crate/server/src/main/java/io/crate/planner/ReaderAllocations.java, lines 41 to 70 in b2f20a3:

```java
ReaderAllocations(TreeMap<String, Integer> bases,
                  Map<String, Map<Integer, String>> shardNodes,
                  Map<RelationName, Collection<String>> tableIndices) {
    this.bases = bases;
    this.tableIndices = tableIndices;
    this.indicesToIdents = new HashMap<>(tableIndices.values().size());
    for (Map.Entry<RelationName, Collection<String>> entry : tableIndices.entrySet()) {
        for (String index : entry.getValue()) {
            indicesToIdents.put(index, entry.getKey());
        }
    }
    for (Map.Entry<String, Integer> entry : bases.entrySet()) {
        readerIndices.put(entry.getValue(), entry.getKey());
    }
    for (Map.Entry<String, Map<Integer, String>> entry : shardNodes.entrySet()) {
        Integer base = bases.get(entry.getKey());
        if (base == null) {
            continue;
        }
        for (Map.Entry<Integer, String> nodeEntries : entry.getValue().entrySet()) {
            int readerId = base + nodeEntries.getKey();
            IntSet readerIds = nodeReaders.get(nodeEntries.getValue());
            if (readerIds == null) {
                readerIds = new IntHashSet();
                nodeReaders.put(nodeEntries.getValue(), readerIds);
            }
            readerIds.add(readerId);
        }
    }
}
```

crate/server/src/main/java/io/crate/execution/engine/fetch/FetchTask.java, lines 293 to 297 in b2f20a3:

```java
int readerId = base + shardId.id();
SharedShardContext shardContext = shardContexts.get(readerId);
if (shardContext == null) {
    try {
        shardContext = sharedShardContexts.createContext(shardId, readerId);
```

as well as the `FetchId` class (`public final class FetchId { …`).
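If two shards on a node were handed the same `readerId`, their fetch ids would collide and a fetch could resolve a document against the wrong shard. As a rough illustration of the kind of bit-packing involved, here is a simplified stand-in; the layout of CrateDB's actual `FetchId` class may differ:

```java
// Sketch: pack a readerId and a per-reader document id into one long.
// Illustrative stand-in only -- not CrateDB's actual FetchId implementation.
public class FetchIdSketch {

    // Upper 32 bits: readerId (identifies a shard reader on a node).
    // Lower 32 bits: the document id within that reader.
    public static long encode(int readerId, int docId) {
        return ((long) readerId << 32) | (docId & 0xFFFFFFFFL);
    }

    public static int decodeReaderId(long fetchId) {
        return (int) (fetchId >>> 32);
    }

    public static int decodeDocId(long fetchId) {
        return (int) fetchId;
    }
}
```

With a duplicate `readerId`, `decodeReaderId` can no longer distinguish the two shards, which is why uniqueness of the id matters for fetch.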
What'd be more interesting is how the increments can lead to duplicates. The whole code section is under the assumption that the preparation phase is run single threaded. See also #5248
If that assumption is no longer true - maybe due to #10373, or if the SharedShardContexts is accessed elsewhere, we probably need to change more than just the readerId.
Thank you for the pointers. I think it is due to #10373, which changes `ShardCollectSource.getIterator()` to return a future type.
crate/server/src/main/java/io/crate/execution/engine/collect/sources/ShardCollectSource.java, lines 435 to 441 in a177d69:

```java
CompletableFuture<BatchIterator<Row>> iterator = shardCollectorProvider
    .awaitShardSearchActive()
    .thenApply(batchIteratorFactory -> batchIteratorFactory.getIterator(
        collectPhase,
        requiresScroll,
        collectTask
    ));
```
When more than one `awaitShardSearchActive()` completes, it would cause a race for a reader ID.
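The failure mode can be replayed deterministically: the method body is a check-then-act on `allocatedShards` plus a non-atomic `readerId++`, so two interleaved callers can observe the same counter value. A minimal sketch, with simplified stand-in names rather than the real `SharedShardContexts` fields:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the unsynchronized check-then-act in getOrCreateContext().
// The interleaving is replayed deterministically to show how two
// different shards can end up with the same readerId.
public class ReaderIdRaceSketch {

    public static final Map<String, Integer> allocatedShards = new HashMap<>();
    public static int readerId = 0;

    // First half of the method body: read the current counter value.
    public static int readCounter() {
        return readerId;
    }

    // Second half: publish the context and increment -- but by now another
    // thread may already have read the same counter value.
    public static void publish(String shardId, int observedReaderId) {
        allocatedShards.put(shardId, observedReaderId);
        readerId++;
    }

    public static void main(String[] args) {
        int seenByThreadA = readCounter();   // e.g. listener thread T#3
        int seenByThreadB = readCounter();   // listener thread T#1, interleaved
        publish("shard-0", seenByThreadA);
        publish("shard-1", seenByThreadB);
        // Both shards were assigned readerId 0 -> duplicate.
        System.out.println(allocatedShards);
    }
}
```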
I tried to sequentialize the calls to `SharedShardContexts.getOrCreateContext()`, which is the only place that assigns reader ids to `SharedShardContext`: 3da1073.
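One common way to sequentialize such async calls is to chain every new task onto the tail of the previous future, so the tasks run one after another regardless of which threads complete them. This is only an illustrative sketch of that pattern, not the code from the linked commit:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Sketch: serialize asynchronous tasks by chaining each new task onto the
// tail of the previously submitted one. Illustrative only.
public class SerialExecutorSketch {

    // Completed sentinel so the first submitted task can run immediately.
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    // Synchronized so that concurrent submitters swap the tail atomically;
    // the tasks themselves then execute strictly in submission order.
    public synchronized <T> CompletableFuture<T> submit(Supplier<T> task) {
        CompletableFuture<T> result = tail.thenApply(ignored -> task.get());
        tail = result.thenApply(ignored -> null);
        return result;
    }
}
```

With a serial tail like this, two `getOrCreateContext`-style tasks can no longer interleave, at the cost of serializing all submitters through one queue.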
Hi @mfussenegger, could you have a look if the fix is ok before I start adding tests?

Sorry, my last-minute fix causes test failures; I will look at this first. `SharedShardContexts.getOrCreateContext()` is only called from two places.
I think I'd prefer adding synchronization to the methods again. But first we should ensure that we've identified the real problem. Did you do that?
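A synchronized variant along the lines the reviewer suggests could look like the following sketch. The types are simplified stand-ins (the real class maps `ShardId` to `SharedShardContext`); the point is that the counter read, the map insert, and the increment all happen under one lock:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: getOrCreateContext with method-level synchronization, so no two
// callers can observe the same readerId. Simplified stand-in types.
public class SharedShardContextsSketch {

    private final Map<String, Integer> allocatedShards = new HashMap<>();
    private int readerId = 0;

    public synchronized int getOrCreateContext(String shardId) {
        Integer existing = allocatedShards.get(shardId);
        if (existing != null) {
            return existing;
        }
        // check-then-act and increment are now atomic as a unit
        int assigned = readerId++;
        allocatedShards.put(shardId, assigned);
        return assigned;
    }
}
```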
I think the real problem is #15520 (comment).
For example, if I attach a logger:
```java
public SharedShardContext getOrCreateContext(ShardId shardId) throws IndexNotFoundException {
    LOGGER.info("Begin getOrCreateContext : " + Thread.currentThread());
    SharedShardContext sharedShardContext = allocatedShards.get(shardId);
    if (sharedShardContext == null) {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        sharedShardContext = new SharedShardContext(indexService, shardId, readerId, wrapSearcher);
        allocatedShards.put(shardId, sharedShardContext);
        readerId++;
    }
    LOGGER.info("End getOrCreateContext : " + Thread.currentThread());
    return sharedShardContext;
}
```
I can observe two threads being interleaved:
```
[2024-02-07T09:20:43,629][INFO ][i.c.e.j.SharedShardContexts] [Grand Parpaillon] Begin getOrCreateContext : Thread[#356,cratedb[Grand Parpaillon][listener][T#3],5,main]
[2024-02-07T09:20:43,629][INFO ][i.c.e.j.SharedShardContexts] [Grand Parpaillon] Begin getOrCreateContext : Thread[#354,cratedb[Grand Parpaillon][listener][T#1],5,main]
[2024-02-07T09:20:43,629][INFO ][i.c.e.j.SharedShardContexts] [Grand Parpaillon] End getOrCreateContext : Thread[#356,cratedb[Grand Parpaillon][listener][T#3],5,main]
[2024-02-07T09:20:43,629][INFO ][i.c.e.j.SharedShardContexts] [Grand Parpaillon] End getOrCreateContext : Thread[#354,cratedb[Grand Parpaillon][listener][T#1],5,main]
```
right before the exception is thrown.
Can we then just revert parts of https://github.com/crate/crate/pull/5248/files and bring back the synchronization?
Thank you, reverted.
Will #11677 also be addressed by this?

It is hard to say without being able to reproduce. I just looked into crate/server/src/main/java/io/crate/execution/jobs/RootTask.java, lines 203 to 206 in 6e7588d.

Another possibility could be #15495 (comment), which I was looking into before Moll provided reproduction steps.
```java
synchronized (this) {
    sharedShardContext = allocatedShards.get(shardId);
```
Other `allocatedShards` accesses within the file afaik also need to be synchronized to ensure this works.
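An alternative that keeps every map access thread-safe without a method-level lock is `ConcurrentHashMap.computeIfAbsent` combined with an `AtomicInteger` counter. This is not the approach the PR takes; it is only an illustration of the point that *all* accesses to `allocatedShards` must go through a thread-safe path:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: computeIfAbsent runs the mapping function atomically per key,
// so each shard gets exactly one id, and getAndIncrement guarantees the
// ids are unique. Simplified stand-in types.
public class LockFreeContextsSketch {

    private final ConcurrentHashMap<String, Integer> allocatedShards = new ConcurrentHashMap<>();
    private final AtomicInteger readerId = new AtomicInteger();

    public int getOrCreateContext(String shardId) {
        return allocatedShards.computeIfAbsent(shardId, id -> readerId.getAndIncrement());
    }
}
```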
Thank you. It does look like `createContext()` needs to be synchronized, but if there is a scenario where `createContext()` and `getOrCreateContext()` race for `allocatedShards`, shouldn't we fix it by not calling `createContext()`? Considering `createContext()` as a lightweight version of `getOrCreateContext()`.
Oh, I guess we need synchronization for `allocatedShards.put()` calls.
```
- Fixed an issue that caused exceptions with messages like
  'ShardCollectContext already added' in low heap situations causing multiple
  shards to be idle and be active simultaneously.
```
I don't think this has anything to do with low heap, as the heap has no influence on whether shards are idle or active. And I also don't think the number of shards going from idle to active has any impact on whether the race condition happens.
Suggested change:

```diff
-- Fixed an issue that caused exceptions with messages like
-  'ShardCollectContext already added' in low heap situations causing multiple
-  shards to be idle and be active simultaneously.
+- Fixed a race condition that could lead to ``ShardCollectContext already
+  added`` errors when making a query after a table had been idle without any
+  accesses for a while.
```
```
@@ -40,15 +40,18 @@

import com.carrotsearch.hppc.IntIndexedContainer;

import io.crate.common.annotations.VisibleForTesting;
import io.crate.metadata.IndexParts;

@NotThreadSafe
```
Suggested change:

```diff
-@NotThreadSafe
```
The fix works by synchronizing the part of `SharedShardContexts.getOrCreateContext()` that increments `readerId` and populates `allocatedShards`.
Summary of the changes / Why this improves CrateDB

Fixes #15518.

As shown, `SharedShardContext.readerId` is duplicated, causing the exception. See #15520 (comment) for details. To my understanding, the only purpose of `readerId` is to be used as the key for `CollectTask.searchers`. If so, we can replace it with `shardId`, which is unique by design and prevents the race that caused the duplicate ids.

Checklist
- Added an entry in `docs/appendices/release-notes/<x.y.0>.rst` for user facing changes
- Updated `sql_features` table for user facing changes
- `docs/appendices/release-notes/<x.y.0>.rst` (E.g. AdminUI)
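The idea floated in the summary above — keying the searcher map by shard rather than by a counter — can be sketched as follows. The types here are hypothetical simplified stand-ins, not the actual `CollectTask` code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: key the searcher map by a ShardId, which is unique by
// construction, so no counter race can produce a duplicate key.
// ShardId and the String "searcher" are simplified stand-in types.
public class SearchersByShardSketch {

    public record ShardId(String index, int id) {}   // unique per shard by design

    private final Map<ShardId, String> searchers = new HashMap<>();

    public String putSearcher(ShardId shardId, String searcher) {
        // putIfAbsent makes a double registration visible instead of
        // silently overwriting the first searcher.
        String previous = searchers.putIfAbsent(shardId, searcher);
        if (previous != null) {
            throw new IllegalStateException("ShardCollectContext already added");
        }
        return searcher;
    }
}
```

Because record equality is by components, two registrations for the same shard collide deterministically, while distinct shards can never share a key.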