New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
LOGSTASH-691: move amqp plugin to be an external plugin #338
Changes from 24 commits
3e1a3a5
dc3347a
9de2349
573a05b
e60fda4
6f1d69b
793ec8b
cf24b4b
5ff3b7b
17336a9
608219e
7482f3f
1fb6574
233ea03
e23b036
ec79b59
371dce1
ca3514c
24588d7
0765674
9773daa
140669d
94b5ba4
c92ae36
4867a89
b40821b
0b9a27d
215f81a
5c9d009
3971be2
1a02aa1
6735ee4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
input { | ||
amqp { | ||
rabbitmq { | ||
port => 12345 | ||
tag => [ a, b, c ] | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,44 +2,40 @@ | |
require "logstash/namespace" | ||
require "cgi" # for CGI.escape | ||
|
||
# Pull events from an AMQP exchange. | ||
# | ||
# <b> NOTE: THIS IS ONLY KNOWN TO WORK WITH RECENT RELEASES OF RABBITMQ. Any | ||
# other amqp broker will not work with this plugin. I do not know why. If you | ||
# need support for brokers other than rabbitmq, please file bugs here: | ||
# <https://github.com/ruby-amqp/bunny> </b> | ||
# Pull events from a RabbitMQ exchange. | ||
# | ||
# The default settings will create an entirely transient queue and listen for all messages by default. | ||
# If you need durability or any other advanced settings, please set the appropriate options | ||
class LogStash::Inputs::Amqp < LogStash::Inputs::Threadable | ||
# | ||
# This has been tested with Bunny 0.8.x, which supports RabbitMQ 2.x and 3.x. You can | ||
# find links to both here: | ||
# | ||
# * RabbitMQ - <http://www.rabbitmq.com/> | ||
# * Bunny - <https://github.com/ruby-amqp/bunny> | ||
class LogStash::Inputs::RabbitMQ < LogStash::Inputs::Threadable | ||
|
||
config_name "amqp" | ||
plugin_status "unsupported" | ||
config_name "rabbitmq" | ||
plugin_status "beta" | ||
|
||
# Your amqp broker's custom arguments. For mirrored queues in RabbitMQ: [ "x-ha-policy", "all" ] | ||
# Custom arguments. For mirrored queues in rabbitmq: [ "x-ha-policy", "all" ] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With RabbitMQ 3x, we do not need any more to user x-ha-policy when declaring then queue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the custom arguments variable no longer needed (and should be removed), or should just the doc example with policy be removed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This assumes rabbitmq 3. Ubuntu 12.04 still ships RabbitMQ 2.7, so I expect there to continue to be ubuntu users on 2.x at least another 2 years and debian for another 4-6 years. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good point. So it could be good to have both configuration extract for RabbitMQ 2x and |
||
config :arguments, :validate => :array, :default => [] | ||
|
||
# Your amqp server address | ||
# Your rabbitmq server address | ||
config :host, :validate => :string, :required => true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice to have host failover. If the main host is not reachable, we could try with the following members of the RabbitMQ cluster. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In order to preserve some compatibility, should we just add a new variable, 'hosts', that takes an array of hosts that tries to connect to each? Should this fall back to the normal "host" variable if all hosts fail? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sense for me. Probablt, bunny has to proper parameter to handle it natively, like java amqp client. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like logstash already handles the multiple hosts field, like this for redis: https://github.com/logstash/logstash/blob/master/lib/logstash/outputs/redis.rb#L21-L25. I will look into this. |
||
|
||
# The AMQP port to connect on | ||
# The rabbitmq port to connect on | ||
config :port, :validate => :number, :default => 5672 | ||
|
||
# Your amqp username | ||
# Your rabbitmq username | ||
config :user, :validate => :string, :default => "guest" | ||
|
||
# Your amqp password | ||
# Your rabbitmq password | ||
config :password, :validate => :password, :default => "guest" | ||
|
||
# The name of the queue. Depricated due to conflicts with puppet naming convention. | ||
# Replaced by 'queue' variable. See LOGSTASH-755 | ||
config :name, :validate => :string, :deprecated => true | ||
|
||
# The name of the queue. | ||
config :queue, :validate => :string, :default => "" | ||
|
||
# The name of the exchange to bind the queue. This is analogous to the 'amqp | ||
# output' [config 'name'](../outputs/amqp) | ||
# The name of the exchange to bind the queue. | ||
config :exchange, :validate => :string, :required => true | ||
|
||
# The routing key to use. This is only valid for direct or fanout exchanges | ||
|
@@ -90,43 +86,36 @@ def initialize(params) | |
end # def initialize | ||
|
||
public | ||
def register | ||
|
||
if @name | ||
if @queue | ||
@logger.error("'name' and 'queue' are the same setting, but 'name' is deprecated. Please use only 'queue'") | ||
end | ||
@queue = @name | ||
end | ||
def register | ||
|
||
@logger.info("Registering input #{@url}") | ||
require "bunny" # rubygem 'bunny' | ||
@vhost ||= "/" | ||
@port ||= 5672 | ||
@key ||= "#" | ||
@amqpsettings = { | ||
@rabbitmq_settings = { | ||
:vhost => @vhost, | ||
:host => @host, | ||
:port => @port, | ||
} | ||
@amqpsettings[:user] = @user if @user | ||
@amqpsettings[:pass] = @password.value if @password | ||
@amqpsettings[:logging] = @debug | ||
@amqpsettings[:ssl] = @ssl if @ssl | ||
@amqpsettings[:verify_ssl] = @verify_ssl if @verify_ssl | ||
@amqpurl = "amqp://" | ||
@rabbitmq_settings[:user] = @user if @user | ||
@rabbitmq_settings[:pass] = @password.value if @password | ||
@rabbitmq_settings[:logging] = @debug | ||
@rabbitmq_settings[:ssl] = @ssl if @ssl | ||
@rabbitmq_settings[:verify_ssl] = @verify_ssl if @verify_ssl | ||
@rabbitmq_url = "amqp://" | ||
if @user | ||
@amqpurl << @user if @user | ||
@amqpurl << ":#{CGI.escape(@password.to_s)}" if @password | ||
@amqpurl << "@" | ||
@rabbitmq_url << @user if @user | ||
@rabbitmq_url << ":#{CGI.escape(@password.to_s)}" if @password | ||
@rabbitmq_url << "@" | ||
end | ||
@amqpurl += "#{@host}:#{@port}#{@vhost}/#{@queue}" | ||
@rabbitmq_url += "#{@host}:#{@port}#{@vhost}/#{@queue}" | ||
end # def register | ||
|
||
def run(queue) | ||
begin | ||
@logger.debug("Connecting with AMQP settings #{@amqpsettings.inspect} to set up queue #{@queue.inspect}") | ||
@bunny = Bunny.new(@amqpsettings) | ||
@logger.debug("Connecting with RabbitMQ settings #{@rabbitmq_settings.inspect} to set up queue #{@queue.inspect}") | ||
@bunny = Bunny.new(@rabbitmq_settings) | ||
return if terminating? | ||
@bunny.start | ||
@bunny.qos({:prefetch_count => @prefetch_count}) | ||
|
@@ -137,14 +126,14 @@ def run(queue) | |
@bunnyqueue.bind(@exchange, :key => @key) | ||
|
||
@bunnyqueue.subscribe({:ack => @ack}) do |data| | ||
e = to_event(data[:payload], @amqpurl) | ||
e = to_event(data[:payload], @rabbitmq_url) | ||
if e | ||
queue << e | ||
end | ||
end # @bunnyqueue.subscribe | ||
|
||
rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e | ||
@logger.error("AMQP connection error, will reconnect: #{e}") | ||
@logger.error("RabbitMQ connection error, will reconnect: #{e}") | ||
# Sleep for a bit before retrying. | ||
# TODO(sissel): Write 'backoff' method? | ||
sleep(1) | ||
|
@@ -158,4 +147,4 @@ def teardown | |
@bunny.close if @bunny | ||
finished | ||
end # def teardown | ||
end # class LogStash::Inputs::Amqp | ||
end # class LogStash::Inputs::RabbitMQ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you have a recommended version, can you specify it? Many distros ship with ancient versions of rabbitmq that probably don't work anymore with the bunny gem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the input/output plugins with version info for Bunny & RabbitMQ, and also added version requirements to the gemspec for Bunny 0.8.x. Let me know if any more information would help.