Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions lib/rdkafka/admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ def finalizer

# Close this admin instance
def close
return if closed?
ObjectSpace.undefine_finalizer(self)

@native_kafka.close
end

# Whether this admin has closed
def closed?
@native_kafka.closed?
end

# Create a topic with the given partition count and replication factor
#
# @raise [ConfigError] When the partition count or replication factor are out of valid range
Expand Down Expand Up @@ -149,7 +154,7 @@ def delete_topic(topic_name)

private
def closed_admin_check(method)
raise Rdkafka::ClosedAdminError.new(method) if @native_kafka.closed?
raise Rdkafka::ClosedAdminError.new(method) if closed?
end
end
end
4 changes: 3 additions & 1 deletion lib/rdkafka/bindings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ class TopicPartitionList < FFI::Struct
]

attach_function :rd_kafka_new, [:kafka_type, :pointer, :pointer, :int], :pointer
attach_function :rd_kafka_destroy, [:pointer], :void

RD_KAFKA_DESTROY_F_IMMEDIATE = 0x4
attach_function :rd_kafka_destroy_flags, [:pointer, :int], :void

# Consumer

Expand Down
7 changes: 4 additions & 3 deletions lib/rdkafka/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,14 @@ def consumer
Rdkafka::Bindings.rd_kafka_conf_set_rebalance_cb(config, Rdkafka::Bindings::RebalanceCallback)
end

# Create native client
kafka = native_kafka(config, :rd_kafka_consumer)

# Redirect the main queue to the consumer
Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)

# Return consumer with Kafka client
Rdkafka::Consumer.new(kafka)
Rdkafka::Consumer.new(Rdkafka::NativeKafka.new(kafka, run_polling_thread: false))
end

# Create a producer with this configuration.
Expand All @@ -181,7 +182,7 @@ def producer
Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction)
# Return producer with Kafka client
partitioner_name = self[:partitioner] || self["partitioner"]
Rdkafka::Producer.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer)), partitioner_name).tap do |producer|
Rdkafka::Producer.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true), partitioner_name).tap do |producer|
opaque.producer = producer
end
end
Expand All @@ -196,7 +197,7 @@ def admin
opaque = Opaque.new
config = native_config(opaque)
Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction)
Rdkafka::Admin.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer)))
Rdkafka::Admin.new(Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true))
end

# Error that is returned by the underlying rdkafka error if an invalid configuration option is present.
Expand Down
41 changes: 20 additions & 21 deletions lib/rdkafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ def finalizer
# @return [nil]
def close
return if closed?

Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka)
Rdkafka::Bindings.rd_kafka_destroy(@native_kafka)
@native_kafka = nil
ObjectSpace.undefine_finalizer(self)
Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka.inner)
@native_kafka.close
end

# Whether this consumer has closed
def closed?
@native_kafka.nil?
@native_kafka.closed?
end

# Subscribe to one or more topics letting Kafka handle partition assignments.
Expand All @@ -55,7 +54,7 @@ def subscribe(*topics)
end

# Subscribe to topic partition list and check this was successful
response = Rdkafka::Bindings.rd_kafka_subscribe(@native_kafka, tpl)
response = Rdkafka::Bindings.rd_kafka_subscribe(@native_kafka.inner, tpl)
if response != 0
raise Rdkafka::RdkafkaError.new(response, "Error subscribing to '#{topics.join(', ')}'")
end
Expand All @@ -71,7 +70,7 @@ def subscribe(*topics)
def unsubscribe
closed_consumer_check(__method__)

response = Rdkafka::Bindings.rd_kafka_unsubscribe(@native_kafka)
response = Rdkafka::Bindings.rd_kafka_unsubscribe(@native_kafka.inner)
if response != 0
raise Rdkafka::RdkafkaError.new(response)
end
Expand All @@ -94,7 +93,7 @@ def pause(list)
tpl = list.to_native_tpl

begin
response = Rdkafka::Bindings.rd_kafka_pause_partitions(@native_kafka, tpl)
response = Rdkafka::Bindings.rd_kafka_pause_partitions(@native_kafka.inner, tpl)

if response != 0
list = TopicPartitionList.from_native_tpl(tpl)
Expand Down Expand Up @@ -122,7 +121,7 @@ def resume(list)
tpl = list.to_native_tpl

