Skip to content

Commit

Permalink
Fix support for Elasticsearch 7.x
Browse files Browse the repository at this point in the history
This commit fixes issues with supporting Elasticsearch 7.x

This handles the removal of the '_routing' field name and
'_retry_on_conflict' update action aliases in place of 'routing' and
'retry_on_conflict' respectively in Elasticsearch 7.x.

Also fixes integration tests to handle these changes and
the renaming of doc type from 'doc' to '_doc' and the mapping
name change from `_default_` to '_doc'.

Fixes elastic#812
  • Loading branch information
robbavey committed Nov 14, 2018
1 parent 3d3a8d5 commit ce49ee5
Show file tree
Hide file tree
Showing 13 changed files with 75 additions and 42 deletions.
23 changes: 18 additions & 5 deletions lib/logstash/outputs/elasticsearch/common.rb
Expand Up @@ -67,7 +67,7 @@ def event_action_tuple(event)
:_id => @document_id ? event.sprintf(@document_id) : nil,
:_index => event.sprintf(@index),
:_type => get_event_type(event),
:_routing => @routing ? event.sprintf(@routing) : nil
routing_key => @routing ? event.sprintf(@routing) : nil
}

if @pipeline
Expand All @@ -79,16 +79,16 @@ def event_action_tuple(event)
join_value = event.get(@join_field)
parent_value = event.sprintf(@parent)
event.set(@join_field, { "name" => join_value, "parent" => parent_value })
params[:_routing] = event.sprintf(@parent)
params[routing_key] = event.sprintf(@parent)
else
params[:parent] = event.sprintf(@parent)
end
end

if action == 'update'
params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
params[upsert_key] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
params[:_script] = event.sprintf(@script) if @script != ""
params[:_retry_on_conflict] = @retry_on_conflict
params[retry_on_conflict_key] = @retry_on_conflict
end

if @version
Expand All @@ -114,6 +114,19 @@ def maximum_seen_major_version
client.maximum_seen_major_version
end


def routing_key
@routing_key ||= maximum_seen_major_version >= 6 ? :routing : :_routing
end

def upsert_key
@upsert_key ||= maximum_seen_major_version >= 6 ? :upsert : :_upsert
end

def retry_on_conflict_key
@retry_on_conflict_key ||= maximum_seen_major_version >= 6 ? :retry_on_conflict : :_retry_on_conflict
end

def install_template
TemplateManager.install_template(self)
@template_installed.make_true
Expand Down Expand Up @@ -306,7 +319,7 @@ def safe_bulk(actions)
log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s}
log_hash[:body] = e.response_body if @logger.debug? # Generally this is too verbose
message = "Encountered a retryable error. Will Retry with exponential backoff "

puts e.response_body
# We treat 429s as a special case because these really aren't errors, but
# rather just ES telling us to back off a bit, which we do.
# The other retryable code is 503, which are true errors
Expand Down
2 changes: 2 additions & 0 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Expand Up @@ -362,6 +362,7 @@ def update_action_builder(args, source)
elsif @options[:doc_as_upsert]
source['upsert'] = source_orig
else
source['upsert'] = args.delete(:upsert) if args[:upsert]
source['upsert'] = args.delete(:_upsert) if args[:_upsert]
end
case @options[:script_type]
Expand All @@ -379,6 +380,7 @@ def update_action_builder(args, source)
source['doc_as_upsert'] = true
else
source['upsert'] = args.delete(:_upsert) if args[:_upsert]
source['upsert'] = args.delete(:upsert) if args[:upsert]
end
end
[args, source]
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/outputs/elasticsearch/http_client/pool.rb
Expand Up @@ -170,7 +170,7 @@ def check_sniff
return nil
else
case major_version(url_meta[:version])
when 5, 6
when 5, 6, 7
sniff_5x_and_above(nodes)
when 2, 1
sniff_2x_1x(nodes)
Expand Down
17 changes: 17 additions & 0 deletions spec/es_spec_helper.rb
Expand Up @@ -11,6 +11,23 @@ def get_client
Elasticsearch::Client.new(:hosts => [get_host_port])
end

def get_doc_type
if ESHelper.es_version_satisfies?(">=7")
"_doc"
else
"doc"
end
end

def get_mapping_name
if ESHelper.es_version_satisfies?(">=7")
"_doc"
else
"_default_"
end

end

def self.es_version
RSpec.configuration.filter[:es_version] || ENV['ES_VERSION']
end
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/compressed_indexing_spec.rb
Expand Up @@ -50,7 +50,7 @@
result = LogStash::Json.load(response.body)
result["hits"]["hits"].each do |doc|
if ESHelper.es_version_satisfies?(">= 6")
expect(doc["_type"]).to eq("doc")
expect(doc["_type"]).to eq(get_doc_type)
else
expect(doc["_type"]).to eq(type)
end
Expand Down
8 changes: 4 additions & 4 deletions spec/integration/outputs/delete_spec.rb
Expand Up @@ -40,25 +40,25 @@
it "should ignore non-monotonic external version updates" do
id = "ev2"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
r = es.get(:index => 'logstash-delete', :type => 'doc', :id => id, :refresh => true)
r = es.get(:index => 'logstash-delete', :type => get_doc_type, :id => id, :refresh => true)
expect(r['_version']).to eq(99)
expect(r['_source']['message']).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 98)])
r2 = es.get(:index => 'logstash-delete', :type => 'doc', :id => id, :refresh => true)
r2 = es.get(:index => 'logstash-delete', :type => get_doc_type, :id => id, :refresh => true)
expect(r2['_version']).to eq(99)
expect(r2['_source']['message']).to eq('foo')
end

