Skip to content

[GOBBLIN-2257] Fix thread-safety of specSyncObjects after parallel onAddSpec#4182

Closed
DaisyModi wants to merge 1 commit into
apache:masterfrom
DaisyModi:fix-concurrent-specSyncObjects
Closed

[GOBBLIN-2257] Fix thread-safety of specSyncObjects after parallel onAddSpec#4182
DaisyModi wants to merge 1 commit into
apache:masterfrom
DaisyModi:fix-concurrent-specSyncObjects

Conversation

@DaisyModi
Copy link
Copy Markdown
Contributor

Summary

GOBBLIN-2257 removed synchronized from onAddSpec and introduced a multi-threaded executor for parallel flow compilation, improving flowConfigsV2 GET API P99 latency. However, FlowCatalog.specSyncObjects is a plain HashMap that is now accessed concurrently from multiple updateOrAddSpecHelper calls without synchronization.

The bug

In updateOrAddSpecHelper, concurrent threads call:

  • specSyncObjects.put(...) (adding sync objects)
  • specSyncObjects.remove(...) (cleaning up after persist)

HashMap is not thread-safe for concurrent modifications. This causes:

  • Lost entries — a flow's syncObject disappears from the map
  • LaunchDagProc - errorgetSyncObject() returns null, breaking synchronization with NonScheduledJobRunner, causing DAG initialization failures
  • DagNode or its job status not found for Reevaluate — orphaned DAGs from failed initialization

The fix

  • HashMapConcurrentHashMap for specSyncObjects — drop-in replacement, no API change, no impact on P99 latency improvement
  • Downgrade "SpecStore is missing in FlowCatalog" log from ERROR to WARN — with concurrent writes to SpecStore, getSpecURIs() can list a URI before addSpec() fully commits the data. This is a transient condition already handled by exponential backoff retries; ERROR level creates false alarms in monitoring

Test plan

  • Existing FlowCatalogTest passes
  • Verify in production: LaunchDagProc - error and DagNode not found errors should stop occurring
  • Verify in production: "SpecStore is missing in FlowCatalog" now logs at WARN instead of ERROR

…AddSpec

GOBBLIN-2257 removed synchronized from onAddSpec and introduced a
multi-threaded executor for parallel flow compilation. However,
FlowCatalog.specSyncObjects is a plain HashMap that is now accessed
concurrently from multiple updateOrAddSpecHelper calls (put/remove
without synchronization). Concurrent HashMap modifications can cause
lost entries and structural corruption, leading to:
- LaunchDagProc errors (syncObject lost, DAG initialization fails)
- DagNode not found for Reevaluate actions (orphaned DAGs)

Fix: Change specSyncObjects from HashMap to ConcurrentHashMap.

Also downgrade the "discovered in SpecStore is missing in FlowCatalog"
log from ERROR to WARN. With concurrent writes, this transient condition
is expected — the existing exponential backoff retries handle it, and
the ERROR level creates false alarms in monitoring.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@DaisyModi DaisyModi closed this Apr 1, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant