Skip to content

Commit

Permalink
rems.poller.email: review comments
Browse files Browse the repository at this point in the history
- remove redundant when-not
- use :last-processed-event-id in poller state
- event ids start from 1
- add comment about thread safety
  • Loading branch information
opqdonut committed Mar 12, 2019
1 parent 38fc2e8 commit 87d11df
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions src/clj/rems/poller/email.clj
Expand Up @@ -133,27 +133,27 @@
;; these can be moved to rems.poller once we have multiple pollers
(defn get-poller-state [name-kw]
(or (json/parse-string (:state (db/get-poller-state {:name (name name-kw)})))
{:event/id -1}))
{:last-processed-event-id 0}))

(defn set-poller-state! [name-kw state]
(db/set-poller-state! {:name (name name-kw) :state (json/generate-string state)})
nil)

(defn run-event-poller [name-kw process-event!]
;; This isn't thread-safe but ScheduledThreadPoolExecutor guarantees exclusion
(let [prev-state (get-poller-state name-kw)
events (applications/get-dynamic-application-events-since (:event/id prev-state))]
events (applications/get-dynamic-application-events-since (:last-processed-event-id prev-state))]
(log/info name-kw "running with state" (pr-str prev-state))
(when-not (empty? events)
(try
(doseq [e events]
(try
(log/info name-kw "processing event" (:event/id e))
(process-event! e)
(set-poller-state! name-kw {:event/id (:event/id e)})
(catch Throwable t
(throw (Exception. (str name-kw " processing event " (pr-str e)) t)))))
(catch Throwable t
(log/error t))))
(try
(doseq [e events]
(try
(log/info name-kw "processing event" (:event/id e))
(process-event! e)
(set-poller-state! name-kw {:last-processed-event-id (:event/id e)})
(catch Throwable t
(throw (Exception. (str name-kw " processing event " (pr-str e)) t)))))
(catch Throwable t
(log/error t)))
(log/info name-kw "finished")))

(deftest test-run-event-poller-error-handling
Expand All @@ -165,14 +165,14 @@
(when (contains? @ids-to-fail (:event/id event))
(throw (Error. "BOOM")))
(swap! processed conj event))
poller-state (atom {:event/id -1})
poller-state (atom {:event/id 0})
run #(run-event-poller :test process-event!)]
(with-redefs [applications/get-dynamic-application-events-since (fn [id] (filterv #(< id (:event/id %)) @events))
get-poller-state (fn [_] @poller-state)
set-poller-state! (fn [_ state] (reset! poller-state state))]
(testing "no events, nothing should happen"
(run)
(is (= {:event/id -1} @poller-state))
(is (= {:event/id 0} @poller-state))
(is (= [] @processed)))
(testing "add a few events, process them"
(add-event! {:event/id 1})
Expand Down

0 comments on commit 87d11df

Please sign in to comment.