Skip to content

Commit

Permalink
environmentd: Have environmentd panic on child crashes during testing
Browse files Browse the repository at this point in the history
By default, the process orchestrator will restart any computed and
storaged processes that it manages. During testing, this will unfortunately
mask panics that should have caused the test to fail instead.

- Add a new environmentd command-line option that will cause environmentd
  to panic if any of its children exits with a status that implies
  a crash or a panic happened on the child.
- Make sure the new behavior is in effect for sqllogictests, cargo
  unit tests and mzcompose-based tests
- Do not panic environmentd in mzcompose workflows that
  explicitly use mz_panic() to panic a computed
- Remove the KillReplica Action from Zippy, which was using mz_panic()
  internally. The original idea of this Action was to panic a single replica,
  but in practice mz_panic() is evaluated by all replicas and they all panic
  together.

Co-authored-by: Nikhil Benesch <nikhil.benesch@gmail.com>
  • Loading branch information
philip-stoev and benesch committed Dec 5, 2022
1 parent 0275079 commit 3ed6700
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions misc/python/materialize/mzcompose/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(
depends_on: Optional[List[str]] = None,
allow_host_ports: bool = False,
environment_id: Optional[str] = None,
propagate_crashes: bool = True,
) -> None:
if persist_blob_url is None:
persist_blob_url = f"file://{data_directory}/persist/blob"
Expand Down Expand Up @@ -95,6 +96,9 @@ def __init__(
if environment_id:
environment += [f"MZ_ENVIRONMENT_ID={environment_id}"]

if propagate_crashes:
environment += ["MZ_ORCHESTRATOR_PROCESS_PROPAGATE_CRASHES=1"]

self.default_storage_size = default_size
self.default_replica_size = (
"1" if default_size == 1 else f"{default_size}-{default_size}"
Expand Down
32 changes: 0 additions & 32 deletions misc/python/materialize/zippy/replica_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
# by the Apache License, Version 2.0.

import random
from textwrap import dedent
from typing import List, Optional, Set, Type

from materialize.mzcompose import Composition
Expand Down Expand Up @@ -103,34 +102,3 @@ def __init__(self, capabilities: Capabilities) -> None:
def run(self, c: Composition) -> None:
if self.replica is not None:
c.testdrive(f"> DROP CLUSTER REPLICA IF EXISTS default.{self.replica.name}")


# Zippy action that targets one randomly chosen replica and runs a query
# calling mz_internal.mz_panic() while the session is pinned to that replica.
# NOTE(review): the commit message that removes this class states that, in
# practice, mz_panic() is evaluated by all replicas, so they all panic together
# rather than only the selected one.
class KillReplica(Action):
"""Kills a single replica using mz_panic()"""

# The replica chosen in __init__ as the target of the panic.
replica: Optional[ReplicaExists]

@classmethod
def requires(self) -> Set[Type[Capability]]:
# Declares the capabilities this action depends on: a running Mz and at
# least one existing replica.
return {MzIsRunning, ReplicaExists}

def __init__(self, capabilities: Capabilities) -> None:
# Pick one of the currently known replicas at random as the target.
existing_replicas = capabilities.get(ReplicaExists)
self.replica = random.choice(existing_replicas)

def run(self, c: Composition) -> None:
if self.replica is not None:
# The testdrive script below (re)creates a scratch table holding the panic
# message, sets a 1s statement timeout, pins the session to the chosen
# replica, and then issues an INSERT ... SELECT that calls mz_panic().
# The final statement is written as an expected failure whose error
# message contains "statement timeout".
c.testdrive(
dedent(
f"""
> DROP TABLE IF EXISTS panic_table;
> CREATE TABLE panic_table (f1 TEXT);
> INSERT INTO panic_table VALUES ('Zippy killing replica {self.replica.name}');
> SET statement_timeout='1s'
> SET cluster_replica = {self.replica.name}
! INSERT INTO panic_table SELECT mz_internal.mz_panic(f1) FROM panic_table;
contains: statement timeout
"""
)
)
2 changes: 0 additions & 2 deletions misc/python/materialize/zippy/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
CreateReplica,
DropDefaultReplica,
DropReplica,
KillReplica,
)
from materialize.zippy.sink_actions import CreateSinkParameterized
from materialize.zippy.source_actions import CreateSourceParameterized
Expand Down Expand Up @@ -129,7 +128,6 @@ def config(self) -> Dict[ActionOrFactory, float]:
KillStoraged: 10,
KillComputed: 10,
CreateReplica: 30,
KillReplica: 10,
DropReplica: 10,
CreateTopicParameterized(): 10,
CreateSourceParameterized(): 10,
Expand Down
5 changes: 5 additions & 0 deletions src/environmentd/src/bin/environmentd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ pub struct Args {
required_if_eq("orchestrator", "process")
)]
orchestrator_process_secrets_directory: Option<PathBuf>,
/// Whether the process orchestrator should handle crashes in child
/// processes by crashing the parent process.
#[clap(long, env = "ORCHESTRATOR_PROCESS_PROPAGATE_CRASHES")]
orchestrator_process_propagate_crashes: bool,

