Permalink
Browse files

big bug fix for delayed work unit processing.

  • Loading branch information...
1 parent 0b13220 commit e785aac9890d6c141c7929bfc2ddb8a706b29a3c @jashkenas jashkenas committed Mar 4, 2011
Showing with 8 additions and 8 deletions.
  1. +8 −8 lib/cloud_crowd/models/work_unit.rb
@@ -39,17 +39,21 @@ class WorkUnit < ActiveRecord::Base
# action in question disabled.
def self.distribute_to_nodes
reservation = nil
- filter = {}
loop do
+ # Find the available nodes, and determine what actions we're capable
+ # of running at the moment.
+ available_nodes = NodeRecord.available
+ available_actions = available_nodes.map {|node| node.actions }.flatten.uniq
+ filter = "action in (#{available_actions.map{|a| "'#{a}'"}.join(',')})"
+
# Reserve a handful of available work units.
WorkUnit.cancel_reservations(reservation) if reservation
return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT, :conditions => filter)
work_units = WorkUnit.reserved(reservation)
# Round robin through the nodes and units, sending the unit if the node
# is able to process it.
- available_nodes = NodeRecord.available
while (node = available_nodes.shift) && (unit = work_units.shift) do
if node.actions.include?(unit.action)
if node.send_work_unit(unit)
@@ -62,12 +66,8 @@ def self.distribute_to_nodes
work_units.push(unit)
end
- # If there are both units and nodes left over, make sure that the next
- # time around, we only select units that can currently be served.
- if work_units.any? && available_nodes.any?
- filter = {:action => available_nodes.map {|node| node.actions }.flatten.uniq }
- next
- end
+ # If there are both units and nodes left over, try again.
+ next if work_units.any? && available_nodes.any?
# If we still have units at this point, or we're fresh out of nodes,
# that means we're done.

0 comments on commit e785aac

Please sign in to comment.