From 400f2154e458b2033299b5be5bde165dfbb57cf8 Mon Sep 17 00:00:00 2001 From: zhuol Date: Tue, 10 Nov 2015 14:57:14 -0600 Subject: [PATCH 01/12] [STORM-1198] Web UI to show resource usages and Total Resources on all supervisors --- .../src/clj/backtype/storm/daemon/nimbus.clj | 7 +- storm-core/src/clj/backtype/storm/ui/core.clj | 5 + .../storm/generated/SupervisorSummary.java | 206 +++++++++++++++++- .../jvm/backtype/storm/scheduler/Cluster.java | 15 +- .../resource/ResourceAwareScheduler.java | 16 ++ storm-core/src/py/storm/ttypes.py | 28 ++- storm-core/src/storm.thrift | 2 + .../public/templates/index-page-template.html | 24 ++ 8 files changed, 296 insertions(+), 7 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index d23da03488f..87b5a8bd9ff 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -163,7 +163,8 @@ :leader-elector (zk-leader-elector conf) :code-distributor (mk-code-distributor conf) :id->sched-status (atom {}) - :id->resources (atom {}) + :node-id->resources (atom {}) ;;resources of supervisors + :id->resources (atom {}) ;;resources of topologies :cred-renewers (AuthUtils/GetCredentialRenewers conf) :topology-history-lock (Object.) :topo-history-state (nimbus-topo-history-state conf) @@ -745,6 +746,7 @@ ;; the new assignments for all the topologies are in the cluster object. _ (.schedule (:scheduler nimbus) topologies cluster) _ (reset! (:id->sched-status nimbus) (.getStatusMap cluster)) + _ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster)) _ (reset! (:id->resources nimbus) (merge @(:id->resources nimbus) (.getResourcesMap cluster)))] (.getAssignments cluster))) @@ -1646,6 +1648,9 @@ (count (:used-ports info)) id) ] (.set_total_resources sup-sum (map-val double (:resources-map info))) + (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)] + (.set_used_mem sup-sum used-mem) + (.set_used_cpu sup-sum used-cpu)) (when-let [version (:version info)] (.set_version sup-sum version)) sup-sum)) nimbus-uptime ((:uptime nimbus)) diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index 99f75e49c2c..64dc1b91a55 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -42,6 +42,7 @@ (:import [backtype.storm.generated AuthorizationException ProfileRequest ProfileAction NodeInfo]) (:import [backtype.storm.security.auth AuthUtils]) (:import [backtype.storm.utils VersionInfo]) + (:import [backtype.storm Config]) (:import [java.io File]) (:require [compojure.route :as route] [compojure.handler :as handler] @@ -433,6 +434,10 @@ "uptimeSeconds" (.get_uptime_secs s) "slotsTotal" (.get_num_workers s) "slotsUsed" (.get_num_used_workers s) + "totalMem" (get (.get_total_resources s) Config/SUPERVISOR_MEMORY_CAPACITY_MB) + "totalCpu" (get (.get_total_resources s) Config/SUPERVISOR_CPU_CAPACITY) + "usedMem" (.get_used_mem s) + "usedCpu" (.get_used_cpu s) "version" (.get_version s)})})) (defn all-topologies-summary diff --git a/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java b/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java index 2c3f3307057..88202d8bbed 100644 --- a/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java +++ b/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java @@ -62,6 +62,8 @@ public class SupervisorSummary implements org.apache.thrift.TBase, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -76,6 +78,8 @@ public class SupervisorSummary implements org.apache.thrift.TBase total_resources; // optional + private double used_mem; // optional + private double used_cpu; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -85,7 +89,9 @@ public enum _Fields implements org.apache.thrift.TFieldIdEnum { NUM_USED_WORKERS((short)4, "num_used_workers"), SUPERVISOR_ID((short)5, "supervisor_id"), VERSION((short)6, "version"), - TOTAL_RESOURCES((short)7, "total_resources"); + TOTAL_RESOURCES((short)7, "total_resources"), + USED_MEM((short)8, "used_mem"), + USED_CPU((short)9, "used_cpu"); private static final Map byName = new HashMap(); @@ -114,6 +120,10 @@ public static _Fields findByThriftId(int fieldId) { return VERSION; case 7: // TOTAL_RESOURCES return TOTAL_RESOURCES; + case 8: // USED_MEM + return USED_MEM; + case 9: // USED_CPU + return USED_CPU; default: return null; } @@ -157,8 +167,10 @@ public String getFieldName() { private static final int __UPTIME_SECS_ISSET_ID = 0; private static final int __NUM_WORKERS_ISSET_ID = 1; private static final int __NUM_USED_WORKERS_ISSET_ID = 2; + private static final int __USED_MEM_ISSET_ID = 3; + private static final int __USED_CPU_ISSET_ID = 4; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.VERSION,_Fields.TOTAL_RESOURCES}; + private static final _Fields optionals[] = {_Fields.VERSION,_Fields.TOTAL_RESOURCES,_Fields.USED_MEM,_Fields.USED_CPU}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -178,6 +190,10 @@ public String getFieldName() { new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)))); + tmpMap.put(_Fields.USED_MEM, new org.apache.thrift.meta_data.FieldMetaData("used_mem", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE))); + tmpMap.put(_Fields.USED_CPU, new org.apache.thrift.meta_data.FieldMetaData("used_cpu", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(SupervisorSummary.class, metaDataMap); } @@ -226,6 +242,8 @@ public SupervisorSummary(SupervisorSummary other) { Map __this__total_resources = new HashMap(other.total_resources); this.total_resources = __this__total_resources; } + this.used_mem = other.used_mem; + this.used_cpu = other.used_cpu; } public SupervisorSummary deepCopy() { @@ -245,6 +263,10 @@ public void clear() { this.version = "VERSION_NOT_PROVIDED"; this.total_resources = null; + set_used_mem_isSet(false); + this.used_mem = 0.0; + set_used_cpu_isSet(false); + this.used_cpu = 0.0; } public String get_host() { @@ -416,6 +438,50 @@ public void set_total_resources_isSet(boolean value) { } } + public double get_used_mem() { + return this.used_mem; + } + + public void set_used_mem(double used_mem) { + this.used_mem = used_mem; + set_used_mem_isSet(true); + } + + public void unset_used_mem() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __USED_MEM_ISSET_ID); + } + + /** Returns true if field used_mem is set (has been assigned a value) and false otherwise */ + public boolean is_set_used_mem() { + return EncodingUtils.testBit(__isset_bitfield, __USED_MEM_ISSET_ID); + } + + public void set_used_mem_isSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __USED_MEM_ISSET_ID, value); + } + + public double get_used_cpu() { + return this.used_cpu; + } + + public void set_used_cpu(double used_cpu) { + this.used_cpu = used_cpu; + set_used_cpu_isSet(true); + } + + public void unset_used_cpu() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __USED_CPU_ISSET_ID); + } + + /** Returns true if field used_cpu is set (has been assigned a value) and false otherwise */ + public boolean is_set_used_cpu() { + return EncodingUtils.testBit(__isset_bitfield, __USED_CPU_ISSET_ID); + } + + public void set_used_cpu_isSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __USED_CPU_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case HOST: @@ -474,6 +540,22 @@ public void setFieldValue(_Fields field, Object value) { } break; + case USED_MEM: + if (value == null) { + unset_used_mem(); + } else { + set_used_mem((Double)value); + } + break; + + case USED_CPU: + if (value == null) { + unset_used_cpu(); + } else { + set_used_cpu((Double)value); + } + break; + } } @@ -500,6 +582,12 @@ public Object getFieldValue(_Fields field) { case TOTAL_RESOURCES: return get_total_resources(); + case USED_MEM: + return Double.valueOf(get_used_mem()); + + case USED_CPU: + return Double.valueOf(get_used_cpu()); + } throw new IllegalStateException(); } @@ -525,6 +613,10 @@ public boolean isSet(_Fields field) { return is_set_version(); case TOTAL_RESOURCES: return is_set_total_resources(); + case USED_MEM: + return is_set_used_mem(); + case USED_CPU: + return is_set_used_cpu(); } throw new IllegalStateException(); } @@ -605,6 +697,24 @@ public boolean equals(SupervisorSummary that) { return false; } + boolean this_present_used_mem = true && this.is_set_used_mem(); + boolean that_present_used_mem = true && that.is_set_used_mem(); + if (this_present_used_mem || that_present_used_mem) { + if (!(this_present_used_mem && that_present_used_mem)) + return false; + if (this.used_mem != that.used_mem) + return false; + } + + boolean this_present_used_cpu = true && this.is_set_used_cpu(); + boolean that_present_used_cpu = true && that.is_set_used_cpu(); + if (this_present_used_cpu || that_present_used_cpu) { + if (!(this_present_used_cpu && that_present_used_cpu)) + return false; + if (this.used_cpu != that.used_cpu) + return false; + } + return true; } @@ -647,6 +757,16 @@ public int hashCode() { if (present_total_resources) list.add(total_resources); + boolean present_used_mem = true && (is_set_used_mem()); + list.add(present_used_mem); + if (present_used_mem) + list.add(used_mem); + + boolean present_used_cpu = true && (is_set_used_cpu()); + list.add(present_used_cpu); + if (present_used_cpu) + list.add(used_cpu); + return list.hashCode(); } @@ -728,6 +848,26 @@ public int compareTo(SupervisorSummary other) { return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_used_mem()).compareTo(other.is_set_used_mem()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_used_mem()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.used_mem, other.used_mem); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_used_cpu()).compareTo(other.is_set_used_cpu()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_used_cpu()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.used_cpu, other.used_cpu); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -795,6 +935,18 @@ public String toString() { } first = false; } + if (is_set_used_mem()) { + if (!first) sb.append(", "); + sb.append("used_mem:"); + sb.append(this.used_mem); + first = false; + } + if (is_set_used_cpu()) { + if (!first) sb.append(", "); + sb.append("used_cpu:"); + sb.append(this.used_cpu); + first = false; + } sb.append(")"); return sb.toString(); } @@ -928,6 +1080,22 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, SupervisorSummary s org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 8: // USED_MEM + if (schemeField.type == org.apache.thrift.protocol.TType.DOUBLE) { + struct.used_mem = iprot.readDouble(); + struct.set_used_mem_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 9: // USED_CPU + if (schemeField.type == org.apache.thrift.protocol.TType.DOUBLE) { + struct.used_cpu = iprot.readDouble(); + struct.set_used_cpu_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -982,6 +1150,16 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, SupervisorSummary oprot.writeFieldEnd(); } } + if (struct.is_set_used_mem()) { + oprot.writeFieldBegin(USED_MEM_FIELD_DESC); + oprot.writeDouble(struct.used_mem); + oprot.writeFieldEnd(); + } + if (struct.is_set_used_cpu()) { + oprot.writeFieldBegin(USED_CPU_FIELD_DESC); + oprot.writeDouble(struct.used_cpu); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1011,7 +1189,13 @@ public void write(org.apache.thrift.protocol.TProtocol prot, SupervisorSummary s if (struct.is_set_total_resources()) { optionals.set(1); } - oprot.writeBitSet(optionals, 2); + if (struct.is_set_used_mem()) { + optionals.set(2); + } + if (struct.is_set_used_cpu()) { + optionals.set(3); + } + oprot.writeBitSet(optionals, 4); if (struct.is_set_version()) { oprot.writeString(struct.version); } @@ -1025,6 +1209,12 @@ public void write(org.apache.thrift.protocol.TProtocol prot, SupervisorSummary s } } } + if (struct.is_set_used_mem()) { + oprot.writeDouble(struct.used_mem); + } + if (struct.is_set_used_cpu()) { + oprot.writeDouble(struct.used_cpu); + } } @Override @@ -1040,7 +1230,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, SupervisorSummary st struct.set_num_used_workers_isSet(true); struct.supervisor_id = iprot.readString(); struct.set_supervisor_id_isSet(true); - BitSet incoming = iprot.readBitSet(2); + BitSet incoming = iprot.readBitSet(4); if (incoming.get(0)) { struct.version = iprot.readString(); struct.set_version_isSet(true); @@ -1060,6 +1250,14 @@ public void read(org.apache.thrift.protocol.TProtocol prot, SupervisorSummary st } struct.set_total_resources_isSet(true); } + if (incoming.get(2)) { + struct.used_mem = iprot.readDouble(); + struct.set_used_mem_isSet(true); + } + if (incoming.get(3)) { + struct.used_cpu = iprot.readDouble(); + struct.set_used_cpu_isSet(true); + } } } diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java index b3028e92e5c..c4a815fce18 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java @@ -34,7 +34,11 @@ public class Cluster { /** * key: supervisor id, value: supervisor details */ - private Map supervisors; + private Map supervisors; + /** + * key: supervisor id, value: supervisor's total and used resources + */ + private Map supervisorsResources; /** * key: rack, value: nodes in that rack @@ -73,6 +77,7 @@ public Cluster(INimbus nimbus, Map supervisors, Map(); this.resources = new HashMap(); + this.supervisorsResources = new HashMap(); this.hostToId = new HashMap>(); for (Map.Entry entry : supervisors.entrySet()) { String nodeId = entry.getKey(); @@ -476,4 +481,12 @@ public void setResources(String topologyId, Double[] resources) { public Map getResourcesMap() { return this.resources; } + + public void setSupervisorsResources(Map supervisors_resources) { + this.supervisorsResources.putAll(supervisors_resources); + } + + public Map getSupervisorsResourcesMap() { + return this.supervisorsResources; + } } diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java index 6e455f2735f..ad33b43b95e 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java @@ -110,6 +110,22 @@ public void schedule(Topologies topologies, Cluster cluster) { cluster.setStatus(td.getId(), "Fully Scheduled"); } } + updateSupervisorsResources(cluster, topologies); + } + + private void updateSupervisorsResources(Cluster cluster, Topologies topologies) { + Map supervisors_resources = new HashMap(); + Map nodes = RAS_Node.getAllNodesFrom(cluster, topologies); + for (Map.Entry entry : nodes.entrySet()) { + RAS_Node node = entry.getValue(); + Double totalMem = node.getTotalMemoryResources(); + Double totalCpu = node.getTotalCpuResources(); + Double usedMem = totalMem - node.getAvailableMemoryResources(); + Double usedCpu = totalCpu - node.getAvailableCpuResources(); + Double[] resources = {totalMem, totalCpu, usedMem, usedCpu}; + supervisors_resources.put(entry.getKey(), resources); + } + cluster.setSupervisorsResources(supervisors_resources); } private Map getUserConf() { diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py index 23a82e10fff..ea3c768f100 100644 --- a/storm-core/src/py/storm/ttypes.py +++ b/storm-core/src/py/storm/ttypes.py @@ -2510,6 +2510,8 @@ class SupervisorSummary: - supervisor_id - version - total_resources + - used_mem + - used_cpu """ thrift_spec = ( @@ -2521,9 +2523,11 @@ class SupervisorSummary: (5, TType.STRING, 'supervisor_id', None, None, ), # 5 (6, TType.STRING, 'version', None, "VERSION_NOT_PROVIDED", ), # 6 (7, TType.MAP, 'total_resources', (TType.STRING,None,TType.DOUBLE,None), None, ), # 7 + (8, TType.DOUBLE, 'used_mem', None, None, ), # 8 + (9, TType.DOUBLE, 'used_cpu', None, None, ), # 9 ) - def __init__(self, host=None, uptime_secs=None, num_workers=None, num_used_workers=None, supervisor_id=None, version=thrift_spec[6][4], total_resources=None,): + def __init__(self, host=None, uptime_secs=None, num_workers=None, num_used_workers=None, supervisor_id=None, version=thrift_spec[6][4], total_resources=None, used_mem=None, used_cpu=None,): self.host = host self.uptime_secs = uptime_secs self.num_workers = num_workers @@ -2531,6 +2535,8 @@ def __init__(self, host=None, uptime_secs=None, num_workers=None, num_used_worke self.supervisor_id = supervisor_id self.version = version self.total_resources = total_resources + self.used_mem = used_mem + self.used_cpu = used_cpu def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -2582,6 +2588,16 @@ def read(self, iprot): iprot.readMapEnd() else: iprot.skip(ftype) + elif fid == 8: + if ftype == TType.DOUBLE: + self.used_mem = iprot.readDouble(); + else: + iprot.skip(ftype) + elif fid == 9: + if ftype == TType.DOUBLE: + self.used_cpu = iprot.readDouble(); + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -2624,6 +2640,14 @@ def write(self, oprot): oprot.writeDouble(viter74) oprot.writeMapEnd() oprot.writeFieldEnd() + if self.used_mem is not None: + oprot.writeFieldBegin('used_mem', TType.DOUBLE, 8) + oprot.writeDouble(self.used_mem) + oprot.writeFieldEnd() + if self.used_cpu is not None: + oprot.writeFieldBegin('used_cpu', TType.DOUBLE, 9) + oprot.writeDouble(self.used_cpu) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -2650,6 +2674,8 @@ def __hash__(self): value = (value * 31) ^ hash(self.supervisor_id) value = (value * 31) ^ hash(self.version) value = (value * 31) ^ hash(self.total_resources) + value = (value * 31) ^ hash(self.used_mem) + value = (value * 31) ^ hash(self.used_cpu) return value def __repr__(self): diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift index 6d28d88b623..1818ea7f4d0 100644 --- a/storm-core/src/storm.thrift +++ b/storm-core/src/storm.thrift @@ -162,6 +162,8 @@ struct SupervisorSummary { 5: required string supervisor_id; 6: optional string version = "VERSION_NOT_PROVIDED"; 7: optional map total_resources; + 8: optional double used_mem; + 9: optional double used_cpu; } struct NimbusSummary { diff --git a/storm-core/src/ui/public/templates/index-page-template.html b/storm-core/src/ui/public/templates/index-page-template.html index 15d72a7b8fb..bf1f1081fe4 100644 --- a/storm-core/src/ui/public/templates/index-page-template.html +++ b/storm-core/src/ui/public/templates/index-page-template.html @@ -214,6 +214,26 @@ Used slots + + + Total Mem (MB) + + + + + Used Mem (MB) + + + + + Total CPU (%) + + + + + Used CPU (%) + + Version @@ -229,6 +249,10 @@ {{uptime}} {{slotsTotal}} {{slotsUsed}} + {{totalMem}} + {{usedMem}} + {{totalCpu}} + {{usedCpu}} {{version}} {{/supervisors}} From dda1c32d6b6c39c7e3027fab81f2cfce5f9927e7 Mon Sep 17 00:00:00 2001 From: zhuol Date: Wed, 11 Nov 2015 19:50:16 -0600 Subject: [PATCH 02/12] Update rest api doc for supervisor summary --- docs/documentation/ui-rest-api.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/docs/documentation/ui-rest-api.md b/docs/documentation/ui-rest-api.md index e1c68c1815f..f2d36a91598 100644 --- a/docs/documentation/ui-rest-api.md +++ b/docs/documentation/ui-rest-api.md @@ -120,6 +120,10 @@ Response fields: |uptimeSeconds| Integer| Shows how long the supervisor is running in seconds| |slotsTotal| Integer| Total number of available worker slots for this supervisor| |slotsUsed| Integer| Number of worker slots used on this supervisor| +|totalMem| Double| Total memory capacity on this supervisor| +|totalCpu| Double| Total CPU capacity on this supervisor| +|usedMem| Double| Used memory capacity on this supervisor| +|usedCpu| Double| Used CPU capacity on this supervisor| Sample response: @@ -132,7 +136,11 @@ Sample response: "uptime": "5m 58s", "uptimeSeconds": 358, "slotsTotal": 4, - "slotsUsed": 3 + "slotsUsed": 3, + "totalMem": 3000, + "totalCpu": 400, + "usedMem": 1280, + "usedCPU": 160 } ] } From 6b12a0b0290b6f3df9c28eb1185d8b294141e60d Mon Sep 17 00:00:00 2001 From: zhuol Date: Thu, 12 Nov 2015 16:10:34 -0600 Subject: [PATCH 03/12] Hide resources columns for non-RAS --- storm-core/src/ui/public/index.html | 16 ++++++++++++++++ storm-core/src/ui/public/topology.html | 9 ++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/storm-core/src/ui/public/index.html b/storm-core/src/ui/public/index.html index eae423cea9b..6e529236e37 100644 --- a/storm-core/src/ui/public/index.html +++ b/storm-core/src/ui/public/index.html @@ -143,6 +143,13 @@

Nimbus Configuration

] }); $('#topology-summary [data-toggle="tooltip"]').tooltip(); + $.getJSON("/api/v1/cluster/configuration", function(json){ + var scheduler = json["storm.scheduler"]; + if (scheduler != "backtype.storm.scheduler.resource.ResourceAwareScheduler"){ + $('#topology-summary td:nth-child(9),#topology-summary th:nth-child(9)').hide(); + $('#topology-summary td:nth-child(10),#topology-summary th:nth-child(10)').hide(); + } + }); }); }); $.getJSON("/api/v1/supervisor/summary",function(response,status,jqXHR) { @@ -156,6 +163,15 @@

Nimbus Configuration

] }); $('#supervisor-summary [data-toggle="tooltip"]').tooltip(); + $.getJSON("/api/v1/cluster/configuration", function(json){ + var scheduler = json["storm.scheduler"]; + if (scheduler != "backtype.storm.scheduler.resource.ResourceAwareScheduler"){ + $('#supervisor-summary td:nth-child(6),#supervisor-summary th:nth-child(6)').hide(); + $('#supervisor-summary td:nth-child(7),#supervisor-summary th:nth-child(7)').hide(); + $('#supervisor-summary td:nth-child(8),#supervisor-summary th:nth-child(8)').hide(); + $('#supervisor-summary td:nth-child(9),#supervisor-summary th:nth-child(9)').hide(); + } + }); }); }); $.getJSON("/api/v1/cluster/configuration",function(response,status,jqXHR) { diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html index e2abab14f0f..b8dd1f9988b 100644 --- a/storm-core/src/ui/public/topology.html +++ b/storm-core/src/ui/public/topology.html @@ -59,7 +59,7 @@

Topology summary

-

Topology resources

+

Topology resources

@@ -293,6 +293,13 @@

Topology resources

searchForm.append(Mustache.render($(template).filter("#search-form-template").html(),{id: topologyId})); topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response)); topologyResources.append(Mustache.render($(template).filter("#topology-resources-template").html(),response)); + $.getJSON("/api/v1/cluster/configuration", function(json){ + var scheduler = json["storm.scheduler"]; + if (scheduler != "backtype.storm.scheduler.resource.ResourceAwareScheduler"){ + $('#topology-resources-header').hide(); + $('#topology-resources').hide(); + } + }); topologyActions.append(Mustache.render($(template).filter("#topology-actions-template").html(),buttonJsonData)); topologyStats.append(Mustache.render($(template).filter("#topology-stats-template").html(),response)); //window, emitted, transferred, complete latency, acked, failed From 2d993c515ecde059ad986bf2c3a77cf463d75b74 Mon Sep 17 00:00:00 2001 From: zhuol Date: Fri, 13 Nov 2015 13:29:50 -0600 Subject: [PATCH 04/12] Using config to decide whether to show resource rather than hard-coded scheduler --- conf/defaults.yaml | 2 ++ storm-core/src/jvm/backtype/storm/Config.java | 9 +++++++++ storm-core/src/ui/public/index.html | 8 ++++---- storm-core/src/ui/public/topology.html | 4 ++-- 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/conf/defaults.yaml b/conf/defaults.yaml index b49857188fa..af5daa1220d 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -74,6 +74,8 @@ topology.max.replication.wait.time.sec: 60 nimbus.credential.renewers.freq.secs: 600 nimbus.impersonation.authorizer: "backtype.storm.security.auth.authorizer.ImpersonationAuthorizer" +scheduler.resource.display: false + ### ui.* configs are for the master ui.host: 0.0.0.0 ui.port: 8080 diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 2830f2f6f14..852757d4e08 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -173,6 +173,15 @@ public class Config extends HashMap { @isString public static final String STORM_SCHEDULER = "storm.scheduler"; + /** + * Whether we want to display all the resource capacity and scheduled usage on the UI page. + * We suggest to have this variable set if you are using any kind of resource-related scheduler. + * + * If this is not set, we will not display resource capacity and usage on the UI. + */ + @isBoolean + public static final String SCHEDULER_RESOURCE_DISPLAY = "scheduler.resource.display"; + /** * The mode this Storm cluster is running in. Either "distributed" or "local". */ diff --git a/storm-core/src/ui/public/index.html b/storm-core/src/ui/public/index.html index 6e529236e37..d113e3091d5 100644 --- a/storm-core/src/ui/public/index.html +++ b/storm-core/src/ui/public/index.html @@ -144,8 +144,8 @@

