Skip to content

Commit

Permalink
Get job concurrency option working with the options machine [IMMUTANT…
Browse files Browse the repository at this point in the history
…-568]

This also includes tests for the feature, some docstring cleanup, and
removal of some `[:require :refer :all]`s.
  • Loading branch information
tobias committed Jun 26, 2015
1 parent fefc8d8 commit 08b3cee
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 21 deletions.
4 changes: 2 additions & 2 deletions project.clj
Expand Up @@ -90,8 +90,8 @@
potemkin "0.3.12"
clj-http "1.0.1"

org.projectodd.wunderboss "0.8.1"
;; org.projectodd.wunderboss "1.x.incremental.244"
;; org.projectodd.wunderboss "0.8.1"
org.projectodd.wunderboss "1.x.incremental.247"
;; org.projectodd.wunderboss "0.8.2-SNAPSHOT"

org.immutant :version
Expand Down
32 changes: 20 additions & 12 deletions scheduling/src/immutant/scheduling.clj
Expand Up @@ -15,7 +15,7 @@
(ns immutant.scheduling
"Schedule jobs for execution"
(:require [immutant.scheduling.internal :refer :all]
[immutant.internal.options :refer :all]
[immutant.internal.options :as o]
[immutant.internal.util :as iu]
[immutant.util :as u]
[immutant.scheduling.options :refer [resolve-options defoption]])
Expand Down Expand Up @@ -68,14 +68,19 @@
(schedule #(prn 'fire!) :in [5 :minutes], :every :day)
```
Two additional options may be passed in the spec:
Three additional options may be passed in the spec:
* :id - a unique identifier for the scheduled job
* :singleton - a boolean denoting the job's behavior in a cluster [true]
* :allow-concurrent-exec? - a boolean denoting the job's concurrency behavior in the current process [true]
If called with an :id that has already been scheduled, the prior job
will be replaced. If an id is not provided, a UUID is used instead.
If :allow-concurrent-exec? is true, a job that fires while a
previous invocation is running will also run, assuming a worker
thread is available.
The return value is a map of the options with any missing defaults
filled in, including a generated id if necessary.
Expand All @@ -100,19 +105,22 @@
resolve-options
(merge create-defaults schedule-defaults))
id (:id opts (iu/uuid))
scheduler (scheduler (validate-options opts schedule))]
scheduler (scheduler (o/validate-options opts schedule))]
(when (and (u/in-cluster?)
(:singleton opts)
(not (:id opts)))
(iu/warn "Singleton job scheduled in a cluster without an :id - job won't really be a singleton. See docs for immutant.scheduling/schedule."))
(.schedule scheduler (name id) f
(extract-options opts Scheduling$ScheduleOption))
(doto (o/extract-options opts Scheduling$ScheduleOption)
println))
(-> opts
(update-in [:ids scheduler] conj id)
(assoc :id id))))

(set-valid-options! schedule
(conj (opts->set Scheduling$ScheduleOption Scheduling$CreateOption) :id :ids))
(o/set-valid-options! schedule
(-> (o/opts->set Scheduling$ScheduleOption Scheduling$CreateOption)
(conj :id :ids)
(o/boolify :allow-concurrent-exec)))

(defn stop
"Unschedule a scheduled job.
Expand All @@ -125,7 +133,7 @@
[& options]
(let [options (-> options
iu/kwargs-or-map->map
(validate-options schedule "stop"))
(o/validate-options schedule "stop"))
ids (if-let [ids (:ids options)]
ids
(let [s (scheduler options)]
Expand Down Expand Up @@ -182,11 +190,11 @@
"If true (the default), only one instance of a given job name will
run in a cluster. See [[schedule]].")

(defoption ^{:arglists '([boolean] [m boolean])} allow-concurrent-exec
"If true (the default), the job is allowed to run concurrently.
If false it will be forced to run sequentially on one machine.
To disallow concurrent execution cluster wide make sure to also
set `(singleton true)`. See [[singleton]].")
(defoption ^{:arglists '([boolean] [m boolean])} allow-concurrent-exec?
"If true (the default), the job is allowed to run concurrently
within the current process. If false it will be forced to run
sequentially. To disallow concurrent execution cluster wide make
sure to also set the :singleton option. See [[singleton]].")

(defoption ^{:arglists '([str] [m str])} id
"Takes a String or keyword to use as the unique id for the job. See [[schedule]].")
14 changes: 8 additions & 6 deletions scheduling/src/immutant/scheduling/internal.clj
Expand Up @@ -13,21 +13,23 @@
;; limitations under the License.

(ns ^:no-doc ^:internal immutant.scheduling.internal
(:require [immutant.internal.options :refer :all]
(:require [immutant.internal.options :as o]
[immutant.internal.util :as u])
(:import org.projectodd.wunderboss.WunderBoss
[org.projectodd.wunderboss.scheduling
Scheduling Scheduling$CreateOption Scheduling$ScheduleOption]))

(def ^:internal create-defaults (opts->defaults-map Scheduling$CreateOption))
(def ^:internal schedule-defaults (opts->defaults-map Scheduling$ScheduleOption))
(def ^:internal create-defaults (o/opts->defaults-map Scheduling$CreateOption))
(def ^:internal schedule-defaults
(o/boolify (o/opts->defaults-map Scheduling$ScheduleOption)
:allow-concurrent-exec))

(def scheduler-name
(partial u/hash-based-component-name create-defaults))

(defn ^Scheduling scheduler [opts]
(WunderBoss/findOrCreateComponent Scheduling
(scheduler-name (select-keys opts (valid-options-for scheduler)))
(extract-options opts Scheduling$CreateOption)))
(scheduler-name (select-keys opts (o/valid-options-for scheduler)))
(o/extract-options opts Scheduling$CreateOption)))

(set-valid-options! scheduler (opts->set Scheduling$CreateOption))
(o/set-valid-options! scheduler (o/opts->set Scheduling$CreateOption))
25 changes: 24 additions & 1 deletion scheduling/test/immutant/scheduling_test.clj
Expand Up @@ -16,7 +16,8 @@
(:require [clojure.test :refer :all]
[immutant.scheduling :refer :all]
[immutant.scheduling.internal :refer :all]
[immutant.util :as u]))
[immutant.util :as u])
(:import [java.util.concurrent CountDownLatch TimeUnit]))

(u/set-log-level! (or (System/getenv "LOG_LEVEL") :OFF))

Expand Down Expand Up @@ -111,3 +112,25 @@
other-scheduler (doto (scheduler {:num-threads 1}) .start)]
(is (not= other-scheduler default))
(is (not= (.scheduler other-scheduler) (.scheduler default)))))

(defn run-with-maybe-concurrent-exec [concurrent? sleep]
(let [latch (CountDownLatch. 5)
ts (atom [])
job (schedule (fn []
(swap! ts conj (System/currentTimeMillis))
(Thread/sleep sleep)
(.countDown latch))
:every 10
:allow-concurrent-exec? concurrent?)]
(is (.await latch 10 TimeUnit/SECONDS))
(stop job)
(->> @ts reverse (partition 2 1) (map #(apply - %)))))

(let [sleep-duration 50]
(deftest concurrent-execution-enabled
(doseq [delta (run-with-maybe-concurrent-exec true sleep-duration)]
(is (< delta sleep-duration))))

(deftest concurrent-execution-disabled
(doseq [delta (run-with-maybe-concurrent-exec false sleep-duration)]
(is (>= delta sleep-duration)))))

0 comments on commit 08b3cee

Please sign in to comment.