Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

deliver messages at each superstep

  • Loading branch information...
commit a985fe82f79f16b2f1497afeb00baaf173345523 1 parent 4fa4399
@igrigorik authored
View
7 lib/pregel.rb
@@ -8,7 +8,7 @@ class PostOffice
include Singleton
def initialize
- @mailboxes = Hash.new([])
+ @mailboxes = Hash.new
@mutex = Mutex.new
end
@@ -24,10 +24,9 @@ def deliver(to, msg)
def read(box)
@mutex.synchronize do
- msgs = @mailboxes[box]
+ msgs = @mailboxes[box] || []
@mailboxes.clear
+ msgs
end
-
- msgs
end
end
View
11 lib/pregel/vertex.rb
@@ -1,12 +1,13 @@
module Pregel
class Vertex
attr_reader :id
- attr_accessor :value
+ attr_accessor :value, :messages
def initialize(id, value, *outedges)
@id = id
@value = value
@outedges = outedges
+ @messages = []
@active = true
@superstep = 0
end
@@ -20,7 +21,6 @@ def deliver_to_all_neighbors(msg)
end
def deliver(to, msg)
- p [:deliver_to, to, msg]
PostOffice.instance.deliver(to, msg)
end
@@ -29,12 +29,13 @@ def step
compute
end
- def compute; end
+ def halt; @active = false; end
+ def active!; @active = true; end
+ def active?; @active; end
- def halt; @active = false; end
- def active?; @active; end
def superstep; @superstep; end
def neighbors; @outedges; end
+ def compute; end
end
end
View
5 lib/pregel/worker.rb
@@ -10,6 +10,11 @@ def initialize(graph = [])
def superstep
Thread.new do
+ @vertices.each do |v|
+ v.messages = PostOffice.instance.read(v.id)
+ v.active! if v.messages.size > 0
+ end
+
active = @vertices.select {|v| v.active?}
active.each {|v| v.step}
View
6 spec/coordinator_spec.rb
@@ -36,14 +36,14 @@
pending
class PageRankVertex < Vertex
- def compute(msgs)
+ def compute
if superstep >= 1
- sum = msgs.collect(0) {|total,msg| total += msg; total }
+ sum = messages.collect(0) {|total,msg| total += msg; total }
@value = (0.15 / 3) + 0.85 * sum
end
if superstep < 30
- send_to_all_neighbors(@value / neighbors.size)
+ deliver_to_all_neighbors(@value / neighbors.size)
else
halt
end
View
16 spec/vertex_spec.rb
@@ -47,16 +47,10 @@ def compute; @value = 20; end
v.value.should == 20
end
- it 'should be able to send messages' do
- class V < Vertex
- def compute
- deliver(:a, :b)
- end
- end
-
- lambda do
- v = V.new(:a, 10)
- v.compute
- end.should_not raise_error
+ it 'should have an inbox for incoming messages' do
+ v.messages.size.should == 0
+ v.messages = [:a, :b]
+ v.messages.size.should == 2
end
+
end
View
35 spec/worker_spec.rb
@@ -18,6 +18,20 @@
lambda { Worker.new([]) }.should raise_error
end
+ it 'should partition graphs with variable worker sizes' do
+ graph = [
+ Vertex.new(:igvita, 1, :wikipedia),
+ Vertex.new(:wikipedia, 2, :google),
+ Vertex.new(:google, 1, :wikipedia)
+ ]
+
+ c = Coordinator.new(graph)
+ c.workers.size.should == 1
+
+ c = Coordinator.new(graph, partitions: 2)
+ c.workers.size.should == 2
+ end
+
it 'should execute an async superstep' do
# TODO: simulate async message delivery to worker by returning
# a thread per message
@@ -35,18 +49,13 @@
worker.active.should > 0
end
- # it 'should partition graphs with variable worker sizes' do
- # graph = [
- # Vertex.new(:igvita, 1, :wikipedia),
- # Vertex.new(:wikipedia, 2, :google),
- # Vertex.new(:google, 1, :wikipedia)
- # ]
- #
- # c = Coordinator.new(graph)
- # c.workers.size.should == 1
- #
- # c = Coordinator.new(graph, partitions: 2)
- # c.workers.size.should == 2
- # end
+ it 'should deliver messages to vertices at beginning of each superstep' do
+ PostOffice.instance.deliver(:igvita, 'hello')
+ worker.superstep.join
+
+ ig = worker.vertices.find {|v| v.id == :igvita }
+ ig.messages.size.should == 1
+ ig.messages.first.should == 'hello'
+ end
end
Please sign in to comment.
Something went wrong with that request. Please try again.