Skip to content

Commit

Permalink
Merge pull request elastic#65 from cdahlqvist/master
Browse files Browse the repository at this point in the history
Added support for routing parameters.
  • Loading branch information
talevy committed Apr 7, 2015
2 parents 10d2e3a + 60c8c8f commit 63ff11d
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 4 deletions.
9 changes: 7 additions & 2 deletions lib/logstash/outputs/elasticsearch.rb
Expand Up @@ -81,7 +81,11 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base

# The document ID for the index. Useful for overwriting existing entries in
# Elasticsearch with the same ID.
# NOTE: no :default needed — an unset config option is nil already.
config :document_id, :validate => :string

# A routing override to be applied to all processed events.
# This can be dynamic using the `%{foo}` syntax.
config :routing, :validate => :string

# The name of your cluster if you set it on the Elasticsearch side. Useful
# for discovery.
Expand Down Expand Up @@ -400,7 +404,8 @@ def receive(event)
# Resolve the per-event index name (supports %{field} interpolation).
index = event.sprintf(@index)

# Both :document_id and :routing are optional; when unset they stay nil and
# the corresponding keys are ignored downstream when the bulk request is built.
document_id = @document_id ? event.sprintf(@document_id) : nil
routing = @routing ? event.sprintf(@routing) : nil
# Queue exactly one bulk tuple per event: [action, metadata, event].
buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type, :_routing => routing }, event])
end # def receive

public
Expand Down
4 changes: 4 additions & 0 deletions lib/logstash/outputs/elasticsearch/protocol.rb
Expand Up @@ -228,19 +228,23 @@ def build_request(action, args, source)
# Map each bulk action name onto the corresponding Java client request.
# Every branch forwards the optional :_routing value so custom shard
# routing is honored uniformly across index/delete/create operations.
when "index"
request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
request.id(args[:_id]) if args[:_id]
request.routing(args[:_routing]) if args[:_routing]
request.source(source)
when "delete"
# Deletes require an explicit id; routing is still optional.
request = org.elasticsearch.action.delete.DeleteRequest.new(args[:_index])
request.id(args[:_id])
request.routing(args[:_routing]) if args[:_routing]
when "create"
request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
request.id(args[:_id]) if args[:_id]
request.routing(args[:_routing]) if args[:_routing]
request.source(source)
# opType("create") makes ES reject the write if the id already exists.
request.opType("create")
when "create_unless_exists"
# Only meaningful with a caller-supplied id; the else branch (not
# visible here) handles the nil-id case.
unless args[:_id].nil?
request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
request.id(args[:_id])
request.routing(args[:_routing]) if args[:_routing]
request.source(source)
request.opType("create")
else
Expand Down
228 changes: 226 additions & 2 deletions spec/outputs/elasticsearch_spec.rb
Expand Up @@ -73,6 +73,230 @@
end
end

# Integration test (requires a live Elasticsearch): every event is indexed
# with the fixed routing value "test" over the HTTP protocol, then a routed
# count query must account for ALL documents.
describe "ship lots of events w/ default index_type and fixed routing key using http protocol", :elasticsearch => true do
# Generate a random index name
index = 10.times.collect { rand(10).to_s }.join("")
type = 10.times.collect { rand(10).to_s }.join("")

# Write 900 events so that we can verify these have been routed correctly.
event_count = 900
# Random flush size exercises varying bulk batch boundaries.
flush_size = rand(200) + 1

config <<-CONFIG
input {
generator {
message => "hello world"
count => #{event_count}
type => "#{type}"
}
}
output {
elasticsearch {
host => "127.0.0.1"
index => "#{index}"
flush_size => #{flush_size}
routing => "test"
protocol => "http"
}
}
CONFIG

agent do
# Try a few times to check if we have the correct number of events stored
# in ES.
#
# We try multiple times to allow final agent flushes as well as allowing
# elasticsearch to finish processing everything.
ftw = FTW::Agent.new
ftw.post!("http://localhost:9200/#{index}/_refresh")

# Wait until all events are available.
Stud::try(10.times) do
data = ""
response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*")
response.read_body { |chunk| data << chunk }
result = LogStash::Json.load(data)
count = result["count"]
insist { count } == event_count
end

# A count query constrained with ?routing=test only consults the shard(s)
# that routing value maps to; matching the full total therefore proves
# every document was indexed with routing "test".
response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*&routing=test")
data = ""
response.read_body { |chunk| data << chunk }
result = LogStash::Json.load(data)
count = result["count"]
insist { count } == event_count
end
end

# Integration test (requires a live Elasticsearch): routing is resolved
# dynamically via %{message}. Because every generated event has
# message => "test", the sprintf'd routing value is "test" for all of them,
# which is what the routed count query at the bottom relies on.
describe "ship lots of events w/ default index_type and dynamic routing key using http protocol", :elasticsearch => true do
# Generate a random index name
index = 10.times.collect { rand(10).to_s }.join("")
type = 10.times.collect { rand(10).to_s }.join("")

# Write 900 events so that we can verify these have been routed correctly.
event_count = 900
# Random flush size exercises varying bulk batch boundaries.
flush_size = rand(200) + 1

config <<-CONFIG
input {
generator {
message => "test"
count => #{event_count}
type => "#{type}"
}
}
output {
elasticsearch {
host => "127.0.0.1"
index => "#{index}"
flush_size => #{flush_size}
routing => "%{message}"
protocol => "http"
}
}
CONFIG