it "should commit monotonic external version updates" do
id = "ev3"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
r = es.get(:index => 'logstash-delete', :type => 'doc', :id => id, :refresh => true)
r = es.get(:index => 'logstash-delete', :type => get_doc_type, :id => id, :refresh => true)
expect(r['_version']).to eq(99)
expect(r['_source']['message']).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 100)])
expect { es.get(:index => 'logstash-delete', :type => 'doc', :id => id, :refresh => true) }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect { es.get(:index => 'logstash-delete', :type => get_doc_type, :id => id, :refresh => true) }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/index_spec.rb
Expand Up @@ -79,7 +79,7 @@
result = LogStash::Json.load(response.body)
result["hits"]["hits"].each do |doc|
if ESHelper.es_version_satisfies?(">= 6")
expect(doc["_type"]).to eq("doc")
expect(doc["_type"]).to eq(get_doc_type)
else
expect(doc["_type"]).to eq(type)
end
Expand Down
14 changes: 7 additions & 7 deletions spec/integration/outputs/index_version_spec.rb
Expand Up @@ -38,11 +38,11 @@

it "should default to ES version" do
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => 'doc', :id => "123", :refresh => true)
r = es.get(:index => 'logstash-index', :type => get_doc_type, :id => "123", :refresh => true)
expect(r["_version"]).to eq(1)
expect(r["_source"]["message"]).to eq('foo')
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foobar")])
r2 = es.get(:index => 'logstash-index', :type => 'doc', :id => "123", :refresh => true)
r2 = es.get(:index => 'logstash-index', :type => get_doc_type, :id => "123", :refresh => true)
expect(r2["_version"]).to eq(2)
expect(r2["_source"]["message"]).to eq('foobar')
end
Expand All @@ -66,33 +66,33 @@
it "should respect the external version" do
id = "ev1"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => 'doc', :id => id, :refresh => true)
r = es.get(:index => 'logstash-index', :type => get_doc_type, :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')
end

it "should ignore non-monotonic external version updates" do
id = "ev2"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => 'doc', :id => id, :refresh => true)
r = es.get(:index => 'logstash-index', :type => get_doc_type, :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "98", "message" => "foo")])
r2 = es.get(:index => 'logstash-index', :type => 'doc', :id => id, :refresh => true)
r2 = es.get(:index => 'logstash-index', :type => get_doc_type, :id => id, :refresh => true)
expect(r2["_version"]).to eq(99)
expect(r2["_source"]["message"]).to eq('foo')
end

it "should commit monotonic external version updates" do
id = "ev3"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => 'doc', :id => id, :refresh => true)
r = es.get(:index => 'logstash-index', :type => get_doc_type, :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "100", "message" => "foo")])
r2 = es.get(:index => 'logstash-index', :type => 'doc', :id => id, :refresh => true)
r2 = es.get(:index => 'logstash-index', :type => get_doc_type, :id => id, :refresh => true)
expect(r2["_version"]).to eq(100)
expect(r2["_source"]["message"]).to eq('foo')
end
Expand Down
27 changes: 14 additions & 13 deletions spec/integration/outputs/painless_update_spec.rb
Expand Up @@ -27,7 +27,7 @@ def get_es_output( options={} )
@es.indices.delete(:index => "*") rescue nil
@es.index(
:index => 'logstash-update',
:type => 'doc',
:type => get_doc_type,
:id => "123",
:body => { :message => 'Test', :counter => 1 }
)
Expand All @@ -41,15 +41,15 @@ def get_es_output( options={} )
subject = get_es_output({ 'document_id' => "123", 'script' => 'scripted_update', 'script_type' => 'file' })
subject.register
subject.multi_receive([LogStash::Event.new("count" => 2)])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "123", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => get_doc_type, :id => "123", :refresh => true)
expect(r["_source"]["counter"]).to eq(3)
end

it "should increment a counter with event/doc '[data][count]' nested variable" do
subject = get_es_output({ 'document_id' => "123", 'script' => 'scripted_update_nested', 'script_type' => 'file' })
subject.register
subject.multi_receive([LogStash::Event.new("data" => { "count" => 3 })])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "123", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => get_doc_type, :id => "123", :refresh => true)
expect(r["_source"]["counter"]).to eq(4)
end
end
Expand All @@ -63,7 +63,7 @@ def get_es_output( options={} )
})
subject.register
subject.multi_receive([LogStash::Event.new("counter" => 3 )])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "123", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => get_doc_type, :id => "123", :refresh => true)
expect(r["_source"]["counter"]).to eq(4)
end

