Skip to content

2311 Preserve FIFO delivery for ordered in-memory broker routes#4779

Merged
ivicac merged 3 commits intomasterfrom
2311
Apr 16, 2026
Merged

2311 Preserve FIFO delivery for ordered in-memory broker routes#4779
ivicac merged 3 commits intomasterfrom
2311

Conversation

@ivicac
Copy link
Copy Markdown
Contributor

@ivicac ivicac commented Apr 16, 2026

Adds an opt-in MessageRoute.isOrdered() flag and makes AsyncMessageBroker
dispatch ordered routes through a per-route single-thread executor so SSE
stream tokens reach the bridge in emission order instead of racing across
the shared cached thread pool.

Co-Authored-By: Claude Opus 4.6 (1M context) noreply@anthropic.com

Adds an opt-in MessageRoute.isOrdered() flag and makes AsyncMessageBroker
dispatch ordered routes through a per-route single-thread executor so SSE
stream tokens reach the bridge in emission order instead of racing across
the shared cached thread pool.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an opt-in ordered-delivery contract to MessageRoute and updates the in-memory async broker to preserve FIFO delivery for ordered routes (notably SSE stream events), backed by new regression tests.

Changes:

  • Introduces MessageRoute.isOrdered() (default false) to declare FIFO delivery requirements per route.
  • Updates AsyncMessageBroker to dispatch ordered routes via a per-route single-thread executor.
  • Adds tests validating ordering behavior for the SSE route and for the in-memory broker.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
server/libs/platform/platform-webhook/platform-webhook-api/src/test/java/com/bytechef/platform/webhook/message/route/SseStreamMessageRouteTest.java Verifies SSE stream route is explicitly marked ordered.
server/libs/platform/platform-webhook/platform-webhook-api/src/main/java/com/bytechef/platform/webhook/message/route/SseStreamMessageRoute.java Marks the SSE stream route as ordered via isOrdered().
server/libs/core/message/message-broker/message-broker-memory/src/test/java/com/bytechef/message/broker/memory/AsyncMessageBrokerTest.java Adds tests for FIFO ordering and thread behavior in the in-memory async broker.
server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/AsyncMessageBroker.java Implements per-route serial dispatch for ordered routes and preserves shared executor for unordered routes.
server/libs/core/message/message-api/src/main/java/com/bytechef/message/route/MessageRoute.java Adds the isOrdered() API contract and documents intended semantics.
Comments suppressed due to low confidence (1)

server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/AsyncMessageBroker.java:62

  • AsyncMessageBroker creates one or more ExecutorService instances (cached thread pool + per-route single-thread pools) but never shuts them down. This can leak threads/resources in long-running apps and can also keep test JVMs alive (cached thread pool uses non-daemon threads by default). Consider storing these as ExecutorService and adding a lifecycle hook (e.g., AutoCloseable/@PreDestroy/DisposableBean) that shuts down the shared executor and all ordered-route executors, and update tests to call it.
    private final Executor executor;
    private final boolean virtualThreads;
    private final Map<MessageRoute, Executor> orderedRouteExecutors = new ConcurrentHashMap<>();

    public AsyncMessageBroker(Environment environment) {
        this.virtualThreads = Threading.VIRTUAL.isActive(environment);

        if (virtualThreads) {
            executor = Executors.newVirtualThreadPerTaskExecutor();
        } else {
            executor = Executors.newCachedThreadPool();
        }
    }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

ivicac and others added 2 commits April 16, 2026 09:33
Empty JSON arrays (e.g. `attachments: []`) tripped List.getFirst() in
the chat trigger's body-to-map conversion and aborted the webhook with
NoSuchElementException, starving the workflow of the user's message.
Short-circuit empty lists so JSON clients can send no attachments
without crashing the trigger.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replaces the unbounded single-thread executor used for ordered routes
with a ThreadPoolExecutor backed by a bounded LinkedBlockingQueue and a
blocking rejection handler, so a slow receiver applies backpressure to
the producer instead of accumulating unbounded backlog. Implements
DisposableBean to shut down all per-route executors on context close,
and adjusts tests to tear down the broker between cases plus cover the
backpressure-without-drop contract.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@ivicac ivicac merged commit 621a3c6 into master Apr 16, 2026
2 checks passed
@ivicac ivicac deleted the 2311 branch April 16, 2026 07:33
@sonarqubecloud
Copy link
Copy Markdown

Quality Gate Failed Quality Gate failed for 'server'

Failed conditions
B Reliability Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud

Catch issues before they fail your Quality Gate with our IDE extension SonarQube for IDE

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants