Skip to content

Commit

Permalink
Clara durability should prefer ThreadLocal to dynamic vars
Browse files Browse the repository at this point in the history
  • Loading branch information
kulkarnipushkar committed Dec 19, 2016
1 parent 2f283aa commit 378a7f8
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 62 deletions.
2 changes: 2 additions & 0 deletions CONTRIBUTORS.md
Expand Up @@ -4,9 +4,11 @@ Cerner Corporation
- Mike Rodriguez [@mrrodriguez]
- William Parker [@WilliamParker]
- Ethan Christian [@EthanEChristian]
- Pushkar Kulkarni [@kulkarnipushkar]

[@rbrush]: https://github.com/rbrush
[@mrrodriguez]: https://github.com/mrrodriguez
[@WilliamParker]: https://github.com/WilliamParker
[@EthanEChristian]: https://github.com/EthanEChristian
[@kulkarnipushkar]: https://github.com/kulkarnipushkar

72 changes: 43 additions & 29 deletions src/main/clojure/clara/rules/durability.clj
Expand Up @@ -27,22 +27,22 @@
;;;; Rulebase serialization helpers.
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(def ^:dynamic *node-id->node-cache*
(def ^:internal ^ThreadLocal node-id->node-cache
"Useful for caching rulebase network nodes by id during serialization and deserialization to
avoid creating multiple object instances for the same node."
nil)
(ThreadLocal.))

(def ^:dynamic *compile-expr-fn*
(def ^:internal ^ThreadLocal compile-expr-fn
"Similar to what is done in clara.rules.compiler, this is a function used to compile
expressions used in nodes of the rulebase network. A common function would cache
evaluated expressions by node-id and expression form to avoid duplicate evalutation
of the same expressions."
nil)
(ThreadLocal.))

(defn- add-node-fn [node fn-key meta-key]
(assoc node
fn-key
(*compile-expr-fn* (:id node) (meta-key (meta node)))))
((.get compile-expr-fn) (:id node) (meta-key (meta node)))))

(defn add-rhs-fn [node]
;; The RHS expression may need to be compiled within the namespace scope of specifically declared
Expand All @@ -69,56 +69,66 @@

(defn add-accumulator [node]
(assoc node
:accumulator ((*compile-expr-fn* (:id node)
:accumulator (((.get compile-expr-fn) (:id node)
(:accum-expr (meta node)))
(:env node))))

(defn node-id->node
"Lookup the node for the given node-id in the *node-id->node-cache* cache."
"Lookup the node for the given node-id in the node-id->node-cache cache."
[node-id]
(@*node-id->node-cache* node-id))
(@(.get node-id->node-cache) node-id))

(defn cache-node
"Cache the node in the *node-id->node-cache*. Returns the node."
"Cache the node in the node-id->node-cache. Returns the node."
[node]
(when-let [node-id (:id node)]
(vswap! *node-id->node-cache* assoc node-id node))
(vswap! (.get node-id->node-cache) assoc node-id node))
node)

(def ^:dynamic *clj-record-holder*
(def ^:internal ^ThreadLocal clj-record-holder
"A cache for writing and reading Clojure records. At write time, an IdentityHashMap can be
used to keep track of repeated references to the same record object instance occurring in
the serialization stream. At read time, a plain ArrayList (mutable and indexed for speed)
can be used to add records to when they are first seen, then look up repeated occurrences
of references to the same record instance later."
nil)
(ThreadLocal.))

(defn clj-record-fact->idx
"Gets the numeric index for the given fact from the *clj-record-holder*."
"Gets the numeric index for the given fact from the clj-record-holder."
[fact]
(.get ^Map *clj-record-holder* fact))
(-> clj-record-holder
^Map (.get)
(.get fact)))

(defn clj-record-holder-add-fact-idx!
"Adds the fact to the *clj-record-holder* with a new index. This can later be retrieved
"Adds the fact to the clj-record-holder with a new index. This can later be retrieved
with clj-record-fact->idx."
[fact]
;; Note the values will be int type here. This shouldn't be a problem since they
;; will be read later as longs and both will be compatible with the index lookup
;; at read-time. This could have a cast to long here, but it would waste time
;; unnecessarily.
(.put ^Map *clj-record-holder* fact (.size ^Map *clj-record-holder*)))
(-> clj-record-holder
^Map (.get)
(.put fact (-> clj-record-holder
^Map (.get)
(.size)))))

(defn clj-record-idx->fact
"The reverse of clj-record-fact->idx. Returns a fact for the given index found
in *clj-record-holder*."
in clj-record-holder."
[id]
(.get ^List *clj-record-holder* id))
(-> clj-record-holder
^List (.get)
(.get id)))

