Skip to content

Commit

Permalink
orchestrator-process: rewrite
Browse files Browse the repository at this point in the history
Rewrite the process orchestrator to be more reliable:

  * Use Unix domain sockets rather than TCP, to avoid competing with
    other `environmentd` processes (e.g., in tests) for a very limited
    range of TCP ports.

  * Restructure process supervision so that existing processes that are
    adopted at startup are monitored for failures and restarted if
    necessary.

  * Enable process status updates.

  * Enable process metrics collection.

Fix #15725.
  • Loading branch information
benesch committed Dec 4, 2022
1 parent 33b1abc commit 1439b5f
Show file tree
Hide file tree
Showing 16 changed files with 379 additions and 623 deletions.
12 changes: 7 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,7 @@ tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }
tungstenite = { git = "https://github.com/snapview/tungstenite-rs.git" }
serde-value = { git = "https://github.com/MaterializeInc/serde-value.git" }
vte = { git = "https://github.com/alacritty/vte" }

[patch."https://github.com/TimelyDataflow/timely-dataflow.git"]
timely_communication = { git = "https://github.com/benesch/timely-dataflow.git", branch = "uds" }
timely = { git = "https://github.com/benesch/timely-dataflow.git", branch = "uds" }
7 changes: 4 additions & 3 deletions src/compute-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,17 @@ use chrono::{DateTime, Utc};
use differential_dataflow::lattice::Lattice;
use futures::stream::BoxStream;
use futures::{future, FutureExt, StreamExt};
use mz_ore::{halt, soft_assert};
use serde::{Deserialize, Serialize};
use timely::progress::frontier::{AntichainRef, MutableAntichain};
use timely::progress::{Antichain, Timestamp};
use tracing::warn;
use uuid::Uuid;

use mz_build_info::BuildInfo;
use mz_expr::RowSetFinishing;
use mz_orchestrator::{CpuLimit, MemoryLimit, NamespacedOrchestrator, ServiceProcessMetrics};
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{halt, soft_assert};
use mz_repr::{GlobalId, Row};
use mz_storage_client::controller::{ReadPolicy, StorageController};

