Skip to content

Commit

Permalink
Flag to support acks, and also setting needs_ha flag
Browse files Browse the repository at this point in the history
This proof of concept will call 'ack_callback' from an output like
elasticsearch when 'sent' is called.

This is implemented in
  github.com/catalyst/logstash/feature/ack-after-processing-event

Largely a proof of concept atm, still working on it
  • Loading branch information
Matthew B. Gray committed Jan 16, 2015
1 parent 9d76ee5 commit 5b9d27b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
11 changes: 10 additions & 1 deletion lib/log-courier/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,16 @@ def process_jdat(message, comm, event_queue)

# Queue the event
begin
event_queue.push event, [0, ack_timeout - Time.now.to_i].max
# TODO: Don't ack till this ack_callback is called
# This should be done by the output
now = Time.now
ack_callback = Proc.new do
puts "I'm here #{now.to_i}"
end

data = { :event => event, :ack_callback => ack_callback, }
event_queue.push data, [0, ack_timeout - Time.now.to_i].max

rescue TimeoutError
# Full pipeline, partial ack
# NOTE: comm.send can raise a Timeout::Error of its own
Expand Down
14 changes: 12 additions & 2 deletions lib/logstash/inputs/courier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ module Inputs
class Courier < LogStash::Inputs::Base
config_name 'courier'
milestone 1
acks_on_send true

default :codec, 'plain'

Expand Down Expand Up @@ -76,6 +77,14 @@ class Courier < LogStash::Inputs::Base
# using client certificates
config :add_peer_fields, :validate => :boolean

# Courier can be configured to ack only when messages have been acked by
# an HA output
#
# e.g. Elasticsearch could be setup to be HA with 3 nodes
#
# Assumes you're going to flag ES output with provides_ha => true
config :needs_ha, :validate => :boolean, :default => false

public

def register
Expand Down Expand Up @@ -107,8 +116,9 @@ def register
public

def run(output_queue)
@log_courier.run do |event|
event = LogStash::Event.new(event)
@log_courier.run do |data|
event = LogStash::Event.new(data[:event])
event.on_sent(&data[:ack_callback])
decorate event
output_queue << event
end
Expand Down

0 comments on commit 5b9d27b

Please sign in to comment.