Skip to content

Commit

Permalink
More pipeline work.
Browse files Browse the repository at this point in the history
This gets us closer to a workable api. It still has zero docs.
  • Loading branch information
tobias committed Dec 14, 2012
1 parent 776c6e6 commit 05bca03
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 122 deletions.
@@ -1,26 +1,16 @@
(ns in-container.pipeline
(:use clojure.test)
(:require [immutant.messaging.pipeline :as pl]
[immutant.messaging :as msg]))

(defn pio [f]
(fn [i]
(println "INPUT:" i)
(let [o (f i)]
(println "OUTPUT:" o)
o)))
(:require [immutant.pipeline :as pl]
[immutant.messaging :as msg]))

(defn random-queue []
(msg/as-queue (str (java.util.UUID/randomUUID))))
(let [q (msg/as-queue (str (java.util.UUID/randomUUID)))]
(msg/start q)
q))

(defn dollarizer [s]
(.replace s "S" "$"))

(defn make-sleeper [ms]
(fn [x]
(Thread/sleep ms)
x))

(deftest it-should-work
(let [result-queue (random-queue)
pl (pl/pipeline
Expand All @@ -29,46 +19,100 @@
(memfn toUpperCase)
dollarizer
#(.replace % "$" "Ke$ha")
#(msg/publish result-queue %))]
(msg/start result-queue)
(msg/publish pl "hambiscuit")
(partial msg/publish result-queue))]
(pl "hambiscuit")
(is (= "HAXBIKe$haCUIT" (msg/receive result-queue)))))

(deftest it-should-work-with-a-result-queue
(let [result-queue "queue.pl.result-opt"
pl (pl/pipeline "result-queue"
#(.replace % "y" "x")
(memfn toUpperCase)
dollarizer
#(.replace % "$" "NipseyHu$$le")
:result-destination result-queue)]
(msg/publish pl "gravybiscuit")
(is (= "GRAVXBINipseyHu$$leCUIT" (msg/receive result-queue)))))

#_(deftest it-should-work-with-concurrency
(deftest it-should-work-with-a-step-name-on-publish
(let [result-queue (random-queue)
pl (pl/pipeline
"concurrency"
"basic-step"
#(.replace % "m" "x")
(pl/step (memfn toUpperCase) :name :uc)
dollarizer
(pl/step (make-sleeper 500) :concurrency 10)
:result-destination result-queue)]
#(.replace % "$" "Ke$ha")
(partial msg/publish result-queue))]
(pl "hambiscuit" :step :uc)
(is (= "HAMBIKe$haCUIT" (msg/receive result-queue)))))

(deftest it-should-work-with-concurrency
(let [result-queue (random-queue)
pl (pl/pipeline
"concurrency"
(pl/step (fn [_]
(Thread/sleep 500)
(.getName (Thread/currentThread))) :concurrency 5)
(partial msg/publish result-queue))]
(dotimes [n 10]
(pl "yo"))
(let [results (->> (range 10)
(map (fn [_] (msg/receive result-queue :timeout 400)))
set)]
(is (<= 2 (count results))))))

(deftest it-should-work-with-global-concurrency
(let [result-queue (random-queue)
pl (pl/pipeline
"concurrency-global"
(fn [_]
(Thread/sleep 500)
(.getName (Thread/currentThread)))
(partial msg/publish result-queue)
:concurrency 5)]
(dotimes [n 10]
(msg/publish pl "hamboneS"))
(let [results
(keep identity
(map (fn [_] (msg/receive result-queue :timeout 510))
(range 10)))]
(is (= 10 (count results)))
(is (= (take 10 (repeat "hambone$")) results)))))
(pl "yo"))
(let [results (->> (range 10)
(map (fn [_] (msg/receive result-queue :timeout 400)))
set)]
(is (<= 2 (count results))))))

(deftest step-concurrency-should-override-global
(let [result-queue (random-queue)
pl (pl/pipeline
"concurrency-global-override"
(pl/step (fn [_]
(Thread/sleep 500)
(.getName (Thread/currentThread))) :concurrency 5)
(partial msg/publish result-queue)
:concurrency 1)]
(dotimes [n 10]
(pl "yo"))
(let [results (->> (range 10)
(map (fn [_] (msg/receive result-queue :timeout 400)))
(remove nil?)
set)]
(is (<= 2 (count results))))))

(deftest *pipeline*-should-be-bound
(let [result-queue (random-queue)
pl (pl/pipeline
"pipeline var"
(fn [_] (msg/publish result-queue (str pl/*pipeline*))))]
(msg/start result-queue)
(msg/publish pl "hi")
(is (= (str pl) (msg/receive result-queue)))))
(fn [_] (msg/publish result-queue (meta pl/*pipeline*))))]
(pl "hi")
(is (= (-> pl meta :pipeline) (:pipeline (msg/receive result-queue))))))

(deftest *current-step*-and-*next-step*-should-be-bound
(let [result-queue (random-queue)
pl (pl/pipeline
"step vars"
(fn [_] (msg/publish result-queue [pl/*current-step* pl/*next-step*]))
(fn [_]))]
(pl "hi")
(is (= ["0" "1"] (msg/receive result-queue)))))

(deftest *current-step*-and-*next-step*-should-be-bound-when-steps-are-named
(let [result-queue (random-queue)
pl (pl/pipeline
"step vars redux"
(pl/step
(fn [_]
(msg/publish result-queue [pl/*current-step* pl/*next-step*]))
:name "one")
(pl/step
(fn [_])
:name "two"))]
(pl "hi")
(is (= ["one" "two"] (msg/receive result-queue)))))

(defn chucker [_]
(throw (Exception. "boom")))
Expand All @@ -81,8 +125,7 @@
chucker
:error-handler (fn [e m]
(msg/publish result-queue "caught it!")))]
(msg/start result-queue)
(msg/publish pl "hi")
(pl "hi")
(is (= "caught it!" (msg/receive result-queue)))))

(deftest step-error-handling-should-work
Expand All @@ -92,8 +135,7 @@
(pl/step chucker
:error-handler (fn [e m]
(msg/publish result-queue "from step"))))]
(msg/start result-queue)
(msg/publish pl "hi")
(pl "hi")
(is (= "from step" (msg/receive result-queue)))))

(deftest step-error-handling-should-override-global
Expand All @@ -105,8 +147,7 @@
(msg/publish result-queue "from step")))
:error-handler (fn [e m]
(msg/publish result-queue "from global")))]
(msg/start result-queue)
(msg/publish pl "hi")
(pl "hi")
(is (= "from step" (msg/receive result-queue)))))

(deftest *pipeline*-should-be-bound-in-an-error-handler
Expand All @@ -115,10 +156,9 @@
"pipeline var in eh"
chucker
:error-handler (fn [e m]
(msg/publish result-queue (str pl/*pipeline*))))]
(msg/start result-queue)
(msg/publish pl "hi")
(is (= (str pl) (msg/receive result-queue))))))
(msg/publish result-queue (meta pl/*pipeline*))))]
(pl "hi")
(is (= (-> pl meta :pipeline) (:pipeline (msg/receive result-queue)))))))



67 changes: 0 additions & 67 deletions modules/messaging/src/main/clojure/immutant/messaging/pipeline.clj

This file was deleted.

101 changes: 101 additions & 0 deletions modules/messaging/src/main/clojure/immutant/pipeline.clj
@@ -0,0 +1,101 @@
;; Copyright 2008-2012 Red Hat, Inc, and individual contributors.
;;
;; This is free software; you can redistribute it and/or modify it
;; under the terms of the GNU Lesser General Public License as
;; published by the Free Software Foundation; either version 2.1 of
;; the License, or (at your option) any later version.
;;
;; This software is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; Lesser General Public License for more details.
;;
;; You should have received a copy of the GNU Lesser General Public
;; License along with this software; if not, write to the Free
;; Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
;; 02110-1301 USA, or see the FSF site: http://www.fsf.org.

(ns immutant.pipeline
(:use [immutant.util :only (mapply)])
(:require [clojure.tools.logging :as log]
[immutant.messaging :as msg]))

(def ^:dynamic *pipeline* nil)
(def ^:dynamic *current-step* nil)
(def ^:dynamic *next-step* nil)

(def ^:private pipelines (atom #{}))

(defn- wrap-error-handler [f {:keys [error-handler]}]
(if error-handler
(fn [m]
(try
(f m)
(catch Exception e
(error-handler e m))))
f))

(defn- wrap-step-bindings [f current next]
(binding [*current-step* current
*next-step* next]
(bound-fn* f)))

(defn- pipeline-listen [pl opts f]
(let [{:keys [step next-step]} (meta f)
opts (-> opts
(merge (meta f))
(assoc :selector
(str "step = '" step "'")))
wrapped-f (-> f
(wrap-error-handler opts)
(wrap-step-bindings step next-step))]
(mapply msg/listen
pl
(if next-step
#(msg/publish pl (wrapped-f %)
:properties {"step" next-step})
wrapped-f)
opts)))

(defn- pipeline-fn [pl entry]
(vary-meta
(fn [m & {:keys [step] :or {step entry}}]
(msg/publish pl m :properties {"step" step}))
assoc
:pipeline pl))

(defn- named-steps [fns]
(let [step-names (-> (map-indexed (fn [n s]
(str (or (:name (meta s)) n))) fns)
vec
(conj nil))]
(map
(fn [f [step next-step]]
(vary-meta f assoc :step step :next-step next-step))
fns (partition 2 1 step-names))))

(defn pipeline [name & args]
(let [steps (named-steps (take-while fn? args))
{:as opts} (drop-while fn? args)
pl (str "queue.pipeline-" name)
pl-fn (pipeline-fn pl (-> steps first meta :step))]
(if (some #{pl} @pipelines)
(throw (IllegalArgumentException.
(str "A pipeline named " name " already exists."))))
(mapply msg/start pl opts)
(binding [*pipeline* pl-fn]
(let [listeners (->> steps
(map (partial pipeline-listen pl opts))
doall)]
(swap! pipelines conj pl)
(vary-meta pl-fn assoc :listeners listeners)))))

(defn step [f & {:as opts}]
(vary-meta f merge opts))

(defn stop [pl & args]
(let [{:keys [pipeline listeners]} (meta pl)]
(swap! pipelines disj pipeline)
(doseq [l listeners]
(msg/unlisten l))
(apply msg/stop pipeline args)))

0 comments on commit 05bca03

Please sign in to comment.