diff --git a/README.md b/README.md index e4ad29d..4607947 100644 --- a/README.md +++ b/README.md @@ -509,7 +509,6 @@ You need to install rdkafka gem. partition_key (string) :default => 'partition' partition_key_key (string) :default => 'partition_key' message_key_key (string) :default => 'message_key' - default_topic (string) :default => nil use_default_for_unknown_topic (bool) :default => false use_default_for_unknown_partition_error (bool) :default => false default_partition_key (string) :default => nil diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb index 0fc18e7..33a5895 100644 --- a/lib/fluent/plugin/out_rdkafka2.rb +++ b/lib/fluent/plugin/out_rdkafka2.rb @@ -5,48 +5,17 @@ require 'rdkafka' -# This is required for `rdkafka` version >= 0.12.0 -# Overriding the close method in order to provide a time limit for when it should be forcibly closed -class Rdkafka::Producer::Client - # return false if producer is forcefully closed, otherwise return true - def close(timeout=nil) - return unless @native - - # Indicate to polling thread that we're closing - @polling_thread[:closing] = true - # Wait for the polling thread to finish up - thread = @polling_thread.join(timeout) - - Rdkafka::Bindings.rd_kafka_destroy(@native) - - @native = nil - - return !thread.nil? - end -end - -class Rdkafka::Producer - # return false if producer is forcefully closed, otherwise return true - def close(timeout = nil) - rdkafka_version = Rdkafka::VERSION || '0.0.0' - # Rdkafka version >= 0.12.0 changed its internals - if Gem::Version::create(rdkafka_version) >= Gem::Version.create('0.12.0') - ObjectSpace.undefine_finalizer(self) - - return @client.close(timeout) - end - - @closing = true - # Wait for the polling thread to finish up - # If the broker isn't alive, the thread doesn't exit - if timeout - thr = @polling_thread.join(timeout) - return !!thr - else - @polling_thread.join - return true - end +begin + rdkafka_version = Gem::Version::create(Rdkafka::VERSION) + if rdkafka_version < Gem::Version.create('0.12.0') + require_relative 'rdkafka_patch/0_11_0' + elsif rdkafka_version == Gem::Version.create('0.12.0') + require_relative 'rdkafka_patch/0_12_0' + elsif rdkafka_version >= Gem::Version.create('0.14.0') + require_relative 'rdkafka_patch/0_14_0' end +rescue LoadError, NameError + raise "unable to patch rdkafka." end module Fluent::Plugin diff --git a/lib/fluent/plugin/rdkafka_patch/0_11_0.rb b/lib/fluent/plugin/rdkafka_patch/0_11_0.rb new file mode 100644 index 0000000..ddde424 --- /dev/null +++ b/lib/fluent/plugin/rdkafka_patch/0_11_0.rb @@ -0,0 +1,15 @@ +class Rdkafka::Producer + # return false if producer is forcefully closed, otherwise return true + def close(timeout = nil) + @closing = true + # Wait for the polling thread to finish up + # If the broker isn't alive, the thread doesn't exit + if timeout + thr = @polling_thread.join(timeout) + return !!thr + else + @polling_thread.join + return true + end + end +end diff --git a/lib/fluent/plugin/rdkafka_patch/0_12_0.rb b/lib/fluent/plugin/rdkafka_patch/0_12_0.rb new file mode 100644 index 0000000..db36cc6 --- /dev/null +++ b/lib/fluent/plugin/rdkafka_patch/0_12_0.rb @@ -0,0 +1,27 @@ +# This is required for `rdkafka` version >= 0.12.0 +# Overriding the close method in order to provide a time limit for when it should be forcibly closed +class Rdkafka::Producer::Client + # return false if producer is forcefully closed, otherwise return true + def close(timeout=nil) + return unless @native + + # Indicate to polling thread that we're closing + @polling_thread[:closing] = true + # Wait for the polling thread to finish up + thread = @polling_thread.join(timeout) + + Rdkafka::Bindings.rd_kafka_destroy(@native) + + @native = nil + + return !thread.nil? + end +end + +class Rdkafka::Producer + def close(timeout = nil) + ObjectSpace.undefine_finalizer(self) + + return @client.close(timeout) + end +end diff --git a/lib/fluent/plugin/rdkafka_patch/0_14_0.rb b/lib/fluent/plugin/rdkafka_patch/0_14_0.rb new file mode 100644 index 0000000..f4da5b8 --- /dev/null +++ b/lib/fluent/plugin/rdkafka_patch/0_14_0.rb @@ -0,0 +1,44 @@ +class Rdkafka::NativeKafka + # return false if producer is forcefully closed, otherwise return true + def close(timeout=nil, object_id=nil) + return true if closed? + + synchronize do + # Indicate to the outside world that we are closing + @closing = true + + thread_status = :unknown + if @polling_thread + # Indicate to polling thread that we're closing + @polling_thread[:closing] = true + + # Wait for the polling thread to finish up, + # this can be aborted in practice if this + # code runs from a finalizer. + thread_status = @polling_thread.join(timeout) + end + + # Destroy the client after locking both mutexes + @poll_mutex.lock + + # This check prevents a race condition, where we would enter the close in two threads + # and after unlocking the primary one that hold the lock but finished, ours would be unlocked + # and would continue to run, trying to destroy inner twice + if @inner + Rdkafka::Bindings.rd_kafka_destroy(@inner) + @inner = nil + @opaque = nil + end + + !thread_status.nil? + end + end +end + +class Rdkafka::Producer + def close(timeout = nil) + return true if closed? + ObjectSpace.undefine_finalizer(self) + @native_kafka.close(timeout) + end +end