(defn clj-record-holder-add-fact!
"The reverse of clj-record-holder-add-fact-idx!. Adds the fact to the *clj-record-holder*
"The reverse of clj-record-holder-add-fact-idx!. Adds the fact to the clj-record-holder
at the next available index."
[fact]
(.add ^List *clj-record-holder* fact)
(-> clj-record-holder
^List (.get)
(.add fact))
fact)

(defn create-map-entry
Expand Down Expand Up @@ -349,29 +359,33 @@
;;;; Commonly useful session serialization helpers.
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(def ^:dynamic ^List *mem-facts*
(def ^:internal ^ThreadLocal ^List mem-facts
"Useful for ISessionSerializer implementors to have a reference to the facts deserialized via
IWorkingMemorySerializer that are needed to restore working memory whose locations were stubbed
with a MemIdx during serialization."
nil)
(ThreadLocal.))

(def ^:dynamic ^List *mem-internal*
(def ^:internal ^ThreadLocal ^List mem-internal
"Useful for ISessionSerializer implementors to have a reference to the facts deserialized via
IWorkingMemorySerializer that are needed to restore working memory whose locations were stubbed
with a InternalMemIdx during serialization. These objects are specific to the Clare engine,
and as such will be serialized and deserialized along with the memory."
nil)
(ThreadLocal.))

(defn find-mem-idx
"Finds the fact from *mem-facts* at the given index. See docs on *mem-facts* for more."
"Finds the fact from mem-facts at the given index. See docs on mem-facts for more."
[idx]
(.get *mem-facts* idx))
(-> mem-facts
(.get)
(get idx)))

(defn find-internal-idx
"Finds the fact from *mem-internal* at the given index. See docs on *mem-internal* for more."
"Finds the fact from mem-internal at the given index. See docs on mem-internal for more."
[idx]
(.get *mem-internal* idx))

(-> mem-internal
(.get)
(get idx)))

