Skip to content

Commit

Permalink
Batch policy
Browse files Browse the repository at this point in the history
  • Loading branch information
evg-tso committed Jan 14, 2024
1 parent d1a3b8b commit bfe561f
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 53 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ the `com.aerospike/aerospike-client` dependency to be used.
Please note that the `aerospike-clj.batch-client` namespace requires the `com.aerospike/aerospike-client` dependency to
be version `6.0.0` or higher.

### Setting up a client policy for `com.aerospike/aerospike-client` with version 6.0.0 and above

The `com.aerospike/aerospike-client` dependency version `6.0.0` is a breaking change.
To set batch operation policies, please use the `aerospike-clj.batch-policy` namespace.

## Testing
### Unit tests
Executed via running `lein test`.
Expand Down
10 changes: 8 additions & 2 deletions src/main/clojure/aerospike_clj/batch_client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@
[aerospike-clj.protocols :as pt]
[promesa.core :as p])
(:import (aerospike_clj.client SimpleAerospikeClient)
(aerospike_clj.listeners AsyncBatchOperateListListener)
(com.aerospike.client AerospikeClient BatchRecord)
(com.aerospike.client AerospikeClient AerospikeException BatchRecord)
(com.aerospike.client.async EventLoop EventLoops)
(com.aerospike.client.listener BatchOperateListListener)
(com.aerospike.client.policy BatchPolicy)
(java.util List)))

(deftype ^:private AsyncBatchOperateListListener [op-future]
BatchOperateListListener
(^void onSuccess [_this ^List records ^boolean _status]
(p/resolve! op-future records))
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex)))

(defn- batch-record->map [^BatchRecord batch-record]
(let [k (.key batch-record)]
(-> (record/record->map (.record batch-record))
Expand Down
36 changes: 36 additions & 0 deletions src/main/clojure/aerospike_clj/batch_policy.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
(ns aerospike-clj.batch-policy
(:require [aerospike-clj.policy :as policy])
(:import (com.aerospike.client.policy BatchPolicy BatchWritePolicy ClientPolicy)))

(defn map->batch-policy
"Create a `BatchPolicy` from a map.
This function is slow due to possible reflection."
^BatchPolicy [conf]
(let [bp (BatchPolicy. (policy/map->policy conf))
conf (merge {"timeoutDelay" 3000} conf)]
(policy/set-java bp conf "allowInline")
(policy/set-java bp conf "respondAllKeys")
(policy/set-java bp conf "maxConcurrentThreads")
(policy/set-java bp conf "sendSetName")
bp))

(defn map->batch-write-policy
"Create a `BatchWritePolicy` from a map. Enumeration names should start with capitalized letter.
This function is slow due to possible reflection."
^BatchWritePolicy [conf]
(let [p (BatchWritePolicy.)]
(policy/set-java-enum p conf "RecordExistsAction")
(policy/set-java-enum p conf "CommitLevel")
(policy/set-java-enum p conf "GenerationPolicy")
(policy/set-java p conf "filterExp")
(policy/set-java p conf "generation")
(policy/set-java p conf "expiration")
(policy/set-java p conf "durableDelete")
(policy/set-java p conf "sendKey")
p))

(defn add-batch-write-policy
"Set the [[batchWritePolicyDefault]] or the [[batchParentPolicyWriteDefault]] in a [[ClientPolicy]]."
[^ClientPolicy client-policy conf]
(set! (.batchParentPolicyWriteDefault client-policy) (get conf "batchParentPolicyWriteDefault" (map->batch-policy conf)))
(set! (.batchWritePolicyDefault client-policy) (get conf "batchWritePolicyDefault" (map->batch-write-policy conf))))
19 changes: 6 additions & 13 deletions src/main/clojure/aerospike_clj/listeners.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
(ns aerospike-clj.listeners
(:require [promesa.core :as p]
[aerospike-clj.aerospike-record :as record])
(:import (java.util List Map)
(com.aerospike.client Key Record AerospikeException AerospikeException$QueryTerminated)
(com.aerospike.client.listener RecordListener WriteListener DeleteListener
ExistsListener BatchListListener RecordSequenceListener InfoListener ExistsArrayListener BatchOperateListListener)))
(:require [aerospike-clj.aerospike-record :as record]
[promesa.core :as p])
(:import (com.aerospike.client AerospikeException AerospikeException$QueryTerminated Key Record)
(com.aerospike.client.listener BatchListListener DeleteListener
ExistsArrayListener ExistsListener InfoListener RecordListener RecordSequenceListener WriteListener)
(java.util List Map)))

(deftype AsyncExistsListener [op-future]
ExistsListener
Expand Down Expand Up @@ -66,10 +66,3 @@
(p/reject! op-future ex))
(^void onSuccess [_this ^"[Lcom.aerospike.client.Key;" _keys ^"[Z" exists]
(p/resolve! op-future exists)))

(deftype AsyncBatchOperateListListener [op-future]
BatchOperateListListener
(^void onSuccess [_this ^List records ^boolean _status]
(p/resolve! op-future records))
(^void onFailure [_this ^AerospikeException ex]
(p/reject! op-future ex)))
31 changes: 1 addition & 30 deletions src/main/clojure/aerospike_clj/policy.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#_{:clj-kondo/ignore [:unused-import]}
[com.aerospike.client.policy Policy ClientPolicy WritePolicy RecordExistsAction
GenerationPolicy BatchPolicy CommitLevel
AuthMode ReadModeAP ReadModeSC Replica BatchWritePolicy]))
AuthMode ReadModeAP ReadModeSC Replica]))

