Merge pull request #62 from htgc/use-ruby-kafka-in-out-kafka
Use ruby-kafka instead of poseidon in out_kafka
repeatedly committed Jul 22, 2016
2 parents dbb50d1 + cb0a4f6 commit beeeff1
Showing 1 changed file with 72 additions and 59 deletions.
131 changes: 72 additions & 59 deletions lib/fluent/plugin/out_kafka.rb
@@ -3,7 +3,10 @@ class Fluent::KafkaOutput < Fluent::Output

def initialize
super
require 'poseidon'

require 'kafka'

@kafka = nil
end

config_param :brokers, :string, :default => 'localhost:9092',
@@ -25,28 +28,34 @@ def initialize
config_param :output_include_tag, :bool, :default => false
config_param :output_include_time, :bool, :default => false

# https://github.com/zendesk/ruby-kafka#encryption-and-authentication-using-ssl
config_param :ssl_ca_cert, :string, :default => nil,
:desc => "a PEM encoded CA cert to use with an SSL connection."
config_param :ssl_client_cert, :string, :default => nil,
:desc => "a PEM encoded client cert to use with an SSL connection. Must be used in combination with ssl_client_cert_key."
config_param :ssl_client_cert_key, :string, :default => nil,
:desc => "a PEM encoded client cert key to use with an SSL connection. Must be used in combination with ssl_client_cert."

# poseidon producer options
config_param :max_send_retries, :integer, :default => 3,
config_param :max_send_retries, :integer, :default => 1,
:desc => "Number of times to retry sending of messages to a leader."
config_param :required_acks, :integer, :default => 0,
:desc => "The number of acks required per request."
config_param :ack_timeout_ms, :integer, :default => 1500,
config_param :ack_timeout, :integer, :default => nil,
:desc => "How long the producer waits for acks."
config_param :compression_codec, :string, :default => 'none',
config_param :compression_codec, :string, :default => nil,
:desc => "The codec the producer uses to compress messages."

config_param :time_format, :string, :default => nil

attr_accessor :output_data_type
attr_accessor :field_separator

@seed_brokers = []

unless method_defined?(:log)
define_method("log") { $log }
end

def refresh_producer()
def refresh_client
if @zookeeper
@seed_brokers = []
z = Zookeeper.new(@zookeeper)
@@ -59,8 +68,9 @@ def refresh_producer()
end
begin
if @seed_brokers.length > 0
@producer = Poseidon::Producer.new(@seed_brokers, @client_id, :max_send_retries => @max_send_retries, :required_acks => @required_acks, :ack_timeout_ms => @ack_timeout_ms, :compression_codec => @compression_codec.to_sym)
log.info "initialized producer #{@client_id}"
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key))
log.info "initialized kafka producer: #{@client_id}"
else
log.warn "No brokers found on Zookeeper"
end
@@ -69,25 +79,24 @@ def refresh_producer()
end
end
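The Zookeeper branch of `refresh_client` is collapsed in this view. Kafka brokers register themselves under `/brokers/ids/<id>` as JSON blobs, so the hidden loop presumably resembles this sketch; the quorum address is a placeholder and the exact loop body is an assumed reconstruction, not shown in the diff:

```ruby
require 'zookeeper'
require 'yajl'

# Assumed reconstruction of the collapsed broker-discovery loop.
z = Zookeeper.new('localhost:2181')   # placeholder quorum
seed_brokers = z.get_children(path: '/brokers/ids')[:children].map { |id|
  broker = Yajl::Parser.parse(z.get(path: "/brokers/ids/#{id}")[:data])
  "#{broker['host']}:#{broker['port']}"
}
z.close
```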

def read_ssl_file(path)
return nil if path.nil?
File.read(path)
end

def configure(conf)
super

if @zookeeper
require 'zookeeper'
require 'yajl'
else
@seed_brokers = @brokers.match(",").nil? ? [@brokers] : @brokers.split(",")
log.info "brokers has been set directly: #{@seed_brokers}"
end
if @compression_codec == 'snappy'
require 'snappy'
end
case @output_data_type
when 'json'
require 'yajl'
when 'ltsv'
require 'ltsv'
when 'msgpack'
require 'msgpack'

if conf['ack_timeout_ms']
log.warn "'ack_timeout_ms' parameter is deprecated. Use 'ack_timeout' (in seconds) instead"
@ack_timeout = conf['ack_timeout_ms'].to_i / 1000
end
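Note that `to_i / 1000` is integer division, so the conversion truncates:

```ruby
# 'ack_timeout_ms' values are floored to whole seconds:
1500 / 1000   # => 1 (the old poseidon default of 1500 ms becomes 1 s)
 999 / 1000   # => 0 (sub-second timeouts silently become 0)
```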

@f_separator = case @field_separator
@@ -97,56 +106,57 @@ def configure(conf)
else "\t"
end

@custom_attributes = if @output_data_type == 'json'
nil
elsif @output_data_type == 'ltsv'
nil
elsif @output_data_type == 'msgpack'
nil
elsif @output_data_type =~ /^attr:(.*)$/
$1.split(',').map(&:strip).reject(&:empty?)
else
@formatter = Fluent::Plugin.new_formatter(@output_data_type)
@formatter.configure(conf)
nil
end
@formatter_proc = setup_formatter(conf)

@producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks}
@producer_opts[:ack_timeout] = @ack_timeout if @ack_timeout
@producer_opts[:compression_codec] = @compression_codec.to_sym if @compression_codec
end
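These options are handed straight to `kafka.producer` inside `emit`. Expanded under this file's defaults; the `ack_timeout` and `compression_codec` values in the comments are illustrative, not defaults:

```ruby
# With the defaults above, @producer_opts is simply:
producer_opts = { max_retries: 1, required_acks: 0 }

# The optional keys are only added when explicitly configured, e.g.:
#   producer_opts[:ack_timeout]       = 5        # seconds
#   producer_opts[:compression_codec] = :snappy  # needs the snappy gem

producer = kafka.producer(producer_opts)  # hash converts to ruby-kafka's keywords
```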

def start
super
refresh_producer()
refresh_client
end

def shutdown
super
@kafka = nil
end

def parse_record(record)
if @custom_attributes.nil?
case @output_data_type
when 'json'
Yajl::Encoder.encode(record)
when 'ltsv'
LTSV.dump(record)
when 'msgpack'
record.to_msgpack
else
record.to_s
end
else
def setup_formatter(conf)
if @output_data_type == 'json'
require 'yajl'
Proc.new { |tag, time, record| Yajl::Encoder.encode(record) }
elsif @output_data_type == 'ltsv'
require 'ltsv'
Proc.new { |tag, time, record| LTSV.dump(record) }
elsif @output_data_type == 'msgpack'
require 'msgpack'
Proc.new { |tag, time, record| record.to_msgpack }
elsif @output_data_type =~ /^attr:(.*)$/
@custom_attributes = $1.split(',').map(&:strip).reject(&:empty?)
@custom_attributes.unshift('time') if @output_include_time
@custom_attributes.unshift('tag') if @output_include_tag
@custom_attributes.map { |attr|
record[attr].nil? ? '' : record[attr].to_s
}.join(@f_separator)
Proc.new { |tag, time, record|
@custom_attributes.map { |attr|
record[attr].nil? ? '' : record[attr].to_s
}.join(@f_separator)
}
else
@formatter = Fluent::Plugin.new_formatter(@output_data_type)
@formatter.configure(conf)
@formatter.method(:format)
end
end
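Every branch returns something that responds to `call(tag, time, record)` — a Proc for the built-in types, a bound Method for formatter plugins — so `emit` can stay format-agnostic. A quick illustration, with assumed sample values:

```ruby
require 'yajl'

# The json branch, exercised directly:
json = Proc.new { |tag, time, record| Yajl::Encoder.encode(record) }
json.call('app.access', Time.now.to_i, 'code' => 200)
# => "{\"code\":200}"

# An 'attr:host,code' config with the default tab separator would instead
# yield lines like "web01\t200"; any other output_data_type falls through
# to a fluentd formatter plugin, whose #format Method object is call-able
# in exactly the same way.
```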

def emit(tag, es, chain)
begin
chain.next
es.each do |time,record|

# out_kafka is mainly for testing so don't need the performance unlike out_kafka_buffered.
producer = @kafka.producer(@producer_opts)

es.each do |time, record|
if @output_include_time
if @time_format
record['time'] = Time.at(time).strftime(@time_format)
@@ -155,17 +165,20 @@ def emit(tag, es, chain)
end
end
record['tag'] = tag if @output_include_tag
topic = record['topic'] || self.default_topic || tag
topic = record['topic'] || @default_topic || tag
partition_key = record['partition_key'] || @default_partition_key
value = @formatter.nil? ? parse_record(record) : @formatter.format(tag, time, record)
log.trace("message sent to #{topic} with key: #{partition_key} and value: #{value}.")
message = Poseidon::MessageToSend.new(topic, value, partition_key)
@producer.send_messages([message])
value = @formatter_proc.call(tag, time, record)

log.on_trace { log.trace("message sent to #{topic} with key: #{partition_key} and value: #{value}.") }
producer.produce(value, topic: topic, partition_key: partition_key)
end

producer.deliver_messages
producer.shutdown
rescue Exception => e
log.warn("Send exception occurred: #{e}")
@producer.close if @producer
refresh_producer()
log.warn "Send exception occurred: #{e}"
producer.shutdown if producer
refresh_client
raise e
end
end
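For reference, the per-emit lifecycle against ruby-kafka condenses to four calls; the topic, payload, and key are placeholders:

```ruby
# Sketch of what emit does per event stream:
producer = kafka.producer(max_retries: 1, required_acks: 0)
producer.produce('payload', topic: 'test', partition_key: 'key1')  # buffer in memory
producer.deliver_messages  # flush the buffered batch to the brokers
producer.shutdown          # release the producer's sockets
```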
