Skip to content
Find file
Fetching contributors…
Cannot retrieve contributors at this time
118 lines (100 sloc) 3.27 KB
begin
require 'em-hiredis'
rescue LoadError => error
raise 'Missing EM-Synchrony dependency: gem install em-hiredis'
end
module EventMachine
module Hiredis
def self.connect(uri = nil)
client = setup(uri)
EM::Synchrony.sync client.connect
client
end
class Client
def pubsub
return @pubsub if @pubsub
client = PubsubClient.new(@host, @port, @password, @db)
EM::Synchrony.sync client.connect
@pubsub = client
end
end
class BaseClient
def self.connect(host = 'localhost', port = 6379)
conn = new(host, port)
EM::Synchrony.sync conn.connect
conn
end
def connect
@auto_reconnect = true
@connection = EM.connect(@host, @port, Connection, @host, @port)
@connection.on(:closed) do
if @connected
@defs.each { |d| d.fail(Error.new("Redis disconnected")) }
@defs = []
@deferred_status = nil
@connected = false
if @auto_reconnect
# Next tick avoids reconnecting after for example EM.stop
EM.next_tick { reconnect }
end
emit(:disconnected)
EM::Hiredis.logger.info("#{@connection} Disconnected")
else
if @auto_reconnect
@reconnect_failed_count += 1
@reconnect_timer = EM.add_timer(EM::Hiredis.reconnect_timeout) {
@reconnect_timer = nil
reconnect
}
emit(:reconnect_failed, @reconnect_failed_count)
EM::Hiredis.logger.info("#{@connection} Reconnect failed")
if @reconnect_failed_count >= 4
emit(:failed)
self.fail(Error.new("Could not connect after 4 attempts"))
end
end
end
end
@connection.on(:connected) do
Fiber.new do
@connected = true
@reconnect_failed_count = 0
@failed = false
select(@db) unless @db == 0
auth(@password) if @password
@command_queue.each do |df, command, args|
@connection.send_command(command, args)
@defs.push(df)
end
@command_queue = []
emit(:connected)
EM::Hiredis.logger.info("#{@connection} Connected")
succeed
if @reconnecting
@reconnecting = false
emit(:reconnected)
end
end.resume
end
@connection.on(:message) do |reply|
if RuntimeError === reply
raise "Replies out of sync: #{reply.inspect}" if @defs.empty?
deferred = @defs.shift
error = RedisError.new(reply.message)
error.redis_error = reply
deferred.fail(error) if deferred
else
handle_reply(reply)
end
end
@connected = false
@reconnecting = false
return self
end
alias :old_method_missing :method_missing
def method_missing(sym, *args)
EM::Synchrony.sync old_method_missing(sym, *args)
end
end
end
end
Jump to Line
Something went wrong with that request. Please try again.