Event Machine driver for Apache Kafka
Switch branches/tags
Nothing to show
Clone or download
daveyeu Ditch reconnect logic in favor of rebuilding dropped connections
We've narrowed down a memory leak issue to this driver, and this represents a
rather blunt solution to the problem.

My best guess is that it's possible for the Kafka connection to end up in a
disconnected state without any active reconnects (I can't reason how, but it
doesn't seem farfetched). In that case, sending data through the connection
appears to simply grow a buffer without limit.

This circumvents that possibility by simple re-creating the connection object
whenever a drop occurs.
Latest commit 7095fac Oct 19, 2012



EventMachine driver for Kafka.


When using Ruby objects, the payload is encoded to JSON

producer = EM::Kafka::Producer.new("kafka://topic@localhost:9092/0")
producer.deliver(:foo => "bar") # payload is {foo:"bar"}


consumer = EM::Kafka::Consumer.new("kafka://topic@localhost:9092/0")
consumer.consume do |message|
  puts message.payload


Messages are composed of:

  • a payload
  • a magic id (defaults to 0)

Change the magic id when the payload format changes:

EM::Kafka::Message.new("payload", 2)

Pass messages when you want to be specific:

message_1 = EM::Kafka::Message.new("payload_1", 2)
message_2 = EM::Kafka::Message.new("payload_2", 2)
producer.deliver([message_1, message_2])


Heavily influenced by / borrowed from:

  • kafka-rb (Alejandro Crosa)
  • em-hiredis (Martyn Loughran)