From 6eedd8dccd2a3405d68de611c5da0645678645ac Mon Sep 17 00:00:00 2001 From: caofangkun Date: Tue, 27 Jan 2015 16:13:26 +0800 Subject: [PATCH] STORM-638:UI should show up process-id of the Worker to which an Executor is assigned --- storm-core/src/clj/backtype/storm/cluster.clj | 1 + .../src/clj/backtype/storm/daemon/nimbus.clj | 1 + .../src/clj/backtype/storm/daemon/worker.clj | 7 +- storm-core/src/clj/backtype/storm/ui/core.clj | 2 + storm-core/src/genthrift.sh | 2 +- .../storm/generated/ExecutorSummary.java | 104 +++++++++++++++++- storm-core/src/storm.thrift | 3 +- .../templates/component-page-template.html | 6 + 8 files changed, 116 insertions(+), 10 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 8ead7108879..b33c42df3b6 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -248,6 +248,7 @@ (if (contains? executor-stats t) {t {:time-secs (:time-secs worker-hb) :uptime (:uptime worker-hb) + :process-id (:process-id worker-hb) :stats (get executor-stats t)}}))) (into {})))) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index b2cb96ac7f9..2467178a65f 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -1280,6 +1280,7 @@ (-> executor first task->component) host port + (Integer. (:process-id heartbeat)) (nil-to-zero (:uptime heartbeat))) (.set_stats stats)) )) diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index e717ce4112b..833bf5fa610 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -54,6 +54,7 @@ :executor-stats stats :uptime ((:uptime worker)) :time-secs (current-time-secs) + :process-id (:process-id worker) }] ;; do the zookeeper heartbeat (.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb) @@ -191,7 +192,7 @@ ) :timer-name timer-name)) -(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state] +(defn worker-data [conf mq-context storm-id assignment-id port worker-id process-id storm-conf cluster-state storm-cluster-state] (let [assignment-versions (atom {}) executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) @@ -215,6 +216,7 @@ :assignment-id assignment-id :port port :worker-id worker-id + :process-id process-id :cluster-state cluster-state :storm-cluster-state storm-cluster-state :storm-active-atom (atom false) @@ -393,7 +395,8 @@ subject (AuthUtils/populateSubject nil auto-creds initial-credentials)] (Subject/doAs subject (reify PrivilegedExceptionAction (run [this] - (let [worker (worker-data conf shared-mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state) + (let [process-id (process-pid) + worker (worker-data conf shared-mq-context storm-id assignment-id port worker-id process-id storm-conf cluster-state storm-cluster-state) heartbeat-fn #(do-heartbeat worker) ;; do this here so that the worker process dies if this fails diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index 10a11233a8f..73213a6e5ef 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -701,6 +701,7 @@ "uptime" (pretty-uptime-sec (.get_uptime_secs e)) "host" (.get_host e) "port" (.get_port e) + "processId" (.get_process_id e) "emitted" (nil-to-zero (:emitted stats)) "transferred" (nil-to-zero (:transferred stats)) "completeLatency" (float-str (:complete-latencies stats)) @@ -797,6 +798,7 @@ "uptime" (pretty-uptime-sec (.get_uptime_secs e)) "host" (.get_host e) "port" (.get_port e) + "processId" (.get_process_id e) "emitted" (nil-to-zero (:emitted stats)) "transferred" (nil-to-zero (:transferred stats)) "capacity" (float-str (nil-to-zero (compute-executor-capacity e))) diff --git a/storm-core/src/genthrift.sh b/storm-core/src/genthrift.sh index 50d5cb0af8a..32ee8f2e9d1 100644 --- a/storm-core/src/genthrift.sh +++ b/storm-core/src/genthrift.sh @@ -16,7 +16,7 @@ rm -rf gen-javabean gen-py py rm -rf jvm/backtype/storm/generated -thrift7 --gen java:beans,hashcode,nocamel --gen py:utf8strings storm.thrift +thrift --gen java:beans,hashcode,nocamel --gen py:utf8strings storm.thrift mv gen-javabean/backtype/storm/generated jvm/backtype/storm/generated mv gen-py py rm -rf gen-javabean diff --git a/storm-core/src/jvm/backtype/storm/generated/ExecutorSummary.java b/storm-core/src/jvm/backtype/storm/generated/ExecutorSummary.java index daa9a6e2ac3..feca470a545 100644 --- a/storm-core/src/jvm/backtype/storm/generated/ExecutorSummary.java +++ b/storm-core/src/jvm/backtype/storm/generated/ExecutorSummary.java @@ -45,13 +45,15 @@ public class ExecutorSummary implements org.apache.thrift.TBase byName = new HashMap(); @@ -85,7 +88,9 @@ public static _Fields findByThriftId(int fieldId) { return HOST; case 4: // PORT return PORT; - case 5: // UPTIME_SECS + case 5: // PROCESS_ID + return PROCESS_ID; + case 6: // UPTIME_SECS return UPTIME_SECS; case 7: // STATS return STATS; @@ -130,8 +135,9 @@ public String getFieldName() { // isset id assignments private static final int __PORT_ISSET_ID = 0; - private static final int __UPTIME_SECS_ISSET_ID = 1; - private BitSet __isset_bit_vector = new BitSet(2); + private static final int __PROCESS_ID_ISSET_ID = 1; + private static final int __UPTIME_SECS_ISSET_ID = 2; + private BitSet __isset_bit_vector = new BitSet(3); public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -144,6 +150,8 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); + tmpMap.put(_Fields.PROCESS_ID, new org.apache.thrift.meta_data.FieldMetaData("process_id", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.UPTIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("uptime_secs", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.STATS, new org.apache.thrift.meta_data.FieldMetaData("stats", org.apache.thrift.TFieldRequirementType.OPTIONAL, @@ -160,6 +168,7 @@ public ExecutorSummary( String component_id, String host, int port, + int process_id, int uptime_secs) { this(); @@ -168,6 +177,8 @@ public ExecutorSummary( this.host = host; this.port = port; set_port_isSet(true); + this.process_id = process_id; + set_process_id_isSet(true); this.uptime_secs = uptime_secs; set_uptime_secs_isSet(true); } @@ -188,6 +199,7 @@ public ExecutorSummary(ExecutorSummary other) { this.host = other.host; } this.port = other.port; + this.process_id = other.process_id; this.uptime_secs = other.uptime_secs; if (other.is_set_stats()) { this.stats = new ExecutorStats(other.stats); @@ -205,6 +217,8 @@ public void clear() { this.host = null; set_port_isSet(false); this.port = 0; + set_process_id_isSet(false); + this.process_id = 0; set_uptime_secs_isSet(false); this.uptime_secs = 0; this.stats = null; @@ -301,6 +315,28 @@ public void set_port_isSet(boolean value) { __isset_bit_vector.set(__PORT_ISSET_ID, value); } + public int get_process_id() { + return this.process_id; + } + + public void set_process_id(int process_id) { + this.process_id = process_id; + set_process_id_isSet(true); + } + + public void unset_process_id() { + __isset_bit_vector.clear(__PROCESS_ID_ISSET_ID); + } + + /** Returns true if field process_id is set (has been assigned a value) and false otherwise */ + public boolean is_set_process_id() { + return __isset_bit_vector.get(__PROCESS_ID_ISSET_ID); + } + + public void set_process_id_isSet(boolean value) { + __isset_bit_vector.set(__PROCESS_ID_ISSET_ID, value); + } + public int get_uptime_secs() { return this.uptime_secs; } @@ -380,6 +416,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case PROCESS_ID: + if (value == null) { + unset_process_id(); + } else { + set_process_id((Integer)value); + } + break; + case UPTIME_SECS: if (value == null) { unset_uptime_secs(); @@ -413,6 +457,9 @@ public Object getFieldValue(_Fields field) { case PORT: return Integer.valueOf(get_port()); + case PROCESS_ID: + return Integer.valueOf(get_process_id()); + case UPTIME_SECS: return Integer.valueOf(get_uptime_secs()); @@ -438,6 +485,8 @@ public boolean isSet(_Fields field) { return is_set_host(); case PORT: return is_set_port(); + case PROCESS_ID: + return is_set_process_id(); case UPTIME_SECS: return is_set_uptime_secs(); case STATS: @@ -495,6 +544,15 @@ public boolean equals(ExecutorSummary that) { return false; } + boolean this_present_process_id = true; + boolean that_present_process_id = true; + if (this_present_process_id || that_present_process_id) { + if (!(this_present_process_id && that_present_process_id)) + return false; + if (this.process_id != that.process_id) + return false; + } + boolean this_present_uptime_secs = true; boolean that_present_uptime_secs = true; if (this_present_uptime_secs || that_present_uptime_secs) { @@ -540,6 +598,11 @@ public int hashCode() { if (present_port) builder.append(port); + boolean present_process_id = true; + builder.append(present_process_id); + if (present_process_id) + builder.append(process_id); + boolean present_uptime_secs = true; builder.append(present_uptime_secs); if (present_uptime_secs) @@ -601,6 +664,16 @@ public int compareTo(ExecutorSummary other) { return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_process_id()).compareTo(typedOther.is_set_process_id()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_process_id()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.process_id, typedOther.process_id); + if (lastComparison != 0) { + return lastComparison; + } + } lastComparison = Boolean.valueOf(is_set_uptime_secs()).compareTo(typedOther.is_set_uptime_secs()); if (lastComparison != 0) { return lastComparison; @@ -668,7 +741,15 @@ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.t org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } break; - case 5: // UPTIME_SECS + case 5: // PROCESS_ID + if (field.type == org.apache.thrift.protocol.TType.I32) { + this.process_id = iprot.readI32(); + set_process_id_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 6: // UPTIME_SECS if (field.type == org.apache.thrift.protocol.TType.I32) { this.uptime_secs = iprot.readI32(); set_uptime_secs_isSet(true); @@ -715,6 +796,9 @@ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache. oprot.writeFieldBegin(PORT_FIELD_DESC); oprot.writeI32(this.port); oprot.writeFieldEnd(); + oprot.writeFieldBegin(PROCESS_ID_FIELD_DESC); + oprot.writeI32(this.process_id); + oprot.writeFieldEnd(); oprot.writeFieldBegin(UPTIME_SECS_FIELD_DESC); oprot.writeI32(this.uptime_secs); oprot.writeFieldEnd(); @@ -762,6 +846,10 @@ public String toString() { sb.append(this.port); first = false; if (!first) sb.append(", "); + sb.append("process_id:"); + sb.append(this.process_id); + first = false; + if (!first) sb.append(", "); sb.append("uptime_secs:"); sb.append(this.uptime_secs); first = false; @@ -797,6 +885,10 @@ public void validate() throws org.apache.thrift.TException { throw new org.apache.thrift.protocol.TProtocolException("Required field 'port' is unset! Struct:" + toString()); } + if (!is_set_process_id()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'process_id' is unset! Struct:" + toString()); + } + if (!is_set_uptime_secs()) { throw new org.apache.thrift.protocol.TProtocolException("Required field 'uptime_secs' is unset! Struct:" + toString()); } diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift index f807b743e86..59d4bfd3409 100644 --- a/storm-core/src/storm.thrift +++ b/storm-core/src/storm.thrift @@ -205,7 +205,8 @@ struct ExecutorSummary { 2: required string component_id; 3: required string host; 4: required i32 port; - 5: required i32 uptime_secs; + 5: required i32 process_id; + 6: required i32 uptime_secs; 7: optional ExecutorStats stats; } diff --git a/storm-core/src/ui/public/templates/component-page-template.html b/storm-core/src/ui/public/templates/component-page-template.html index 1e916e37458..42b7a67ee00 100644 --- a/storm-core/src/ui/public/templates/component-page-template.html +++ b/storm-core/src/ui/public/templates/component-page-template.html @@ -177,6 +177,11 @@

Executors ({{windowHint}})

Port + + + ProcessId + + Emitted @@ -211,6 +216,7 @@

Executors ({{windowHint}})

{{uptime}} {{host}} {{port}} + {{processId}} {{emitted}} {{transferred}} {{completeLatency}}