Skip to content

Commit

Permalink
Add test for indexed update script and clarify bulk code
Browse files Browse the repository at this point in the history
  • Loading branch information
dchauviere authored and andrewvc committed Jan 8, 2016
1 parent 7473ecf commit fbf5216
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 37 deletions.
73 changes: 37 additions & 36 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Expand Up @@ -34,49 +34,14 @@ def template_install(name, template, force=false)
end
end

def process_update_action(args, source)
if args[:_id]
if args[:_script]
# Use the event as a hash from your script with variable name defined by script_var_name (default: "event")
# Ex: event["@timestamp"]
source = { 'script' => {'params' => { @options[:script_var_name] => source }} }
if @options[:scripted_upsert]
source['scripted_upsert'] = true
source['upsert'] = {}
else
source['upsert'] = args.delete(:_upsert) if args[:_upsert]
end
case @options[:script_type]
when "indexed"
source['script']['id'] = args.delete(:_script)
when "file"
source['script']['file'] = args.delete(:_script)
when "inline"
source['script']['inline'] = args.delete(:_script)
end
source['script']['lang'] = @options[:script_lang] if @options[:script_lang] != ''
else
source = { 'doc' => source }
if @options[:doc_as_upsert]
source['doc_as_upsert'] = true
else
source['upsert'] = args.delete(:_upsert) if args[:_upsert]
end
end
else
raise(LogStash::ConfigurationError, "Specifying action => 'update' without a document '_id' is not supported.")
end
[args, source]
end

def bulk(actions)
@request_mutex.synchronize { non_threadsafe_bulk(actions) }
end

def non_threadsafe_bulk(actions)
return if actions.empty?
bulk_body = actions.collect do |action, args, source|
args, source = process_update_action(args, source) if action == 'update'
args, source = update_action_builder(args, source) if action == 'update'

if source && action != 'delete'
next [ { action => args }, source ]
Expand Down Expand Up @@ -181,5 +146,41 @@ def template_exists?(name)
def template_put(name, template)
@client.indices.put_template(:name => name, :body => template)
end

# Build a bulk item for an elasticsearch update action
def update_action_builder(args, source)
if args[:_id]
if args[:_script]
# Use the event as a hash from your script with variable name defined by script_var_name (default: "event")
# Ex: event["@timestamp"]
source = { 'script' => {'params' => { @options[:script_var_name] => source }} }
if @options[:scripted_upsert]
source['scripted_upsert'] = true
source['upsert'] = {}
else
source['upsert'] = args.delete(:_upsert) if args[:_upsert]
end
case @options[:script_type]
when "indexed"
source['script']['id'] = args.delete(:_script)
when "file"
source['script']['file'] = args.delete(:_script)
when "inline"
source['script']['inline'] = args.delete(:_script)
end
source['script']['lang'] = @options[:script_lang] if @options[:script_lang] != ''
else
source = { 'doc' => source }
if @options[:doc_as_upsert]
source['doc_as_upsert'] = true
else
source['upsert'] = args.delete(:_upsert) if args[:_upsert]
end
end
else
raise(LogStash::ConfigurationError, "Specifying action => 'update' without a document '_id' is not supported.")
end
[args, source]
end
end
end end end
2 changes: 1 addition & 1 deletion spec/es_spec_helper.rb
Expand Up @@ -47,7 +47,7 @@ def get_client
rescue Docker::Error::NotFoundError
scriptDir = File.expand_path File.dirname(__FILE__) + '/fixtures/scripts'
Longshoreman.new("#{CONTAINER_IMAGE}:#{CONTAINER_TAG}", CONTAINER_NAME, {
'Cmd' => [ "-Des.script.inline=on" ],
'Cmd' => [ "-Des.script.inline=on", "-Des.script.indexed=on" ],
'HostConfig' => {
'Binds' => ["#{scriptDir}:/usr/share/elasticsearch/config/scripts"],
'PublishAllPorts' => true
Expand Down
15 changes: 15 additions & 0 deletions spec/integration/outputs/update_spec.rb
Expand Up @@ -90,6 +90,21 @@ def get_es_output( options={} )
r = @es.get(:index => 'logstash-update', :type => 'logs', :id => "123", :refresh => true)
insist { r["_source"]["counter"] } == 4
end

it "should increment a counter with event/doc 'count' variable with indexed script" do
@es.put_script lang: 'groovy', id: 'indexed_update', body: { script: 'ctx._source.counter += event["count"]' }
subject = get_es_output({
'document_id' => "123",
'script' => 'indexed_update',
'script_lang' => 'groovy',
'script_type' => 'indexed'
})
subject.register
subject.receive(LogStash::Event.new("count" => 4 ))
subject.flush
r = @es.get(:index => 'logstash-update', :type => 'logs', :id => "123", :refresh => true)
insist { r["_source"]["counter"] } == 5
end
end

context "when update with upsert" do
Expand Down

0 comments on commit fbf5216

Please sign in to comment.