diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index 8ac02145498..0a401f8b809 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -97,8 +98,10 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut @Getter protected final SpecStore specStore; // a map which keeps a handle of condition variables for each spec being added to the flow catalog - // to provide synchronization needed for flow specs - private final Map specSyncObjects = new HashMap<>(); + // to provide synchronization needed for flow specs. + // Must be ConcurrentHashMap because onAddSpec is no longer synchronized (GOBBLIN-2257), so + // multiple updateOrAddSpecHelper calls can concurrently put/remove entries. + private final Map specSyncObjects = new ConcurrentHashMap<>(); public FlowCatalog(Config config) { this(config, Optional.absent()); @@ -359,7 +362,7 @@ private Spec getSpecHelper(URI uri, ExponentialBackoff exponentialBackoff) if (exponentialBackoff.awaitNextRetryIfAvailable()) { return getSpecHelper(uri, exponentialBackoff); } else { - log.error(String.format("The URI %s discovered in SpecStore is missing in FlowCatalog" + ", suspecting current modification on SpecStore", uri), snfe); + log.warn(String.format("The URI %s discovered in SpecStore is missing in FlowCatalog" + ", suspecting current modification on SpecStore", uri), snfe); } } return spec;