From f7373f43511d1410f0f026034a4c8194e5f1a7f1 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Wed, 25 Jan 2017 13:16:44 +0900
Subject: [PATCH] STORM-2323 Precondition for Leader Nimbus should check all
 topology blobs and also corresponding dependencies

* change the precondition for leader Nimbus
** it should have all active topology blobs and corresponding dependencies locally
---
 .../org/apache/storm/zookeeper/Zookeeper.java | 125 +++++++++++++++---
 1 file changed, 106 insertions(+), 19 deletions(-)

diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index ef353072bc6..a2ad797c593 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -20,6 +20,7 @@
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -31,15 +32,17 @@
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.storm.Config;
 import org.apache.storm.blobstore.BlobStore;
-import org.apache.storm.blobstore.KeyFilter;
+import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.callback.DefaultWatcherCallBack;
 import org.apache.storm.callback.WatcherCallBack;
 import org.apache.storm.cluster.ClusterUtils;
-import org.apache.storm.cluster.IStateStorage;
 import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ZookeeperAuthInfo;
 import org.apache.zookeeper.KeeperException;
@@ -51,6 +54,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.auth.Subject;
 import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
@@ -336,29 +340,43 @@ public static NimbusInfo toNimbusInfo(Participant participant) {
     public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
         final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
         return new LeaderLatchListener() {
+            final String STORM_JAR_SUFFIX = "-stormjar.jar";
+            final String STORM_CODE_SUFFIX = "-stormcode.ser";
+            final String STORM_CONF_SUFFIX = "-stormconf.ser";
+
             @Override
             public void isLeader() {
-                Set<String> activeTopologyIds = new HashSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
-                Set<String> localTopologyIds = blobStore.filterAndListKeys(new KeyFilter<String>() {
-                    @Override
-                    public String filter(String key) {
-                        return ConfigUtils.getIdFromBlobKey(key);
-                    }
-                });
-                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyIds, localTopologyIds);
-                LOG.info("active-topology-ids [{}] local-topology-ids [{}] diff-topology [{}]",
-                        generateJoinedString(activeTopologyIds), generateJoinedString(localTopologyIds),
+                Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
+
+                Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
+                Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
+                Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
+                Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);
+
+                // this finds all active topologies blob keys from all local topology blob keys
+                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
+                LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]",
+                        generateJoinedString(activeTopologyBlobKeys), generateJoinedString(allLocalTopologyBlobKeys),
                         generateJoinedString(diffTopology));
 
                 if (diffTopology.isEmpty()) {
-                    LOG.info("Accepting leadership, all active topology found locally.");
+                    Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);
+
+                    // this finds all dependency blob keys from active topologies from all local blob keys
+                    Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
+                    LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]",
+                            generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),
+                            generateJoinedString(diffDependencies));
+
+                    if (diffDependencies.isEmpty()) {
+                        LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
+                    } else {
+                        LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");
+                        closeLatch();
+                    }
                 } else {
                     LOG.info("code for all active topologies not available locally, giving up leadership.");
-                    try {
-                        leaderLatch.close();
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
+                    closeLatch();
                 }
             }
 
@@ -370,6 +388,75 @@ public void notLeader() {
             private String generateJoinedString(Set<String> activeTopologyIds) {
                 return Joiner.on(",").join(activeTopologyIds);
             }
+
+            // Expands each active topology id into the three blob keys (jar, code, conf) checked for it.
+            private Set<String> populateTopologyBlobKeys(Set<String> activeTopologyIds) {
+                Set<String> activeTopologyBlobKeys = new TreeSet<>();
+                for (String activeTopologyId : activeTopologyIds) {
+                    activeTopologyBlobKeys.add(activeTopologyId + STORM_JAR_SUFFIX);
+                    activeTopologyBlobKeys.add(activeTopologyId + STORM_CODE_SUFFIX);
+                    activeTopologyBlobKeys.add(activeTopologyId + STORM_CONF_SUFFIX);
+                }
+                return activeTopologyBlobKeys;
+            }
+
+            // Keeps only the keys that end with one of the topology jar/code/conf suffixes.
+            private Set<String> filterTopologyBlobKeys(Set<String> blobKeys) {
+                Set<String> topologyBlobKeys = new HashSet<>();
+                for (String blobKey : blobKeys) {
+                    if (blobKey.endsWith(STORM_JAR_SUFFIX) || blobKey.endsWith(STORM_CODE_SUFFIX) ||
+                            blobKey.endsWith(STORM_CONF_SUFFIX)) {
+                        topologyBlobKeys.add(blobKey);
+                    }
+                }
+                return topologyBlobKeys;
+            }
+
+            // Keeps only the keys that end with the topology code suffix.
+            private Set<String> filterTopologyCodeKeys(Set<String> blobKeys) {
+                Set<String> topologyCodeKeys = new HashSet<>();
+                for (String blobKey : blobKeys) {
+                    if (blobKey.endsWith(STORM_CODE_SUFFIX)) {
+                        topologyCodeKeys.add(blobKey);
+                    }
+                }
+                return topologyCodeKeys;
+            }
+
+            // Reads every given topology code blob and collects the dependency jar and artifact keys it declares.
+            // A failure to read any blob is logged and rethrown as a RuntimeException.
+            private Set<String> getTopologyDependencyKeys(Set<String> activeTopologyCodeKeys) {
+                Set<String> activeTopologyDependencies = new TreeSet<>();
+                Subject subject = ReqContext.context().subject();
+
+                for (String activeTopologyCodeKey : activeTopologyCodeKeys) {
+                    // try-with-resources closes the blob stream even when reading or deserializing fails
+                    try (InputStreamWithMeta blob = blobStore.getBlob(activeTopologyCodeKey, subject)) {
+                        byte[] blobContent = IOUtils.readFully(blob, (int) blob.getFileLength());
+                        StormTopology stormCode = Utils.deserialize(blobContent, StormTopology.class);
+                        if (stormCode.is_set_dependency_jars()) {
+                            activeTopologyDependencies.addAll(stormCode.get_dependency_jars());
+                        }
+                        if (stormCode.is_set_dependency_artifacts()) {
+                            activeTopologyDependencies.addAll(stormCode.get_dependency_artifacts());
+                        }
+                    } catch (AuthorizationException | KeyNotFoundException | IOException e) {
+                        LOG.error("Exception occurs while reading blob for key: " + activeTopologyCodeKey + ", exception: " + e, e);
+                        throw new RuntimeException("Exception occurs while reading blob for key: " + activeTopologyCodeKey +
+                                ", exception: " + e, e);
+                    }
+                }
+                return activeTopologyDependencies;
+            }
+
+            // Closes the leader latch; an IOException from close() is rethrown as a RuntimeException.
+            private void closeLatch() {
+                try {
+                    leaderLatch.close();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
         };
     }
 