Skip to content

Commit

Permalink
Merge af21e3e into 1a8e915
Browse files Browse the repository at this point in the history
  • Loading branch information
yaronel committed Feb 27, 2020
2 parents 1a8e915 + af21e3e commit 4dc334d
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 24 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/codox
.eastwood
.clj-kondo
aerospike-clj.iml
.idea/
/.lein-failures
target
.nrepl-port
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject aerospike-clj "0.3.6"
(defproject aerospike-clj "0.3.7"
:description "An Aerospike Clojure client."
:url "https://github.com/AppsFlyer/aerospike-clj"
:license {:name "Eclipse Public License"
Expand Down
56 changes: 49 additions & 7 deletions src/aerospike_clj/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,26 @@
[aerospike-clj.utils :as utils]
[aerospike-clj.metrics :as metrics]
[manifold.deferred :as d])
(:import [com.aerospike.client AerospikeClient Host Key Bin Record AerospikeException Operation BatchRead]
(:import [com.aerospike.client AerospikeClient Host Key Bin Record AerospikeException Operation BatchRead AerospikeException$QueryTerminated]
[com.aerospike.client.async EventLoop NioEventLoops]
[com.aerospike.client.listener RecordListener WriteListener DeleteListener ExistsListener BatchListListener]
[com.aerospike.client.policy Policy BatchPolicy ClientPolicy RecordExistsAction WritePolicy]
[com.aerospike.client.listener RecordListener WriteListener DeleteListener ExistsListener BatchListListener RecordSequenceListener]
[com.aerospike.client.policy Policy BatchPolicy ClientPolicy RecordExistsAction WritePolicy ScanPolicy]
[clojure.lang IPersistentMap IPersistentVector]
[java.util List Collection ArrayList]))
[java.util List Collection ArrayList]
[java.time Instant]))

(declare record->map)

(def EPOCH
^{:doc "The 0 date reference for returned record TTL"}
(.getEpochSecond (java.time.Instant/parse "2010-01-01T00:00:00Z")))
(.getEpochSecond (Instant/parse "2010-01-01T00:00:00Z")))

(def MAX_KEY_LENGTH (dec (bit-shift-left 1 13)))

(def MAX_BIN_NAME_LENGTH 14)

(defprotocol IAerospikeClient
(get-client [ac] [ac index] "Returns the relevant AerospikeClient object for the specific shard")
(^AerospikeClient get-client [ac] [ac index] "Returns the relevant AerospikeClient object for the specific shard")
(get-all-clients [_] "Returns a sequence of all AerospikeClient objects."))

(defrecord SimpleAerospikeClient [^AerospikeClient ac
Expand Down Expand Up @@ -120,6 +123,18 @@
(^void onSuccess [_this ^Key _k ^Record record]
(d/success! op-future record))))

(defn- ^RecordSequenceListener reify-record-sequence-listener [op-future callback]
(reify RecordSequenceListener
(^void onRecord [_this ^Key k ^Record record]
(when (= :abort-scan (callback (.userKey k) (record->map record)))
(throw (AerospikeException$QueryTerminated.))))
(^void onSuccess [_this]
(d/success! op-future true))
(^void onFailure [_this ^AerospikeException exception]
(if (instance? AerospikeException$QueryTerminated exception)
(d/success! op-future false)
(d/error! op-future exception)))))

(defn- ^BatchListListener reify-record-batch-list-listener [op-future]
(reify BatchListListener
(^void onFailure [_this ^AerospikeException ex]
Expand Down Expand Up @@ -325,7 +340,7 @@
(map vector indices set-names payloads expirations)))))

