[Connector][Pulsar] Support multi-table sink#10558

Open
Muktha9491 wants to merge 1 commit into apache:dev from Muktha9491:feature/pulsar-multitable-10426

@Muktha9491

Purpose of this pull request

This PR adds multi-table sink support for the Pulsar connector.

The Pulsar sink can now route records dynamically to different Pulsar topics
based on SeaTunnelRow.getTableId(). Each topic maintains a dedicated
Pulsar producer stored in a producer map, allowing the connector to support
multiple tables in a single pipeline.
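
The routing described above can be sketched without any Pulsar dependency. This is only an illustration under stated assumptions, not the code from this PR: `TopicProducerCache`, its type parameter `P` (standing in for the Pulsar producer type), and the `factory` function are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical per-topic producer cache; P stands in for the Pulsar producer type. */
class TopicProducerCache<P> {
    private final Map<String, P> producers = new ConcurrentHashMap<>();
    private final Function<String, P> factory; // assumed: creates a producer for a topic
    private final String defaultTopic;         // assumed: the configured fallback topic

    TopicProducerCache(Function<String, P> factory, String defaultTopic) {
        this.factory = factory;
        this.defaultTopic = defaultTopic;
    }

    /** Uses the row's table id as the topic, falling back to the configured topic. */
    String resolveTopic(String tableId) {
        return tableId != null ? tableId : defaultTopic;
    }

    /** Returns the cached producer for the resolved topic, creating it on first use. */
    P producerFor(String tableId) {
        // ConcurrentHashMap rejects null keys, so a missing default topic fails here.
        return producers.computeIfAbsent(resolveTopic(tableId), factory);
    }

    int size() {
        return producers.size();
    }
}
```

In this sketch, repeated calls with the same table id return the same producer instance, and a row without a table id is sent to the fallback topic.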

This change implements SupportMultiTableSink for the Pulsar connector.

Closes #10426

Does this PR introduce any user-facing change?

Yes.

Previously, the Pulsar sink only supported writing records to a single topic.
With this change, the connector supports multi-table pipelines by routing
records to different Pulsar topics using SeaTunnelRow.getTableId().

How was this patch tested?

The change was tested locally by building the connector module and verifying
the multi-table routing logic.

Steps performed:

  1. mvn clean install
  2. mvn spotless:check
  3. Verified successful compilation and connector-pulsar module tests.

The routing behavior was reviewed to ensure records are written to different
Pulsar topics based on SeaTunnelRow.getTableId().

No additional unit tests were added because the change only affects routing
logic and existing Pulsar sink functionality remains unchanged.

Check list

  • If any new Jar binary package is added in your PR, please add a License Notice
  • If necessary, please update the documentation
  • If necessary, please update incompatible-changes.md
  • If you are contributing the connector code, please check that required
    connector configuration files are updated (not applicable for this change)

@DanielCarter-stack

Issue 1: Exceptions thrown in whenComplete callback are ineffective

Location: PulsarSinkWriter.java:91-99

future.whenComplete(
    (id, ex) -> {
        pendingMessages.decrementAndGet();
        if (ex != null) {
            throw new PulsarConnectorException(
                    PulsarConnectorErrorCode.SEND_MESSAGE_FAILED,
                    "Send message failed");
        }
    });

Related Context:

  • Async callbacks execute in Pulsar client thread pool
  • Exceptions thrown in callbacks are not caught by the main thread

Problem Description:
The whenComplete() callback runs on a Pulsar client thread, not on the thread that called write(). An exception thrown inside the callback is captured by the CompletableFuture that whenComplete() returns; because that returned future is discarded here, the exception never propagates to the calling thread. As a result, when a send fails:

  1. The failure goes unnoticed by users (the exception is swallowed)
  2. pendingMessages is still decremented, so the writer treats the message as completed
  3. Data consistency cannot be guaranteed
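
The swallowing behavior can be reproduced with the JDK alone. The sketch below is a stand-alone demonstration, not code from the connector; `WhenCompleteDemo` and its method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

/** Stand-alone demonstration; class and method names are illustrative only. */
class WhenCompleteDemo {

    /** Returns true only if the exception thrown in the callback reaches the caller. */
    static boolean callbackExceptionReachesCaller() {
        CompletableFuture<String> send = new CompletableFuture<>();
        try {
            // The future returned by whenComplete() is discarded, as in the PR code.
            send.whenComplete((id, ex) -> {
                throw new IllegalStateException("Send message failed");
            });
            send.complete("message-id"); // runs the callback, but does not rethrow
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Returns true if the future returned by whenComplete() carries the exception. */
    static boolean dependentHoldsException() {
        CompletableFuture<String> send = new CompletableFuture<>();
        CompletableFuture<String> dependent = send.whenComplete((id, ex) -> {
            throw new IllegalStateException("Send message failed");
        });
        send.complete("message-id");
        return dependent.isCompletedExceptionally();
    }
}
```

The first method returns false and the second returns true: the exception only exists inside the dependent future, which nobody inspects when that future is dropped.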

This is an issue that existed in the original code, but was not fixed in this refactoring.

Potential Risks:

  • Silent loss of failed message sends
  • Violation of at-least-once semantics (AT_LEAST_ONCE)
  • Difficult to troubleshoot in production environments

Impact Scope:

  • Direct impact: Exception handling in PulsarSinkWriter.write()
  • Indirect impact: All scenarios using EXACTLY_ONCE or AT_LEAST_ONCE semantics
  • Affected scope: All Pulsar Connector users

Severity: MAJOR

Improvement Suggestions:

future.whenComplete(
    (id, ex) -> {
        pendingMessages.decrementAndGet();
        if (ex != null) {
            // Log errors instead of throwing exceptions
            log.error("Failed to send message to topic: {}", topic, ex);
            // Or use a custom error handling mechanism
            errorHandler.handleSendFailure(topic, element, ex);
        }
    });
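
Logging alone keeps the failure visible in the logs, but the writer thread still does not learn about it. One possible shape for the "custom error handling mechanism" mentioned above is to record the first failure in the callback and rethrow it from the writer thread, for example at the start of write() or in prepareCommit(). This is a minimal sketch under that assumption; `AsyncFailureTracker` and its methods are hypothetical and do not exist in the connector.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper: records async send failures so the writer thread can rethrow them. */
class AsyncFailureTracker {
    private final AtomicInteger pending = new AtomicInteger();
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    /** Registers a send future; the callback only records the outcome and never throws. */
    void track(CompletableFuture<?> future) {
        pending.incrementAndGet();
        future.whenComplete((id, ex) -> {
            if (ex != null) {
                // Keep only the first failure; later ones are usually consequences of it.
                firstFailure.compareAndSet(null, ex);
            }
            // Decrement last, so a thread that sees pending == 0 also sees the failure.
            pending.decrementAndGet();
        });
    }

    /** Called on the writer thread; surfaces a recorded failure to the framework. */
    void checkForFailure() {
        Throwable failure = firstFailure.get();
        if (failure != null) {
            throw new IllegalStateException("Send message failed", failure);
        }
    }

    int pendingCount() {
        return pending.get();
    }
}
```

In the real writer the rethrown exception would presumably be a PulsarConnectorException with SEND_MESSAGE_FAILED rather than the IllegalStateException used here to keep the sketch dependency-free.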

Issue 2: Removal of Apache License header

Locations:

  • PulsarSink.java:1-17 (original) → removed in new code
  • PulsarSinkWriter.java:1-33 (original) → removed in new code

Problem Description:
All modified Java files have had their Apache License headers removed, including:

  • /* Licensed to the Apache Software Foundation... */
  • Copyright notices
  • LICENSE file references

This violates Apache project guidelines.

Potential Risks:

  • Legal compliance risk
  • Cannot pass Apache PMC IP review
  • Affects project compliance

Impact Scope:

  • Direct impact: All modified Java files
  • Affected scope: Project compliance

Severity: BLOCKER (Must fix)

Improvement Suggestions:
Restore Apache License headers for all files.


Issue 3: Removal of PARTITION_KEY_FIELDS configuration causes breaking change

Locations:

  • PulsarSinkFactory.java:39-41 (original optionRule included PARTITION_KEY_FIELDS)
  • PulsarSinkWriter.java (removed getPartitionKeyFields(), createKeySerializationSchema() methods)
  • PulsarSinkOptions.java:64-69 (definition still exists but unused)

Problem Description:

  1. Original PulsarSinkFactory.optionRule() had PARTITION_KEY_FIELDS as an optional configuration
  2. New code removed all related logic:
    • keySerializationSchema field
    • getPartitionKeyFields() method
    • createKeySerializationSchema() method
  3. The removal is not documented in incompatible-changes.md
  4. The PR description does not mention this breaking change

Potential Risks:

  • Existing jobs that set partition_key_fields will no longer work as before after upgrading
  • Violates semantic versioning principles
  • Poor user experience

Impact Scope:

  • Direct impact: Users using Pulsar partition key functionality
  • Indirect impact: Scenarios relying on message key for routing/partitioning
  • Affected scope: Some Pulsar Connector users

Severity: MAJOR

Improvement Suggestions:

  1. If this feature must be removed:

    • Document in incompatible-changes.md
    • Prominently note in PR description
    • Explain alternative approaches in migration documentation
  2. Or retain this functionality:

    • In multi-table scenarios, partition key configuration remains valid
    • Only ignore partition key configuration when using getTableId() routing

Issue 4: Race condition in transaction management logic after prepareCommit

Location: PulsarSinkWriter.java:145-163

@Override
public Optional<PulsarCommitInfo> prepareCommit() throws IOException {
    if (PulsarSemantics.EXACTLY_ONCE != pulsarSemantics) {
        return Optional.empty();
    }

    while (pendingMessages.get() > 0) {
        Thread.yield();
    }

    if (currentTransaction == null) {
        return Optional.empty();
    }

    TxnID txnID = currentTransaction.getTxnID();
    currentTransaction = null;  // ← Set to null

    return Optional.of(new PulsarCommitInfo(txnID));
}

private Transaction getOrCreateTransaction() {
    if (PulsarSemantics.EXACTLY_ONCE != pulsarSemantics) {
        return null;
    }

    if (currentTransaction == null) {  // ← Recreate
        try {
            currentTransaction =
                    PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
        } catch (Exception e) {
            throw new PulsarConnectorException(
                    PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
                    "Transaction create failed");
        }
    }

    return currentTransaction;
}

Problem Description:

  1. prepareCommit() sets currentTransaction to null
  2. A new transaction is created only lazily, the next time write() invokes getOrCreateTransaction()
  3. Race condition: if write() runs concurrently with prepareCommit(), it can read currentTransaction after it has been reset to null but before a new transaction exists, and an NPE will occur

Compared to original implementation:

// The original snapshotState creates a new transaction before returning the state
List<PulsarSinkState> pulsarSinkStates = Lists.newArrayList(new PulsarSinkState(this.transaction.getTxnID()));
this.transaction = (TransactionImpl) PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
return pulsarSinkStates;

Potential Risks:

  • After currentTransaction is set to null, if there are concurrent write calls, null transaction may be used
  • Violates EXACTLY_ONCE semantics

Impact Scope:

  • Direct impact: Multi-table writes in EXACTLY_ONCE mode
  • Indirect impact: Scenarios with large checkpoint intervals
  • Affected scope: Pulsar Connector EXACTLY_ONCE users

Severity: CRITICAL

Improvement Suggestions:
Create next transaction before prepareCommit() returns:

TxnID txnID = currentTransaction.getTxnID();
currentTransaction = null;

// Create the next transaction immediately
try {
    currentTransaction = PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
} catch (Exception e) {
    throw new PulsarConnectorException(
            PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
            "Failed to create next transaction after prepareCommit", e);
}

return Optional.of(new PulsarCommitInfo(txnID));
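
If concurrent access is a real concern, the swap itself can also be made atomic. The following is a dependency-free sketch of that idea, assuming a hypothetical `TransactionRotator` whose `factory` stands in for PulsarConfigUtil.getTransaction(...); it uses plain synchronization rather than anything from the connector.

```java
import java.util.function.Supplier;

/** Hypothetical holder that never exposes a null transaction; T stands in for Transaction. */
class TransactionRotator<T> {
    private final Supplier<T> factory; // assumed: opens a new transaction
    private T current;

    TransactionRotator(Supplier<T> factory) {
        this.factory = factory;
        this.current = factory.get();
    }

    /** Returns the transaction that writes should currently join. */
    synchronized T current() {
        return current;
    }

    /** Opens the next transaction, installs it, and returns the previous one for commit. */
    synchronized T rotate() {
        T next = factory.get(); // if this throws, the current transaction stays in place
        T toCommit = current;
        current = next;
        return toCommit;
    }
}
```

Because the next transaction is created before the swap, callers of current() observe either the old or the new transaction, never null.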

Issue 5: TOPIC configuration becomes optional but default value scenarios not sufficiently validated

Locations: PulsarSinkFactory.java:38-39 (new optionRule), PulsarSinkWriter.java:103-108

private String resolveTopic(SeaTunnelRow row) {
    if (row.getTableId() != null) {
        return row.getTableId();
    }
    return pluginConfig.get(PulsarSinkOptions.TOPIC);  // ← May return null
}

Problem Description:

  1. PulsarSinkFactory.optionRule() removes TOPIC from required
  2. resolveTopic() falls back to configured TOPIC when row.getTableId() == null
  3. But pluginConfig.get(TOPIC) may return null (user did not configure)
  4. Subsequent createProducer(topic) receives null topic, causing Pulsar client exception

Potential Risks:

  • In single-table scenarios, if user forgets to configure topic, runtime exception occurs
  • Error message unclear

Impact Scope:

  • Direct impact: Single-table sink users
  • Affected scope: All Pulsar Sink users

Severity: MINOR

Improvement Suggestions:

  1. Validate in PulsarSinkFactory.createSink(): if single-table mode and topic not configured, throw exception
  2. Or add validation in resolveTopic():
private String resolveTopic(SeaTunnelRow row) {
    if (row.getTableId() != null) {
        return row.getTableId();
    }
    String topic = pluginConfig.get(PulsarSinkOptions.TOPIC);
    if (topic == null) {
        throw new PulsarConnectorException(
                PulsarConnectorErrorCode.ILLEGAL_ARGUMENT,
                "Topic must be configured when row.getTableId() is null");
    }
    return topic;
}

Issue 6: Using RuntimeException instead of SeaTunnelJsonFormatException

Location: PulsarSinkWriter.java:216

// Original code:
throw new SeaTunnelJsonFormatException(
        CommonErrorCode.UNSUPPORTED_DATA_TYPE, 
        "Unsupported format: " + format);

// New code:
throw new RuntimeException("Unsupported format: " + format);

Problem Description:
Throwing a generic RuntimeException instead of a framework-defined exception type goes against SeaTunnel's error-handling conventions.

Potential Risks:

  • Error codes are lost, which makes issues difficult to troubleshoot
  • The exception handling framework cannot recognize the exception

Impact Scope:

  • Direct impact: Scenarios using unsupported formats
  • Affected scope: Error handling and monitoring

Severity: MINOR

Improvement Suggestions:
Restore use of SeaTunnelJsonFormatException or other framework-defined exception types.


Issue 7: Lack of testing for multi-table scenarios

Problem Description:

  • PR description explicitly states: "No additional unit tests were added"
  • Core logic modified but no test coverage
  • No sink tests under connector-pulsar/src/test/

Potential Risks:

  • Code quality not guaranteed
  • High regression risk
  • Edge cases not covered

Impact Scope:

  • Direct impact: Code quality and stability
  • Indirect impact: Future maintenance costs
  • Affected scope: All Pulsar Connector users

Severity: MAJOR

Improvement Suggestions:
Add the following tests:

// PulsarSinkWriterTest.java
@Test
public void testResolveTopicWithTableId() {
    SeaTunnelRow row = new SeaTunnelRow(new Object[]{});
    row.setTableId("persistent://tenant/ns/topic1");
    String topic = writer.resolveTopic(row);
    assertEquals("persistent://tenant/ns/topic1", topic);
}

@Test
public void testResolveTopicWithoutTableId() {
    SeaTunnelRow row = new SeaTunnelRow(new Object[]{});
    String topic = writer.resolveTopic(row);
    assertEquals(configTopic, topic);
}

@Test
public void testMultipleProducerCreation() {
    // Verify that multiple producers are cached in producerMap
}

@Test
public void testExactlyOnceMultiTable() {
    // Verify multi-table writes in EXACTLY_ONCE mode
}

Issue 8: SupportMultiTableSinkWriter interface not implemented

Location: PulsarSinkWriter.java:36-37

public class PulsarSinkWriter
        implements SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> {
    // SupportMultiTableSinkWriter not implemented

Related Context:

  • Redis Connector: RedisSinkWriter implements SupportMultiTableSinkWriter<Void>
  • InfluxDB Connector: InfluxDBSinkWriter implements SupportMultiTableSinkWriter
  • Console Connector: ConsoleSinkWriter implements SupportMultiTableSinkWriter<Void>

Problem Description:
Although PulsarSink implements SupportMultiTableSink, PulsarSinkWriter does not implement SupportMultiTableSinkWriter. This is inconsistent with other Connector implementations.

Potential Risks:

  • API inconsistency
  • May require refactoring in the future to support more complex multi-table scenarios (e.g., schema evolution)

Impact Scope:

  • Direct impact: Code architecture consistency
  • Indirect impact: Future extensibility
  • Affected scope: Pulsar Connector architecture

Severity: MINOR

Improvement Suggestions:
Consider implementing SupportMultiTableSinkWriter<Void> interface to maintain consistency with other Connectors.


Issue 9: snapshotState returning empty list may cause state loss

Location: PulsarSinkWriter.java:165-178

@Override
public List<PulsarSinkState> snapshotState(long checkpointId) throws IOException {
    for (Producer<byte[]> producer : producerMap.values()) {
        producer.flush();
    }

    while (pendingMessages.get() > 0) {
        for (Producer<byte[]> producer : producerMap.values()) {
            producer.flush();
        }
    }

    return Collections.emptyList();  // ← Return empty list
}

Problem Description:
Compared to original implementation:

  • Original EXACTLY_ONCE mode returned new PulsarSinkState(transaction.getTxnID())
  • New implementation returns empty list, relying on prepareCommit to return commitInfo

Although SeaTunnel's checkpoint mechanism allows snapshotState to return an empty list (with the commit information carried by prepareCommit instead), this differs from the original implementation.

Potential Risks:

  • Recovery from old version savepoint may fail
  • State management logic unclear

Impact Scope:

  • Direct impact: Users upgrading from old versions
  • Indirect impact: Savepoint recovery
  • Affected scope: EXACTLY_ONCE users

Severity: MAJOR

Improvement Suggestions:

  1. Confirm whether SeaTunnel framework allows snapshotState to return empty list
  2. If recovery from old version savepoint is needed, maintain compatibility:
@Override
public List<PulsarSinkState> snapshotState(long checkpointId) throws IOException {
    // flush logic...
    
    if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
        // Maintain compatibility with the old version
        if (currentTransaction != null) {
            return Collections.singletonList(new PulsarSinkState(currentTransaction.getTxnID()));
        }
    }
    return Collections.emptyList();
}

Issue 10: Visibility issue with currentTransaction in concurrent scenarios

Location: PulsarSinkWriter.java:49

private Transaction currentTransaction;  // ← No volatile

Problem Description:
Although SeaTunnel's SinkWriter is typically single-threaded, if the framework upgrades to concurrent writes:

  • currentTransaction has no volatile modifier
  • After setting to null in prepareCommit(), other threads may see stale values

Potential Risks:

  • In concurrent scenarios, threads may use committed or aborted transactions
  • Violates EXACTLY_ONCE semantics

Impact Scope:

  • Direct impact: Concurrent write scenarios (if supported in the future)
  • Indirect impact: Code robustness
  • Affected scope: Architecture design

Severity: MINOR (current framework assumes single-threading)

Improvement Suggestions:
Use AtomicReference<Transaction> or add volatile:

private final AtomicReference<Transaction> currentTransaction = new AtomicReference<>();

@dybyte
Collaborator

dybyte commented Mar 4, 2026

Please enable CI following the instructions.

Contributor

Do we need this file?

Contributor

The license statement cannot be deleted.

Development

Successfully merging this pull request may close these issues.

[Feature][Connector] Implement multi-table sink support for connectors
