From 609678a2b1bc5779f6cbd4bdbf6f65b2c66e4219 Mon Sep 17 00:00:00 2001 From: Daisy Modi Date: Wed, 1 Apr 2026 11:18:28 +0530 Subject: [PATCH] [GOBBLIN-2257] Fix thread-safety of specSyncObjects after parallel onAddSpec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GOBBLIN-2257 removed synchronized from onAddSpec and introduced a multi-threaded executor for parallel flow compilation. However, FlowCatalog.specSyncObjects is a plain HashMap that is now accessed concurrently from multiple updateOrAddSpecHelper calls (put/remove without synchronization). Unsynchronized concurrent modifications of a plain HashMap can cause lost entries and structural corruption, leading to: - LaunchDagProc errors (syncObject lost, DAG initialization fails) - DagNode not found for Reevaluate actions (orphaned DAGs) Fix: Change specSyncObjects from HashMap to ConcurrentHashMap. Also downgrade the "discovered in SpecStore is missing in FlowCatalog" log from ERROR to WARN. With concurrent writes, this transient condition is expected — the existing exponential backoff retries handle it, and the ERROR level creates false alarms in monitoring. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../apache/gobblin/runtime/spec_catalog/FlowCatalog.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index 8ac02145498..0a401f8b809 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -97,8 +98,10 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut @Getter protected final SpecStore specStore; // a map which keeps a handle of condition variables for each spec being added to the flow catalog - // to provide synchronization needed for flow specs - private final Map specSyncObjects = new HashMap<>(); + // to provide synchronization needed for flow specs. + // Must be ConcurrentHashMap because onAddSpec is no longer synchronized (GOBBLIN-2257), so + // multiple updateOrAddSpecHelper calls can concurrently put/remove entries. 
+ private final Map specSyncObjects = new ConcurrentHashMap<>(); public FlowCatalog(Config config) { this(config, Optional.absent()); @@ -359,7 +362,7 @@ private Spec getSpecHelper(URI uri, ExponentialBackoff exponentialBackoff) if (exponentialBackoff.awaitNextRetryIfAvailable()) { return getSpecHelper(uri, exponentialBackoff); } else { - log.error(String.format("The URI %s discovered in SpecStore is missing in FlowCatalog" + ", suspecting current modification on SpecStore", uri), snfe); + log.warn(String.format("The URI %s discovered in SpecStore is missing in FlowCatalog" + ", suspecting current modification on SpecStore", uri), snfe); } } return spec;