Permalink
Browse files

Add new parameter for join datatype

Add parameters. routing_keys, routing_format
  • Loading branch information...
K-Bokka committed Dec 16, 2018
1 parent 27f2396 commit 183edc8da16bd22c7724c0bf1e7da8eea591f5e5
Showing with 5 additions and 0 deletions.
  1. +5 −0 lib/embulk/output/elasticsearch_using_url.rb
@@ -25,6 +25,8 @@ def self.transaction(config, schema, count, &control)
"retry_on_failure" => config.param("retry_on_failure", :integer, default: 5),
"before_template_name" => config.param("before_template_name", :string, default: nil),
"before_template" => config.param("before_template", :hash, default: nil),
"routing_keys" => config.param("routing_keys", :array, default: nil),
"routing_format" => config.param("routing_format", :string, default: nil),
}
task['time_value'] = Time.now.strftime('%Y.%m.%d.%H.%M.%S')
task['index'] = Time.now.strftime(task['index'])
@@ -108,6 +110,8 @@ def init
@bulk_actions = task["bulk_actions"]
@array_columns = task["array_columns"]
@retry_on_failure = task["retry_on_failure"]
@routing_keys = task["routing_keys"]
@routing_format = task["routing_format"]
@mode = task["mode"]
@index = self.class.get_index(task)

@@ -125,6 +129,7 @@ def add(page)
meta = {}
meta[action] = { _index: @index, _type: @index_type }
meta[action][:_id] = generate_id(@id_format, hash, @id_keys) unless @id_keys.nil?
meta[action][:_routing] = generate_id(@routing_format, hash, @routing_keys) unless @routing_keys.nil?
source = generate_array(hash)
@bulk_message << meta
@bulk_message << source

0 comments on commit 183edc8

Please sign in to comment.