Skip to content

Commit

Permalink
reduce creation of canonical probes in servers
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed Aug 24, 2011
1 parent 69a0260 commit 06f4a21
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 27 deletions.
9 changes: 6 additions & 3 deletions src/aleph/http/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,23 @@
identity
options)]
(run-pipeline client
:error-handler (fn [_])
(fn [ch]
(splice
(wrap-response-stream options ch)
(siphon->> (wrap-request-stream options) ch))))))

(defn- http-client- [client-fn options]
(let [options (merge
(let [options (process-options options)
options (merge
{:name
(str "http-client:" (:server-name options) ":" (gensym ""))
:description
(str (:scheme options) "://" (:server-name options) ":" (:server-port options))}
(process-options options))
options)
client (client-fn
#(http-connection options))
#(http-connection options)
options)
f (fn [request timeout]
(if (map? request)
(client (assoc (merge options request)
Expand Down
52 changes: 30 additions & 22 deletions src/aleph/netty.clj
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,30 @@
(.close ch)
(.sendUpstream ctx evt)))))

(def pipeline-handlers
(memoize
(fn [pipeline-name]
(let [error-probe (canonical-probe [pipeline-name :errors])
error-handler (fn [evt]
(when-let [ex ^ExceptionEvent (exception-event evt)]
(when-not (instance? ClosedChannelException ex)
(when-not (trace error-probe
{:exception ex
:address (-> ex .getChannel channel-origin)})
(log/error nil ex)))
nil))
traffic-handler (fn [probe-suffix]
(let [traffic-probe (canonical-probe [pipeline-name :traffic probe-suffix])]
(fn [evt]
(when-let [msg (message-event evt)]
(trace traffic-probe
{:address (-> evt channel-event channel-origin)
:bytes (.readableBytes ^ChannelBuffer msg)}))
nil)))]
{:error error-handler
:in (traffic-handler :in)
:out (traffic-handler :out)}))))

(defn create-netty-pipeline
"Creates a pipeline. Each stage must have a name.
Expand All @@ -170,34 +194,17 @@
:stage-b b)"
[pipeline-name & stages]
(let [netty-pipeline (Channels/pipeline)
error-probe (canonical-probe [pipeline-name :errors])
_ (register-probe error-probe)
error-handler (fn [evt]
(when-let [ex ^ExceptionEvent (exception-event evt)]
(when-not (instance? ClosedChannelException ex)
(when-not (trace error-probe
{:exception ex
:address (-> ex .getChannel channel-origin)})
(log/error nil ex)))
nil))
traffic-handler (fn [probe-suffix]
(let [traffic-probe (canonical-probe [pipeline-name :traffic probe-suffix])]
(fn [evt]
(when-let [msg (message-event evt)]
(trace traffic-probe
{:address (-> evt channel-event channel-origin)
:bytes (.readableBytes ^ChannelBuffer msg)}))
nil)))]
{:keys [error in out]} (pipeline-handlers pipeline-name)]
(doseq [[id stage] (partition 2 stages)]
(.addLast netty-pipeline (name id) stage))
(.addFirst netty-pipeline "incoming-traffic"
(upstream-stage (traffic-handler :in)))
(upstream-stage in))
(.addFirst netty-pipeline "outgoing-traffic"
(downstream-stage (traffic-handler :out)))
(downstream-stage out))
(.addLast netty-pipeline "outgoing-error"
(downstream-stage error-handler))
(downstream-stage error))
(.addFirst netty-pipeline "incoming-error"
(upstream-stage error-handler))
(upstream-stage error))
netty-pipeline))

;;;
Expand Down Expand Up @@ -436,6 +443,7 @@

;; intialize client
(run-pipeline (.connect client (InetSocketAddress. ^String host (int port)))
:error-handler (fn [_])
wrap-netty-channel-future
(fn [^Channel netty-channel]

Expand Down
4 changes: 2 additions & 2 deletions test/aleph/benchmark/hello_world.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

#_(def stop-server
(start-http-server
(fn [ch request]
(enqueue ch
(wrap-ring-handler
(fn [request]
{:status 200
:content-type "text/plain"
:body (str (swap! counter inc))}))
Expand Down

0 comments on commit 06f4a21

Please sign in to comment.