Make users' cursors trigger cursor read at the shards when needed - [MOD-5580] #3853
Conversation
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## master #3853 +/- ##
==========================================
+ Coverage 82.77% 82.82% +0.04%
==========================================
Files 192 192
Lines 32605 32649 +44
==========================================
+ Hits 26990 27040 +50
+ Misses 5615 5609 -6
☔ View full report in Codecov by Sentry.
@@ -545,10 +546,22 @@ int MRIteratorCallback_ResendCommand(MRIteratorCallbackCtx *ctx, MRCommand *cmd)
                        ctx);
 }
 
+// Use after modifying `pending` (or any other variable of the iterator) to make sure it's visible to other threads
+void MRIteratorCallback_ProcessDone(MRIteratorCallbackCtx *ctx) {
+  __atomic_fetch_sub(&ctx->ic->inProcess, 1, __ATOMIC_RELEASE);
Why does it need to be atomic? Don't we always decrease/set it from a single thread?
Currently, a single thread sets it (with no race), and then a single thread decreases it while another observes the change. The atomicity is used more for memory fencing than for counter modification.
So why not use volatile?
@@ -246,3 +246,47 @@ def testExceedCursorCapacity(env):
 def testExceedCursorCapacityBG():
   env = Env(moduleArgs='WORKER_THREADS 1 MT_MODE MT_MODE_FULL')
   testExceedCursorCapacity(env)
 
+def testCursorOnCoordinator(env):
Maybe we can think of how to test cases where the cursor times out on one shard but not on the other? This could have happened even before this change, but now it is more likely to happen.
I'll add a TODO.
Regarding that, let's wait until we can:
- pass `MAXIDLE` to the shards (currently using the default of 5 minutes)
- tell if a timeout occurred
I guess we will need a RediSearch mock to control the RediSearch reply to the coordinator and verify that the coordinator acts as expected. Maybe we should open a task for this?
👍 Few comments.
If possible, maybe we can improve the PR top comment to give some more details about the new variables (`inProcess`, `depleted`, `forCursor`) and explain what they mean and when they are updated?
@@ -33,8 +34,9 @@ extern SearchClusterConfig clusterConfig;
   .numPartitions = 0, \
   .connPerShard = 0, \
   .type = DetectClusterType(), \
-  .timeoutMS = 500, \
+  .timeoutMS = 0, \
Why change the default timeout? What am I missing?
TL;DR: it was never in use, and the actual default value was 0, so this keeps the default value that was actually in effect.

This default definition was never used; we always set all the configurations to 0 by default. I noticed this when I gave the new configuration a default value of 1, but in tests it was always set to 0.

We need to discuss this configuration. It has a few false default values in the code that are always ignored or overridden and set to 0. The configuration name for the `FT.CONFIG` API is `TIMEOUT`, which is also the name of the default query timeout, making it inaccessible for modification (since the first `TIMEOUT` config that is found is always the query-timeout one). It is also marked as modifiable at runtime, but its value is copied to a global variable once at module init and is never changed again. Lastly, it is used to give a timeout for the client blocking in the `rmr` calls. Before making this value 0, some tests started failing on random timeouts, and I'm not sure we want to use the RM API for handling timeouts like that. Maybe it was an initial idea for how to ensure that queries finish after some time even if some shards did not reply on time.
OK, let's just mention it in the PR top comment with a detailed explanation of why it was broken and why the change keeps the current broken behavior of timeout 0.
A few last comments about the changes.
Successfully created backport PR for
Successfully created backport PR for
Backport failed for. Please cherry-pick the changes locally:

git fetch origin 2.6
git worktree add -d .worktree/backport-3853-to-2.6 origin/2.6
cd .worktree/backport-3853-to-2.6
git checkout -b backport-3853-to-2.6
ancref=$(git merge-base 2ed7b842df3041969d53e066cface69d33a61144 82664b5506d72296f3a23668f660c6424949b129)
git cherry-pick -x $ancref..82664b5506d72296f3a23668f660c6424949b129
…MOD-5580] (#3853)

* make users' cursors trigger cursor read at the shards when needed
* fix waiting indefinitely
* improve manually triggering
* added a threshold for channel size in `MR_ManuallyTriggerNextIfNeeded`
* fix potential read after free
* fix triggering logic
* change misunderstood threshold
* improved readability and made some explanations
* fix a comment
* another comment fix
* added a test
* improved test
* added some comments
* review fixes and improvements
* improved test
* fix cluster default configuration
* improved test
* small fix
* revert effect of using configuration
* explicitly use 0 instead of `timeout_g` (which is always 0)
Describe the changes in the pull request

Previously, any `FT.AGGREGATE` query (`WITHCURSOR` or without) triggered a train of cursor calls on the shards (after the initial `_FT.AGGREGATE WITHCURSOR`, each reply triggers a callback that pushes the next `_FT.CURSOR READ` to the queue) until the cursor on the shard is depleted.

On users' cursor requests (`FT.AGGREGATE WITHCURSOR`), the user may not supply any limit on the number of results. This causes an unlimited number of commands on the shards until depletion, with all of the replies accumulating on the reply channel, waiting for the user's cursor to read them. Since such cursors may not need all the results, may be idle for a long time, and since `read` commands on Redis do not obey any memory limit, this could cause a big memory spike that can persist for a long time and even crash the server.

With this fix, cursor commands "manually" trigger the next read from the shards only when the next command is needed. This way, the number of results accumulated on the coordinator's `RPNet` channel is always bounded.

In the current state, the modified code has a mild race between two threads (main and uv): one reads a counter's value while the other modifies it (`--`). Since I wanted to assume some relation between this counter (`inProcess`) and other variables in the struct, I decided to use atomic operations with acquire-release memory fencing. Since there is almost no contention, and there are other locks in the code flow that slow it down anyway, the atomic operations should have a very small performance effect.

Future Work
Additional note:

While adding the new configuration for the reply threshold, we discovered the default values for cluster configurations. Luckily, most of them were meant to be 0, and we set the entire structure to 0s, so there was no major effect on anything besides a problematic timeout value. We decided that we are OK with keeping the actual 0 value as a default (and it might even be the preferable value anyway).

From this PR forward, we use `DEFAULT_CLUSTER_CONFIG`, and it can be used to define default values in future configurations. More information about this is written in this comment.

Which issues this PR fixes
Main objects this PR modified

- `inProcess` at `MRIteratorCtx`: the number of currently running commands on the shards, in contrast/complement to `pending`, which is the number of shards with more results (not yet depleted)
- `forCursor` at `MRCommand`: a flag indicating that the user requested a cursor for the aggregation command
- `depleted` at `MRCommand`: indicates that this command (for this shard) is depleted, so there is no need to re-run `_FT.CURSOR READ` on this shard

Mark if applicable