Skip to content

Commit

Permalink
chore: Rename err_threshold to error_threshold. Remove unused par…
Browse files Browse the repository at this point in the history
…ameter. (#1837)
  • Loading branch information
chubei committed Aug 10, 2023
1 parent 2264a0f commit e55a977
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 27 deletions.
2 changes: 1 addition & 1 deletion dozer-cli/src/live/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ pub fn run_sql(
let (tx, rx) = dozer_types::crossbeam::channel::unbounded::<bool>();
let pipeline_thread = std::thread::spawn(move || {
dozer.build(true).unwrap();
dozer.run_apps(shutdown_receiver, Some(tx), None)
dozer.run_apps(shutdown_receiver, Some(tx))
});
let endpoint_name = endpoints[0].clone();

Expand Down
4 changes: 2 additions & 2 deletions dozer-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ fn run() -> Result<(), OrchestrationError> {
RunCommands::App => {
render_logo();

dozer.run_apps(shutdown_receiver, None, None)
dozer.run_apps(shutdown_receiver, None)
}
},
Commands::Security(security) => match security.command {
Expand Down Expand Up @@ -220,7 +220,7 @@ fn run() -> Result<(), OrchestrationError> {
} else {
render_logo();

dozer.run_all(shutdown_receiver, None)
dozer.run_all(shutdown_receiver)
}
}

Expand Down
22 changes: 4 additions & 18 deletions dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,7 @@ impl SimpleOrchestrator {
&mut self,
shutdown: ShutdownReceiver,
api_notifier: Option<Sender<bool>>,
err_threshold: Option<u32>,
) -> Result<(), OrchestrationError> {
let mut global_err_threshold: Option<u32> =
self.config.app.as_ref().and_then(|app| app.err_threshold);
if err_threshold.is_some() {
global_err_threshold = err_threshold;
}

let home_dir = HomeDir::new(self.config.home_dir.as_ref(), self.config.cache_dir.clone());
let executor = self.runtime.block_on(Executor::new(
&home_dir,
Expand All @@ -184,10 +177,8 @@ impl SimpleOrchestrator {
get_log_options(&self.config),
self.multi_pb.clone(),
))?;
let dag_executor = executor.create_dag_executor(
self.runtime.clone(),
get_executor_options(&self.config, global_err_threshold),
)?;
let dag_executor = executor
.create_dag_executor(self.runtime.clone(), get_executor_options(&self.config))?;

let app_grpc_config = get_app_grpc_config(&self.config);
let internal_server_future = start_internal_pipeline_server(
Expand Down Expand Up @@ -354,11 +345,7 @@ impl SimpleOrchestrator {
Ok(())
}

pub fn run_all(
&mut self,
shutdown: ShutdownReceiver,
err_threshold: Option<u32>,
) -> Result<(), OrchestrationError> {
pub fn run_all(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError> {
let shutdown_api = shutdown.clone();

let mut dozer_api = self.clone();
Expand All @@ -368,8 +355,7 @@ impl SimpleOrchestrator {
self.build(false)?;

let mut dozer_pipeline = self.clone();
let pipeline_thread =
thread::spawn(move || dozer_pipeline.run_apps(shutdown, Some(tx), err_threshold));
let pipeline_thread = thread::spawn(move || dozer_pipeline.run_apps(shutdown, Some(tx)));

// Wait for pipeline to initialize caches before starting api server
if rx.recv().is_err() {
Expand Down
14 changes: 11 additions & 3 deletions dozer-cli/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use dozer_types::models::{
api_security::ApiSecurity,
app_config::{
default_app_buffer_size, default_commit_size, default_commit_timeout,
default_log_entry_max_size, default_log_max_num_immutable_entries,
default_error_threshold, default_log_entry_max_size, default_log_max_num_immutable_entries,
},
config::{default_cache_max_map_size, Config},
};
Expand Down Expand Up @@ -46,6 +46,14 @@ fn get_commit_size(config: &Config) -> u32 {
.unwrap_or_else(default_commit_size)
}

fn get_error_threshold(config: &Config) -> u32 {
config
.app
.as_ref()
.and_then(|app| app.error_threshold)
.unwrap_or_else(default_error_threshold)
}

pub fn get_log_options(config: &Config) -> LogOptions {
let app = config.app.as_ref();
let storage_config = app
Expand Down Expand Up @@ -95,12 +103,12 @@ pub fn get_api_security_config(config: &Config) -> Option<&ApiSecurity> {
.and_then(|api| api.api_security.as_ref())
}

pub fn get_executor_options(config: &Config, err_threshold: Option<u32>) -> ExecutorOptions {
pub fn get_executor_options(config: &Config) -> ExecutorOptions {
ExecutorOptions {
commit_sz: get_commit_size(config),
channel_buffer_sz: get_buffer_size(config) as usize,
commit_time_threshold: get_commit_time_threshold(config),
error_threshold: Some(err_threshold.unwrap_or(0_u32)),
error_threshold: Some(get_error_threshold(config)),
}
}

Expand Down
2 changes: 1 addition & 1 deletion dozer-tests/src/tests/e2e/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl DozerE2eTest {
let mut dozer = SimpleOrchestrator::new(config, Arc::new(runtime));
let (shutdown_sender, shutdown_receiver) = shutdown::new(&dozer.runtime);
let dozer_thread = std::thread::spawn(move || {
dozer.run_all(shutdown_receiver, None).unwrap();
dozer.run_all(shutdown_receiver).unwrap();
});

let num_retries = 10;
Expand Down
4 changes: 2 additions & 2 deletions dozer-types/src/models/app_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct AppConfig {
#[prost(uint32, optional)]
/// How many errors we can tolerate before bringing down the app.
#[serde(skip_serializing_if = "Option::is_none")]
pub err_threshold: Option<u32>,
pub error_threshold: Option<u32>,
}

#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, prost::Oneof)]
Expand Down Expand Up @@ -79,6 +79,6 @@ pub fn default_commit_timeout() -> u64 {
50
}

pub fn default_err_threshold() -> u32 {
pub fn default_error_threshold() -> u32 {
0
}

0 comments on commit e55a977

Please sign in to comment.