From a58808192c64059c58cd427e2d9463fa65f42014 Mon Sep 17 00:00:00 2001 From: Mukunda Rao Katta Date: Sat, 25 Apr 2026 16:52:50 -0700 Subject: [PATCH] connectors/sdk: reject duplicate iggy_sink_open and iggy_source_open The sink_connector! and source_connector! macros silently overwrote an existing INSTANCES entry on a duplicate open call, discarding any in-flight buffered data and orphaning the prior connector tasks. Return -1 on duplicate id so callers see the failure and can drain or close explicitly before re-opening. Closes #3168. Signed-off-by: Mukunda Rao Katta --- core/connectors/sdk/src/sink.rs | 7 +++++++ core/connectors/sdk/src/source.rs | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs index 9d21f05ee0..6e3d83bb14 100644 --- a/core/connectors/sdk/src/sink.rs +++ b/core/connectors/sdk/src/sink.rs @@ -236,6 +236,13 @@ macro_rules! sink_connector { config_len: usize, log_callback: LogCallback, ) -> i32 { + if INSTANCES.contains_key(&id) { + // Duplicate id: caller did not close before reopening. Without + // this guard the existing entry would be silently overwritten, + // discarding any in-flight buffered data and orphaning tasks. + return -1; + } + let mut container = SinkContainer::new(id); let result = container.open(id, config_ptr, config_len, log_callback, <$type>::new); INSTANCES.insert(id, container); diff --git a/core/connectors/sdk/src/source.rs b/core/connectors/sdk/src/source.rs index d5486f24f8..0209027e08 100644 --- a/core/connectors/sdk/src/source.rs +++ b/core/connectors/sdk/src/source.rs @@ -226,6 +226,13 @@ macro_rules! source_connector { state_len: usize, log_callback: LogCallback, ) -> i32 { + if INSTANCES.contains_key(&id) { + // Duplicate id: caller did not close before reopening. Without + // this guard the existing entry would be silently overwritten, + // discarding any in-flight buffered data and orphaning tasks. + return -1; + } + let mut container = SourceContainer::new(id); let result = container.open( id,