Skip to content

Commit

Permalink
Abort starting log reader when shutting down
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker committed Aug 24, 2023
1 parent 392f511 commit a7b27b2
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use dozer_cache::cache::LmdbRwCacheManager;
use dozer_cache::dozer_log::home_dir::HomeDir;
use dozer_core::app::AppPipeline;
use dozer_core::dag_schemas::DagSchemas;
use tokio::select;

use crate::console_helper::get_colored_text;
use crate::console_helper::GREEN;
Expand Down Expand Up @@ -98,15 +99,18 @@ impl SimpleOrchestrator {
);
let mut cache_endpoints = vec![];
for endpoint in &self.config.endpoints {
let (cache_endpoint, handle) = CacheEndpoint::new(
app_server_addr.clone(),
&*cache_manager,
endpoint.clone(),
Box::pin(shutdown.create_shutdown_future()),
operations_sender.clone(),
Some(self.multi_pb.clone()),
)
.await?;
let (cache_endpoint, handle) = select! {
// If we're shutting down, the cache endpoint will fail to connect
_shutdown_future = shutdown.create_shutdown_future() => return Ok(()),
result = CacheEndpoint::new(
app_server_addr.clone(),
&*cache_manager,
endpoint.clone(),
Box::pin(shutdown.create_shutdown_future()),
operations_sender.clone(),
Some(self.multi_pb.clone()),
) => result?
};
let cache_name = endpoint.name.clone();
futures.push(flatten_join_handle(join_handle_map_err(handle, move |e| {
if e.is_map_full() {
Expand Down

0 comments on commit a7b27b2

Please sign in to comment.