Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

rename "resources" to "services", implement actionLog as a service

  • Loading branch information...
commit daa011a18f98a11a8fdcdbc2eb2cf0ee298bb8ba 1 parent 9f07313
@stuarthalloway stuarthalloway authored
View
45 examples/hello_world.clj
@@ -48,6 +48,7 @@
:sim/systemURI (str "datomic:mem://" (d/squuid))
:sim/processCount 10}))
+;; codebase for the sim
(defn assoc-codebase-tx [entities]
(let [codebase (gen-codebase)
cid (:db/id codebase)]
@@ -56,8 +57,12 @@
(mapv #(assoc {:db/id (:db/id %)} :source/codebase cid) entities))))
(d/transact sim-conn (assoc-codebase-tx [trading-test trading-sim]))
+;; action log for this sim
+(def action-log
+ (sim/create-action-log sim-conn trading-sim))
+
;; clock for this sim
-(def sim-clock (sim/create-fixed-clock sim-conn trading-sim {:clock/multiplier 480}))
+(def sim-clock (sim/create-fixed-clock sim-conn trading-sim {:clock/multiplier 960}))
;; run the processes for this sim
;; at scale each process would have its own box
@@ -119,8 +124,44 @@
(->> trader-ids
(map (fn [[e]] (:db/id e)))
(map (partial trading/balance traderdb))))
-(apply + trader-balances)
+(assert (= 100000 (apply + trader-balances)))
;; sim written in hopes that balances will not go negative
;; but they might, because system under test does not check!
(filter neg? trader-balances)
+
+;; test nonfunctional requirements, e.g. some metric on trade times
+(def rules
+ '[[[actionTime ?sim ?actionType ?action ?nsec]
+ [?test :test/sims ?sim]
+ [?test :test/agents ?agent]
+ [?agent :agent/actions ?action]
+ [?action :action/type ?actionType]
+ [?log :actionLog/action ?action]
+ [?log :actionLog/sim ?sim]
+ [?log :actionLog/nsec ?nsec]]])
+
+;; count of trade times should match count of trades
+(assert (= (count actions)
+ (count (d/q '[:find ?nsec
+ :with ?action
+ :in $ % ?sim ?action-type
+ :where (actionTime ?sim ?action-type ?action ?nsec)]
+ simdb rules (:db/id trading-sim) :action.type/trade))))
+
+(def mean-trade-time-msec
+ (-> (d/q '[:find (avg ?nsec)
+ :with ?action
+ :in $ % ?sim ?action-type
+ :where (actionTime ?sim ?action-type ?action ?nsec)]
+ simdb rules (:db/id trading-sim) :action.type/trade)
+ ffirst
+ (/ 1000 1000)))
+
+;; This could make much more sophisticated use of statistics.
+;; And, because the work is against a database, not a live
+;; test, increased sophistication could be brought to bear
+;; at any time.
+(assert (< mean-trade-time-msec 2))
+
+
View
4 examples/simulant/examples/trading.clj
@@ -13,11 +13,11 @@
(defn trade
[conn from to amount]
(let [tx (d/tempid :db.part/tx)]
- (d/transact
+ (d/transact-async
conn
[[:db/add tx :transfer/amount amount]
[:db/add tx :transfer/from (e from)]
- [:db/add tx :transfer/to (e to)]])) )
+ [:db/add tx :transfer/to (e to)]])))
(defn balance
[db trader-id]
View
10 examples/simulant/examples/trading_sim.clj
@@ -99,7 +99,13 @@
trade-db (d/db trade-conn)
amount (:transfer/amount action)
from (find-by trade-db :trader/id (-> action :transfer/from :db/id))
- to (find-by trade-db :trader/id (-> action :transfer/to :db/id))]
- (trading/trade trade-conn from to amount)))
+ to (find-by trade-db :trader/id (-> action :transfer/to :db/id))
+ action-log (getx sim/*services* :simulant.sim/actionLog)
+ before (System/nanoTime)]
+ @(trading/trade trade-conn from to amount)
+ (action-log [{:actionLog/nsec (- (System/nanoTime) before)
+ :db/id (d/tempid :db.part/user)
+ :actionLog/sim (e sim)
+ :actionLog/action (e action)}])))
View
57 resources/simulant/schema.edn
@@ -15,7 +15,10 @@
:db/ident :test}
{:db.install/_partition :db.part/db,
:db/id #db/id[:db.part/db],
- :db/ident :sim}]]
+ :db/ident :sim}
+ {:db.install/_partition :db.part/db,
+ :db/id #db/id[:db.part/db],
+ :db/ident :log}]]
:clock
[[{:db/id #db/id[:db.part/db]
@@ -107,6 +110,28 @@
:db/cardinality :db.cardinality/many
:db.install/_attribute :db.part/db}]]
+ :actionLog
+ [[{:db/id #db/id[:db.part/db]
+ :db/ident :actionLog/action
+ :db/valueType :db.type/ref
+ :db/doc "Action this log entry is associated with"
+ :db/cardinality :db.cardinality/one
+ :db.install/_attribute :db.part/db}
+ {:db/id #db/id[:db.part/db]
+ :db/ident :actionLog/sim
+ :db/valueType :db.type/ref
+ :db/doc "Sim this log entry is associated with"
+ :db/cardinality :db.cardinality/one
+ :db.install/_attribute :db.part/db}
+ {:db/id #db/id[:db.part/db]
+ :db/ident :actionLog/nsec
+ :db/valueType :db.type/long
+ :db/doc "How long this action took, in nsec"
+ :db/cardinality :db.cardinality/one
+ :db.install/_attribute :db.part/db}
+ {:db/id #db/id [:db.part/db]
+ :db/ident :simulant.sim/actionLog}]]
+
:action
[[{:db/id #db/id[:db.part/db]
:db/ident :action/atTime
@@ -144,11 +169,6 @@
:db/cardinality :db.cardinality/one
:db.install/_attribute :db.part/db}
{:db/id #db/id[:db.part/db]
- :db/ident :process/resource-manager
- :db/valueType :db.type/ref
- :db/cardinality :db.cardinality/one
- :db.install/_attribute :db.part/db}
- {:db/id #db/id[:db.part/db]
:db/ident :process/state
:db/valueType :db.type/ref
:db/cardinality :db.cardinality/one
@@ -166,11 +186,36 @@
:db/unique :db.unique/value
:db/cardinality :db.cardinality/one
:db.install/_attribute :db.part/db}]]
+
+ :services
+ [[{:db/id #db/id[:db.part/db]
+ :db/ident :service/type
+ :db/valueType :db.type/ref
+ :db/index true
+ :db/cardinality :db.cardinality/one
+ :db.install/_attribute :db.part/db}
+ {:db/id #db/id[:db.part/db]
+ :db/ident :service/key
+ :db/valueType :db.type/ref
+ :db/index true
+ :db/cardinality :db.cardinality/one
+ :db.install/_attribute :db.part/db}
+ {:db/id #db/id[:db.part/db]
+ :db/doc "Service that accumulates events during a sim run,
+and then transacts them into the sim database at the
+end of the run."
+ :db/ident :service.type/actionLog}]]
:sim
[[{:db/id #db/id[:db.part/db]
:db/ident :sim.type/basic}
{:db/id #db/id[:db.part/db]
+ :db/ident :sim/services
+ :db/valueType :db.type/ref
+ :db/cardinality :db.cardinality/many
+ :db/doc "Services used by this sim"
+ :db.install/_attribute :db.part/db}
+ {:db/id #db/id[:db.part/db]
:db/ident :sim/type
:db/valueType :db.type/ref
:db/cardinality :db.cardinality/one
View
129 src/simulant/sim.clj
@@ -20,7 +20,9 @@
(:use simulant.util)
(:require [clojure.java.io :as io]
[datomic.api :as d])
- (:import [java.util.concurrent Executors]))
+ (:import
+ [java.io Closeable File PushbackReader]
+ [java.util.concurrent Executors]))
(set! *warn-on-reflection* true)
@@ -42,31 +44,96 @@
"Perform an action."
(fn [action process] (getx action :action/type)))
-(defmulti lifecycle
- "Return Lifecycle protocol implementation associated with
- entity. Defaults to no-op."
- (fn [entity] (get entity :lifecycle/type)))
-
-;; ## Resource Lifecycle
+;; ## Services
;;
-;; Ignore this, likely to be removed
+;; Services represent resources needed by a sim run.
-(def resources-ref (atom nil))
+(def ^:dynamic *services*
+ "During a sim run, the *services* map contains a mapping
+from fully qualified names to any services that the sim
+need to use.")
-(defn resources
- "Return the resources associated with this process"
+(defn services
+ "Return the service entities associated with this process"
[process]
- (get @resources-ref (getx process :db/id)))
-
-(defprotocol Lifecycle
- (setup [_ conn entity] "Returns resources map which will be available to teardown.")
- (teardown [_ conn entity resources]))
-
-(defmethod lifecycle :default
- [entity]
- (reify Lifecycle
- (setup [this conn entity])
- (teardown [this conn entity resources])))
+ (reduce into #{}
+ [(-> process :sim/services)
+ (-> process :sim/_processes only :sim/services)]))
+
+(defmulti start-service
+ "Start service, returning an object that will
+be stored in the *services* map, under the
+key specified by :service/key."
+ (fn [conn process service] (getx service :service/type)))
+
+(defmulti finalize-service
+ "Teardown service at the end of a sim run."
+ (fn [conn process service services-map] (getx service :service/type)))
+
+(defn- start-services
+ [conn process]
+ (reduce
+ (fn [m svc]
+ (assoc m (getx svc :service/key) (start-service conn process svc)))
+ {}
+ (services process)))
+
+(defn- finalize-services
+ "Call teardown for each service associate with
+process."
+ [conn process services-map]
+ (doseq [service (services process)]
+ (finalize-service conn process service services-map)))
+
+(defn with-services
+ "Run f inside the service lifecycle for process."
+ [conn process f]
+ (let [services-map (start-services conn process)]
+ (binding [*services* services-map]
+ (try
+ (f)
+ (finally
+ (let [process-after (d/entity (d/db conn) (e process))]
+ (finalize-services conn process-after services-map)))))))
+
+;; ## Action Log
+;;
+;; An Action Log keeps a list of transaction data describing actions
+;; in a temporary file during a sim run, and then transacts that
+;; data into the sim database at the completion of the run.
+;;
+;; ActionLogs implement IFn, and expect to be passed transaction data
+(defrecord ActionLog
+ [^File temp-file writer]
+ clojure.lang.IFn
+ (invoke
+ [_ tx-data]
+ (binding [*out* writer]
+ (pr tx-data))))
+
+(defmethod start-service :service.type/actionLog
+ [conn process service]
+ (let [f (File/createTempFile "actionLog" "edn")
+ writer (io/writer f)]
+ (ActionLog. f writer)))
+
+(defmethod finalize-service :service.type/actionLog
+ [conn process service services-map]
+ (let [{:keys [temp-file writer]} (getx services-map (:service/key service))]
+ (.close ^Closeable writer)
+ (with-open [reader (io/reader temp-file)
+ pbr (PushbackReader. reader)]
+ (transact-batch conn (form-seq pbr)))))
+
+(defn create-action-log
+ "Create an action log service for the sim."
+ [conn sim]
+ (let [id (d/tempid :sim)]
+ (-> @(d/transact conn [{:db/id id
+ :sim/_services (e sim)
+ :service/type :service.type/actionLog
+ :service/key :simulant.sim/actionLog}])
+ (tx-ent id))))
;; ## Helper Functions
@@ -251,24 +318,20 @@
and returns a future you can use to wait for the sim to complete."
[sim-conn process]
(logged-future
- (let [lifecycle (-> process :process/resource-manager lifecycle)
- resources (setup lifecycle sim-conn process)
- agents (process-agents process)
+ (let [agents (process-agents process)
actions (action-seq (d/db sim-conn) agents)]
- (swap! resources-ref assoc (:db/id process) resources)
(try
- (feed-all-actions process actions)
+ (with-services sim-conn process
+ #(do
+ (feed-all-actions process actions)
+ (await-all agents)
+ (d/transact sim-conn [[:db/add (:db/id process) :process/state :process.state/completed]])))
(catch Throwable t
(.printStackTrace t)
(d/transact sim-conn [{:db/id (:db/id process)
:process/state :process.state/failed
:process/errorDescription (stack-trace-string t)}])
- (throw t))
- (finally
- (await-all agents)
- (swap! resources-ref dissoc (:db/id process))))
- (teardown lifecycle sim-conn process resources)
- (d/transact sim-conn [[:db/add (:db/id process) :process/state :process.state/completed]]))))
+ (throw t))))))
;; ## API Entry Points
View
7 src/simulant/util.clj
@@ -155,6 +155,13 @@
(when (pos? (count str))
(read-string str))))
+(defn form-seq
+ "Lazy seq of forms read from a reader"
+ [reader]
+ (let [form (read reader false reader)]
+ (when-not (= form reader)
+ (cons form (lazy-seq (form-seq reader))))))
+
(defmacro logged-future
"Future with logging of failure."
[& body]
Please sign in to comment.
Something went wrong with that request. Please try again.