Expand Down Expand Up @@ -496,7 +497,7 @@ where
let metrics = match result {
Ok(metrics) => metrics,
Err(e) => {
tracing::log::warn!("Failed to get metrics for replica {replica_id}: {e}");
warn!("failed to get metrics for replica {replica_id}: {e}");
return;
}
};
Expand Down Expand Up @@ -761,7 +762,7 @@ where
if let Ok(mut instance) = self.instance(instance_id) {
instance.handle_response(response, replica_id)
} else {
tracing::warn!(
warn!(
?instance_id,
?response,
"processed response from unknown instance"
Expand Down
19 changes: 4 additions & 15 deletions src/compute-client/src/controller/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,12 @@ impl ComputeOrchestrator {
image: self.computed_image.clone(),
init_container_image: self.init_container_image.clone(),
args: &|assigned| {
let mut compute_opts = vec![
format!(
"--controller-listen-addr={}:{}",
assigned.listen_host, assigned.ports["controller"]
),
format!(
"--internal-http-listen-addr={}:{}",
assigned.listen_host, assigned.ports["internal-http"]
),
vec![
format!("--controller-listen-addr={}", assigned["controller"]),
format!("--internal-http-listen-addr={}", assigned["internal-http"]),
format!("--opentelemetry-resource=instance_id={}", instance_id),
format!("--opentelemetry-resource=replica_id={}", replica_id),
];
if let Some(index) = assigned.index {
compute_opts
.push(format!("--opentelemetry-resource=replica_index={}", index));
}
compute_opts
]
},
ports: vec![
ServicePort {
Expand Down
14 changes: 0 additions & 14 deletions src/environmentd/src/bin/environmentd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use mz_orchestrator_process::{ProcessOrchestrator, ProcessOrchestratorConfig};
use mz_orchestrator_tracing::{TracingCliArgs, TracingOrchestrator};
use mz_ore::cgroup::{detect_memory_limit, MemoryLimit};
use mz_ore::cli::{self, CliConfig, KeyValueArg};
use mz_ore::id_gen::PortAllocator;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_persist_client::cache::PersistClientCache;
Expand Down Expand Up @@ -292,13 +291,6 @@ pub struct Args {
/// value.
#[clap(long, env = "ORCHESTRATOR_PROCESS_WRAPPER")]
orchestrator_process_wrapper: Option<String>,
/// Base port for services spawned by the process orchestrator.
#[structopt(
long,
env = "ORCHESTRATOR_PROCESS_BASE_SERVICE_PORT",
default_value = "2100"
)]
orchestrator_process_base_service_port: u16,
/// Where the process orchestrator should store secrets.
#[clap(
long,
Expand Down Expand Up @@ -600,12 +592,6 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {
// binaries and release binaries look for other release
// binaries.
image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
port_allocator: Arc::new(PortAllocator::new(
args.orchestrator_process_base_service_port,
args.orchestrator_process_base_service_port
.checked_add(1000)
.expect("Port number overflow, base-service-port too large."),
)),
suppress_output: false,
environment_id: args.environment_id.clone(),
secrets_dir: args.orchestrator_process_secrets_directory,
Expand Down
6 changes: 0 additions & 6 deletions src/environmentd/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use mz_environmentd::TlsMode;
use mz_frontegg_auth::FronteggAuthentication;
use mz_orchestrator::Orchestrator;
use mz_orchestrator_process::{ProcessOrchestrator, ProcessOrchestratorConfig};
use mz_ore::id_gen::PortAllocator;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
use mz_ore::retry::Retry;
Expand All @@ -48,10 +47,6 @@ use mz_storage_client::types::connections::ConnectionContext;
pub static KAFKA_ADDRS: Lazy<String> =
Lazy::new(|| env::var("KAFKA_ADDRS").unwrap_or_else(|_| "localhost:9092".into()));

// Port 2181 is used by ZooKeeper.
static PORT_ALLOCATOR: Lazy<Arc<PortAllocator>> =
Lazy::new(|| Arc::new(PortAllocator::new_with_filter(2100, 2600, &[2181])));

#[derive(Clone)]
pub struct Config {
data_directory: Option<PathBuf>,
Expand Down Expand Up @@ -187,7 +182,6 @@ pub fn start_server(config: Config) -> Result<Server, anyhow::Error> {
.parent()
.unwrap()
.to_path_buf(),
port_allocator: PORT_ALLOCATOR.clone(),
// NOTE(benesch): would be nice to not have to do this, but
// the subprocess output wreaks havoc on cargo2junit.
suppress_output: true,
Expand Down
44 changes: 19 additions & 25 deletions src/orchestrator-kubernetes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::net::{IpAddr, Ipv4Addr};
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -39,8 +38,8 @@ use tracing::warn;
use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_orchestrator::{
LabelSelectionLogic, NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments,
ServiceConfig, ServiceEvent, ServiceStatus,
LabelSelectionLogic, NamespacedOrchestrator, Orchestrator, Service, ServiceConfig,
ServiceEvent, ServiceStatus,
};
use mz_orchestrator::{LabelSelector as MzLabelSelector, ServiceProcessMetrics};

Expand Down Expand Up @@ -508,29 +507,11 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
.iter()
.map(|p| (p.name.clone(), p.port_hint))
.collect::<HashMap<_, _>>();
let peers = hosts
let listen_addrs = ports_in
.iter()
.map(|host| (host.clone(), ports.clone()))
.collect::<Vec<_>>();

let mut node_selector: BTreeMap<String, String> = self
.config
.service_node_selector
.clone()
.into_iter()
.collect();
if let Some(availability_zone) = availability_zone {
node_selector.insert(
"materialize.cloud/availability-zone".to_string(),
availability_zone,
);
}
let mut args = args(&ServiceAssignments {
listen_host: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
ports: &ports,
index: None,
peers: &peers,
});
.map(|p| (p.name.clone(), format!("0.0.0.0:{}", p.port_hint)))
.collect::<HashMap<_, _>>();
let mut args = args(&listen_addrs);
args.push("--secrets-reader=kubernetes".into());
args.push(format!(
"--secrets-reader-kubernetes-context={}",
Expand Down Expand Up @@ -567,6 +548,19 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
"cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
};

let mut node_selector: BTreeMap<String, String> = self
.config
.service_node_selector
.clone()
.into_iter()
.collect();
if let Some(availability_zone) = availability_zone {
node_selector.insert(
"materialize.cloud/availability-zone".to_string(),
availability_zone,
);
}

let container_name = image
.splitn(2, '/')
.skip(1)
Expand Down
2 changes: 2 additions & 0 deletions src/orchestrator-process/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ async-stream = "0.3.3"
async-trait = "0.1.58"
chrono = { version = "0.4.23", default_features = false, features = ["clock"] }
futures = "0.3.25"
hex = "0.4.3"
itertools = "0.10.5"
mz-orchestrator = { path = "../orchestrator" }
mz-ore = { path = "../ore", features = ["async"] }
Expand All @@ -20,6 +21,7 @@ mz-repr = { path = "../repr" }
mz-secrets = { path = "../secrets" }
serde_json = "1.0.88"
scopeguard = "1.1.0"
sha1 = "0.10.5"
sysinfo = "0.26.7"
tokio = { version = "1.22.0", features = [ "fs", "process", "time" ] }
tracing = "0.1.37"
Expand Down

0 comments on commit 1439b5f

Please sign in to comment.