Use ruby-kafka instead of poseidon in out_kafka #62

Merged
merged 1 commit on Jul 22, 2016
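In short, the patch drops the Poseidon client and drives out_kafka through zendesk/ruby-kafka instead: SSL parameters are added, the max_send_retries default drops from 3 to 1, ack_timeout_ms (milliseconds) gives way to ack_timeout (seconds), and record formatting moves into a proc built once in configure. As a rough illustration of the produce path the plugin now follows, here is a minimal sketch against the ruby-kafka API of this era; the broker address, topic, and option values are placeholders, not taken from the diff.

```ruby
require 'kafka'

# Seed brokers come from the `brokers` parameter or from Zookeeper
# discovery in refresh_client; the ssl_* arguments are PEM strings
# (or nil) as returned by read_ssl_file.
kafka = Kafka.new(
  seed_brokers: ['localhost:9092'],
  client_id: 'kafka',
  ssl_ca_cert: nil,
  ssl_client_cert: nil,
  ssl_client_cert_key: nil
)

# Mirrors @producer_opts: ack_timeout is now in seconds, and the
# compression codec is only passed when one is configured.
producer = kafka.producer(max_retries: 1, required_acks: 0, ack_timeout: 1)

# emit queues one message per record, then flushes synchronously.
producer.produce('{"message":"hello"}', topic: 'test', partition_key: nil)
producer.deliver_messages
producer.shutdown
```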
131 changes: 72 additions & 59 deletions lib/fluent/plugin/out_kafka.rb
@@ -3,7 +3,10 @@ class Fluent::KafkaOutput < Fluent::Output

def initialize
super
require 'poseidon'

require 'kafka'

@kafka = nil
end

config_param :brokers, :string, :default => 'localhost:9092',
@@ -25,28 +28,34 @@ def initialize
config_param :output_include_tag, :bool, :default => false
config_param :output_include_time, :bool, :default => false

# https://github.com/zendesk/ruby-kafka#encryption-and-authentication-using-ssl
config_param :ssl_ca_cert, :string, :default => nil,
:desc => "a PEM encoded CA cert to use with an SSL connection."
config_param :ssl_client_cert, :string, :default => nil,
:desc => "a PEM encoded client cert to use with an SSL connection. Must be used in combination with ssl_client_cert_key."
config_param :ssl_client_cert_key, :string, :default => nil,
:desc => "a PEM encoded client cert key to use with an SSL connection. Must be used in combination with ssl_client_cert."

# poseidon producer options
config_param :max_send_retries, :integer, :default => 3,
config_param :max_send_retries, :integer, :default => 1,
:desc => "Number of times to retry sending of messages to a leader."
config_param :required_acks, :integer, :default => 0,
:desc => "The number of acks required per request."
config_param :ack_timeout_ms, :integer, :default => 1500,
config_param :ack_timeout, :integer, :default => nil,
:desc => "How long the producer waits for acks."
config_param :compression_codec, :string, :default => 'none',
config_param :compression_codec, :string, :default => nil,
:desc => "The codec the producer uses to compress messages."

config_param :time_format, :string, :default => nil

attr_accessor :output_data_type
attr_accessor :field_separator

@seed_brokers = []

unless method_defined?(:log)
define_method("log") { $log }
end

def refresh_producer()
def refresh_client
if @zookeeper
@seed_brokers = []
z = Zookeeper.new(@zookeeper)
@@ -59,8 +68,9 @@ def refresh_producer()
end
begin
if @seed_brokers.length > 0
@producer = Poseidon::Producer.new(@seed_brokers, @client_id, :max_send_retries => @max_send_retries, :required_acks => @required_acks, :ack_timeout_ms => @ack_timeout_ms, :compression_codec => @compression_codec.to_sym)
log.info "initialized producer #{@client_id}"
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key))
log.info "initialized kafka producer: #{@client_id}"
else
log.warn "No brokers found on Zookeeper"
end
@@ -69,25 +79,24 @@
end
end

def read_ssl_file(path)
return nil if path.nil?
File.read(path)
end

def configure(conf)
super

if @zookeeper
require 'zookeeper'
require 'yajl'
else
@seed_brokers = @brokers.match(",").nil? ? [@brokers] : @brokers.split(",")
log.info "brokers has been set directly: #{@seed_brokers}"
end
if @compression_codec == 'snappy'
require 'snappy'
end
case @output_data_type
when 'json'
require 'yajl'
when 'ltsv'
require 'ltsv'
when 'msgpack'
require 'msgpack'

if conf['ack_timeout_ms']
log.warn "'ack_timeout_ms' parameter is deprecated. Use second unit 'ack_timeout' instead"
@ack_timeout = conf['ack_timeout_ms'].to_i / 1000
end

@f_separator = case @field_separator
@@ -97,56 +106,57 @@ def configure(conf)
else "\t"
end

@custom_attributes = if @output_data_type == 'json'
nil
elsif @output_data_type == 'ltsv'
nil
elsif @output_data_type == 'msgpack'
nil
elsif @output_data_type =~ /^attr:(.*)$/
$1.split(',').map(&:strip).reject(&:empty?)
else
@formatter = Fluent::Plugin.new_formatter(@output_data_type)
@formatter.configure(conf)
nil
end
@formatter_proc = setup_formatter(conf)

@producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks}
@producer_opts[:ack_timeout] = @ack_timeout if @ack_timeout
@producer_opts[:compression_codec] = @compression_codec.to_sym if @compression_codec
end

def start
super
refresh_producer()
refresh_client
end

def shutdown
super
@kafka = nil
end

def parse_record(record)
if @custom_attributes.nil?
case @output_data_type
when 'json'
Yajl::Encoder.encode(record)
when 'ltsv'
LTSV.dump(record)
when 'msgpack'
record.to_msgpack
else
record.to_s
end
else
def setup_formatter(conf)
if @output_data_type == 'json'
require 'yajl'
Proc.new { |tag, time, record| Yajl::Encoder.encode(record) }
elsif @output_data_type == 'ltsv'
require 'ltsv'
Proc.new { |tag, time, record| LTSV.dump(record) }
elsif @output_data_type == 'msgpack'
require 'msgpack'
Proc.new { |tag, time, record| record.to_msgpack }
elsif @output_data_type =~ /^attr:(.*)$/
@custom_attributes = $1.split(',').map(&:strip).reject(&:empty?)
@custom_attributes.unshift('time') if @output_include_time
@custom_attributes.unshift('tag') if @output_include_tag
@custom_attributes.map { |attr|
record[attr].nil? ? '' : record[attr].to_s
}.join(@f_separator)
Proc.new { |tag, time, record|
@custom_attributes.map { |attr|
record[attr].nil? ? '' : record[attr].to_s
}.join(@f_separator)
}
else
@formatter = Fluent::Plugin.new_formatter(@output_data_type)
@formatter.configure(conf)
@formatter.method(:format)
end
end

def emit(tag, es, chain)
begin
chain.next
es.each do |time,record|

# out_kafka is mainly for testing, so it does not need the performance of out_kafka_buffered.
producer = @kafka.producer(@producer_opts)

es.each do |time, record|
if @output_include_time
if @time_format
record['time'] = Time.at(time).strftime(@time_format)
@@ -155,17 +165,20 @@ def emit(tag, es, chain)
end
end
record['tag'] = tag if @output_include_tag
topic = record['topic'] || self.default_topic || tag
topic = record['topic'] || @default_topic || tag
partition_key = record['partition_key'] || @default_partition_key
value = @formatter.nil? ? parse_record(record) : @formatter.format(tag, time, record)
log.trace("message send to #{topic} with key: #{partition_key} and value: #{value}.")
message = Poseidon::MessageToSend.new(topic, value, partition_key)
@producer.send_messages([message])
value = @formatter_proc.call(tag, time, record)

log.on_trace { log.trace("message sent to #{topic} with key: #{partition_key} and value: #{value}.") }
producer.produce(value, topic: topic, partition_key: partition_key)
end

producer.deliver_messages
producer.shutdown
rescue Exception => e
log.warn("Send exception occurred: #{e}")
@producer.close if @producer
refresh_producer()
log.warn "Send exception occurred: #{e}"
producer.shutdown if producer
refresh_client
raise e
end
end
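One design note on the formatting change: setup_formatter replaces the old per-record branching in parse_record with a proc selected once at configure time, so emit makes a single call per record. Below is a standalone sketch of the `attr:` branch, assuming `output_data_type attr:host,message`, tab as the field separator, and both output_include_tag and output_include_time enabled; the field names and values are made up for illustration.

```ruby
# Build the attribute list the way setup_formatter does.
custom_attributes = 'host,message'.split(',').map(&:strip).reject(&:empty?)
custom_attributes.unshift('time')  # output_include_time
custom_attributes.unshift('tag')   # output_include_tag

formatter_proc = Proc.new { |tag, time, record|
  custom_attributes.map { |attr|
    record[attr].nil? ? '' : record[attr].to_s
  }.join("\t")
}

# emit copies 'tag' and 'time' into the record before calling the proc.
record = { 'tag' => 'app.log', 'time' => 1469145600,
           'host' => 'web1', 'message' => 'ok' }
formatter_proc.call('app.log', 1469145600, record)
# => "app.log\t1469145600\tweb1\tok"
```

Missing attributes format as empty strings rather than raising, which keeps the field count stable across records.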