(defn set-single
"`put` with a update policy"
"`put` with an update policy"
([db index set-name data expiration]
(set-single db index set-name data expiration {}))
([db index set-name data expiration conf]
Expand Down Expand Up @@ -461,6 +476,33 @@
(utils/v->array Operation operations))
(register-events (d/chain' op-future record->map) db "operate" index start-time)))))

(defn scan-set
"Scans through the given set and calls a user defined callback for each record that was found.
Returns a deferred response that resolves once the scan completes. When the scan completes
successfully it returns `true`. The scan may be aborted by returning :abort-scan from the callback.
In that case the return value is `false`.
The `conf` argument should be a map with the following keys:
:callback - Function that accepts a com.aerospike.client.Value and an AerospikeRecord.
:policy - Optional. com.aerospike.client.policy.ScanPolicy.
:bins - Optional. Vector of bin names to return. Returns all bins by default."
[db aero-namespace set-name conf]
(when-not (fn? (:callback conf))
(throw (IllegalArgumentException. "(:callback conf) must be a function")))

(let [client (get-client db)
op-future (d/deferred)
start-time (System/nanoTime)
bin-names (:bins conf)]
(.scanAll ^AerospikeClient client
^EventLoop (.next ^NioEventLoops (:el db))
(reify-record-sequence-listener op-future (:callback conf))
^Policy (:policy conf (ScanPolicy.))
aero-namespace
set-name
(when bin-names ^"[Ljava.lang.String;" (utils/v->array String bin-names)))
(register-events op-future db "scan" nil start-time)))

;; metrics
(defn get-cluster-stats
"For each client, return a vector of [metric-name metric-val] 2-tuples.
Expand Down
122 changes: 106 additions & 16 deletions test/aerospike_clj/client_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
[aerospike-clj.policy :as policy]
[cheshire.core :as json]
[taoensso.timbre :refer [spy]])
(:import [com.aerospike.client AerospikeException Value Value$ValueArray]
(:import [com.aerospike.client AerospikeException Value AerospikeClient]
[com.aerospike.client.cdt ListOperation ListPolicy ListOrder ListWriteFlags ListReturnType
MapOperation MapPolicy MapOrder MapWriteFlags MapReturnType CTX MapWriteMode]
[com.aerospike.client.policy Priority ReadModeSC ReadModeAP Replica AuthMode ClientPolicy GenerationPolicy
RecordExistsAction]
MapOperation MapPolicy MapOrder MapWriteFlags MapReturnType CTX]
[com.aerospike.client.policy Priority ReadModeSC ReadModeAP Replica GenerationPolicy RecordExistsAction
WritePolicy BatchPolicy Policy]
[java.util HashMap ArrayList]
[com.fasterxml.jackson.databind ObjectMapper]))
[clojure.lang PersistentArrayMap]))

(def K "k")
(def K2 "k2")
Expand Down Expand Up @@ -89,7 +89,7 @@
(testing "clojure maps can be serialized as-is"
(let [v @(client/get-single-no-meta *c* K _set)]
(is (= data v)) ;; per value it is identical
(is (= clojure.lang.PersistentArrayMap (type v)))))))
(is (= PersistentArrayMap (type v)))))))

(deftest put-multiple-bins-get-clj-map
(let [data {"foo" {"bar" [(rand-int 1000)]}
Expand All @@ -103,7 +103,7 @@
(is (= (get data "bar") (get v "bar"))) ;; true value returns the same after being sanitized/desanitized
(is (= (get data "baz") (get v "baz"))) ;; false value returns the same after being sanitized/desanitized
(is (= (get data "qux") (get v "qux"))) ;; nil value retuns the same after being sanitized/desanitized
(is (= clojure.lang.PersistentArrayMap (type v))) ;; converted back to a Clojure map instead of HashMap
(is (= PersistentArrayMap (type v))) ;; converted back to a Clojure map instead of HashMap
(is (true? (map? v)))))))

(deftest get-single-multiple-bins
Expand Down Expand Up @@ -310,7 +310,7 @@
MapWriteFlags/NO_FAIL))
bin-name ""
max-entries 4
initial-empty-value #(hash-map (Value/get %) (Value/get (ArrayList. 1)))
initial-empty-value #(hash-map (Value/get %) (Value/get (ArrayList. 1)))
append (fn [k v]
(first
(:payload @(client/operate *c* K _set 100
Expand All @@ -324,7 +324,7 @@
(bit-or MapReturnType/INVERTED
MapReturnType/COUNT)
outer-ctx)]))))]

(is (= nil (get-all)))
(is (= 1 (append "foo" 19)))
(is (= {"foo" [19]} (get-all)))
Expand Down Expand Up @@ -454,7 +454,7 @@
(is (= #{"bar"} (set-getall)))))

(deftest default-read-policy
(let [rp (.getReadPolicyDefault (client/get-client *c*))]
(let [rp (.getReadPolicyDefault ^AerospikeClient (client/get-client *c*))]
(is (= Priority/DEFAULT (.priority rp))) ;; Priority of request relative to other transactions. Currently, only used for scans.
(is (= ReadModeAP/ONE (.readModeAP rp))) ;; Involve single node in the read operation.
(is (= Replica/SEQUENCE (.replica rp))) ;; Try node containing master partition first.
Expand Down Expand Up @@ -485,10 +485,10 @@
"batchPolicyDefault" (policy/map->batch-policy {"allowInline" false
"maxConcurrentThreads" 2
"sendSetName" true})})
rp (.getReadPolicyDefault (client/get-client c))
bp (.getBatchPolicyDefault (client/get-client c))]


rp ^Policy (.getReadPolicyDefault (client/get-client c))
bp ^BatchPolicy (.getBatchPolicyDefault (client/get-client c))]
(is (= Priority/DEFAULT (.priority rp)))
(is (= ReadModeAP/ALL (.readModeAP rp)))
(is (= Replica/RANDOM (.replica rp)))
Expand All @@ -505,7 +505,7 @@
(is (true? (.sendSetName bp)))))

