Remove orphaned orchestrator nodes on environmentd startup #16200
Conversation
Force-pushed from 511f518 to f37b75d
```diff
-handle.block_on(coord.bootstrap(builtin_migration_metadata, builtin_table_updates));
+let bootstrap = handle.block_on(async {
+    coord
+        .bootstrap(builtin_migration_metadata, builtin_table_updates)
```
It's possible that bootstrapping creates new objects with a user ID higher than `next_ids.0` due to the builtin schema migration.
You can probably fix this by getting the next IDs after bootstrapping.
I think it's fine to get those before the migration; I'm interested in objects that would be left over from a previous environmentd run, so there is no need to look at the IDs being allocated during bootstrap and normal operation.

The only thing I'm worried about is that someone else does a create replica and we see the increased replica ID in `next_replica_id`, but not the replica object itself. Then `remove_orphans` would gladly remove it from the orchestrator. AFAIK this cannot happen, because both the initial catalog load and replica creation happen inside a transaction.
If we see a new object created during migration, will this code in the storage controller panic when removing orphans?
```rust
if id >= next_id {
    // Found a storaged in kubernetes with a higher id than what we are aware of. This
    // must have been created by an environmentd with a higher epoch number.
    panic!(
        "Found storaged id ({}) in orchestrator >= than next_id ({})",
        id, next_id
    );
}
```
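The removal rule being debated here (delete unknown services with IDs below `next_id`; treat anything at or above `next_id` as evidence of a newer environmentd) can be sketched as follows. The `classify` function, its plain `u64` IDs, and the slice arguments are illustrative stand-ins, not the actual controller API:

```rust
/// For each service ID found in the orchestrator, decide whether it is an
/// orphan to delete, or evidence of a newer environmentd (a fatal error).
fn classify(
    orchestrator_ids: &[u64],
    catalog_ids: &[u64],
    next_id: u64,
) -> Result<Vec<u64>, String> {
    let mut orphans = Vec::new();
    for &id in orchestrator_ids {
        if id >= next_id {
            // Only an environmentd with a higher epoch could have allocated this.
            return Err(format!(
                "found service id ({id}) in orchestrator >= next_id ({next_id})"
            ));
        }
        if !catalog_ids.contains(&id) {
            // Allocated by a previous incarnation but absent from the catalog.
            orphans.push(id);
        }
    }
    Ok(orphans)
}

fn main() {
    // Services 1 and 3 exist in the catalog; 2 is an orphan from a crash.
    let orphans = classify(&[1, 2, 3], &[1, 3], 4).unwrap();
    assert_eq!(orphans, vec![2]);
    // An ID at or above next_id means another envd is running.
    assert!(classify(&[5], &[], 4).is_err());
    println!("orphans to remove: {orphans:?}");
}
```

This also shows why `next_id` must be fetched after any migration that allocates IDs: otherwise a legitimately created service lands in the fatal branch.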
> The only thing I'm worried about is that someone else does a create replica and we see the increased replica ID in `next_replica_id`, but not the replica object itself. Then `remove_orphans` would gladly remove it from the orchestrator. AFAIK this cannot happen, because both the initial catalog load and replica creation happen inside a transaction.
I think we're OK here. A user can't create a replica until we're done with bootstrapping and finished removing orphaned nodes. If a new Coordinator creates a new replica and increases the `next_replica_id`, then our read of `next_replica_id` will fail because we're no longer the leader.
> If we see a new object created during migration, will this code in the storage controller panic when removing orphans?

You're right, it won't be deleted, but we will panic: we need to fetch the `next_id` after migration. When I fetch them right after the bootstrap (using `peek_key_one` on the ID allocation collection), that call does an epoch check, right?
Yep. All stash reads and writes will do an epoch check.
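The fencing behavior described here can be sketched with a toy store that rejects callers carrying a stale epoch. The `FencedStore` type and its fields are illustrative only, not the actual stash API:

```rust
/// A store in which every read carries the caller's epoch and fails
/// once a higher epoch has been observed (i.e. a newer leader exists).
struct FencedStore {
    value: u64,
    greatest_epoch: u64,
}

impl FencedStore {
    /// Read the value, fencing out callers with a stale epoch.
    fn read(&mut self, epoch: u64) -> Result<u64, String> {
        if epoch < self.greatest_epoch {
            return Err(format!(
                "fenced out: epoch {epoch} < {}",
                self.greatest_epoch
            ));
        }
        self.greatest_epoch = epoch;
        Ok(self.value)
    }
}

fn main() {
    let mut store = FencedStore { value: 42, greatest_epoch: 5 };
    assert_eq!(store.read(6), Ok(42)); // current leader reads fine
    assert!(store.read(5).is_err());   // stale reader is fenced out
}
```

Under this model, an old environmentd that tries to read `next_replica_id` after a new leader has bumped the epoch gets an error instead of stale data, which is what makes the orphan-removal race benign.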
Force-pushed from 874e033 to baf1b17
Just took a quick skim, but looks 👌🏽. Thank you!
```rust
if id >= next_id {
    // Found a storaged in kubernetes with a higher id than what we are aware of. This
    // must have been created by an environmentd with a higher epoch number.
    panic!(
```
```diff
-    panic!(
+    halt!(
```
added!
Force-pushed from e5760a0 to dbf4d4f
Force-pushed from 2eae69a to fd7c0ac
src/controller/src/lib.rs
Outdated
```rust
    next_ids: (GlobalId, ReplicaId),
) -> Result<(), anyhow::Error> {
    self.storage.remove_orphans(next_ids.0).await?;
    self.compute.remove_orphans(next_ids.1).await?;
```
Do we ever attempt to remove orphaned indexes or entire compute clusters?
For orphaned indexes (or dataflows), the rehydration logic should take care of them. In case of a crash there are no orphans, as the computeds are stateless and wait to be rehydrated from environmentd.

Entire compute clusters don't have that problem either, because the only externally persisted state is the replica pod entries in kubernetes. So we should be good there.

I do have another potential source of inconsistencies in mind: the builtin table updates, which don't run transactionally with the catalog update (let me know if this is not true). So if envd crashes right after a drop replica has been communicated to the catalog, are we sure the builtin table updates are always correct?
If you know of any other state that envd modifies (outside the catalog) let me know!
Builtin tables should be fine. As part of bootstrapping, the Coord identifies what all the system tables should look like and what they currently look like, and will send the needed appends to get them into the correct state.
materialize/src/adapter/src/coord.rs
Lines 865 to 892 in 796cd14
```rust
// Add builtin table updates the clear the contents of all system tables
info!("coordinator init: resetting system tables");
let read_ts = self.get_local_read_ts();
for system_table in entries
    .iter()
    .filter(|entry| entry.is_table() && entry.id().is_system())
{
    info!(
        "coordinator init: resetting system table {} ({})",
        self.catalog.resolve_full_name(system_table.name(), None),
        system_table.id()
    );
    let current_contents = self
        .controller
        .storage
        .snapshot(system_table.id(), read_ts)
        .await
        .unwrap();
    info!("coordinator init: table size {}", current_contents.len());
    let retractions = current_contents
        .into_iter()
        .map(|(row, diff)| BuiltinTableUpdate {
            id: system_table.id(),
            row,
            diff: diff.neg(),
        });
    builtin_table_updates.extend(retractions);
}
```
Force-pushed from 745ecb0 to 339d97e
Just one little API naming nit and this LGTM! (I reviewed structurally mostly and didn't get into the weeds, since @jkosh44 approved.)
src/adapter/src/catalog/storage.rs
Outdated
```diff
@@ -838,6 +838,27 @@ impl<S: Append> Connection<S> {
         Ok(GlobalId::User(id))
     }
 
+    /// Get the next user and replica id without allocating them.
+    pub async fn get_next_ids(&mut self) -> Result<(GlobalId, ReplicaId), Error> {
```
Bundling the fetching of these two IDs into a single API method corrupts the public API of the catalog with the specific needs of the `Controller`. Could you split this into two methods: `get_next_replica_id` and `get_next_user_global_id`?
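The "get without allocating" contract these methods share can be sketched as a peek on an ID allocator. The `IdAllocator` type below is an illustrative stand-in, not the actual catalog storage:

```rust
/// Hands out monotonically increasing IDs, and lets callers peek at the
/// next ID without consuming it.
struct IdAllocator {
    next: u64,
}

impl IdAllocator {
    /// Return the next ID without allocating it (a read-only peek).
    fn peek(&self) -> u64 {
        self.next
    }

    /// Allocate and return the next ID, advancing the counter.
    fn allocate(&mut self) -> u64 {
        let id = self.next;
        self.next += 1;
        id
    }
}

fn main() {
    let mut ids = IdAllocator { next: 7 };
    assert_eq!(ids.peek(), 7);     // peeking does not advance the counter
    assert_eq!(ids.allocate(), 7); // allocating returns the same id...
    assert_eq!(ids.peek(), 8);     // ...and advances for the next caller
}
```

The peek is all orphan removal needs: any service with an ID at or above the peeked value must belong to a newer environmentd.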
src/adapter/src/catalog.rs
Outdated
```diff
@@ -3105,6 +3105,11 @@ impl<S: Append> Catalog<S> {
         self.storage().await.get_persisted_timestamp(timeline).await
     }
 
+    /// Get the next user and replica id without allocating them.
+    pub async fn get_next_ids(&mut self) -> Result<(GlobalId, ReplicaId), Error> {
```
(See below.)
src/controller/src/lib.rs
Outdated
```rust
/// Remove orphaned services from the orchestrator.
pub async fn remove_orphans(
    &mut self,
    next_ids: (GlobalId, ReplicaId),
```
```diff
-    next_ids: (GlobalId, ReplicaId),
+    next_replica_id: ReplicaId,
+    next_storage_host_id: GlobalId,
```
list_services wrongly applied a filter meant for pods to StatefulSets, resulting in an always-empty list. This commit reverts the change introduced by 58135e2 so that no filter is applied and all services are returned.
Make environmentd crash on drop replica after the catalog transaction but before the orchestrator call. Then ensure that the orphan will get cleaned up on environmentd restart.
Force-pushed from 339d97e to 61e3f42
Addressed @Bensch's naming nit.
```diff
@@ -768,7 +769,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
 
     /// Lists the identifiers of all known services.
     async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
-        let stateful_sets = self.stateful_set_api.list(&self.list_params()).await?;
+        let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
```
Maybe I'm missing some context, but do we not want to only list the ones with the correct `environmentd.materialize.cloud/namespace`?
After discussion in chat: this label is not set on the StatefulSets, so the call was previously returning an empty list.
I don't have the context for all of this, but the parts I understand look good.
Motivation

When environmentd crashes during an operation that removes a computed (`DROP CLUSTER REPLICA ...`) or a storaged (`DROP SOURCE ...`), it is possible that the object is removed from the catalog but the corresponding service is not removed from the orchestrator. This PR cleans up those orphaned nodes on environmentd restart.
Implementation:
Unlike #16114, this PR detects orphaned nodes on startup using solely the replica/storaged ID. These IDs are monotonically increasing, so an environmentd can create replicas only with higher IDs, and it is safe to remove all orchestrator services that are not known at envd boot time and have an ID lower than the biggest ID we are currently aware of. If we encounter an orchestrator node with a higher ID, we know that another envd with a higher epoch is running, and we terminate ourselves.
Tips for reviewer
Consider previous discussion in #16114.
The first commit cleans up compute replicas, the second cleans up storageds on startup, and the last adds testing.
Checklist

- `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way) and therefore is tagged with a `T-proto` label.