Skip to content

sink_connector! macro silently discards buffered data on duplicate iggy_sink_open with same ID #3168

@kriti-sc

Description

@kriti-sc

Bug description

In core/connectors/sdk/src/sink.rs, the sink_connector! macro expands iggy_sink_open to unconditionally overwrite any existing entry in INSTANCES:

let mut container = SinkContainer::new(id);
let result = container.open(id, config_ptr, config_len, log_callback, <$type>::new);
INSTANCES.insert(id, container);  // silently overwrites if id already exists

If iggy_sink_open is called twice with the same id, the previous SinkContainer is dropped without close() being called. For sinks with buffered state (e.g. DeltaSink, which buffers records in a JsonWriter before flushing to the Delta log), any unflushed records are silently discarded.

The runtime doesn't currently double-open, but the macro is the public SDK contract and this is a latent data loss bug for any sink with real buffered state.

The same pattern exists in source_connector! (core/connectors/sdk/src/source.rs) and should be fixed there too, though sources currently have no equivalent buffered-state risk.

Deployment

None

Versions

No response

Hardware / environment

No response

Sample code

No response

Logs

No response

Iggy server config

No response

Reproduction

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions