Skip to content

Commit

Permalink
Merge 263a4a4 into 97761bb
Browse files Browse the repository at this point in the history
  • Loading branch information
barkanido committed Mar 8, 2020
2 parents 97761bb + 263a4a4 commit 0fea358
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 28 deletions.
2 changes: 1 addition & 1 deletion project.clj
@@ -1,4 +1,4 @@
(defproject aerospike-clj "0.3.8"
(defproject aerospike-clj "0.4.0"
:description "An Aerospike Clojure client."
:url "https://github.com/AppsFlyer/aerospike-clj"
:license {:name "Eclipse Public License"
Expand Down
76 changes: 55 additions & 21 deletions src/aerospike_clj/client.clj
Expand Up @@ -6,10 +6,12 @@
[manifold.deferred :as d])
(:import [com.aerospike.client AerospikeClient Host Key Bin Record AerospikeException Operation BatchRead AerospikeException$QueryTerminated]
[com.aerospike.client.async EventLoop NioEventLoops]
[com.aerospike.client.listener RecordListener WriteListener DeleteListener ExistsListener BatchListListener RecordSequenceListener]
[com.aerospike.client.policy Policy BatchPolicy ClientPolicy RecordExistsAction WritePolicy ScanPolicy]
[com.aerospike.client.listener RecordListener WriteListener DeleteListener ExistsListener BatchListListener RecordSequenceListener
InfoListener]
[com.aerospike.client.policy Policy BatchPolicy ClientPolicy RecordExistsAction WritePolicy ScanPolicy InfoPolicy]
[com.aerospike.client.cluster Node]
[clojure.lang IPersistentMap IPersistentVector]
[java.util List Collection ArrayList]
[java.util List Collection ArrayList Map]
[java.time Instant]))

(declare record->map)
Expand Down Expand Up @@ -116,6 +118,14 @@
(^void onFailure [_this ^AerospikeException ex]
(d/error! op-future ex))))

(defn- ^InfoListener reify-info-listener [op-future]
(reify
InfoListener
(^void onSuccess [_this ^Map result-map]
(d/success! op-future (into {} result-map)))
(^void onFailure [_this ^AerospikeException ex]
(d/error! op-future ex))))

(defn- ^RecordListener reify-record-listener [op-future]
(reify RecordListener
(^void onFailure [_this ^AerospikeException ex]
Expand Down Expand Up @@ -262,7 +272,6 @@
#(mapv batch-read->map %)
(:transcoder conf identity))]
(register-events d db "read-batch" nil start-time)))))


(defn get-multiple
"DEPRECATED - use `get-batch` instead.
Expand Down Expand Up @@ -503,6 +512,28 @@
(when bin-names ^"[Ljava.lang.String;" (utils/v->array String bin-names)))
(register-events op-future db "scan" nil start-time)))

(defn info
"Asynchronously make info commands to a node. a node can be retreived from `get-nodes`. commands is a seq
of strings available from https://www.aerospike.com/docs/reference/info/index.html the returned future conatains
a map from and info command to its response.
conf can contain {:policy InfoPolicy} "
([db node info-commands]
(info db node info-commands {}))
([db ^Node node info-commands conf]
(let [client (get-client db)
op-future (d/deferred)
start-time (System/nanoTime)]
(.info ^AerospikeClient client
^EventLoop (.next ^NioEventLoops (:el db))
(reify-info-listener op-future)
^InfoPolicy (:policy conf (.infoPolicyDefault ^AerospikeClient client))
node
(into-array String info-commands))
(register-events op-future db "info" nil start-time))))

(defn get-nodes [db]
(.getNodes (get-client db)))

