Merge pull request #16200 from lluki/cleanup-on-startup-id
Remove orphaned orchestrator nodes on environmentd startup
lluki committed Nov 30, 2022
2 parents 6177894 + 61e3f42 commit f709a94
Showing 24 changed files with 324 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

33 changes: 33 additions & 0 deletions misc/python/materialize/cloudtest/application.py
@@ -9,8 +9,12 @@

import os
import subprocess
import time
from datetime import datetime, timedelta
from typing import List, Optional

from pg8000.exceptions import InterfaceError

from materialize import ROOT, mzbuild
from materialize.cloudtest.k8s import K8sResource
from materialize.cloudtest.k8s.debezium import DEBEZIUM_RESOURCES
@@ -146,3 +150,32 @@ def __init__(
def create(self) -> None:
super().create()
wait(condition="condition=Ready", resource="pod/compute-cluster-u1-replica-1-0")

def wait_for_sql(self) -> None:
"""Wait until environmentd pod is ready and can accept SQL connections"""
wait(condition="condition=Ready", resource="pod/environmentd-0")

start = datetime.now()
while datetime.now() - start < timedelta(seconds=300):
try:
self.environmentd.sql("SELECT 1")
break
except InterfaceError as e:
# Since we crash environmentd, we expect some errors that we swallow.
print(f"SQL interface not ready, {e} while SELECT 1. Waiting...")
time.sleep(2)

def set_environmentd_failpoints(self, failpoints: str) -> None:
"""Set the FAILPOINTS environmentd variable in the stateful set. This
will most likely restart environmentd"""
stateful_set = [
resource
for resource in self.resources
if type(resource) == EnvironmentdStatefulSet
]
assert len(stateful_set) == 1
stateful_set = stateful_set[0]

stateful_set.env["FAILPOINTS"] = failpoints
stateful_set.replace()
self.wait_for_sql()
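
The two helpers above are meant to be used together: `set_environmentd_failpoints` pushes a new FAILPOINTS value into the stateful set (restarting environmentd), and `wait_for_sql` blocks until the restarted process answers queries again. A hedged usage sketch follows; the variable `mz` is assumed to be an existing MaterializeApplication instance, and the failpoint string is only an example.

    # mz: an existing MaterializeApplication instance (construction omitted).
    # Arm a failpoint; replace() restarts environmentd, and wait_for_sql()
    # blocks until the new process answers SELECT 1 again.
    mz.set_environmentd_failpoints("kubernetes_drop_service=return")

    # ... run statements that exercise the failpoint ...

    # Disarm the failpoint; this restarts environmentd once more and waits again.
    mz.set_environmentd_failpoints("")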
4 changes: 4 additions & 0 deletions misc/python/materialize/cloudtest/k8s/__init__.py
@@ -216,6 +216,10 @@ def replace(self) -> None:
apps_v1_api.replace_namespaced_stateful_set(
name=name, body=self.stateful_set, namespace=self.namespace()
)
# Despite the name "status" this kubectl command will actually wait
# until the rollout is complete.
# See https://github.com/kubernetes/kubernetes/issues/79606#issuecomment-779779928
self.kubectl("rollout", "status", f"statefulset/{name}")


class K8sConfigMap(K8sResource):
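
The `kubectl rollout status` call added in `replace()` above is what makes the restart synchronous. Roughly, that wait amounts to the following sketch; the function name and the 300s timeout are assumptions, not part of this change.

    import subprocess

    def wait_for_statefulset_rollout(name: str, namespace: str) -> None:
        # Blocks until every pod of the StatefulSet has been updated and is
        # ready, or until kubectl gives up after the timeout.
        subprocess.check_call(
            [
                "kubectl",
                "--namespace", namespace,
                "rollout", "status",
                f"statefulset/{name}",
                "--timeout=300s",
            ]
        )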
6 changes: 5 additions & 1 deletion misc/python/materialize/cloudtest/k8s/environmentd.py
@@ -8,7 +8,7 @@
# by the Apache License, Version 2.0.

import urllib.parse
from typing import Optional
from typing import Dict, Optional

from kubernetes.client import (
V1Container,
@@ -60,6 +60,7 @@ def __init__(
self.tag = tag
self.release_mode = release_mode
self.log_filter = log_filter
self.env: Dict[str, str] = {}
super().__init__()

def generate_stateful_set(self) -> V1StatefulSet:
@@ -78,6 +79,9 @@ def generate_stateful_set(self) -> V1StatefulSet:
V1EnvVar(name="MZ_ANNOUNCE_EGRESS_IP", value="1.2.3.4,88.77.66.55"),
]

for (k, v) in self.env.items():
env.append(V1EnvVar(name=k, value=v))

ports = [V1ContainerPort(container_port=5432, name="sql")]

volume_mounts = [
1 change: 1 addition & 0 deletions misc/python/stubs/pg8000/exceptions.pyi
@@ -8,3 +8,4 @@
# by the Apache License, Version 2.0.

class DatabaseError(Exception): ...
class InterfaceError(Exception): ...
10 changes: 10 additions & 0 deletions src/adapter/src/catalog.rs
@@ -3183,6 +3183,16 @@ impl<S: Append> Catalog<S> {
self.storage().await.get_persisted_timestamp(timeline).await
}

/// Get the next user id without allocating it.
pub async fn get_next_user_global_id(&mut self) -> Result<GlobalId, Error> {
self.storage().await.get_next_user_global_id().await
}

/// Get the next replica id without allocating it.
pub async fn get_next_replica_id(&mut self) -> Result<ReplicaId, Error> {
self.storage().await.get_next_replica_id().await
}

/// Persist new global timestamp for a timeline to disk.
pub async fn persist_timestamp(
&mut self,
23 changes: 23 additions & 0 deletions src/adapter/src/catalog/storage.rs
@@ -838,6 +838,29 @@ impl<S: Append> Connection<S> {
Ok(GlobalId::User(id))
}

/// Get the next user id without allocating it.
pub async fn get_next_user_global_id(&mut self) -> Result<GlobalId, Error> {
self.get_next_id("user").await.map(GlobalId::User)
}

/// Get the next replica id without allocating it.
pub async fn get_next_replica_id(&mut self) -> Result<ReplicaId, Error> {
self.get_next_id(REPLICA_ID_ALLOC_KEY).await
}

async fn get_next_id(&mut self, id_type: &str) -> Result<u64, Error> {
COLLECTION_ID_ALLOC
.peek_key_one(
&mut self.stash,
&IdAllocKey {
name: id_type.to_string(),
},
)
.await
.map(|x| x.expect("must exist").next_id)
.map_err(Into::into)
}

#[tracing::instrument(level = "debug", skip(self))]
async fn allocate_id(&mut self, id_type: &str, amount: u64) -> Result<Vec<u64>, Error> {
if amount == 0 {
16 changes: 14 additions & 2 deletions src/adapter/src/coord.rs
@@ -1162,8 +1162,20 @@ pub async fn serve<S: Append + 'static>(
segment_client,
metrics: Metrics::register_with(&inner_metrics_registry),
};
let bootstrap =
handle.block_on(coord.bootstrap(builtin_migration_metadata, builtin_table_updates));
let bootstrap = handle.block_on(async {
coord
.bootstrap(builtin_migration_metadata, builtin_table_updates)
.await?;
coord
.controller
.remove_orphans(
coord.catalog.get_next_replica_id().await?,
coord.catalog.get_next_user_global_id().await?,
)
.await
.map_err(AdapterError::Orchestrator)?;
Ok(())
});
let ok = bootstrap.is_ok();
bootstrap_tx.send(bootstrap).unwrap();
if ok {
4 changes: 4 additions & 0 deletions src/adapter/src/coord/sequencer.rs
@@ -1828,10 +1828,14 @@ impl<S: Append + 'static> Coordinator<S> {

self.catalog_transact(Some(session), ops).await?;

fail::fail_point!("after_catalog_drop_replica");

for (compute_id, replica_id) in replicas_to_drop {
self.drop_replica(compute_id, replica_id).await.unwrap();
}

fail::fail_point!("after_sequencer_drop_replica");

Ok(ExecuteResponse::DroppedComputeReplica)
}

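
These two failpoints bracket the window in which a replica is already gone from the catalog but its service still exists in the orchestrator; crashing environmentd inside that window deliberately leaves an orphan behind. A sketch of how a cloudtest case might use them, where `mz`, the replica name `default.r1`, and the failpoint action are illustrative assumptions:

    from pg8000.exceptions import InterfaceError

    # mz: an existing MaterializeApplication instance (construction omitted).
    # Panic right after the catalog transaction commits, before drop_replica runs.
    mz.set_environmentd_failpoints("after_catalog_drop_replica=panic")
    try:
        mz.environmentd.sql("DROP CLUSTER REPLICA default.r1")
    except InterfaceError:
        pass  # the connection dies together with the panicking environmentd
    # Disarm the failpoint; on the next startup, bootstrap calls remove_orphans
    # and deletes the replica StatefulSet that was left behind.
    mz.set_environmentd_failpoints("")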
3 changes: 3 additions & 0 deletions src/adapter/src/error.rs
@@ -191,6 +191,8 @@ pub enum AdapterError {
Storage(mz_storage_client::controller::StorageError),
/// An error occurred in the compute layer
Compute(anyhow::Error),
/// An error in the orchestrator layer
Orchestrator(anyhow::Error),
}

impl AdapterError {
@@ -478,6 +480,7 @@ impl fmt::Display for AdapterError {
}
AdapterError::Storage(e) => e.fmt(f),
AdapterError::Compute(e) => e.fmt(f),
AdapterError::Orchestrator(e) => e.fmt(f),
}
}
}
40 changes: 37 additions & 3 deletions src/compute-client/src/controller.rs
@@ -29,7 +29,7 @@
//! from compacting beyond the allowed compaction of each of its outputs, ensuring that we can
//! recover each dataflow to its current state in case of failure or other reconfiguration.

use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::fmt;
use std::num::{NonZeroI64, NonZeroUsize};
use std::str::FromStr;
@@ -41,7 +41,7 @@ use chrono::{DateTime, Utc};
use differential_dataflow::lattice::Lattice;
use futures::stream::BoxStream;
use futures::{future, FutureExt, StreamExt};
use mz_ore::soft_assert;
use mz_ore::{halt, soft_assert};
use serde::{Deserialize, Serialize};
use timely::progress::frontier::{AntichainRef, MutableAntichain};
use timely::progress::{Antichain, Timestamp};
@@ -61,7 +61,8 @@ use crate::service::{ComputeClient, ComputeGrpcClient};

use self::error::{
CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
InstanceExists, InstanceMissing, PeekError, ReplicaCreationError, ReplicaDropError,
InstanceExists, InstanceMissing, PeekError, RemoveOrphansError, ReplicaCreationError,
ReplicaDropError,
};
use self::instance::{ActiveInstance, Instance};
use self::orchestrator::ComputeOrchestrator;
@@ -343,6 +344,39 @@ impl<T> ComputeController<T> {
storage,
}
}

/// Remove orphaned compute replicas from the orchestrator. These are replicas that the
/// orchestrator is aware of, but not the controller.
pub async fn remove_orphans(
&self,
next_replica_id: ReplicaId,
) -> Result<(), RemoveOrphansError> {
let keep: HashSet<_> = self
.instances
.iter()
.flat_map(|(_, inst)| inst.replica_ids())
.collect();

let current: HashSet<_> = self.orchestrator.list_replicas().await?.collect();

for (inst_id, replica_id) in current.into_iter() {
if replica_id >= next_replica_id {
// Found a replica in kubernetes with a higher replica id than what we are aware
// of. This must have been created by an environmentd with higher epoch number.
halt!(
"Found replica id ({}) in orchestrator >= next id ({})",
replica_id,
next_replica_id
);
}

if !keep.contains(&replica_id) {
self.orchestrator.drop_replica(inst_id, replica_id).await?;
}
}

Ok(())
}
}

impl<T> ComputeController<T>
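
Distilled, the per-replica decision in `remove_orphans` above amounts to the following; this is a Python restatement for illustration only, not code from this change.

    def orphan_action(replica_id: int, next_replica_id: int, known_ids: set) -> str:
        if replica_id >= next_replica_id:
            # An environmentd with a higher epoch allocated this id: this
            # process is stale and halts rather than touch the newer
            # deployment's replicas.
            return "halt"
        if replica_id not in known_ids:
            # Allocated in the past but unknown to any compute instance: orphan.
            return "drop"
        return "keep"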
13 changes: 13 additions & 0 deletions src/compute-client/src/controller/error.rs
@@ -189,3 +189,16 @@ impl From<CollectionMissing> for CollectionUpdateError {
Self::CollectionMissing(error.0)
}
}

/// Errors arising during orphan removal.
#[derive(Error, Debug)]
pub enum RemoveOrphansError {
#[error("orchestrator error: {0}")]
OrchestratorError(anyhow::Error),
}

impl From<anyhow::Error> for RemoveOrphansError {
fn from(error: anyhow::Error) -> Self {
Self::OrchestratorError(error)
}
}
5 changes: 5 additions & 0 deletions src/compute-client/src/controller/instance.rs
@@ -143,6 +143,11 @@ impl<T> Instance<T> {
// Do we have responses ready to deliver?
|| !self.ready_responses.is_empty()
}

/// Returns the ids of all replicas of this instance
pub fn replica_ids(&self) -> impl Iterator<Item = &ReplicaId> {
self.replicas.keys()
}
}

impl<T> Instance<T>
9 changes: 9 additions & 0 deletions src/compute-client/src/controller/orchestrator.rs
@@ -184,6 +184,15 @@ impl ComputeOrchestrator {
self.inner.drop_service(&service_name).await
}

pub(super) async fn list_replicas(
&self,
) -> Result<impl Iterator<Item = (ComputeInstanceId, ReplicaId)>, anyhow::Error> {
self.inner.list_services().await.map(|s| {
s.into_iter()
.filter_map(|x| parse_replica_service_name(&x).ok())
})
}

pub(super) fn watch_services(&self) -> BoxStream<'static, ComputeInstanceEvent> {
fn translate_event(event: ServiceEvent) -> Result<ComputeInstanceEvent, anyhow::Error> {
let (instance_id, replica_id) = parse_replica_service_name(&event.service_id)?;
11 changes: 11 additions & 0 deletions src/controller/src/lib.rs
@@ -142,6 +142,17 @@ impl<T> Controller<T> {
pub fn active_compute(&mut self) -> ActiveComputeController<T> {
self.compute.activate(&mut *self.storage)
}

/// Remove orphaned services from the orchestrator.
pub async fn remove_orphans(
&mut self,
next_replica_id: ReplicaId,
next_storage_host_id: GlobalId,
) -> Result<(), anyhow::Error> {
self.compute.remove_orphans(next_replica_id).await?;
self.storage.remove_orphans(next_storage_host_id).await?;
Ok(())
}
}

impl<T> Controller<T>
2 changes: 1 addition & 1 deletion src/environmentd/src/bin/environmentd/main.rs
@@ -688,7 +688,7 @@ max log level: {max_log_level}",
value.to_string_lossy().into_owned(),
)
})
.filter(|(name, _value)| name.starts_with("MZ_"))
.filter(|(name, _value)| name.starts_with("MZ_") || name == "FAILPOINTS")
.map(|(name, value)| format!("{}={}", escape(&name), escape(&value)))
.chain(env::args().into_iter().map(|arg| escape(&arg).into_owned()))
.join(" ")
1 change: 1 addition & 0 deletions src/orchestrator-kubernetes/Cargo.toml
@@ -11,6 +11,7 @@ anyhow = "1.0.66"
async-trait = "0.1.58"
chrono = { version = "0.4.23", default-features = false }
clap = { version = "3.2.20", features = ["derive"] }
fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.25"
maplit = "1.0.2"
mz-cloud-resources = { path = "../cloud-resources" }
7 changes: 4 additions & 3 deletions src/orchestrator-kubernetes/src/lib.rs
@@ -207,7 +207,7 @@ impl k8s_openapi::Metadata for PodMetrics {
impl NamespacedKubernetesOrchestrator {
/// Return a `ListParams` instance that limits results to the namespace
/// assigned to this orchestrator.
fn list_params(&self) -> ListParams {
fn list_pod_params(&self) -> ListParams {
let ns_selector = format!(
"environmentd.materialize.cloud/namespace={}",
self.namespace
@@ -740,6 +740,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {

/// Drops the identified service, if it exists.
async fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
self.service_scales
.lock()
.expect("poisoned lock")
@@ -768,7 +769,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {

/// Lists the identifiers of all known services.
async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
let stateful_sets = self.stateful_set_api.list(&self.list_params()).await?;
let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
let name_prefix = format!("{}-", self.namespace);
Ok(stateful_sets
.into_iter()
@@ -818,7 +819,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
})
}

let stream = watcher(self.pod_api.clone(), self.list_params())
let stream = watcher(self.pod_api.clone(), self.list_pod_params())
.touched_objects()
.filter_map(|object| async move {
match object {
4 changes: 3 additions & 1 deletion src/pgwire/src/message.rs
@@ -403,7 +403,9 @@ impl ErrorResponse {
// code, so it's probably the best choice.
AdapterError::WriteOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::MultiTableWriteTransaction => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::Storage(_) | AdapterError::Compute(_) => SqlState::INTERNAL_ERROR,
AdapterError::Storage(_) | AdapterError::Compute(_) | AdapterError::Orchestrator(_) => {
SqlState::INTERNAL_ERROR
}
};
ErrorResponse {
severity,
(Diffs for the remaining changed files were not loaded.)
