Remove orphaned orchestrator nodes on environmentd startup #16114

Closed
wants to merge 3 commits

Conversation

Contributor

@lluki lluki commented Nov 16, 2022

When environmentd crashes during an operation that removes a computed (DROP CLUSTER REPLICA ...) or a storaged (DROP SOURCE ...), it is possible that the object is removed from the catalog but the corresponding service is not removed from the orchestrator. This PR cleans up those orphaned nodes on environmentd restart.

Motivation

  • This PR fixes a recognized bug: Fixes MaterializeInc/database-issues#4493, Fixes MaterializeInc/database-issues#3391

Implementation

Environmentd has an epoch; only the envd with the highest epoch number is able to perform catalog transactions (for example, creating or dropping replicas). Environmentds with an out-of-date epoch will crash when they notice.

Each Kubernetes StatefulSet gets an epoch label set to the epoch of the calling environmentd. There are no atomic compare-and-swap operations on StatefulSets (as far as I know), so the implementation relies solely on ensure_service overwriting any previous state.

When environmentd starts, it loads the state from the catalog, installs the replicas into the compute controller, and then calls remove_orphans. remove_orphans fetches all running replicas from Kubernetes and deletes every replica that has an older epoch number and is not in use by the calling envd (both conditions must hold for a replica to be removed).
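
For concreteness, here is a minimal sketch of that filter, assuming the service listing carries the epoch label; the names (OrchestratorService, find_orphans) are illustrative only, not the actual controller API:

use std::collections::HashSet;

/// A service as reported by the orchestrator: its name plus the epoch label
/// of the envd that last ensured it.
struct OrchestratorService {
    name: String,
    epoch: i64,
}

/// Return the names of all services that carry an older epoch than the calling
/// envd AND are not among the replicas loaded from the catalog. Both conditions
/// must hold for a service to be considered an orphan.
fn find_orphans(
    running: Vec<OrchestratorService>,
    in_use: &HashSet<String>,
    my_epoch: i64,
) -> Vec<String> {
    running
        .into_iter()
        .filter(|svc| svc.epoch < my_epoch && !in_use.contains(&svc.name))
        .map(|svc| svc.name)
        .collect()
}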

Correctness

Because ensure_service will always overwrite existing StatefulSets, it is possible for the epoch number stored on a StatefulSet to go backwards. For example, if envd@3 (that is, environmentd running with epoch 3) calls ensure_replica(r1) and envd@2 then calls ensure_replica(r1), the epoch number in Kubernetes is set to 2, despite a newer envd having called ensure_replica in the past.

This can also happen in practice if we (accidentally) start multiple environmentds at the same time:

───────┬───────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       │
envd@3 │                   load                 ensure_replica           remove_orphans       starts serving
       │                   catalog              r1 -> @3                     [3]              requests
───────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       │
envd@2 │           load               ensure_replica                                     remove_orphans     starts serving
       │           catalog            r1 -> @2                                                [2]           requests
───────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       │
envd@1 │   load                                          ensure_replica         remove_orphans        starts serving
       │   catalog                                       r1 -> @1                   [1]               requests
───────┴───────────────────────────────────────────────────────────────────────────────────────────────────────────────────

Thus we have to consider some corner cases that are not obviously correct. For example, in the diagram above, envd@2 sees a replica with epoch 1 during remove_orphans, so relying on an older epoch alone is not enough.

We have to ensure that all replicas the latest envd (envd@3 in the example above) is aware of are present in Kubernetes. This could be violated if any envd removed a replica that envd@3 wants to keep using. An out-of-date envd cannot execute DROP CLUSTER REPLICA, as that causes a catalog transaction, and the envd would notice it is out of date and terminate. That leaves the remove_orphans method. remove_orphans of the latest envd is trivially correct, as it will never remove a replica the calling envd is still using.

But what about other envds calling remove_orphans? It is possible that envd@1 calls ensure_replica after envd@2 has done so; envd@2 would then see r1@1 (replica 1 with epoch set to 1), which it could possibly remove. However, for this interleaving to happen, envd@1 and envd@2 must run from the same catalog state. For the catalog to be different, envd@1 would have to start serving requests and receive a drop/create before envd@2 starts reading the catalog, but then the ensure_replica / remove_orphans interleaving is ruled out. Thus, either there is no interleaving, or both envds operate on the same set of replicas. In neither case is a replica wrongly removed.

Tips for reviewer

The first commit adds the epoch label to the StatefulSet, the second commit uses it to remove orphans, and the third adds tests for this by deliberately crashing environmentd in the right spots.

Checklist

  • Add knowledge of the current envd epoch to the namespaced orchestrator, which in turn adds the epoch as a label to the Kubernetes pods it creates.
  • Make environmentd crash on drop replica after the catalog transaction but before the orchestrator call, then ensure that the orphan gets cleaned up on environmentd restart.
Member

@antiguru antiguru left a comment


This seems fine! I added some comments on things I found.

let supervisors = self.supervisors.lock().expect("lock poisoned");
Ok(supervisors.keys().cloned().collect())
// The process orchestrator does not have a concept of epochs and thus always returns epoch 0
Member


It seems it returns epoch 1?

async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
let stateful_sets = self.stateful_set_api.list(&self.list_params()).await?;
async fn list_services(&self) -> Result<Vec<(String, NonZeroI64)>, anyhow::Error> {
let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
Member


I'd like to flag this change because I don't understand the implications of changing the list parameters from self.list_params() to Default::default(). Can you add an explanation?

Contributor Author


It reverts a mistake introduced in 58135e2. I made a separate commit that should explain this in the follow-up PR (ready soon).

Comment on lines +1144 to +1152
let bootstrap = handle
    .block_on(coord.bootstrap(builtin_migration_metadata, builtin_table_updates))
    .and_then(|_| {
        // Controller is fully populated, synchronize with
        // orchestrator before reporting ready.
        handle
            .block_on(coord.controller.remove_orphans())
            .map_err(AdapterError::Orchestrator)
    });
Member


What about passing an async block to block_on instead of calling it twice? Something like the following:

let bootstrap = handle.block_on(async {
    coord.bootstrap(builtin_migration_metadata, builtin_table_updates).await?;
    coord.controller.remove_orphans().await?;
    Ok(())
});

// This method must be called only after we have loaded the catalog state. Assert
// that this has happened by checking that we have at least one cluster (there should
// always be at least the system cluster)
assert!(self.instances.len() > 0);
Member


This seems to be potentially dangerous. If we delete the system cluster, we arrive in a situation where self.instances might be empty, and this would prevent us from starting from this point forward, which also means that we can't fix the problem from within the environment. What exactly is the assert supposed to prevent?

Contributor Author


It's to ensure we populate the controller properly before calling remove_orphans. If we called it with an unpopulated controller, it would be quite dramatic, as remove_orphans would then go off and delete all replicas. It's removed in the new PR, which works quite differently now.

#[cfg(feature = "tokio-console")]
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
#[cfg(feature = "tokio-console")]
use std::time::Duration;
use std::{collections::HashSet, ffi::OsString};
Member


Hmm, not a fan of these imports, but I'm not blocking on it :)

Contributor Author


rust-analyzer did that :-O gone in the new PR

try:
    mz.environmentd.sql(sql)
except InterfaceError:
    pass
Member


Nit: Print error with a non-error level to make debugging easier.

Contributor Author


Generally agree, but as it is used now, the function will raise an InterfaceError every time. Maybe I can rename the function to crashing_sql or so and assert that an error happens.

after = get_replica()

# An environmentd crash must increment the epoch
assert epoch(before[1]) + 1 == epoch(after[1])
Member


Check for less than rather than equality. The epoch is not guaranteed to be incremented by 1.

Contributor Author


gone now

@benesch
Member

benesch commented Nov 17, 2022

Marked this as also fixing MaterializeInc/database-issues#3391.

Member

@benesch benesch left a comment


I'm not sure the epoch is making this problem easier to solve. Here's what I'd propose.

  • At startup, the storage controller is informed of what the next storage host ID will be.
  • The storage controller finds orphaned storage hosts as follows:
    • It lists all services in the namespace.
    • If it discovers any services whose ID is greater than or equal to the next storage host ID, it halt!s, as this is a clear sign that another envd has taken over.
    • It deletes any services that are not reflected in the catalog state.

This seems simpler to me than trying to reason about epochs. It also seems correct to me, but please double check my math. Note that it relies on a property that we intentionally uphold: that IDs for cluster replicas and storage hosts are monotonically increasing.
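
As a rough illustration of this scheme (not the actual implementation), the cleanup pass could look something like the following; next_id, catalog_ids, and the numeric service IDs are assumptions made for the sketch:

use std::collections::BTreeSet;

/// Decide what to do with the services found in the orchestrator namespace,
/// given the next ID the catalog will allocate and the IDs currently in the
/// catalog. Relies on replica/storage host IDs being monotonically increasing.
fn plan_orphan_cleanup(
    next_id: u64,
    catalog_ids: &BTreeSet<u64>,
    orchestrator_ids: impl IntoIterator<Item = u64>,
) -> Result<Vec<u64>, String> {
    let mut to_delete = Vec::new();
    for id in orchestrator_ids {
        if id >= next_id {
            // An ID we have not yet allocated can only come from a newer envd:
            // halt instead of touching its services.
            return Err(format!("service {id} was created by a newer envd; halting"));
        }
        if !catalog_ids.contains(&id) {
            // Allocated in the past but no longer in the catalog: an orphan.
            to_delete.push(id);
        }
    }
    Ok(to_delete)
}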

I think the epochs are useful to solve another problem, which is described in https://github.com/MaterializeInc/materialize/issues/13739#issuecomment-1189812455. We should not consider #13739 fixed until we tackle that problem. Basically, a stale envd should never overwrite the changes of a newer envd, and for that I think we want to use the epochs. Every call to ensure_service in the k8s orchestrator actually needs to read the current object, verify that, if the object exists, its epoch is lower than the current process's epoch, and then perform a conditional write to k8s that only updates the object if its version is unchanged from the version whose epoch was just verified. This is complicated enough, though, that I'd suggest doing it in a future PR.
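
A sketch of what such a guarded ensure_service could look like with kube-rs, using Kubernetes' optimistic concurrency via resourceVersion; the label key materialize.cloud/epoch and the surrounding error handling are assumptions for illustration, not the real orchestrator code:

use k8s_openapi::api::apps::v1::StatefulSet;
use kube::api::{Api, PostParams};

async fn ensure_with_epoch_check(
    api: &Api<StatefulSet>,
    name: &str,
    mut desired: StatefulSet,
    my_epoch: i64,
) -> anyhow::Result<()> {
    match api.get(name).await {
        Ok(existing) => {
            // Read the epoch label written by whichever envd last ensured
            // this StatefulSet (0 if the label is missing or unparsable).
            let their_epoch: i64 = existing
                .metadata
                .labels
                .as_ref()
                .and_then(|labels| labels.get("materialize.cloud/epoch"))
                .and_then(|epoch| epoch.parse().ok())
                .unwrap_or(0);
            if their_epoch > my_epoch {
                anyhow::bail!("newer envd (epoch {their_epoch}) owns {name}; halting");
            }
            // Carry over the resourceVersion so the API server rejects this
            // write if the object changed since we read it.
            desired.metadata.resource_version = existing.metadata.resource_version.clone();
            api.replace(name, &PostParams::default(), &desired).await?;
        }
        Err(kube::Error::Api(err)) if err.code == 404 => {
            // Object does not exist yet: create it unconditionally.
            api.create(&PostParams::default(), &desired).await?;
        }
        Err(err) => return Err(err.into()),
    }
    Ok(())
}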


/// Remove all services in this namespace that are not contained in keep and
/// are of an older epoch.
async fn remove_orphans(&self, keep: HashSet<String>) -> Result<(), anyhow::Error>;
Member


I don't think this method belongs on this trait. I intentionally tried to keep this trait small but powerful. Unless I'm missing something, remove_orphans is something that should be built on top of the trait, rather than something that should be provided by the underlying implementations.

@benesch
Member

benesch commented Nov 17, 2022

I've retitled MaterializeInc/database-issues#3930 to be about the more general problem, and removed it from the list of things this issue fixes, just so we don't accidentally close it until it is truly resolved!

@@ -1799,6 +1799,8 @@ impl<S: Append + 'static> Coordinator<S> {
self.catalog_transact(Some(session), ops, |_| Ok(()))
Contributor


Would it make sense to have another failpoint before self.catalog_transact and have the test run a third time with it?

Contributor Author


I think if the crash happens before, the operation is not run at all. There seems to be another related issue (see Joe's comment #16150 (comment)) that this might surface, but I'd favor adding the failpoint together with a test that fixes an issue.

@@ -1799,6 +1799,8 @@ impl<S: Append + 'static> Coordinator<S> {
self.catalog_transact(Some(session), ops, |_| Ok(()))
    .await?;

fail::fail_point!("after_catalog_drop_replica");

for (compute_id, replica_id) in replicas_to_drop {
    self.drop_replica(compute_id, replica_id).await.unwrap();
}
Contributor


Would it make sense to have a 3rd failpoint after the self.drop_replica loop and then a 4th invocation of the test?

Contributor Author


I'll add a test run for this. I think it will not expose new scenarios, but better to be safe (unless we are constrained on test duration).

Contributor

@philip-stoev philip-stoev left a comment


Thank you very much for the tests! They were already far beyond what I could have come up with.

Just please consider my suggestion to pepper the code with two more failpoints and run the tests two more times -- it seems like an easy win given that you have created all the required machinery already.

@lluki
Contributor Author

lluki commented Nov 17, 2022

Thanks for all the comments. I think @benesch's proposal makes sense. I want to think about it a bit more, and if I don't find a problem I'll turn this PR into his proposal and keep the epoch changes aside for another PR that tackles MaterializeInc/database-issues#3930.
