From 4a530aa744992f4bc934eea24c9fcad03b37f8d5 Mon Sep 17 00:00:00 2001 From: idantavor Date: Thu, 3 Aug 2023 20:41:33 +0300 Subject: [PATCH] Client version 6.1.7 (#62) --- .github/workflows/build_test_pr.yml | 44 +--- .github/workflows/ci_branch.yml | 36 +-- .github/workflows/ci_master.yml | 36 +-- .github/workflows/test.yml | 29 ++ CHANGELOG.md | 13 + README.md | 21 +- project.clj | 8 +- .../aerospike_clj/aerospike_record.clj | 7 +- src/main/clojure/aerospike_clj/client.clj | 157 +++++++---- src/main/clojure/aerospike_clj/listeners.clj | 15 +- src/main/clojure/aerospike_clj/policy.clj | 40 ++- src/main/clojure/aerospike_clj/protocols.clj | 14 +- test/aerospike_clj/client_test.clj | 0 .../integration/aerospike_setup.clj | 28 ++ .../{ => integration}/integration_test.clj | 247 +++++++++++++----- .../{ => integration}/metrics_test.clj | 9 +- update-docs.sh | 2 +- 17 files changed, 451 insertions(+), 255 deletions(-) create mode 100644 .github/workflows/test.yml delete mode 100644 test/aerospike_clj/client_test.clj create mode 100644 test/aerospike_clj/integration/aerospike_setup.clj rename test/aerospike_clj/{ => integration}/integration_test.clj (69%) rename test/aerospike_clj/{ => integration}/metrics_test.clj (78%) diff --git a/.github/workflows/build_test_pr.yml b/.github/workflows/build_test_pr.yml index 2827ed4..965f1b4 100644 --- a/.github/workflows/build_test_pr.yml +++ b/.github/workflows/build_test_pr.yml @@ -48,49 +48,7 @@ jobs: test: needs: build - runs-on: ubuntu-22.04 - timeout-minutes: 5 - services: - aerospike-test: - image: aerospike:4.9.0.11 - ports: - - 3000:3000 - - 3001:3001 - - 3002:3002 - - 3003:3003 - env: - AEROSPIKE_HOST: localhost - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - token: ${{ secrets.GITHUB_TOKEN }} - fetch-depth: 0 - - - name: Set up JDK 8 - uses: actions/setup-java@v3 - with: - distribution: corretto - java-version: 8 - - - name: Restore local Maven repository from cache - uses: actions/cache@v3 - with: - path: ~/.m2/repository - key: ${{ runner.os }}-maven-${{ hashFiles( 'project.clj' ) }} - restore-keys: | - ${{ runner.os }}-maven- - - - name: Unit and integration tests - run: lein eftest :all - - - name: Upload Test Results - if: always() - uses: actions/upload-artifact@v2 - with: - name: Unit Test Results - path: | - target/junit.xml + uses: ./.github/workflows/test.yml event_file: needs: test diff --git a/.github/workflows/ci_branch.yml b/.github/workflows/ci_branch.yml index f3fd5e5..8bb1d3e 100644 --- a/.github/workflows/ci_branch.yml +++ b/.github/workflows/ci_branch.yml @@ -49,41 +49,7 @@ jobs: test: needs: build - runs-on: ubuntu-22.04 - timeout-minutes: 5 - services: - aerospike-test: - image: aerospike:4.9.0.11 - ports: - - 3000:3000 - - 3001:3001 - - 3002:3002 - - 3003:3003 - env: - AEROSPIKE_HOST: localhost - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - token: ${{ secrets.GITHUB_TOKEN }} - fetch-depth: 0 - - - name: Set up JDK 8 - uses: actions/setup-java@v3 - with: - distribution: corretto - java-version: 8 - - - name: Restore local Maven repository from cache - uses: actions/cache@v3 - with: - path: ~/.m2/repository - key: ${{ runner.os }}-maven-${{ hashFiles( 'project.clj' ) }} - restore-keys: | - ${{ runner.os }}-maven- - - - name: Unit and Integration tests - run: lein eftest :all + uses: ./.github/workflows/test.yml deploy: needs: test diff --git a/.github/workflows/ci_master.yml b/.github/workflows/ci_master.yml index 60e1eab..a861461 100644 --- a/.github/workflows/ci_master.yml +++ b/.github/workflows/ci_master.yml @@ -49,41 +49,7 @@ jobs: test: needs: build - runs-on: ubuntu-22.04 - timeout-minutes: 5 - services: - aerospike-test: - image: aerospike:4.9.0.11 - ports: - - 3000:3000 - - 3001:3001 - - 3002:3002 - - 3003:3003 - env: - AEROSPIKE_HOST: localhost - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - token: ${{ secrets.GITHUB_TOKEN }} - fetch-depth: 0 - - - name: Set up JDK 8 - uses: actions/setup-java@v3 - with: - distribution: corretto - java-version: 8 - - - name: Restore local Maven repository from cache - uses: actions/cache@v3 - with: - path: ~/.m2/repository - key: ${{ runner.os }}-maven-${{ hashFiles( 'project.clj' ) }} - restore-keys: | - ${{ runner.os }}-maven- - - - name: Unit and Integration tests - run: lein eftest :all + uses: ./.github/workflows/test.yml deploy: needs: test diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..87fea9f --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,29 @@ +on: [workflow_call] + +jobs: + test: + runs-on: ubuntu-22.04 + timeout-minutes: 5 + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + token: ${{ secrets.GITHUB_TOKEN }} + fetch-depth: 0 + + - name: Set up JDK 8 + uses: actions/setup-java@v3 + with: + distribution: corretto + java-version: 8 + + - name: Restore local Maven repository from cache + uses: actions/cache@v3 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles( 'project.clj' ) }} + restore-keys: | + ${{ runner.os }}-maven- + + - name: Unit and Integration tests + run: lein eftest :all diff --git a/CHANGELOG.md b/CHANGELOG.md index 7bf5d91..487799a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,19 @@ ## This library follows [Semantic Versioning](https://semver.org). ## This CHANGELOG follows [keepachangelog](https://keepachangelog.com/en/1.0.0/). +### VERSION [3.0.0]: https://github.com/AppsFlyer/aerospike-clj/pull/62 +#### Changed +* use aerospike client version 6.1.10 +* use test containers for integration tests + +#### Added +* batch operate support +* completion executor support (with default) +* client-events additional context support + +#### Removed +* empty test namespace + ### VERSION 2.0.7 #### Changed * Fixed a bug with reporting metrics for `AerospikeSingleIndexBatchOps/operate`. diff --git a/README.md b/README.md index 86b79af..edb7a60 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ An opinionated Clojure library wrapping Aerospike Java Client. # Requirements - Java 8 - Clojure 1.8 +- Aerospike server version >= `4.9.0` +- Clojure version >= `1.11.0` # Features - Converts Java client's callback model into Java(8) `CompletableFuture` based API. @@ -53,20 +55,20 @@ user=> (def c (aero/init-simple-aerospike-client ``` It is possible to inject additional asynchronous user-defined behaviour. To do that add an implementation of the -`ClientEvents` protocol. +`ClientEvents` protocol during client initialization or per operation. Some useful info is passed in-order to support metering and to read client configuration. `op-start-time` is -`(System/nanoTime)`, see more [here](https://appsflyer.github.io/aerospike-clj/advanced-async-hooks.html). +`(System/nanoTime)`. +see more [here](https://appsflyer.github.io/aerospike-clj/advanced-async-hooks.html). ```clojure (let [c (aero/init-simple-aerospike-client ["localhost"] "test" {:client-events (reify ClientEvents - (on-success [_ op-name op-result index op-start-time db] - (when (:enable-logging? db) + (on-success [_ op-name op-result index op-start-time] (println op-name "success!"))) - (on-failure [_ op-name op-ex index op-start-time db] - (println "oh-no" op-name "failed on index" index)))})] + (on-failure [_ op-name op-ex index op-start-time] + (println "oh-no" op-name "failed on index" index)))})] (get-single c "index" "set-name")) ; for better performance, a `deftype` might be preferred over `reify`, if possible. @@ -114,12 +116,7 @@ Call `expiry-unix` with the returned TTL to get a TTL relative to the UNIX epoch Executed via running `lein test`. ### Integration tests -Testing is performed against a local Aerospike instance. You can also run an instance inside a docker container: - -```shell -$ sudo docker run -d --name aerospike -p 3000:3000 -p 3001:3001 -p 3002:3002 -p 3003:3003 aerospike -$ lein test :integration -``` +Testing is performed against a local Aerospike docker container. #### Mocking in application unit tests For unit tests purposes you can use a mock client that implements the client protocols: `MockClient`. diff --git a/project.clj b/project.clj index 844c598..3cc0a24 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject com.appsflyer/aerospike-clj "2.0.8-SNAPSHOT" +(defproject com.appsflyer/aerospike-clj "3.0.0-SNAPSHOT" :description "An Aerospike Clojure client." :url "https://github.com/AppsFlyer/aerospike-clj" :license {:name "Eclipse Public License" @@ -14,12 +14,14 @@ :password :env/clojars_password :sign-releases false}]] :dependencies [[org.clojure/tools.logging "1.2.4"] - [com.aerospike/aerospike-client "4.4.15"] + [com.aerospike/aerospike-client "6.1.10"] [funcool/promesa "8.0.450"]] :profiles {:dev {:plugins [[lein-eftest "0.5.9"]] :dependencies [[org.clojure/clojure "1.11.1"] + [clj-test-containers "0.7.4"] [criterium "0.4.6"] [cheshire "5.11.0"] + [tortue/spy "2.14.0"] [com.fasterxml.jackson.core/jackson-databind "2.11.2"] [clj-kondo "2022.04.25"]] :eftest {:multithread? false @@ -31,7 +33,7 @@ :test-selectors {:integration :integration :all (constantly true) :default (complement :integration)}} - :docs {:plugins [[lein-codox "0.10.7"]] + :docs {:plugins [[lein-codox "0.10.8"]] :codox {:output-path "codox" :source-uri "http://github.com/AppsFlyer/aerospike-clj/blob/{version}/{filepath}#L{line}" :metadata {:doc/format :markdown}}}}) diff --git a/src/main/clojure/aerospike_clj/aerospike_record.clj b/src/main/clojure/aerospike_clj/aerospike_record.clj index 289fced..f360dcd 100644 --- a/src/main/clojure/aerospike_clj/aerospike_record.clj +++ b/src/main/clojure/aerospike_clj/aerospike_record.clj @@ -14,15 +14,16 @@ (defn record->map [^Record record] (and record - (let [bins ^Map (.bins record) - payload (if (single-bin? bins) + (let [bins ^Map (.bins record) + payload (when (some? bins) + (if (single-bin? bins) ;; single bin record (utils/desanitize-bin-value (.get bins "")) ;; multiple-bin record (reduce-kv (fn [m k v] (assoc m k (utils/desanitize-bin-value v))) {} - bins))] + bins)))] (->AerospikeRecord payload ^Integer (.generation ^Record record) diff --git a/src/main/clojure/aerospike_clj/client.clj b/src/main/clojure/aerospike_clj/client.clj index 31120ff..b15f6aa 100644 --- a/src/main/clojure/aerospike_clj/client.clj +++ b/src/main/clojure/aerospike_clj/client.clj @@ -3,6 +3,7 @@ (:require [clojure.string :as s] [clojure.tools.logging :as log] [promesa.core :as p] + [promesa.exec :as p-exec] [aerospike-clj.policy :as policy] [aerospike-clj.bins :as bins] [aerospike-clj.utils :as utils] @@ -11,18 +12,20 @@ [aerospike-clj.listeners] [aerospike-clj.aerospike-record :as record] [aerospike-clj.protocols :as pt]) - (:import [java.time Instant] - [java.util List Collection ArrayList] - [com.aerospike.client AerospikeClient Key Bin Operation BatchRead] - [com.aerospike.client.async EventLoop NioEventLoops EventLoops] - [com.aerospike.client.cluster Node] - [com.aerospike.client.policy Policy BatchPolicy ClientPolicy + (:import (java.time Instant) + (java.util List Collection ArrayList Arrays) + (com.aerospike.client AerospikeClient Key Bin Operation BatchRead) + (com.aerospike.client.async EventLoop NioEventLoops EventLoops) + (com.aerospike.client.cluster Node) + (com.aerospike.client.policy Policy BatchPolicy ClientPolicy RecordExistsAction WritePolicy ScanPolicy - InfoPolicy] - [com.aerospike.client Key Host] - [aerospike_clj.listeners AsyncExistsListener AsyncDeleteListener AsyncWriteListener + InfoPolicy) + (com.aerospike.client Key Host BatchRecord) + (aerospike_clj.listeners AsyncExistsListener AsyncDeleteListener AsyncWriteListener AsyncInfoListener AsyncRecordListener AsyncRecordSequenceListener - AsyncBatchListListener AsyncExistsArrayListener])) + AsyncBatchListListener AsyncExistsArrayListener AsyncBatchOperateListListener) + (com.aerospike.client.listener BatchOperateListListener) + (java.util.concurrent Executor))) (def ^{:doc "The 0 date reference for returned record TTL" @@ -62,10 +65,11 @@ (p/catch (fn [op-exception] (pt/on-failure client-events op-name op-exception index op-start-time)))))) -(defn- register-events [op-future client-events op-name index op-start-time] - (if (empty? client-events) - op-future - (reduce (client-events-reducer op-name index op-start-time) op-future client-events))) +(defn- register-events [op-future default-client-events op-name index op-start-time conf] + (let [client-events (:client-events conf default-client-events)] + (if (empty? client-events) + op-future + (reduce (client-events-reducer op-name index op-start-time) op-future client-events)))) (extend-protocol pt/UserKey Key @@ -75,11 +79,12 @@ (create-key ^Key [this as-namespace set-name] (as-key/create-key this as-namespace set-name))) -(defn- batch-read->map [^BatchRead batch-read] - (let [k (.key batch-read)] - (-> (record/record->map (.record batch-read)) +(defn- batch-record->map [^BatchRecord batch-record] + (let [k (.key batch-record)] + (-> (record/record->map (.record batch-record)) (assoc :index (.toString (.userKey k))) - (assoc :set (.setName k))))) + (assoc :set (.setName k)) + (assoc :result-code (.resultCode batch-record))))) (defn- map->batch-read ^BatchRead [batch-read-map dbns] (let [k ^Key (pt/create-key (:index batch-read-map) dbns (:set batch-read-map))] @@ -89,7 +94,7 @@ (BatchRead. k ^"[Ljava.lang.String;" (utils/v->array String (:bins batch-read-map)))))) ;; put -(defn- put* [^AerospikeClient client ^EventLoops event-loops dbns client-events index data policy set-name] +(defn- put* [^AerospikeClient client ^EventLoops event-loops dbns client-events index data policy set-name conf] (let [bins (bins/data->bins data) op-future (p/deferred) start-time (System/nanoTime)] @@ -99,10 +104,11 @@ ^WritePolicy policy ^Key (pt/create-key index dbns set-name) ^"[Lcom.aerospike.client.Bin;" bins) - (register-events op-future client-events :write index start-time))) + (register-events op-future client-events :write index start-time conf))) (deftype SimpleAerospikeClient [client el + ^Executor completion-executor hosts dbns client-events @@ -133,10 +139,14 @@ ^Policy (:policy conf) ^Key (pt/create-key index dbns set-name) ^"[Ljava.lang.String;" (utils/v->array String bin-names))) - (let [p (p/chain op-future - record/record->map - (:transcoder conf identity))] - (register-events p client-events :read index start-time)))) + (-> op-future + (p/then' (fn [res] + (let [transcoder (:transcoder conf identity)] + (-> res + record/record->map + transcoder))) + completion-executor) + (register-events client-events :read index start-time conf)))) (get-single-no-meta [this index set-name] (pt/get-single this index set-name {:transcoder :payload})) @@ -155,7 +165,9 @@ (AsyncExistsListener. op-future) ^Policy (:policy conf) ^Key (pt/create-key index dbns set-name)) - (register-events op-future client-events :exists index start-time))) + (-> op-future + (p/then' identity completion-executor) + (register-events client-events :exists index start-time conf)))) (get-batch [this batch-reads] (pt/get-batch this batch-reads {})) @@ -169,10 +181,10 @@ (AsyncBatchListListener. op-future) ^BatchPolicy (:policy conf) ^List batch-reads-arr) - (let [d (p/chain op-future - #(mapv batch-read->map %) - (:transcoder conf identity))] - (register-events d client-events :read-batch nil start-time)))) + (-> op-future + (p/then' #(mapv batch-record->map %) completion-executor) + (p/then' (:transcoder conf identity)) + (register-events client-events :read-batch nil start-time conf)))) (exists-batch [this indices] (pt/exists-batch this indices {})) @@ -180,16 +192,16 @@ (exists-batch [_this indices conf] (let [op-future (p/deferred) start-time (System/nanoTime) + transcoder (:transcoder conf identity) indices (utils/v->array Key (mapv #(pt/create-key (:index %) dbns (:set %)) indices))] (.exists ^AerospikeClient client ^EventLoop (.next ^EventLoops el) (AsyncExistsArrayListener. op-future) ^BatchPolicy (:policy conf) ^"[Lcom.aerospike.client.Key;" indices) - (let [d (p/chain op-future - vec - (:transcoder conf identity))] - (register-events d client-events :exists-batch nil start-time)))) + (-> op-future + (p/then' (comp transcoder vec) completion-executor) + (register-events client-events :exists-batch nil start-time conf)))) pt/AerospikeWriteOps (put [this index set-name data expiration] @@ -203,7 +215,8 @@ index ((:transcoder conf identity) data) (:policy conf (policy/write-policy client expiration)) - set-name)) + set-name + conf)) (create [this index set-name data expiration] (pt/create this index set-name data expiration {})) @@ -216,7 +229,8 @@ index ((:transcoder conf identity) data) (policy/create-only-policy client expiration) - set-name)) + set-name + conf)) (put-multiple [this indices set-names payloads expirations] (pt/put-multiple this indices set-names payloads expirations {})) @@ -239,7 +253,8 @@ index ((:transcoder conf identity) data) (policy/set-policy client expiration) - set-name)) + set-name + conf)) (replace-only [this index set-name data expiration] (pt/replace-only this index set-name data expiration {})) @@ -252,7 +267,8 @@ index ((:transcoder conf identity) data) (policy/replace-only-policy client expiration) - set-name)) + set-name + conf)) (update [this index set-name new-record generation new-expiration] (pt/update this index set-name new-record generation new-expiration {})) @@ -265,7 +281,8 @@ index ((:transcoder conf identity) new-record) (policy/update-policy client generation new-expiration) - set-name)) + set-name + conf)) (add-bins [this index set-name new-data new-expiration] (pt/add-bins this index set-name new-data new-expiration {})) @@ -278,9 +295,13 @@ index ((:transcoder conf identity) new-data) (policy/update-only-policy client new-expiration) - set-name)) + set-name + conf)) - (touch [_this index set-name expiration] + (touch [this index set-name expiration] + (pt/touch this index set-name expiration {})) + + (touch [_this index set-name expiration conf] (let [op-future (p/deferred) start-time (System/nanoTime)] (.touch ^AerospikeClient client @@ -288,7 +309,9 @@ (AsyncWriteListener. op-future) ^WritePolicy (policy/write-policy client expiration RecordExistsAction/UPDATE_ONLY) ^Key (pt/create-key index dbns set-name)) - (register-events op-future client-events :touch index start-time))) + (-> op-future + (p/then' identity completion-executor) + (register-events client-events :touch index start-time conf)))) pt/AerospikeDeleteOps (delete [this index set-name] @@ -302,7 +325,9 @@ (AsyncDeleteListener. op-future) ^WritePolicy (:policy conf) ^Key (pt/create-key index dbns set-name)) - (register-events op-future client-events :delete index start-time))) + (-> op-future + (p/then' identity completion-executor) + (register-events client-events :delete index start-time conf)))) (delete-bins [this index set-name bin-names new-expiration] (pt/delete-bins this index set-name bin-names new-expiration {})) @@ -318,7 +343,9 @@ ^WritePolicy policy ^Key (pt/create-key index dbns set-name) ^"[Lcom.aerospike.client.Bin;" (utils/v->array Bin (mapv bins/set-bin-as-null bin-names))) - (register-events op-future client-events :write index start-time))) + (-> op-future + (p/then' identity completion-executor) + (register-events client-events :write index start-time conf)))) pt/AerospikeSingleIndexBatchOps (operate [this index set-name expiration operations] @@ -335,7 +362,33 @@ ^WritePolicy (:policy conf (policy/write-policy client expiration RecordExistsAction/UPDATE)) ^Key (pt/create-key index dbns set-name) (utils/v->array Operation operations)) - (register-events (p/then op-future record/record->map) client-events :operate index start-time)))) + (-> op-future + (p/then' record/record->map completion-executor) + (register-events client-events :operate index start-time conf))))) + + pt/AerospikeBatchOps + (batch-operate [this batch-records] + (pt/batch-operate this batch-records {})) + + (batch-operate [_this batch-records conf] + (let [op-future (p/deferred) + policy (:policy conf) + batch-list (if (list? batch-records) + batch-records + (->> batch-records + (utils/v->array BatchRecord) + (Arrays/asList))) + start-time (System/nanoTime) + transcoder (:transcoder conf identity)] + (.operate ^AerospikeClient client + ^EventLoop (.next ^EventLoops el) + ^BatchOperateListListener (AsyncBatchOperateListListener. op-future) + ^BatchPolicy policy + ^List batch-list) + (-> op-future + (p/then' (comp transcoder #(mapv batch-record->map %)) completion-executor) + (register-events client-events :batch-operate nil start-time conf)))) + pt/AerospikeSetOps (scan-set [_this aero-namespace set-name conf] @@ -351,7 +404,9 @@ aero-namespace set-name (when bin-names ^"[Ljava.lang.String;" (utils/v->array String bin-names))) - (register-events op-future client-events :scan nil start-time))) + (-> op-future + (p/then' identity completion-executor) + (register-events client-events :scan nil start-time conf)))) pt/AerospikeAdminOps (info [this node info-commands] @@ -366,7 +421,9 @@ ^InfoPolicy (:policy conf (.infoPolicyDefault ^AerospikeClient client)) ^Node node (into-array String info-commands)) - (register-events op-future client-events :info nil start-time))) + (-> op-future + (p/then' identity completion-executor) + (register-events client-events :info nil start-time conf)))) (get-nodes [_this] (into [] (.getNodes ^AerospikeClient client))) @@ -432,12 +489,14 @@ ([hosts aero-ns] (init-simple-aerospike-client hosts aero-ns {})) ([hosts aero-ns conf] - (let [close-event-loops? (nil? (:event-loops conf)) - event-loops (or (:event-loops conf) (create-event-loops conf)) - client-policy (:client-policy conf (policy/create-client-policy event-loops conf))] + (let [close-event-loops? (nil? (:event-loops conf)) + event-loops (or (:event-loops conf) (create-event-loops conf)) + completion-executor (:completion-executor conf p-exec/default-executor) + client-policy (:client-policy conf (policy/create-client-policy event-loops conf))] (log/info (format "Starting aerospike client for hosts %s with username %s" hosts (get conf "username"))) (->SimpleAerospikeClient (create-client hosts client-policy (:port conf 3000)) event-loops + completion-executor hosts aero-ns (utils/vectorize (:client-events conf)) diff --git a/src/main/clojure/aerospike_clj/listeners.clj b/src/main/clojure/aerospike_clj/listeners.clj index 6a7631f..87171c9 100644 --- a/src/main/clojure/aerospike_clj/listeners.clj +++ b/src/main/clojure/aerospike_clj/listeners.clj @@ -1,10 +1,10 @@ (ns aerospike-clj.listeners (:require [promesa.core :as p] [aerospike-clj.aerospike-record :as record]) - (:import [java.util List Map] - [com.aerospike.client Key Record AerospikeException AerospikeException$QueryTerminated] - [com.aerospike.client.listener RecordListener WriteListener DeleteListener - ExistsListener BatchListListener RecordSequenceListener InfoListener ExistsArrayListener])) + (:import (java.util List Map) + (com.aerospike.client Key Record AerospikeException AerospikeException$QueryTerminated) + (com.aerospike.client.listener RecordListener WriteListener DeleteListener + ExistsListener BatchListListener RecordSequenceListener InfoListener ExistsArrayListener BatchOperateListListener))) (deftype AsyncExistsListener [op-future] ExistsListener @@ -66,3 +66,10 @@ (p/reject! op-future ex)) (^void onSuccess [_this ^"[Lcom.aerospike.client.Key;" _keys ^"[Z" exists] (p/resolve! op-future exists))) + +(deftype AsyncBatchOperateListListener [op-future] + BatchOperateListListener + (^void onSuccess [_this ^List records ^boolean _status] + (p/resolve! op-future records)) + (^void onFailure [_this ^AerospikeException ex] + (p/reject! op-future ex))) diff --git a/src/main/clojure/aerospike_clj/policy.clj b/src/main/clojure/aerospike_clj/policy.clj index 403ac20..2600e51 100644 --- a/src/main/clojure/aerospike_clj/policy.clj +++ b/src/main/clojure/aerospike_clj/policy.clj @@ -1,10 +1,10 @@ (ns aerospike-clj.policy (:import [com.aerospike.client AerospikeClient] [com.aerospike.client.async EventPolicy] - #_{:clj-kondo/ignore [:unused-import]} + #_{:clj-kondo/ignore [:unused-import]} [com.aerospike.client.policy Policy ClientPolicy WritePolicy RecordExistsAction GenerationPolicy BatchPolicy CommitLevel - AuthMode ReadModeAP ReadModeSC Priority Replica])) + AuthMode ReadModeAP ReadModeSC Replica BatchWritePolicy])) (defmacro set-java [obj conf obj-name] `(when (some? (get ~conf ~obj-name)) @@ -30,7 +30,6 @@ (set-java-enum p conf "ReadModeAP") (set-java-enum p conf "ReadModeSC") (set-java p conf "maxRetries") - (set-java-enum p conf "Priority") (set-java-enum p conf "Replica") (set-java p conf "sendKey") (set-java p conf "sleepBetweenRetries") @@ -39,13 +38,29 @@ (set-java p conf "totalTimeout") p)) +(defn map->batch-write-policy + "Create a `BatchWritePolicy` from a map. Enumeration names should start with capitalized letter. + This function is slow due to possible reflection." + ^BatchWritePolicy [conf] + (let [p (BatchWritePolicy.)] + (set-java-enum p conf "RecordExistsAction") + (set-java-enum p conf "CommitLevel") + (set-java-enum p conf "GenerationPolicy") + (set-java p conf "filterExp") + (set-java p conf "generation") + (set-java p conf "expiration") + (set-java p conf "durableDelete") + (set-java p conf "sendKey") + p)) + (defn map->batch-policy - "Create a (read) `BatchPolicy` from a map. + "Create a `BatchPolicy` from a map. This function is slow due to possible reflection." ^BatchPolicy [conf] - (let [bp (BatchPolicy.) + (let [bp (BatchPolicy. (map->policy conf)) conf (merge {"timeoutDelay" 3000} conf)] (set-java bp conf "allowInline") + (set-java bp conf "respondAllKeys") (set-java bp conf "maxConcurrentThreads") (set-java bp conf "sendSetName") bp)) @@ -77,6 +92,18 @@ (set! (.recordExistsAction wp) record-exists-action) wp))) +(defn batch-write-policy + "Create a write policy to be passed to put methods via `{:policy wp}`. + Also used in `update` and `create`. + The default policy in case the record exists is `RecordExistsAction/UPDATE`." + (^BatchWritePolicy [client expiration] + (batch-write-policy client expiration (RecordExistsAction/UPDATE))) + (^BatchWritePolicy [client expiration record-exists-action] + (let [wp (BatchWritePolicy. (.getBatchWritePolicyDefault ^AerospikeClient client))] + (set! (.expiration wp) expiration) + (set! (.recordExistsAction wp) record-exists-action) + wp))) + (defn set-policy "Create a write policy with UPDATE record exists action. in case of new entry, create it @@ -160,6 +187,9 @@ (set! (.readPolicyDefault cp) (get conf "readPolicyDefault" (map->policy conf))) (set! (.writePolicyDefault cp) (get conf "writePolicyDefault" (map->write-policy conf))) (set! (.batchPolicyDefault cp) (get conf "batchPolicyDefault" (map->batch-policy conf))) + (set! (.batchParentPolicyWriteDefault cp) (get conf "batchParentPolicyWriteDefault" (map->batch-policy conf))) + (set! (.batchWritePolicyDefault cp) (get conf "batchWritePolicyDefault" (map->batch-write-policy conf))) + (set-java-enum cp conf "AuthMode") (set-java cp conf "clusterName") (set-java cp conf "connPoolsPerNode") diff --git a/src/main/clojure/aerospike_clj/protocols.clj b/src/main/clojure/aerospike_clj/protocols.clj index 0e21b66..854d53a 100644 --- a/src/main/clojure/aerospike_clj/protocols.clj +++ b/src/main/clojure/aerospike_clj/protocols.clj @@ -86,7 +86,9 @@ "Add bins to an existing record without modifying old data. The `new-data` must be a Clojure map.") - (touch [this index set-name expiration] + (touch + [this index set-name expiration] + [this index set-name expiration conf] "Updates the TTL of the record stored under at `index` to `expiration` seconds from now. Expects records to exist.")) @@ -112,6 +114,16 @@ will process the command and send the results to the listener. `commands` is a sequence of Aerospike CDT operations.")) +(defprotocol AerospikeBatchOps + (batch-operate + [this batch-records] + [this batch-records conf] + "Asynchronously perform multiple read/write operations on a single key in one batch call. + This method registers the command with an event loop and returns. The event loop thread + will process the command and send the results to the listener. + `commands` is a sequence of Aerospike CDT operations.")) + + (defprotocol AerospikeSetOps (scan-set [this aero-namespace set-name conf] "Scans through the given set and calls a user defined callback for each record that was found. diff --git a/test/aerospike_clj/client_test.clj b/test/aerospike_clj/client_test.clj deleted file mode 100644 index e69de29..0000000 diff --git a/test/aerospike_clj/integration/aerospike_setup.clj b/test/aerospike_clj/integration/aerospike_setup.clj new file mode 100644 index 0000000..24bc09b --- /dev/null +++ b/test/aerospike_clj/integration/aerospike_setup.clj @@ -0,0 +1,28 @@ +(ns aerospike-clj.integration.aerospike-setup + (:require [clj-test-containers.core :as tc])) + +(def ^:private ^:const container-port 3000) +(def ^:dynamic *container* nil) + +(defn get-host-and-port [] + (format "%s:%s" (:host *container*) (get (:mapped-ports *container*) container-port))) + +(defn- start-container [] + (->> {:image-name "aerospike:ee-6.2.0.3" + :exposed-ports [container-port] + :wait-for {:wait-strategy :log + :message "objects: all 0 master 0 prole 0 non-replica 0" + :times 1 + :startup-timeout 15}} + tc/create + tc/start!)) + +(defn- stop! [] + (tc/stop! *container*)) + +(defn with-aerospike [test-fn] + (binding [*container* (start-container)] + (try + (test-fn) + (finally + (stop!))))) diff --git a/test/aerospike_clj/integration_test.clj b/test/aerospike_clj/integration/integration_test.clj similarity index 69% rename from test/aerospike_clj/integration_test.clj rename to test/aerospike_clj/integration/integration_test.clj index 04446a8..6afcd0b 100644 --- a/test/aerospike_clj/integration_test.clj +++ b/test/aerospike_clj/integration/integration_test.clj @@ -1,53 +1,58 @@ (ns ^{:author "Ido Barkan" - :integration true - :doc "Integration tests. Requires a local Aerospike instance. - To run instances locally inside docker containers: - $ docker run -d --name aerospike -p 3000:3000 -p 3001:3001 -p 3002:3002 -p 3003:3003 aerospike:4.9.0.11"} - aerospike-clj.integration-test + :integration true} + aerospike-clj.integration.integration-test (:require [clojure.test :refer [deftest testing is use-fixtures]] + [cheshire.core :as json] + [aerospike-clj.integration.aerospike-setup :as as-setup] [aerospike-clj.client :as client] [aerospike-clj.protocols :as pt] [aerospike-clj.policy :as policy] [aerospike-clj.key :as as-key] - [cheshire.core :as json]) - (:import [com.aerospike.client Value AerospikeClient] - [com.aerospike.client.cdt ListOperation ListPolicy ListOrder ListWriteFlags ListReturnType - MapOperation MapPolicy MapOrder MapWriteFlags MapReturnType CTX] - [com.aerospike.client.policy Priority ReadModeSC ReadModeAP Replica GenerationPolicy RecordExistsAction - WritePolicy BatchPolicy Policy] - [java.util HashMap ArrayList] - [java.util.concurrent ExecutionException] - [clojure.lang PersistentArrayMap] - [aerospike_clj.client SimpleAerospikeClient])) + [aerospike-clj.utils :as utils] + [spy.core :as spy]) + (:import (com.aerospike.client Value AerospikeClient BatchWrite Operation Bin) + (com.aerospike.client.cdt ListOperation ListPolicy ListOrder ListWriteFlags ListReturnType + MapOperation MapPolicy MapOrder MapWriteFlags MapReturnType CTX) + (com.aerospike.client.policy ReadModeSC ReadModeAP Replica GenerationPolicy RecordExistsAction + WritePolicy BatchPolicy Policy CommitLevel BatchWritePolicy) + (java.util HashMap ArrayList) + (java.util.concurrent ExecutionException) + (clojure.lang PersistentArrayMap) + (aerospike_clj.client SimpleAerospikeClient) + (com.aerospike.client.exp Exp))) (def _set "set") (def _set2 "set2") (def as-namespace "test") (def ^:dynamic *c* nil) +(def ^:dynamic *as-hosts* nil) (def TTL 5) -(defn db-connection [test-fn] - (binding [*c* (client/init-simple-aerospike-client - ["localhost"] - as-namespace)] - (test-fn) - (pt/stop *c*))) -(use-fixtures :once db-connection) +(defn with-db-connection [test-fn] + (let [as-hosts [(as-setup/get-host-and-port)]] + (binding [*c* (client/init-simple-aerospike-client + as-hosts + as-namespace) + *as-hosts* as-hosts] + (test-fn) + (pt/stop *c*)))) + +(use-fixtures :once as-setup/with-aerospike with-db-connection) (deftest client-creation - (let [c (client/init-simple-aerospike-client ["localhost"] "test")] + (let [c (client/init-simple-aerospike-client *as-hosts* as-namespace)] (is c) - (is (= ["localhost"] (.-hosts ^SimpleAerospikeClient c)))) + (is (= *as-hosts* (.-hosts ^SimpleAerospikeClient c)))) (letfn [(no-password? [ex] (let [conf (:conf (ex-data ex))] (and conf (not (contains? conf "password")))))] - (let [ex (is (thrown-with-msg? Exception #"unbounded delay queue" (client/init-simple-aerospike-client ["localhost"] "test" {"maxCommandsInProcess" 1})))] + (let [ex (is (thrown-with-msg? Exception #"unbounded delay queue" (client/init-simple-aerospike-client *as-hosts* as-namespace {"maxCommandsInProcess" 1})))] (is (no-password? ex))) (with-redefs [client/create-event-loops (constantly nil)] - (let [ex (is (thrown-with-msg? Exception #"event-loops" (client/init-simple-aerospike-client ["localhost"] "test")))] + (let [ex (is (thrown-with-msg? Exception #"event-loops" (client/init-simple-aerospike-client *as-hosts* as-namespace)))] (is (no-password? ex)))))) (deftest health @@ -141,7 +146,7 @@ (is (true? @(pt/create *c* k _set data TTL))) (testing "clojure maps can be serialized as-is" (let [v @(pt/get-single-no-meta *c* k _set)] - (is (= data v)) ;; per value it is identical + (is (= data v)) ;; per value it is identical (is (= PersistentArrayMap (type v))))))) (deftest put-multiple-bins-get-clj-map @@ -153,11 +158,11 @@ (is (true? @(pt/create *c* k _set data TTL))) (testing "clojure maps can be serialized from bins" (let [v @(pt/get-single-no-meta *c* k _set)] - (is (= (get data "foo") (get v "foo"))) ;; per value it is identical - (is (= (get data "bar") (get v "bar"))) ;; true value returns the same after being sanitized/desanitized - (is (= (get data "baz") (get v "baz"))) ;; false value returns the same after being sanitized/desanitized - (is (= (get data "qux") (get v "qux"))) ;; nil value retuns the same after being sanitized/desanitized - (is (= PersistentArrayMap (type v))) ;; converted back to a Clojure map instead of HashMap + (is (= (get data "foo") (get v "foo"))) ;; per value it is identical + (is (= (get data "bar") (get v "bar"))) ;; true value returns the same after being sanitized/desanitized + (is (= (get data "baz") (get v "baz"))) ;; false value returns the same after being sanitized/desanitized + (is (= (get data "qux") (get v "qux"))) ;; nil value retuns the same after being sanitized/desanitized + (is (= PersistentArrayMap (type v))) ;; converted back to a Clojure map instead of HashMap (is (true? (map? v))))))) (deftest get-single-multiple-bins @@ -170,7 +175,7 @@ (let [v1 @(pt/get-single *c* k _set {} ["foo"]) v2 @(pt/get-single *c* k _set {} ["bar"]) v3 @(pt/get-single *c* k _set {} ["baz"]) - v4 @(pt/get-single *c* k _set {})] ;; getting all bins for the record + v4 @(pt/get-single *c* k _set {})] ;; getting all bins for the record (is (= (get data "foo") (get (:payload v1) "foo"))) (is (= (get data "bar") (get (:payload v2) "bar"))) (is (= (get data "baz") (get (:payload v3) "baz"))) @@ -200,7 +205,7 @@ new-data {"qux" [(rand-int 1000)]} k (random-key)] (is (true? @(pt/create *c* k _set data TTL))) - (is (true? @(pt/add-bins *c* k _set new-data TTL))) ;; adding value to bin + (is (true? @(pt/add-bins *c* k _set new-data TTL))) ;; adding value to bin (testing "bin values can be added to existing records" (let [v @(pt/get-single-no-meta *c* k _set)] (is (= v (merge data new-data))) @@ -219,7 +224,7 @@ bin-keys ["foo" "bar" "baz"] k (random-key)] (is (true? @(pt/create *c* k _set data TTL))) - (is (true? @(pt/delete-bins *c* k _set bin-keys TTL))) ;; removing value from bin + (is (true? @(pt/delete-bins *c* k _set bin-keys TTL))) ;; removing value from bin (testing "bin values can be removed from existing records" (let [v @(pt/get-single-no-meta *c* k _set)] (is (= v (apply dissoc data bin-keys))) @@ -519,26 +524,62 @@ (is (nil? (first (:payload @(set-pop "foo"))))) (is (= #{"bar"} (set-getall)))))) +(deftest batch-operate + (testing "mixed cdt operations on multiple keys" + (letfn [(create-batch-write-record [list-bin map-bin map-key ^String string-bin k v] + (let [as-key (pt/create-key k as-namespace _set)] + (BatchWrite. as-key (utils/v->array Operation [(ListOperation/append list-bin (Value/get (str v)) nil) + (MapOperation/put (MapPolicy.) map-bin (Value/get map-key) (Value/get (str v)) nil) + (Operation/put (Bin. string-bin (str v)))])))) + (create-touch-record [k _v] + (let [as-key (pt/create-key k as-namespace _set)] + (BatchWrite. as-key (utils/v->array Operation [(Operation/touch)])))) + (create-read-batch-record [k] + {:index k :set _set})] + (let [list-bin "list" + map-bin "map" + map-key "test-key" + string-bin "string" + ks (take 3 (repeatedly random-key)) + expected-write-result-payload {:result-code 0 + :payload {list-bin 1 + map-bin 1 + string-bin nil}} + expected-read-payloads (mapv (fn [^String val] + (hash-map map-bin {map-key (str val)} + list-bin [(str val)] + string-bin (str val))) (range 3)) + batch-write-records (mapv (partial create-batch-write-record + list-bin + map-bin + map-key + string-bin) ks (range)) + batch-touch-records (mapv create-touch-record ks (range)) + batch-read-records (mapv create-read-batch-record ks)] + (is (every? #(= expected-write-result-payload (select-keys % [:result-code :payload])) + @(pt/batch-operate *c* batch-write-records))) + (is (= expected-read-payloads + (mapv :payload @(pt/get-batch *c* batch-read-records)))) + (is (every? #(zero? (:result-code %)) @(pt/batch-operate *c* batch-touch-records))))))) + (deftest default-read-policy (let [rp (.getReadPolicyDefault ^AerospikeClient (.-client ^SimpleAerospikeClient *c*))] - (is (= Priority/DEFAULT (.priority rp))) ;; Priority of request relative to other transactions. Currently, only used for scans. - (is (= ReadModeAP/ONE (.readModeAP rp))) ;; Involve single node in the read operation. - (is (= Replica/SEQUENCE (.replica rp))) ;; Try node containing master partition first. + (is (= Replica/SEQUENCE (.replica rp))) ;; Try node containing master partition first. ;; If connection fails, all commands try nodes containing replicated partitions. ;; If socketTimeout is reached, reads also try nodes containing replicated partitions, ;; but writes remain on master node.))) ;; This option requires ClientPolicy.requestProleReplicas to be enabled in order to function properly. - (is (= 30000 (.socketTimeout rp))) ;; 30 seconds default - (is (zero? (.totalTimeout rp))) ;; no time limit - (is (= 3000 (.timeoutDelay rp))) ;; no delay, connection closed on timeout - (is (= 2 (.maxRetries rp))) ;; initial attempt + 2 retries = 3 attempts - (is (zero? (.sleepBetweenRetries rp))) ;; do not sleep between retries - (is (false? (.sendKey rp))) ;; do not send the user defined key - (is (= ReadModeSC/SESSION (.readModeSC rp))))) ;; Ensures this client will only see an increasing sequence of record versions. Server only reads from master. This is the default.. + (is (= 30000 (.socketTimeout rp))) ;; 30 seconds default + (is (= 1000 (.totalTimeout rp))) ;; total timeout of 1 second + (is (= 3000 (.timeoutDelay rp))) ;; no delay, connection closed on timeout + (is (= 2 (.maxRetries rp))) ;; initial attempt + 2 retries = 3 attempts + (is (zero? (.sleepBetweenRetries rp))) ;; do not sleep between retries + (is (false? (.sendKey rp))) ;; do not send the user defined key + (is (= ReadModeSC/SESSION (.readModeSC rp))))) ;; Ensures this client will only see an increasing sequence of record versions. Server only reads from master. This is the default. (deftest configure-read-and-batch-policy (let [c (client/init-simple-aerospike-client - ["localhost"] "test" + *as-hosts* as-namespace {"readPolicyDefault" (policy/map->policy {"ReadModeAP" "ALL" "ReadModeSC" "LINEARIZE" "maxRetries" 1 @@ -555,7 +596,6 @@ rp ^Policy (.getReadPolicyDefault ^AerospikeClient (.-client ^SimpleAerospikeClient c)) bp ^BatchPolicy (.getBatchPolicyDefault ^AerospikeClient (.-client ^SimpleAerospikeClient c))] - (is (= Priority/DEFAULT (.priority rp))) (is (= ReadModeAP/ALL (.readModeAP rp))) (is (= Replica/RANDOM (.replica rp))) (is (= 1000 (.socketTimeout rp))) @@ -572,23 +612,24 @@ (deftest default-write-policy (let [rp ^WritePolicy (.getWritePolicyDefault ^AerospikeClient (.-client ^SimpleAerospikeClient *c*))] - (is (= ReadModeAP/ONE (.readModeAP rp))) ;; Involve master only in the read operation. - (is (= Replica/SEQUENCE (.replica rp))) ;; Try node containing master partition first. + (is (= ReadModeAP/ONE (.readModeAP rp))) ;; Involve master only in the read operation. + (is (= Replica/SEQUENCE (.replica rp))) ;; Try node containing master partition first. ;; If connection fails, all commands try nodes containing replicated partitions. ;; If socketTimeout is reached, reads also try nodes containing replicated partitions, ;; but writes remain on master node.))) ;; This option requires ClientPolicy.requestProleReplicas to be enabled in order to function properly. - (is (= 30000 (.socketTimeout rp))) ;; 30 seconds default - (is (zero? (.totalTimeout rp))) ;; no time limit - (is (= 3000 (.timeoutDelay rp))) ;; no delay, connection closed on timeout - (is (= 2 (.maxRetries rp))) ;; initial attempt + 2 retries = 3 attempts - (is (zero? (.sleepBetweenRetries rp))) ;; do not sleep between retries - (is (false? (.sendKey rp))) ;; do not send the user defined key - (is (= ReadModeSC/SESSION (.readModeSC rp))))) ;; Ensures this client will only see an increasing sequence of record versions. Server only reads from master. This is the default.. + ;; This option requires ClientPolicy.requestProleReplicas to be enabled in order to function properly. + (is (= 30000 (.socketTimeout rp))) ;; 30 seconds default + (is (= 1000 (.totalTimeout rp))) ;; total timeout of 1 second + (is (= 3000 (.timeoutDelay rp))) ;; no delay, connection closed on timeout + (is (= 2 (.maxRetries rp))) ;; initial attempt + 2 retries = 3 attempts + (is (zero? (.sleepBetweenRetries rp))) ;; do not sleep between retries + (is (false? (.sendKey rp))) ;; do not send the user defined key + (is (= ReadModeSC/SESSION (.readModeSC rp))))) ;; Ensures this client will only see an increasing sequence of record versions. Server only reads from master. This is the default. (deftest configure-write-policy (let [c (client/init-simple-aerospike-client - ["localhost"] "test" + *as-hosts* as-namespace {"writePolicyDefault" (policy/map->write-policy {"CommitLevel" "COMMIT_MASTER" "durableDelete" true "expiration" 1000 @@ -598,7 +639,6 @@ "respondAllOps" true})}) wp ^WritePolicy (.getWritePolicyDefault ^AerospikeClient (.-client ^SimpleAerospikeClient c))] - (is (= Priority/DEFAULT (.priority wp))) (is (= ReadModeAP/ONE (.readModeAP wp))) (is (true? (.durableDelete wp))) (is (= 1000 (.expiration wp))) @@ -607,6 +647,34 @@ (is (= RecordExistsAction/REPLACE_ONLY (.recordExistsAction wp))) (is (true? (.respondAllOps wp))))) +(deftest configure-batch-write-policies + (let [expression (Exp/build (Exp/ge (Exp/intBin "a") (Exp/intBin "b"))) + c (client/init-simple-aerospike-client + *as-hosts* as-namespace + {"batchWritePolicyDefault" (policy/map->batch-write-policy {"CommitLevel" "COMMIT_MASTER" + "durableDelete" true + "expiration" 1000 + "generation" 7 + "GenerationPolicy" "EXPECT_GEN_GT" + "RecordExistsAction" "REPLACE_ONLY" + "filterExp" expression}) + "batchParentPolicyWriteDefault" (policy/map->batch-policy {"allowInline" false + "maxConcurrentThreads" 2 + "sendSetName" true})}) + + batch-write-policy ^BatchWritePolicy (.getBatchWritePolicyDefault ^AerospikeClient (.-client ^SimpleAerospikeClient c)) + batch-parent-write-policy ^BatchPolicy (.getBatchParentPolicyWriteDefault ^AerospikeClient (.-client ^SimpleAerospikeClient c))] + (is (true? (.durableDelete batch-write-policy))) + (is (= 1000 (.expiration batch-write-policy))) + (is (= 7 (.generation batch-write-policy))) + (is (= GenerationPolicy/EXPECT_GEN_GT (.generationPolicy batch-write-policy))) + (is (= RecordExistsAction/REPLACE_ONLY (.recordExistsAction batch-write-policy))) + (is (= expression (.filterExp batch-write-policy))) + (is (= CommitLevel/COMMIT_MASTER (.commitLevel batch-write-policy))) + + (is (false? (.allowInline batch-parent-write-policy))) + (is (= 2 (.maxConcurrentThreads batch-parent-write-policy))) + (is (true? (.sendSetName batch-parent-write-policy))))) (deftest set-entry (let [data (rand-int 1000) @@ -687,12 +755,11 @@ @(pt/put-multiple *c* [k k2 k3] (repeat _set) [10 20 30] (repeat ttl) conf) @(pt/scan-set *c* aero-namespace _set {:callback callback}) - + (Thread/sleep 50) ;wait for callback completion (let [res @(pt/get-batch *c* [{:index k :set _set} {:index k2 :set _set} {:index k3 :set _set}])] - - (is (= (sort (mapv :payload res)) [11 21 31])))) + (is (= (mapv :payload res) [11 21 31])))) (delete-records)) (testing "it can delete items during a scan" @@ -725,3 +792,61 @@ (is (= 1 (count @res)))) (delete-records)))) + +(deftest client-events-test + (let [success-spy (spy/spy) + failure-spy (spy/spy) + create-mock-client-events (fn [on-success-spy on-failure-spy] + (reify pt/ClientEvents + (on-success [_this op-name op-result index op-start-time] + (on-success-spy op-name op-result index op-start-time) + op-result) + (on-failure [_this op-name op-ex index op-start-time] + (on-failure-spy op-name op-ex index op-start-time) + op-ex))) + client-events (create-mock-client-events success-spy failure-spy) + c (client/init-simple-aerospike-client *as-hosts* as-namespace {:client-events client-events}) + index (random-key) + payload {"string" "hello"}] + (letfn [(nth-args-map [spy-instance ks n] + (let [args-map (zipmap ks (spy/nth-call spy-instance n))] + (assert number? (:op-start-time args-map)) + (dissoc args-map :op-start-time))) + (success-nth-args-map [spy-instance n] + (nth-args-map spy-instance [:op-name :op-result :index :op-start-time] n)) + (failure-nth-args-map [spy-instance n] + (nth-args-map spy-instance [:op-name :op-ex :index :op-start-time] n))] + (testing "client events is called with the right parameters using default client-events passed on client initialization" + @(pt/create c index _set payload TTL) + @(pt/get-single c index _set) + (is (thrown-with-msg? ExecutionException #"Generation error" @(pt/update c index _set "new-payload" 42 TTL))) + + (is (= {:op-name :write + :op-result true + :index index} (success-nth-args-map success-spy 0))) + (is (= {:op-name :read + :op-result payload + :index index} (-> (success-nth-args-map success-spy 1) + (update :op-result :payload)))) + (is (= {:op-name :write + :op-ex 3 + :index index} (-> (failure-nth-args-map failure-spy 0) + (update :op-ex #(.getResultCode %)))))) + (testing "client events is called with the right parameters using custom client-events per op" + (let [custom-success-spy (spy/spy) + custom-failure-spy (spy/spy) + custom-client-events (create-mock-client-events custom-success-spy custom-failure-spy)] + @(pt/put c index _set payload TTL {:client-events [custom-client-events]}) + @(pt/get-single c index _set {:client-events [custom-client-events]}) + (is (thrown-with-msg? ExecutionException #"Generation error" @(pt/update c index _set "new-payload" 42 TTL {:client-events [custom-client-events]}))) + (is (= {:op-name :write + :op-result true + :index index} (success-nth-args-map custom-success-spy 0))) + (is (= {:op-name :read + :op-result payload + :index index} (-> (success-nth-args-map custom-success-spy 1) + (update :op-result :payload)))) + (is (= {:op-name :write + :op-ex 3 + :index index} (-> (failure-nth-args-map custom-failure-spy 0) + (update :op-ex #(.getResultCode %)))))))))) diff --git a/test/aerospike_clj/metrics_test.clj b/test/aerospike_clj/integration/metrics_test.clj similarity index 78% rename from test/aerospike_clj/metrics_test.clj rename to test/aerospike_clj/integration/metrics_test.clj index bd23311..8959bf2 100644 --- a/test/aerospike_clj/metrics_test.clj +++ b/test/aerospike_clj/integration/metrics_test.clj @@ -1,12 +1,15 @@ (ns ^{:author "Ido Barkan" :integration true} - aerospike-clj.metrics-test - (:require [clojure.test :refer [deftest is]] + aerospike-clj.integration.metrics-test + (:require [clojure.test :refer [deftest is use-fixtures]] [aerospike-clj.client :as client] + [aerospike-clj.integration.aerospike-setup :as as-setup] [aerospike-clj.protocols :as pt])) +(use-fixtures :once as-setup/with-aerospike) + (deftest get-cluster-stats - (let [c (client/init-simple-aerospike-client ["localhost"] "test") + (let [c (client/init-simple-aerospike-client [(as-setup/get-host-and-port)] "test") loopback-v4 "127-0-0-1" loopback-v6 "0:0:0:0:0:0:0:1"] (is (or diff --git a/update-docs.sh b/update-docs.sh index aa5fad3..f71e7f9 100755 --- a/update-docs.sh +++ b/update-docs.sh @@ -10,7 +10,7 @@ git clean -fdx cd .. echo "regenerate docs..." -lein with-profile docs codox +lein with-profile +docs codox echo "commit changes" cd codox