Skip to content

Commit

Permalink
queue-manager -> queues, proper readme
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed Sep 25, 2013
1 parent bea4194 commit 7ed7404
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 22 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ pom.xml.asc
*.class
/.lein-*
/.nrepl-port
.DS_Store
.DS_Store
/doc
66 changes: 58 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,64 @@
# durable-queue
![](docs/EasterIsland.jpg)

A Clojure library designed to ... well, that part is up to you.
This library implements a disk-backed task queue, allowing for queues that can survive processes dying, and whose size are bounded by available disk rather than memory.

## Usage
### usage

FIXME
To interact with queues, first create a `queues` object by specifying a directory in the filesystem and an options map:

## License
```clj
> (require '[durable-queue :refer :all])
nil
> (def q (queues "/tmp" {}))
#'q
```

Copyright © 2013 FIXME
This manager allows us to `put!` and `take!` tasks from named queues. `take!` is a blocking read, and will only return once a task is available or, if a timeout is defined (in milliseconds), once the timeout elapses:

Distributed under the Eclipse Public License either version 1.0 or (at
your option) any later version.
```clj
> (take! q :foo 10 :timed-out!)
:timed-out!
> (put! q :foo "a task")
true
> (take! q :foo)
< :in-progress | "a task" >
> (deref *1)
"a task"
```

Notice that the task has a value describing its progress, and a value describing the task itself. We can get the task descriptor by dereferencing the returned task. However, just because we've taken the task doesn't mean we've completed the action associated with it. In order to make sure the task isn't retried on restart, we must mark it as `complete!`.

```clj
> (put! q :foo "another task")
true
> (take! q :foo)
< :in-progress | "another task" >
> (complete! *1)
true
```

If our task fails and we want to re-enqueue it to be tried again, we can instead call `(retry! task)`.

### configuring the queues

The queue-manager can be given a number of different options, which can affect its performance and correctness.

By default, the queue-manager assumes all tasks are idempotent. This is necessary, since the process can die at any time and leave an in-progress task in an undefined state. If your tasks are not idempotent, a `:complete?` predicate can be defined which, on instantiation of the queue-manager, will scan through all pre-existing task descriptors and remove those for which the predicate returns true.

A complete list of options is as follows:

| name | description |
|------|-------------|
| `:complete?` | a predicate for identifying already completed tasks, defaults to always returning false |
| `:max-queue-size` | the maximum number of elements that can be in the queue before `put!` blocks |
| `:slab-size` | The size, in bytes, of the backing files for the queue. Defaults to 16mb. |
| `:fsync-put?` | Whether an fsync should be performed for each `put!`. Defaults to true. |
| `:fsync-take?` | Whether an fsync should be performed for each `take!`. Defaults to false. |

Disabling `:fsync-put?` will risk losing tasks if a process dies (though, depending on the hardware and underlying implementation of fsync, this may be possible regardless). Disabling `:fsync-take?` increases the chance of a task being re-run when a proces dies. Disabling both will increase throughput of the queue by an order of magnitude (~6k tasks/sec in the default configuration, ~100k tasks/sec with fsync completely disabled). Tradeoffs between these two can be made by batching tasks.

### license

Copyright © 2013 Factual Inc

Distributed under the Eclipse Public License 1.0
Binary file added docs/EasterIsland.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 4 additions & 4 deletions src/durable_queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@

;;;

(defprotocol IQueueManager
(defprotocol IQueues
(take!
[_ q-name]
[_ q-name timeout timeout-val]
Expand All @@ -226,7 +226,7 @@
"A blocking enqueue to `name`. If `timeout` is specified, returns `false` if unable to
enqueue within `timeout` milliseconds."))

(defn queue-manager
(defn queues
"Creates a point of interaction for queues, backed by disk storage in `directory`.
The following options can be specified:
Expand All @@ -244,7 +244,7 @@
fsync-take? - if true, each `take!` will force an fsync. Defaults to false."
([directory]
(queue-manager directory nil))
(queues directory nil))
([directory
{:keys [max-queue-size
complete?
Expand Down Expand Up @@ -301,7 +301,7 @@
"'max-queue-size' insufficient to hold existing tasks."))))))
(sync-slab slab))))

(reify IQueueManager
(reify IQueues

(take! [this q-name timeout timeout-val]
(let [q-name (munge (name q-name))
Expand Down
18 changes: 9 additions & 9 deletions test/durable_queue_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@

(deftest test-basic-put-take
(clear-tmp-directory)
(let [q (queue-manager "/tmp" {:slab-size 1024})
(let [q (queues "/tmp" {:slab-size 1024})
tasks (range 1e4)]
(doseq [t tasks]
(put! q :foo t))
(is (= tasks (map deref (immediate-task-seq q :foo))))))

(deftest test-retry
(clear-tmp-directory)
(let [q (queue-manager "/tmp")]
(let [q (queues "/tmp")]

(doseq [t (range 10)]
(put! q :foo t))
Expand All @@ -34,19 +34,19 @@
(put! q :foo t))))

;; create a new manager, which will mark all in-progress tasks as incomplete
(let [q (queue-manager "/tmp")
(let [q (queues "/tmp")
tasks' (immediate-task-seq q :foo)]
(is (= (range 5 15) (map deref tasks')))
(doseq [t (take 5 tasks')]
(complete! t)))

(let [q (queue-manager "/tmp")
(let [q (queues "/tmp")
tasks' (immediate-task-seq q :foo)]
(is (= (range 10 15) (map deref tasks')))
(doseq [t (range 15 20)]
(put! q :foo t)))

(let [q (queue-manager "/tmp" {:complete? even?})]
(let [q (queues "/tmp" {:complete? even?})]
(is (= (remove even? (range 10 20)) (map deref (immediate-task-seq q :foo))))))

;;;
Expand All @@ -55,28 +55,28 @@
(clear-tmp-directory)

(println "\n\n-- sync both")
(let [q (queue-manager "/tmp" {:fsync-put? true, :fsync-take? true})]
(let [q (queues "/tmp" {:fsync-put? true, :fsync-take? true})]
(c/quick-bench
(do
(put! q :foo 1)
(complete! (take! q :foo)))))

(println "\n\n-- sync take")
(let [q (queue-manager "/tmp" {:fsync-put? false, :fsync-take? true})]
(let [q (queues "/tmp" {:fsync-put? false, :fsync-take? true})]
(c/quick-bench
(do
(put! q :foo 1)
(complete! (take! q :foo)))))

(println "\n\n-- sync put")
(let [q (queue-manager "/tmp" {:fsync-put? true, :fsync-take? false})]
(let [q (queues "/tmp" {:fsync-put? true, :fsync-take? false})]
(c/quick-bench
(do
(put! q :foo 1)
(complete! (take! q :foo)))))

(println "\n\n-- sync neither")
(let [q (queue-manager "/tmp" {:fsync-put? false, :fsync-take? false})]
(let [q (queues "/tmp" {:fsync-put? false, :fsync-take? false})]
(c/quick-bench
(do
(put! q :foo 1)
Expand Down

0 comments on commit 7ed7404

Please sign in to comment.