Navigation Menu

Skip to content

Commit

Permalink
Merge pull request #144 from mitchelkuijpers/native-bulk-operations
Browse files Browse the repository at this point in the history
A first stab at Support bulk operations in the native client #24
  • Loading branch information
michaelklishin committed Mar 2, 2015
2 parents d64b32a + a63c156 commit 7c8ccf8
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 21 deletions.
34 changes: 34 additions & 0 deletions src/clojurewerkz/elastisch/common/bulk.clj
@@ -0,0 +1,34 @@
;; Copyright (c) 2011-2014 Michael S. Klishin, Alex Petrov, and the ClojureWerkz Team
;;
;; The use and distribution terms for this software are covered by the
;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
;; which can be found in the file epl-v10.html at the root of this distribution.
;; By using this software in any fashion, you are agreeing to be bound by
;; the terms of this license.
;; You must not remove this notice, or any other, from this software.

(ns clojurewerkz.elastisch.common.bulk
(:require [clojure.set :refer :all]))

(def ^:private special-operation-keys
[:_index :_type :_id :_retry_on_conflict :_routing :_percolate :_parent :_timestamp :_ttl])

(defn index-operation
[doc]
{"index" (select-keys doc special-operation-keys)})

(defn delete-operation
[doc]
{"delete" (select-keys doc special-operation-keys)})

