Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send data in batches #3

Merged
merged 12 commits into from Feb 11, 2018
134 changes: 101 additions & 33 deletions lib/logstash/outputs/firehose.rb
Expand Up @@ -43,12 +43,19 @@ class LogStash::Outputs::Firehose < LogStash::Outputs::Base
TEMPFILE_EXTENSION = "txt"
FIREHOSE_STREAM_VALID_CHARACTERS = /[\w\-]/

# These are hard limits
FIREHOSE_PUT_BATCH_SIZE_LIMIT = 4_000_000 # 4MB
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice to have these properties configurable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a hard limit from AWS; you cannot change it, so I'm not sure of the value in making it configurable.

FIREHOSE_PUT_BATCH_RECORD_LIMIT = 500
FIREHOSE_PUT_RECORD_SIZE_LIMIT = 1_000_000 # 1_000 KB

# make properties visible for tests
attr_accessor :stream
attr_accessor :access_key_id
attr_accessor :secret_access_key
attr_accessor :codec

concurrency :single

config_name "firehose"

# Output coder
Expand All @@ -64,11 +71,6 @@ class LogStash::Outputs::Firehose < LogStash::Outputs::Base
# Register plugin
public
def register
# require "aws-sdk"
# required if using ruby version < 2.0
# http://ruby.awsblog.com/post/Tx16QY1CI5GVBFT/Threading-with-the-AWS-SDK-for-Ruby
#Aws.eager_autoload!(Aws::Firehose)
#Aws.eager_autoload!(services: %w(Firehose))

# Create Firehose API client
@firehose = aws_firehose_client
Expand All @@ -84,58 +86,66 @@ def register
end

# Register coder: comma separated line -> SPECIFIED_CODEC_FMT, call handler after to deliver encoded data to Firehose
@event_buffer_lock = Mutex.new
@event_buffer = Array.new
@codec.on_event do |event, encoded_event|
@logger.debug("Event info", :event => event, :encoded_event => encoded_event)
handle_event(encoded_event)
@event_buffer_lock.synchronize do
@event_buffer.push(encoded_event)
end
end
end

#
# On event received handler: just wrap as JSON and pass it to handle_event method
public
# Encode a single event with the configured codec. The codec's on_event
# callback (registered in `register`) pushes the encoded payload onto
# @event_buffer; handle_event then pops it and delivers it to Firehose.
def receive(event)
@codec.encode(event)

handle_event
end # def event

# Encode every event in the batch with the configured codec (each encoded
# payload is appended to @event_buffer by the codec's on_event callback),
# then flush the accumulated buffer to Firehose via handle_events.
def multi_receive(events)
events.each do |event|
@codec.encode(event)
end

handle_events
end # def multi_receive

# Build the AWS SDK client options that point at the regional Firehose API.
#
# @param region [String] AWS region identifier, e.g. "us-east-1"
# @return [Hash] hash with :region and :endpoint entries
def aws_service_endpoint(region)
  endpoint_url = "https://firehose.#{region}.amazonaws.com"
  { region: region, endpoint: endpoint_url }
end

# Build AWS Firehose client
private

# Construct an AWS Firehose client from aws_full_options and store it in
# @firehose. NOTE(review): a fresh client is built (and the registration
# message logged) on every call — there is no memoization here.
def aws_firehose_client
@logger.info "Registering Firehose output", :stream => @stream, :region => @region
@firehose = Aws::Firehose::Client.new(aws_full_options)
end

# Build and return AWS client options map
private
def aws_full_options
# Delegates to aws_options_hash (defined outside this view — presumably the
# shared AWS config mixin); kept as a seam for Firehose-specific options.
aws_options_hash
end

# Evaluate AWS endpoint for Firehose based on specified @region option
public
# Returns the AWS SDK client options (:region and :endpoint) addressing the
# regional Firehose API host.
def aws_service_endpoint(region)
return {
:region => region,
:endpoint => "https://firehose.#{region}.amazonaws.com"
}
end

