Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Add EM::PriorityQueue class for sorted queue processing. #244

Open
wants to merge 3 commits into from

5 participants

@dgraham

I needed a PriorityQueue that behaved just like EM::Queue except that it would sort its items. This was easy to do by subclassing EM::Queue and substituting a binary max heap in the @items instance variable.

The code is tested and working well. What do you guys think about including this with EventMachine?

@tmm1
Owner

I'm hesitant to include this- it probably belongs in a separate gem.

@raggi: thoughts?

@raggi
Owner

I like it, inclusion would be useful to those writing resource balanced apps.

@raggi
Owner

@dgraham : could this be made to just use Set instead of implementing it's own Heap?

@dgraham

It could use SortedSet, but it would be ugly because we'd need to wrap every object added to the queue with another object that always returns false in response to calls to eql?. The ideal solution is to use the rbtree gem, but I assumed we wanted to keep EM's dependencies to a minimum. So, I wrote the Heap class instead.

Here's an implementation of PriorityQueue using RBTree as a soft dependency. The gem only gets loaded if you use PriorityQueue. What do you think?

https://gist.github.com/1260596

@dgraham dgraham Sort items lowest to highest with a min heap to match the behavior of…
… other sorted data structures like RBTree and Array#sort.
6be790b
@stakach

Would be useful for me too.
As long as FIFO is maintained at every priority level.

I was originally using https://github.com/kanwei/algorithms however the order at any priority level is not always FIFO.

@dgraham

@raggi Any thoughts on the rbtree version: https://gist.github.com/1260596 ?

@mikelewis

As a side note, I created an em-priority-queue gem a few months before this pull request was created.

https://rubygems.org/gems/em-priority-queue

@stakach

The problem with em-priority-queue is that it relies on the algorithms gem which uses a Fibonacci heap and hence does not maintain FIFO ordering for a particular priority level.

@q.push("Checking1", 20)
@q.push("Other1", 16)
@q.push("Other2", 18)
@q.push("Checking2", 20)
@q.push("Checking3", 20)
@q.push("Checking4", 20)
@q.push("Checking5", 20)
@q.push("Checking6", 20)
@q.push("Other3", 33)
@q.push("Checking7", 20)
@q.push("Checking8", 20)

11.times do
    @q.pop do |e|
        responses << e
        EM.stop if responses.size == 4
    end
end

You'll find that the order is not maintained for the priority 20 items.
This seems to be an issue for all tree based algorithms that don't maintain a heap at individual priority levels. I haven't looked at how rbtree works.

@dgraham

RBTree doesn't maintain ordering within a priority level either, but we can just include a timestamp in the priority like this.

@q.push("Checking1", [20, Time.now])
@q.push("Other1", [16, Time.now])
@q.push("Other2", [18, Time.now])
@q.push("Checking2", [20, Time.now])
@q.push("Checking3", [20, Time.now])
@q.push("Checking4", [20, Time.now])
@q.push("Checking5", [20, Time.now])
@q.push("Checking6", [20, Time.now])
@q.push("Other3", [33, Time.now])
@q.push("Checking7", [20, Time.now])
@q.push("Checking8", [20, Time.now])

11.times do
  @q.pop do |e|
    responses << e
    EM.stop if responses.size == 4
  end
end
@stakach

Then rbtree would always return the earliest time?
If that is the case, awesome, and I'd be happy for its inclusion and will start using it in my projects now.

Although platform support may be a problem? jRuby? Not that this is an issue for me.

@mikelewis

@stakach, the newest version of em-priority-queue has FIFO built in.

@stakach

Awesome!
Just installed your gem!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Aug 30, 2011
  1. @dgraham
  2. @dgraham
Commits on Oct 5, 2011
  1. @dgraham

    Sort items lowest to highest with a min heap to match the behavior of…

    dgraham authored
    … other sorted data structures like RBTree and Array#sort.
This page is out of date. Refresh to see the latest.
View
92 lib/em/priority_queue.rb
@@ -0,0 +1,92 @@
+# encoding: UTF-8
+
+module EventMachine
+
+ # Behaves just like EM::Queue with the exception that items added to the queue
+ # are sorted and popped from the queue by their priority. The optional comparator
+ # block passed in the constructor determines element priority, with items sorted
+ # lowest to highest. If no block is provided, the elements' natural ordering,
+ # via <=>, is used.
+ #
+ # @example
+ #
+ # q = EM::PriorityQueue.new do |a, b|
+ # a[:priority] <=> b[:priority]
+ # end
+ # q.push({:priority => 3, :msg => 'three'})
+ # q.push({:priority => 2, :msg => 'two'})
+ # q.push({:priority => 1, :msg => 'one'})
+ # 3.times do
+ # q.pop {|item| puts item[:msg] }
+ # end
+ # #=> "one"
+ # # "two"
+ # # "three"
+ #
+ class PriorityQueue < Queue
+ def initialize(&comparator)
+ super
+ @items = Heap.new(&comparator)
+ end
+
+ # A binary min heap implementation for efficient storage of queue items. This
+ # class implements the Array methods called by EM::Queue so that it may
+ # replace the +@items+ instance variable. Namely, +push+ +shift+, +size+, and
+ # +empty?+ are implemented.
+ class Heap
+ def initialize(*items, &comp)
+ @heap = []
+ @comp = comp || proc {|a, b| a <=> b }
+ push(*items)
+ end
+
+ def push(*items)
+ items.each do |item|
+ @heap << item
+ move_up(@heap.size - 1)
+ end
+ end
+ alias :<< :push
+
+ def pop
+ return if @heap.empty?
+ root = @heap[0]
+ @heap[0] = @heap[-1]
+ @heap.pop
+ move_down(0)
+ root
+ end
+ alias :shift :pop
+
+ def size
+ @heap.size
+ end
+
+ def empty?
+ @heap.empty?
+ end
+
+ private
+
+ def move_down(k)
+ left = 2 * k + 1
+ right = 2 * k + 2
+ return if left > (@heap.size - 1)
+ smaller = (right < @heap.size && @comp[@heap[right], @heap[left]] < 0) ? right : left
+ if @comp[@heap[k], @heap[smaller]] > 0
+ @heap[k], @heap[smaller] = @heap[smaller], @heap[k]
+ move_down(smaller)
+ end
+ end
+
+ def move_up(k)
+ return if k == 0
+ parent = (k - 1) / 2
+ if @comp[@heap[k], @heap[parent]] < 0
+ @heap[k], @heap[parent] = @heap[parent], @heap[k]
+ move_up(parent)
+ end
+ end
+ end
+ end
+end
View
1  lib/eventmachine.rb
@@ -26,6 +26,7 @@
require 'em/connection'
require 'em/callback'
require 'em/queue'
+require 'em/priority_queue'
require 'em/channel'
require 'em/file_watch'
require 'em/process_watch'
View
67 tests/test_priority_queue.rb
@@ -0,0 +1,67 @@
+require 'em_test_helper'
+
+class TestEMPriorityQueue < Test::Unit::TestCase
+ def test_queue_push
+ s = 0
+ EM.run do
+ q = EM::PriorityQueue.new
+ q.push(1)
+ EM.next_tick { s = q.size; EM.stop }
+ end
+ assert_equal 1, s
+ end
+
+ def test_queue_pop
+ x,y,z = nil
+ EM.run do
+ q = EM::PriorityQueue.new
+ q.push(3,1,2)
+ q.pop { |v| x = v }
+ q.pop { |v| y = v }
+ q.pop { |v| z = v; EM.stop }
+ end
+ assert_equal 1, x
+ assert_equal 2, y
+ assert_equal 3, z
+ end
+
+ def test_queue_reactor_thread
+ q = EM::PriorityQueue.new
+
+ Thread.new { q.push(3,1,2) }.join
+ assert q.empty?
+ EM.run { EM.next_tick { EM.stop } }
+ assert_equal 3, q.size
+
+ x = nil
+ Thread.new { q.pop { |v| x = v } }.join
+ assert_equal nil, x
+ EM.run { EM.next_tick { EM.stop } }
+ assert_equal 1, x
+ end
+
+ def test_num_waiting
+ q = EM::PriorityQueue.new
+ many = 3
+ many.times { q.pop {} }
+ EM.run { EM.next_tick { EM.stop } }
+ assert_equal many, q.num_waiting
+ end
+
+ def test_queue_push_sorts
+ queue = EM::PriorityQueue.new
+ nums = (0..10_000).map {|i| rand(10_000) }
+ popped = []
+ EM.run do
+ nums.each {|num| queue.push(num) }
+ assert_equal nums.size, queue.size
+ (nums.size - 1).times do
+ queue.pop {|item| popped << item }
+ end
+ queue.pop {|item| popped << item; EM.stop }
+ end
+ assert queue.empty?
+ assert_equal 0, queue.size
+ assert_equal nums.sort, popped
+ end
+end
Something went wrong with that request. Please try again.