Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 33 additions & 29 deletions storm-core/src/clj/org/apache/storm/daemon/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
(:import [org.apache.storm.grouping CustomStreamGrouping])
(:import [org.apache.storm.task WorkerTopologyContext IBolt OutputCollector IOutputCollector])
(:import [org.apache.storm.generated GlobalStreamId])
(:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread])
(:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback])
(:import [com.lmax.disruptor InsufficientCapacityException])
(:import [org.apache.storm.serialization KryoTupleSerializer])
(:import [org.apache.storm.daemon Shutdownable])
Expand All @@ -38,9 +38,10 @@
(:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
(:import [java.lang Thread Thread$UncaughtExceptionHandler]
[java.util.concurrent ConcurrentLinkedQueue]
[org.json.simple JSONValue])
[org.json.simple JSONValue]
[com.lmax.disruptor.dsl ProducerType])
(:require [org.apache.storm [thrift :as thrift]
[cluster :as cluster] [disruptor :as disruptor] [stats :as stats]])
[cluster :as cluster] [stats :as stats]])
(:require [org.apache.storm.daemon [task :as task]])
(:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
(:require [clojure.set :as set]))
Expand Down Expand Up @@ -223,21 +224,21 @@
(let [val (AddressedTuple. task tuple)]
(when (= true (storm-conf TOPOLOGY-DEBUG))
(log-message "TRANSFERING tuple " val))
(disruptor/publish batch-transfer->worker val))))
(.publish ^DisruptorQueue batch-transfer->worker val))))

(defn mk-executor-data [worker executor-id]
(let [worker-context (worker-context worker)
task-ids (executor-id->tasks executor-id)
component-id (.getComponentId worker-context (first task-ids))
storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
executor-type (executor-type worker-context component-id)
batch-transfer->worker (disruptor/disruptor-queue
batch-transfer->worker (DisruptorQueue.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to rename `batch-transfer->worker`? Usually `->` in a name implies a map, but this binding holds a DisruptorQueue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do that when we port executor.clj.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine with me.

(str "executor" executor-id "-send-queue")
ProducerType/SINGLE
(storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
:producer-type :single-threaded
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
(storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
]
(recursive-map
:worker worker
Expand Down Expand Up @@ -286,14 +287,14 @@
(defn- mk-disruptor-backpressure-handler [executor-data]
"make a handler for the executor's receive disruptor queue to
check highWaterMark and lowWaterMark for backpressure"
(disruptor/disruptor-backpressure-handler
(fn []
(reify DisruptorBackpressureCallback
(highWaterMark [this]
"When receive queue is above highWaterMark"
(if (not @(:backpressure executor-data))
(do (reset! (:backpressure executor-data) true)
(log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
(WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))
(fn []
(lowWaterMark [this]
"When receive queue is below lowWaterMark"
(if @(:backpressure executor-data)
(do (reset! (:backpressure executor-data) false)
Expand All @@ -305,16 +306,19 @@
cached-emit (MutableObject. (ArrayList.))
storm-conf (:storm-conf executor-data)
serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
^DisruptorQueue batch-transfer-queue (:batch-transfer-queue executor-data)
handler (reify com.lmax.disruptor.EventHandler
(onEvent [this o seq-id batch-end?]
(let [^ArrayList alist (.getObject cached-emit)]
(.add alist o)
(when batch-end?
(worker-transfer-fn serializer alist)
(.setObject cached-emit (ArrayList.))))))
]
(disruptor/consume-loop*
(:batch-transfer-queue executor-data)
(disruptor/handler [o seq-id batch-end?]
(let [^ArrayList alist (.getObject cached-emit)]
(.add alist o)
(when batch-end?
(worker-transfer-fn serializer alist)
(.setObject cached-emit (ArrayList.)))))
:uncaught-exception-handler (:report-error-and-die executor-data))))
(Utils/asyncLoop
(fn [] (.consumeBatchWhenAvailable batch-transfer-queue handler) 0)
(.getName batch-transfer-queue)
(:uncaught-exception-handler (:report-error-and-die executor-data)))))

(defn setup-metrics! [executor-data]
(let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
Expand All @@ -326,7 +330,7 @@
interval
(fn []
(let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
(disruptor/publish receive-queue val)))))))
(.publish ^DisruptorQueue receive-queue val)))))))

(defn metrics-tick
[executor-data task-data ^TupleImpl tuple]
Expand Down Expand Up @@ -367,7 +371,7 @@
tick-time-secs
(fn []
(let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
(disruptor/publish receive-queue val))))))))
(.publish ^DisruptorQueue receive-queue val))))))))