# Handle encoded event, specifically deliver received event into Firehose stream
private
def handle_event(encoded_event)
# TODO Multithreaded workers pool?
push_data_into_stream encoded_event
# Predicate: true when a single encoded record exceeds the Firehose
# per-record size cap (FIREHOSE_PUT_RECORD_SIZE_LIMIT bytes).
def oversized_event(event)
  FIREHOSE_PUT_RECORD_SIZE_LIMIT < event.bytesize
end

# Push encoded data into Firehose stream
private
def push_data_into_stream(encoded_event)
def handle_event
encoded_event = @event_buffer.pop
@logger.debug "Pushing encoded event: #{encoded_event}"

if oversized_event(encoded_event)
# Drop it to the floor
@logger.error "Event is too big for Firehose: #{encoded_event.bytesize}"
return
end

begin
@firehose.put_record({
delivery_stream_name: @stream,
Expand All @@ -147,14 +157,72 @@ def push_data_into_stream(encoded_event)
# Firehose stream not found
@logger.error "Firehose: AWS resource error", :error => error
raise LogStash::Error, "Firehose: AWS resource not found error: #{error}"
rescue Exception => error
rescue Aws::Firehose::Errors::ServiceError => error
# TODO Retry policy
# TODO Fallback policy
# TODO Keep failed events somewhere, probably in fallback file
# Permanently failing events can be pushed to a DLQ by Logstash
@logger.error "Firehose: AWS delivery error", :error => error
@logger.info "Failed to deliver event: #{encoded_event}"
@logger.error "TODO Retry and fallback policy implementation"
end
end

# Total payload size of a batch of encoded events.
#
# @param array [Array<String>] encoded event payloads
# @return [Integer] sum of bytesize over all elements (0 for an empty array)
def array_size(array)
  # Enumerable#sum is the idiomatic (and faster) form of
  # collect(&:bytesize).inject(0, :+).
  array.sum(&:bytesize)
end

# Filter out events whose encoded size exceeds the Firehose per-record
# limit, logging how many were discarded.
#
# @param events [Array<String>] encoded event payloads
# @return [Array<String>] the events that fit within the record size limit
def remove_oversized_events(events)
  undersized, dropped = events.partition { |event| !oversized_event(event) }
  unless dropped.empty?
    @logger.error "#{dropped.length} events are too big for Firehose, they will not be sent"
  end
  undersized
end

# Drain @event_buffer and deliver its contents to Firehose.
# The buffer is consumed in slices of at most FIREHOSE_PUT_BATCH_RECORD_LIMIT
# records (the PutRecordBatch per-call record cap); each slice is filtered
# for oversized records and handed to put_in_batches for size-limited
# delivery.
#
# @raise [LogStash::Error] when the Firehose delivery stream does not exist
def handle_events
  @logger.debug "Pushing encoded events"

  begin
    # Number of record-limited slices needed for the buffer as sized now.
    rounds = (@event_buffer.length / FIREHOSE_PUT_BATCH_RECORD_LIMIT.to_f).ceil
    rounds.times do
      events = []
      @event_buffer_lock.synchronize do
        events = @event_buffer.slice!(0, FIREHOSE_PUT_BATCH_RECORD_LIMIT)
      end
      # Defensive: slice! yields nil only for out-of-range starts.
      break if events.nil?

      events = remove_oversized_events(events)
      # BUG FIX: this was `break`, which abandoned all remaining rounds
      # whenever one slice happened to consist entirely of oversized records,
      # stranding deliverable events in the buffer. `next` keeps draining.
      next if events.empty?

      put_in_batches(events)
    end
  rescue Aws::Firehose::Errors::ResourceNotFoundException => error
    # Firehose stream not found
    @logger.error "Firehose: AWS resource error", :error => error
    raise LogStash::Error, "Firehose: AWS resource not found error: #{error}"
  rescue Aws::Firehose::Errors::ServiceError => error
    @logger.error "Firehose: AWS delivery error", :error => error
  end
end

# Deliver a record-limited slice of events, splitting it into chunks that
# each stay strictly below the FIREHOSE_PUT_BATCH_SIZE_LIMIT payload cap.
#
# BUG FIX: the previous implementation built chunks with `events.pop`, so
# whenever the batch exceeded the size limit the records were delivered in
# reverse order; it also recomputed array_size(chunk) on every inner
# iteration (O(n^2)). This version preserves the original chunk boundary
# (a chunk grows only while its size plus the next record stays strictly
# below the limit) but keeps the original ordering and tracks the running
# byte count incrementally.
#
# @param events [Array<String>] encoded events, already filtered of
#   oversized records and capped at FIREHOSE_PUT_BATCH_RECORD_LIMIT entries
def put_in_batches(events)
  if array_size(events) > FIREHOSE_PUT_BATCH_SIZE_LIMIT
    chunk = []
    chunk_bytes = 0
    events.each do |event|
      # Flush the current chunk before it would reach the payload limit.
      unless chunk.empty? || chunk_bytes + event.bytesize < FIREHOSE_PUT_BATCH_SIZE_LIMIT
        put_batch(chunk)
        chunk = []
        chunk_bytes = 0
      end
      chunk << event
      chunk_bytes += event.bytesize
    end
    put_batch(chunk) unless chunk.empty?
  else
    put_batch(events)
  end
end

# Issue one PutRecordBatch API call for the given encoded events.
# NOTE(review): this calls aws_firehose_client, which constructs a brand-new
# Aws::Firehose::Client (and logs the registration message) on every batch,
# whereas handle_event uses the @firehose client built in register. The spec
# stubs Aws::Firehose::Client.new only after register, so it depends on this
# per-call construction — confirm before consolidating on @firehose.
def put_batch(events)
aws_firehose_client.put_record_batch({
delivery_stream_name: @stream,
records: events.map { |e| {data: e} }
})
end

end # class LogStash::Outputs::Firehose
17 changes: 9 additions & 8 deletions logstash-output-firehose.gemspec
@@ -1,12 +1,12 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-firehose'
s.version = "0.0.2"
s.licenses = ["Apache License (2.0)"]
s.summary = "Output plugin to push data into AWS Kinesis Firehose stream."
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
s.authors = ["Valera Chevtaev"]
s.email = "myltik@gmail.com"
s.homepage = "https://github.com/chupakabr/logstash-output-firehose"
s.name = 'logstash-output-firehose'
s.version = "0.0.2"
s.licenses = ["Apache License (2.0)"]
s.summary = "Output plugin to push data into AWS Kinesis Firehose stream."
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
s.authors = ["Valera Chevtaev"]
s.email = "myltik@gmail.com"
s.homepage = "https://github.com/chupakabr/logstash-output-firehose"
s.require_paths = ["lib"]

# Files
Expand All @@ -25,4 +25,5 @@ Gem::Specification.new do |s|
s.add_runtime_dependency "logstash-codec-line"
s.add_runtime_dependency "logstash-codec-json_lines"
s.add_development_dependency "logstash-devutils"
s.add_development_dependency "timecop"
end
118 changes: 104 additions & 14 deletions spec/outputs/firehose_spec.rb
@@ -1,41 +1,131 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/outputs/firehose"
require "logstash/codecs/plain"
require "logstash/codecs/line"
require "logstash/codecs/json_lines"
require "logstash/event"
require "aws-sdk"
require "timecop"

describe LogStash::Outputs::Firehose do
dataStr = "123,someValue,1234567890"

let(:sample_event) { LogStash::Event.new("message" => dataStr) }
let(:expected_event) { LogStash::Event.new("message" => dataStr) }
let(:output) { LogStash::Outputs::Firehose.new({"codec" => "plain"}) }
let(:time_now) { Time.now }
let(:expected_event) { "#{time_now.strftime("%FT%H:%M:%S.%3NZ")} %{host} 123,someValue,1234567890" }
let(:firehose_double) { instance_double(Aws::Firehose::Client) }
let(:stream_name) { "aws-test-stream" }
let(:oversized_event) { "A" * 999_999 }
subject { LogStash::Outputs::Firehose.new({"codec" => "plain"}) }

before do
Thread.abort_on_exception = true

# Setup Firehose client
output.stream = "aws-test-stream"
subject.stream = stream_name
output.access_key_id = "Key ID"
output.secret_access_key = "Secret key"
output.register
subject.register

allow(Aws::Firehose::Client).to receive(:new).and_return(firehose_double)
allow(firehose_double).to receive(:put_record)
allow(firehose_double).to receive(:put_record_batch)
end

describe "receive message with plain codec" do
subject {
expect(output).to receive(:handle_event) do |arg|
arg
describe "receive one message" do
it "returns same string" do
expect(firehose_double).to receive(:put_record).with({
delivery_stream_name: stream_name,
record: {
data: expected_event
}
})
Timecop.freeze(time_now) do
subject.receive(sample_event)
end
output.receive(sample_event)
}
end

it "doesn't attempt to send a record greater than 1000 KB" do
expect(firehose_double).not_to receive(:put_record)
subject.receive([oversized_event * 2])
end
end

describe "receive multiple messages" do
let(:sample_event_1) { LogStash::Event.new("message" => "abc") }
let(:sample_event_2) { LogStash::Event.new("message" => "def") }
let(:sample_event_3) { LogStash::Event.new("message" => "ghi") }
let(:time_now) { Time.now }
let(:expected_event_1) { "#{time_now.strftime("%FT%H:%M:%S.%3NZ")} %{host} abc" }
let(:expected_event_2) { "#{time_now.strftime("%FT%H:%M:%S.%3NZ")} %{host} def" }
let(:expected_event_3) { "#{time_now.strftime("%FT%H:%M:%S.%3NZ")} %{host} ghi" }
it "returns same string" do
expect(subject).not_to eq(nil)
expect(subject.include? expected_event["message"]).to be_truthy
# expect(subject).to eq(expected_event["message"])
expect(firehose_double).to receive(:put_record_batch).with({
delivery_stream_name: stream_name,
records: [
{
data: expected_event
},
{
data: expected_event
},
{
data: expected_event
},
]
})
Timecop.freeze(time_now) do
subject.multi_receive([sample_event, sample_event, sample_event])
end
end

it "sends each message once" do
expect(firehose_double).to receive(:put_record_batch).with({
delivery_stream_name: stream_name,
records: [
{
data: expected_event_1
},
{
data: expected_event_2
},
]
}).once
expect(firehose_double).to receive(:put_record_batch).with({
delivery_stream_name: stream_name,
records: [
{
data: expected_event_3
},
]
}).once
Timecop.freeze(time_now) do
subject.multi_receive([sample_event_1, sample_event_2])
subject.multi_receive([sample_event_3])
subject.multi_receive([])
end
end

it "doesn't crash if no events are sent" do
# Necessary to replicate our race condition
a = Thread.new { subject.multi_receive(Array.new(499, sample_event_1)) }
b = Thread.new { subject.multi_receive([sample_event_1, sample_event_2]) }
expect { subject.multi_receive([sample_event_1, sample_event_2]) }.not_to raise_exception
# Ensure rspec doubles don't leak into other examples
a.join
b.join
end

context "oversized events are sent" do
it "doesn't attempt to send payloads greater than 4MB" do
expect(firehose_double).to receive(:put_record_batch).twice
subject.multi_receive(Array.new(5, oversized_event))
end

it "doesn't attempt to send a record greater than 1000 KB" do
expect(firehose_double).not_to receive(:put_record_batch)
subject.multi_receive([oversized_event * 2])
end
end
end
end