Skip to content

Commit

Permalink
big bug fix for delayed work unit processing.
Browse files Browse the repository at this point in the history
  • Loading branch information
jashkenas committed Mar 4, 2011
1 parent 0b13220 commit e785aac
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions lib/cloud_crowd/models/work_unit.rb
Expand Up @@ -39,17 +39,21 @@ class WorkUnit < ActiveRecord::Base
# action in question disabled.
def self.distribute_to_nodes
reservation = nil
filter = {}
loop do

# Find the available nodes, and determine what actions we're capable
# of running at the moment.
available_nodes = NodeRecord.available
available_actions = available_nodes.map {|node| node.actions }.flatten.uniq
filter = "action in (#{available_actions.map{|a| "'#{a}'"}.join(',')})"

# Reserve a handful of available work units.
WorkUnit.cancel_reservations(reservation) if reservation
return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT, :conditions => filter)
work_units = WorkUnit.reserved(reservation)

# Round robin through the nodes and units, sending the unit if the node
# is able to process it.
available_nodes = NodeRecord.available
while (node = available_nodes.shift) && (unit = work_units.shift) do
if node.actions.include?(unit.action)
if node.send_work_unit(unit)
Expand All @@ -62,12 +66,8 @@ def self.distribute_to_nodes
work_units.push(unit)
end

# If there are both units and nodes left over, make sure that the next
# time around, we only select units that can currently be served.
if work_units.any? && available_nodes.any?
filter = {:action => available_nodes.map {|node| node.actions }.flatten.uniq }
next
end
# If there are both units and nodes left over, try again.
next if work_units.any? && available_nodes.any?

# If we still have units at this point, or we're fresh out of nodes,
# that means we're done.
Expand Down

0 comments on commit e785aac

Please sign in to comment.