(defn indexed-session-memory-state
"Takes the working memory from a session and strips it down to only the memory needed for
serialization. Along with this, replaces all working memory facts with MemIdx place holders.
Expand Down
50 changes: 25 additions & 25 deletions src/main/clojure/clara/rules/durability/fressian.clj
Expand Up @@ -7,6 +7,7 @@
[clara.rules.memory :as mem]
[clara.rules.engine :as eng]
[clara.rules.compiler :as com]
[clara.rules.platform :as pform]
[schema.core :as s]
[clojure.data.fressian :as fres]
[clojure.java.io :as jio]
Expand Down Expand Up @@ -145,7 +146,7 @@
:writer (reify WriteHandler
(write [_ w o]
(let [node-id (:id o)]
(if (@d/*node-id->node-cache* node-id)
(if (@(.get d/node-id->node-cache) node-id)
(do
(.writeTag w tag-for-cached 1)
(.writeInt w node-id))
Expand All @@ -171,7 +172,7 @@
:writer (reify WriteHandler
(write [_ w o]
(let [node-id (:id o)]
(if (@d/*node-id->node-cache* node-id)
(if (@(.get d/node-id->node-cache) node-id)
(do
(.writeTag w tag-for-cached 1)
(.writeInt w node-id))
Expand Down Expand Up @@ -350,7 +351,7 @@
"clj/record"
{:class clojure.lang.IRecord
;; Write a record a single time per object reference to that record. The record is then "cached"
;; with the IdentityHashMap `d/*clj-record-holder*`. If another reference to this record instance
;; with the IdentityHashMap `d/clj-record-holder`. If another reference to this record instance
;; is encountered later, only the "index" of the record in the map will be written.
:writer (reify WriteHandler
(write [_ w rec]
Expand Down Expand Up @@ -543,54 +544,53 @@
(fn [sources]
(with-open [^FressianWriter wtr
(fres/create-writer out-stream :handlers write-handler-lookup)]
(binding [d/*node-id->node-cache* (volatile! {})
d/*clj-record-holder* record-holder]
(doseq [s sources] (fres/write-object wtr s)))))]

(pform/thread-local-binding [d/node-id->node-cache (volatile! {})
d/clj-record-holder record-holder]
(doseq [s sources] (fres/write-object wtr s)))))]
;; In this case there is nothing to do with memory, so just serialize immediately.
(if (:rulebase-only? opts)
(do-serialize [rulebase])

;; Otherwise memory needs to have facts extracted to return.
(let [{:keys [memory indexed-facts internal-indexed-facts]} (d/indexed-session-memory-state memory)
sources (if (:with-rulebase? opts)
[rulebase internal-indexed-facts memory]
[internal-indexed-facts memory])]

(do-serialize sources)

;; Return the facts needing to be serialized still.
indexed-facts))))

(deserialize [_ mem-facts opts]

(with-open [^FressianReader rdr (fres/create-reader in-stream :handlers read-handler-lookup)]
(let [{:keys [rulebase-only? base-rulebase]} opts

record-holder (ArrayList.)
;; The rulebase should either be given from the base-session or found in
;; the restored session-state.
maybe-base-rulebase (when (and (not rulebase-only?) base-rulebase)
base-rulebase)

rulebase (if maybe-base-rulebase
maybe-base-rulebase
(let [without-opts-rulebase (binding [d/*node-id->node-cache* (volatile! {})
d/*clj-record-holder* record-holder
d/*compile-expr-fn* (memoize (fn [id expr] (com/try-eval expr)))]
(fres/read-object rdr))]
(let [without-opts-rulebase (pform/thread-local-binding [d/node-id->node-cache (volatile! {})
d/compile-expr-fn (memoize (fn [id expr] (com/try-eval expr)))
d/clj-record-holder record-holder]
(fres/read-object rdr))]
(d/rulebase->rulebase-with-opts without-opts-rulebase opts)))]

(if rulebase-only?
rulebase
(d/assemble-restored-session rulebase
(binding [d/*clj-record-holder* record-holder
d/*mem-facts* mem-facts]
;; internal memory contains facts provided by mem-facts
;; thus mem-facts must be bound before the call to read
;; the internal memory
(binding [d/*mem-internal* (fres/read-object rdr)]
(fres/read-object rdr)))
(pform/thread-local-binding [d/clj-record-holder record-holder
d/mem-facts mem-facts]
;; internal memory contains facts provided by mem-facts
;; thus mem-facts must be bound before the call to read
;; the internal memory
(pform/thread-local-binding [d/mem-internal (fres/read-object rdr)]
(fres/read-object rdr)))
opts))))))

(s/defn create-session-serializer
Expand Down
20 changes: 20 additions & 0 deletions src/main/clojure/clara/rules/platform.cljc
Expand Up @@ -56,3 +56,23 @@
(persistent!)))
:cljs
(def tuned-group-by clojure.core/group-by))

#?(:clj
(defmacro thread-local-binding
"Wraps given body in a try block, where it sets each given ThreadLocal binding
and removes it in finally block."
[bindings & body]
(when-not (vector? bindings)
(throw (ex-info "Binding needs to be a vector."
{:bindings bindings})))
(when-not (even? (count bindings))
(throw (ex-info "Needs an even number of forms in binding vector"
{:bindings bindings})))
(let [binding-pairs (partition 2 bindings)]
`(try
~@(for [[tl v] binding-pairs]
`(.set ~tl ~v))
~@body
(finally
~@(for [[tl] binding-pairs]
`(.remove ~tl)))))))
16 changes: 8 additions & 8 deletions src/test/clojure/clara/test_fressian.clj
Expand Up @@ -2,6 +2,7 @@
(:require [clara.rules.durability :as d]
[clara.rules.durability.fressian :as df]
[clojure.data.fressian :as fres]
[clara.rules.platform :as pform]
[clojure.test :refer :all])
(:import [org.fressian
FressianWriter
Expand All @@ -16,16 +17,15 @@
(with-open [os (java.io.ByteArrayOutputStream.)
^FressianWriter wtr (fres/create-writer os :handlers df/write-handler-lookup)]
;; Write
(binding [d/*node-id->node-cache* (volatile! {})
d/*clj-record-holder* (java.util.IdentityHashMap.)]
(fres/write-object wtr x))

(pform/thread-local-binding [d/node-id->node-cache (volatile! {})
d/clj-record-holder (java.util.IdentityHashMap.)]
(fres/write-object wtr x))
;; Read
(let [data (.toByteArray os)]
(binding [d/*clj-record-holder* (java.util.ArrayList.)]
(with-open [is (java.io.ByteArrayInputStream. data)
^FressianReader rdr (fres/create-reader is :handlers df/read-handler-lookup)]
(fres/read-object rdr))))))
(pform/thread-local-binding [d/clj-record-holder (java.util.ArrayList.)]
(with-open [is (java.io.ByteArrayInputStream. data)
^FressianReader rdr (fres/create-reader is :handlers df/read-handler-lookup)]
(fres/read-object rdr))))))

(defn serde [x]
;; Tests all serialization cases in a way that SerDe's 2 times to show that the serialization to
Expand Down

0 comments on commit 378a7f8

Please sign in to comment.