Skip to content

Commit

Permalink
Support added for :fetch-size <num> option to execute, execute-async
Browse files Browse the repository at this point in the history
and execute-chan.

`setFetchSize` is invoked on underlying com.datastax.jdbc.core.Statement
object, restricting the most records returned at a time from Cassandra.
  • Loading branch information
coltnz committed Jan 21, 2014
1 parent 7542dbe commit ff67348
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 12 deletions.
30 changes: 20 additions & 10 deletions src/qbits/alia.clj
Expand Up @@ -75,6 +75,11 @@
hayt queries are executed , defaults to LU with a threshold of 100"
(utils/var-root-setter *hayt-query-fn*))

(def ^:dynamic *fetch-size* nil)

(def set-fetch-size!
(utils/var-root-setter *fetch-size*))

(defn cluster
"Returns a new com.datastax.driver.core/Cluster instance"
[hosts & {:as options}]
Expand Down Expand Up @@ -162,14 +167,16 @@ ex: (prepare (select :foo (where {:bar ?})))"


(defn ^:private set-statement-options!
[^Statement statement routing-key retry-policy tracing? consistency]
[^Statement statement routing-key retry-policy tracing? consistency fetch-size]
(when routing-key
(.setRoutingKey ^SimpleStatement statement
^ByteBuffer routing-key))
(when retry-policy
(.setRetryPolicy statement retry-policy))
(when tracing?
(.enableTracing statement))
(when fetch-size
(.setFetchSize statement fetch-size))

(.setConsistencyLevel statement (enum/consistency-levels consistency)))

Expand Down Expand Up @@ -207,12 +214,13 @@ The query can be a raw string, a PreparedStatement (returned by
"
[& args]
(let [[^Session session query & {:keys [consistency routing-key retry-policy
tracing? keywordize? values]
tracing? keywordize? fetch-size values]
:or {keywordize? *keywordize*
consistency *consistency*}}]
consistency *consistency*
fetch-size *fetch-size*}}]
(fix-session-arg args)
^Statement statement (query->statement query values)]
(set-statement-options! statement routing-key retry-policy tracing? consistency)
(set-statement-options! statement routing-key retry-policy tracing? consistency fetch-size)
(try
(codec/result-set->maps (.execute session statement) keywordize?)
(catch Exception err
Expand All @@ -225,13 +233,14 @@ The query can be a raw string, a PreparedStatement (returned by
[& args]
(let [[^Session session query & {:keys [success error executor consistency
routing-key retry-policy tracing?
keywordize? values]
keywordize? fetch-size values]
:or {executor *executor*
keywordize? *keywordize*
consistency *consistency*}}]
consistency *consistency*
fetch-size *fetch-size*}}]
(fix-session-arg args)
^Statement statement (query->statement query values)]
(set-statement-options! statement routing-key retry-policy tracing? consistency)
(set-statement-options! statement routing-key retry-policy tracing? consistency fetch-size)
(let [^ResultSetFuture rs-future
(try
(.executeAsync session statement)
Expand Down Expand Up @@ -261,13 +270,14 @@ The query can be a raw string, a PreparedStatement (returned by
[& args]
(let [[^Session session query & {:keys [executor consistency
routing-key retry-policy tracing?
keywordize? values]
keywordize? fetch-size values]
:or {executor *executor*
keywordize? *keywordize*
consistency *consistency*}}]
consistency *consistency*
fetch-size *fetch-size*}}]
(fix-session-arg args)
^Statement statement (query->statement query values)]
(set-statement-options! statement routing-key retry-policy tracing? consistency)
(set-statement-options! statement routing-key retry-policy tracing? consistency fetch-size)
(let [^ResultSetFuture rs-future (.executeAsync session statement)
ch (async/chan 1)]
(Futures/addCallback
Expand Down
39 changes: 37 additions & 2 deletions test/qbits/alia/test/core.clj
@@ -1,13 +1,16 @@
(ns qbits.alia.test.core
(:use clojure.test
(:import (com.datastax.driver.core Statement))
(:use clojure.test
clojure.data
qbits.alia
qbits.alia.codec
qbits.alia.codec.joda-time
qbits.alia.codec.eaio-uuid
qbits.tardis
qbits.hayt
qbits.alia.test.embedded)
(:require [ clojure.core.async :as async]))
(:require [ clojure.core.async :as async]
[ lamina.core :as lamina]))

(def ^:dynamic *cluster*)

Expand Down Expand Up @@ -205,4 +208,36 @@
(merge q (where {:si (-> coll last :si inc)}))))
:keywordize? true))))))

(defn ^:private get-private-field [instance field-name]
(.get
(doto (.getDeclaredField (class instance) field-name)
(.setAccessible true))
instance))

(deftest test-fetch-size
(with-redefs [result-set->maps (fn [result-set keywordize?]
result-set)]
(let [query "select * from items;"
result-set (execute query :fetch-size 3)
^Statement statement (get-private-field result-set "statement")]
(is (= 3 (.getFetchSize statement))))))

(deftest test-fetch-size-async
(with-redefs [result-set->maps (fn [result-set keywordize?]
result-set)]
(let [query "select * from items;"
result-channel (execute-async query :fetch-size 4)
result-set (lamina/wait-for-result result-channel)
^Statement statement (get-private-field result-set "statement")]
(is (= 4 (.getFetchSize statement))))))

(deftest test-fetch-size-chan
(with-redefs [result-set->maps (fn [result-set keywordize?]
result-set)]
(let [query "select * from items;"
result-channel (execute-chan query :fetch-size 5)
result-set (async/<!! result-channel)
^Statement statement (get-private-field result-set "statement")]
(is (= 5 (.getFetchSize statement))))))

;; (run-tests)

0 comments on commit ff67348

Please sign in to comment.