begin
response = Rdkafka::Bindings.rd_kafka_resume_partitions(@native_kafka, tpl)
response = Rdkafka::Bindings.rd_kafka_resume_partitions(@native_kafka.inner, tpl)
if response != 0
raise Rdkafka::RdkafkaError.new(response, "Error resume '#{list.to_h}'")
end
Expand All @@ -140,7 +139,7 @@ def subscription
closed_consumer_check(__method__)

ptr = FFI::MemoryPointer.new(:pointer)
response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka, ptr)
response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka.inner, ptr)

if response != 0
raise Rdkafka::RdkafkaError.new(response)
Expand Down Expand Up @@ -170,7 +169,7 @@ def assign(list)
tpl = list.to_native_tpl

begin
response = Rdkafka::Bindings.rd_kafka_assign(@native_kafka, tpl)
response = Rdkafka::Bindings.rd_kafka_assign(@native_kafka.inner, tpl)
if response != 0
raise Rdkafka::RdkafkaError.new(response, "Error assigning '#{list.to_h}'")
end
Expand All @@ -188,7 +187,7 @@ def assignment
closed_consumer_check(__method__)

ptr = FFI::MemoryPointer.new(:pointer)
response = Rdkafka::Bindings.rd_kafka_assignment(@native_kafka, ptr)
response = Rdkafka::Bindings.rd_kafka_assignment(@native_kafka.inner, ptr)
if response != 0
raise Rdkafka::RdkafkaError.new(response)
end
Expand Down Expand Up @@ -227,7 +226,7 @@ def committed(list=nil, timeout_ms=1200)
tpl = list.to_native_tpl

begin
response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka, tpl, timeout_ms)
response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka.inner, tpl, timeout_ms)
if response != 0
raise Rdkafka::RdkafkaError.new(response)
end
Expand All @@ -253,7 +252,7 @@ def query_watermark_offsets(topic, partition, timeout_ms=200)
high = FFI::MemoryPointer.new(:int64, 1)

response = Rdkafka::Bindings.rd_kafka_query_watermark_offsets(
@native_kafka,
@native_kafka.inner,
topic,
partition,
low,
Expand Down Expand Up @@ -307,7 +306,7 @@ def lag(topic_partition_list, watermark_timeout_ms=100)
# @return [String, nil]
def cluster_id
closed_consumer_check(__method__)
Rdkafka::Bindings.rd_kafka_clusterid(@native_kafka)
Rdkafka::Bindings.rd_kafka_clusterid(@native_kafka.inner)
end

# Returns this client's broker-assigned group member id
Expand All @@ -317,7 +316,7 @@ def cluster_id
# @return [String, nil]
def member_id
closed_consumer_check(__method__)
Rdkafka::Bindings.rd_kafka_memberid(@native_kafka)
Rdkafka::Bindings.rd_kafka_memberid(@native_kafka.inner)
end

# Store offset of a message to be used in the next commit of this consumer
Expand All @@ -335,7 +334,7 @@ def store_offset(message)
# rd_kafka_offset_store is one of the few calls that does not support
# a string as the topic, so create a native topic for it.
native_topic = Rdkafka::Bindings.rd_kafka_topic_new(
@native_kafka,
@native_kafka.inner,
message.topic,
nil
)
Expand Down Expand Up @@ -367,7 +366,7 @@ def seek(message)
# rd_kafka_offset_store is one of the few calls that does not support
# a string as the topic, so create a native topic for it.
native_topic = Rdkafka::Bindings.rd_kafka_topic_new(
@native_kafka,
@native_kafka.inner,
message.topic,
nil
)
Expand Down Expand Up @@ -411,7 +410,7 @@ def commit(list=nil, async=false)
tpl = list ? list.to_native_tpl : nil

begin
response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka, tpl, async)
response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka.inner, tpl, async)
if response != 0
raise Rdkafka::RdkafkaError.new(response)
end
Expand All @@ -430,7 +429,7 @@ def commit(list=nil, async=false)
def poll(timeout_ms)
closed_consumer_check(__method__)

