Skip to content

Commit

Permalink
Make port obsolete
Browse files Browse the repository at this point in the history
 - Update flag and docs
 - Remove references from code
 - Make sure default port 9200 is added if no port specified
 - Add tests to validate changes

fixes elastic#262

Fixes elastic#266
  • Loading branch information
untergeek authored and jordansissel committed Oct 9, 2015
1 parent 2c68bb4 commit 5ce433b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 27 deletions.
38 changes: 19 additions & 19 deletions lib/logstash/outputs/elasticsearch.rb
Expand Up @@ -56,13 +56,13 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base

# The index type to write events to. Generally you should try to write only
# similar events to the same 'type'. String expansion `%{foo}` works here.
#
#
# Deprecated in favor of `document_type` field.
config :index_type, :validate => :string, :deprecated => "Please use the 'document_type' setting instead. It has the same effect, but is more appropriately named."

# The document type to write events to. Generally you should try to write only
# similar events to the same 'type'. String expansion `%{foo}` works here.
# Unless you set 'document_type', the event 'type' will be used if it exists
# Unless you set 'document_type', the event 'type' will be used if it exists
# otherwise the document type will be assigned the value of 'logs'
config :document_type, :validate => :string

Expand Down Expand Up @@ -111,14 +111,15 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base

config :hosts, :validate => :array

# You can set the remote port as part of the host, or explicitly here as well
config :port, :validate => :string, :default => 9200
# The port setting is obsolete. Please use the 'hosts' setting instead.
# Hosts entries can be in "host:port" format.
config :port, :obsolete => "Please use the 'hosts' setting instead. Hosts entries can be in 'host:port' format."

# This plugin uses the bulk index API for improved indexing performance.
# To make efficient bulk API calls, we will buffer a certain number of
# events before flushing that out to Elasticsearch. This setting
# controls how many events will be buffered before sending a batch
# of events. Increasing the `flush_size` has an effect on Logstash's heap size.
# of events. Increasing the `flush_size` has an effect on Logstash's heap size.
# Remember to also increase the heap size using `LS_HEAP_SIZE` if you are sending big documents
# or have increased the `flush_size` to a higher value.
config :flush_size, :validate => :number, :default => 500
Expand All @@ -134,12 +135,12 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# near-real-time.
config :idle_flush_time, :validate => :number, :default => 1

# The Elasticsearch action to perform. Valid actions are:
# The Elasticsearch action to perform. Valid actions are:
#
# - index: indexes a document (an event from Logstash).
# - delete: deletes a document by id (An id is required for this action)
# - create: indexes a document, fails if a document by that id already exists in the index.
# - update: updates a document by id. Update has a special case where you can upsert -- update a
# - update: updates a document by id. Update has a special case where you can upsert -- update a
# document if not already present. See the `upsert` option
#
# For more details on actions, check out the http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk API documentation]
Expand Down Expand Up @@ -198,7 +199,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# Set max interval between bulk retries
config :retry_max_interval, :validate => :number, :default => 5

# Set the address of a forward HTTP proxy.
# Set the address of a forward HTTP proxy.
# Can be either a string, such as `http://localhost:123` or a hash in the form
# of `{host: 'proxy.org' port: 80 scheme: 'http'}`.
# Note, this is NOT a SOCKS proxy, but a plain HTTP proxy
Expand Down Expand Up @@ -251,7 +252,7 @@ def register
common_options.merge! update_options if @action == 'update'

@client = LogStash::Outputs::Elasticsearch::HttpClient.new(
common_options.merge(:hosts => @hosts, :port => @port, :logger => @logger)
common_options.merge(:hosts => @hosts, :logger => @logger)
)

if @manage_template
Expand All @@ -263,7 +264,7 @@ def register
end
end

@logger.info("New Elasticsearch output", :hosts => @hosts, :port => @port)
@logger.info("New Elasticsearch output", :hosts => @hosts)

@client_idx = 0

Expand Down Expand Up @@ -304,9 +305,8 @@ def get_template

public
def receive(event)


# block until we have not maxed out our
# block until we have not maxed out our
# retry queue. This is applying back-pressure
# to slow down the receive-rate
@retry_flush_mutex.synchronize {
Expand All @@ -330,7 +330,7 @@ def receive(event)
:_type => type,
:_routing => @routing ? event.sprintf(@routing) : nil
}

params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @action == 'update' && @upsert != ""

buffer_receive([event.sprintf(@action), params, event])
Expand Down Expand Up @@ -407,19 +407,19 @@ def close

@retry_close_requested.make_true
# First, make sure retry_timer_thread is stopped
# to ensure we do not signal a retry based on
# to ensure we do not signal a retry based on
# the retry interval.
Thread.kill(@retry_timer_thread)
@retry_timer_thread.join
# Signal flushing in the case that #retry_flush is in
# Signal flushing in the case that #retry_flush is in
# the process of waiting for a signal.
@retry_flush_mutex.synchronize { @retry_queue_needs_flushing.signal }
# Now, #retry_flush is ensured to not be in a state of
# Now, #retry_flush is ensured to not be in a state of
# waiting and can be safely joined into the main thread
# for further final execution of an in-process remaining call.
@retry_thread.join

# execute any final actions along with a proceeding retry for any
# execute any final actions along with a proceeding retry for any
# final actions that did not succeed.
buffer_flush(:final => true)
retry_flush
Expand Down Expand Up @@ -484,11 +484,11 @@ def setup_basic_auth
end

private
# in charge of submitting any actions in @retry_queue that need to be
# in charge of submitting any actions in @retry_queue that need to be
# retried
#
# This method is not called concurrently. It is only called by @retry_thread
# and once that thread is ended during the close process, a final call
# and once that thread is ended during the close process, a final call
# to this method is done upon close in the main thread.
def retry_flush()
unless @retry_queue.empty?
Expand Down
19 changes: 13 additions & 6 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Expand Up @@ -7,13 +7,16 @@
module LogStash::Outputs::Elasticsearch
class HttpClient
attr_reader :client, :options, :client_options, :sniffer_thread
DEFAULT_OPTIONS = {
:port => 9200
}
# This is here in case we use DEFAULT_OPTIONS in the future
# DEFAULT_OPTIONS = {
# :setting => value
# }

def initialize(options={})
@logger = options[:logger]
@options = DEFAULT_OPTIONS.merge(options)
# Again, in case we use DEFAULT_OPTIONS in the future, uncomment this.
# @options = DEFAULT_OPTIONS.merge(options)
@options = options
@client = build_client(@options)
start_sniffing!
end
Expand Down Expand Up @@ -83,12 +86,16 @@ def sniff!

def build_client(options)
hosts = options[:hosts] || ["127.0.0.1"]
port = options[:port] || 9200
client_settings = options[:client_settings] || {}

uris = hosts.map do |host|
proto = client_settings[:ssl] ? "https" : "http"
"#{proto}://#{host}:#{port}#{client_settings[:path]}"
if !(host =~ /:/).nil?
"#{proto}://#{host}#{client_settings[:path]}"
else
# Use default port of 9200 if none provided with host.
"#{proto}://#{host}:9200#{client_settings[:path]}"
end
end

@client_options = {
Expand Down
16 changes: 14 additions & 2 deletions spec/unit/outputs/elasticsearch_spec.rb
Expand Up @@ -8,7 +8,7 @@
let(:options) {
{
"index" => "my-index",
"hosts" => "localhost",
"hosts" => ["localhost","localhost:9202"],
"path" => "some-path"
}
}
Expand All @@ -30,7 +30,6 @@
expect(eso.path).to eql(options["path"])
end


it "should properly set the path on the HTTP client adding slashes" do
expect(manticore_host).to include("/" + options["path"] + "/")
end
Expand All @@ -46,5 +45,18 @@
end
end
end
describe "without a port specified" do
it "should properly set the default port (9200) on the HTTP client" do
expect(manticore_host).to include("9200")
end
end
describe "with a port other than 9200 specified" do
let(:manticore_host) {
eso.client.send(:client).transport.options[:hosts].last
}
it "should properly set the specified port on the HTTP client" do
expect(manticore_host).to include("9202")
end
end
end
end

0 comments on commit 5ce433b

Please sign in to comment.