(defmacro set-java [obj conf obj-name]
`(when (some? (get ~conf ~obj-name))
Expand Down Expand Up @@ -38,21 +38,6 @@
(set-java p conf "totalTimeout")
p))

(defn map->batch-write-policy
"Create a `BatchWritePolicy` from a map. Enumeration names should start with capitalized letter.
This function is slow due to possible reflection."
^BatchWritePolicy [conf]
(let [p (BatchWritePolicy.)]
(set-java-enum p conf "RecordExistsAction")
(set-java-enum p conf "CommitLevel")
(set-java-enum p conf "GenerationPolicy")
(set-java p conf "filterExp")
(set-java p conf "generation")
(set-java p conf "expiration")
(set-java p conf "durableDelete")
(set-java p conf "sendKey")
p))

(defn map->batch-policy
"Create a `BatchPolicy` from a map.
This function is slow due to possible reflection."
Expand Down Expand Up @@ -92,18 +77,6 @@
(set! (.recordExistsAction wp) record-exists-action)
wp)))

(defn batch-write-policy
"Create a write policy to be passed to put methods via `{:policy wp}`.
Also used in `update` and `create`.
The default policy in case the record exists is `RecordExistsAction/UPDATE`."
(^BatchWritePolicy [client expiration]
(batch-write-policy client expiration (RecordExistsAction/UPDATE)))
(^BatchWritePolicy [client expiration record-exists-action]
(let [wp (BatchWritePolicy. (.getBatchWritePolicyDefault ^AerospikeClient client))]
(set! (.expiration wp) expiration)
(set! (.recordExistsAction wp) record-exists-action)
wp)))

(defn set-policy
"Create a write policy with UPDATE record exists action.
in case of new entry, create it
Expand Down Expand Up @@ -187,8 +160,6 @@
(set! (.readPolicyDefault cp) (get conf "readPolicyDefault" (map->policy conf)))
(set! (.writePolicyDefault cp) (get conf "writePolicyDefault" (map->write-policy conf)))
(set! (.batchPolicyDefault cp) (get conf "batchPolicyDefault" (map->batch-policy conf)))
(set! (.batchParentPolicyWriteDefault cp) (get conf "batchParentPolicyWriteDefault" (map->batch-policy conf)))
(set! (.batchWritePolicyDefault cp) (get conf "batchWritePolicyDefault" (map->batch-write-policy conf)))

(set-java-enum cp conf "AuthMode")
(set-java cp conf "clusterName")
Expand Down
16 changes: 8 additions & 8 deletions test/aerospike_clj/integration/integration_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
[clojure.test :refer [deftest testing is use-fixtures]]
[cheshire.core :as json]
[aerospike-clj.integration.aerospike-setup :as as-setup]
[aerospike-clj.batch-policy :as batch-policy]
[aerospike-clj.client :as client]
[aerospike-clj.protocols :as pt]
[aerospike-clj.policy :as policy]
[aerospike-clj.key :as as-key]
[aerospike-clj.utils :as utils]
[clojure.tools.logging :as log]
[spy.core :as spy])
(:import (com.aerospike.client Value AerospikeClient BatchWrite Operation Bin)
(com.aerospike.client.cdt ListOperation ListPolicy ListOrder ListWriteFlags ListReturnType
Expand Down Expand Up @@ -652,13 +652,13 @@
(let [expression (Exp/build (Exp/ge (Exp/intBin "a") (Exp/intBin "b")))
c (client/init-simple-aerospike-client
*as-hosts* as-namespace
{"batchWritePolicyDefault" (policy/map->batch-write-policy {"CommitLevel" "COMMIT_MASTER"
"durableDelete" true
"expiration" 1000
"generation" 7
"GenerationPolicy" "EXPECT_GEN_GT"
"RecordExistsAction" "REPLACE_ONLY"
"filterExp" expression})
{"batchWritePolicyDefault" (batch-policy/map->batch-write-policy {"CommitLevel" "COMMIT_MASTER"
"durableDelete" true
"expiration" 1000
"generation" 7
"GenerationPolicy" "EXPECT_GEN_GT"
"RecordExistsAction" "REPLACE_ONLY"
"filterExp" expression})
"batchParentPolicyWriteDefault" (policy/map->batch-policy {"allowInline" false
"maxConcurrentThreads" 2
"sendSetName" true})})
Expand Down

0 comments on commit bfe561f

Please sign in to comment.