From 1bdfd7e1f04194f7aa88ba2537f41a08409ce344 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 28 Oct 2015 16:18:08 -0500 Subject: [PATCH] STORM-1145: Have IConnection push tuples instead of pull them. Move deserialization to the IConnectionCallback so only Tuple batches are flowing through the system, not unserialized byte arrays. --- .../clj/backtype/storm/daemon/executor.clj | 60 +++---- .../src/clj/backtype/storm/daemon/worker.clj | 59 ++++--- .../clj/backtype/storm/messaging/loader.clj | 76 ++------ .../clj/backtype/storm/messaging/local.clj | 72 +------- storm-core/src/jvm/backtype/storm/Config.java | 7 - .../storm/messaging/AddressedTuple.java | 46 +++++ .../DeserializingConnectionCallback.java | 60 +++++++ .../backtype/storm/messaging/IConnection.java | 10 +- .../storm/messaging/IConnectionCallback.java | 31 ++++ .../storm/messaging/local/Context.java | 164 ++++++++++++++++++ .../storm/messaging/netty/Client.java | 3 +- .../storm/messaging/netty/Server.java | 121 ++----------- .../backtype/storm/tuple/AddressedTuple.java | 48 +++++ .../storm/messaging/netty_unit_test.clj | 122 +++++++------ .../clj/backtype/storm/messaging_test.clj | 25 --- 15 files changed, 506 insertions(+), 398 deletions(-) create mode 100644 storm-core/src/jvm/backtype/storm/messaging/AddressedTuple.java create mode 100644 storm-core/src/jvm/backtype/storm/messaging/DeserializingConnectionCallback.java create mode 100644 storm-core/src/jvm/backtype/storm/messaging/IConnectionCallback.java create mode 100644 storm-core/src/jvm/backtype/storm/messaging/local/Context.java create mode 100644 storm-core/src/jvm/backtype/storm/tuple/AddressedTuple.java diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj index 0390987c888..9687cddc0cf 100644 --- a/storm-core/src/clj/backtype/storm/daemon/executor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj @@ -21,7 +21,7 @@ (:import [java.util List Random HashMap ArrayList LinkedList Map]) (:import [backtype.storm ICredentialsListener]) (:import [backtype.storm.hooks ITaskHook]) - (:import [backtype.storm.tuple Tuple Fields TupleImpl MessageId]) + (:import [backtype.storm.tuple AddressedTuple Tuple Fields TupleImpl MessageId]) (:import [backtype.storm.spout ISpoutWaitStrategy ISpout SpoutOutputCollector ISpoutOutputCollector]) (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo]) @@ -30,7 +30,7 @@ (:import [backtype.storm.generated GlobalStreamId]) (:import [backtype.storm.utils Utils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread]) (:import [com.lmax.disruptor InsufficientCapacityException]) - (:import [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer]) + (:import [backtype.storm.serialization KryoTupleSerializer]) (:import [backtype.storm.daemon Shutdownable]) (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric]) (:import [backtype.storm Config Constants]) @@ -206,9 +206,10 @@ (defn mk-executor-transfer-fn [batch-transfer->worker storm-conf] (fn this [task tuple] - (when (= true (storm-conf TOPOLOGY-DEBUG)) - (log-message "TRANSFERING tuple TASK: " task " TUPLE: " tuple)) - (disruptor/publish batch-transfer->worker [task tuple]))) + (let [val (AddressedTuple. task tuple)] + (when (= true (storm-conf TOPOLOGY-DEBUG)) + (log-message "TRANSFERING tuple " val)) + (disruptor/publish batch-transfer->worker val)))) (defn mk-executor-data [worker executor-id] (let [worker-context (worker-context worker) @@ -257,7 +258,6 @@ (exception-cause? java.io.InterruptedIOException error)) (log-message "Got interrupted excpetion shutting thread down...") ((:suicide-fn <>)))) - :deserializer (KryoTupleDeserializer. storm-conf worker-context) :sampler (mk-stats-sampler storm-conf) :backpressure (atom false) :spout-throttling-metrics (if (= executor-type :spout) @@ -296,8 +296,7 @@ (.add alist o) (when batch-end? (worker-transfer-fn serializer alist) - (.setObject cached-emit (ArrayList.)) - ))) + (.setObject cached-emit (ArrayList.))))) :kill-fn (:report-error-and-die executor-data)))) (defn setup-metrics! [executor-data] @@ -309,9 +308,8 @@ interval interval (fn [] - (disruptor/publish - receive-queue - [[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]])))))) + (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]] + (disruptor/publish receive-queue val))))))) (defn metrics-tick [executor-data task-data ^TupleImpl tuple] @@ -333,7 +331,7 @@ (IMetricsConsumer$DataPoint. name value))))) (filter identity) (into []))] - (if (seq data-points) + (when (seq data-points) (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points])))) (defn setup-ticks! [worker executor-data] @@ -351,10 +349,8 @@ tick-time-secs tick-time-secs (fn [] - (disruptor/publish - receive-queue - [[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]] - ))))))) + (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]] + (disruptor/publish receive-queue val)))))))) (defn mk-executor [worker executor-id initial-credentials] (let [executor-data (mk-executor-data worker executor-id) @@ -394,10 +390,9 @@ executor-id) (credentials-changed [this creds] (let [receive-queue (:receive-queue executor-data) - context (:worker-context executor-data)] - (disruptor/publish - receive-queue - [[nil (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID)]]))) + context (:worker-context executor-data) + val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID))]] + (disruptor/publish receive-queue val))) (get-backpressure-flag [this] @(:backpressure executor-data)) Shutdownable @@ -444,16 +439,16 @@ (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta)))) (defn mk-task-receiver [executor-data tuple-action-fn] - (let [^KryoTupleDeserializer deserializer (:deserializer executor-data) - task-ids (:task-ids executor-data) + (let [task-ids (:task-ids executor-data) debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG))) ] (disruptor/clojure-handler (fn [tuple-batch sequence-id end-of-batch?] - (fast-list-iter [[task-id msg] tuple-batch] - (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))] + (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch] + (let [^TupleImpl tuple (.getTuple addressed-tuple) + task-id (.getDest addressed-tuple)] (when debug? (log-message "Processing received message FOR " task-id " TUPLE: " tuple)) - (if task-id + (if (not= task-id AddressedTuple/BROADCAST_DEST) (tuple-action-fn task-id tuple) ;; null task ids are broadcast tuples (fast-list-iter [task-id task-ids] @@ -479,7 +474,7 @@ spct (if (and (not-nil? options) (:enable options)) (:samplingpct options) 0)] ;; the thread's initialized random number generator is used to generate ;; uniformily distributed random numbers. - (if (and (> spct 0) (< (* 100 (.nextDouble random)) spct)) + (when (and (> spct 0) (< (* 100 (.nextDouble random)) spct)) (task/send-unanchored task-data EVENTLOGGER-STREAM-ID @@ -561,9 +556,7 @@ task-id out-stream-id tuple-id)] - (transfer-fn out-task - out-tuple) - )) + (transfer-fn out-task out-tuple))) (if has-eventloggers? (send-to-eventlogger executor-data task-data values component-id message-id rand)) (if (and rooted? @@ -757,12 +750,12 @@ (fast-list-iter [root-id root-ids] (put-xor! anchors-to-ids root-id edge-id)) )))) - (transfer-fn t - (TupleImpl. worker-context + (let [tuple (TupleImpl. worker-context values task-id stream - (MessageId/makeId anchors-to-ids))))) + (MessageId/makeId anchors-to-ids))] + (transfer-fn t tuple)))) (if has-eventloggers? (send-to-eventlogger executor-data task-data values component-id nil rand)) (or out-tasks [])))]] @@ -796,8 +789,7 @@ (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)] (task/send-unanchored task-data ACKER-ACK-STREAM-ID - [root (bit-xor id ack-val)]) - )) + [root (bit-xor id ack-val)]))) (let [delta (tuple-time-delta! tuple) debug? (= true (storm-conf TOPOLOGY-DEBUG))] (when debug? diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 7c0b18dadf9..df9f725c6ed 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -31,7 +31,7 @@ (:import [backtype.storm.daemon Shutdownable]) (:import [backtype.storm.serialization KryoTupleSerializer]) (:import [backtype.storm.generated StormTopology]) - (:import [backtype.storm.tuple Fields]) + (:import [backtype.storm.tuple AddressedTuple Fields]) (:import [backtype.storm.task WorkerTopologyContext]) (:import [backtype.storm Constants]) (:import [backtype.storm.security.auth AuthUtils]) @@ -103,10 +103,15 @@ flatten set ))) +(defn get-dest + [^AddressedTuple addressed-tuple] + "get the destination for an AddressedTuple" + (.getDest addressed-tuple)) + (defn mk-transfer-local-fn [worker] (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker) task->short-executor (:task->short-executor worker) - task-getter (comp #(get task->short-executor %) fast-first)] + task-getter (comp #(get task->short-executor %) get-dest)] (fn [tuple-batch] (let [grouped (fast-group-by task-getter tuple-batch)] (fast-map-iter [[short-executor pairs] grouped] @@ -162,19 +167,21 @@ (fn [^KryoTupleSerializer serializer tuple-batch] (let [^ArrayList local (ArrayList.) ^HashMap remoteMap (HashMap.)] - (fast-list-iter [[task tuple :as pair] tuple-batch] - (if (local-tasks task) - (.add local pair) - - ;;Using java objects directly to avoid performance issues in java code - (do - (when (not (.get remoteMap task)) - (.put remoteMap task (ArrayList.))) - (let [^ArrayList remote (.get remoteMap task)] - (if (not-nil? task) - (.add remote (TaskMessage. ^int task ^bytes (.serialize serializer tuple))) - (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple))) - )))) + (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch] + (let [task (.getDest addressed-tuple) + tuple (.getTuple addressed-tuple)] + (if (local-tasks task) + (.add local addressed-tuple) + + ;;Using java objects directly to avoid performance issues in java code + (do + (when (not (.get remoteMap task)) + (.put remoteMap task (ArrayList.))) + (let [^ArrayList remote (.get remoteMap task)] + (if (not-nil? task) + (.add remote (TaskMessage. task ^bytes (.serialize serializer tuple))) + (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple))) + ))))) (when (not (.isEmpty local)) (local-transfer local)) (when (not (.isEmpty remoteMap)) (disruptor/publish transfer-queue remoteMap))))] @@ -295,7 +302,6 @@ :default-shared-resources (mk-default-resources <>) :user-shared-resources (mk-user-resources <>) :transfer-local-fn (mk-transfer-local-fn <>) - :receiver-thread-count (get storm-conf WORKER-RECEIVER-THREAD-COUNT) :transfer-fn (mk-transfer-fn <>) :load-mapping (LoadMapping.) :assignment-versions assignment-versions @@ -450,16 +456,12 @@ (schedule timer recur-secs this :check-active false) ))))) -(defn launch-receive-thread [worker] - (log-message "Launching receive-thread for " (:assignment-id worker) ":" (:port worker)) - (msg-loader/launch-receive-thread! - (:mq-context worker) - (:receiver worker) - (:storm-id worker) - (:receiver-thread-count worker) - (:port worker) - (:transfer-local-fn worker) - :kill-fn (fn [t] (exit-process! 11)))) +(defn register-callbacks [worker] + (log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker)) + (msg-loader/register-callback (:transfer-local-fn worker) + (:receiver worker) + (:storm-conf worker) + (worker-context worker))) (defn- close-resources [worker] (let [dr (:default-shared-resources worker)] @@ -596,7 +598,7 @@ _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn) _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors)) - receive-thread-shutdown (launch-receive-thread worker) + _ (register-callbacks worker) refresh-connections (mk-refresh-connections worker) refresh-load (mk-refresh-load worker) @@ -635,9 +637,6 @@ ;; this will do best effort flushing since the linger period ;; was set on creation (.close socket)) - (log-message "Shutting down receive thread") - (receive-thread-shutdown) - (log-message "Shut down receive thread") (log-message "Terminating messaging context") (log-message "Shutting down executors") (doseq [executor @executors] (.shutdown executor)) diff --git a/storm-core/src/clj/backtype/storm/messaging/loader.clj b/storm-core/src/clj/backtype/storm/messaging/loader.clj index c154ed80ace..72dd382fa34 100644 --- a/storm-core/src/clj/backtype/storm/messaging/loader.clj +++ b/storm-core/src/clj/backtype/storm/messaging/loader.clj @@ -14,71 +14,21 @@ ;; See the License for the specific language governing permissions and ;; limitations under the License. (ns backtype.storm.messaging.loader - (:use [backtype.storm util log]) - (:import [java.util ArrayList Iterator]) - (:import [backtype.storm.messaging IContext IConnection TaskMessage]) - (:import [backtype.storm.utils DisruptorQueue MutableObject]) - (:require [backtype.storm.messaging [local :as local]]) - (:require [backtype.storm [disruptor :as disruptor]])) + (:import [backtype.storm.messaging IConnection DeserializingConnectionCallback]) + (:require [backtype.storm.messaging [local :as local]])) (defn mk-local-context [] (local/mk-context)) -(defn- mk-receive-thread [storm-id port transfer-local-fn daemon kill-fn priority socket thread-id] - (async-loop - (fn [] - (log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id " ]") - (fn [] - (let [batched (ArrayList.) - ^Iterator iter (.recv ^IConnection socket 0 thread-id) - closed (atom false)] - (when iter - (while (and (not @closed) (.hasNext iter)) - (let [packet (.next iter) - task (if packet (.task ^TaskMessage packet)) - message (if packet (.message ^TaskMessage packet))] - (if (= task -1) - (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice") - (.close socket) - (reset! closed true)) - (when packet (.add batched [task message])))))) - - (when (not @closed) - (do - (if (> (.size batched) 0) - (transfer-local-fn batched)) - 0))))) - :factory? true - :daemon daemon - :kill-fn kill-fn - :priority priority - :thread-name (str "worker-receiver-thread-" thread-id))) +(defn- mk-connection-callback + "make an IConnectionCallback" + [transfer-local-fn storm-conf worker-context] + (DeserializingConnectionCallback. storm-conf + worker-context + (fn [batch] + (transfer-local-fn batch)))) -(defn- mk-receive-threads [storm-id port transfer-local-fn daemon kill-fn priority socket thread-count] - (into [] (for [thread-id (range thread-count)] - (mk-receive-thread storm-id port transfer-local-fn daemon kill-fn priority socket thread-id)))) - - -(defnk launch-receive-thread! - [context socket storm-id receiver-thread-count port transfer-local-fn - :daemon true - :kill-fn (fn [t] (System/exit 1)) - :priority Thread/NORM_PRIORITY] - (let [local-hostname (memoized-local-hostname) - thread-count (if receiver-thread-count receiver-thread-count 1) - vthreads (mk-receive-threads storm-id port transfer-local-fn daemon kill-fn priority socket thread-count)] - (fn [] - (let [kill-socket (.connect ^IContext context storm-id local-hostname port)] - (log-message "Shutting down receiving-thread: [" storm-id ", " port "]") - (.send ^IConnection kill-socket - -1 (byte-array [])) - - (.close ^IConnection kill-socket) - - (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die") - - (for [thread-id (range thread-count)] - (.join (vthreads thread-id))) - - (log-message "Shutdown receiving-thread: [" storm-id ", " port "]") - )))) +(defn register-callback + "register the local-transfer-fn with the server" + [transfer-local-fn ^IConnection socket storm-conf worker-context] + (.registerRecv socket (mk-connection-callback transfer-local-fn storm-conf worker-context))) diff --git a/storm-core/src/clj/backtype/storm/messaging/local.clj b/storm-core/src/clj/backtype/storm/messaging/local.clj index 60a6bd2fba6..b99a77a6c1b 100644 --- a/storm-core/src/clj/backtype/storm/messaging/local.clj +++ b/storm-core/src/clj/backtype/storm/messaging/local.clj @@ -14,76 +14,10 @@ ;; See the License for the specific language governing permissions and ;; limitations under the License. (ns backtype.storm.messaging.local - (:refer-clojure :exclude [send]) - (:use [backtype.storm log util]) - (:import [backtype.storm.messaging IContext IConnection TaskMessage]) - (:import [backtype.storm.grouping Load]) - (:import [java.util.concurrent LinkedBlockingQueue]) - (:import [java.util Map Iterator Collection]) - (:import [java.util Iterator ArrayList]) - (:gen-class)) - -(defn update-load! [cached-task->load lock task->load] - (locking lock - (swap! cached-task->load merge task->load))) - -(defn add-queue! [queues-map lock storm-id port] - (let [id (str storm-id "-" port)] - (locking lock - (when-not (contains? @queues-map id) - (swap! queues-map assoc id (LinkedBlockingQueue.)))) - (@queues-map id))) - -(deftype LocalConnection [storm-id port queues-map lock queue task->load] - IConnection - (^Iterator recv [this ^int flags ^int clientId] - (when-not queue - (throw (IllegalArgumentException. "Cannot receive on this socket"))) - (let [ret (ArrayList.) - msg (if (= flags 1) (.poll queue) (.take queue))] - (if msg - (do - (.add ret msg) - (.iterator ret)) - nil))) - (^void send [this ^int taskId ^bytes payload] - (let [send-queue (add-queue! queues-map lock storm-id port)] - (.put send-queue (TaskMessage. taskId payload)) - )) - (^void send [this ^Iterator iter] - (let [send-queue (add-queue! queues-map lock storm-id port)] - (while (.hasNext iter) - (.put send-queue (.next iter))) - )) - (^void sendLoadMetrics [this ^Map taskToLoad] - (update-load! task->load lock taskToLoad)) - (^Map getLoad [this ^Collection tasks] - (locking lock - (into {} - (for [task tasks - :let [load (.get @task->load task)] - :when (not-nil? load)] - ;; for now we are ignoring the connection load locally - [task (Load. true load 0.0)])))) - (^void close [this])) - - -(deftype LocalContext [^{:unsynchronized-mutable true} queues-map - ^{:unsynchronized-mutable true} lock - ^{:unsynchronized-mutable true} task->load] - IContext - (^void prepare [this ^Map storm-conf] - (set! queues-map (atom {})) - (set! task->load (atom {})) - (set! lock (Object.))) - (^IConnection bind [this ^String storm-id ^int port] - (LocalConnection. storm-id port queues-map lock (add-queue! queues-map lock storm-id port) task->load)) - (^IConnection connect [this ^String storm-id ^String host ^int port] - (LocalConnection. storm-id port queues-map lock nil task->load)) - (^void term [this] - )) + (:import [backtype.storm.messaging IContext]) + (:import [backtype.storm.messaging.local Context])) (defn mk-context [] - (let [context (LocalContext. nil nil nil)] + (let [context (Context.)] (.prepare ^IContext context nil) context)) diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 4cde8ad09bd..95e5ccc54b4 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -1135,13 +1135,6 @@ public class Config extends HashMap { @isStringOrStringList public static final String WORKER_GC_CHILDOPTS = "worker.gc.childopts"; - /** - * control how many worker receiver threads we need per worker - */ - @isInteger - @isPositiveNumber - public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count"; - /** * How often this worker should heartbeat to the supervisor. */ diff --git a/storm-core/src/jvm/backtype/storm/messaging/AddressedTuple.java b/storm-core/src/jvm/backtype/storm/messaging/AddressedTuple.java new file mode 100644 index 00000000000..de9a3e676fc --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/messaging/AddressedTuple.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.messaging; + +import backtype.storm.tuple.Tuple; + +/** + * A Tuple that is addressed to a destination. + */ +public class AddressedTuple { + public final Tuple tuple; + public final int dest; + + public AddressedTuple(int dest, Tuple tuple) { + this.dest = dest; + this.tuple = tuple; + } + + public Tuple getTuple() { + return tuple; + } + + public int getDest() { + return dest; + } + + @Override + public String toString() { + return "[dest: "+dest+" tuple: "+tuple+"]"; + } +} diff --git a/storm-core/src/jvm/backtype/storm/messaging/DeserializingConnectionCallback.java b/storm-core/src/jvm/backtype/storm/messaging/DeserializingConnectionCallback.java new file mode 100644 index 00000000000..1e2d3aa497a --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/messaging/DeserializingConnectionCallback.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.messaging; + +import backtype.storm.task.GeneralTopologyContext; +import backtype.storm.tuple.AddressedTuple; +import backtype.storm.serialization.KryoTupleDeserializer; + +import clojure.lang.IFn; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * A class that is called when a TaskMessage arrives. + */ +public class DeserializingConnectionCallback implements IConnectionCallback { + private final IFn _cb; + private final Map _conf; + private final GeneralTopologyContext _context; + private final ThreadLocal _des = + new ThreadLocal() { + @Override + protected KryoTupleDeserializer initialValue() { + return new KryoTupleDeserializer(_conf, _context); + } + }; + + public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, IFn callback) { + _conf = conf; + _context = context; + _cb = callback; + } + + @Override + public void recv(List batch) { + KryoTupleDeserializer des = _des.get(); + ArrayList ret = new ArrayList<>(batch.size()); + for (TaskMessage message: batch) { + ret.add(new AddressedTuple(message.task(), des.deserialize(message.message()))); + } + _cb.invoke(ret); + } +} diff --git a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java index 88f0c0a742a..a03b3a22f05 100644 --- a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java +++ b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java @@ -22,14 +22,12 @@ import java.util.Iterator; import java.util.Map; -public interface IConnection { - +public interface IConnection { /** - * receive a batch message iterator (consists taskId and payload) - * @param flags 0: block, 1: non-block - * @return + * Register a callback to be notified when data is ready to be processed. + * @param cb the callback to process the messages. */ - public Iterator recv(int flags, int clientId); + public void registerRecv(IConnectionCallback cb); /** * Send load metrics to all downstream connections. diff --git a/storm-core/src/jvm/backtype/storm/messaging/IConnectionCallback.java b/storm-core/src/jvm/backtype/storm/messaging/IConnectionCallback.java new file mode 100644 index 00000000000..ecf0828c241 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/messaging/IConnectionCallback.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.messaging; + +import java.util.List; + +/** + * A class that is called when a TaskMessage arrives. + */ +public interface IConnectionCallback { + /** + * A batch of new messages have arrived to be processed + * @param batch the messages to be processed + */ + public void recv(List batch); +} diff --git a/storm-core/src/jvm/backtype/storm/messaging/local/Context.java b/storm-core/src/jvm/backtype/storm/messaging/local/Context.java new file mode 100644 index 00000000000..968fe64c0ce --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/messaging/local/Context.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.messaging.local; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import backtype.storm.Config; +import backtype.storm.grouping.Load; +import backtype.storm.messaging.IConnection; +import backtype.storm.messaging.TaskMessage; +import backtype.storm.messaging.IConnectionCallback; +import backtype.storm.messaging.IContext; + +public class Context implements IContext { + private static final Logger LOG = LoggerFactory.getLogger(Context.class); + + private static class LocalServer implements IConnection { + IConnectionCallback _cb; + final ConcurrentHashMap _load = new ConcurrentHashMap<>(); + + @Override + public void registerRecv(IConnectionCallback cb) { + _cb = cb; + } + + @Override + public void send(int taskId, byte[] payload) { + throw new IllegalArgumentException("SHOULD NOT HAPPEN"); + } + + @Override + public void send(Iterator msgs) { + throw new IllegalArgumentException("SHOULD NOT HAPPEN"); + } + + @Override + public Map getLoad(Collection tasks) { + Map ret = new HashMap<>(); + for (Integer task : tasks) { + Double found = _load.get(task); + if (found != null) { + ret.put(task, new Load(true, found, 0)); + } + } + return ret; + } + + @Override + public void sendLoadMetrics(Map taskToLoad) { + _load.putAll(taskToLoad); + } + + @Override + public void close() { + //NOOP + } + }; + + private static class LocalClient implements IConnection { + private final LocalServer _server; + + public LocalClient(LocalServer server) { + _server = server; + } + + @Override + public void registerRecv(IConnectionCallback cb) { + throw new IllegalArgumentException("SHOULD NOT HAPPEN"); + } + + @Override + public void send(int taskId, byte[] payload) { + if (_server._cb != null) { + _server._cb.recv(Arrays.asList(new TaskMessage(taskId, payload))); + } + } + + @Override + public void send(Iterator msgs) { + if (_server._cb != null) { + ArrayList ret = new ArrayList<>(); + while (msgs.hasNext()) { + ret.add(msgs.next()); + } + _server._cb.recv(ret); + } + } + + @Override + public Map getLoad(Collection tasks) { + return _server.getLoad(tasks); + } + + @Override + public void sendLoadMetrics(Map taskToLoad) { + _server.sendLoadMetrics(taskToLoad); + } + + @Override + public void close() { + //NOOP + } + }; + + private static ConcurrentHashMap _registry = new ConcurrentHashMap<>(); + private static LocalServer getLocalServer(String nodeId, int port) { + String key = nodeId + "-" + port; + LocalServer ret = _registry.get(key); + if (ret == null) { + ret = new LocalServer(); + LocalServer tmp = _registry.putIfAbsent(key, ret); + if (tmp != null) { + ret = tmp; + } + } + return ret; + } + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map storm_conf) { + //NOOP + } + + @Override + public IConnection bind(String storm_id, int port) { + return getLocalServer(storm_id, port); + } + + @Override + public IConnection connect(String storm_id, String host, int port) { + return new LocalClient(getLocalServer(storm_id, port)); + } + + @SuppressWarnings("rawtypes") + @Override + public void term() { + //NOOP + } +} diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java index ccdef41a1c3..4f813ba4522 100644 --- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java +++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java @@ -32,6 +32,7 @@ import backtype.storm.grouping.Load; import backtype.storm.messaging.ConnectionWithStatus; import backtype.storm.messaging.TaskMessage; +import backtype.storm.messaging.IConnectionCallback; import backtype.storm.metric.api.IStatefulObject; import backtype.storm.utils.StormBoundedExponentialBackoffRetry; import backtype.storm.utils.Utils; @@ -217,7 +218,7 @@ public Status status() { * @throws java.lang.UnsupportedOperationException whenever this method is being called. */ @Override - public Iterator recv(int flags, int clientId) { + public void registerRecv(IConnectionCallback cb) { throw new UnsupportedOperationException("Client connection should not receive any messages"); } diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java index 32c2bd7b7ba..bca3936544f 100644 --- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java +++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java @@ -45,6 +45,8 @@ import backtype.storm.Config; import backtype.storm.grouping.Load; import backtype.storm.messaging.ConnectionWithStatus; +import backtype.storm.messaging.IConnection; +import backtype.storm.messaging.IConnectionCallback; import backtype.storm.messaging.TaskMessage; import backtype.storm.metric.api.IStatefulObject; import backtype.storm.serialization.KryoValuesSerializer; @@ -58,41 +60,22 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe int port; private final ConcurrentHashMap messagesEnqueued = new ConcurrentHashMap<>(); private final AtomicInteger messagesDequeued = new AtomicInteger(0); - private final AtomicInteger[] pendingMessages; - - // Create multiple queues for incoming messages. The size equals the number of receiver threads. - // For message which is sent to same task, it will be stored in the same queue to preserve the message order. - private LinkedBlockingQueue>[] message_queue; volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server"); final ChannelFactory factory; final ServerBootstrap bootstrap; - - private int queueCount; - private volatile HashMap taskToQueueId = null; - int roundRobinQueueId; - + private volatile boolean closing = false; List closeMessage = Arrays.asList(new TaskMessage(-1, null)); private KryoValuesSerializer _ser; + private IConnectionCallback _cb = null; @SuppressWarnings("rawtypes") Server(Map storm_conf, int port) { this.storm_conf = storm_conf; this.port = port; _ser = new KryoValuesSerializer(storm_conf); - - queueCount = Utils.getInt(storm_conf.get(Config.WORKER_RECEIVER_THREAD_COUNT), 1); - roundRobinQueueId = 0; - taskToQueueId = new HashMap<>(); - - message_queue = new LinkedBlockingQueue[queueCount]; - pendingMessages = new AtomicInteger[queueCount]; - for (int i = 0; i < queueCount; i++) { - message_queue[i] = new LinkedBlockingQueue<>(); - pendingMessages[i] = new AtomicInteger(0); - } - + // Configure the server. int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)); int backlog = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SOCKET_BACKLOG), 500); @@ -124,48 +107,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe Channel channel = bootstrap.bind(new InetSocketAddress(port)); allChannels.add(channel); } - - private ArrayList[] groupMessages(List msgs) { - ArrayList messageGroups[] = new ArrayList[queueCount]; - - for (TaskMessage message : msgs) { - int task = message.task(); - - if (task == -1) { - closing = true; - return null; - } - - Integer queueId = getMessageQueueId(task); - - if (null == messageGroups[queueId]) { - messageGroups[queueId] = new ArrayList<>(); - } - messageGroups[queueId].add(message); - } - return messageGroups; - } - - private Integer getMessageQueueId(int task) { - // try to construct the map from taskId -> queueId in round robin manner. - Integer queueId = taskToQueueId.get(task); - if (null == queueId) { - synchronized (this) { - queueId = taskToQueueId.get(task); - if (queueId == null) { - queueId = roundRobinQueueId++; - if (roundRobinQueueId == queueCount) { - roundRobinQueueId = 0; - } - HashMap newRef = new HashMap<>(taskToQueueId); - newRef.put(task, queueId); - taskToQueueId = newRef; - } - } - } - return queueId; - } - + private void addReceiveCount(String from, int amount) { //This is possibly lossy in the case where a value is deleted // because it has received no messages over the metrics collection @@ -193,48 +135,14 @@ protected void enqueue(List msgs, String from) throws InterruptedEx return; } addReceiveCount(from, msgs.size()); - ArrayList messageGroups[] = groupMessages(msgs); - - if (null == messageGroups || closing) { - return; - } - - for (int receiverId = 0; receiverId < messageGroups.length; receiverId++) { - ArrayList msgGroup = messageGroups[receiverId]; - if (null != msgGroup) { - message_queue[receiverId].put(msgGroup); - pendingMessages[receiverId].addAndGet(msgGroup.size()); - } + if (_cb != null) { + _cb.recv(msgs); } } - public Iterator recv(int flags, int receiverId) { - if (closing) { - return closeMessage.iterator(); - } - - ArrayList ret; - int queueId = receiverId % queueCount; - if ((flags & 0x01) == 0x01) { - //non-blocking - ret = message_queue[queueId].poll(); - } else { - try { - ArrayList request = message_queue[queueId].take(); - LOG.debug("request to be processed: {}", request); - ret = request; - } catch (InterruptedException e) { - LOG.info("exception within msg receiving", e); - ret = null; - } - } - - if (null != ret) { - messagesDequeued.addAndGet(ret.size()); - pendingMessages[queueId].addAndGet(0 - ret.size()); - return ret.iterator(); - } - return null; + @Override + public void registerRecv(IConnectionCallback cb) { + _cb = cb; } /** @@ -326,12 +234,7 @@ public Object getState() { LOG.debug("Getting metrics for server on port {}", port); HashMap ret = new HashMap<>(); ret.put("dequeuedMessages", messagesDequeued.getAndSet(0)); - ArrayList pending = new ArrayList<>(pendingMessages.length); - for (AtomicInteger p: pendingMessages) { - pending.add(p.get()); - } - ret.put("pending", pending); - HashMap enqueued = new HashMap<>(); + HashMap enqueued = new HashMap(); Iterator> it = messagesEnqueued.entrySet().iterator(); while (it.hasNext()) { Map.Entry ent = it.next(); diff --git a/storm-core/src/jvm/backtype/storm/tuple/AddressedTuple.java b/storm-core/src/jvm/backtype/storm/tuple/AddressedTuple.java new file mode 100644 index 00000000000..c3aec72fbac --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/tuple/AddressedTuple.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.tuple; + +/** + * A Tuple that is addressed to a destination. + */ +public class AddressedTuple { + /** + * Destination used when broadcasting a tuple. + */ + public static final int BROADCAST_DEST = -2; + public final Tuple tuple; + public final int dest; + + public AddressedTuple(int dest, Tuple tuple) { + this.dest = dest; + this.tuple = tuple; + } + + public Tuple getTuple() { + return tuple; + } + + public int getDest() { + return dest; + } + + @Override + public String toString() { + return "[dest: "+dest+" tuple: "+tuple+"]"; + } +} diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj index aa806a75ced..1d6f1046d84 100644 --- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj +++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj @@ -15,7 +15,7 @@ ;; limitations under the License. (ns backtype.storm.messaging.netty-unit-test (:use [clojure test]) - (:import [backtype.storm.messaging TransportFactory]) + (:import [backtype.storm.messaging TransportFactory IConnection TaskMessage IConnectionCallback]) (:import [backtype.storm.utils Utils]) (:use [backtype.storm testing util config log]) (:use [backtype.storm.daemon.worker :only [is-connection-ready]]) @@ -45,20 +45,37 @@ (throw (RuntimeException. (str "Netty connections were not ready within " max-wait-ms " ms")))) (log-message "All Netty connections are ready")))))) +(defn mk-connection-callback + "make an IConnectionCallback" + [my-fn] + (reify IConnectionCallback + (recv [this batch] + (doseq [msg batch] + (my-fn msg))))) + +(defn register-callback + "register the local-transfer-fn with the server" + [my-fn ^IConnection socket] + (.registerRecv socket (mk-connection-callback my-fn))) + +(defn- wait-for-not-nil + [atm] + (while-timeout TEST-TIMEOUT-MS (nil? @atm) (Thread/sleep 10))) (defn- test-basic-fn [storm-conf] (log-message "1. Should send and receive a basic message") (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") context (TransportFactory/makeContext storm-conf) port (available-port 6700) + resp (atom nil) server (.bind context nil port) + _ (register-callback (fn [message] (reset! resp message)) server) client (.connect context nil "localhost" port) _ (wait-until-ready [server client]) - _ (.send client task (.getBytes req_msg)) - iter (.recv server 0 0) - resp (.next iter)] - (is (= task (.task resp))) - (is (= req_msg (String. (.message resp)))) + _ (.send client task (.getBytes req_msg))] + (wait-for-not-nil resp) + (is (= task (.task @resp))) + (is (= req_msg (String. (.message @resp)))) (.close client) (.close server) (.term context))) @@ -88,19 +105,20 @@ (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") context (TransportFactory/makeContext storm-conf) port (available-port 6700) + resp (atom nil) server (.bind context nil port) + _ (register-callback (fn [message] (reset! resp message)) server) client (.connect context nil "localhost" port) _ (wait-until-ready [server client]) _ (.send client task (.getBytes req_msg)) - iter (.recv server 0 0) - resp (.next iter) _ (.sendLoadMetrics server {(int 1) 0.0 (int 2) 1.0}) _ (while-timeout 5000 (empty? (.getLoad client [(int 1) (int 2)])) (Thread/sleep 10)) load (.getLoad client [(int 1) (int 2)])] (is (= 0.0 (.getBoltLoad (.get load (int 1))))) (is (= 1.0 (.getBoltLoad (.get load (int 2))))) - (is (= task (.task resp))) - (is (= req_msg (String. (.message resp)))) + (wait-for-not-nil resp) + (is (= task (.task @resp))) + (is (= req_msg (String. (.message @resp)))) (.close client) (.close server) (.term context))) @@ -130,14 +148,15 @@ (let [req_msg (apply str (repeat 2048000 'c')) context (TransportFactory/makeContext storm-conf) port (available-port 6700) + resp (atom nil) server (.bind context nil port) + _ (register-callback (fn [message] (reset! resp message)) server) client (.connect context nil "localhost" port) _ (wait-until-ready [server client]) - _ (.send client task (.getBytes req_msg)) - iter (.recv server 0 0) - resp (.next iter)] - (is (= task (.task resp))) - (is (= req_msg (String. (.message resp)))) + _ (.send client task (.getBytes req_msg))] + (wait-for-not-nil resp) + (is (= task (.task @resp))) + (is (= req_msg (String. (.message @resp)))) (.close client) (.close server) (.term context))) @@ -164,27 +183,28 @@ (defn- test-server-delayed-fn [storm-conf] (log-message "4. test server delayed") - (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") - context (TransportFactory/makeContext storm-conf) - port (available-port 6700) - client (.connect context nil "localhost" port) + (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") + context (TransportFactory/makeContext storm-conf) + resp (atom nil) + port (available-port 6700) + client (.connect context nil "localhost" port) - server (Thread. - (fn [] - (Thread/sleep 1000) - (let [server (.bind context nil port) - iter (.recv server 0 0) - resp (.next iter)] - (is (= task (.task resp))) - (is (= req_msg (String. (.message resp)))) - (.close server))))] - (.start server) - (wait-until-ready [server client]) - (.send client task (.getBytes req_msg)) + server (Thread. + (fn [] + (Thread/sleep 100) + (let [server (.bind context nil port)] + (register-callback (fn [message] (reset! resp message)) server))))] + (.start server) + (wait-until-ready [server client]) + (.send client task (.getBytes req_msg)) - (.join server) - (.close client) - (.term context))) + (wait-for-not-nil resp) + (is (= task (.task @resp))) + (is (= req_msg (String. (.message @resp)))) + + (.join server) + (.close client) + (.term context))) (deftest test-server-delayed (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context" @@ -211,28 +231,24 @@ (log-message "5. test batch") (let [num-messages 100000 _ (log-message "Should send and receive many messages (testing with " num-messages " messages)") + resp (ArrayList.) + received (atom 0) context (TransportFactory/makeContext storm-conf) port (available-port 6700) server (.bind context nil port) + _ (register-callback (fn [message] (.add resp message) (swap! received inc)) server) client (.connect context nil "localhost" port) _ (wait-until-ready [server client])] - (doseq [num (range 1 num-messages)] + (doseq [num (range 1 num-messages)] (let [req_msg (str num)] (.send client task (.getBytes req_msg)))) - (let [resp (ArrayList.) - received (atom 0)] - (while (< @received (- num-messages 1)) - (let [iter (.recv server 0 0)] - (while (.hasNext iter) - (let [msg (.next iter)] - (.add resp msg) - (swap! received inc) - )))) - (doseq [num (range 1 num-messages)] + (while-timeout TEST-TIMEOUT-MS (< (.size resp) (- num-messages 1)) (log-message (.size resp) " " num-messages) (Thread/sleep 10)) + + (doseq [num (range 1 num-messages)] (let [req_msg (str num) resp_msg (String. (.message (.get resp (- num 1))))] - (is (= req_msg resp_msg))))) + (is (= req_msg resp_msg)))) (.close client) (.close server) @@ -274,20 +290,18 @@ TOPOLOGY-TUPLE-SERIALIZER "backtype.storm.serialization.types.ListDelegateSerializer" TOPOLOGY-FALL-BACK-ON-JAVA-SERIALIZATION false TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS false} + resp (atom nil) context (TransportFactory/makeContext storm-conf) port (available-port 6700) client (.connect context nil "localhost" port) _ (.send client task (.getBytes req_msg)) server (.bind context nil port) + _ (register-callback (fn [message] (reset! resp message)) server) _ (wait-until-ready [server client]) - _ (.send client task (.getBytes req_msg)) - iter (future (.recv server 0 0)) - resp (deref iter 5000 nil) - resp-val (if resp (.next resp) nil)] - (is resp-val) - (when resp-val - (is (= task (.task resp-val))) - (is (= req_msg (String. (.message resp-val))))) + _ (.send client task (.getBytes req_msg))] + (wait-for-not-nil resp) + (is (= task (.task @resp))) + (is (= req_msg (String. (.message @resp)))) (.close client) (.close server) (.term context))) diff --git a/storm-core/test/clj/backtype/storm/messaging_test.clj b/storm-core/test/clj/backtype/storm/messaging_test.clj index 89d858649a5..86162dd24f7 100644 --- a/storm-core/test/clj/backtype/storm/messaging_test.clj +++ b/storm-core/test/clj/backtype/storm/messaging_test.clj @@ -61,28 +61,3 @@ (.cleanup this)) (startup [this] )) - -;; Test Adding more receiver threads won't violate the message delivery order gurantee -(deftest test-receiver-message-order - (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 2 - :daemon-conf {TOPOLOGY-WORKERS 2 - ;; Configure multiple receiver threads per worker - WORKER-RECEIVER-THREAD-COUNT 2 - STORM-LOCAL-MODE-ZMQ true - STORM-MESSAGING-TRANSPORT - "backtype.storm.messaging.netty.Context"}] - (let [topology (thrift/mk-topology - - ;; TestEventLogSpout output(sourceId, eventId), eventId is Monotonically increasing - {"1" (thrift/mk-spout-spec (TestEventLogSpout. 4000) :parallelism-hint 8)} - - ;; field grouping, message from same "source" task will be delivered to same bolt task - ;; When received message order is not kept, Emit an error Tuple - {"2" (thrift/mk-bolt-spec {"1" ["source"]} (TestEventOrderCheckBolt.) - :parallelism-hint 4) - }) - results (complete-topology cluster - topology)] - - ;; No error Tuple from Bolt TestEventOrderCheckBolt - (is (empty? (read-tuples results "2"))))))