(defn mk-executor [worker executor-id initial-credentials]
(let [executor-data (mk-executor-data worker executor-id)
Expand Down Expand Up @@ -410,15 +414,15 @@
(let [receive-queue (:receive-queue executor-data)
context (:worker-context executor-data)
val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID))]]
(disruptor/publish receive-queue val)))
(.publish ^DisruptorQueue receive-queue val)))
(get-backpressure-flag [this]
@(:backpressure executor-data))
Shutdownable
(shutdown
[this]
(log-message "Shutting down executor " component-id ":" (pr-str executor-id))
(disruptor/halt-with-interrupt! (:receive-queue executor-data))
(disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data))
(.haltWithInterrupt ^DisruptorQueue (:receive-queue executor-data))
(.haltWithInterrupt ^DisruptorQueue (:batch-transfer-queue executor-data))
(doseq [t threads]
(.interrupt t)
(.join t))
Expand Down Expand Up @@ -460,8 +464,8 @@
(let [task-ids (:task-ids executor-data)
debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
]
(disruptor/clojure-handler
(fn [tuple-batch sequence-id end-of-batch?]
(reify com.lmax.disruptor.EventHandler
(onEvent [this tuple-batch sequence-id end-of-batch?]
(fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
(let [^TupleImpl tuple (.getTuple addressed-tuple)
task-id (.getDest addressed-tuple)]
Expand Down Expand Up @@ -623,7 +627,7 @@

(fn []
;; This design requires that spouts be non-blocking
(disruptor/consume-batch receive-queue event-handler)
(.consumeBatch ^DisruptorQueue receive-queue event-handler)

(let [active? @(:storm-active-atom executor-data)
curr-count (.get emitted-count)
Expand Down Expand Up @@ -841,7 +845,7 @@
(let [receive-queue (:receive-queue executor-data)
event-handler (mk-task-receiver executor-data tuple-action-fn)]
(fn []
(disruptor/consume-batch-when-available receive-queue event-handler)
(.consumeBatchWhenAvailable ^DisruptorQueue receive-queue event-handler)
0)))]
;; TODO: can get any SubscribedState objects out of the context now

Expand Down
54 changes: 28 additions & 26 deletions storm-core/src/clj/org/apache/storm/daemon/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
(:require [clj-time.core :as time])
(:require [clj-time.coerce :as coerce])
(:require [org.apache.storm.daemon [executor :as executor]])
(:require [org.apache.storm [disruptor :as disruptor] [cluster :as cluster]])
(:require [org.apache.storm [cluster :as cluster]])
(:require [clojure.set :as set])
(:require [org.apache.storm.messaging.loader :as msg-loader])
(:import [java.util.concurrent Executors]
[org.apache.storm.hooks IWorkerHook BaseWorkerHook]
[uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J])
(:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time WorkerBackpressureCallback DisruptorBackpressureCallback])
(:import [java.util ArrayList HashMap]
[java.util.concurrent.locks ReentrantReadWriteLock])
(:import [org.apache.commons.io FileUtils])
(:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time])
(:import [org.apache.storm.grouping LoadMapping])
(:import [org.apache.storm.messaging TransportFactory])
(:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
Expand Down Expand Up @@ -125,7 +125,7 @@
(fast-map-iter [[short-executor pairs] grouped]
(let [q (short-executor-receive-queue-map short-executor)]
(if q
(disruptor/publish q pairs)
(.publish ^DisruptorQueue q pairs)
(log-warn "Received invalid messages for unknown tasks. Dropping... ")
)))))))

Expand All @@ -136,8 +136,8 @@

(defn- mk-backpressure-handler [executors]
"make a handler that checks and updates worker's backpressure flag"
(disruptor/worker-backpressure-handler
(fn [worker]
(reify WorkerBackpressureCallback
(onEvent [this worker]
(let [storm-id (:storm-id worker)
assignment-id (:assignment-id worker)
port (:port worker)
Expand All @@ -156,11 +156,11 @@
(defn- mk-disruptor-backpressure-handler [worker]
"make a handler for the worker's send disruptor queue to
check highWaterMark and lowWaterMark for backpressure"
(disruptor/disruptor-backpressure-handler
(fn []
(reify DisruptorBackpressureCallback
(highWaterMark [this]
(reset! (:transfer-backpressure worker) true)
(WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))
(fn []
(lowWaterMark [this]
(reset! (:transfer-backpressure worker) false)
(WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker)))))

Expand Down Expand Up @@ -192,7 +192,7 @@
)))))

(when (not (.isEmpty local)) (local-transfer local))
(when (not (.isEmpty remoteMap)) (disruptor/publish transfer-queue remoteMap))))]
(when (not (.isEmpty remoteMap)) (.publish ^DisruptorQueue transfer-queue remoteMap))))]
(if try-serialize-local
(do
(log-warn "WILL TRY TO SERIALIZE ALL TUPLES (Turn off " TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE " for production)")
Expand All @@ -204,11 +204,11 @@
(defn- mk-receive-queue-map [storm-conf executors]
(->> executors
;; TODO: this depends on the type of executor
(map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
(storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
(map (fn [e] [e (DisruptorQueue. (str "receive-queue" e)
(storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
(storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
(into {})
))

Expand Down Expand Up @@ -248,10 +248,11 @@
(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state]
(let [assignment-versions (atom {})
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
transfer-queue (DisruptorQueue. "worker-transfer-queue"
(storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
(storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
executor-receive-queue-map (mk-receive-queue-map storm-conf executors)

receive-queue-map (->> executor-receive-queue-map
Expand Down Expand Up @@ -438,21 +439,20 @@

;; TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
(defn mk-transfer-tuples-handler [worker]
(let [^DisruptorQueue transfer-queue (:transfer-queue worker)
(let [^DisruptorQueue transfer-queue (:transfer-queue worker)
drainer (TransferDrainer.)
node+port->socket (:cached-node+port->socket worker)
task->node+port (:cached-task->node+port worker)
endpoint-socket-lock (:endpoint-socket-lock worker)
]
(disruptor/clojure-handler
(fn [packets _ batch-end?]
(reify com.lmax.disruptor.EventHandler
(onEvent [this packets seqId batch-end?]
(.add drainer packets)

(when batch-end?
(read-locked endpoint-socket-lock
(let [node+port->socket @node+port->socket
task->node+port @task->node+port]
(.send drainer task->node+port node+port->socket)))
(let [node+port->socket @node+port->socket
task->node+port @task->node+port]
(.send drainer task->node+port node+port->socket)))
(.clear drainer))))))

;; Check whether this messaging connection is ready to send data
Expand Down Expand Up @@ -658,7 +658,9 @@

transfer-tuples (mk-transfer-tuples-handler worker)

transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)
transfer-thread (Utils/asyncLoop
(fn []
(.consumeBatchWhenAvailable ^DisruptorQueue (:transfer-queue worker) transfer-tuples) 0))

disruptor-handler (mk-disruptor-backpressure-handler worker)
_ (.registerBackpressureCallback (:transfer-queue worker) disruptor-handler)
Expand Down Expand Up @@ -690,7 +692,7 @@
;;in which case it's a noop
(.term ^IContext (:mq-context worker))
(log-message "Shutting down transfer thread")
(disruptor/halt-with-interrupt! (:transfer-queue worker))
(.haltWithInterrupt ^DisruptorQueue (:transfer-queue worker))

(.interrupt transfer-thread)
(.join transfer-thread)
Expand Down
89 changes: 0 additions & 89 deletions storm-core/src/clj/org/apache/storm/disruptor.clj

This file was deleted.

Loading