Expand All @@ -76,7 +76,7 @@ def get_es_output( options={} )
})
subject.register
subject.multi_receive([LogStash::Event.new("counter" => 3 )])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "123", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => get_doc_type, :id => "123", :refresh => true)
expect(r["_source"]["counter"]).to eq(4)
end

Expand All @@ -89,7 +89,7 @@ def get_es_output( options={} )
})
subject.register
subject.multi_receive([LogStash::Event.new("counter" => 3 )])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => get_doc_type, :id => "456", :refresh => true)
expect(r["_source"]["counter"]).to eq(3)
end

Expand All @@ -114,7 +114,7 @@ def get_es_output( options={} )
subject = get_es_output(plugin_parameters)
subject.register
subject.multi_receive([LogStash::Event.new("count" => 4 )])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "123", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => get_doc_type, :id => "123", :refresh => true)
expect(r["_source"]["counter"]).to eq(5)
end
end
Expand All @@ -125,15 +125,15 @@ def get_es_output( options={} )
subject = get_es_output({ 'document_id' => "456", 'upsert' => '{"message": "upsert message"}' })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => get_doc_type, :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('upsert message')
end

it "should create new documents with event/doc as upsert" do
subject = get_es_output({ 'document_id' => "456", 'doc_as_upsert' => true })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => get_doc_type, :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('sample message here')
end

Expand All @@ -150,7 +150,7 @@ def get_es_output( options={} )
subject = get_es_output({ 'document_id' => "456", 'script' => 'scripted_update', 'upsert' => '{"message": "upsert message"}', 'script_type' => 'file' })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => get_doc_type, :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('upsert message')
end

Expand All @@ -159,7 +159,7 @@ def get_es_output( options={} )
subject.register
subject.multi_receive([LogStash::Event.new("counter" => 1)])
@es.indices.refresh
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => get_doc_type, :id => "456", :refresh => true)
expect(r["_source"]["counter"]).to eq(1)
end
end
Expand All @@ -169,8 +169,9 @@ def get_es_output( options={} )
it "should create new documents with upsert content" do
subject = get_es_output({ 'document_id' => "456", 'script' => 'ctx._source.counter = params.event.counter', 'upsert' => '{"message": "upsert message"}', 'script_type' => 'inline' })
subject.register

subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => get_doc_type, :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('upsert message')
end

Expand All @@ -179,7 +180,7 @@ def get_es_output( options={} )
subject.register
subject.multi_receive([LogStash::Event.new("counter" => 1)])
@es.indices.refresh
r = @es.get(:index => 'logstash-update', :type => 'doc', :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :type => get_doc_type, :id => "456", :refresh => true)
expect(r["_source"]["counter"]).to eq(1)
end
end
Expand Down
4 changes: 2 additions & 2 deletions spec/integration/outputs/retry_spec.rb
Expand Up @@ -4,9 +4,9 @@
describe "failures in bulk class expected behavior", :integration => true do
let(:template) { '{"template" : "not important, will be updated by :index"}' }
let(:event1) { LogStash::Event.new("somevalue" => 100, "@timestamp" => "2014-11-17T20:37:17.223Z", "@metadata" => {"retry_count" => 0}) }
let(:action1) { ["index", {:_id=>nil, :_routing=>nil, :_index=>"logstash-2014.11.17", :_type=>"doc"}, event1] }
let(:action1) { ["index", {:_id=>nil, :routing=>nil, :_index=>"logstash-2014.11.17", :_type=> get_doc_type }, event1] }
let(:event2) { LogStash::Event.new("geoip" => { "location" => [ 0.0, 0.0] }, "@timestamp" => "2014-11-17T20:37:17.223Z", "@metadata" => {"retry_count" => 0}) }
let(:action2) { ["index", {:_id=>nil, :_routing=>nil, :_index=>"logstash-2014.11.17", :_type=>"doc"}, event2] }
let(:action2) { ["index", {:_id=>nil, :routing=>nil, :_index=>"logstash-2014.11.17", :_type=> get_doc_type }, event2] }
let(:invalid_event) { LogStash::Event.new("geoip" => { "location" => "notlatlon" }, "@timestamp" => "2014-11-17T20:37:17.223Z") }

def mock_actions_with_response(*resp)
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/templates_5x_spec.rb
Expand Up @@ -82,7 +82,7 @@
end

it "make [geoip][location] a geo_point" do
expect(@es.indices.get_template(name: "logstash")["logstash"]["mappings"]["_default_"]["properties"]["geoip"]["properties"]["location"]["type"]).to eq("geo_point")
expect(@es.indices.get_template(name: "logstash")["logstash"]["mappings"][get_mapping_name]["properties"]["geoip"]["properties"]["location"]["type"]).to eq("geo_point")
end

it "aggregate .keyword results correctly " do
Expand Down

0 comments on commit ce49ee5

Please sign in to comment.