;; metrics
(defn get-cluster-stats
"For each client, return a vector of [metric-name metric-val] 2-tuples.
Expand All @@ -518,23 +549,26 @@

(defn healthy?
"Returns true iff the cluster is reachable and can take reads and writes.
Uses __health-check set to avoid data collisions. `operation-timeout-ms` is for total timeout of reads
(including 2 retries) so an small over estimation is advised to avoid false negatives."
[db operation-timeout-ms]
(let [read-policy (let [p (.readPolicyDefault ^AerospikeClient (get-client db ""))]
(set! (.totalTimeout p) operation-timeout-ms)
p)
k (str "__health__" (rand-int 1000))
v 1
ttl (min 1 (int (/ operation-timeout-ms 1000)))
set-name "__health-check"]
(try
@(create db k set-name v ttl)
(= v
@(get-single db k set-name {:transcoder :payload
:policy read-policy}))
(catch Exception _ex
false))))
Uses __health-check set to avoid data collisions. `operation-timeout-ms` is
for total timeout of reads (default is 1s) including 2 retries so a small
over estimation is advised to avoid false negatives."
([db]
(healthy? db 1000))
([db operation-timeout-ms]
(let [read-policy (let [p (.readPolicyDefault ^AerospikeClient (get-client db ""))]
(set! (.totalTimeout p) operation-timeout-ms)
p)
k (str "__health__" (rand-int Integer/MAX_VALUE))
v (rand-int Integer/MAX_VALUE)
ttl (min 1 (int (/ operation-timeout-ms 1000)))
set-name "__health-check"]
(try
@(put db k set-name v ttl)
(= v
@(get-single db k set-name {:transcoder :payload
:policy read-policy}))
(catch Exception _ex
false)))))

;; etc

Expand Down
4 changes: 1 addition & 3 deletions src/aerospike_clj/metrics.clj
@@ -1,8 +1,6 @@
(ns aerospike-clj.metrics
(:require [clojure.string :as s]
[taoensso.timbre :refer [info warn error spy]])
(:require [clojure.string :as s])
(:import [com.aerospike.client.cluster ClusterStats NodeStats Node]
[com.aerospike.client AerospikeClient AerospikeException]
[com.aerospike.client.async EventLoopStats]))

(defn node-ip [^Node node]
Expand Down
2 changes: 1 addition & 1 deletion src/aerospike_clj/policy.clj
Expand Up @@ -2,7 +2,7 @@
(:import [com.aerospike.client AerospikeClient]
[com.aerospike.client.async EventPolicy]
[com.aerospike.client.policy Policy ClientPolicy WritePolicy RecordExistsAction GenerationPolicy BatchPolicy CommitLevel
AuthMode ReadModeAP ReadModeSC Priority Replica]))
AuthMode ReadModeAP ReadModeSC Priority Replica]))

(defmacro set-java [obj conf obj-name]
`(when (some? (get ~conf ~obj-name))
Expand Down
11 changes: 9 additions & 2 deletions test/aerospike_clj/client_test.clj
Expand Up @@ -7,7 +7,7 @@
[aerospike-clj.client :as client]
[aerospike-clj.policy :as policy]
[cheshire.core :as json]
[taoensso.timbre :refer [spy]])
[clojure.string :as s])
(:import [com.aerospike.client AerospikeException Value AerospikeClient]
[com.aerospike.client.cdt ListOperation ListPolicy ListOrder ListWriteFlags ListReturnType
MapOperation MapPolicy MapOrder MapWriteFlags MapReturnType CTX]
Expand Down Expand Up @@ -47,6 +47,13 @@
(is (thrown-with-msg? Exception #"setting maxCommandsInProcess>0 and maxCommandsInQueue=0 creates an unbounded delay queue"
(client/init-simple-aerospike-client ["localhost"] "test" {"maxCommandsInProcess" 1}))))

(deftest info
(doseq [node (client/get-nodes *c*)]
(= {"health-stats" "stat=test_device_read_latency:value=0:device=/opt/aerospike/data/test.dat:namespace=test"
"namespaces" "test"
"set-config:context=service;enable-health-check=true" "ok"}
@(client/info *c* node ["namespaces" "set-config:context=service;enable-health-check=true" "health-stats"]))))

(deftest health
(is (true? (client/healthy? *c* 10))))

Expand Down Expand Up @@ -174,7 +181,7 @@
(is (= 2 gen))))

(deftest too-long-key
(let [too-long-key (clojure.string/join "" (repeat (inc client/MAX_KEY_LENGTH) "k"))]
(let [too-long-key (s/join "" (repeat (inc client/MAX_KEY_LENGTH) "k"))]
(is (thrown-with-msg? Exception #"key is too long"
@(client/put *c* too-long-key _set 1 100)))))

Expand Down

0 comments on commit 0fea358

Please sign in to comment.