Skip to content

Commit

Permalink
Ensure errback is subscribed to in all async calls so errors can't ge…
Browse files Browse the repository at this point in the history
…t swollowed and synchrony style interactions won't hang
  • Loading branch information
Nathan committed Jul 7, 2011
1 parent 0feccee commit 9b8026a
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 18 deletions.
36 changes: 28 additions & 8 deletions lib/em-mongo/cursor.rb
Expand Up @@ -125,14 +125,18 @@ def check_and_transform_document(doc, response)
# query string and the values for skip and limit, are preserved.
def rewind!
response = EM::DefaultDeferrable.new
close.callback do
close_resp = close
close_resp.callback do
@cache.clear
@cursor_id = nil
@closed = false
@query_run = false
@n_received = nil
response.succeed
end
close_resp.errback do |err|
response.fail err
end
response
end

Expand All @@ -141,7 +145,9 @@ def rewind!
# @return [Boolean]
def has_next?
response = EM::DefaultDeferrable.new
num_remaining.callback { |num| response.succeed( num > 0 ) }
num_resp = num_remaining
num_resp.callback { |num| response.succeed( num > 0 ) }
num_resp.errback { |err| response.fail err }
response
end

Expand Down Expand Up @@ -423,7 +429,9 @@ def convert_fields_for_query(fields)
def num_remaining
response = EM::DefaultDeferrable.new
if @cache.length == 0
refresh.callback { response.succeed(@cache.length) }
ref_resp = refresh
ref_resp.callback { response.succeed(@cache.length) }
ref_resp.errback { |err| response.fail err }
else
response.succeed(@cache.length)
end
Expand Down Expand Up @@ -454,11 +462,17 @@ def refresh
message.put_long(@cursor_id)

response = EM::DefaultDeferrable.new
@connection.send_command(EM::Mongo::OP_GET_MORE, message).callback do |resp|
cmd_resp = @connection.send_command(EM::Mongo::OP_GET_MORE, message)
cmd_resp.callback do |resp|
@cache += resp.docs
@n_received = resp.number_returned
@returned += @n_received
close_cursor_if_query_complete.callback { response.succeed }
close_resp = close_cursor_if_query_complete
close_resp.callback { response.succeed }
close_resp.errback { |err| response.fail err }
end
cmd_resp.errback do |err|
response.fail err
end
response
end
Expand All @@ -467,14 +481,18 @@ def refresh
def send_initial_query
response = EM::DefaultDeferrable.new
message = construct_query_message
@connection.send_command(EM::Mongo::OP_QUERY, message).callback do |resp|
cmd_resp = @connection.send_command(EM::Mongo::OP_QUERY, message)
cmd_resp.callback do |resp|
@cache += resp.docs
@n_received = resp.number_returned
@cursor_id = resp.cursor_id
@returned += @n_received
@query_run = true
close_cursor_if_query_complete.callback { response.succeed }
close_resp = close_cursor_if_query_complete
close_resp.callback { response.succeed }
close_resp.errback { |err| response.fail err }
end
cmd_resp.errback { |err| response.fail err }
response
end

Expand Down Expand Up @@ -525,7 +543,9 @@ def to_s
def close_cursor_if_query_complete
response = EM::DefaultDeferrable.new
if @limit > 0 && @returned >= @limit
close.callback { response.succeed }
close_resp = close
close_resp.callback { response.succeed }
close_resp.errback { |err| response.fail err }
else
response.succeed
end
Expand Down
40 changes: 30 additions & 10 deletions lib/em-mongo/database.rb
Expand Up @@ -36,12 +36,14 @@ def name
# @return [Array]
def collection_names
response = EM::DefaultDeferrable.new
collections_info.to_a.callback do |docs|
name_resp = collections_info.to_a
name_resp.callback do |docs|
names = docs.collect{ |doc| doc['name'] || '' }
names = names.delete_if {|name| name.index(self.name).nil? || name.index('$')}
names = names.map{ |name| name.sub(self.name + '.','')}
response.succeed(names)
end
name_resp.errback { |err| response.fail err }
response
end

