storage: differentiate between append-only and incremental storage-managed collections #27496
Conversation
Force-pushed from 114665d to 0c44f04.
src/storage-controller/src/lib.rs
Outdated
```rust
}
IntrospectionType::StorageSourceStatistics => {
    self.collection_manager.register_incremental_collection(id, read_handle_fn);
```
I'm not entirely sure that the stats should be modeled as incremental collections. I kind of think you might just want to toss the stats until the read-write promotion. Otherwise the cutover from the old version to the new version might cause the stats to regress, which will be quite surprising.
High level approach LGTM! The diff to the storage controller is larger than I feel comfortable reviewing myself though. Seems like it might be possible to do separate PRs for the append-only collections and the incremental collections? I wouldn't want you to go to the trouble of splitting this up until the storage controller experts (just @petrosagg, now?) have signed off on the high-level approach too though.
Added a bunch of comments, the general approach looks good I think.
I'm still not entirely convinced that the added complexity is necessary though. I may be missing something, but for the 0dt use case it seems sufficient for the incremental write task to do a single retraction update when it comes out of read-only mode, to truncate the shard and fence out the older writer at the same time, then commence with directly appending the updates it receives like the appender task does.
```rust
async fn append_introspection_updates(
    &mut self,
    type_: IntrospectionType,
    updates: Vec<(Row, Diff)>,
```
Maybe we should replace the `Diff` here with `u64`, to make the type system ensure that this doesn't have retractions.
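As a sketch of that suggestion (the names here are illustrative, not the actual storage-controller types): an unsigned diff makes retractions unrepresentable for append-only updates, and converting to the signed representation at the persist boundary is then an infallible, one-way step.

```rust
// Illustrative stand-in for the repr Row type.
type Row = String;

/// Hypothetical append-only update: the multiplicity is unsigned,
/// so a retraction (negative diff) is unrepresentable.
struct AppendOnlyUpdate {
    row: Row,
    diff: u64,
}

/// Converting to the signed representation used at the write boundary
/// is infallible (modulo overflow); the other direction would need a
/// runtime check, which is exactly what the type change avoids.
fn to_signed(updates: Vec<AppendOnlyUpdate>) -> Vec<(Row, i64)> {
    updates
        .into_iter()
        .map(|u| (u.row, i64::try_from(u.diff).expect("diff fits in i64")))
        .collect()
}

fn main() {
    let updates = vec![AppendOnlyUpdate { row: "a".into(), diff: 2 }];
    let signed = to_signed(updates);
    assert_eq!(signed, vec![("a".to_string(), 2)]);
}
```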
```rust
    prev_task.is_finished(),
    "should only spawn a new task if the previous is finished"
);
}
```
Isn't this already checked above?
The comment above says that we double-check it. Maybe we want to be extra sure. I just copied this code, which yes, is not a good excuse... 🤷‍♂️
```rust
where
    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, Diff>> + Send>>
        + Send
        + 'static,
```
What do you think of giving the `CollectionManager` a handle to the `PersistClientCache`? Then the register methods would only need the `CollectionMetadata` and pass both to the writer task, which could then create any read and write handles it wants. This seems like it could lead to a cleaner API in a future where we have ripped out the `PersistMonotonicWriteWorker` and started to make use of the txn system for the managed collections.
Yeah, that feels smart. Will do that if/when we've figured out the bigger questions around this PR 👌
```rust
    );

    (tx, handle.abort_on_drop(), shutdown_tx)
}
```
Is it possible to replace `fn incremental_write_task` with a `struct IncrementalWriteTask` and split up this monster into more digestible methods? :D

(This is a complaint I also have about `append_only_write_task` btw, but that one isn't new code.)
I bet there is also some opportunity to deduplicate logic between the two task types.
I'm sure there is! 🙈 I should have spelled this out more, but the code is not yet "up to snuff". Once we've agreed on the concepts I'll go through and do some of these cleanups.
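The refactor suggested above could look roughly like the following sketch, with purely illustrative types and fields (the real task also owns persist handles, channels, and timers):

```rust
// Sketch: hoist the locals of a long task function into a struct so
// the body can be split into named, digestible methods. The name
// IncrementalWriteTask and its fields are illustrative only.
struct IncrementalWriteTask {
    /// What we want the collection to contain.
    desired: Vec<(String, i64)>,
    /// Updates received but not yet written out.
    to_write: Vec<(String, i64)>,
}

impl IncrementalWriteTask {
    fn new() -> Self {
        Self { desired: Vec::new(), to_write: Vec::new() }
    }

    /// One digestible step: record newly received updates.
    fn absorb(&mut self, rows: &[(String, i64)]) {
        self.to_write.extend(rows.iter().cloned());
        self.desired.extend(rows.iter().cloned());
    }

    /// Another step: drain whatever is pending into a write batch.
    fn pending_batch(&mut self) -> Vec<(String, i64)> {
        std::mem::take(&mut self.to_write)
    }
}

fn main() {
    let mut task = IncrementalWriteTask::new();
    task.absorb(&[("a".to_string(), 1)]);
    let batch = task.pending_batch();
    assert_eq!(batch, vec![("a".to_string(), 1)]);
    assert!(task.to_write.is_empty());
    assert_eq!(task.desired.len(), 1);
}
```

The run loop then becomes a short dispatcher over these methods instead of one closure holding all state.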
@teskje I did this for future-proofing, to a degree. Even if we only wanted to do a single truncate/write when we come out of read mode, we would still have to have a retry loop around that, which has all of the logic that the current retry loop has, because the old `envd` will (or could) continue trying to write while we try and get in our retractions + fence. And we would then have two states: a) the initial state, where we try and get in our retractions/fence, and b) regular operations. Just having one state/retry loop felt simpler to me. And it's automatically resilient to future shenanigans where we have multiple writers of an introspection collection.

An interesting future scenario I wanted to be prepared for: multiple processes writing to a single introspection collection in parallel, but for example where each process is responsible only for a subset of the contents. The updates would be of the form:

The writer for a
Fair enough. I'm not actually sure if the single-retraction-and-fencing approach would be less complicated than the self-correcting one. I figured not having to worry about consolidating often enough (but not too often) would simplify things. Also we may be able to save a bit of CPU and memory in read-write state by not having to do the diffing continually. But I'm not sure if either of these is significant. You could have made the complexity argument also for the self-correcting persist sink, but I think by now it's clear that making it continually self-correcting was the right decision, based on how often this has simplified things elsewhere in the system.
Hm, yes fencing doesn't really work in this scenario anymore, at least not if it's based on upper mismatches.
Maybe! We had considered having all replicas write their introspection data to persist directly, all to the same shard. The main wrinkle with this is how you ensure things get retracted correctly when a replica drops or restarts: you need something to write these updates down that's alive longer than the replica is. The current design has that thing be the controller, and the controller writes introspection updates in a single thread, but that's not carved in stone.
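For illustration, the self-correcting idea discussed above boils down to computing the difference between the `desired` contents and what is currently in the shard, and appending exactly that correction. A minimal sketch, using plain `BTreeMap`s in place of the real consolidated shard contents (all names are illustrative):

```rust
use std::collections::BTreeMap;

// Compute the retractions/additions needed to turn `persisted` into
// `desired`. Appending the result (at a timestamp beyond the current
// upper) makes the shard match `desired`, regardless of what stray
// updates another writer may have left behind.
fn correction(
    desired: &BTreeMap<String, i64>,
    persisted: &BTreeMap<String, i64>,
) -> Vec<(String, i64)> {
    let mut updates = Vec::new();
    for (row, want) in desired {
        let have = persisted.get(row).copied().unwrap_or(0);
        if want - have != 0 {
            updates.push((row.clone(), want - have));
        }
    }
    for (row, have) in persisted {
        if !desired.contains_key(row) {
            // Row is not desired at all: retract it entirely.
            updates.push((row.clone(), -have));
        }
    }
    updates
}

fn main() {
    let desired = BTreeMap::from([("a".to_string(), 1), ("b".to_string(), 1)]);
    let persisted = BTreeMap::from([("a".to_string(), 1), ("c".to_string(), 2)]);
    // We need to add "b" and fully retract the stray "c" rows.
    assert_eq!(
        correction(&desired, &persisted),
        vec![("b".to_string(), 1), ("c".to_string(), -2)]
    );
}
```

Running this continually (on every upper mismatch) is what makes the task resilient to concurrent writers, at the cost of keeping `desired` in memory.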
```rust
self.to_write.extend(rows.iter().cloned());
self.desired.extend(rows.iter().cloned());

// TODO: Maybe don't do it every time?
```
You could consider using the `ConsolidatingVec` that's used by persist_sink. It only performs (amortized) O(log n) consolidation work per inserted update, at the cost of using up to twice the amount of memory of the fully consolidated vector. It's trading off CPU for memory, which I'm not sure makes sense in this context.
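The amortized-consolidation idea can be sketched as follows; this mirrors the behavior described for `ConsolidatingVec` (consolidate whenever the buffer grows past twice its last consolidated size) but is not the actual implementation:

```rust
// Sketch of amortized consolidation: buffer updates and merge equal
// rows only when the buffer has doubled since the last merge. Each
// consolidation is O(n log n), so per-insert work is amortized
// O(log n), while memory stays within 2x the consolidated size.
struct ConsolidatingVec {
    data: Vec<(String, i64)>,
    // Size of the data after the last consolidation.
    watermark: usize,
}

impl ConsolidatingVec {
    fn new() -> Self {
        Self { data: Vec::new(), watermark: 1 }
    }

    fn push(&mut self, row: String, diff: i64) {
        self.data.push((row, diff));
        if self.data.len() > 2 * self.watermark {
            self.consolidate();
            self.watermark = self.data.len().max(1);
        }
    }

    fn consolidate(&mut self) {
        self.data.sort_by(|a, b| a.0.cmp(&b.0));
        let mut merged: Vec<(String, i64)> = Vec::new();
        for (row, diff) in self.data.drain(..) {
            if let Some(last) = merged.last_mut() {
                if last.0 == row {
                    last.1 += diff;
                    continue;
                }
            }
            merged.push((row, diff));
        }
        // Drop fully cancelled rows.
        merged.retain(|(_, d)| *d != 0);
        self.data = merged;
    }
}

fn main() {
    let mut vec = ConsolidatingVec::new();
    for _ in 0..3 {
        vec.push("row".to_string(), 1);
    }
    vec.push("row".to_string(), -1);
    vec.consolidate();
    assert_eq!(vec.data, vec![("row".to_string(), 2)]);
}
```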
Force-pushed from 55a3384 to 63f1bbe.
Risk Summary: The pull request carries a high risk with a score of 81, influenced by factors such as the sum of bug reports of the files affected and the change in executable lines. Historically, pull requests with similar characteristics are 111% more likely to introduce a bug compared to the repository baseline. Additionally, three files modified in this request have a recent history of frequent bug fixes, which further contributes to the risk. The repository's overall bug trend is currently decreasing, which is a positive sign, but that trend is based on past observations and is separate from the risk prediction. Note: the risk score is not based on semantic analysis but on historical predictors of bug occurrence in the repository; predictors and the score may change as the PR evolves in code, time, and review activity.
@teskje I rebased this and I think I addressed all your comments. I structured it such that you can follow along the
LGTM, thanks!
Given that this is a somewhat risky change and it's probably hard to feature-flag it in a meaningful way, it would be good to have some extra @MaterializeInc/testing scrutiny on this too!
```diff
     },
 );

 (tx, handle.abort_on_drop(), shutdown_tx)
 }

-async fn run(mut self) -> Result<(), ()> {
+async fn run(mut self) -> ControlFlow<String> {
```
Nice, didn't know this existed! Makes things much clearer.
Yeah, I only remembered it because persist uses it quite a bit:

```rust
) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
```
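For readers unfamiliar with it, `std::ops::ControlFlow` makes a loop's exit reason explicit in the return type, which is what makes the `run` signature above clearer than `Result<(), ()>`. A minimal, self-contained illustration (the `step` function is hypothetical):

```rust
use std::ops::ControlFlow;

// A task loop body: Continue to keep running, Break with a reason
// string to stop. The caller can log the reason on shutdown.
fn step(msg: Option<&str>) -> ControlFlow<String> {
    match msg {
        Some(m) => {
            println!("handling {m}");
            ControlFlow::Continue(())
        }
        // Channel closed: tell the caller why we are stopping.
        None => ControlFlow::Break("sender hung up".to_string()),
    }
}

fn main() {
    assert_eq!(step(Some("batch")), ControlFlow::Continue(()));
    assert_eq!(step(None), ControlFlow::Break("sender hung up".to_string()));
}
```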
I annotated some spots that are not covered according to coverage/428.
```rust
fn handle_shutdown(&mut self) {
    let mut senders = Vec::new();

    // Prevent new messages from being sent.
    self.cmd_rx.close();

    // Get as many waiting senders as possible.
    while let Ok((_batch, sender)) = self.cmd_rx.try_recv() {
        senders.push(sender);

        // Note: because we're shutting down, the sending side of `rx` is no
        // longer accessible, and thus we should no longer receive new
        // requests. We add this check just as an extra guard.
        if senders.len() > CHANNEL_CAPACITY {
            // There's not a correctness issue if we receive new requests,
            // just unexpected behavior.
            tracing::error!("Write task channel should not be receiving new requests");
            break;
        }
    }

    // Notify them that this collection is closed.
    //
    // Note: if a task is shutting down, that indicates the source has been
    // dropped, at which point the identifier is invalid. Returning this
    // error provides a better user experience.
    notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));
}
```
Does not seem to be covered by tests according to coverage/428.
Yes, I should have expected that! This code path is only taken when removing a collection that is managed by this. But this code is only used for builtin storage-managed collections (the introspection collections), and we normally never remove them, or can't even do that.

We might add code in the future that does drop these collections, in which case this code path will come in handy, but it might rot! Would you prefer I remove this and add something akin to `unreachable!()`, or leave it in?
Something like `unreachable!()` sounds good.
```rust
let append_result = match self.write_handle.compare_and_append(request.clone()).await {
    // We got a response!
    Ok(append_result) => append_result,
    // Failed to receive, which means the worker shut down.
    Err(_recv_error) => {
        // Sender hung up, this seems fine and can happen when
        // shutting down.
        notify_listeners(responders, || {
            Err(StorageError::ShuttingDown("PersistMonotonicWriteWorker"))
        });

        // End the task since we can no longer send writes to persist.
        return ControlFlow::Break("sender hung up".to_string());
    }
};
```
Not covered according to coverage/428.
Yeah, same as above. This code path is defensive, but the case that it protects against currently can't really happen. 🙈
```rust
if retries.next().await.is_none() {
    notify_listeners(responders, || {
        Err(StorageError::InvalidUppers(failed_ids.clone()))
    });
    error!(
        "exhausted retries when appending to managed collection {failed_ids:?}"
    );
    break;
}
```
Does not seem to be covered by tests according to coverage/428.
We can't currently trigger this code path because we don't have concurrent writers. We might have that in the future.
```rust
let update = Update {
    row: row_buf.clone(),
    timestamp: expected_upper.clone(),
    diff: -1,
};

update
```
Does not seem to be covered by tests according to coverage/428.
This one is peculiar! The code was there before, I only added to it. I think it's only reached when a) there are enough updates in a status collection, and b) we do a restart of `envd`.
```rust
Err(storage_err) => {
    match storage_err {
        StorageError::InvalidUppers(failed_ids) => {
            assert_eq!(
                failed_ids.len(),
                1,
                "received errors for more than one collection"
            );
            assert_eq!(
                failed_ids[0].id, id,
                "received errors for a different collection"
            );

            // This is fine, it just means the upper moved because
            // of continual upper advancement or because someone
            // already appended some more retractions/updates.
            //
            // NOTE: We might want to attempt these partial
            // retractions on an interval, instead of only when
            // starting up!
            info!(%id, current_upper = ?failed_ids[0].current_upper, "failed to append partial truncation");
        }
        // Uh-oh, something else went wrong!
        other => {
            panic!("Unhandled error while appending to managed collection {id:?}: {other:?}")
        }
    }
}
```
Does not seem to be covered by tests according to coverage/428.
Yeah, this also can't happen today because we can't have concurrent `envd` processes which are attempting to write. We might see this when we have real 0dt upgrades, where we temporarily have two `envd` processes alive.

So it's very much future-looking code.
…naged collections

In support of 0dt upgrades: MaterializeInc#27406

This makes previously existing implicit differences in how we handle storage-managed collections explicit, so that we can be smarter about how we handle them during read-only mode / 0dt upgrades.

We also fix potential problems around concurrency, by making all moments where we try and append updates/retractions based on the current shard state handle upper mismatches.

This is a **pessimization** for memory usage per differential collection (see below). Differential collections mirror state that we keep in memory in some form. With my change, we keep an additional copy of that in the form of the `desired` collection. I do this so that we don't have to continually update/consolidate a desired collection by reading from persist: we only update our view of the world when there is an upper mismatch, which is rare today. Today, I don't think this is a problem: the in-memory state was negligible before, and doubling it is still negligible. Maybe a hot take!

I think the types of these collections are already locked in, just by how we currently use them and drive them around. This change makes those differences _explicit_:

- Append-only: Only accepts blind writes, writes that can be applied at any timestamp and don't depend on current collection contents.
- Pseudo append-only: We treat them largely as append-only collections but periodically (currently on bootstrap) retract old updates from them. This change fixes a problem where those retractions might not line up with what is there, because we didn't morally do a `compare_and_append`.
- Differential: At any given time `t`, collection contents mirror some (small cardinality) state. The cardinality of the collection stays constant if the thing that is mirrored doesn't change in cardinality. At steady state, updates always come in pairs of retractions/additions.
Force-pushed from 63f1bbe to 5f06e68.
Thanks everyone for the review and patience with this one! 🙏
In support of 0dt upgrades:
#27406
This makes previously existing implicit differences in how we handle storage-managed collections explicit, so that we can be smarter about how we handle them during read-only mode / 0dt upgrades.
We also fix potential problems around concurrency, by making all moments
where we try and append updates/retractions based on the current shard
state handle upper mismatches.
This is a pessimization for memory usage per differential collection (see below). Differential collections mirror state that we keep in memory in some form. With my change, we keep an additional copy of that in the form of the `desired` collection. I do this so that we don't have to continually update/consolidate a desired collection by reading from persist: we only update our view of the world when there is an upper mismatch, which is rare today. Today, I don't think this is a problem: the in-memory state was negligible before, and doubling it is still negligible. Maybe a hot take!

I think the types of these collections are already locked in, just by how we currently use them and drive them around. This change makes those differences explicit:

- Append-only: Only accepts blind writes, writes that can be applied at any timestamp and don't depend on current collection contents.
- Pseudo append-only: We treat them largely as append-only collections but periodically (currently on bootstrap) retract old updates from them. This change fixes a problem where those retractions might not line up with what is there, because we didn't morally do a `compare_and_append`.
- Differential: At any given time `t`, collection contents mirror some (small cardinality) state. The cardinality of the collection stays constant if the thing that is mirrored doesn't change in cardinality. At steady state, updates always come in pairs of retractions/additions.
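The steady-state behavior of a differential collection described in the last bullet can be sketched as follows: every change to the mirrored state is emitted as a retraction of the old row plus an addition of the new one, so the consolidated cardinality stays constant. Types and names here are illustrative, not the actual collection types:

```rust
use std::collections::BTreeMap;

// The collection mirrors a small in-memory map. Updating an entry
// yields a retraction/addition pair; inserting a fresh entry yields
// a single addition (cardinality grows by one, as does the state).
fn update_entry(
    state: &mut BTreeMap<String, String>,
    key: String,
    value: String,
) -> Vec<((String, String), i64)> {
    let mut updates = Vec::new();
    if let Some(old) = state.insert(key.clone(), value.clone()) {
        // Retract the row that mirrored the old value.
        updates.push(((key.clone(), old), -1));
    }
    updates.push(((key, value), 1));
    updates
}

fn main() {
    let mut state = BTreeMap::new();
    // Fresh entry: a single addition.
    let first = update_entry(&mut state, "o1".to_string(), "running".to_string());
    assert_eq!(first, vec![(("o1".to_string(), "running".to_string()), 1)]);
    // Changed entry: a retraction/addition pair.
    let second = update_entry(&mut state, "o1".to_string(), "stopped".to_string());
    assert_eq!(
        second,
        vec![
            (("o1".to_string(), "running".to_string()), -1),
            (("o1".to_string(), "stopped".to_string()), 1),
        ]
    );
}
```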
Checklist

- If this PR evolves a `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.