/// The init container to use for computed and storaged when using the
/// kubernetes orchestrator.
Expand Down Expand Up @@ -600,6 +604,7 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {
command_wrapper: args
.orchestrator_process_wrapper
.map_or(Ok(vec![]), |s| shell_words::split(&s))?,
propagate_crashes: args.orchestrator_process_propagate_crashes,
}))
.context("creating process orchestrator")?,
);
Expand Down
1 change: 1 addition & 0 deletions src/environmentd/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ pub fn start_server(config: Config) -> Result<Server, anyhow::Error> {
environment_id: environment_id.clone(),
secrets_dir: data_directory.join("secrets"),
command_wrapper: vec![],
propagate_crashes: true,
}))?,
);
// Messing with the clock causes persist to expire leases, causing hangs and
Expand Down
1 change: 1 addition & 0 deletions src/orchestrator-process/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ chrono = { version = "0.4.23", default_features = false, features = ["clock"] }
futures = "0.3.25"
hex = "0.4.3"
itertools = "0.10.5"
libc = "0.2.137"
mz-orchestrator = { path = "../orchestrator" }
mz-ore = { path = "../ore", features = ["async"] }
mz-pid-file = { path = "../pid-file" }
Expand Down
54 changes: 42 additions & 12 deletions src/orchestrator-process/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use std::fmt::Debug;
use std::fs::Permissions;
use std::future::Future;
use std::os::unix::fs::PermissionsExt;
use std::os::unix::process::ExitStatusExt;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::process::{ExitStatus, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

Expand All @@ -24,6 +25,7 @@ use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use itertools::Itertools;
use libc::{SIGABRT, SIGBUS, SIGILL, SIGSEGV, SIGTRAP};
use scopeguard::defer;
use sha1::{Digest, Sha1};
use sysinfo::{Pid, PidExt, ProcessExt, ProcessRefreshKind, System, SystemExt};
Expand Down Expand Up @@ -59,6 +61,8 @@ pub struct ProcessOrchestratorConfig {
pub secrets_dir: PathBuf,
/// A command to wrap the child command invocation
pub command_wrapper: Vec<String>,
/// Whether to crash this process if a child process crashes.
pub propagate_crashes: bool,
}

/// An orchestrator backed by processes on the local machine.
Expand All @@ -78,6 +82,7 @@ pub struct ProcessOrchestrator {
metadata_dir: PathBuf,
secrets_dir: PathBuf,
command_wrapper: Vec<String>,
propagate_crashes: bool,
}

impl ProcessOrchestrator {
Expand All @@ -89,6 +94,7 @@ impl ProcessOrchestrator {
environment_id,
secrets_dir,
command_wrapper,
propagate_crashes,
}: ProcessOrchestratorConfig,
) -> Result<ProcessOrchestrator, anyhow::Error> {
let metadata_dir = env::temp_dir().join(format!("environmentd-{environment_id}"));
Expand All @@ -109,6 +115,7 @@ impl ProcessOrchestrator {
metadata_dir: fs::canonicalize(metadata_dir).await?,
secrets_dir: fs::canonicalize(secrets_dir).await?,
command_wrapper,
propagate_crashes,
})
}
}
Expand All @@ -128,6 +135,7 @@ impl Orchestrator for ProcessOrchestrator {
services: Arc::new(Mutex::new(HashMap::new())),
service_event_tx,
system: Mutex::new(System::new()),
propagate_crashes: self.propagate_crashes,
})
}))
}
Expand All @@ -144,6 +152,7 @@ struct NamespacedProcessOrchestrator {
services: Arc<Mutex<HashMap<String, Vec<ProcessState>>>>,
service_event_tx: Sender<ServiceEvent>,
system: Mutex<System>,
propagate_crashes: bool,
}