Expand All @@ -50,11 +52,13 @@ def collection_names
# @return [Array<Mongo::Collection>]
def collections
response = EM::DefaultDeferrable.new
collection_names.callback do |names|
name_resp = collection_names
name_resp.callback do |names|
response.succeed names.map do |name|
EM::Mongo::Collection.new(@db_name, name, @em_connection)
end
end
name_resp.errback { |err| response.fail err }
response
end

Expand Down Expand Up @@ -95,22 +99,26 @@ def collections_info(coll_name=nil)
# @return [Mongo::Collection]
def create_collection(name)
response = EM::DefaultDeferrable.new
collection_names.callback do |names|
names_resp = collection_names
names_resp.callback do |names|
if names.include?(name.to_s)
response.succeed EM::Mongo::Collection.new(@db_name, name, @em_connection)
end

# Create a new collection.
oh = BSON::OrderedHash.new
oh[:create] = name
command(oh).callback do |doc|
cmd_resp = command(oh)
cmd_resp.callback do |doc|
if EM::Mongo::Support.ok?(doc)
response.succeed EM::Mongo::Collection.new(@db_name, name, @em_connection)
else
response.fail [MongoDBError, "Error creating collection: #{doc.inspect}"]
end
end
cmd_resp.errback { |err| response.fail err }
end
names_resp.errback { |err| response.fail err }
response
end

Expand All @@ -121,15 +129,19 @@ def create_collection(name)
# @return [Boolean] +true+ on success or +false+ if the collection name doesn't exist.
def drop_collection(name)
response = EM::DefaultDeferrable.new
collection_names.callback do |names|
names_resp = collection_names
names_resp.callback do |names|
if names.include?(name.to_s)
command(:drop=>name).callback do |doc|
cmd_resp = command(:drop=>name)
cmd_resp.callback do |doc|
response.succeed EM::Mongo::Support.ok?(doc)
end
cmd_resp.errback { |err| response.fail err }
else
response.succeed true
end
end
names_resp.errback { |err| response.fail err }
response
end

Expand All @@ -147,13 +159,15 @@ def get_last_error(opts={})
cmd = BSON::OrderedHash.new
cmd[:getlasterror] = 1
cmd.merge!(opts)
command(cmd, :check_response => false).callback do |doc|
cmd_resp = command(cmd, :check_response => false)
cmd_resp.callback do |doc|
if EM::Mongo::Support.ok?(doc)
response.succeed doc
else
response.fail [MongoDBError, "error retrieving last error: #{doc.inspect}"]
end
end
cmd_resp.errback { |err| response.fail err }
response
end

Expand Down Expand Up @@ -245,7 +259,8 @@ def command(selector, opts={})

def authenticate(username, password)
response = EM::DefaultDeferrable.new
self.collection(SYSTEM_COMMAND_COLLECTION).first({'getnonce' => 1}).callback do |res|
auth_resp = self.collection(SYSTEM_COMMAND_COLLECTION).first({'getnonce' => 1})
auth_resp.callback do |res|
if not res or not res['nonce']
response.succeed false
else
Expand All @@ -255,25 +270,30 @@ def authenticate(username, password)
auth['nonce'] = res['nonce']
auth['key'] = EM::Mongo::Support.auth_key(username, password, res['nonce'])

self.collection(SYSTEM_COMMAND_COLLECTION).first(auth).callback do |res|
auth_resp2 = self.collection(SYSTEM_COMMAND_COLLECTION).first(auth)
auth_resp2.callback do |res|
if EM::Mongo::Support.ok?(res)
response.succeed true
else
response.fail res
end
end
auth_resp2.errback { |err| response.fail err }
end
end
auth_resp.errback { |err| response.fail err }
response
end

def add_user(username, password)
response = EM::DefaultDeferrable.new
self.collection(SYSTEM_USER_COLLECTION).first({:user => username}).callback do |res|
user_resp = self.collection(SYSTEM_USER_COLLECTION).first({:user => username})
user_resp.callback do |res|
user = res || {:user => username}
user['pwd'] = EM::Mongo::Support.hash_password(username, password)
response.succeed self.collection(SYSTEM_USER_COLLECTION).save(user)
end
user_resp.errback { |err| response.fail err }
response
end

Expand Down

0 comments on commit 9b8026a

Please sign in to comment.