From f7a0d38eac0024e745c035eb0f606acbf573418f Mon Sep 17 00:00:00 2001 From: Hanifi Gunes Date: Tue, 3 Nov 2015 16:24:48 -0800 Subject: [PATCH] DRILL-3171: handle concurrent Zookeeper node creation gracefully --- .../exec/store/StoragePluginRegistry.java | 52 ++++--- .../exec/store/sys/zk/ZkAbstractStore.java | 133 ++++++++++++++---- .../drill/exec/store/sys/zk/ZkEStore.java | 27 +--- .../exec/store/sys/zk/ZkEStoreProvider.java | 7 +- .../drill/exec/store/sys/zk/ZkPStore.java | 17 +-- .../exec/store/sys/zk/ZkPStoreProvider.java | 3 +- 6 files changed, 141 insertions(+), 98 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java index 70e82dab274..6e11084a651 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java @@ -236,41 +236,39 @@ private void closePlugin(StoragePlugin plugin) { } public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist) throws ExecutionSetupException { - StoragePlugin oldPlugin = plugins.get(name); - - boolean ok = true; - final StoragePlugin newPlugin = create(name, config); - try { - if (oldPlugin != null) { - if (config.isEnabled()) { - ok = plugins.replace(name, oldPlugin, newPlugin); - if (ok) { - closePlugin(oldPlugin); + for (;;) { + final StoragePlugin oldPlugin = plugins.get(name); + final StoragePlugin newPlugin = create(name, config); + boolean done = false; + try { + if (oldPlugin != null) { + if (config.isEnabled()) { + done = plugins.replace(name, oldPlugin, newPlugin); + } else { + done = plugins.remove(name, oldPlugin); } - } else { - ok = plugins.remove(name, oldPlugin); - if (ok) { + if (done) { closePlugin(oldPlugin); } + } else if (config.isEnabled()) { + done = (null == plugins.putIfAbsent(name, newPlugin)); + } else { + 
done = true; + } + } finally { + if (!done) { + closePlugin(newPlugin); } - } else if (config.isEnabled()) { - ok = (null == plugins.putIfAbsent(name, newPlugin)); } - if (!ok) { - throw new ExecutionSetupException("Two processes tried to change a plugin at the same time."); - } - } finally { - if (!ok) { - closePlugin(newPlugin); - } - } + if (done) { + if (persist) { + pluginSystemTable.put(name, config); + } - if (persist) { - pluginSystemTable.put(name, config); + return newPlugin; + } } - - return newPlugin; } public StoragePlugin getPlugin(String name) throws ExecutionSetupException { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java index 01059a4bbeb..0d2fb38a9f2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java @@ -24,29 +24,29 @@ import java.util.List; import java.util.Map.Entry; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; -import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.zookeeper.CreateMode; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import org.apache.zookeeper.KeeperException; /** * This is the abstract class that is shared by ZkPStore (Persistent store) and ZkEStore (Ephemeral Store) * @param */ public abstract class ZkAbstractStore implements AutoCloseable { - static final org.slf4j.Logger 
logger = org.slf4j.LoggerFactory.getLogger(ZkAbstractStore.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkAbstractStore.class); - protected CuratorFramework framework; - protected PStoreConfig config; + protected final CuratorFramework framework; + protected final PStoreConfig config; private final PathChildrenCache childrenCache; - private String prefix; - private String parent; + private final String prefix; + private final String parent; public ZkAbstractStore(CuratorFramework framework, PStoreConfig config) throws IOException { @@ -54,31 +54,26 @@ public ZkAbstractStore(CuratorFramework framework, PStoreConfig config) this.prefix = parent + "/"; this.framework = framework; this.config = config; + this.childrenCache = new PathChildrenCache(framework, parent, true); // make sure the parent node exists. + createOrUpdate(parent, null, CreateMode.PERSISTENT); try { - if (framework.checkExists().forPath(parent) == null) { - framework.create().withMode(CreateMode.PERSISTENT).forPath(parent); - } - - this.childrenCache = new PathChildrenCache(framework, parent, true); - this.childrenCache.start(StartMode.BUILD_INITIAL_CACHE); - + childrenCache.start(StartMode.BUILD_INITIAL_CACHE); } catch (Exception e) { - throw new RuntimeException("Failure while accessing Zookeeper for PStore: " + e.getMessage(), e); + throw new RuntimeException("Failure while initializing Zookeeper for PStore", e); } - } public Iterator> iterator() { try { return new Iter(childrenCache.getCurrentData()); } catch (Exception e) { - throw new RuntimeException("Failure while accessing Zookeeper. 
" + e.getMessage(), e); + throw new RuntimeException("Failure while accessing Zookeeper.", e); } } - protected String p(String key) { + protected String withPrefix(String key) { Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface."); return prefix + key; @@ -86,7 +81,7 @@ protected String p(String key) { public V get(String key) { try { - ChildData d = childrenCache.getCurrentData(p(key)); + ChildData d = childrenCache.getCurrentData(withPrefix(key)); if(d == null || d.getData() == null){ return null; } @@ -100,12 +95,12 @@ public V get(String key) { public void put(String key, V value) { try { - if (childrenCache.getCurrentData(p(key)) != null) { - framework.setData().forPath(p(key), config.getSerializer().serialize(value)); + if (childrenCache.getCurrentData(withPrefix(key)) != null) { + framework.setData().forPath(withPrefix(key), config.getSerializer().serialize(value)); } else { - createNodeInZK(key, value); + createWithPrefix(key, value); } - childrenCache.rebuildNode(p(key)); + childrenCache.rebuildNode(withPrefix(key)); } catch (Exception e) { throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e); @@ -114,8 +109,10 @@ public void put(String key, V value) { public void delete(String key) { try { - framework.delete().forPath(p(key)); - childrenCache.rebuildNode(p(key)); + if (framework.checkExists().forPath(withPrefix(key)) != null) { + framework.delete().forPath(withPrefix(key)); + childrenCache.rebuildNode(withPrefix(key)); + } } catch (Exception e) { throw new RuntimeException("Failure while accessing Zookeeper. 
" + e.getMessage(), e); } @@ -123,11 +120,11 @@ public void delete(String key) { public boolean putIfAbsent(String key, V value) { try { - if (childrenCache.getCurrentData(p(key)) != null) { + if (childrenCache.getCurrentData(withPrefix(key)) != null) { return false; } else { - createNodeInZK(key, value); - childrenCache.rebuildNode(p(key)); + createWithPrefix(key, value); + childrenCache.rebuildNode(withPrefix(key)); return true; } @@ -136,7 +133,81 @@ public boolean putIfAbsent(String key, V value) { } } - public abstract void createNodeInZK (String key, V value); + /** + * Default {@link CreateMode create mode} that will be used in create operations referred in the see also section. + * + * @see #createOrUpdate(String, Object) + * @see #createWithPrefix(String, Object) + */ + protected abstract CreateMode getCreateMode(); + + + /** + * Creates a node in zookeeper with the {@link #getCreateMode() default create mode} and sets its value if supplied. + * + * @param path target path + * @param value value to set, null if none available + * + * @see #getCreateMode() + * @see #createOrUpdate(String, Object) + * @see #withPrefix(String) + */ + protected void createWithPrefix(String path, V value) { + createOrUpdate(withPrefix(path), value); + } + + /** + * Creates a node in zookeeper with the {@link #getCreateMode() default create mode} and sets its value if supplied + * or updates its value if the node already exists. + * + * Note that if node exists, its mode will not be changed. + * + * @param path target path + * @param value value to set, null if none available + * + * @see #getCreateMode() + * @see #createOrUpdate(String, Object, CreateMode) + */ + protected void createOrUpdate(String path, V value) { + createOrUpdate(path, value, getCreateMode()); + } + + /** + * Creates a node in zookeeper with the given mode and sets its value if supplied or updates its value if the node + * already exists. + * + * Note that if the node exists, its mode will not be changed. 
+ * + * Internally, the method suppresses {@link org.apache.zookeeper.KeeperException.NodeExistsException}, raised when + * the node is created concurrently after the existence check; the supplied value, if any, is then not written. + * + * @param path target path + * @param value value to set, null if none available + * @param mode creation mode + * @throws RuntimeException wrapping the root cause if the node cannot be created or updated. + */ + protected void createOrUpdate(String path, V value, CreateMode mode) { + try { + final boolean isUpdate = value != null; + final byte[] valueInBytes = isUpdate ? config.getSerializer().serialize(value) : null; + final boolean nodeExists = framework.checkExists().forPath(path) != null; + if (!nodeExists) { + final ACLBackgroundPathAndBytesable creator = framework.create().withMode(mode); + if (isUpdate) { + creator.forPath(path, valueInBytes); + } else { + creator.forPath(path); + } + } else if (isUpdate) { + framework.setData().forPath(path, valueInBytes); + } + } catch (KeeperException.NodeExistsException ex) { + logger.warn("Node already exists in Zookeeper. Skipping... -- [path: {}, mode: {}]", path, mode); + } catch (Exception e) { + final String msg = String.format("Failed to create/update Zookeeper node. 
[path: %s, mode: %s]", path, mode); + throw new RuntimeException(msg, e); + } + } private class Iter implements Iterator>{ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java index 1abf3a6c9d7..4706287fc34 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java @@ -17,40 +17,25 @@ */ package org.apache.drill.exec.store.sys.zk; +import java.io.IOException; + import org.apache.curator.framework.CuratorFramework; import org.apache.drill.exec.store.sys.EStore; import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.zookeeper.CreateMode; -import java.io.IOException; - /** * Implementation of EStore using Zookeeper's EPHEMERAL node. * @param */ -public class ZkEStore extends ZkAbstractStore implements EStore{ +public class ZkEStore extends ZkAbstractStore implements EStore { - public ZkEStore(CuratorFramework framework, PStoreConfig config) throws IOException{ + public ZkEStore(CuratorFramework framework, PStoreConfig config) throws IOException { super(framework,config); } @Override - public void delete(String key) { - try { - if (framework.checkExists().forPath(p(key)) != null) { - framework.delete().forPath(p(key)); - } - } catch (Exception e) { - throw new RuntimeException("Failure while accessing Zookeeper. 
" + e.getMessage(), e); - } - } - - @Override - public void createNodeInZK(String key, V value) { - try { - framework.create().withMode(CreateMode.EPHEMERAL).forPath(p(key), config.getSerializer().serialize(value)); - } catch (Exception e) { - throw new RuntimeException("Failure while accessing Zookeeper", e); - } + protected CreateMode getCreateMode() { + return CreateMode.EPHEMERAL; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java index 7d7d4756511..60277aa0653 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java @@ -18,16 +18,15 @@ package org.apache.drill.exec.store.sys.zk; +import java.io.IOException; + +import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; import org.apache.drill.exec.store.sys.EStore; import org.apache.drill.exec.store.sys.EStoreProvider; import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.drill.exec.store.sys.PStoreConfig.Mode; -import com.google.common.base.Preconditions; - -import java.io.IOException; - public class ZkEStoreProvider implements EStoreProvider{ private final CuratorFramework curator; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java index 723dbb0a041..da229966def 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java @@ -28,23 +28,14 @@ * Implementation of PStore using Zookeeper's PERSISTENT node. 
* @param */ -public class ZkPStore extends ZkAbstractStore implements PStore{ +public class ZkPStore extends ZkAbstractStore implements PStore { - ZkPStore(CuratorFramework framework, PStoreConfig config) - throws IOException { + public ZkPStore(CuratorFramework framework, PStoreConfig config) throws IOException { super(framework, config); } @Override - public void createNodeInZK(String key, V value) { - try { - framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value)); - } catch (Exception e) { - throw new RuntimeException("Failure while accessing Zookeeper", e); - } + protected CreateMode getCreateMode() { + return CreateMode.PERSISTENT; } - - - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java index f0fa120da72..eb5df43546c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java @@ -19,6 +19,7 @@ import java.io.IOException; +import com.google.common.annotations.VisibleForTesting; import org.apache.curator.framework.CuratorFramework; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.coord.ClusterCoordinator; @@ -33,8 +34,6 @@ import org.apache.drill.exec.store.sys.local.FilePStore; import org.apache.hadoop.fs.Path; -import com.google.common.annotations.VisibleForTesting; - public class ZkPStoreProvider implements PStoreProvider { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class);