Permalink
Browse files

- retry connection and publishing (LOGSTASH-71)

  • Loading branch information...
1 parent ff589a6 commit 7a95ab050003e5516029efb44c5c899e74bd6cc1 @jordansissel jordansissel committed May 3, 2011
Showing with 28 additions and 7 deletions.
  1. +28 −7 lib/logstash/outputs/amqp.rb
@@ -41,6 +41,11 @@ def register
end
@logger.info("Registering output #{to_s}")
+ connect
+ end # def register
+
+ public
+ def connect
amqpsettings = {
:vhost => @vhost,
:host => @host,
@@ -49,17 +54,33 @@ def register
amqpsettings[:user] = @user if @user
amqpsettings[:pass] = @password if @password
amqpsettings[:logging] = @debug
- @logger.debug(["Connecting to AMQP", amqpsettings, @exchange_type, @name])
- @bunny = Bunny.new(amqpsettings)
- @bunny.start
-
+ loop do
+ @logger.debug(["Connecting to AMQP", amqpsettings, @exchange_type, @name])
+ @bunny = Bunny.new(amqpsettings)
+ begin
+ @bunny.start
+ break # success
+ rescue Bunny::ServerDownError => e
+ @logger.error("AMQP connection error, will reconnect: #{e}")
+ sleep(1)
+ end
+ end # loop
@target = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable)
- end # def register
+ end # def connect
public
def receive(event)
- @logger.debug(["Sending event", { :destination => to_s, :event => event }])
- @target.publish(event.to_json)
+ loop do
+ @logger.debug(["Sending event", { :destination => to_s, :event => event }])
+ begin
+ @target.publish(event.to_json)
+ break;
+ rescue Bunny::ServerDownError => e
+ @logger.error("AMQP connection error, will reconnect: #{e}")
+ connect
+ retry
+ end
+ end # loop do
end # def receive
# This is used by the ElasticSearch AMQP/River output.

0 comments on commit 7a95ab0

Please sign in to comment.