Remove orphaned orchestrator nodes on environmentd startup #16200

Merged: 4 commits, Nov 30, 2022
1 change: 1 addition & 0 deletions Cargo.lock


33 changes: 33 additions & 0 deletions misc/python/materialize/cloudtest/application.py
@@ -9,8 +9,12 @@

import os
import subprocess
import time
from datetime import datetime, timedelta
from typing import List, Optional

from pg8000.exceptions import InterfaceError

from materialize import ROOT, mzbuild
from materialize.cloudtest.k8s import K8sResource
from materialize.cloudtest.k8s.debezium import DEBEZIUM_RESOURCES
@@ -146,3 +150,32 @@ def __init__(
def create(self) -> None:
super().create()
wait(condition="condition=Ready", resource="pod/compute-cluster-u1-replica-1-0")

def wait_for_sql(self) -> None:
"""Wait until environmentd pod is ready and can accept SQL connections"""
wait(condition="condition=Ready", resource="pod/environmentd-0")

start = datetime.now()
while datetime.now() - start < timedelta(seconds=300):
try:
self.environmentd.sql("SELECT 1")
break
except InterfaceError as e:
# Since we crash environmentd, we expect some errors that we swallow.
print(f"SQL interface not ready, {e} while SELECT 1. Waiting...")
time.sleep(2)

def set_environmentd_failpoints(self, failpoints: str) -> None:
"""Set the FAILPOINTS environmentd variable in the stateful set. This
will most likely restart environmentd"""
stateful_set = [
resource
for resource in self.resources
if type(resource) == EnvironmentdStatefulSet
]
assert len(stateful_set) == 1
stateful_set = stateful_set[0]

stateful_set.env["FAILPOINTS"] = failpoints
stateful_set.replace()
self.wait_for_sql()
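
As context for how these helpers are meant to be used together, here is a minimal sketch of a cloudtest scenario that crashes environmentd via one of the failpoints added later in this PR (after_catalog_drop_replica in sequencer.rs) and then recovers. The application object, the replica name, and the exact SQL are assumptions for illustration; the "name=action" string follows the fail crate's FAILPOINTS syntax.

    # Hypothetical sketch, not part of this PR: crash environmentd between the
    # catalog change and the orchestrator cleanup, then let it come back up.
    from pg8000.exceptions import InterfaceError

    def crash_during_drop_replica(mz) -> None:  # mz: a started cloudtest application
        # Arm the failpoint added in sequencer.rs; "panic" is a standard fail-crate
        # action, so environmentd aborts right after the catalog transaction that
        # drops the replica but before the orchestrator drops its service.
        mz.set_environmentd_failpoints("after_catalog_drop_replica=panic")

        try:
            # Cluster/replica names are illustrative only.
            mz.environmentd.sql("DROP CLUSTER REPLICA default.r1")
        except InterfaceError:
            pass  # expected: the connection drops when environmentd panics

        # Clearing FAILPOINTS restarts environmentd once more; because
        # set_environmentd_failpoints calls wait_for_sql, this returns only after
        # SQL connections are accepted again.
        mz.set_environmentd_failpoints("")

A real test would then assert that the orphaned replica pod left behind by the crash is removed by the new startup cleanup.
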
4 changes: 4 additions & 0 deletions misc/python/materialize/cloudtest/k8s/__init__.py
@@ -216,6 +216,10 @@ def replace(self) -> None:
apps_v1_api.replace_namespaced_stateful_set(
name=name, body=self.stateful_set, namespace=self.namespace()
)
# Despite the name "status" this kubectl command will actually wait
# until the rollout is complete.
# See https://github.com/kubernetes/kubernetes/issues/79606#issuecomment-779779928
self.kubectl("rollout", "status", f"statefulset/{name}")


class K8sConfigMap(K8sResource):
6 changes: 5 additions & 1 deletion misc/python/materialize/cloudtest/k8s/environmentd.py
@@ -8,7 +8,7 @@
# by the Apache License, Version 2.0.

import urllib.parse
from typing import Optional
from typing import Dict, Optional

from kubernetes.client import (
V1Container,
@@ -60,6 +60,7 @@ def __init__(
self.tag = tag
self.release_mode = release_mode
self.log_filter = log_filter
self.env: Dict[str, str] = {}
super().__init__()

def generate_stateful_set(self) -> V1StatefulSet:
@@ -78,6 +79,9 @@ def generate_stateful_set(self) -> V1StatefulSet:
V1EnvVar(name="MZ_ANNOUNCE_EGRESS_IP", value="1.2.3.4,88.77.66.55"),
]

for (k, v) in self.env.items():
env.append(V1EnvVar(name=k, value=v))

ports = [V1ContainerPort(container_port=5432, name="sql")]

volume_mounts = [
1 change: 1 addition & 0 deletions misc/python/stubs/pg8000/exceptions.pyi
@@ -8,3 +8,4 @@
# by the Apache License, Version 2.0.

class DatabaseError(Exception): ...
class InterfaceError(Exception): ...
10 changes: 10 additions & 0 deletions src/adapter/src/catalog.rs
@@ -3105,6 +3105,16 @@ impl<S: Append> Catalog<S> {
self.storage().await.get_persisted_timestamp(timeline).await
}

/// Get the next user id without allocating it.
pub async fn get_next_user_global_id(&mut self) -> Result<GlobalId, Error> {
self.storage().await.get_next_user_global_id().await
}

/// Get the next replica id without allocating it.
pub async fn get_next_replica_id(&mut self) -> Result<ReplicaId, Error> {
self.storage().await.get_next_replica_id().await
}

/// Persist new global timestamp for a timeline to disk.
pub async fn persist_timestamp(
&mut self,
23 changes: 23 additions & 0 deletions src/adapter/src/catalog/storage.rs
@@ -838,6 +838,29 @@ impl<S: Append> Connection<S> {
Ok(GlobalId::User(id))
}

/// Get the next user id without allocating it.
pub async fn get_next_user_global_id(&mut self) -> Result<GlobalId, Error> {
self.get_next_id("user").await.map(GlobalId::User)
}

/// Get the next replica id without allocating it.
pub async fn get_next_replica_id(&mut self) -> Result<ReplicaId, Error> {
self.get_next_id(REPLICA_ID_ALLOC_KEY).await
}

async fn get_next_id(&mut self, id_type: &str) -> Result<u64, Error> {
COLLECTION_ID_ALLOC
.peek_key_one(
&mut self.stash,
&IdAllocKey {
name: id_type.to_string(),
},
)
.await
.map(|x| x.expect("must exist").next_id)
.map_err(Into::into)
}

#[tracing::instrument(level = "debug", skip(self))]
async fn allocate_id(&mut self, id_type: &str, amount: u64) -> Result<Vec<u64>, Error> {
if amount == 0 {
16 changes: 14 additions & 2 deletions src/adapter/src/coord.rs
@@ -1166,8 +1166,20 @@ pub async fn serve<S: Append + 'static>(
segment_client,
metrics: Metrics::register_with(&inner_metrics_registry),
};
let bootstrap =
handle.block_on(coord.bootstrap(builtin_migration_metadata, builtin_table_updates));
let bootstrap = handle.block_on(async {
coord
.bootstrap(builtin_migration_metadata, builtin_table_updates)

[Review thread on the bootstrap call above]

Contributor: It's possible that bootstrapping creates new objects with a user ID higher than next_ids.0 due to the builtin schema migration.

Contributor: You can probably fix this by getting the next IDs after bootstrapping.

Contributor Author: I think it's fine to get those before the migration. I'm interested in objects that would be left over from a previous environmentd run, so there is no need to look at the IDs being allocated during bootstrap and normal operation.

The only thing I'm worried about is that someone else does a create replica and we see the increased replica ID in next_replica_id, but not the replica object itself. Then remove_orphans would gladly remove it from the orchestrator. As far as I know this cannot happen, because both the initial catalog load and replica creation happen inside a transaction.

Contributor: If we see a new object created during migration, will this code in the storage controller panic when removing orphans?

            if id >= next_id {
                // Found a storaged in kubernetes with a higher id than what we are aware of. This
                // must have been created by an environmentd with a higher epoch number.
                panic!(
                    "Found storaged id ({}) in orchestrator >= than next_id ({})",
                    id, next_id
                );
            }

Contributor (@jkosh44, Nov 22, 2022):
> The only thing I'm worried about is that someone else does a create replica and we see the increased replica ID in next_replica_id, but not the replica object itself. Then remove_orphans would gladly remove it from the orchestrator.

I think we're OK here. A user can't create a replica until we're done with bootstrapping and finished removing orphaned nodes. If a new Coordinator creates a new replica and increases the next_replica_id, then our read of next_replica_id will fail because we're no longer the leader.

Contributor Author:
> If we see a new object created during migration, will this code in the storage controller panic when removing orphans?

You're right, it won't be deleted, but we will panic: we need to fetch the next_id after migration. When I fetch them right after the bootstrap (using peek_key_one on the ID allocation collection), does that call do an epoch check?

Contributor: Yep. All stash reads and writes will do an epoch check.

.await?;
coord
.controller
.remove_orphans(
coord.catalog.get_next_replica_id().await?,
coord.catalog.get_next_user_global_id().await?,
)
.await
.map_err(AdapterError::Orchestrator)?;
Ok(())
});
let ok = bootstrap.is_ok();
bootstrap_tx.send(bootstrap).unwrap();
if ok {
4 changes: 4 additions & 0 deletions src/adapter/src/coord/sequencer.rs
@@ -1831,10 +1831,14 @@ impl<S: Append + 'static> Coordinator<S> {

self.catalog_transact(Some(session), ops).await?;

fail::fail_point!("after_catalog_drop_replica");

for (compute_id, replica_id) in replicas_to_drop {
self.drop_replica(compute_id, replica_id).await.unwrap();
}

fail::fail_point!("after_sequencer_drop_replica");

Ok(ExecuteResponse::DroppedComputeReplica)
}

3 changes: 3 additions & 0 deletions src/adapter/src/error.rs
@@ -191,6 +191,8 @@ pub enum AdapterError {
Storage(mz_storage_client::controller::StorageError),
/// An error occurred in the compute layer
Compute(anyhow::Error),
/// An error in the orchestrator layer
Orchestrator(anyhow::Error),
}

impl AdapterError {
@@ -478,6 +480,7 @@ impl fmt::Display for AdapterError {
}
AdapterError::Storage(e) => e.fmt(f),
AdapterError::Compute(e) => e.fmt(f),
AdapterError::Orchestrator(e) => e.fmt(f),
}
}
}
40 changes: 37 additions & 3 deletions src/compute-client/src/controller.rs
@@ -29,7 +29,7 @@
//! from compacting beyond the allowed compaction of each of its outputs, ensuring that we can
//! recover each dataflow to its current state in case of failure or other reconfiguration.

use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::fmt;
use std::num::{NonZeroI64, NonZeroUsize};
use std::str::FromStr;
@@ -41,7 +41,7 @@ use chrono::{DateTime, Utc};
use differential_dataflow::lattice::Lattice;
use futures::stream::BoxStream;
use futures::{future, FutureExt, StreamExt};
use mz_ore::soft_assert;
use mz_ore::{halt, soft_assert};
use serde::{Deserialize, Serialize};
use timely::progress::frontier::{AntichainRef, MutableAntichain};
use timely::progress::{Antichain, Timestamp};
@@ -61,7 +61,8 @@ use crate::service::{ComputeClient, ComputeGrpcClient};

use self::error::{
CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
InstanceExists, InstanceMissing, PeekError, ReplicaCreationError, ReplicaDropError,
InstanceExists, InstanceMissing, PeekError, RemoveOrphansError, ReplicaCreationError,
ReplicaDropError,
};
use self::instance::{ActiveInstance, Instance};
use self::orchestrator::ComputeOrchestrator;
@@ -336,6 +337,39 @@ impl<T> ComputeController<T> {
storage,
}
}

/// Remove orphaned compute replicas from the orchestrator. These are replicas that the
/// orchestrator is aware of, but not the controller.
pub async fn remove_orphans(
&self,
next_replica_id: ReplicaId,
) -> Result<(), RemoveOrphansError> {
let keep: HashSet<_> = self
.instances
.iter()
.flat_map(|(_, inst)| inst.replica_ids())
.collect();

let current: HashSet<_> = self.orchestrator.list_replicas().await?.collect();

for (inst_id, replica_id) in current.into_iter() {
if replica_id >= next_replica_id {
// Found a replica in kubernetes with a higher replica id than what we are aware
// of. This must have been created by an environmentd with higher epoch number.
halt!(
"Found replica id ({}) in orchestrator >= next id ({})",
replica_id,
next_replica_id
);
}

if !keep.contains(&replica_id) {
self.orchestrator.drop_replica(inst_id, replica_id).await?;
}
}

Ok(())
}
}

impl<T> ComputeController<T>
13 changes: 13 additions & 0 deletions src/compute-client/src/controller/error.rs
@@ -189,3 +189,16 @@ impl From<CollectionMissing> for CollectionUpdateError {
Self::CollectionMissing(error.0)
}
}

/// Errors arising during orphan removal.
#[derive(Error, Debug)]
pub enum RemoveOrphansError {
#[error("orchestrator error: {0}")]
OrchestratorError(anyhow::Error),
}

impl From<anyhow::Error> for RemoveOrphansError {
fn from(error: anyhow::Error) -> Self {
Self::OrchestratorError(error)
}
}
5 changes: 5 additions & 0 deletions src/compute-client/src/controller/instance.rs
@@ -143,6 +143,11 @@ impl<T> Instance<T> {
// Do we have responses ready to deliver?
|| !self.ready_responses.is_empty()
}

/// Returns the ids of all replicas of this instance
pub fn replica_ids(&self) -> impl Iterator<Item = &ReplicaId> {
self.replicas.keys()
}
}

impl<T> Instance<T>
9 changes: 9 additions & 0 deletions src/compute-client/src/controller/orchestrator.rs
@@ -184,6 +184,15 @@ impl ComputeOrchestrator {
self.inner.drop_service(&service_name).await
}

pub(super) async fn list_replicas(
&self,
) -> Result<impl Iterator<Item = (ComputeInstanceId, ReplicaId)>, anyhow::Error> {
self.inner.list_services().await.map(|s| {
s.into_iter()
.filter_map(|x| parse_replica_service_name(&x).ok())
})
}

pub(super) fn watch_services(&self) -> BoxStream<'static, ComputeInstanceEvent> {
fn translate_event(event: ServiceEvent) -> Result<ComputeInstanceEvent, anyhow::Error> {
let (instance_id, replica_id) = parse_replica_service_name(&event.service_id)?;
11 changes: 11 additions & 0 deletions src/controller/src/lib.rs
@@ -142,6 +142,17 @@ impl<T> Controller<T> {
pub fn active_compute(&mut self) -> ActiveComputeController<T> {
self.compute.activate(&mut *self.storage)
}

/// Remove orphaned services from the orchestrator.
pub async fn remove_orphans(
&mut self,
next_replica_id: ReplicaId,
next_storage_host_id: GlobalId,
) -> Result<(), anyhow::Error> {
self.compute.remove_orphans(next_replica_id).await?;
self.storage.remove_orphans(next_storage_host_id).await?;
Ok(())
}
}

impl<T> Controller<T>
2 changes: 1 addition & 1 deletion src/environmentd/src/bin/environmentd/main.rs
@@ -688,7 +688,7 @@ max log level: {max_log_level}",
value.to_string_lossy().into_owned(),
)
})
.filter(|(name, _value)| name.starts_with("MZ_"))
.filter(|(name, _value)| name.starts_with("MZ_") || name == "FAILPOINTS")
.map(|(name, value)| format!("{}={}", escape(&name), escape(&value)))
.chain(env::args().into_iter().map(|arg| escape(&arg).into_owned()))
.join(" ")
1 change: 1 addition & 0 deletions src/orchestrator-kubernetes/Cargo.toml
@@ -11,6 +11,7 @@ anyhow = "1.0.66"
async-trait = "0.1.58"
chrono = { version = "0.4.23", default-features = false }
clap = { version = "3.2.20", features = ["derive"] }
fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.25"
maplit = "1.0.2"
mz-cloud-resources = { path = "../cloud-resources" }
7 changes: 4 additions & 3 deletions src/orchestrator-kubernetes/src/lib.rs
@@ -207,7 +207,7 @@ impl k8s_openapi::Metadata for PodMetrics {
impl NamespacedKubernetesOrchestrator {
/// Return a `ListParams` instance that limits results to the namespace
/// assigned to this orchestrator.
fn list_params(&self) -> ListParams {
fn list_pod_params(&self) -> ListParams {
let ns_selector = format!(
"environmentd.materialize.cloud/namespace={}",
self.namespace
@@ -740,6 +740,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {

/// Drops the identified service, if it exists.
async fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
self.service_scales
.lock()
.expect("poisoned lock")
@@ -768,7 +769,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {

/// Lists the identifiers of all known services.
async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
let stateful_sets = self.stateful_set_api.list(&self.list_params()).await?;
let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;

[Review thread on the line above]

Contributor: Maybe I'm missing some context, but do we not want to list only the statefulsets with the correct environmentd.materialize.cloud/namespace label?

Contributor: After discussion in chat: this label is not set on the statefulsets, so the previous code was returning an empty list. (A quick way to check the labels is sketched after this file's diff.)


let name_prefix = format!("{}-", self.namespace);
Ok(stateful_sets
.into_iter()
@@ -818,7 +819,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
})
}

let stream = watcher(self.pod_api.clone(), self.list_params())
let stream = watcher(self.pod_api.clone(), self.list_pod_params())
.touched_objects()
.filter_map(|object| async move {
match object {
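
On the review note above about the missing environmentd.materialize.cloud/namespace label: a quick, assumed way to confirm which labels the statefulsets actually carry is to list them together with their labels. kubectl is assumed to be on PATH and pointed at the cloudtest cluster; this check is illustrative and not part of the PR.

    # Hypothetical check: show every statefulset with its labels so the absence of
    # environmentd.materialize.cloud/namespace is easy to spot.
    import subprocess

    subprocess.run(
        ["kubectl", "get", "statefulsets", "--show-labels"],
        check=True,
    )
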
4 changes: 3 additions & 1 deletion src/pgwire/src/message.rs
@@ -403,7 +403,9 @@ impl ErrorResponse {
// code, so it's probably the best choice.
AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::MultiTableWriteTransaction => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::Storage(_) | AdapterError::Compute(_) => SqlState::INTERNAL_ERROR,
AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
SqlState::INTERNAL_ERROR
}
};
ErrorResponse {
severity,