message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll(@native_kafka, timeout_ms)
message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll(@native_kafka.inner, timeout_ms)
if message_ptr.null?
nil
else
Expand All @@ -445,7 +444,7 @@ def poll(timeout_ms)
end
ensure
# Clean up rdkafka message if there is one
if !message_ptr.nil? && !message_ptr.null?
if message_ptr && !message_ptr.null?
Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr)
end
end
Expand Down
48 changes: 29 additions & 19 deletions lib/rdkafka/native_kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,26 @@ module Rdkafka
# @private
# A wrapper around a native kafka that polls and cleanly exits
class NativeKafka
def initialize(inner)
def initialize(inner, run_polling_thread:)
@inner = inner

# Start thread to poll client for delivery callbacks
@polling_thread = Thread.new do
loop do
Rdkafka::Bindings.rd_kafka_poll(inner, 250)
# Exit thread if closing and the poll queue is empty
if Thread.current[:closing] && Rdkafka::Bindings.rd_kafka_outq_len(inner) == 0
break
if run_polling_thread
# Start thread to poll client for delivery callbacks,
# not used in consumer.
@polling_thread = Thread.new do
loop do
Rdkafka::Bindings.rd_kafka_poll(inner, 250)
# Exit thread if closing and the poll queue is empty
if Thread.current[:closing] && Rdkafka::Bindings.rd_kafka_outq_len(inner) == 0
break
end
end
end
@polling_thread.abort_on_exception = true
@polling_thread[:closing] = false
end
@polling_thread.abort_on_exception = true
@polling_thread[:closing] = false

@closing = false
end

def inner
Expand All @@ -30,22 +35,27 @@ def finalizer
end

def closed?
@inner.nil?
@closing || @inner.nil?
end

def close(object_id=nil)
return if closed?

# Flush outstanding activity
Rdkafka::Bindings.rd_kafka_flush(@inner, 30 * 1000)

# Indicate to polling thread that we're closing
@polling_thread[:closing] = true
# Wait for the polling thread to finish up
@polling_thread.join
# Indicate to the outside world that we are closing
@closing = true

Rdkafka::Bindings.rd_kafka_destroy(@inner)
if @polling_thread
# Indicate to polling thread that we're closing
@polling_thread[:closing] = true
# Wait for the polling thread to finish up
@polling_thread.join
end

# Destroy the client
Rdkafka::Bindings.rd_kafka_destroy_flags(
@inner,
Rdkafka::Bindings::RD_KAFKA_DESTROY_F_IMMEDIATE
)
@inner = nil
end
end
Expand Down
16 changes: 14 additions & 2 deletions lib/rdkafka/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,23 @@ def delivery_callback=(callback)

# Close this producer and wait for the internal poll queue to empty.
def close
return if closed?
ObjectSpace.undefine_finalizer(self)

@native_kafka.close
end

# Whether this producer has closed
def closed?
@native_kafka.closed?
end

# Wait until all outstanding producer requests are completed, with the given timeout
# in seconds. Call this before closing a producer to ensure delivery of all messages.
def flush(timeout_ms=5_000)
closed_producer_check(__method__)
Rdkafka::Bindings.rd_kafka_flush(@native_kafka.inner, timeout_ms)
end

# Partition count for a given topic.
# NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil.
#
Expand Down Expand Up @@ -173,7 +185,7 @@ def arity(callback)

private
def closed_producer_check(method)
raise Rdkafka::ClosedProducerError.new(method) if @native_kafka.closed?
raise Rdkafka::ClosedProducerError.new(method) if closed?
end
end
end
2 changes: 1 addition & 1 deletion spec/rdkafka/consumer/message_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
end

after(:each) do
Rdkafka::Bindings.rd_kafka_destroy(native_client)
Rdkafka::Bindings.rd_kafka_destroy_flags(native_client, Rdkafka::Bindings::RD_KAFKA_DESTROY_F_IMMEDIATE)
end

subject { Rdkafka::Consumer::Message.new(native_message) }
Expand Down
2 changes: 1 addition & 1 deletion spec/rdkafka/metadata_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

after do
Rdkafka::Bindings.rd_kafka_consumer_close(native_kafka)
Rdkafka::Bindings.rd_kafka_destroy(native_kafka)
Rdkafka::Bindings.rd_kafka_destroy_flags(native_kafka, Rdkafka::Bindings::RD_KAFKA_DESTROY_F_IMMEDIATE)
end

context "passing in a topic name" do
Expand Down
Loading