Skip to content

Commit

Permalink
orchestrator-process: store PID metadata in temp directory
Browse files Browse the repository at this point in the history
PID files are not valid after a reboot of a machine. In the best case,
the referenced PIDs do not exist, and the process orchestrator correctly
recreates the services; in the worst case, the PIDs have been reused by
different processes entirely, and the process orchestrator incorrectly
thinks the services are already running. The worst case scenario is
almost a guarantee with containers, where there are only a few processes
using the low-numbered PIDs.

This commit fixes the problem by moving the PID metadata files into
$TMPDIR/environmentd-$ID. $TMPDIR is cleared when the machine restarts, so
the stale PID files will correctly vanish after a reboot. Naming the directory after
the environment ID ensures that environmentd can find its metadata after
a process restart without a machine restart, but allows multiple
`environmentd` processes to co-exist, as long as they use different
environment IDs. Things work correctly with the `--reset` option to
bin/environmentd, too, as this option generates a new environment ID.

Touches #15725.
Would close #15800.
  • Loading branch information
benesch committed Nov 1, 2022
1 parent 4e80964 commit 89de11d
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 19 deletions.
11 changes: 8 additions & 3 deletions src/environmentd/src/bin/environmentd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,14 @@ pub struct Args {
default_value = "2100"
)]
orchestrator_process_base_service_port: u16,
/// Where the process orchestrator should store its metadata.
/// Where the process orchestrator should store secrets.
// TODO(benesch): the name "data directory" is historical. We should rename
// to "--orchestrator-process-secrets-directory", but there are some
// irritating backwards compatibility concerns to sort out with the upgrade
// test.
#[clap(
long,
env = "ORCHESTRATOR_PROCESSDATA_DIRECTORY",
env = "ORCHESTRATOR_PROCESS_DATA_DIRECTORY",
value_name = "PATH",
default_value = "mzdata"
)]
Expand Down Expand Up @@ -590,7 +594,8 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {
.expect("Port number overflow, base-service-port too large."),
)),
suppress_output: false,
data_dir: args.orchestrator_process_data_directory.clone(),
environment_id: args.environment_id.clone(),
secrets_dir: args.orchestrator_process_data_directory.join("secrets"),
command_wrapper: args
.orchestrator_process_wrapper
.map_or(Ok(vec![]), |s| shell_words::split(&s))?,
Expand Down
6 changes: 4 additions & 2 deletions src/environmentd/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl Config {

pub fn start_server(config: Config) -> Result<Server, anyhow::Error> {
let runtime = Arc::new(Runtime::new()?);
let environment_id = format!("environment-{}-0", Uuid::from_u128(0));
let (data_directory, temp_dir) = match config.data_directory {
None => {
// If no data directory is provided, we create a temporary
Expand Down Expand Up @@ -187,7 +188,8 @@ pub fn start_server(config: Config) -> Result<Server, anyhow::Error> {
// NOTE(benesch): would be nice to not have to do this, but
// the subprocess output wreaks havoc on cargo2junit.
suppress_output: true,
data_dir: data_directory.clone(),
environment_id: environment_id.clone(),
secrets_dir: data_directory.join("secrets"),
command_wrapper: vec![],
}))?,
);
Expand Down Expand Up @@ -227,7 +229,7 @@ pub fn start_server(config: Config) -> Result<Server, anyhow::Error> {
persisted_introspection: true,
metrics_registry: metrics_registry.clone(),
now: config.now,
environment_id: format!("environment-{}-0", Uuid::from_u128(0)),
environment_id,
cors_allowed_origin: AllowOrigin::list([]),
cluster_replica_sizes: Default::default(),
bootstrap_default_cluster_replica_size: config.default_cluster_replica_size,
Expand Down
31 changes: 19 additions & 12 deletions src/orchestrator-process/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0.

use std::collections::{HashMap, HashSet};
use std::env;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::fs::Permissions;
Expand Down Expand Up @@ -52,9 +53,10 @@ pub struct ProcessOrchestratorConfig {
pub port_allocator: Arc<PortAllocator>,
/// Whether to suppress output from spawned subprocesses.
pub suppress_output: bool,
/// The directory in which the orchestrator should look for process
/// lock files and store secrets.
pub data_dir: PathBuf,
/// The ID of the environment under orchestration.
pub environment_id: String,
/// The directory in which to store secrets.
pub secrets_dir: PathBuf,
/// A command to wrap the child command invocation
pub command_wrapper: Vec<String>,
}
Expand All @@ -74,7 +76,7 @@ pub struct ProcessOrchestrator {
port_allocator: Arc<PortAllocator>,
suppress_output: bool,
namespaces: Mutex<HashMap<String, Arc<dyn NamespacedOrchestrator>>>,
data_dir: PathBuf,
metadata_dir: PathBuf,
secrets_dir: PathBuf,
command_wrapper: Vec<String>,
}
Expand All @@ -86,11 +88,15 @@ impl ProcessOrchestrator {
image_dir,
port_allocator,
suppress_output,
data_dir,
environment_id,
secrets_dir,
command_wrapper,
}: ProcessOrchestratorConfig,
) -> Result<ProcessOrchestrator, anyhow::Error> {
let secrets_dir = data_dir.join("secrets");
let metadata_dir = env::temp_dir().join(format!("environmentd-{environment_id}"));
fs::create_dir_all(&metadata_dir)
.await
.context("creating metadata directory")?;
fs::create_dir_all(&secrets_dir)
.await
.context("creating secrets directory")?;
Expand All @@ -102,7 +108,7 @@ impl ProcessOrchestrator {
port_allocator,
suppress_output,
namespaces: Mutex::new(HashMap::new()),
data_dir: fs::canonicalize(data_dir).await?,
metadata_dir: fs::canonicalize(metadata_dir).await?,
secrets_dir: fs::canonicalize(secrets_dir).await?,
command_wrapper,
})
Expand All @@ -119,7 +125,7 @@ impl Orchestrator for ProcessOrchestrator {
port_allocator: Arc::clone(&self.port_allocator),
suppress_output: self.suppress_output,
supervisors: Arc::new(Mutex::new(HashMap::new())),
data_dir: self.data_dir.clone(),
metadata_dir: self.metadata_dir.clone(),
secrets_dir: self.secrets_dir.clone(),
command_wrapper: self.command_wrapper.clone(),
})
Expand All @@ -134,7 +140,7 @@ struct NamespacedProcessOrchestrator {
port_allocator: Arc<PortAllocator>,
suppress_output: bool,
supervisors: Arc<Mutex<HashMap<String, Vec<AbortOnDrop>>>>,
data_dir: PathBuf,
metadata_dir: PathBuf,
secrets_dir: PathBuf,
command_wrapper: Vec<String>,
}
Expand Down Expand Up @@ -175,10 +181,11 @@ impl NamespacedOrchestrator for NamespacedProcessOrchestrator {
let mut port_metadata_file_locations = vec![None; scale_in.get()];
for i in 0..(scale_in.get()) {
let process_file_name = format!("{}-{}-{}", self.namespace, id, i);
let pid_file_location = self.data_dir.join(format!("{}.pid", process_file_name));
let pid_file_location = self.metadata_dir.join(format!("{}.pid", process_file_name));
pid_file_locations[i] = Some(pid_file_location.clone());
let port_metadata_file_location =
self.data_dir.join(format!("{}.ports", process_file_name));
let port_metadata_file_location = self
.metadata_dir
.join(format!("{}.ports", process_file_name));
port_metadata_file_locations[i] = Some(port_metadata_file_location.clone());

if let (Ok(pid), Ok(port_metadata)) = (
Expand Down
6 changes: 4 additions & 2 deletions src/sqllogictest/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ fn format_row(row: &Row, types: &[Type], mode: Mode, sort: &Sort) -> Vec<String>
impl Runner {
pub async fn start(config: &RunConfig<'_>) -> Result<Self, anyhow::Error> {
let temp_dir = tempfile::tempdir()?;
let environment_id = format!("environment-{}-0", Uuid::from_u128(0));
let (consensus_uri, adapter_stash_url, storage_stash_url) = {
let postgres_url = &config.postgres_url;
let (client, conn) = tokio_postgres::connect(postgres_url, NoTls).await?;
Expand Down Expand Up @@ -645,7 +646,8 @@ impl Runner {
image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
port_allocator: Arc::new(PortAllocator::new(2100, 2200)),
suppress_output: false,
data_dir: temp_dir.path().to_path_buf(),
environment_id: environment_id.clone(),
secrets_dir: temp_dir.path().join("secrets"),
command_wrapper: vec![],
})
.await?,
Expand Down Expand Up @@ -687,7 +689,7 @@ impl Runner {
persisted_introspection: true,
metrics_registry,
now,
environment_id: format!("environment-{}-0", Uuid::from_u128(0)),
environment_id,
cluster_replica_sizes: Default::default(),
bootstrap_default_cluster_replica_size: "1".into(),
bootstrap_builtin_cluster_replica_size: "1".into(),
Expand Down

0 comments on commit 89de11d

Please sign in to comment.