#[async_trait]
Expand Down Expand Up @@ -291,6 +300,7 @@ impl NamespacedProcessOrchestrator {
}: ServiceProcessConfig,
) -> impl Future<Output = ()> {
let suppress_output = self.suppress_output;
let propagate_crashes = self.propagate_crashes;
let command_wrapper = self.command_wrapper.clone();
let image = self.image_dir.join(image);
let pid_file = run_dir.join(format!("{i}.pid"));
Expand Down Expand Up @@ -355,16 +365,15 @@ impl NamespacedProcessOrchestrator {
cmd.stdout(Stdio::null());
cmd.stderr(Stdio::null());
}
match cmd.spawn() {
Ok(process) => {
state_updater.update_state(ProcessStatus::Ready {
pid: Pid::from_u32(process.id().unwrap()),
});
let status = KillOnDropChild(process).0.wait().await;
match spawn_process(&state_updater, cmd).await {
Ok(status) => {
if propagate_crashes && did_process_crash(status) {
panic!("{full_id}-{i} crashed; aborting because propagate_crashes is enabled");
}
error!("{full_id}-{i} exited: {:?}; relaunching in 5s", status);
}
Err(e) => {
error!("{full_id}-{i} failed to launch: {}; relaunching in 5s", e);
error!("{full_id}-{i} failed to spawn: {}; relaunching in 5s", e);
}
};
state_updater.update_state(ProcessStatus::NotReady);
Expand Down Expand Up @@ -438,12 +447,33 @@ fn interpolate_command(
command_part
}

struct KillOnDropChild(Child);
async fn spawn_process(
state_updater: &ProcessStateUpdater,
mut cmd: Command,
) -> Result<ExitStatus, anyhow::Error> {
struct KillOnDropChild(Child);

impl Drop for KillOnDropChild {
fn drop(&mut self) {
let _ = self.0.start_kill();
impl Drop for KillOnDropChild {
fn drop(&mut self) {
let _ = self.0.start_kill();
}
}

let mut child = KillOnDropChild(cmd.spawn()?);
state_updater.update_state(ProcessStatus::Ready {
pid: Pid::from_u32(child.0.id().unwrap()),
});
Ok(child.0.wait().await?)
}

fn did_process_crash(status: ExitStatus) -> bool {
// Likely not exhaustive. Feel free to add additional tests for other
// indications of a crashed child process, as those conditions are
// discovered.
matches!(
status.signal(),
Some(SIGABRT | SIGBUS | SIGSEGV | SIGTRAP | SIGILL)
)
}

struct ProcessStateUpdater {
Expand Down
1 change: 1 addition & 0 deletions src/sqllogictest/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ impl Runner {
environment_id: environment_id.clone(),
secrets_dir: temp_dir.path().join("secrets"),
command_wrapper: vec![],
propagate_crashes: true,
})
.await?,
);
Expand Down
3 changes: 2 additions & 1 deletion test/cluster-isolation/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
Zookeeper(),
Kafka(),
SchemaRegistry(),
Materialized(),
# We use mz_panic() in some test scenarios, so environmentd must stay up.
Materialized(propagate_crashes=False),
Testdrive(volumes=["mzdata:/mzdata"]),
]

Expand Down
3 changes: 2 additions & 1 deletion test/cluster/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
Computed(name="computed_2"),
Computed(name="computed_3"),
Computed(name="computed_4"),
Materialized(),
# We use mz_panic() in some test scenarios, so environmentd must stay up.
Materialized(propagate_crashes=False),
Redpanda(),
Testdrive(
volumes=[
Expand Down

0 comments on commit 3ed6700

Please sign in to comment.