Skip to content

Add synchronous join support to FORK_JOIN (#619)#680

Merged
nthmost-orkes merged 22 commits into
mainfrom
619-add-synchronous-join-support-to-fork_join
Mar 12, 2026
Merged

Add synchronous join support to FORK_JOIN (#619)#680
nthmost-orkes merged 22 commits into
mainfrom
619-add-synchronous-join-support-to-fork_join

Conversation

@nthmost-orkes
Copy link
Copy Markdown
Contributor

@nthmost-orkes nthmost-orkes commented Dec 5, 2025

Implements #619 - adds property-based control for synchronous vs asynchronous join behavior in FORK_JOIN tasks.

Implementation

Property-Based Control

  • Added joinMode property to WorkflowTask (values: "SYNC" or "ASYNC")
  • Property is optional; defaults to "ASYNC" for backward compatibility
  • Passed through JoinTaskMapper to join task input data

Synchronous Behavior via Evaluation Offset

Rather than overriding isAsync() (which doesn't support per-task control), synchronous behavior is achieved through the evaluation offset mechanism:

  • SYNC mode: Returns 0 evaluation offset → join is evaluated every workflow decision cycle
  • ASYNC mode (default): Uses exponential backoff → join is polled with increasing delays

This approach works within the existing framework constraints while providing responsive synchronous-like behavior.

Usage Example

{
  "name": "join_task",
  "taskReferenceName": "join_ref",
  "type": "JOIN",
  "joinOn": ["task1", "task2"],
  "joinMode": "SYNC"
}

Backward Compatibility

  • Existing workflows without joinMode continue using async behavior
  • No breaking changes to APIs or existing functionality

Tests

  • Unit tests for both sync and async modes
  • JoinTaskMapper tests verify property passing
  • All existing tests continue to pass

Convergence Notes: OSS vs Orkes Implementation

Current Orkes Implementation (OrkesJoin)

Orkes Conductor replaces the OSS Join component entirely with OrkesJoin:

  • Always synchronous: isAsync() = false - no property-based control
  • Wraps OSS Join: Internally delegates to OSS Join for basic functionality
  • Additional features:
    • Script-based join conditions via expression and evaluatorType (GraalJS)
    • Large fork optimization (conditional output capture for 500+ tasks)
    • Enhanced execution logic with custom evaluators

Convergence Path Options

Option 1: Orkes adopts OSS property-based approach

  • Replace OrkesJoin with enhanced OSS Join + property control
  • Set joinMode: "SYNC" by default in Orkes for backward compatibility
  • Keep Orkes-specific features (expressions, large fork optimization) as separate enhancements
  • Benefit: Shared core, easier maintenance

Option 2: Keep separate implementations

  • OSS: Property-based sync/async control
  • Orkes: Always-synchronous with advanced features
  • Both implementations continue to coexist
  • Benefit: No migration needed, each optimized for use case

Implementation Compatibility

This PR's approach is convergence-friendly:

  • ✅ Additive (new optional property)
  • ✅ Non-breaking (defaults to async)
  • ✅ Framework-aware (uses evaluation offset, not isAsync override)
  • ✅ Orkes can layer expression support on top of property-based control

The property-based approach could serve as foundation for future convergence if Orkes wants to support both sync and async modes with user control.

@nthmost-orkes nthmost-orkes linked an issue Dec 5, 2025 that may be closed by this pull request
25 tasks
@nthmost-orkes nthmost-orkes marked this pull request as draft December 5, 2025 20:30
Comment thread core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java Outdated
nthmost-orkes added a commit to nthmost-orkes/conductor that referenced this pull request Mar 6, 2026
Addresses reviewer feedback on PR conductor-oss#680 (v1r3n):

1. Change WorkflowTask.joinMode from String to the inner enum
   WorkflowTask.JoinMode { SYNC, ASYNC }, eliminating free-form string
   comparison and making the API self-documenting.

2. Read joinMode directly from taskModel.getWorkflowTask() in
   Join.getEvaluationOffset() instead of pulling it from the task's
   inputData map. The value is now never duplicated into the task payload.

3. Remove the joinMode copy from JoinTaskMapper.getMappedTasks() since
   Join no longer reads it from input data.

4. Update tests accordingly:
   - JoinTest: construct WorkflowTask with JoinMode enum; replace the
     now-redundant case-insensitivity test with a null-workflowTask test
     that verifies the async default.
   - JoinTaskMapperTest: use JoinMode enum; assert joinMode is absent from
     inputData and present on the workflowTask.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
nthmost-orkes and others added 9 commits March 6, 2026 21:26
Addresses reviewer feedback on PR #680 (v1r3n):

1. Change WorkflowTask.joinMode from String to the inner enum
   WorkflowTask.JoinMode { SYNC, ASYNC }, eliminating free-form string
   comparison and making the API self-documenting.

2. Read joinMode directly from taskModel.getWorkflowTask() in
   Join.getEvaluationOffset() instead of pulling it from the task's
   inputData map. The value is now never duplicated into the task payload.

3. Remove the joinMode copy from JoinTaskMapper.getMappedTasks() since
   Join no longer reads it from input data.

4. Update tests accordingly:
   - JoinTest: construct WorkflowTask with JoinMode enum; replace the
     now-redundant case-insensitivity test with a null-workflowTask test
     that verifies the async default.
   - JoinTaskMapperTest: use JoinMode enum; assert joinMode is absent from
     inputData and present on the workflowTask.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add ForkJoinSyncModeIntegrationTest with 9 test scenarios covering:
  all-succeed, backward compat (no joinMode), explicit ASYNC, optional
  branch fails, required branch fails, sequential tasks in branch,
  joinOn subset, nested fork/join, and 10-branch fan-out
- Fix WorkflowTask.JoinMode proto field ID from 33→34 (conflict with
  `items` field added for DO_WHILE list iteration in a prior PR)
- Add @ProtoEnum annotation to JoinMode enum so protogen can map it
- Regenerate workflowtask.proto and AbstractProtoMapper with JoinMode
- Add JSON workflow definition resources for manual/documentation use
- Use raw JSON + RestTemplate for workflow registration to avoid
  classpath conflict with bundled conductor-client WorkflowTask

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Replace testcontainers/Redis setup with InMemoryQueueDAO inner
  @TestConfiguration to satisfy QueueDAO wiring without needing Docker
- Evaluate JOIN tasks directly via @Autowired AsyncSystemTaskExecutor
  and Join bean (mirrors pattern used by Groovy Spock specs)
- Set retryCount=0 on task defs so failed tasks fail immediately
  rather than being retried (required for failure-path tests 4 & 5)
- Remove now-unnecessary testcontainers and DynamicPropertySource imports

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Reorder ObjectMapper import per spotless import ordering rules
- Join pushIfNotExists signature onto one line

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…onTest

- Join setUnackTimeout and forkTaskMap signatures to single lines
- Split long fail() string concatenations per google-java-format rules

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Ran ./gradlew spotlessApply to fix all remaining google-java-format violations.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@nthmost-orkes nthmost-orkes marked this pull request as ready for review March 8, 2026 08:53
@nthmost-orkes
Copy link
Copy Markdown
Contributor Author

nthmost-orkes commented Mar 9, 2026

Update on convergence: Replaced the offset=0 approach with isAsync(TaskModel task) on WorkflowSystemTask — this brings OSS Join much closer to OrkesJoin than we had before when we were avoiding changing the WorkflowSystemTask API

OSS Join (now) OrkesJoin
Sync execution isAsync(task) = false when joinMode=SYNC — runs in decide thread isAsync() = false always
Per-task control ✅ user-controlled via joinMode ❌ always sync
Script conditions ✅ (expression + GraalJS)
Large fork optimization ✅ (500+ tasks)

Convergence path for Orkes is now simple: subclass OSS Join, override isAsync(TaskModel task) to return false unconditionally — no workflow definition changes or migrations needed. Orkes-specific features (GraalJS, large fork optimization) layer on top as before. The joinMode enum also gives Orkes a path to expose user-controlled sync/async in the future.

nthmost-orkes and others added 2 commits March 12, 2026 00:01
The scheduled flush executor in PostgresPollDataDAO was never stopped,
causing a resource leak and test flakiness. After CacheTest completes,
its background thread continued writing cached poll data to the shared
database every 200ms. When NoCacheTest subsequently ran its @before
truncation, the rogue thread would re-insert a record immediately after,
causing the post-truncation verification check to fail.

Fixes:
- Store ScheduledExecutorService reference and add @PreDestroy shutdown()
- Add @DirtiesContext(AFTER_CLASS) to CacheTest so Spring closes its
  context (and calls @PreDestroy) before NoCacheTest begins

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@nthmost-orkes nthmost-orkes merged commit d1dc16a into main Mar 12, 2026
9 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Epic: Add synchronous join support to FORK_JOIN (Q4 Roadmap 1.3)

2 participants