Skip to content

Commit

Permalink
Further fixes to integration and unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
robbavey committed Nov 14, 2018
1 parent b3570c7 commit 0eef127
Show file tree
Hide file tree
Showing 15 changed files with 73 additions and 68 deletions.
3 changes: 3 additions & 0 deletions ci/run.sh
Expand Up @@ -132,6 +132,9 @@ else
fi

case "$ES_VERSION" in
# ES_VERSION prefixed with LATEST-SNAPSHOT- will interrogate the artifacts API to find the latest version that matches
# the part after the prefix - eg LATEST-SNAPSHOT-7 will pull the latest snapshot release of 7.x of Elasticsearch,
# LATEST-SNAPSHOT-6.5 will pull the latest snapshot release of 6.5
LATEST-SNAPSHOT-*)
split_latest=${ES_VERSION##*-}

Expand Down
22 changes: 9 additions & 13 deletions lib/logstash/outputs/elasticsearch/common.rb
Expand Up @@ -67,7 +67,7 @@ def event_action_tuple(event)
:_id => @document_id ? event.sprintf(@document_id) : nil,
:_index => event.sprintf(@index),
:_type => get_event_type(event),
routing_key => @routing ? event.sprintf(@routing) : nil
routing_field_name => @routing ? event.sprintf(@routing) : nil
}

if @pipeline
Expand All @@ -79,16 +79,16 @@ def event_action_tuple(event)
join_value = event.get(@join_field)
parent_value = event.sprintf(@parent)
event.set(@join_field, { "name" => join_value, "parent" => parent_value })
params[routing_key] = event.sprintf(@parent)
params[routing_field_name] = event.sprintf(@parent)
else
params[:parent] = event.sprintf(@parent)
end
end

if action == 'update'
params[upsert_key] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
params[:_script] = event.sprintf(@script) if @script != ""
params[retry_on_conflict_key] = @retry_on_conflict
params[retry_on_conflict_action_name] = @retry_on_conflict
end

if @version
Expand All @@ -115,16 +115,12 @@ def maximum_seen_major_version
end


def routing_key
@routing_key ||= maximum_seen_major_version >= 6 ? :routing : :_routing
def routing_field_name
maximum_seen_major_version >= 6 ? :routing : :_routing
end

def upsert_key
@upsert_key ||= maximum_seen_major_version >= 6 ? :upsert : :_upsert
end

def retry_on_conflict_key
@retry_on_conflict_key ||= maximum_seen_major_version >= 6 ? :retry_on_conflict : :_retry_on_conflict
def retry_on_conflict_action_name
maximum_seen_major_version >= 6 ? :retry_on_conflict : :_retry_on_conflict
end

def install_template
Expand Down Expand Up @@ -319,7 +315,7 @@ def safe_bulk(actions)
log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s}
log_hash[:body] = e.response_body if @logger.debug? # Generally this is too verbose
message = "Encountered a retryable error. Will Retry with exponential backoff "
puts e.response_body

# We treat 429s as a special case because these really aren't errors, but
# rather just ES telling us to back off a bit, which we do.
# The other retryable code is 503, which are true errors
Expand Down
2 changes: 0 additions & 2 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Expand Up @@ -362,7 +362,6 @@ def update_action_builder(args, source)
elsif @options[:doc_as_upsert]
source['upsert'] = source_orig
else
source['upsert'] = args.delete(:upsert) if args[:upsert]
source['upsert'] = args.delete(:_upsert) if args[:_upsert]
end
case @options[:script_type]
Expand All @@ -380,7 +379,6 @@ def update_action_builder(args, source)
source['doc_as_upsert'] = true
else
source['upsert'] = args.delete(:_upsert) if args[:_upsert]
source['upsert'] = args.delete(:upsert) if args[:upsert]
end
end
[args, source]
Expand Down
12 changes: 10 additions & 2 deletions spec/es_spec_helper.rb
Expand Up @@ -11,15 +11,15 @@ def get_client
Elasticsearch::Client.new(:hosts => [get_host_port])
end

def get_doc_type
def doc_type
if ESHelper.es_version_satisfies?(">=7")
"_doc"
else
"doc"
end
end

def get_mapping_name
def mapping_name
if ESHelper.es_version_satisfies?(">=7")
"_doc"
else
Expand All @@ -28,6 +28,14 @@ def get_mapping_name

end

def routing_field_name
if ESHelper.es_version_satisfies?(">=6")
:routing
else
:_routing
end
end

def self.es_version
RSpec.configuration.filter[:es_version] || ENV['ES_VERSION']
end
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/compressed_indexing_spec.rb
Expand Up @@ -50,7 +50,7 @@
result = LogStash::Json.load(response.body)
result["hits"]["hits"].each do |doc|
if ESHelper.es_version_satisfies?(">= 6")
expect(doc["_type"]).to eq(get_doc_type)
expect(doc["_type"]).to eq(doc_type)
else
expect(doc["_type"]).to eq(type)
end
Expand Down
8 changes: 4 additions & 4 deletions spec/integration/outputs/delete_spec.rb
Expand Up @@ -40,25 +40,25 @@
it "should ignore non-monotonic external version updates" do
id = "ev2"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
r = es.get(:index => 'logstash-delete', :type => get_doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
expect(r['_version']).to eq(99)
expect(r['_source']['message']).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 98)])
r2 = es.get(:index => 'logstash-delete', :type => get_doc_type, :id => id, :refresh => true)
r2 = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
expect(r2['_version']).to eq(99)
expect(r2['_source']['message']).to eq('foo')
end

