Skip to content

Commit

Permalink
Ditch reconnect logic in favor of rebuilding dropped connections
Browse files Browse the repository at this point in the history
We've narrowed down a memory leak issue to this driver, and this represents a
rather blunt solution to the problem.

My best guess is that it's possible for the Kafka connection to end up in a
disconnected state without any active reconnects (I can't reason how, but it
doesn't seem farfetched). In that case, sending data through the connection
appears to simply grow a buffer without limit.

This circumvents that possibility by simple re-creating the connection object
whenever a drop occurs.
  • Loading branch information
daveyeu committed Oct 19, 2012
1 parent 2662bfb commit 7095fac
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 51 deletions.
53 changes: 4 additions & 49 deletions lib/em-kafka/client.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
module EventMachine
module Kafka
class Client
include EventMachine::Kafka::EventEmitter
include EM::Deferrable

def initialize(host, port)
@host = host || 'localhost'
@port = port || 9092
@closing_connection = false
@callback = nil
end

def send_data(data)
connect if @connection.nil? || @connection.disconnected?
@connection.send_data(data)
end

Expand All @@ -20,58 +17,16 @@ def on_data(&block)
end

def connect
@connection = EM.connect(@host, @port, EM::Kafka::Connection, @host, @port)

@connection.on(:closed) do
if @connected
@deferred_status = nil
@connected = false
unless @closing_connection
@reconnecting = true
reconnect
end
else
unless @closing_connection
EM.add_timer(1) { reconnect }
end
end
end

@connection.on(:connected) do
@connected = true
succeed

if @reconnecting
@reconnecting = false
emit(:reconnected)
end
end

@connection = EM.connect(@host, @port, EM::Kafka::Connection)
@connection.on(:message) do |message|
@callback.call(message)
@callback.call(message) if @callback
end

@connected = false
@reconnecting = false

return self
end

def connected?
@connected
@connection
end

def close_connection
@closing_connection = true
@connection.close_connection_after_writing
end

private

def reconnect
EventMachine::Kafka.logger.debug("Trying to reconnect to Kafka")
@connection.reconnect @host, @port
end
end
end
end
9 changes: 7 additions & 2 deletions lib/em-kafka/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ module EventMachine::Kafka
class Connection < EM::Connection
include EventMachine::Kafka::EventEmitter

def initialize(host, port)
def initialize(*args)
super
@host, @port = host, port
@disconnected = false
end

def disconnected?
@disconnected
end

def connection_completed
Expand All @@ -17,6 +21,7 @@ def receive_data(data)
end

def unbind
@disconnected = true
EventMachine::Kafka.logger.info("Disconnected from Kafka")
emit(:closed)
end
Expand Down

0 comments on commit 7095fac

Please sign in to comment.