(defn bulk-index
"generates the content for a bulk insert operation"
([documents]
(let [operations (map index-operation documents)
documents (map #(apply dissoc % special-operation-keys) documents)]
(interleave operations documents))))

(defn bulk-delete
"generates the content for a bulk delete operation"
([documents]
(map delete-operation documents)))
76 changes: 76 additions & 0 deletions src/clojurewerkz/elastisch/native/bulk.clj
@@ -0,0 +1,76 @@
;; Copyright 2011-2014 Michael S. Klishin, Alex Petrov, and the ClojureWerkz Team
;;
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

(ns clojurewerkz.elastisch.native.bulk
(:refer-clojure :exclude [get replace count sort])
(:require [clojurewerkz.elastisch.native :as native]
[clojure.string :as string]
[clojure.set :refer :all]
[clojurewerkz.elastisch.native.conversion :as cnv]
[clojurewerkz.elastisch.arguments :as ar]
[clojurewerkz.elastisch.common.bulk :as common-bulk])
(:import clojure.lang.IPersistentMap
org.elasticsearch.client.Client
org.elasticsearch.action.bulk.BulkRequest
org.elasticsearch.action.bulk.BulkRequestBuilder
org.elasticsearch.action.bulk.BulkResponse
org.elasticsearch.action.index.IndexRequest
org.elasticsearch.action.delete.DeleteRequest))

(defprotocol AddOperation
(add-operation [operation bulk-builder]))

(extend-protocol AddOperation
IndexRequest
(add-operation [^IndexRequest operation ^BulkRequestBuilder bulk-builder]
(.add bulk-builder operation))

DeleteRequest
(add-operation [^DeleteRequest operation ^BulkRequestBuilder bulk-builder]
(.add bulk-builder operation)))

(defn add-default [doc default]
(if-let [action (cnv/get-bulk-item-action doc)]
(update-in doc [action] #(merge default %))
doc))

(defn bulk
"Performs a bulk operation"
[^Client conn operations & params]
(let [^BulkRequestBuilder req (reduce #(add-operation %2 %1) (.prepareBulk conn)
(cnv/->action-requests operations))]
(when (:refresh (first (flatten params)))
(.setRefresh req true))
(-> req
.execute
^BulkResponse .actionGet
cnv/bulk-response->map)))

(defn bulk-with-index
"Performs a bulk operation defaulting to the index specified"
[^Client conn index operations & params]
(bulk conn (map #(add-default % {:_index index}) operations) params))

(defn bulk-with-index-and-type
"Performs a bulk operation defaulting to the index and type specified"
[^Client conn index mapping-type operations & params]
(bulk conn (map #(add-default % {:_index index :_type mapping-type}) operations) params))

(def index-operation common-bulk/index-operation)

(def delete-operation common-bulk/delete-operation)

(def bulk-index common-bulk/bulk-index)

(def bulk-delete common-bulk/bulk-delete)
72 changes: 71 additions & 1 deletion src/clojurewerkz/elastisch/native/conversion.clj
Expand Up @@ -100,7 +100,9 @@
org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest
org.elasticsearch.action.support.broadcast.BroadcastOperationResponse
org.elasticsearch.action.support.master.AcknowledgedResponse))
org.elasticsearch.action.support.master.AcknowledgedResponse
;; Bulk responses
[org.elasticsearch.action.bulk BulkResponse BulkItemResponse]))

;;
;; Implementation
Expand Down Expand Up @@ -1448,3 +1450,71 @@
(defn ^DeleteIndexTemplateRequest ->delete-index-template-request
[template-name]
(DeleteIndexTemplateRequest. template-name))

;;
;; Bulk Responses
;;

(defn bulk-item-response->map
[^BulkItemResponse item]
(let [res {:index (.getIndex item)
:_index (.getIndex item)
:type (.getType item)
:_type (.getType item)
:id (.getId item)
:_id (.getId item)
:version (.getVersion item)
:_version (.getVersion item)
:op-type (.getOpType item)
:failed? (.isFailed item)}]
(if (:failed? res)
(assoc res :failure-message (.getFailureMessage item))
res)))

(defn bulk-response->map
[^BulkResponse response]
{:took (.getTookInMillis response)
:has-failures? (.hasFailures response)
:items (mapv bulk-item-response->map (.getItems response))})

(defn remove-underscores
[opts]
(reduce-kv (fn [m k v]
(assoc m (keyword (clojure.string/replace (name k) #"^_" "")) v))
{} opts))

(defn get-bulk-item-action
[doc]
(cond (contains? doc "index") "index"
(contains? doc "delete") "delete"
:else nil))

(defn ->action-requests
[a]
(loop [actions a
results []]
(let [curr (first actions)
request-type (get-bulk-item-action curr)
add (case request-type
"index" (let [source (second actions)
opts (clojure.core/get curr "index")]
(->index-request
(:_index opts)
(:_type opts)
source
(remove-underscores opts)))
"delete" (let [opts (clojure.core/get curr "delete")]
(->delete-request
(:_index opts)
(:_type opts)
(:_id opts)
(remove-underscores opts)))
nil nil)
new-results (if (nil? add) results (conj results add))
next-rest (case request-type
"index" (rest (rest actions))
"delete" (rest actions)
nil ())]
(if (empty? next-rest)
new-results
(recur next-rest new-results)))))
25 changes: 5 additions & 20 deletions src/clojurewerkz/elastisch/rest/bulk.clj
Expand Up @@ -18,6 +18,7 @@
[cheshire.core :as json]
[clojure.string :as string]
[clojure.set :refer :all]
[clojurewerkz.elastisch.common.bulk :as common-bulk]
[clojurewerkz.elastisch.arguments :as ar])
(:import clojurewerkz.elastisch.rest.Connection))

Expand Down Expand Up @@ -49,26 +50,10 @@
(apply bulk-with-url conn (rest/bulk-url conn
index mapping-type) operations params))

(def ^:private special-operation-keys
[:_index :_type :_id :_retry_on_conflict :_routing :_percolate :_parent :_timestamp :_ttl])
(def index-operation common-bulk/index-operation)

(defn index-operation
[doc]
{"index" (select-keys doc special-operation-keys)})
(def delete-operation common-bulk/delete-operation)

(defn delete-operation
[doc]
{"delete" (select-keys doc special-operation-keys)})
(def bulk-index common-bulk/bulk-index)

(defn bulk-index
"generates the content for a bulk insert operation"
([documents]
(let [operations (map index-operation documents)
documents (map #(apply dissoc % special-operation-keys) documents)]
(interleave operations documents))))

(defn bulk-delete
"generates the content for a bulk delete operation"
([documents]
(let [operations (map delete-operation documents)]
operations)))
(def bulk-delete common-bulk/bulk-delete)
68 changes: 68 additions & 0 deletions test/clojurewerkz/elastisch/native_api/bulk_test.clj
@@ -0,0 +1,68 @@
;; Copyright (c) 2011-2014 Michael S. Klishin, Alex Petrov, and the ClojureWerkz Team
;;
;; The use and distribution terms for this software are covered by the
;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
;; which can be found in the file epl-v10.html at the root of this distribution.
;; By using this software in any fashion, you are agreeing to be bound by
;; the terms of this license.
;; You must not remove this notice, or any other, from this software.

(ns clojurewerkz.elastisch.native-api.bulk-test
(:require [clojurewerkz.elastisch.native.bulk :refer :all]
[clojurewerkz.elastisch.native.index :as idx]
[clojurewerkz.elastisch.native.document :as doc]
[clojurewerkz.elastisch.fixtures :as fx]
[clojurewerkz.elastisch.test.helpers :as th]
[clojurewerkz.elastisch.native.response :refer [created?]]
[clojure.test :refer :all]))

(use-fixtures :each fx/reset-indexes)

(def ^{:const true} index-name "people")
(def ^{:const true} index-type "person")

(defn are-all-successful
[xs]
(is (every? (fn [m] (and (:_index m)
(:_type m)
(:_id m)
(:status m))) xs))
(is (every? created? xs)))

(let [conn (th/connect-native-client)]
(deftest ^{:native true :indexing true} test-bulk-indexing
(let [person fx/person-jack
for-index (assoc person :_type index-type :_index index-name)
bulk-operations (bulk-index (repeat 2 for-index))]
(is (= 2 (count (:items (bulk conn bulk-operations {:refresh true})))))
(is (= 2 (:count (doc/count conn index-name index-type))))))

(deftest ^{:native true :indexing true} test-bulk-with-index
(let [document fx/person-jack
for-index (assoc document :_type index-type)
insert-operations (bulk-index (repeat 10 for-index))
response (bulk-with-index conn index-name insert-operations {:refresh true})
first-id (-> response :items first :create :_id)]
(is (= 10 (:count (doc/count conn index-name index-type))))
(is (= false (:has-failures? response)))
(is (= 10 (count (filter #(= "create" (:op-type %)) (:items response)))))
(is (idx/exists? conn index-name))))

(deftest ^{:native true :indexing true} test-bulk-with-index-and-type
(let [document fx/person-jack
insert-operations (bulk-index (repeat 10 document))
response (bulk-with-index-and-type conn index-name index-type insert-operations {:refresh true})]
(is (= 10 (:count (doc/count conn index-name index-type))))
(is (= false (:has-failures? response)))
(is (= 10 (count (filter #(= "create" (:op-type %)) (:items response)))))
(is (idx/exists? conn index-name))))

(deftest ^{:native true :indexing true} test-bulk-delete
(let [insert-ops (bulk-index (repeat 10 fx/person-jack))
response (bulk-with-index-and-type conn index-name index-type insert-ops {:refresh true})
docs (->> response :items)
initial-count (:count (doc/count conn index-name index-type))
delete-response (bulk-with-index-and-type conn index-name index-type (bulk-delete docs) {:refresh true})]
(is (= 10 initial-count))
(is (= false (:has-failures? response)))
(is (= 0 (:count (doc/count conn index-name index-type)))))))

0 comments on commit 7c8ccf8

Please sign in to comment.