[GOBBLIN-2257] Fix thread-safety of shared maps after parallel onAddSpec #4183

Closed
DaisyModi wants to merge 1 commit into apache:master from DaisyModi:fix-concurrent-onAddSpec-thread-safety

@DaisyModi
Contributor

Summary

GOBBLIN-2257 removed synchronized from onAddSpec and introduced a multi-threaded executor for parallel flow compilation, improving flowConfigsV2 GET API P99 latency. However, the listener callbacks and FlowCatalog internals that were implicitly protected by that synchronized block now run concurrently without thread-safe data structures.

The bug

GobblinServiceJobScheduler.onAddSpec() reads and writes scheduledFlowSpecs and lastUpdatedTimeForFlowSpec — both plain HashMaps — from multiple concurrent callback threads. NonScheduledJobRunner also removes entries from a separate thread pool. Concurrent HashMap modifications cause structural corruption:

  • Lost map entries → flows not tracked, not cleaned up
  • Orphaned DAGs → LaunchDagProc - error, DagNode or its job status not found
  • Corrupted internal HashMap state → potential infinite loops, NPEs
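
The difference between the two map types can be reproduced outside Gobblin. The following is a minimal sketch, not code from this PR: `ConcurrentPutSketch` and `fillConcurrently` are hypothetical names, and the worker tasks only stand in for parallel onAddSpec callbacks. With a `ConcurrentHashMap`, every distinct key written by the workers is present afterwards. Passing a plain `HashMap` instead makes the result undefined (entries may be lost or the map may be corrupted), which is why the sketch does not do so by default.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentPutSketch {

  // Hypothetical stand-in for parallel listener callbacks: each worker
  // inserts its own set of distinct keys into the shared map.
  public static int fillConcurrently(Map<String, Long> map, int threads, int entriesPerThread) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      final int id = t;
      pool.submit(() -> {
        for (int i = 0; i < entriesPerThread; i++) {
          map.put("flow-" + id + "-" + i, System.currentTimeMillis());
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("writers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for writers", e);
    }
    return map.size();
  }

  public static void main(String[] args) {
    // Safe: ConcurrentHashMap keeps all threads * entriesPerThread entries.
    int size = fillConcurrently(new ConcurrentHashMap<String, Long>(), 8, 10_000);
    System.out.println("ConcurrentHashMap size: " + size);
  }
}
```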

FlowCatalog.specSyncObjects (also a plain HashMap) is similarly accessed concurrently from updateOrAddSpecHelper (multiple API request threads) and NonScheduledJobRunner.
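
A per-key synchronization map of this kind is typically populated with a check-then-put sequence, which stays racy even on a concurrent map unless an atomic method is used. The sketch below assumes that specSyncObjects maps a spec URI to a monitor object; the PR does not show how the map is used, and `SpecSyncSketch`, `syncObjectFor`, and `release` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SpecSyncSketch {

  // Assumed shape: spec URI (as a string) -> monitor object for that spec.
  private final ConcurrentMap<String, Object> syncObjects = new ConcurrentHashMap<>();

  // computeIfAbsent is atomic per key, so two threads asking for the same
  // URI always receive the same monitor object.
  public Object syncObjectFor(String specUri) {
    return syncObjects.computeIfAbsent(specUri, uri -> new Object());
  }

  // Returns true only for the caller that actually removed the entry.
  public boolean release(String specUri) {
    return syncObjects.remove(specUri) != null;
  }
}
```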

The fix

  • GobblinServiceJobScheduler: Maps.newHashMap() → Maps.newConcurrentMap() for both scheduledFlowSpecs and lastUpdatedTimeForFlowSpec
  • FlowCatalog: HashMap → ConcurrentHashMap for specSyncObjects
  • FlowCatalog: Downgrade "SpecStore is missing in FlowCatalog" log from ERROR to WARN — with concurrent SpecStore writes, getSpecURIs() can list a URI before addSpec() fully commits. This transient condition is already handled by exponential backoff retries; ERROR level creates false alarms in monitoring
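
The reasoning behind the log-level change can be illustrated with a generic retry loop: while retries remain, a missing entry is an expected transient state and is logged as a warning. This is a sketch under stated assumptions rather than Gobblin's retry code. `BackoffLookupSketch` and `lookupWithBackoff` are hypothetical, and `java.util.logging` is used only to keep the example self-contained.

```java
import java.util.Optional;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BackoffLookupSketch {

  private static final Logger LOG = Logger.getLogger(BackoffLookupSketch.class.getName());

  // Retries the lookup, doubling the delay after each miss. A miss is logged
  // at WARNING because a later attempt is expected to succeed once the
  // concurrent write has committed.
  public static <T> Optional<T> lookupWithBackoff(
      Supplier<Optional<T>> lookup, int maxAttempts, long initialDelayMs) {
    long delayMs = initialDelayMs;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      Optional<T> result = lookup.get();
      if (result.isPresent()) {
        return result;
      }
      LOG.log(Level.WARNING, "Spec not visible yet (attempt {0} of {1})",
          new Object[] {attempt, maxAttempts});
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(delayMs);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return Optional.empty();
        }
        delayMs *= 2; // exponential backoff
      }
    }
    return Optional.empty();
  }
}
```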

All changes are drop-in replacements with no API changes and no impact on the P99 latency improvement from GOBBLIN-2257.
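
Two caveats apply to any HashMap-to-ConcurrentHashMap swap: ConcurrentHashMap rejects null keys and values, and a separate get followed by put is still not atomic. The sketch below shows an atomic read-modify-write for a timestamp map. It assumes lastUpdatedTimeForFlowSpec maps a URI string to a Long timestamp, which the PR does not confirm; `LastUpdatedSketch` and `recordIfNewer` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class LastUpdatedSketch {

  // Assumed shape: flow spec URI -> last update time in epoch milliseconds.
  private final ConcurrentMap<String, Long> lastUpdated = new ConcurrentHashMap<>();

  // merge() runs atomically per key and keeps the larger timestamp, so a
  // stale update arriving late cannot overwrite a newer one.
  // Returns true if the given timestamp is the value now stored.
  public boolean recordIfNewer(String specUri, long timestampMs) {
    long stored = lastUpdated.merge(specUri, timestampMs, Long::max);
    return stored == timestampMs;
  }

  public Long get(String specUri) {
    return lastUpdated.get(specUri);
  }
}
```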

Test plan

  • Existing FlowCatalogTest and GobblinServiceJobSchedulerTest pass
  • Verify in production: LaunchDagProc - error and DagNode not found errors should stop occurring
  • Verify in production: "SpecStore is missing in FlowCatalog" now logs at WARN instead of ERROR

GOBBLIN-2257 removed synchronized from onAddSpec and introduced a
multi-threaded executor for parallel flow compilation. However, the
listener callbacks and FlowCatalog internals that were implicitly
protected by that synchronized block now run concurrently without
thread-safe data structures.

GobblinServiceJobScheduler.onAddSpec() reads and writes
scheduledFlowSpecs and lastUpdatedTimeForFlowSpec (plain HashMaps)
from multiple concurrent callback threads, and NonScheduledJobRunner
also removes entries from a separate thread pool. Concurrent HashMap
modifications cause structural corruption: lost entries, orphaned
DAGs, and LaunchDagProc/DagNode errors in production.

Fix:
- GobblinServiceJobScheduler: HashMap -> ConcurrentHashMap for
  scheduledFlowSpecs and lastUpdatedTimeForFlowSpec
- FlowCatalog: HashMap -> ConcurrentHashMap for specSyncObjects
  (also accessed concurrently from updateOrAddSpecHelper and
  NonScheduledJobRunner)
- FlowCatalog: Downgrade "SpecStore is missing in FlowCatalog"
  log from ERROR to WARN — this is an expected transient condition
  with concurrent SpecStore writes, already handled by exponential
  backoff retries

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@DaisyModi DaisyModi closed this Apr 1, 2026