From 52cbe5f6459f0923c7eda573df1ad5b792c5ba98 Mon Sep 17 00:00:00 2001 From: caofangkun Date: Fri, 23 Jan 2015 16:55:42 +0800 Subject: [PATCH 01/10] STORM-534:Store Nimbus Server Information in zookeeper path {storm.zookeeper.root}/nimbus --- pom.xml | 4 ++-- storm-core/src/clj/backtype/storm/cluster.clj | 16 +++++++++++++++- .../src/clj/backtype/storm/daemon/nimbus.clj | 8 ++++++++ 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 4fd0f7edf71..142fefa5018 100644 --- a/pom.xml +++ b/pom.xml @@ -158,10 +158,10 @@ storm-buildtools/maven-shade-clojure-transformer storm-buildtools/storm-maven-plugins storm-core - examples/storm-starter + diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 8ead7108879..957b0072e80 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -141,6 +141,8 @@ (assignment-info [this storm-id callback]) (assignment-info-with-version [this storm-id callback]) (assignment-version [this storm-id callback]) + (register-nimbus-info [this nimbus-info]) + (nimbus-info [this]) (active-storms [this]) (storm-base [this storm-id callback]) (get-worker-heartbeat [this storm-id node port]) @@ -173,6 +175,7 @@ (def WORKERBEATS-ROOT "workerbeats") (def ERRORS-ROOT "errors") (def CREDENTIALS-ROOT "credentials") +(def NIMBUS-ROOT "nimbus") (def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT)) (def STORMS-SUBTREE (str "/" STORMS-ROOT)) @@ -180,6 +183,7 @@ (def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT)) (def ERRORS-SUBTREE (str "/" ERRORS-ROOT)) (def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT)) +(def NIMBUS-SUBTREE (str "/" NIMBUS-ROOT)) (defn supervisor-path [id] @@ -277,7 +281,7 @@ CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args)) ;; this should never happen (exit-process! 30 "Unknown callback for subtree " subtree args)))))] - (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]] + (doseq [p [NIMBUS-SUBTREE ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]] (mkdirs cluster-state p acls)) (reify StormClusterState @@ -308,6 +312,16 @@ (when callback (swap! assignment-version-callback assoc storm-id callback)) (get-version cluster-state (assignment-path storm-id) (not-nil? callback))) + + (register-nimbus-info + [this nimbus-info] + (set-data cluster-state NIMBUS-SUBTREE (Utils/serialize nimbus-info) acls)) + + (nimbus-info + [this] + (-> cluster-state + (get-data NIMBUS-SUBTREE false) + maybe-deserialize)) (active-storms [this] diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index b2cb96ac7f9..38ab74446c5 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -877,6 +877,13 @@ (log-error "Cleaning inbox ... error deleting: " (.getName f)) )))) +(defn register-nimbus-host! [nimbus] + (let [storm-cluster-state (:storm-cluster-state nimbus) + nimbus-conf (:conf nimbus) + local-hostname (memoized-local-hostname) + port (nimbus-conf NIMBUS-THRIFT-PORT)] + (.register-nimbus-info storm-cluster-state (str local-hostname ":" port)))) + (defn cleanup-corrupt-topologies! [nimbus] (let [storm-cluster-state (:storm-cluster-state nimbus) code-ids (set (code-ids (:conf nimbus))) @@ -982,6 +989,7 @@ (log-message "Starting Nimbus with conf " conf) (let [nimbus (nimbus-data conf inimbus) principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)] + (register-nimbus-host! nimbus) (.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf) (cleanup-corrupt-topologies! nimbus) (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))] From 55f95b52f5f49c5572d81d6b653babafa67750c4 Mon Sep 17 00:00:00 2001 From: caofangkun Date: Fri, 23 Jan 2015 17:02:21 +0800 Subject: [PATCH 02/10] STORM-534:Store Nimbus Server Information in zookeeper path {storm.zookeeper.root}/nimbus --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 142fefa5018..4fd0f7edf71 100644 --- a/pom.xml +++ b/pom.xml @@ -158,10 +158,10 @@ storm-buildtools/maven-shade-clojure-transformer storm-buildtools/storm-maven-plugins storm-core - + external/storm-hbase From 3a2ace166ef13b99357089bb5e9093f8494b3c1b Mon Sep 17 00:00:00 2001 From: caofangkun Date: Mon, 26 Jan 2015 11:54:41 +0800 Subject: [PATCH 03/10] STORM-534:Add ServerInfo.java to storm nimbus server info, NimbusClient fetch nimbus info from zk --- storm-core/src/clj/backtype/storm/cluster.clj | 6 +- .../src/clj/backtype/storm/daemon/nimbus.clj | 6 +- .../backtype/storm/utils/NimbusClient.java | 6 +- .../jvm/backtype/storm/utils/ServerInfo.java | 55 +++++++++++++++++++ .../src/jvm/backtype/storm/utils/Utils.java | 25 +++++++++ 5 files changed, 91 insertions(+), 7 deletions(-) create mode 100644 storm-core/src/jvm/backtype/storm/utils/ServerInfo.java diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 957b0072e80..46e7107be47 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -17,7 +17,7 @@ (ns backtype.storm.cluster (:import [org.apache.zookeeper.data Stat ACL Id]) (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms]) - (:import [backtype.storm.utils Utils]) + (:import [backtype.storm.utils Utils ServerInfo]) (:import [java.security MessageDigest]) (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider]) (:use [backtype.storm util log config]) @@ -319,9 +319,9 @@ (nimbus-info [this] - (-> cluster-state + (.instance ServerInfo (-> cluster-state (get-data NIMBUS-SUBTREE false) - maybe-deserialize)) + maybe-deserialize))) (active-storms [this] diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 38ab74446c5..60fe81bd22a 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -23,6 +23,7 @@ (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails]) (:import [backtype.storm.generated AuthorizationException]) + (:import [backtype.storm.utils ServerInfo]) (:use [backtype.storm bootstrap util]) (:use [backtype.storm.config :only [validate-configs-with-schemas]]) (:use [backtype.storm.daemon common]) @@ -881,8 +882,9 @@ (let [storm-cluster-state (:storm-cluster-state nimbus) nimbus-conf (:conf nimbus) local-hostname (memoized-local-hostname) - port (nimbus-conf NIMBUS-THRIFT-PORT)] - (.register-nimbus-info storm-cluster-state (str local-hostname ":" port)))) + port (nimbus-conf NIMBUS-THRIFT-PORT) + server-info (ServerInfo. local-hostname port)] + (.register-nimbus-info storm-cluster-state (.toJsonString server-info)))) (defn cleanup-corrupt-topologies! [nimbus] (let [storm-cluster-state (:storm-cluster-state nimbus) diff --git a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java index 273e232fb32..8e8075b9c93 100644 --- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java +++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java @@ -32,8 +32,10 @@ public class NimbusClient extends ThriftClient { public static NimbusClient getConfiguredClient(Map conf) { try { - String nimbusHost = (String) conf.get(Config.NIMBUS_HOST); - return new NimbusClient(conf, nimbusHost); + ServerInfo serverInfo = Utils.getServerInfo(conf, "nimbus"); + conf.put(Config.NIMBUS_HOST, serverInfo.getHost()); + conf.put(Config.NIMBUS_THRIFT_PORT, serverInfo.getPort()); + return new NimbusClient(conf, serverInfo.getHost()); } catch (TTransportException ex) { throw new RuntimeException(ex); } diff --git a/storm-core/src/jvm/backtype/storm/utils/ServerInfo.java b/storm-core/src/jvm/backtype/storm/utils/ServerInfo.java new file mode 100644 index 00000000000..ede7fe14419 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/utils/ServerInfo.java @@ -0,0 +1,55 @@ +package backtype.storm.utils; + +import java.io.Serializable; + +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; + +public class ServerInfo implements Serializable { + + private static final long serialVersionUID = 1L; + + private String host; + private int port; + + public static ServerInfo instance(String jsonText) throws ParseException { + JSONParser parser = new JSONParser(); + JSONObject jsonObject = (JSONObject) parser.parse(jsonText); + String host = (String) jsonObject.get("host"); + Long port = (Long) jsonObject.get("port"); + return new ServerInfo(host, port.intValue()); + } + + public ServerInfo(String host, int port) { + this.host = host; + this.port = port; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String toString() { + return host + ":" + port; + } + + public String toJsonString() { + JSONObject obj = new JSONObject(); + obj.put("host", host); + obj.put("port", new Integer(port)); + return obj.toJSONString(); + } +} diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index 6e8458a3809..e4068efea6a 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -49,6 +49,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.commons.lang.StringUtils; import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; @@ -533,4 +534,28 @@ private static SerializationDelegate getSerializationDelegate(Map stormConf) { delegate.prepare(stormConf); return delegate; } + + public static ServerInfo getServerInfo(Map conf, String name) throws TTransportException { + List servers = + (List) conf.get(Config.STORM_ZOOKEEPER_SERVERS); + int port = Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_PORT), 2181); + String rootDir = (String) conf.get(Config.STORM_ZOOKEEPER_ROOT); + ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf); + CuratorFramework _curator = + Utils.newCuratorStarted(conf, servers, port, rootDir, auth); + try { + if (_curator.checkExists().forPath("/" + name) == null) { + throw new RuntimeException(name + " server is not alive ! "); + } + byte[] zk_data = _curator.getData().forPath("/" + name); + String jsonText = (String) Utils.deserialize(zk_data); + return ServerInfo.instance(jsonText); + } catch (Exception e) { + LOG.error(e.getMessage()); + throw new TTransportException(e); + } finally { + _curator.close(); + LOG.info("Closed Client Zookeeper Connection"); + } + } } From fd4a7e825a9d980ef466edfb703fd4ee5885da80 Mon Sep 17 00:00:00 2001 From: caofangkun Date: Mon, 26 Jan 2015 12:21:51 +0800 Subject: [PATCH 04/10] STORM-534:Add ServerInfo.java to storm nimbus server info, NimbusClient fetch nimbus info from zk --- storm-core/src/clj/backtype/storm/cluster.clj | 5 +- .../src/clj/backtype/storm/daemon/nimbus.clj | 5 +- storm-core/src/genthrift.sh | 2 +- .../storm/generated/ClusterSummary.java | 115 ++++- .../backtype/storm/generated/ServerInfo.java | 425 ++++++++++++++++++ .../backtype/storm/utils/NimbusClient.java | 7 +- .../jvm/backtype/storm/utils/ServerInfo.java | 55 --- .../src/jvm/backtype/storm/utils/Utils.java | 21 +- storm-core/src/storm.thrift | 6 + 9 files changed, 558 insertions(+), 83 deletions(-) create mode 100644 storm-core/src/jvm/backtype/storm/generated/ServerInfo.java delete mode 100644 storm-core/src/jvm/backtype/storm/utils/ServerInfo.java diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 46e7107be47..2651d39dabe 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -17,7 +17,8 @@ (ns backtype.storm.cluster (:import [org.apache.zookeeper.data Stat ACL Id]) (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms]) - (:import [backtype.storm.utils Utils ServerInfo]) + (:import [backtype.storm.utils Utils]) + (:import [backtype.storm.generated ServerInfo]) (:import [java.security MessageDigest]) (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider]) (:use [backtype.storm util log config]) @@ -319,7 +320,7 @@ (nimbus-info [this] - (.instance ServerInfo (-> cluster-state + (Utils/instance (-> cluster-state (get-data NIMBUS-SUBTREE false) maybe-deserialize))) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 60fe81bd22a..ded18defb51 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -22,8 +22,7 @@ (:use [backtype.storm.scheduler.DefaultScheduler]) (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails]) - (:import [backtype.storm.generated AuthorizationException]) - (:import [backtype.storm.utils ServerInfo]) + (:import [backtype.storm.generated AuthorizationException ServerInfo]) (:use [backtype.storm bootstrap util]) (:use [backtype.storm.config :only [validate-configs-with-schemas]]) (:use [backtype.storm.daemon common]) @@ -884,7 +883,7 @@ local-hostname (memoized-local-hostname) port (nimbus-conf NIMBUS-THRIFT-PORT) server-info (ServerInfo. local-hostname port)] - (.register-nimbus-info storm-cluster-state (.toJsonString server-info)))) + (.register-nimbus-info storm-cluster-state (Utils/toJsonString server-info)))) (defn cleanup-corrupt-topologies! [nimbus] (let [storm-cluster-state (:storm-cluster-state nimbus) diff --git a/storm-core/src/genthrift.sh b/storm-core/src/genthrift.sh index 50d5cb0af8a..32ee8f2e9d1 100644 --- a/storm-core/src/genthrift.sh +++ b/storm-core/src/genthrift.sh @@ -16,7 +16,7 @@ rm -rf gen-javabean gen-py py rm -rf jvm/backtype/storm/generated -thrift7 --gen java:beans,hashcode,nocamel --gen py:utf8strings storm.thrift +thrift --gen java:beans,hashcode,nocamel --gen py:utf8strings storm.thrift mv gen-javabean/backtype/storm/generated jvm/backtype/storm/generated mv gen-py py rm -rf gen-javabean diff --git a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java index a2623aba49b..394a8140fc6 100644 --- a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java +++ b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java @@ -1,20 +1,3 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ /** * Autogenerated by Thrift Compiler (0.7.0) * @@ -44,16 +27,19 @@ public class ClusterSummary implements org.apache.thrift.TBase supervisors; // required private int nimbus_uptime_secs; // required private List topologies; // required + private ServerInfo server_info; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { SUPERVISORS((short)1, "supervisors"), NIMBUS_UPTIME_SECS((short)2, "nimbus_uptime_secs"), - TOPOLOGIES((short)3, "topologies"); + TOPOLOGIES((short)3, "topologies"), + SERVER_INFO((short)4, "server_info"); private static final Map byName = new HashMap(); @@ -74,6 +60,8 @@ public static _Fields findByThriftId(int fieldId) { return NIMBUS_UPTIME_SECS; case 3: // TOPOLOGIES return TOPOLOGIES; + case 4: // SERVER_INFO + return SERVER_INFO; default: return null; } @@ -128,6 +116,8 @@ public String getFieldName() { tmpMap.put(_Fields.TOPOLOGIES, new org.apache.thrift.meta_data.FieldMetaData("topologies", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TopologySummary.class)))); + tmpMap.put(_Fields.SERVER_INFO, new org.apache.thrift.meta_data.FieldMetaData("server_info", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ServerInfo.class))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ClusterSummary.class, metaDataMap); } @@ -168,6 +158,9 @@ public ClusterSummary(ClusterSummary other) { } this.topologies = __this__topologies; } + if (other.is_set_server_info()) { + this.server_info = new ServerInfo(other.server_info); + } } public ClusterSummary deepCopy() { @@ -180,6 +173,7 @@ public void clear() { set_nimbus_uptime_secs_isSet(false); this.nimbus_uptime_secs = 0; this.topologies = null; + this.server_info = null; } public int get_supervisors_size() { @@ -280,6 +274,29 @@ public void set_topologies_isSet(boolean value) { } } + public ServerInfo get_server_info() { + return this.server_info; + } + + public void set_server_info(ServerInfo server_info) { + this.server_info = server_info; + } + + public void unset_server_info() { + this.server_info = null; + } + + /** Returns true if field server_info is set (has been assigned a value) and false otherwise */ + public boolean is_set_server_info() { + return this.server_info != null; + } + + public void set_server_info_isSet(boolean value) { + if (!value) { + this.server_info = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case SUPERVISORS: @@ -306,6 +323,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case SERVER_INFO: + if (value == null) { + unset_server_info(); + } else { + set_server_info((ServerInfo)value); + } + break; + } } @@ -320,6 +345,9 @@ public Object getFieldValue(_Fields field) { case TOPOLOGIES: return get_topologies(); + case SERVER_INFO: + return get_server_info(); + } throw new IllegalStateException(); } @@ -337,6 +365,8 @@ public boolean isSet(_Fields field) { return is_set_nimbus_uptime_secs(); case TOPOLOGIES: return is_set_topologies(); + case SERVER_INFO: + return is_set_server_info(); } throw new IllegalStateException(); } @@ -381,6 +411,15 @@ public boolean equals(ClusterSummary that) { return false; } + boolean this_present_server_info = true && this.is_set_server_info(); + boolean that_present_server_info = true && that.is_set_server_info(); + if (this_present_server_info || that_present_server_info) { + if (!(this_present_server_info && that_present_server_info)) + return false; + if (!this.server_info.equals(that.server_info)) + return false; + } + return true; } @@ -403,6 +442,11 @@ public int hashCode() { if (present_topologies) builder.append(topologies); + boolean present_server_info = true && (is_set_server_info()); + builder.append(present_server_info); + if (present_server_info) + builder.append(server_info); + return builder.toHashCode(); } @@ -444,6 +488,16 @@ public int compareTo(ClusterSummary other) { return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_server_info()).compareTo(typedOther.is_set_server_info()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_server_info()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.server_info, typedOther.server_info); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -505,6 +559,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.t org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } break; + case 4: // SERVER_INFO + if (field.type == org.apache.thrift.protocol.TType.STRUCT) { + this.server_info = new ServerInfo(); + this.server_info.read(iprot); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } @@ -545,6 +607,13 @@ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache. } oprot.writeFieldEnd(); } + if (this.server_info != null) { + if (is_set_server_info()) { + oprot.writeFieldBegin(SERVER_INFO_FIELD_DESC); + this.server_info.write(oprot); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -573,6 +642,16 @@ public String toString() { sb.append(this.topologies); } first = false; + if (is_set_server_info()) { + if (!first) sb.append(", "); + sb.append("server_info:"); + if (this.server_info == null) { + sb.append("null"); + } else { + sb.append(this.server_info); + } + first = false; + } sb.append(")"); return sb.toString(); } diff --git a/storm-core/src/jvm/backtype/storm/generated/ServerInfo.java b/storm-core/src/jvm/backtype/storm/generated/ServerInfo.java new file mode 100644 index 00000000000..345f1d1c6c3 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/generated/ServerInfo.java @@ -0,0 +1,425 @@ +/** + * Autogenerated by Thrift Compiler (0.7.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + */ +package backtype.storm.generated; + +import org.apache.commons.lang.builder.HashCodeBuilder; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServerInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ServerInfo"); + + private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)1); + private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)2); + + private String host; // required + private int port; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + HOST((short)1, "host"), + PORT((short)2, "port"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // HOST + return HOST; + case 2: // PORT + return PORT; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __PORT_ISSET_ID = 0; + private BitSet __isset_bit_vector = new BitSet(1); + + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ServerInfo.class, metaDataMap); + } + + public ServerInfo() { + } + + public ServerInfo( + String host, + int port) + { + this(); + this.host = host; + this.port = port; + set_port_isSet(true); + } + + /** + * Performs a deep copy on other. + */ + public ServerInfo(ServerInfo other) { + __isset_bit_vector.clear(); + __isset_bit_vector.or(other.__isset_bit_vector); + if (other.is_set_host()) { + this.host = other.host; + } + this.port = other.port; + } + + public ServerInfo deepCopy() { + return new ServerInfo(this); + } + + @Override + public void clear() { + this.host = null; + set_port_isSet(false); + this.port = 0; + } + + public String get_host() { + return this.host; + } + + public void set_host(String host) { + this.host = host; + } + + public void unset_host() { + this.host = null; + } + + /** Returns true if field host is set (has been assigned a value) and false otherwise */ + public boolean is_set_host() { + return this.host != null; + } + + public void set_host_isSet(boolean value) { + if (!value) { + this.host = null; + } + } + + public int get_port() { + return this.port; + } + + public void set_port(int port) { + this.port = port; + set_port_isSet(true); + } + + public void unset_port() { + __isset_bit_vector.clear(__PORT_ISSET_ID); + } + + /** Returns true if field port is set (has been assigned a value) and false otherwise */ + public boolean is_set_port() { + return __isset_bit_vector.get(__PORT_ISSET_ID); + } + + public void set_port_isSet(boolean value) { + __isset_bit_vector.set(__PORT_ISSET_ID, value); + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case HOST: + if (value == null) { + unset_host(); + } else { + set_host((String)value); + } + break; + + case PORT: + if (value == null) { + unset_port(); + } else { + set_port((Integer)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case HOST: + return get_host(); + + case PORT: + return Integer.valueOf(get_port()); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case HOST: + return is_set_host(); + case PORT: + return is_set_port(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof ServerInfo) + return this.equals((ServerInfo)that); + return false; + } + + public boolean equals(ServerInfo that) { + if (that == null) + return false; + + boolean this_present_host = true && this.is_set_host(); + boolean that_present_host = true && that.is_set_host(); + if (this_present_host || that_present_host) { + if (!(this_present_host && that_present_host)) + return false; + if (!this.host.equals(that.host)) + return false; + } + + boolean this_present_port = true; + boolean that_present_port = true; + if (this_present_port || that_present_port) { + if (!(this_present_port && that_present_port)) + return false; + if (this.port != that.port) + return false; + } + + return true; + } + + @Override + public int hashCode() { + HashCodeBuilder builder = new HashCodeBuilder(); + + boolean present_host = true && (is_set_host()); + builder.append(present_host); + if (present_host) + builder.append(host); + + boolean present_port = true; + builder.append(present_port); + if (present_port) + builder.append(port); + + return builder.toHashCode(); + } + + public int compareTo(ServerInfo other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + ServerInfo typedOther = (ServerInfo)other; + + lastComparison = Boolean.valueOf(is_set_host()).compareTo(typedOther.is_set_host()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_host()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, typedOther.host); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_port()).compareTo(typedOther.is_set_port()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_port()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, typedOther.port); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (field.id) { + case 1: // HOST + if (field.type == org.apache.thrift.protocol.TType.STRING) { + this.host = iprot.readString(); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 2: // PORT + if (field.type == org.apache.thrift.protocol.TType.I32) { + this.port = iprot.readI32(); + set_port_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (this.host != null) { + oprot.writeFieldBegin(HOST_FIELD_DESC); + oprot.writeString(this.host); + oprot.writeFieldEnd(); + } + oprot.writeFieldBegin(PORT_FIELD_DESC); + oprot.writeI32(this.port); + oprot.writeFieldEnd(); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("ServerInfo("); + boolean first = true; + + sb.append("host:"); + if (this.host == null) { + sb.append("null"); + } else { + sb.append(this.host); + } + first = false; + if (!first) sb.append(", "); + sb.append("port:"); + sb.append(this.port); + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + if (!is_set_host()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'host' is unset! Struct:" + toString()); + } + + if (!is_set_port()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'port' is unset! Struct:" + toString()); + } + + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bit_vector = new BitSet(1); + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + +} + diff --git a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java index 8e8075b9c93..445f433cca2 100644 --- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java +++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java @@ -20,6 +20,7 @@ import backtype.storm.Config; import backtype.storm.security.auth.ThriftClient; import backtype.storm.security.auth.ThriftConnectionType; +import backtype.storm.generated.ServerInfo; import backtype.storm.generated.Nimbus; import java.util.Map; import org.apache.thrift.transport.TTransportException; @@ -33,9 +34,9 @@ public class NimbusClient extends ThriftClient { public static NimbusClient getConfiguredClient(Map conf) { try { ServerInfo serverInfo = Utils.getServerInfo(conf, "nimbus"); - conf.put(Config.NIMBUS_HOST, serverInfo.getHost()); - conf.put(Config.NIMBUS_THRIFT_PORT, serverInfo.getPort()); - return new NimbusClient(conf, serverInfo.getHost()); + conf.put(Config.NIMBUS_HOST, serverInfo.get_host()); + conf.put(Config.NIMBUS_THRIFT_PORT, serverInfo.get_port()); + return new NimbusClient(conf, serverInfo.get_host()); } catch (TTransportException ex) { throw new RuntimeException(ex); } diff --git a/storm-core/src/jvm/backtype/storm/utils/ServerInfo.java b/storm-core/src/jvm/backtype/storm/utils/ServerInfo.java deleted file mode 100644 index ede7fe14419..00000000000 --- a/storm-core/src/jvm/backtype/storm/utils/ServerInfo.java +++ /dev/null @@ -1,55 +0,0 @@ -package backtype.storm.utils; - -import java.io.Serializable; - -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.json.simple.parser.ParseException; - -public class ServerInfo implements Serializable { - - private static final long serialVersionUID = 1L; - - private String host; - private int port; - - public static ServerInfo instance(String jsonText) throws ParseException { - JSONParser parser = new JSONParser(); - JSONObject jsonObject = (JSONObject) parser.parse(jsonText); - String host = (String) jsonObject.get("host"); - Long port = (Long) jsonObject.get("port"); - return new ServerInfo(host, port.intValue()); - } - - public ServerInfo(String host, int port) { - this.host = host; - this.port = port; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public String toString() { - return host + ":" + port; - } - - public String toJsonString() { - JSONObject obj = new JSONObject(); - obj.put("host", host); - obj.put("port", new Integer(port)); - return obj.toJSONString(); - } -} diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index e4068efea6a..c4081e07a17 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -54,6 +54,9 @@ import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.json.simple.JSONValue; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; @@ -62,6 +65,7 @@ import backtype.storm.Config; import backtype.storm.generated.ComponentCommon; import backtype.storm.generated.ComponentObject; +import backtype.storm.generated.ServerInfo; import backtype.storm.generated.StormTopology; import backtype.storm.generated.AuthorizationException; @@ -535,6 +539,21 @@ private static SerializationDelegate getSerializationDelegate(Map stormConf) { return delegate; } + public static ServerInfo instance(String jsonText) throws ParseException { + JSONParser parser = new JSONParser(); + JSONObject jsonObject = (JSONObject) parser.parse(jsonText); + String host = (String) jsonObject.get("host"); + Long port = (Long) jsonObject.get("port"); + return new ServerInfo(host, port.intValue()); + } + + public static String toJsonString(ServerInfo serverInfo) { + JSONObject obj = new JSONObject(); + obj.put("host", serverInfo.get_host()); + obj.put("port", serverInfo.get_port()); + return obj.toJSONString(); + } + public static ServerInfo getServerInfo(Map conf, String name) throws TTransportException { List servers = (List) conf.get(Config.STORM_ZOOKEEPER_SERVERS); @@ -549,7 +568,7 @@ public static ServerInfo getServerInfo(Map conf, String name) throws TTransportE } byte[] zk_data = _curator.getData().forPath("/" + name); String jsonText = (String) Utils.deserialize(zk_data); - return ServerInfo.instance(jsonText); + return Utils.instance(jsonText); } catch (Exception e) { LOG.error(e.getMessage()); throw new TTransportException(e); diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift index f807b743e86..0e034d651af 100644 --- a/storm-core/src/storm.thrift +++ b/storm-core/src/storm.thrift @@ -155,10 +155,16 @@ struct SupervisorSummary { 5: required string supervisor_id; } +struct ServerInfo { + 1: required string host; + 2: required i32 port; +} + struct ClusterSummary { 1: required list supervisors; 2: required i32 nimbus_uptime_secs; 3: required list topologies; + 4: optional ServerInfo server_info; } struct ErrorInfo { From a8d08c9c098208f693bdc5016d4e3f41eab09240 Mon Sep 17 00:00:00 2001 From: caofangkun Date: Mon, 26 Jan 2015 14:07:50 +0800 Subject: [PATCH 05/10] STORM-534:modify UI add nimbus.html --- .../src/clj/backtype/storm/daemon/nimbus.clj | 4 +- storm-core/src/clj/backtype/storm/ui/core.clj | 3 + .../storm/generated/ClusterSummary.java | 34 ++++---- storm-core/src/storm.thrift | 2 +- storm-core/src/ui/public/index.html | 10 --- storm-core/src/ui/public/nimbus.html | 77 +++++++++++++++++++ .../public/templates/index-page-template.html | 14 ++++ 7 files changed, 116 insertions(+), 28 deletions(-) create mode 100644 storm-core/src/ui/public/nimbus.html diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index ded18defb51..c30f7acf738 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -1224,6 +1224,7 @@ (^ClusterSummary getClusterInfo [this] (check-authorization! nimbus nil nil "getClusterInfo") (let [storm-cluster-state (:storm-cluster-state nimbus) + nimbus-info (.nimbus-info storm-cluster-state) supervisor-infos (all-supervisor-info storm-cluster-state) ;; TODO: need to get the port info about supervisors... ;; in standalone just look at metadata, otherwise just say N/A? @@ -1261,7 +1262,8 @@ ))] (ClusterSummary. supervisor-summaries nimbus-uptime - topology-summaries) + topology-summaries + nimbus-info) )) (^TopologyInfo getTopologyInfo [this ^String storm-id] diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index 10a11233a8f..7db24f4efa2 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -491,6 +491,7 @@ (cluster-summary (.getClusterInfo ^Nimbus$Client nimbus) user))) ([^ClusterSummary summ user] (let [sups (.get_supervisors summ) + nimbus-info (.get_server_info summ) used-slots (reduce + (map #(.get_num_used_workers ^SupervisorSummary %) sups)) total-slots (reduce + (map #(.get_num_workers ^SupervisorSummary %) sups)) free-slots (- total-slots used-slots) @@ -501,6 +502,8 @@ (map #(.get_num_executors ^TopologySummary %)) (reduce +))] {"user" user + "nimbusHost" (.get_host nimbus-info) + "nimbusPort" (.get_port nimbus-info) "stormVersion" (str (VersionInfo/getVersion)) "nimbusUptime" (pretty-uptime-sec (.get_nimbus_uptime_secs summ)) "supervisors" (count sups) diff --git a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java index 394a8140fc6..21d95f19079 100644 --- a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java +++ b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java @@ -116,7 +116,7 @@ public String getFieldName() { tmpMap.put(_Fields.TOPOLOGIES, new org.apache.thrift.meta_data.FieldMetaData("topologies", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TopologySummary.class)))); - tmpMap.put(_Fields.SERVER_INFO, new org.apache.thrift.meta_data.FieldMetaData("server_info", org.apache.thrift.TFieldRequirementType.OPTIONAL, + tmpMap.put(_Fields.SERVER_INFO, new org.apache.thrift.meta_data.FieldMetaData("server_info", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ServerInfo.class))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ClusterSummary.class, metaDataMap); @@ -128,13 +128,15 @@ public ClusterSummary() { public ClusterSummary( List supervisors, int nimbus_uptime_secs, - List topologies) + List topologies, + ServerInfo server_info) { this(); this.supervisors = supervisors; this.nimbus_uptime_secs = nimbus_uptime_secs; set_nimbus_uptime_secs_isSet(true); this.topologies = topologies; + this.server_info = server_info; } /** @@ -608,11 +610,9 @@ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache. oprot.writeFieldEnd(); } if (this.server_info != null) { - if (is_set_server_info()) { - oprot.writeFieldBegin(SERVER_INFO_FIELD_DESC); - this.server_info.write(oprot); - oprot.writeFieldEnd(); - } + oprot.writeFieldBegin(SERVER_INFO_FIELD_DESC); + this.server_info.write(oprot); + oprot.writeFieldEnd(); } oprot.writeFieldStop(); oprot.writeStructEnd(); @@ -642,16 +642,14 @@ public String toString() { sb.append(this.topologies); } first = false; - if (is_set_server_info()) { - if (!first) sb.append(", "); - sb.append("server_info:"); - if (this.server_info == null) { - sb.append("null"); - } else { - sb.append(this.server_info); - } - first = false; + if (!first) sb.append(", "); + sb.append("server_info:"); + if (this.server_info == null) { + sb.append("null"); + } else { + sb.append(this.server_info); } + first = false; sb.append(")"); return sb.toString(); } @@ -670,6 +668,10 @@ public void validate() throws org.apache.thrift.TException { throw new org.apache.thrift.protocol.TProtocolException("Required field 'topologies' is unset! Struct:" + toString()); } + if (!is_set_server_info()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'server_info' is unset! Struct:" + toString()); + } + } private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift index 0e034d651af..48fb150a212 100644 --- a/storm-core/src/storm.thrift +++ b/storm-core/src/storm.thrift @@ -164,7 +164,7 @@ struct ClusterSummary { 1: required list supervisors; 2: required i32 nimbus_uptime_secs; 3: required list topologies; - 4: optional ServerInfo server_info; + 4: required ServerInfo server_info; } struct ErrorInfo { diff --git a/storm-core/src/ui/public/index.html b/storm-core/src/ui/public/index.html index 6fac19a0f69..41efe54a117 100644 --- a/storm-core/src/ui/public/index.html +++ b/storm-core/src/ui/public/index.html @@ -40,8 +40,6 @@