it "should commit monotonic external version updates" do
id = "ev3"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
r = es.get(:index => 'logstash-delete', :type => get_doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
expect(r['_version']).to eq(99)
expect(r['_source']['message']).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 100)])
expect { es.get(:index => 'logstash-delete', :type => get_doc_type, :id => id, :refresh => true) }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect { es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true) }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
end
end
end
Expand Down
22 changes: 11 additions & 11 deletions spec/integration/outputs/groovy_update_spec.rb
Expand Up @@ -25,7 +25,7 @@ def get_es_output( options={} )
@es.indices.delete(:index => "*") rescue nil
@es.index(
:index => 'logstash-update',
:type => 'doc',
:type => doc_type,
:id => "123",
:body => { :message => 'Test', :counter => 1 }
)
Expand All @@ -37,15 +37,15 @@ def get_es_output( options={} )
subject = get_es_output({ 'document_id' => "123", 'script' => 'scripted_update', 'script_type' => 'file' })
subject.register
subject.multi_receive([LogStash::Event.new("count" => 2)])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "123", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true)
expect(r["_source"]["counter"]).to eq(3)
end

it "should increment a counter with event/doc '[data][count]' nested variable" do
subject = get_es_output({ 'document_id' => "123", 'script' => 'scripted_update_nested', 'script_type' => 'file' })
subject.register
subject.multi_receive([LogStash::Event.new("data" => { "count" => 3 })])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "123", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true)
expect(r["_source"]["counter"]).to eq(4)
end

Expand All @@ -58,7 +58,7 @@ def get_es_output( options={} )
})
subject.register
subject.multi_receive([LogStash::Event.new("counter" => 3 )])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "123", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true)
expect(r["_source"]["counter"]).to eq(4)
end

Expand All @@ -72,7 +72,7 @@ def get_es_output( options={} )
})
subject.register
subject.multi_receive([LogStash::Event.new("counter" => 3 )])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "123", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true)
expect(r["_source"]["counter"]).to eq(4)
end

Expand All @@ -86,7 +86,7 @@ def get_es_output( options={} )
})
subject.register
subject.multi_receive([LogStash::Event.new("counter" => 3 )])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
expect(r["_source"]["counter"]).to eq(3)
end

Expand All @@ -100,7 +100,7 @@ def get_es_output( options={} )
})
subject.register
subject.multi_receive([LogStash::Event.new("count" => 4 )])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "123", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true)
expect(r["_source"]["counter"]).to eq(5)
end
end
Expand All @@ -110,15 +110,15 @@ def get_es_output( options={} )
subject = get_es_output({ 'document_id' => "456", 'upsert' => '{"message": "upsert message"}' })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('upsert message')
end

it "should create new documents with event/doc as upsert" do
subject = get_es_output({ 'document_id' => "456", 'doc_as_upsert' => true })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('sample message here')
end

Expand All @@ -133,7 +133,7 @@ def get_es_output( options={} )
subject = get_es_output({ 'document_id' => "456", 'script' => 'scripted_update', 'upsert' => '{"message": "upsert message"}', 'script_type' => 'file' })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('upsert message')
end

Expand All @@ -142,7 +142,7 @@ def get_es_output( options={} )
subject.register
subject.multi_receive([LogStash::Event.new("counter" => 1)])
@es.indices.refresh
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
expect(r["_source"]["counter"]).to eq(1)
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/index_spec.rb
Expand Up @@ -79,7 +79,7 @@
result = LogStash::Json.load(response.body)
result["hits"]["hits"].each do |doc|
if ESHelper.es_version_satisfies?(">= 6")
expect(doc["_type"]).to eq(get_doc_type)
expect(doc["_type"]).to eq(doc_type)
else
expect(doc["_type"]).to eq(type)
end
Expand Down
14 changes: 7 additions & 7 deletions spec/integration/outputs/index_version_spec.rb
Expand Up @@ -38,11 +38,11 @@

it "should default to ES version" do
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => get_doc_type, :id => "123", :refresh => true)
r = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
expect(r["_version"]).to eq(1)
expect(r["_source"]["message"]).to eq('foo')
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foobar")])
r2 = es.get(:index => 'logstash-index', :type => get_doc_type, :id => "123", :refresh => true)
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
expect(r2["_version"]).to eq(2)
expect(r2["_source"]["message"]).to eq('foobar')
end
Expand All @@ -66,33 +66,33 @@
it "should respect the external version" do
id = "ev1"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => get_doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')
end

it "should ignore non-monotonic external version updates" do
id = "ev2"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => get_doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "98", "message" => "foo")])
r2 = es.get(:index => 'logstash-index', :type => get_doc_type, :id => id, :refresh => true)
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
expect(r2["_version"]).to eq(99)
expect(r2["_source"]["message"]).to eq('foo')
end

it "should commit monotonic external version updates" do
id = "ev3"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => get_doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "100", "message" => "foo")])
r2 = es.get(:index => 'logstash-index', :type => get_doc_type, :id => id, :refresh => true)
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
expect(r2["_version"]).to eq(100)
expect(r2["_source"]["message"]).to eq('foo')
end
Expand Down

0 comments on commit 0eef127

Please sign in to comment.