Skip to content

Commit

Permalink
fix: Fix logstash-input-courier not shutting down with Logstash pipeline
Browse files Browse the repository at this point in the history
Fixes #397
  • Loading branch information
driskell committed Feb 10, 2023
1 parent 3a8c6aa commit ea7a63c
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 44 deletions.
1 change: 1 addition & 0 deletions ruby/log-courier/lib/log-courier/event_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def empty?
def clear
@mutex.synchronize do
@que.clear
@enque_cond.signal if @que.length < @max
end
self
end
Expand Down
57 changes: 25 additions & 32 deletions ruby/log-courier/lib/log-courier/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,53 +51,46 @@ def initialize(options = {})

# Grab the port back and update the logger context
@port = @server.port
end

def run(&block)
# TODO: Make queue size configurable
@event_queue = EventQueue.new 1
server_thread = nil

begin
server_thread = Thread.new do
# Receive messages and process them
@server.run do |signature, message, comm|
case signature
when 'PING'
process_ping message, comm
when 'JDAT'
process_jdat message, comm, @event_queue

@server_thread = Thread.new do
# Receive messages and process them
@server.run do |signature, message, comm|
case signature
when 'PING'
process_ping message, comm
when 'JDAT'
process_jdat message, comm, @event_queue
else
if comm.peer.nil?
@logger&.warn 'Unknown message received', from: 'unknown'
else
if comm.peer.nil?
@logger&.warn 'Unknown message received', from: 'unknown'
else
@logger&.warn 'Unknown message received', from: comm.peer
end
# Don't kill a client that sends a bad message
# Just reject it and let it send it again, potentially to another server
comm.send '????', ''
@logger&.warn 'Unknown message received', from: comm.peer
end
# Don't kill a client that sends a bad message
# Just reject it and let it send it again, potentially to another server
comm.send '????', ''
end
end
end
end

loop do
event = @event_queue.pop
break if event.nil?
def run(&block)
loop do
event = @event_queue.pop
break if event.nil?

block.call event
end
ensure
# Signal the server thread to stop
unless server_thread.nil?
server_thread.raise ShutdownSignal
server_thread.join
end
block.call event
end
nil
end

def stop
@server_thread.raise ShutdownSignal
@event_queue << nil
@server_thread.join
nil
end

Expand Down
4 changes: 2 additions & 2 deletions ruby/log-courier/lib/log-courier/server_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,10 @@ def run(&block)
# Connection abort request due to a protocol error
@logger&.warn 'Protocol error, connection aborted', error: e.message, peer: @peer
nil
rescue ShutdownSignal
rescue ShutdownSignal => e
# Shutting down
@logger&.info 'Server shutting down, closing connection', peer: @peer
nil
raise e
rescue StandardError => e
# Some other unknown problem
@logger&.warn e.message, hint: 'Unknown error, connection aborted', peer: @peer
Expand Down
6 changes: 3 additions & 3 deletions ruby/logstash-input-courier/Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
PATH
remote: ../log-courier
specs:
log-courier (2.7.3)
log-courier (2.9.1)
cabin (~> 0.6)
multi_json (~> 1.10)

PATH
remote: .
specs:
logstash-input-courier (2.7.4-java)
log-courier (~> 2.7.3)
logstash-input-courier (2.9.1-java)
log-courier (= 2.9.1)
logstash-codec-plain
logstash-core-plugin-api (>= 1.60, <= 2.99)

Expand Down
20 changes: 15 additions & 5 deletions ruby/logstash-input-courier/lib/logstash/inputs/courier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ class Courier < LogStash::Inputs::Base
config :add_peer_fields, validate: :boolean

def register
require 'log-courier/server'
@log_courier = LogCourier::Server.new options

@logger.info(
'Starting courier input listener',
'Started courier input listener',
address: "#{@host}:#{@port}",
)

require 'log-courier/server'
@log_courier = LogCourier::Server.new options
nil
end

Expand All @@ -93,8 +93,18 @@ def run(output_queue)
nil
end

def close
def stop
@logger.info(
'Stopping courier input listener',
address: "#{@host}:#{@port}",
)

@log_courier.stop

@logger.info(
'Stopped courier input listener',
address: "#{@host}:#{@port}",
)
nil
end

Expand Down
9 changes: 9 additions & 0 deletions ruby/logstash-input-courier/logstash.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
input {
courier {
port => 12345
transport => tcp
}
}
output {
stdout { codec => rubydebug }
}
6 changes: 4 additions & 2 deletions ruby/logstash-output-courier/lib/logstash/outputs/courier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ class Courier < LogStash::Outputs::Base
config :idle_timeout, validate: :number, default: 5

def register
@logger.info 'Starting courier output'

require 'log-courier/client'
@client = LogCourier::Client.new(options)

@logger.info 'Started courier output'
nil
end

Expand All @@ -70,7 +70,9 @@ def receive(event)
end

def close
@logger.info 'Stopping courier output'
@client.shutdown
@logger.info 'Stopped courier output'
nil
end

Expand Down

0 comments on commit ea7a63c

Please sign in to comment.