Nimbus Configuration

}); $('#topology-summary [data-toggle="tooltip"]').tooltip(); $.getJSON("/api/v1/cluster/configuration", function(json){ - var scheduler = json["storm.scheduler"]; - if (scheduler != "backtype.storm.scheduler.resource.ResourceAwareScheduler"){ + var displayResource = json["scheduler.display.resource"]; + if (!displayResource){ $('#topology-summary td:nth-child(9),#topology-summary th:nth-child(9)').hide(); $('#topology-summary td:nth-child(10),#topology-summary th:nth-child(10)').hide(); } @@ -164,8 +164,8 @@

Nimbus Configuration

}); $('#supervisor-summary [data-toggle="tooltip"]').tooltip(); $.getJSON("/api/v1/cluster/configuration", function(json){ - var scheduler = json["storm.scheduler"]; - if (scheduler != "backtype.storm.scheduler.resource.ResourceAwareScheduler"){ + var displayResource = json["scheduler.display.resource"]; + if (!displayResource){ $('#supervisor-summary td:nth-child(6),#supervisor-summary th:nth-child(6)').hide(); $('#supervisor-summary td:nth-child(7),#supervisor-summary th:nth-child(7)').hide(); $('#supervisor-summary td:nth-child(8),#supervisor-summary th:nth-child(8)').hide(); diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html index b8dd1f9988b..b06219635d7 100644 --- a/storm-core/src/ui/public/topology.html +++ b/storm-core/src/ui/public/topology.html @@ -294,8 +294,8 @@

Topology resources

topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response)); topologyResources.append(Mustache.render($(template).filter("#topology-resources-template").html(),response)); $.getJSON("/api/v1/cluster/configuration", function(json){ - var scheduler = json["storm.scheduler"]; - if (scheduler != "backtype.storm.scheduler.resource.ResourceAwareScheduler"){ + var displayResource = json["scheduler.resource.display"]; + if (!displayResource){ $('#topology-resources-header').hide(); $('#topology-resources').hide(); } From ce7085bafe0f75a3183977414f2321fdd1b55c94 Mon Sep 17 00:00:00 2001 From: zhuol Date: Mon, 16 Nov 2015 14:11:55 -0600 Subject: [PATCH 05/12] Calculate memory usage of each topology and supervisor for non-RAS schedulers Minor change before thrift-9.3 --- conf/defaults.yaml | 2 +- .../src/clj/backtype/storm/daemon/nimbus.clj | 4 +- storm-core/src/clj/backtype/storm/ui/core.clj | 3 +- storm-core/src/jvm/backtype/storm/Config.java | 2 +- .../storm/generated/SupervisorSummary.java | 4 +- .../jvm/backtype/storm/scheduler/Cluster.java | 75 ++++++++++++++++++- .../resource/ResourceAwareScheduler.java | 4 +- .../src/jvm/backtype/storm/utils/Utils.java | 38 ++++++++++ storm-core/src/py/storm/ttypes.py | 4 +- storm-core/src/ui/public/index.html | 2 - .../templates/topology-page-template.html | 6 ++ storm-core/src/ui/public/topology.html | 2 +- 12 files changed, 132 insertions(+), 14 deletions(-) diff --git a/conf/defaults.yaml b/conf/defaults.yaml index af5daa1220d..b520745c1d6 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -74,7 +74,7 @@ topology.max.replication.wait.time.sec: 60 nimbus.credential.renewers.freq.secs: 600 nimbus.impersonation.authorizer: "backtype.storm.security.auth.authorizer.ImpersonationAuthorizer" -scheduler.resource.display: false +scheduler.display.resource: false ### ui.* configs are for the master ui.host: 0.0.0.0 diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 87b5a8bd9ff..176ce6b1e35 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -745,9 +745,11 @@ ;; call scheduler.schedule to schedule all the topologies ;; the new assignments for all the topologies are in the cluster object. _ (.schedule (:scheduler nimbus) topologies cluster) + _ (.setResourcesMap cluster @(:id->resources nimbus)) + _ (if-not (conf SCHEDULER-RESOURCE-DISPLAY) (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies)) _ (reset! (:id->sched-status nimbus) (.getStatusMap cluster)) _ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster)) - _ (reset! (:id->resources nimbus) (merge @(:id->resources nimbus) (.getResourcesMap cluster)))] + _ (reset! (:id->resources nimbus) (.getResourcesMap cluster))] (.getAssignments cluster))) (defn changed-executors [executor->node+port new-executor->node+port] diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index 64dc1b91a55..480dcd3bc88 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -464,7 +464,7 @@ "schedulerInfo" (.get_sched_status t) "requestedMemOnHeap" (.get_requested_memonheap t) "requestedMemOffHeap" (.get_requested_memoffheap t) - "requestedMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t)) + "requestedTotalMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t)) "requestedCpu" (.get_requested_cpu t) "assignedMemOnHeap" (.get_assigned_memonheap t) "assignedMemOffHeap" (.get_assigned_memoffheap t) @@ -592,6 +592,7 @@ "requestedCpu" (.get_requested_cpu topo-info) "assignedMemOnHeap" (.get_assigned_memonheap topo-info) "assignedMemOffHeap" (.get_assigned_memoffheap topo-info) + "assignedTotalMem" (+ (.get_assigned_memonheap topo-info) (.get_assigned_memoffheap topo-info)) "assignedCpu" (.get_assigned_cpu topo-info) "topologyStats" topo-stats "spouts" (map (partial comp-agg-stats-json id secure?) diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 852757d4e08..afa200b774b 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -180,7 +180,7 @@ public class Config extends HashMap { * If this is not set, we will not display resource capacity and usage on the UI. */ @isBoolean - public static final String SCHEDULER_RESOURCE_DISPLAY = "scheduler.resource.display"; + public static final String SCHEDULER_RESOURCE_DISPLAY = "scheduler.display.resource"; /** * The mode this Storm cluster is running in. Either "distributed" or "local". diff --git a/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java b/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java index 88202d8bbed..edb201691c4 100644 --- a/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java +++ b/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java @@ -583,10 +583,10 @@ public Object getFieldValue(_Fields field) { return get_total_resources(); case USED_MEM: - return Double.valueOf(get_used_mem()); + return get_used_mem(); case USED_CPU: - return Double.valueOf(get_used_cpu()); + return get_used_cpu(); } throw new IllegalStateException(); diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java index c4a815fce18..ff08ad1690d 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java @@ -466,6 +466,75 @@ public Map> getNetworkTopography() { return networkTopography; } + /* + * Get heap memory usage for a worker's main process and logwriter process + * */ + private Double getAssignedMemoryForSlot(Map topConf) { + Double totalWorkerMemory = 0.0; + + String topology_worker_childopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), null); + String worker_childopts = Utils.getString(topConf.get(Config.WORKER_CHILDOPTS), null); + Double mem_topology_worker_childopts = Utils.parseWorkerChildOpts(topology_worker_childopts, null); + Double mem_worker_childopts = Utils.parseWorkerChildOpts(worker_childopts, null); + + if (mem_topology_worker_childopts != null) { + totalWorkerMemory += mem_topology_worker_childopts; + } else if (mem_worker_childopts != null) { + totalWorkerMemory += mem_worker_childopts; + } else { + totalWorkerMemory += Utils.getInt(topConf.get(Config.WORKER_HEAP_MEMORY_MB)); + } + + String topology_worker_lw_childiopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS), null); + if (topology_worker_lw_childiopts != null) { + totalWorkerMemory += Utils.parseWorkerChildOpts(topology_worker_lw_childiopts, 0.0); + } + return totalWorkerMemory; + } + + /* + * Update memory usage for each topology and each supervisor node after every round of scheduling + * */ + public void updateAssignedMemoryForTopologyAndSupervisor(Topologies topologies) { + Map supervisorToAssignedMem = new HashMap(); + + for (Map.Entry entry : this.getAssignments().entrySet()) { + String topId = entry.getValue().getTopologyId(); + Map topConf = topologies.getById(topId).getConf(); + Double assignedMemForTopology = 0.0; + Double assignedMemPerSlot = getAssignedMemoryForSlot(topConf); + for (WorkerSlot ws: entry.getValue().getSlots()) { + assignedMemForTopology += assignedMemPerSlot; + String nodeId = ws.getNodeId(); + if (supervisorToAssignedMem.containsKey(nodeId)) { + supervisorToAssignedMem.put(nodeId, supervisorToAssignedMem.get(nodeId) + assignedMemPerSlot); + } else { + supervisorToAssignedMem.put(nodeId, assignedMemPerSlot); + } + } + if (this.getResourcesMap().containsKey(topId)) { + Double[] topo_resources = getResourcesMap().get(topId); + topo_resources[3] = assignedMemForTopology; + } else { + Double[] topo_resources = {0.0, 0.0, 0.0, 0.0, 0.0, 0.0}; + topo_resources[3] = assignedMemForTopology; + this.setResources(topId, topo_resources); + } + } + + for (Map.Entry entry : supervisorToAssignedMem.entrySet()) { + String nodeId = entry.getKey(); + if (this.supervisorsResources.containsKey(nodeId)) { + Double[] supervisor_resources = supervisorsResources.get(nodeId); + supervisor_resources[2] = entry.getValue(); + } else { + Double[] supervisor_resources = {0.0, 0.0, 0.0, 0.0}; + supervisor_resources[2] = entry.getValue(); + this.supervisorsResources.put(nodeId, supervisor_resources); + } + } + } + public void setStatus(String topologyId, String status) { this.status.put(topologyId, status); } @@ -478,11 +547,15 @@ public void setResources(String topologyId, Double[] resources) { this.resources.put(topologyId, resources); } + public void setResourcesMap(Map topologies_resources) { + this.resources.putAll(topologies_resources); + } + public Map getResourcesMap() { return this.resources; } - public void setSupervisorsResources(Map supervisors_resources) { + public void setSupervisorsResourcesMap(Map supervisors_resources) { this.supervisorsResources.putAll(supervisors_resources); } diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java index ad33b43b95e..ac34d446112 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java @@ -102,7 +102,7 @@ public void schedule(Topologies topologies, Cluster cluster) { Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu}; LOG.debug("setResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " + - "assigned on-heap mem, off-heap mem, cpu: {} {} {}", + "assigned on-heap mem, off-heap mem, cpu: {} {} {}", td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu); cluster.setResources(td.getId(), resources); @@ -125,7 +125,7 @@ private void updateSupervisorsResources(Cluster cluster, Topologies topologies) Double[] resources = {totalMem, totalCpu, usedMem, usedCpu}; supervisors_resources.put(entry.getKey(), resources); } - cluster.setSupervisorsResources(supervisors_resources); + cluster.setSupervisorsResourcesMap(supervisors_resources); } private Map getUserConf() { diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index 00af36736fa..91fdb091f1a 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -69,6 +69,8 @@ import java.util.HashMap; import java.util.TreeMap; import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; @@ -503,6 +505,18 @@ public static boolean getBoolean(Object o, boolean defaultValue) { } } + public static String getString(Object o, String defaultValue) { + if (null == o) { + return defaultValue; + } + + if (o instanceof String) { + return (String) o; + } else { + throw new IllegalArgumentException("Don't know how to convert " + o + " + to String"); + } + } + public static long secureRandomLong() { return UUID.randomUUID().getLeastSignificantBits(); } @@ -763,6 +777,30 @@ public static long zipFileSize(File myFile) throws IOException{ public static double zeroIfNaNOrInf(double x) { return (Double.isNaN(x) || Double.isInfinite(x)) ? 0.0 : x; + + /** + * parses the arguments to extract jvm heap memory size. + * @param input + * @param defaultValue + * @return the value of the JVM heap memory setting in a java command. + */ + public static Double parseWorkerChildOpts(String input, Double defaultValue) { + if(input != null) { + Pattern optsPattern = Pattern.compile("Xmx[0-9]+m"); + Matcher m = optsPattern.matcher(input); + String memoryOpts = null; + while (m.find()) { + memoryOpts = m.group(); + } + if(memoryOpts!=null) { + memoryOpts = memoryOpts.replaceAll("[a-zA-Z]", ""); + return Double.parseDouble(memoryOpts); + } else { + return defaultValue; + } + } else { + return defaultValue; + } } } diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py index ea3c768f100..4694ba2f445 100644 --- a/storm-core/src/py/storm/ttypes.py +++ b/storm-core/src/py/storm/ttypes.py @@ -2590,12 +2590,12 @@ def read(self, iprot): iprot.skip(ftype) elif fid == 8: if ftype == TType.DOUBLE: - self.used_mem = iprot.readDouble(); + self.used_mem = iprot.readDouble() else: iprot.skip(ftype) elif fid == 9: if ftype == TType.DOUBLE: - self.used_cpu = iprot.readDouble(); + self.used_cpu = iprot.readDouble() else: iprot.skip(ftype) else: diff --git a/storm-core/src/ui/public/index.html b/storm-core/src/ui/public/index.html index d113e3091d5..454fe643d52 100644 --- a/storm-core/src/ui/public/index.html +++ b/storm-core/src/ui/public/index.html @@ -146,7 +146,6 @@

Nimbus Configuration

$.getJSON("/api/v1/cluster/configuration", function(json){ var displayResource = json["scheduler.display.resource"]; if (!displayResource){ - $('#topology-summary td:nth-child(9),#topology-summary th:nth-child(9)').hide(); $('#topology-summary td:nth-child(10),#topology-summary th:nth-child(10)').hide(); } }); @@ -167,7 +166,6 @@

Nimbus Configuration

var displayResource = json["scheduler.display.resource"]; if (!displayResource){ $('#supervisor-summary td:nth-child(6),#supervisor-summary th:nth-child(6)').hide(); - $('#supervisor-summary td:nth-child(7),#supervisor-summary th:nth-child(7)').hide(); $('#supervisor-summary td:nth-child(8),#supervisor-summary th:nth-child(8)').hide(); $('#supervisor-summary td:nth-child(9),#supervisor-summary th:nth-child(9)').hide(); } diff --git a/storm-core/src/ui/public/templates/topology-page-template.html b/storm-core/src/ui/public/templates/topology-page-template.html index 02978399ba0..1f81f1b8e3e 100644 --- a/storm-core/src/ui/public/templates/topology-page-template.html +++ b/storm-core/src/ui/public/templates/topology-page-template.html @@ -63,6 +63,11 @@ Replication count
+ + + Assigned Mem (MB) + + Scheduler Info @@ -81,6 +86,7 @@ {{executorsTotal}} {{tasksTotal}} {{replicationCount}} + {{assignedTotalMem}} {{schedulerInfo}} diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html index b06219635d7..c5b86844fb9 100644 --- a/storm-core/src/ui/public/topology.html +++ b/storm-core/src/ui/public/topology.html @@ -294,7 +294,7 @@

Topology resources

topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response)); topologyResources.append(Mustache.render($(template).filter("#topology-resources-template").html(),response)); $.getJSON("/api/v1/cluster/configuration", function(json){ - var displayResource = json["scheduler.resource.display"]; + var displayResource = json["scheduler.display.resource"]; if (!displayResource){ $('#topology-resources-header').hide(); $('#topology-resources').hide(); From 89fff440819836c78762b1c27fae8b005a752746 Mon Sep 17 00:00:00 2001 From: zhuol Date: Tue, 17 Nov 2015 16:03:28 -0600 Subject: [PATCH 06/12] Minor --- storm-core/src/clj/backtype/storm/daemon/nimbus.clj | 2 +- storm-core/src/jvm/backtype/storm/Config.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 176ce6b1e35..71d46544f27 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -746,7 +746,7 @@ ;; the new assignments for all the topologies are in the cluster object. _ (.schedule (:scheduler nimbus) topologies cluster) _ (.setResourcesMap cluster @(:id->resources nimbus)) - _ (if-not (conf SCHEDULER-RESOURCE-DISPLAY) (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies)) + _ (if-not (conf SCHEDULER-DISPLAY-RESOURCE) (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies)) _ (reset! (:id->sched-status nimbus) (.getStatusMap cluster)) _ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster)) _ (reset! (:id->resources nimbus) (.getResourcesMap cluster))] diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index afa200b774b..64476a0c8e4 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -180,7 +180,7 @@ public class Config extends HashMap { * If this is not set, we will not display resource capacity and usage on the UI. */ @isBoolean - public static final String SCHEDULER_RESOURCE_DISPLAY = "scheduler.display.resource"; + public static final String SCHEDULER_DISPLAY_RESOURCE = "scheduler.display.resource"; /** * The mode this Storm cluster is running in. Either "distributed" or "local". From 515722dfdee7f5881b219f81263064b16d2f38a6 Mon Sep 17 00:00:00 2001 From: zhuol Date: Wed, 18 Nov 2015 13:50:57 -0600 Subject: [PATCH 07/12] Add schedulerDisplayResource REST API, which the resource hiding/display is based Add one rest for topo/id, delete the last REST getJSON call --- docs/documentation/ui-rest-api.md | 4 ++++ storm-core/src/clj/backtype/storm/ui/core.clj | 9 ++++++--- storm-core/src/ui/public/index.html | 14 -------------- .../ui/public/templates/index-page-template.html | 12 ++++++++++++ storm-core/src/ui/public/topology.html | 12 +++++------- 5 files changed, 27 insertions(+), 24 deletions(-) diff --git a/docs/documentation/ui-rest-api.md b/docs/documentation/ui-rest-api.md index f2d36a91598..1fabcf5e103 100644 --- a/docs/documentation/ui-rest-api.md +++ b/docs/documentation/ui-rest-api.md @@ -143,6 +143,7 @@ Sample response: "usedCPU": 160 } ] + "schedulerDisplayResource": true } ``` @@ -252,6 +253,7 @@ Sample response: "assignedCpu": 80 } ] + "schedulerDisplayResource": true } ``` @@ -315,6 +317,7 @@ Response fields: |executorsTotal| Integer |Number of executors used for this topology| |msgTimeout| Integer | Number of seconds a tuple has before the spout considers it failed | |windowHint| String | window param value in "hh mm ss" format. Default value is "All Time"| +|schedulerDisplayResource| Boolean | Whether to display scheduler resource information| |topologyStats| Array | Array of all the topology related stats per time window| |topologyStats.windowPretty| String |Duration passed in HH:MM:SS format| |topologyStats.window| String |User requested time window for metrics| @@ -373,6 +376,7 @@ Sample response: "uptimeSeconds": 1759, "msgTimeout": 30, "windowHint": "10m 0s", + "schedulerDisplayResource": true, "topologyStats": [ { "windowPretty": "10m 0s", diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index 480dcd3bc88..cc6632a2a15 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -438,7 +438,8 @@ "totalCpu" (get (.get_total_resources s) Config/SUPERVISOR_CPU_CAPACITY) "usedMem" (.get_used_mem s) "usedCpu" (.get_used_cpu s) - "version" (.get_version s)})})) + "version" (.get_version s)}) + "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)})) (defn all-topologies-summary ([] @@ -469,7 +470,8 @@ "assignedMemOnHeap" (.get_assigned_memonheap t) "assignedMemOffHeap" (.get_assigned_memoffheap t) "assignedTotalMem" (+ (.get_assigned_memonheap t) (.get_assigned_memoffheap t)) - "assignedCpu" (.get_assigned_cpu t)})})) + "assignedCpu" (.get_assigned_cpu t)}) + "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)})) (defn topology-stats [window stats] (let [times (stats-times (:emitted stats)) @@ -633,7 +635,8 @@ "windowHint" window-hint "msgTimeout" msg-timeout "configuration" topology-conf - "visualizationTable" []})))) + "visualizationTable" [] + "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)})))) (defn component-errors [errors-list topology-id secure?] diff --git a/storm-core/src/ui/public/index.html b/storm-core/src/ui/public/index.html index 454fe643d52..eae423cea9b 100644 --- a/storm-core/src/ui/public/index.html +++ b/storm-core/src/ui/public/index.html @@ -143,12 +143,6 @@

Nimbus Configuration

] }); $('#topology-summary [data-toggle="tooltip"]').tooltip(); - $.getJSON("/api/v1/cluster/configuration", function(json){ - var displayResource = json["scheduler.display.resource"]; - if (!displayResource){ - $('#topology-summary td:nth-child(10),#topology-summary th:nth-child(10)').hide(); - } - }); }); }); $.getJSON("/api/v1/supervisor/summary",function(response,status,jqXHR) { @@ -162,14 +156,6 @@

Nimbus Configuration

] }); $('#supervisor-summary [data-toggle="tooltip"]').tooltip(); - $.getJSON("/api/v1/cluster/configuration", function(json){ - var displayResource = json["scheduler.display.resource"]; - if (!displayResource){ - $('#supervisor-summary td:nth-child(6),#supervisor-summary th:nth-child(6)').hide(); - $('#supervisor-summary td:nth-child(8),#supervisor-summary th:nth-child(8)').hide(); - $('#supervisor-summary td:nth-child(9),#supervisor-summary th:nth-child(9)').hide(); - } - }); }); }); $.getJSON("/api/v1/cluster/configuration",function(response,status,jqXHR) { diff --git a/storm-core/src/ui/public/templates/index-page-template.html b/storm-core/src/ui/public/templates/index-page-template.html index bf1f1081fe4..51733562df4 100644 --- a/storm-core/src/ui/public/templates/index-page-template.html +++ b/storm-core/src/ui/public/templates/index-page-template.html @@ -154,11 +154,13 @@ Assigned Mem (MB)
+ {{#schedulerDisplayResource}} Assigned CPU (%) + {{/schedulerDisplayResource}} Scheduler Info @@ -178,7 +180,9 @@ {{tasksTotal}} {{replicationCount}} {{assignedTotalMem}} + {{#schedulerDisplayResource}} {{assignedCpu}} + {{/schedulerDisplayResource}} {{schedulerInfo}} {{/topologies}} @@ -214,16 +218,19 @@ Used slots + {{#schedulerDisplayResource}} Total Mem (MB) + {{/schedulerDisplayResource}} Used Mem (MB) + {{#schedulerDisplayResource}} Total CPU (%) @@ -234,6 +241,7 @@ Used CPU (%) + {{/schedulerDisplayResource}} Version @@ -249,10 +257,14 @@ {{uptime}} {{slotsTotal}} {{slotsUsed}} + {{#schedulerDisplayResource}} {{totalMem}} + {{/schedulerDisplayResource}} {{usedMem}} + {{#schedulerDisplayResource}} {{totalCpu}} {{usedCpu}} + {{/schedulerDisplayResource}} {{version}} {{/supervisors}} diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html index c5b86844fb9..5869d9a9a9b 100644 --- a/storm-core/src/ui/public/topology.html +++ b/storm-core/src/ui/public/topology.html @@ -293,13 +293,11 @@

Topology resources

searchForm.append(Mustache.render($(template).filter("#search-form-template").html(),{id: topologyId})); topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response)); topologyResources.append(Mustache.render($(template).filter("#topology-resources-template").html(),response)); - $.getJSON("/api/v1/cluster/configuration", function(json){ - var displayResource = json["scheduler.display.resource"]; - if (!displayResource){ - $('#topology-resources-header').hide(); - $('#topology-resources').hide(); - } - }); + var displayResource = response["schedulerDisplayResource"]; + if (!displayResource){ + $('#topology-resources-header').hide(); + $('#topology-resources').hide(); + } topologyActions.append(Mustache.render($(template).filter("#topology-actions-template").html(),buttonJsonData)); topologyStats.append(Mustache.render($(template).filter("#topology-stats-template").html(),response)); //window, emitted, transferred, complete latency, acked, failed From 71a5b1a656b099748ddfb87b6c78a26d25146904 Mon Sep 17 00:00:00 2001 From: zhuol Date: Thu, 19 Nov 2015 15:38:34 -0600 Subject: [PATCH 08/12] Minor fix --- storm-core/src/jvm/backtype/storm/utils/Utils.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index 91fdb091f1a..9400c444f50 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -777,6 +777,7 @@ public static long zipFileSize(File myFile) throws IOException{ public static double zeroIfNaNOrInf(double x) { return (Double.isNaN(x) || Double.isInfinite(x)) ? 0.0 : x; + } /** * parses the arguments to extract jvm heap memory size. @@ -785,14 +786,14 @@ public static double zeroIfNaNOrInf(double x) { * @return the value of the JVM heap memory setting in a java command. */ public static Double parseWorkerChildOpts(String input, Double defaultValue) { - if(input != null) { + if (input != null) { Pattern optsPattern = Pattern.compile("Xmx[0-9]+m"); Matcher m = optsPattern.matcher(input); String memoryOpts = null; while (m.find()) { memoryOpts = m.group(); } - if(memoryOpts!=null) { + if(memoryOpts != null) { memoryOpts = memoryOpts.replaceAll("[a-zA-Z]", ""); return Double.parseDouble(memoryOpts); } else { From bac4a053687f1ba8c806f6fe20b9d47f3fba2407 Mon Sep 17 00:00:00 2001 From: zhuol Date: Fri, 20 Nov 2015 13:05:35 -0600 Subject: [PATCH 09/12] Use camel case to be consistent --- .../jvm/backtype/storm/scheduler/Cluster.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java index ff08ad1690d..571ee1bdf1e 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java @@ -425,7 +425,7 @@ public Map getAssignments() { for (String topologyId : this.assignments.keySet()) { ret.put(topologyId, this.assignments.get(topologyId)); } - + return ret; } @@ -472,22 +472,22 @@ public Map> getNetworkTopography() { private Double getAssignedMemoryForSlot(Map topConf) { Double totalWorkerMemory = 0.0; - String topology_worker_childopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), null); - String worker_childopts = Utils.getString(topConf.get(Config.WORKER_CHILDOPTS), null); - Double mem_topology_worker_childopts = Utils.parseWorkerChildOpts(topology_worker_childopts, null); - Double mem_worker_childopts = Utils.parseWorkerChildOpts(worker_childopts, null); + String topologyWorkerChildopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), null); + String workerChildopts = Utils.getString(topConf.get(Config.WORKER_CHILDOPTS), null); + Double memTopologyWorkerChildopts = Utils.parseWorkerChildOpts(topologyWorkerChildopts, null); + Double memWorkerChildopts = Utils.parseWorkerChildOpts(workerChildopts, null); - if (mem_topology_worker_childopts != null) { - totalWorkerMemory += mem_topology_worker_childopts; - } else if (mem_worker_childopts != null) { - totalWorkerMemory += mem_worker_childopts; + if (memTopologyWorkerChildopts != null) { + totalWorkerMemory += memTopologyWorkerChildopts; + } else if (memWorkerChildopts != null) { + totalWorkerMemory += memWorkerChildopts; } else { totalWorkerMemory += Utils.getInt(topConf.get(Config.WORKER_HEAP_MEMORY_MB)); } - String topology_worker_lw_childiopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS), null); - if (topology_worker_lw_childiopts != null) { - totalWorkerMemory += Utils.parseWorkerChildOpts(topology_worker_lw_childiopts, 0.0); + String topoWorkerLwChildopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS), null); + if (topoWorkerLwChildopts != null) { + totalWorkerMemory += Utils.parseWorkerChildOpts(topoWorkerLwChildopts, 0.0); } return totalWorkerMemory; } From 2e56ede5d00af774796344be6581af99076a9590 Mon Sep 17 00:00:00 2001 From: zhuol Date: Fri, 20 Nov 2015 14:45:40 -0600 Subject: [PATCH 10/12] Able to solve mkgMKG and list of strings --- .../jvm/backtype/storm/scheduler/Cluster.java | 6 ++-- .../src/jvm/backtype/storm/utils/Utils.java | 29 ++++++++++++++----- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java index 571ee1bdf1e..a8ff2ab81c0 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java @@ -474,8 +474,8 @@ private Double getAssignedMemoryForSlot(Map topConf) { String topologyWorkerChildopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), null); String workerChildopts = Utils.getString(topConf.get(Config.WORKER_CHILDOPTS), null); - Double memTopologyWorkerChildopts = Utils.parseWorkerChildOpts(topologyWorkerChildopts, null); - Double memWorkerChildopts = Utils.parseWorkerChildOpts(workerChildopts, null); + Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(topologyWorkerChildopts, null); + Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(workerChildopts, null); if (memTopologyWorkerChildopts != null) { totalWorkerMemory += memTopologyWorkerChildopts; @@ -487,7 +487,7 @@ private Double getAssignedMemoryForSlot(Map topConf) { String topoWorkerLwChildopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS), null); if (topoWorkerLwChildopts != null) { - totalWorkerMemory += Utils.parseWorkerChildOpts(topoWorkerLwChildopts, 0.0); + totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts(topoWorkerLwChildopts, 0.0); } return totalWorkerMemory; } diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index 9400c444f50..766b10f099b 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -509,9 +509,15 @@ public static String getString(Object o, String defaultValue) { if (null == o) { return defaultValue; } - if (o instanceof String) { return (String) o; + } else if (o instanceof List) { + StringBuilder sb = new StringBuilder(); + for (String s : (List) o) { + sb.append(s); + sb.append(" "); + } + return sb.toString(); } else { throw new IllegalArgumentException("Don't know how to convert " + o + " + to String"); } @@ -780,22 +786,31 @@ public static double zeroIfNaNOrInf(double x) { } /** - * parses the arguments to extract jvm heap memory size. + * parses the arguments to extract jvm heap memory size in MB. * @param input * @param defaultValue - * @return the value of the JVM heap memory setting in a java command. + * @return the value of the JVM heap memory setting (in MB) in a java command. */ - public static Double parseWorkerChildOpts(String input, Double defaultValue) { + public static Double parseJvmHeapMemByChildOpts(String input, Double defaultValue) { if (input != null) { - Pattern optsPattern = Pattern.compile("Xmx[0-9]+m"); + Pattern optsPattern = Pattern.compile("Xmx[0-9]+[mkgMKG]"); Matcher m = optsPattern.matcher(input); String memoryOpts = null; while (m.find()) { memoryOpts = m.group(); } - if(memoryOpts != null) { + if (memoryOpts != null) { + int unit = 1; + if (memoryOpts.toLowerCase().endsWith("k")) { + unit = 1024; + } else if (memoryOpts.toLowerCase().endsWith("m")) { + unit = 1024 * 1024; + } else if (memoryOpts.toLowerCase().endsWith("g")) { + unit = 1024 * 1024 * 1024; + } memoryOpts = memoryOpts.replaceAll("[a-zA-Z]", ""); - return Double.parseDouble(memoryOpts); + Double result = Double.parseDouble(memoryOpts) * unit / 1024.0 / 1024.0; + return (result < 1.0) ? 1.0 : result; } else { return defaultValue; } From ca21dc7d615e1dae1cd4c0f692c3b5f561688347 Mon Sep 17 00:00:00 2001 From: zhuol Date: Fri, 20 Nov 2015 15:20:12 -0600 Subject: [PATCH 11/12] Move pass List to String away from Utils --- .../jvm/backtype/storm/scheduler/Cluster.java | 31 +++++++++++++++++-- .../src/jvm/backtype/storm/utils/Utils.java | 7 ----- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java index a8ff2ab81c0..f4410ad374d 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java @@ -466,15 +466,35 @@ public Map> getNetworkTopography() { return networkTopography; } + private String getStringFromStringList(Object o) { + StringBuilder sb = new StringBuilder(); + for (String s : (List) o) { + sb.append(s); + sb.append(" "); + } + return sb.toString(); + } + /* * Get heap memory usage for a worker's main process and logwriter process * */ private Double getAssignedMemoryForSlot(Map topConf) { Double totalWorkerMemory = 0.0; - String topologyWorkerChildopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), null); - String workerChildopts = Utils.getString(topConf.get(Config.WORKER_CHILDOPTS), null); + String topologyWorkerChildopts = null; + if (topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS) instanceof List) { + topologyWorkerChildopts = getStringFromStringList(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS)); + } else { + topologyWorkerChildopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), null); + } Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(topologyWorkerChildopts, null); + + String workerChildopts = null; + if (topConf.get(Config.WORKER_CHILDOPTS) instanceof List) { + workerChildopts = getStringFromStringList(topConf.get(Config.WORKER_CHILDOPTS)); + } else { + workerChildopts = Utils.getString(topConf.get(Config.WORKER_CHILDOPTS), null); + } Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(workerChildopts, null); if (memTopologyWorkerChildopts != null) { @@ -485,7 +505,12 @@ private Double getAssignedMemoryForSlot(Map topConf) { totalWorkerMemory += Utils.getInt(topConf.get(Config.WORKER_HEAP_MEMORY_MB)); } - String topoWorkerLwChildopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS), null); + String topoWorkerLwChildopts = null; + if (topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS) instanceof List) { + topoWorkerLwChildopts = getStringFromStringList(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS); + } else { + topoWorkerLwChildopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS), null); + } if (topoWorkerLwChildopts != null) { totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts(topoWorkerLwChildopts, 0.0); } diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index 766b10f099b..b9a89f784f6 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -511,13 +511,6 @@ public static String getString(Object o, String defaultValue) { } if (o instanceof String) { return (String) o; - } else if (o instanceof List) { - StringBuilder sb = new StringBuilder(); - for (String s : (List) o) { - sb.append(s); - sb.append(" "); - } - return sb.toString(); } else { throw new IllegalArgumentException("Don't know how to convert " + o + " + to String"); } From e58d254102daff659250ee8adc0df68dabac6931 Mon Sep 17 00:00:00 2001 From: zhuol Date: Fri, 20 Nov 2015 15:33:13 -0600 Subject: [PATCH 12/12] Minor --- storm-core/src/jvm/backtype/storm/scheduler/Cluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java index f4410ad374d..ff2b233418a 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java @@ -507,7 +507,7 @@ private Double getAssignedMemoryForSlot(Map topConf) { String topoWorkerLwChildopts = null; if (topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS) instanceof List) { - topoWorkerLwChildopts = getStringFromStringList(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS); + topoWorkerLwChildopts = getStringFromStringList(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS)); } else { topoWorkerLwChildopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS), null); }