Topology summary

Supervisor summary

-

Nimbus Configuration

-
diff --git a/storm-core/src/ui/public/nimbus.html b/storm-core/src/ui/public/nimbus.html new file mode 100644 index 00000000000..bb7930a226e --- /dev/null +++ b/storm-core/src/ui/public/nimbus.html @@ -0,0 +1,77 @@ + + + +Nimbus Information + + + + + + + + + + + +

Storm UI

+

Overview

+
+
+

Nimbus Configuration

+
+
+ + + + + diff --git a/storm-core/src/ui/public/templates/index-page-template.html b/storm-core/src/ui/public/templates/index-page-template.html index 9c48e83a5aa..adc5cb3d468 100644 --- a/storm-core/src/ui/public/templates/index-page-template.html +++ b/storm-core/src/ui/public/templates/index-page-template.html @@ -14,10 +14,23 @@ See the License for the specific language governing permissions and limitations under the License. --> + + @@ -31,11 +31,6 @@ NimbusHost - - - Version - - Nimbus uptime @@ -76,7 +71,6 @@ {{nimbusHost}} - {{stormVersion}} {{nimbusUptime}} {{supervisors}} {{slotsUsed}} From d605f186c2261fe4a1c7db449d5bafd90875d4a8 Mon Sep 17 00:00:00 2001 From: caofangkun Date: Mon, 26 Jan 2015 15:39:46 +0800 Subject: [PATCH 07/10] STORM-534:modify ClusterSummary.java add License --- .../storm/generated/ClusterSummary.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java index 21d95f19079..7bb49629a6e 100644 --- a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java +++ b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ /** * Autogenerated by Thrift Compiler (0.7.0) * From 16ea513eff82d4468f8c1bef22c17d66bb85eae9 Mon Sep 17 00:00:00 2001 From: caofangkun Date: Wed, 4 Feb 2015 11:12:57 +0800 Subject: [PATCH 08/10] STORM-534: fix merge conflicts --- .../src/clj/backtype/storm/daemon/nimbus.clj | 6 +----- .../backtype/storm/generated/ServerInfo.java | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index a1e36d2d3ca..50b564e2f63 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -23,12 +23,8 @@ (:use [backtype.storm.scheduler.DefaultScheduler]) (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails]) -<<<<<<< HEAD - (:import [backtype.storm.generated AuthorizationException ServerInfo]) -======= - (:import [backtype.storm.generated AuthorizationException GetInfoOptions + (:import [backtype.storm.generated AuthorizationException ServerInfo GetInfoOptions NumErrorsChoice]) ->>>>>>> origin/master (:use [backtype.storm bootstrap util]) (:use [backtype.storm.config :only [validate-configs-with-schemas]]) (:use [backtype.storm.daemon common]) diff --git a/storm-core/src/jvm/backtype/storm/generated/ServerInfo.java b/storm-core/src/jvm/backtype/storm/generated/ServerInfo.java index f6bdd31d4ac..504cf65607c 100644 --- a/storm-core/src/jvm/backtype/storm/generated/ServerInfo.java +++ b/storm-core/src/jvm/backtype/storm/generated/ServerInfo.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ /** * Autogenerated by Thrift Compiler (0.7.0) * From bf72c5b03241574090d17df4efff1f27e988f993 Mon Sep 17 00:00:00 2001 From: caofangkun Date: Wed, 4 Feb 2015 11:14:50 +0800 Subject: [PATCH 09/10] STORM-534: fix merge conflicts --- storm-core/src/clj/backtype/storm/daemon/nimbus.clj | 1 - 1 file changed, 1 deletion(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 50b564e2f63..b322cd1c169 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -18,7 +18,6 @@ [java.util Collections]) (:import [java.io FileNotFoundException]) (:import [java.nio.channels Channels WritableByteChannel]) - (:import [backtype.storm.utils VersionInfo]) (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils]) (:use [backtype.storm.scheduler.DefaultScheduler]) (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails From a035bf2636504a803af2d956c1563df0af85affb Mon Sep 17 00:00:00 2001 From: caofangkun Date: Wed, 4 Feb 2015 11:19:05 +0800 Subject: [PATCH 10/10] STORM-534: fix merge conflicts --- storm-core/src/clj/backtype/storm/daemon/nimbus.clj | 1 + 1 file changed, 1 insertion(+) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index b322cd1c169..50b564e2f63 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -18,6 +18,7 @@ [java.util Collections]) (:import [java.io FileNotFoundException]) (:import [java.nio.channels Channels WritableByteChannel]) + (:import [backtype.storm.utils VersionInfo]) (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils]) (:use [backtype.storm.scheduler.DefaultScheduler]) (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails