Skip to content

Commit

Permalink
Merge pull request #4 from Motiva-AI/issue-2/replace-interceptor-impl…
Browse files Browse the repository at this point in the history
…ementation

Issue 2/replace interceptor implementation
  • Loading branch information
Quantisan committed Jun 12, 2021
2 parents 69a581b + 35bb49f commit c0ea44b
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 91 deletions.
1 change: 1 addition & 0 deletions project.clj
Expand Up @@ -22,6 +22,7 @@
[org.clojure/tools.logging "1.1.0"]
[org.clojure/data.json "2.2.2"]
[org.clojure/core.async "1.3.618"]
[metosin/sieppari "0.0.0-alpha13"]
[com.amazonaws/aws-java-sdk-iam "1.11.1007"]
[com.amazonaws/aws-java-sdk-sts "1.11.1007"]
[com.amazonaws/aws-java-sdk-stepfunctions "1.11.1007"]])
Expand Down
16 changes: 5 additions & 11 deletions src/stepwise/activities.clj
Expand Up @@ -43,20 +43,14 @@
[activity-name (ensure activity-name)]))
activity-names))

(defn make-handler-interceptor [handler-fn]
[:handler
{:before (fn [{input :input :as env}]
(assoc env :output (handler-fn input)))}])

(defn identity-handler-fn [input] input)

(defn compile [handler]
(if (fn? handler)
handler
(interceptors/compile (into (vec (:interceptors handler))
[(make-handler-interceptor (get handler
:handler-fn
identity-handler-fn))]))))
(interceptors/compile
(vec (:interceptors handler))
(get handler
:handler-fn
identity))))

(defn compile-all [activity->handler]
(into {}
Expand Down
15 changes: 8 additions & 7 deletions src/stepwise/interceptors.clj
Expand Up @@ -27,19 +27,20 @@
false))]
(when continue? (recur)))))

(defn send-heartbeat-interceptor-fn
(defn send-heartbeat-interceptor
"Usage:
(stepwise/start-workers!
::addr
{:handler-fn add
:interceptors [[:send-heartbeat
{:before (send-heartbeat-interceptor-fn 10)}]]}
:interceptors [send-heartbeat-interceptor [...] [...] ...]}
"
[n-seconds]
(fn [{{heartbeat-fn :send-heartbeat} :context
:as env}]
(beat-heart-every-n-seconds! (heartbeat-fn) n-seconds)
env))
[:send-heartbeat
{:enter
(fn [{send-heartbeat-fn :send-heartbeat-fn
:as ctx}]
(beat-heart-every-n-seconds! (send-heartbeat-fn) n-seconds)
ctx)}])

120 changes: 56 additions & 64 deletions src/stepwise/interceptors/core.clj
@@ -1,77 +1,69 @@
(ns stepwise.interceptors.core
(:refer-clojure :exclude [compile])
(:require [clojure.tools.logging :as log]))
(:require [sieppari.core :as s]))

; cribbed from re-frame -- thanks re-frame!
(defn well-formed-interceptor-tuple?
"Interceptor-tuple should be a tuple of the form,
[:name {:enter (fn [ctx] ... (update ctx :request myfn1))
:leave (fn [ctx] ... (update ctx :response myfn2))}]
(defn- invoke-interceptor-fn
; TODO clean up names
[context [id interceptor] direction]
(log/trace (prn-str {:interceptor id
:direction direction
:context context}))
(if-let [f (get interceptor direction)]
(f context)
context))
A few notes:
(defn- invoke-interceptors
([context direction]
(loop [context context]
(let [queue (:queue context)] ;; future interceptors
(if (empty? queue)
context
(let [interceptor (peek queue) ;; next interceptor to call
stack (:stack context)] ;; already completed interceptors
(recur (-> context
(assoc :queue (pop queue)
:stack (conj stack interceptor))
(invoke-interceptor-fn interceptor direction)))))))))
1. `:enter` and `:leave` functions need to return `ctx` or an updated
version of `ctx` for the next interceptor in the chain
2. either `:enter` or `:leave` fn can be nil if not used
3. incoming data are tucked away in `:request` for `:enter` fn
4. outgoing data are in `:response` for `:leave` fn.
(defn- change-direction [context]
(-> context
(dissoc :queue)
(assoc :queue (:stack context))))
Reference:
https://github.com/metosin/sieppari"
[interceptor-tuple]
(let [[interceptor-name interceptor-map] interceptor-tuple]
(and (vector? interceptor-tuple)
(= (count interceptor-tuple) 2)
(keyword? interceptor-name)
(map? interceptor-map)
(or (nil? (:enter interceptor-map))
(fn? (:enter interceptor-map)))
(or (nil? (:leave interceptor-map))
(fn? (:leave interceptor-map))))))

(defn assert-named-chain
[named-chain]
(doseq [[index interceptor-tuple] (map vector
(range 0 (count named-chain))
named-chain)]
(when-not (well-formed-interceptor-tuple? interceptor-tuple)
(throw (ex-info "Malformed interceptor-tuple. See (doc stepwise.interceptors.core/well-formed-interceptor-tuple?) for example."
{:index index
:form interceptor-tuple})))))

(defn- assoc-send-heartbeat-fn-to-context-interceptor
[send-heartbeat-fn]
{:enter (fn [ctx] (assoc ctx :send-heartbeat-fn send-heartbeat-fn))})

(defn well-formed-interceptor?
"Interceptor should be a tuple of the form,
[:name {:before (fn [env] ... env)
:after (fn [env] ... env)}]
(defn- interceptor-tuples->interceptors [interceptor-tuples]
(map second interceptor-tuples))

:before and :after fn needs to return env or an updated version of env for
the next interceptor in the queue. Either fn can be nil if not used."
[interceptor]
(let [stage-map (second interceptor)]
(and (vector? interceptor)
(= (count interceptor) 2)
(keyword? (first interceptor))
(map? stage-map)
(or (nil? (:before stage-map))
(fn? (:before stage-map)))
(or (nil? (:after stage-map))
(fn? (:after stage-map))))))
(defn- prepend-this-interceptor-to-interceptor-chain [this-interceptor chain]
(cons this-interceptor
chain))

(defn- form-interceptor-chain [handler-fn interceptors]
(concat interceptors [handler-fn]))

(defn compile
"Returns a fn that exercises a queue of interceptors against a task and returns a result.
"Returns a fn that exercises a chain of interceptor-tuples against a task
and returns a result. Uses metosin/siepppari under the hood."
[named-chain handler-fn]
(assert-named-chain named-chain)

Reference:
https://day8.github.io/re-frame/Interceptors/"
[queue]
(doseq [[index interceptor] (map vector
(range 0 (count queue))
queue)]
(when-not (well-formed-interceptor? interceptor)
(throw (ex-info "Malformed interceptor"
{:index index
:form interceptor}))))
(with-meta (fn execute [input send-heartbeat]
(-> {:input input
:output nil
:context {:send-heartbeat send-heartbeat}
:stack ()
:queue (into () (reverse queue))}
(invoke-interceptors :before)
change-direction
(invoke-interceptors :after)
:output))
(with-meta (fn [input send-heartbeat-fn]
(s/execute
(->> named-chain
(interceptor-tuples->interceptors)
(prepend-this-interceptor-to-interceptor-chain (assoc-send-heartbeat-fn-to-context-interceptor send-heartbeat-fn))
(form-interceptor-chain handler-fn))
input))
{:heartbeat? true}))

25 changes: 17 additions & 8 deletions test/stepwise/interceptors/core_test.clj
@@ -1,16 +1,25 @@
(ns stepwise.interceptors.core-test
(:refer-clojure :exclude [compile])
(:require [clojure.test :as test]
(:require [clojure.test :refer [deftest testing is]]
[stepwise.interceptors.core :as main]))

(def hello-world
[:hello-world {:before (fn [context]
(assoc context :output :hello-world))}])
(def inc-x-interceptor-tuple
[:inc-x
{:enter (fn [ctx] (update-in ctx [:request :x] inc))}])

(defn handler [request]
{:y (inc (:x request))})

(defn execute [queue task]
((main/compile queue) task (fn [])))
((main/compile queue handler) task (fn [])))

(deftest compile-test
(testing "interceptor with handler-fn"
(is (= {:y 42}
(execute [inc-x-interceptor-tuple] {:x 40}))))

(test/deftest compile
(test/is (= :hello-world
(execute [hello-world] {}))))
(testing "send-heartbeat-fn is associated to internal context"
(execute [[:check-heartbeat-fn
{:enter (fn [ctx] (is (fn? (:send-heartbeat-fn ctx))) ctx)}]]
{:x 40})))

6 changes: 5 additions & 1 deletion test/stepwise/interceptors_test.clj
Expand Up @@ -2,7 +2,8 @@
(:refer-clojure :exclude [compile])
(:require [clojure.test :refer [deftest testing is]]
[bond.james :as bond]
[stepwise.interceptors :as i]))
[stepwise.interceptors :as i]
[stepwise.interceptors.core :refer [well-formed-interceptor-tuple?]]))

(defn- heartbeat-fn [] :foo)
(defn- failing-heartbeat-fn [] (throw (Exception. "testing failure mode")))
Expand All @@ -23,3 +24,6 @@
(Thread/sleep (* (inc n) period-sec 1000))
(is (= 1 (-> failing-heartbeat-fn bond/calls count)))))))

(deftest send-heartbeat-interceptor-test
(is (well-formed-interceptor-tuple? (i/send-heartbeat-interceptor 5))))

0 comments on commit c0ea44b

Please sign in to comment.