diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs index 9d21f05ee0..6e3d83bb14 100644 --- a/core/connectors/sdk/src/sink.rs +++ b/core/connectors/sdk/src/sink.rs @@ -236,6 +236,13 @@ macro_rules! sink_connector { config_len: usize, log_callback: LogCallback, ) -> i32 { + if INSTANCES.contains_key(&id) { + // Duplicate id: caller did not close before reopening. Without + // this guard the existing entry would be silently overwritten, + // discarding any in-flight buffered data and orphaning tasks. + return -1; + } + let mut container = SinkContainer::new(id); let result = container.open(id, config_ptr, config_len, log_callback, <$type>::new); INSTANCES.insert(id, container); diff --git a/core/connectors/sdk/src/source.rs b/core/connectors/sdk/src/source.rs index d5486f24f8..0209027e08 100644 --- a/core/connectors/sdk/src/source.rs +++ b/core/connectors/sdk/src/source.rs @@ -226,6 +226,13 @@ macro_rules! source_connector { state_len: usize, log_callback: LogCallback, ) -> i32 { + if INSTANCES.contains_key(&id) { + // Duplicate id: caller did not close before reopening. Without + // this guard the existing entry would be silently overwritten, + // discarding any in-flight buffered data and orphaning tasks. + return -1; + } + let mut container = SourceContainer::new(id); let result = container.open( id,