agent do
# Try a few times to check if we have the correct number of events stored
# in ES.
#
# We try multiple times to allow final agent flushes as well as allowing
# elasticsearch to finish processing everything.
ftw = FTW::Agent.new
ftw.post!("http://localhost:9200/#{index}/_refresh")

# Wait until all events are available.
Stud::try(10.times) do
data = ""
response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*")
response.read_body { |chunk| data << chunk }
result = LogStash::Json.load(data)
count = result["count"]
insist { count } == event_count
end

# routing=test is the resolved value of %{message} for every event here,
# so the routed count must equal the total indexed.
response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*&routing=test")
data = ""
response.read_body { |chunk| data << chunk }
result = LogStash::Json.load(data)
count = result["count"]
insist { count } == event_count
end
end

# Same scenario as the fixed-routing HTTP test above, but the events are
# shipped via the JRuby transport protocol client; verification still goes
# through the HTTP _count API.
describe "ship lots of events w/ default index_type and fixed routing key using transport protocol", :elasticsearch => true do
# Generate a random index name
index = 10.times.collect { rand(10).to_s }.join("")
type = 10.times.collect { rand(10).to_s }.join("")

# Write 900 events so that we can verify these have been routed correctly.
event_count = 900
# Random flush size exercises varying bulk batch boundaries.
flush_size = rand(200) + 1

config <<-CONFIG
input {
generator {
message => "hello world"
count => #{event_count}
type => "#{type}"
}
}
output {
elasticsearch {
host => "127.0.0.1"
index => "#{index}"
flush_size => #{flush_size}
routing => "test"
protocol => "transport"
}
}
CONFIG

agent do
# Try a few times to check if we have the correct number of events stored
# in ES.
#
# We try multiple times to allow final agent flushes as well as allowing
# elasticsearch to finish processing everything.
ftw = FTW::Agent.new
ftw.post!("http://localhost:9200/#{index}/_refresh")

# Wait until all events are available.
Stud::try(10.times) do
data = ""
response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*")
response.read_body { |chunk| data << chunk }
result = LogStash::Json.load(data)
count = result["count"]
insist { count } == event_count
end

# Routed count must equal the total: proves all docs carried routing "test".
response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*&routing=test")
data = ""
response.read_body { |chunk| data << chunk }
result = LogStash::Json.load(data)
count = result["count"]
insist { count } == event_count
end
end

# Same scenario as the fixed-routing HTTP test above, but the events are
# shipped via the node protocol (plugin joins the cluster as a client node);
# verification still goes through the HTTP _count API.
describe "ship lots of events w/ default index_type and fixed routing key using node protocol", :elasticsearch => true do
# Generate a random index name
index = 10.times.collect { rand(10).to_s }.join("")
type = 10.times.collect { rand(10).to_s }.join("")

# Write 900 events so that we can verify these have been routed correctly.
event_count = 900
# Random flush size exercises varying bulk batch boundaries.
flush_size = rand(200) + 1

config <<-CONFIG
input {
generator {
message => "hello world"
count => #{event_count}
type => "#{type}"
}
}
output {
elasticsearch {
host => "127.0.0.1"
index => "#{index}"
flush_size => #{flush_size}
routing => "test"
protocol => "node"
}
}
CONFIG

agent do
# Try a few times to check if we have the correct number of events stored
# in ES.
#
# We try multiple times to allow final agent flushes as well as allowing
# elasticsearch to finish processing everything.
ftw = FTW::Agent.new
ftw.post!("http://localhost:9200/#{index}/_refresh")

# Wait until all events are available.
Stud::try(10.times) do
data = ""
response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*")
response.read_body { |chunk| data << chunk }
result = LogStash::Json.load(data)
count = result["count"]
insist { count } == event_count
end

# Routed count must equal the total: proves all docs carried routing "test".
response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*&routing=test")
data = ""
response.read_body { |chunk| data << chunk }
result = LogStash::Json.load(data)
count = result["count"]
insist { count } == event_count
end
end

describe "node client create actions", :elasticsearch => true do
require "logstash/outputs/elasticsearch"
require "elasticsearch"
Expand Down Expand Up @@ -475,9 +699,9 @@ def settings_with_index(index)
describe "failures in bulk class expected behavior", :elasticsearch => true do
let(:template) { '{"template" : "not important, will be updated by :index"}' }
let(:event1) { LogStash::Event.new("somevalue" => 100, "@timestamp" => "2014-11-17T20:37:17.223Z", "@metadata" => {"retry_count" => 0}) }
let(:action1) { ["index", {:_id=>nil, :_index=>"logstash-2014.11.17", :_type=>"logs"}, event1] }
let(:action1) { ["index", {:_id=>nil, :_routing=>nil, :_index=>"logstash-2014.11.17", :_type=>"logs"}, event1] }
let(:event2) { LogStash::Event.new("geoip" => { "location" => [ 0.0, 0.0] }, "@timestamp" => "2014-11-17T20:37:17.223Z", "@metadata" => {"retry_count" => 0}) }
let(:action2) { ["index", {:_id=>nil, :_index=>"logstash-2014.11.17", :_type=>"logs"}, event2] }
let(:action2) { ["index", {:_id=>nil, :_routing=>nil, :_index=>"logstash-2014.11.17", :_type=>"logs"}, event2] }
let(:invalid_event) { LogStash::Event.new("geoip" => { "location" => "notlatlon" }, "@timestamp" => "2014-11-17T20:37:17.223Z") }
let(:max_retries) { 3 }

Expand Down

0 comments on commit 63ff11d

Please sign in to comment.