-
Notifications
You must be signed in to change notification settings - Fork 480
adapter,controller: more principled handling of read holds #29516
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
8ea6f83
to
04acec2
Compare
// Having installed all entries, creating all constraints, we can now drop read holds and | ||
// relax read policies. | ||
drop(dataflow_read_holds); | ||
// TODO -- Improve `initialize_read_policies` API so we can avoid calling this in a loop. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This TODO suggested that calling initialize_read_policies
in a loop is somehow a problem, which I'm not sure why it would be. I assume we are worried about start up times, but is there anything that would make an initialize_read_policies
invocation slow?
04acec2
to
f70e431
Compare
} | ||
} | ||
|
||
self.store_transaction_read_holds(ctx.session(), read_holds); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I decided to pass the read holds through the Stage*
results, rather than adding them to the txn holds, as that is more explicit and we don't have to expect
their existence later.
12dfa49
to
06ca586
Compare
// Collect the read holds we'll pass to the compute controller when executing the peek. | ||
// Usually those will be available in `txn_read_holds` (see above), except if we read from | ||
// indexes over constant collections. | ||
let read_holds = if determination.timestamp_context.timestamp().is_some() { | ||
let txn_holds = self | ||
.txn_read_holds | ||
.get(session.conn_id()) | ||
.expect("set above"); | ||
txn_holds.clone_for(source_bundle) | ||
} else { | ||
self.acquire_read_holds(source_bundle) | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't love this part. Seems like we should also put read holds for indexes on constant views into txn_read_holds
? I'm not sure how to do that though. When we select from those indexes, timeline_context
will be TimestampIndependent
because we don't take indexes into account when determining it. And that leads to determination.timestamp_context.timestamp()
being None
, which leads to us dropping the read holds above. I tried putting them into txn_read_holds
instead, but that makes various tests in timeline.slt
.
This commit adds `initialize_{storage,compute}_read_policy` methods that work like `initialize_{storage,compute}_read_policies` but operate on a single collection only. It also remove the generic `initialize_read_policies` method. The next commit will change bootstrapping to initialize collection read policies while iterating over the catalog objects, rather than at a separate step, so the bulk APIs are no longer useful there. Most other places that initialize read polcies only do so for a single collection as well. Apart from simplifying things, this also prepares the read policy API for getting read holds passed in by the caller. Previously that was cumbersome because the `CollectionIdBundle` type wouldn't allow passing additional per-collection data. With the new methods, we can simply pass a `ReadHold` instead of the collection's `GlobalId`.
This commit makes the coordinator collect read holds during bootstrapping. This is necessary so we can pass them to `create_dataflow` calls, but it also has the nice side effect of letting us initialize read policies immediately, simplifying that logic a bit.
This commit changes the compute controller API to expect any required read holds to be passed in by the caller, rather than attempting to acquire them ourselves. This commit does not yet adjust callers to pass in read holds as well. The next commit will deal with that.
This commit makes the coordinator pass read holds to the compute controller's `create_dataflow` and `peek` APIs.
This commit makes the storage and compute controller APIs that create storage/compute collections, return read holds for any readable collections created, simplifying the usage in the coordinator a bit.
This commit makes the `initialize_{storage,compute}_read_policy` methods take read holds in their arguments, instead of acquiring new read holds themselves. Read holds are readily available at all their call sites, so we can just use those instead of acquiring new ones.
06ca586
to
28409b6
Compare
This commit changes how read holds are handled in the coordinator and the controller:
ReadHold
s for themReadHold
s to be passed by the caller, instead of acquiring new onesCoordinator::initialize_{storage,compute}_read_policy
expects aReadHold
to be passed by the caller, instead of acquiring a new oneAs a result the code becomes easier to reason about wrt. to read holds. We are less likely to forget to acquire read holds when reading methods require them to be passed.
More importantly, this prepares the code for compute controller decoupling (#27656). Once the compute controller lives in a separate tasks, calls to
acquire_read_hold
will becomeasync
and blocking, so we want to avoid them as much as possible. There are still some calls toacquire_read_hold
left, but they are all in a place where we can use the staged-sequencing mechanism to move them off the coordinator's main thread.Motivation
Tips for reviewer
Checklist
$T ⇔ Proto$T
mapping (possibly in a backwards-incompatible way), then it is tagged with aT-proto
label.