From 0108765688e64fa7c039e9478b303c118882d972 Mon Sep 17 00:00:00 2001 From: Kang Xiao Date: Sun, 16 Feb 2014 23:12:54 +0800 Subject: [PATCH 1/2] STORM-63 remove timeout drpc request from its function's request queue --- storm-core/src/clj/backtype/storm/daemon/drpc.clj | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/storm-core/src/clj/backtype/storm/daemon/drpc.clj b/storm-core/src/clj/backtype/storm/daemon/drpc.clj index df07343b046..54e71e070cb 100644 --- a/storm-core/src/clj/backtype/storm/daemon/drpc.clj +++ b/storm-core/src/clj/backtype/storm/daemon/drpc.clj @@ -47,9 +47,13 @@ id->sem (atom {}) id->result (atom {}) id->start (atom {}) + id->function (atom {}) + id->request (atom {}) request-queues (atom {}) cleanup (fn [id] (swap! id->sem dissoc id) (swap! id->result dissoc id) + (swap! id->function dissoc id) + (swap! id->request dissoc id) (swap! id->start dissoc id)) my-ip (.getHostAddress (InetAddress/getLocalHost)) clear-thread (async-loop @@ -59,6 +63,8 @@ (when-let [sem (@id->sem id)] (swap! id->result assoc id (DRPCExecutionException. "Request timed out")) (.release sem)) + (.remove (acquire-queue request-queues (@id->function id)) (@id->request id)) + (log-warn "Timeout DRPC request id: " id " start at " start) (cleanup id) )) TIMEOUT-CHECK-SECS @@ -74,6 +80,8 @@ ] (swap! id->start assoc id (current-time-secs)) (swap! id->sem assoc id sem) + (swap! id->function assoc id function) + (swap! id->request assoc id req) (.add queue req) (log-debug "Waiting for DRPC result for " function " " args " at " (System/currentTimeMillis)) (.acquire sem) From 50408bfc1c193356a66f50f9ce9aeb7786031f4b Mon Sep 17 00:00:00 2001 From: Kang Xiao Date: Sat, 8 Mar 2014 01:12:22 +0800 Subject: [PATCH 2/2] fix added code not in when-let block --- storm-core/src/clj/backtype/storm/daemon/drpc.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/drpc.clj b/storm-core/src/clj/backtype/storm/daemon/drpc.clj index 54e71e070cb..57c26df2000 100644 --- a/storm-core/src/clj/backtype/storm/daemon/drpc.clj +++ b/storm-core/src/clj/backtype/storm/daemon/drpc.clj @@ -62,9 +62,9 @@ (when (> (time-delta start) (conf DRPC-REQUEST-TIMEOUT-SECS)) (when-let [sem (@id->sem id)] (swap! id->result assoc id (DRPCExecutionException. "Request timed out")) - (.release sem)) (.remove (acquire-queue request-queues (@id->function id)) (@id->request id)) (log-warn "Timeout DRPC request id: " id " start at " start) + (.release sem)) (cleanup id) )) TIMEOUT-CHECK-SECS