diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 60f870c9..e5dd5d41 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -4,9 +4,11 @@ Cerner Corporation - Mike Rodriguez [@mrrodriguez] - William Parker [@WilliamParker] - Ethan Christian [@EthanEChristian] +- Pushkar Kulkarni [@kulkarnipushkar] [@rbrush]: https://github.com/rbrush [@mrrodriguez]: https://github.com/mrrodriguez [@WilliamParker]: https://github.com/WilliamParker [@EthanEChristian]: https://github.com/EthanEChristian +[@kulkarnipushkar]: https://github.com/kulkarnipushkar diff --git a/src/main/clojure/clara/rules/durability.clj b/src/main/clojure/clara/rules/durability.clj index 7117028d..84bfc704 100644 --- a/src/main/clojure/clara/rules/durability.clj +++ b/src/main/clojure/clara/rules/durability.clj @@ -27,22 +27,22 @@ ;;;; Rulebase serialization helpers. ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(def ^:dynamic *node-id->node-cache* +(def ^:internal ^ThreadLocal node-id->node-cache "Useful for caching rulebase network nodes by id during serialization and deserialization to avoid creating multiple object instances for the same node." - nil) + (ThreadLocal.)) -(def ^:dynamic *compile-expr-fn* +(def ^:internal ^ThreadLocal compile-expr-fn "Similar to what is done in clara.rules.compiler, this is a function used to compile expressions used in nodes of the rulebase network. A common function would cache evaluated expressions by node-id and expression form to avoid duplicate evalutation of the same expressions." - nil) + (ThreadLocal.)) (defn- add-node-fn [node fn-key meta-key] (assoc node fn-key - (*compile-expr-fn* (:id node) (meta-key (meta node))))) + ((.get compile-expr-fn) (:id node) (meta-key (meta node))))) (defn add-rhs-fn [node] ;; The RHS expression may need to be compiled within the namespace scope of specifically declared @@ -69,56 +69,66 @@ (defn add-accumulator [node] (assoc node - :accumulator ((*compile-expr-fn* (:id node) + :accumulator (((.get compile-expr-fn) (:id node) (:accum-expr (meta node))) (:env node)))) (defn node-id->node - "Lookup the node for the given node-id in the *node-id->node-cache* cache." + "Lookup the node for the given node-id in the node-id->node-cache cache." [node-id] - (@*node-id->node-cache* node-id)) + (@(.get node-id->node-cache) node-id)) (defn cache-node - "Cache the node in the *node-id->node-cache*. Returns the node." + "Cache the node in the node-id->node-cache. Returns the node." [node] (when-let [node-id (:id node)] - (vswap! *node-id->node-cache* assoc node-id node)) + (vswap! (.get node-id->node-cache) assoc node-id node)) node) -(def ^:dynamic *clj-record-holder* +(def ^:internal ^ThreadLocal clj-record-holder "A cache for writing and reading Clojure records. At write time, an IdentityHashMap can be used to keep track of repeated references to the same record object instance occurring in the serialization stream. At read time, a plain ArrayList (mutable and indexed for speed) can be used to add records to when they are first seen, then look up repeated occurrences of references to the same record instance later." - nil) + (ThreadLocal.)) (defn clj-record-fact->idx - "Gets the numeric index for the given fact from the *clj-record-holder*." + "Gets the numeric index for the given fact from the clj-record-holder." [fact] - (.get ^Map *clj-record-holder* fact)) + (-> clj-record-holder + ^Map (.get) + (.get fact))) (defn clj-record-holder-add-fact-idx! - "Adds the fact to the *clj-record-holder* with a new index. This can later be retrieved + "Adds the fact to the clj-record-holder with a new index. This can later be retrieved with clj-record-fact->idx." [fact] ;; Note the values will be int type here. This shouldn't be a problem since they ;; will be read later as longs and both will be compatible with the index lookup ;; at read-time. This could have a cast to long here, but it would waste time ;; unnecessarily. - (.put ^Map *clj-record-holder* fact (.size ^Map *clj-record-holder*))) + (-> clj-record-holder + ^Map (.get) + (.put fact (-> clj-record-holder + ^Map (.get) + (.size))))) (defn clj-record-idx->fact "The reverse of clj-record-fact->idx. Returns a fact for the given index found - in *clj-record-holder*." + in clj-record-holder." [id] - (.get ^List *clj-record-holder* id)) + (-> clj-record-holder + ^List (.get) + (.get id))) (defn clj-record-holder-add-fact! - "The reverse of clj-record-holder-add-fact-idx!. Adds the fact to the *clj-record-holder* + "The reverse of clj-record-holder-add-fact-idx!. Adds the fact to the clj-record-holder at the next available index." [fact] - (.add ^List *clj-record-holder* fact) + (-> clj-record-holder + ^List (.get) + (.add fact)) fact) (defn create-map-entry @@ -349,29 +359,33 @@ ;;;; Commonly useful session serialization helpers. ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(def ^:dynamic ^List *mem-facts* +(def ^:internal ^ThreadLocal ^List mem-facts "Useful for ISessionSerializer implementors to have a reference to the facts deserialized via IWorkingMemorySerializer that are needed to restore working memory whose locations were stubbed with a MemIdx during serialization." - nil) + (ThreadLocal.)) -(def ^:dynamic ^List *mem-internal* +(def ^:internal ^ThreadLocal ^List mem-internal "Useful for ISessionSerializer implementors to have a reference to the facts deserialized via IWorkingMemorySerializer that are needed to restore working memory whose locations were stubbed with a InternalMemIdx during serialization. These objects are specific to the Clare engine, and as such will be serialized and deserialized along with the memory." - nil) + (ThreadLocal.)) (defn find-mem-idx - "Finds the fact from *mem-facts* at the given index. See docs on *mem-facts* for more." + "Finds the fact from mem-facts at the given index. See docs on mem-facts for more." [idx] - (.get *mem-facts* idx)) + (-> mem-facts + (.get) + (get idx))) (defn find-internal-idx - "Finds the fact from *mem-internal* at the given index. See docs on *mem-internal* for more." + "Finds the fact from mem-internal at the given index. See docs on mem-internal for more." [idx] - (.get *mem-internal* idx)) - + (-> mem-internal + (.get) + (get idx))) + (defn indexed-session-memory-state "Takes the working memory from a session and strips it down to only the memory needed for serialization. Along with this, replaces all working memory facts with MemIdx place holders. diff --git a/src/main/clojure/clara/rules/durability/fressian.clj b/src/main/clojure/clara/rules/durability/fressian.clj index da38a81d..479ac1a4 100644 --- a/src/main/clojure/clara/rules/durability/fressian.clj +++ b/src/main/clojure/clara/rules/durability/fressian.clj @@ -7,6 +7,7 @@ [clara.rules.memory :as mem] [clara.rules.engine :as eng] [clara.rules.compiler :as com] + [clara.rules.platform :as pform] [schema.core :as s] [clojure.data.fressian :as fres] [clojure.java.io :as jio] @@ -145,7 +146,7 @@ :writer (reify WriteHandler (write [_ w o] (let [node-id (:id o)] - (if (@d/*node-id->node-cache* node-id) + (if (@(.get d/node-id->node-cache) node-id) (do (.writeTag w tag-for-cached 1) (.writeInt w node-id)) @@ -171,7 +172,7 @@ :writer (reify WriteHandler (write [_ w o] (let [node-id (:id o)] - (if (@d/*node-id->node-cache* node-id) + (if (@(.get d/node-id->node-cache) node-id) (do (.writeTag w tag-for-cached 1) (.writeInt w node-id)) @@ -350,7 +351,7 @@ "clj/record" {:class clojure.lang.IRecord ;; Write a record a single time per object reference to that record. The record is then "cached" - ;; with the IdentityHashMap `d/*clj-record-holder*`. If another reference to this record instance + ;; with the IdentityHashMap `d/clj-record-holder`. If another reference to this record instance ;; is encountered later, only the "index" of the record in the map will be written. :writer (reify WriteHandler (write [_ w rec] @@ -543,54 +544,53 @@ (fn [sources] (with-open [^FressianWriter wtr (fres/create-writer out-stream :handlers write-handler-lookup)] - (binding [d/*node-id->node-cache* (volatile! {}) - d/*clj-record-holder* record-holder] - (doseq [s sources] (fres/write-object wtr s)))))] - + (pform/thread-local-binding [d/node-id->node-cache (volatile! {}) + d/clj-record-holder record-holder] + (doseq [s sources] (fres/write-object wtr s)))))] + ;; In this case there is nothing to do with memory, so just serialize immediately. (if (:rulebase-only? opts) (do-serialize [rulebase]) - + ;; Otherwise memory needs to have facts extracted to return. (let [{:keys [memory indexed-facts internal-indexed-facts]} (d/indexed-session-memory-state memory) sources (if (:with-rulebase? opts) [rulebase internal-indexed-facts memory] [internal-indexed-facts memory])] - + (do-serialize sources) ;; Return the facts needing to be serialized still. indexed-facts)))) - + (deserialize [_ mem-facts opts] - + (with-open [^FressianReader rdr (fres/create-reader in-stream :handlers read-handler-lookup)] (let [{:keys [rulebase-only? base-rulebase]} opts - + record-holder (ArrayList.) ;; The rulebase should either be given from the base-session or found in ;; the restored session-state. maybe-base-rulebase (when (and (not rulebase-only?) base-rulebase) base-rulebase) - rulebase (if maybe-base-rulebase maybe-base-rulebase - (let [without-opts-rulebase (binding [d/*node-id->node-cache* (volatile! {}) - d/*clj-record-holder* record-holder - d/*compile-expr-fn* (memoize (fn [id expr] (com/try-eval expr)))] - (fres/read-object rdr))] + (let [without-opts-rulebase (pform/thread-local-binding [d/node-id->node-cache (volatile! {}) + d/compile-expr-fn (memoize (fn [id expr] (com/try-eval expr))) + d/clj-record-holder record-holder] + (fres/read-object rdr))] (d/rulebase->rulebase-with-opts without-opts-rulebase opts)))] - + (if rulebase-only? rulebase (d/assemble-restored-session rulebase - (binding [d/*clj-record-holder* record-holder - d/*mem-facts* mem-facts] - ;; internal memory contains facts provided by mem-facts - ;; thus mem-facts must be bound before the call to read - ;; the internal memory - (binding [d/*mem-internal* (fres/read-object rdr)] - (fres/read-object rdr))) + (pform/thread-local-binding [d/clj-record-holder record-holder + d/mem-facts mem-facts] + ;; internal memory contains facts provided by mem-facts + ;; thus mem-facts must be bound before the call to read + ;; the internal memory + (pform/thread-local-binding [d/mem-internal (fres/read-object rdr)] + (fres/read-object rdr))) opts)))))) (s/defn create-session-serializer diff --git a/src/main/clojure/clara/rules/platform.cljc b/src/main/clojure/clara/rules/platform.cljc index 2e8e2bb3..3aa920bd 100644 --- a/src/main/clojure/clara/rules/platform.cljc +++ b/src/main/clojure/clara/rules/platform.cljc @@ -56,3 +56,23 @@ (persistent!))) :cljs (def tuned-group-by clojure.core/group-by)) + +#?(:clj + (defmacro thread-local-binding + "Wraps given body in a try block, where it sets each given ThreadLocal binding + and removes it in finally block." + [bindings & body] + (when-not (vector? bindings) + (throw (ex-info "Binding needs to be a vector." + {:bindings bindings}))) + (when-not (even? (count bindings)) + (throw (ex-info "Needs an even number of forms in binding vector" + {:bindings bindings}))) + (let [binding-pairs (partition 2 bindings)] + `(try + ~@(for [[tl v] binding-pairs] + `(.set ~tl ~v)) + ~@body + (finally + ~@(for [[tl] binding-pairs] + `(.remove ~tl))))))) \ No newline at end of file diff --git a/src/test/clojure/clara/test_fressian.clj b/src/test/clojure/clara/test_fressian.clj index 8020b6a4..fd684117 100644 --- a/src/test/clojure/clara/test_fressian.clj +++ b/src/test/clojure/clara/test_fressian.clj @@ -2,6 +2,7 @@ (:require [clara.rules.durability :as d] [clara.rules.durability.fressian :as df] [clojure.data.fressian :as fres] + [clara.rules.platform :as pform] [clojure.test :refer :all]) (:import [org.fressian FressianWriter @@ -16,16 +17,15 @@ (with-open [os (java.io.ByteArrayOutputStream.) ^FressianWriter wtr (fres/create-writer os :handlers df/write-handler-lookup)] ;; Write - (binding [d/*node-id->node-cache* (volatile! {}) - d/*clj-record-holder* (java.util.IdentityHashMap.)] - (fres/write-object wtr x)) - + (pform/thread-local-binding [d/node-id->node-cache (volatile! {}) + d/clj-record-holder (java.util.IdentityHashMap.)] + (fres/write-object wtr x)) ;; Read (let [data (.toByteArray os)] - (binding [d/*clj-record-holder* (java.util.ArrayList.)] - (with-open [is (java.io.ByteArrayInputStream. data) - ^FressianReader rdr (fres/create-reader is :handlers df/read-handler-lookup)] - (fres/read-object rdr)))))) + (pform/thread-local-binding [d/clj-record-holder (java.util.ArrayList.)] + (with-open [is (java.io.ByteArrayInputStream. data) + ^FressianReader rdr (fres/create-reader is :handlers df/read-handler-lookup)] + (fres/read-object rdr)))))) (defn serde [x] ;; Tests all serialization cases in a way that SerDe's 2 times to show that the serialization to