Skip to content

Commit

Permalink
fix: dozer-core now propagates recoverable errors instead of panicking (
Browse files Browse the repository at this point in the history
  • Loading branch information
prabhuvaibhav committed Jan 26, 2024
1 parent c328376 commit 2f526c8
Showing 1 changed file with 17 additions and 18 deletions.
35 changes: 17 additions & 18 deletions dozer-core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use dozer_log::tokio::runtime::Runtime;
use dozer_tracing::LabelsAndProgress;
use dozer_types::serde::{self, Deserialize, Serialize};
use std::fmt::Debug;
use std::panic::panic_any;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread::JoinHandle;
Expand Down Expand Up @@ -70,7 +69,7 @@ pub struct DagExecutor {
}

pub struct DagExecutorJoinHandle {
join_handles: Vec<JoinHandle<()>>,
join_handles: Vec<JoinHandle<Result<(), ExecutionError>>>,
}

impl DagExecutor {
Expand Down Expand Up @@ -161,9 +160,7 @@ impl DagExecutorJoinHandle {
continue;
};
let handle = self.join_handles.swap_remove(finished);
if let Err(e) = handle.join() {
panic_any(e)
}
handle.join().unwrap()?;

if self.join_handles.is_empty() {
return Ok(());
Expand All @@ -172,47 +169,49 @@ impl DagExecutorJoinHandle {
}
}

fn start_source(source_sender: SourceNode) -> Result<JoinHandle<()>, ExecutionError> {
fn start_source(
source_sender: SourceNode,
) -> Result<JoinHandle<Result<(), ExecutionError>>, ExecutionError> {
let handle = source_sender.handle().clone();

let handle = Builder::new()
.name(format!("{handle}"))
.spawn(move || match source_sender.run() {
Ok(_) => {}
Ok(_) => Ok(()),
// Channel disconnection means the source listener has quit.
// Maybe it quit gracefully so we don't need to panic.
// Maybe it quit gracefully so we don't need to propagate the error.
Err(e) => {
if let ExecutionError::Source(e) = &e {
if let Some(ExecutionError::CannotSendToChannel) = e.downcast_ref() {
return;
return Ok(());
}
}
std::panic::panic_any(e);
Err(e)
}
})
.map_err(ExecutionError::CannotSpawnWorkerThread)?;

Ok(handle)
}

fn start_processor(processor: ProcessorNode) -> Result<JoinHandle<()>, ExecutionError> {
fn start_processor(
processor: ProcessorNode,
) -> Result<JoinHandle<Result<(), ExecutionError>>, ExecutionError> {
Builder::new()
.name(processor.handle().to_string())
.spawn(move || {
if let Err(e) = processor.run() {
std::panic::panic_any(e);
}
processor.run()?;
Ok(())
})
.map_err(ExecutionError::CannotSpawnWorkerThread)
}

fn start_sink(sink: SinkNode) -> Result<JoinHandle<()>, ExecutionError> {
fn start_sink(sink: SinkNode) -> Result<JoinHandle<Result<(), ExecutionError>>, ExecutionError> {
Builder::new()
.name(sink.handle().to_string())
.spawn(|| {
if let Err(e) = sink.run() {
std::panic::panic_any(e);
}
sink.run()?;
Ok(())
})
.map_err(ExecutionError::CannotSpawnWorkerThread)
}

0 comments on commit 2f526c8

Please sign in to comment.