Skip to content
Browse files

Add data:load bytes partitioning.

  • Loading branch information...
1 parent 520c42b, commit 3de9ee8901683bd19896dcdd46564ee28e81432f. @drewr committed on Aug 2, 2012.
Showing with 46 additions and 26 deletions.
  1. +26 −23 src/esperanto/client/rest.clj
  2. +14 −0 src/esperanto/utils.clj
  3. +6 −3 test/esperanto/test/client/rest.clj
View
49 src/esperanto/client/rest.clj
@@ -30,26 +30,29 @@
(query (http/uri-append url "_bulk")
(merge {:method :post} o {:body body}))))
-(defn data:load [url & {:as opts}]
- (let [o (merge {:method :put
- :bulksize 100} opts)
- demetaize (fn [doc]
- (let [doc (json/decode doc)]
- (json/encode (dissoc doc :_index :_type))))
- metaize (fn [doc]
- (let [doc (json/decode doc)
- ix (:index o (doc "_index"))
- ty (:type o (doc "_type"))
- id (:id o (doc "_id"))
- m {:index {:_index ix :_type ty}}
- m (if id (update-in m [:index] assoc :_id id) m)]
- (if (not (and ix ty))
- (throw (Exception.
- (str "missing _index or _type for doc "
- (with-out-str (pr doc))))))
- (json/encode m)))
- batches (map #(apply str (interpose "\n" %))
- (for [batch (partition-all (:bulksize o) (:doc-seq o))]
- (interleave (map metaize batch) (map demetaize batch))))]
- (doseq [b batches]
- (index:bulk url (assoc o :body b)))))
+(defn metaize
+  "Builds the JSON-encoded bulk action line for `doc`.
+
+  `doc` may be a JSON string (decoded first) or an already-decoded value
+  that can be looked up with the string keys \"_index\", \"_type\" and
+  \"_id\". The :index, :type and :id entries of `o` take precedence over
+  the values found in the document. The :_id field is only emitted when an
+  id is available.
+
+  Throws Exception when neither source supplies both an index and a type."
+  [o doc]
+  (let [parsed (if (string? doc) (json/decode doc) doc)
+        idx    (:index o (parsed "_index"))
+        typ    (:type o (parsed "_type"))
+        id     (:id o (parsed "_id"))]
+    ;; Refuse to emit an action line that the bulk endpoint cannot route.
+    (when-not (and idx typ)
+      (throw (Exception.
+               (str "missing _index or _type for doc " (pr-str parsed)))))
+    (let [target {:_index idx :_type typ}
+          target (if id (assoc target :_id id) target)]
+      (json/encode {:index target}))))
+
+(defn data:load
+  "Sends the documents in (:doc-seq o) to `url` through index:bulk, one
+  request per batch, and returns the number of response items that have a
+  truthy value at [:create :ok].
+
+  Options (keyword arguments, collected into `o`):
+    :doc-seq   seq of documents; each is passed to metaize and is also
+               written verbatim as the source line of the bulk body
+    :bulkbytes when set, batches are cut with u/partition-bytes using this
+               budget
+    :bulknum   otherwise, number of documents per batch (defaults to 100)
+  The whole option map is also forwarded to metaize and index:bulk.
+
+  Batches are produced and sent lazily; the final count forces every
+  request."
+  [url & {:as o}]
+  (let [docs      (:doc-seq o)
+        batches   (if (:bulkbytes o)
+                    (u/partition-bytes (:bulkbytes o) docs)
+                    ;; Without a default, a missing :bulknum hands nil to
+                    ;; partition-all and the load fails.
+                    (partition-all (or (:bulknum o) 100) docs))
+        bulk-body (fn [batch]
+                    ;; Action line and source line alternate, newline-separated.
+                    (apply str
+                           (interpose "\n"
+                                      (interleave (map (partial metaize o) batch)
+                                                  batch))))
+        send      (fn [batch]
+                    (->> (index:bulk url (assoc o :body (bulk-body batch)))
+                         :body
+                         :items))]
+    (count
+      (filter #(get-in % [:create :ok])
+              (mapcat send batches)))))
View
14 src/esperanto/utils.clj
@@ -4,4 +4,18 @@
(condp instance? (first opts)
clojure.lang.Keyword (apply hash-map opts)
clojure.lang.PersistentArrayMap (first opts)
+ clojure.lang.PersistentHashMap (first opts)
(throw (Exception. (str "can't make sense of args " (str opts))))))
+
+(defn take-bytes
+  "Returns a lazy seq of the leading items of `coll`, taking items while
+  the remaining budget of `bytes` is still positive.
+
+  The item that exhausts the budget is included, so the total may overshoot
+  by up to one item. A non-positive budget or an empty `coll` yields an
+  empty seq.
+
+  Strings are measured by their UTF-8 encoded length, since `count` on a
+  string returns characters and under-counts non-ASCII text. Any other item
+  is measured with `count`."
+  [bytes coll]
+  (lazy-seq
+    (when-let [s (seq coll)]
+      (when (pos? bytes)
+        (let [x    (first s)
+              size (if (string? x)
+                     (alength (.getBytes ^String x "UTF-8"))
+                     (count x))]
+          (cons x (take-bytes (- bytes size) (rest s))))))))
+
+(defn partition-bytes
+  "Returns a lazy seq of segments of `coll`, each cut by take-bytes with a
+  budget of `bytes`.
+
+  Every segment holds at least one item, so the sequence always ends once
+  `coll` is consumed. An empty `coll` yields an empty seq."
+  [bytes coll]
+  (lazy-seq
+    (when-let [s (seq coll)]
+      (let [taken (doall (take-bytes bytes s))
+            ;; take-bytes yields nothing for a non-positive budget; without
+            ;; this fallback the recursion would never advance and would
+            ;; emit empty segments forever.
+            seg   (if (seq taken) taken (list (first s)))]
+        (cons seg (partition-bytes bytes (nthrest s (count seg))))))))
+
View
9 test/esperanto/test/client/rest.clj
@@ -48,7 +48,8 @@
(es/data:load (url node)
:index _idx
:type _type
- :doc-seq (map json/encode (doc-seq ct)))
+ :doc-seq (map json/encode (doc-seq ct))
+ :bulknum 100)
(es/index:refresh (url node _idx))
(is (= ct (-> (url node _idx) es/index:count :body :count)))
(is (< 4.99999999
@@ -67,7 +68,8 @@
(es/data:load (url node)
:index _idx
:type _type
- :doc-seq (-> f io/resource io/reader line-seq))
+ :doc-seq (-> f io/resource io/reader line-seq)
+ :bulknum 2)
(es/index:refresh (url node _idx))
(is (= 7 (-> (es/index:search (url node _idx)
:body {:query {:match_all {}}})
@@ -76,7 +78,8 @@
(es/data:load (url node)
:type _type
- :doc-seq (-> f io/resource io/reader line-seq))
+ :doc-seq (-> f io/resource io/reader line-seq)
+ :bulknum 2)
(es/index:refresh (url node "test"))
(is (= 7 (-> (es/index:search (url node "test")
:body {:query {:match_all {}}})

0 comments on commit 3de9ee8

Please sign in to comment.
Something went wrong with that request. Please try again.