Permalink
Browse files

abstracts bulk api handling to a new method

adds new methods to access the delete and create features of the bulk api
  • Loading branch information...
rhec committed Aug 6, 2012
1 parent 5129f8a commit c0195ad104dc6b8ad716a7139c8643c171375b44
Showing with 82 additions and 48 deletions.
  1. +19 −4 lib/tire/index.rb
  2. +63 −44 test/unit/index_test.rb
View
@@ -84,17 +84,19 @@ def store(*args)
curl = %Q|curl -X POST "#{url}" -d '#{document}'|
logged([type, id].join('/'), curl)
end
-
- def bulk_store(documents, options={})
+
+ def bulk_api_action(action, documents, options={})
payload = documents.map do |document|
type = get_type_from_document(document, :escape => false) # Do not URL-escape the _type
id = get_id_from_document(document)
STDERR.puts "[ERROR] Document #{document.inspect} does not have ID" unless id
output = []
- output << %Q|{"index":{"_index":"#{@name}","_type":"#{type}","_id":"#{id}"}}|
- output << convert_document_to_json(document)
+ output << %Q|{"#{action}":{"_index":"#{@name}","_type":"#{type}","_id":"#{id}"}}|
+ unless action == 'delete' # delete does not require a document, just an _id
+ output << convert_document_to_json(document)
+ end
output.join("\n")
end
payload << ""
@@ -120,6 +122,19 @@ def bulk_store(documents, options={})
curl = %Q|curl -X POST "#{url}/_bulk" -d '{... data omitted ...}'|
logged('BULK', curl)
end
+
+ end
+
+ def bulk_create(documents, options={}) # Bulk API wrapper: runs the create action, passing documents and options through unchanged
+ bulk_api_action("create", documents, options)
+ end
+
+ def bulk_store(documents, options={}) # Keeps the pre-existing bulk_store signature; now delegates to bulk_api_action with the index action
+ bulk_api_action("index", documents, options)
+ end
+
+ def bulk_delete(documents, options={}) # Bulk API wrapper: runs the delete action; bulk_api_action emits no document body for delete, only the metadata line
+ bulk_api_action("delete", documents, options)
end
def import(klass_or_collection, options={})
View
@@ -462,49 +462,69 @@ class MyDocument;end; document = MyDocument.new
end
- context "when storing in bulk" do
+ context "when performing a bulk api action" do
+ # Possible api actions are index, create, delete
# The expected JSON looks like this:
#
# {"index":{"_index":"dummy","_type":"document","_id":"1"}}
# {"id":"1","title":"One"}
- # {"index":{"_index":"dummy","_type":"document","_id":"2"}}
+ # {"create":{"_index":"dummy","_type":"document","_id":"2"}}
# {"id":"2","title":"Two"}
+ # {"delete":{"_index":"dummy","_type":"document","_id":"2"}}
#
# See http://www.elasticsearch.org/guide/reference/api/bulk.html
+
+ # test json with each type of action
+ ["index","create","delete"].each do |action|
+ should "serialize Hashes for #{action}" do
+ Configuration.client.expects(:post).with do |url, json|
+
+ url == "#{@index.url}/_bulk" &&
+ json =~ /"#{action}"/ &&
+ json =~ /"_index":"dummy"/ &&
+ json =~ /"_type":"document"/ &&
+ json =~ /"_id":"1"/ &&
+ json =~ /"_id":"2"/ &&
+ if action == 'delete'
+ # delete should not contain the document contents
+ true
+ else
+ json =~ /"id":"1"/ &&
+ json =~ /"id":"2"/ &&
+ json =~ /"title":"One"/ &&
+ json =~ /"title":"Two"/
+ end
+ end.returns(mock_response('{}'), 200)
+
+ @index.bulk_api_action action, [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ]
+ end
- should "serialize Hashes" do
- Configuration.client.expects(:post).with do |url, json|
- url == "#{@index.url}/_bulk" &&
- json =~ /"_index":"dummy"/ &&
- json =~ /"_type":"document"/ &&
- json =~ /"_id":"1"/ &&
- json =~ /"_id":"2"/ &&
- json =~ /"id":"1"/ &&
- json =~ /"id":"2"/ &&
- json =~ /"title":"One"/ &&
- json =~ /"title":"Two"/
- end.returns(mock_response('{}'), 200)
-
- @index.bulk_store [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ]
- end
-
- should "serialize ActiveModel instances" do
- Configuration.client.expects(:post).with do |url, json|
- url == "#{ActiveModelArticle.index.url}/_bulk" &&
- json =~ /"_index":"active_model_articles"/ &&
- json =~ /"_type":"active_model_article"/ &&
- json =~ /"_id":"1"/ &&
- json =~ /"_id":"2"/ &&
- json =~ /"title":"One"/ &&
- json =~ /"title":"Two"/
- end.returns(mock_response('{}', 200))
-
- one = ActiveModelArticle.new 'title' => 'One'; one.id = '1'
- two = ActiveModelArticle.new 'title' => 'Two'; two.id = '2'
-
- ActiveModelArticle.index.bulk_store [ one, two ]
+ should "serialize ActiveModel instances for #{action}" do
+ Configuration.client.expects(:post).with do |url, json|
+ url == "#{ActiveModelArticle.index.url}/_bulk" &&
+ json =~ /"#{action}"/ &&
+ json =~ /"_index":"active_model_articles"/ &&
+ json =~ /"_type":"active_model_article"/ &&
+ json =~ /"_id":"1"/ &&
+ json =~ /"_id":"2"/ &&
+ if action == 'delete'
+ # delete should not contain the document contents
+ true
+ else
+ json =~ /"title":"One"/ &&
+ json =~ /"title":"Two"/
+ end
+
+ end.returns(mock_response('{}', 200))
+
+ one = ActiveModelArticle.new 'title' => 'One'; one.id = '1'
+ two = ActiveModelArticle.new 'title' => 'Two'; two.id = '2'
+
+ ActiveModelArticle.index.bulk_api_action action, [ one, two ]
+ end
+
end
-
+ # use the index action to test common features of the bulk api
context "namespaced models" do
should "not URL-escape the document_type" do
Configuration.client.expects(:post).with do |url, json|
@@ -513,43 +533,42 @@ class MyDocument;end; document = MyDocument.new
json =~ %r|"_index":"my_namespace_my_models"| &&
json =~ %r|"_type":"my_namespace/my_model"|
end.returns(mock_response('{}', 200))
-
+
module MyNamespace
class MyModel
def document_type; "my_namespace/my_model"; end
def to_indexed_json; "{}"; end
end
end
-
- Tire.index('my_namespace_my_models').bulk_store [ MyNamespace::MyModel.new ]
+
+ Tire.index('my_namespace_my_models').bulk_api_action "index", [ MyNamespace::MyModel.new ]
end
end
-
should "try again when an exception occurs" do
Configuration.client.expects(:post).returns(mock_response('Server error', 503)).at_least(2)
- assert !@index.bulk_store([ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
+ assert !@index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
end
should "try again and the raise when an exception occurs" do
Configuration.client.expects(:post).returns(mock_response('Server error', 503)).at_least(2)
assert_raise(RuntimeError) do
- @index.bulk_store([ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ], {:raise => true})
+ @index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ], {:raise => true})
end
end
should "try again when a connection error occurs" do
Configuration.client.expects(:post).raises(Errno::ECONNREFUSED, "Connection refused - connect(2)").at_least(2)
- assert !@index.bulk_store([ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
+ assert !@index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
end
should "signal exceptions should not be caught" do
Configuration.client.expects(:post).raises(Interrupt, "abort then interrupt!")
assert_raise Interrupt do
- @index.bulk_store([ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
+ @index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
end
end
@@ -561,7 +580,7 @@ def to_indexed_json; "{}"; end
STDERR.expects(:puts).once
documents = [ { :title => 'Bogus' }, { :title => 'Real', :id => 1 } ]
- ActiveModelArticle.index.bulk_store documents
+ ActiveModelArticle.index.bulk_api_action("index", documents)
end
should "log the response code" do
@@ -572,7 +591,7 @@ def to_indexed_json; "{}"; end
status == 200
end
- @index.bulk_store [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ]
+ @index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
end
end

1 comment on commit c0195ad

karmi commented on c0195ad Aug 23, 2012

Send to @karmi

Please sign in to comment.