(deftest default-write-policy
(let [rp (.getWritePolicyDefault (client/get-client *c*))]
(let [rp ^WritePolicy (.getWritePolicyDefault (client/get-client *c*))]
(is (= Priority/DEFAULT (.priority rp))) ;; Priority of request relative to other transactions. Currently, only used for scans.
(is (= ReadModeAP/ONE (.readModeAP rp))) ;; Involve master only in the read operation.
(is (= Replica/SEQUENCE (.replica rp))) ;; Try node containing master partition first.
Expand All @@ -532,7 +532,7 @@
"RecordExistsAction" "REPLACE_ONLY"
"respondAllOps" true})})

wp (.getWritePolicyDefault (client/get-client c))]
wp ^WritePolicy (.getWritePolicyDefault (client/get-client c))]
(is (= Priority/DEFAULT (.priority wp)))
(is (= ReadModeAP/ONE (.readModeAP wp)))
(is (true? (.durableDelete wp)))
Expand All @@ -551,4 +551,94 @@
(is (true? @(client/set-single *c* K _set update-data 100)))
(is (= update-data @(client/get-single-no-meta *c* K _set)))))

(deftest scan-test
(let [conf {:policy (policy/map->write-policy {"sendKey" true})}
aero-namespace "test"
ttl 100
delete-records (fn []
@(client/delete *c* K _set)
@(client/delete *c* K2 _set)
@(client/delete *c* K3 _set))]

(testing "it should throw an IllegalArgumentException when:
conf is missing, :callback is missing, or :callback is not a function"
(is (thrown? IllegalArgumentException @(client/scan-set *c* aero-namespace _set nil)))
(is (thrown? IllegalArgumentException @(client/scan-set *c* aero-namespace _set {})))
(is (thrown? IllegalArgumentException @(client/scan-set *c* aero-namespace _set {:callback "not a function"}))))

(testing "it should throw a ClassCastException when :bins is not a vector"
(is (thrown? ClassCastException @(client/scan-set *c* aero-namespace _set {:callback (constantly true) :bins {}}))))

(testing "it should return all the items in the set"
@(client/put-multiple *c* [K K2 K3] (repeat _set) [10 20 30] (repeat ttl) conf)

(let [res (atom [])
callback (fn [k v] (swap! res conj [(.toString ^Value k) (:payload v)]))]

@(client/scan-set *c* aero-namespace _set {:callback callback})
(is (= (sort-by first @res) [[K 10] [K2 20] [K3 30]])))

(delete-records))

(testing "it should return only the bins that were requested"
(let [data [{"name" "John" "occupation" "Carpenter"}
{"name" "Jerry" "occupation" "Bus Driver"}
{"name" "Jack" "occupation" "Chef"}]]

@(client/put-multiple *c* [K K2 K3] (repeat _set) data (repeat ttl) conf)

(let [res (atom [])
callback (fn [k v] (swap! res conj [(.toString ^Value k) (:payload v)]))]

@(client/scan-set *c* aero-namespace _set {:callback callback :bins ["occupation"]})

(is (= (sort-by first @res) [[K {"occupation" "Carpenter"}]
[K2 {"occupation" "Bus Driver"}]
[K3 {"occupation" "Chef"}]])))
(delete-records)))

(testing "it can update items during a scan"
(let [client *c*
callback (fn [k v] (client/put client (.toString ^Value k) _set (inc (:payload v)) ttl))]

@(client/put-multiple *c* [K K2 K3] (repeat _set) [10 20 30] (repeat ttl) conf)

@(client/scan-set *c* aero-namespace _set {:callback callback})

(let [res @(client/get-batch *c* [{:index K :set _set}
{:index K2 :set _set}
{:index K3 :set _set}])]

(is (= (sort (mapv :payload res)) [11 21 31]))))
(delete-records))

(testing "it can delete items during a scan"
(let [client *c*
callback (fn [k _] (client/delete client (.toString ^Value k) _set))]

@(client/put-multiple *c* [K K2 K3] (repeat _set) [10 20 30] (repeat ttl) conf)

@(client/scan-set *c* aero-namespace _set {:callback callback})

(is (empty? (filter :payload @(client/get-batch *c* [{:index K :set _set}
{:index K2 :set _set}
{:index K3 :set _set}])))))
(delete-records))

(testing "it should stop the scan when the callback returns :abort-scan"
@(client/put-multiple *c* [K K2 K3] (repeat _set) [10 20 30] (repeat ttl) conf)

(let [res (atom [])
counter (atom 0)
callback (fn [k v]
(if (< (swap! counter inc) 2)
(swap! res conj [(.toString ^Value k) (:payload v)])
:abort-scan))]

(is (false? @(client/scan-set *c* aero-namespace _set {:callback callback})))
(is (= 1 (count @res))))

(delete-records))))



0 comments on commit 4dc334d

Please sign in to comment.