From ff67348922b0020a93e5e1274c8971bab9b53d13 Mon Sep 17 00:00:00 2001 From: Colin Taylor Date: Wed, 22 Jan 2014 11:45:32 +1300 Subject: [PATCH] Support added for :fetch-size option to execute, execute-async and execute-chan. `setFetchSize` is invoked on underlying com.datastax.jdbc.core.Statement object, restricting the most records returned at a time from Cassandra. --- src/qbits/alia.clj | 30 ++++++++++++++++++--------- test/qbits/alia/test/core.clj | 39 +++++++++++++++++++++++++++++++++-- 2 files changed, 57 insertions(+), 12 deletions(-) diff --git a/src/qbits/alia.clj b/src/qbits/alia.clj index ed194761..c0c17793 100644 --- a/src/qbits/alia.clj +++ b/src/qbits/alia.clj @@ -75,6 +75,11 @@ hayt queries are executed , defaults to LU with a threshold of 100" (utils/var-root-setter *hayt-query-fn*)) +(def ^:dynamic *fetch-size* nil) + +(def set-fetch-size! + (utils/var-root-setter *fetch-size*)) + (defn cluster "Returns a new com.datastax.driver.core/Cluster instance" [hosts & {:as options}] @@ -162,7 +167,7 @@ ex: (prepare (select :foo (where {:bar ?})))" (defn ^:private set-statement-options! - [^Statement statement routing-key retry-policy tracing? consistency] + [^Statement statement routing-key retry-policy tracing? consistency fetch-size] (when routing-key (.setRoutingKey ^SimpleStatement statement ^ByteBuffer routing-key)) @@ -170,6 +175,8 @@ ex: (prepare (select :foo (where {:bar ?})))" (.setRetryPolicy statement retry-policy)) (when tracing? (.enableTracing statement)) + (when fetch-size + (.setFetchSize statement fetch-size)) (.setConsistencyLevel statement (enum/consistency-levels consistency))) @@ -207,12 +214,13 @@ The query can be a raw string, a PreparedStatement (returned by " [& args] (let [[^Session session query & {:keys [consistency routing-key retry-policy - tracing? keywordize? values] + tracing? keywordize? fetch-size values] :or {keywordize? *keywordize* - consistency *consistency*}}] + consistency *consistency* + fetch-size *fetch-size*}}] (fix-session-arg args) ^Statement statement (query->statement query values)] - (set-statement-options! statement routing-key retry-policy tracing? consistency) + (set-statement-options! statement routing-key retry-policy tracing? consistency fetch-size) (try (codec/result-set->maps (.execute session statement) keywordize?) (catch Exception err @@ -225,13 +233,14 @@ The query can be a raw string, a PreparedStatement (returned by [& args] (let [[^Session session query & {:keys [success error executor consistency routing-key retry-policy tracing? - keywordize? values] + keywordize? fetch-size values] :or {executor *executor* keywordize? *keywordize* - consistency *consistency*}}] + consistency *consistency* + fetch-size *fetch-size*}}] (fix-session-arg args) ^Statement statement (query->statement query values)] - (set-statement-options! statement routing-key retry-policy tracing? consistency) + (set-statement-options! statement routing-key retry-policy tracing? consistency fetch-size) (let [^ResultSetFuture rs-future (try (.executeAsync session statement) @@ -261,13 +270,14 @@ The query can be a raw string, a PreparedStatement (returned by [& args] (let [[^Session session query & {:keys [executor consistency routing-key retry-policy tracing? - keywordize? values] + keywordize? fetch-size values] :or {executor *executor* keywordize? *keywordize* - consistency *consistency*}}] + consistency *consistency* + fetch-size *fetch-size*}}] (fix-session-arg args) ^Statement statement (query->statement query values)] - (set-statement-options! statement routing-key retry-policy tracing? consistency) + (set-statement-options! statement routing-key retry-policy tracing? consistency fetch-size) (let [^ResultSetFuture rs-future (.executeAsync session statement) ch (async/chan 1)] (Futures/addCallback diff --git a/test/qbits/alia/test/core.clj b/test/qbits/alia/test/core.clj index 473f80fb..042ef760 100644 --- a/test/qbits/alia/test/core.clj +++ b/test/qbits/alia/test/core.clj @@ -1,13 +1,16 @@ (ns qbits.alia.test.core - (:use clojure.test + (:import (com.datastax.driver.core Statement)) + (:use clojure.test clojure.data qbits.alia + qbits.alia.codec qbits.alia.codec.joda-time qbits.alia.codec.eaio-uuid qbits.tardis qbits.hayt qbits.alia.test.embedded) - (:require [ clojure.core.async :as async])) + (:require [ clojure.core.async :as async] + [ lamina.core :as lamina])) (def ^:dynamic *cluster*) @@ -205,4 +208,36 @@ (merge q (where {:si (-> coll last :si inc)})))) :keywordize? true)))))) +(defn ^:private get-private-field [instance field-name] + (.get + (doto (.getDeclaredField (class instance) field-name) + (.setAccessible true)) + instance)) + +(deftest test-fetch-size + (with-redefs [result-set->maps (fn [result-set keywordize?] + result-set)] + (let [query "select * from items;" + result-set (execute query :fetch-size 3) + ^Statement statement (get-private-field result-set "statement")] + (is (= 3 (.getFetchSize statement)))))) + +(deftest test-fetch-size-async + (with-redefs [result-set->maps (fn [result-set keywordize?] + result-set)] + (let [query "select * from items;" + result-channel (execute-async query :fetch-size 4) + result-set (lamina/wait-for-result result-channel) + ^Statement statement (get-private-field result-set "statement")] + (is (= 4 (.getFetchSize statement)))))) + +(deftest test-fetch-size-chan + (with-redefs [result-set->maps (fn [result-set keywordize?] + result-set)] + (let [query "select * from items;" + result-channel (execute-chan query :fetch-size 5) + result-set (async/