Permalink
Browse files

merge kvs.rb

  • Loading branch information...
2 parents 2b741c7 + d71b4d2 commit 4bc08effeaca7d587a0234b6be4553b956352897 @palvaro palvaro committed Nov 5, 2011
View
@@ -2,8 +2,8 @@ Bloom^ Filesystem
---------------------------------------------------------------------------
BFS implements a chunked, distributed filesystem (mostly) in the Bloom
-language. BFS is architecturally based on BOOMFS, which is itself based on
-the Google Filesystem (GFS). As in GFS, a single master node manages
+language. BFS is architecturally based on BOOMFS [1], which is itself based
+on the Google Filesystem (GFS) [2]. As in GFS, a single master node manages
filesystem metadata, while data blocks are replicated and stored on a large
number of storage nodes. Writing or reading data involves a multi-step
protocol in which clients interact with the master, retrieving metadata and
@@ -32,7 +32,7 @@ HBMaster collects heartbeat messages from storage nodes and maintains
summary information about chunk distribution and node state.
BFSDatanode represents the storage node logic. It is currently fairly
-trivial: it polls the data directory for copies of chunks, and uses
+trivial: it polls the data directory for copies of chunks, and uses the
HeartbeatProtocol to communicate its list of local chunks to the master.
BFSDataProtocol implements the data transfer protocol run on clients and
@@ -47,6 +47,9 @@ BFSDataProtocol.
BFSMaster glues together the ChunkedFSMaster API with the BFSClient API.
+[1] Alvaro, P., Condie, T., Conway, N., Elmeleegy, K., Hellerstein, J. M., &
+Sears, R. (2010). BOOM Analytics: exploring data-centric, declarative
+programming for the cloud. EuroSys.
----------------------------------------------------------------------------
-^ Coming soon: BUD, an alpha release of the Bloom language. (3/25/2011)
+[2] Ghemawat, S., Gobioff, H., & Leung, S.-T. (2003). The Google File
+System. SOSP.
View
@@ -58,7 +58,7 @@ def start_background_task_thread
end
end
- def stop_bg
+ def stop
unregister_callback(@task)
super
end
@@ -67,4 +67,3 @@ def bg_besteffort_request(c, o, r)
DataProtocolClient.send_replicate(c, r, o)
end
end
-
View
@@ -64,6 +64,6 @@ def initialize(dataport=nil, opts={})
def stop_datanode
@dp_server.stop_server
- stop_bg
+ stop
end
end
@@ -10,7 +10,7 @@ module DestructiveCart
include KVSProtocol
def delete_one(a, i)
- a.delete_at(a.index(i))
+ a.delete_at(a.index(i)) unless a.index(i).nil?
return a
end
@@ -55,6 +55,6 @@ module ReplicatedDisorderlyCart
bloom :replicate do
send_mcast <= action_msg {|a| [a.reqid, [a.session, a.reqid, a.item, a.action]]}
cart_action <= mcast_done {|m| m.payload}
- cart_action <= pipe_chan {|c| c.payload}
+ cart_action <= pipe_out {|c| c.payload}
end
end
@@ -1,5 +1,3 @@
-require 'rubygems'
-require 'bud'
require 'delivery/delivery'
#randomly reorders messages, but reports success upon first send
@@ -37,17 +35,22 @@ module DastardlyDelivery
end
bloom :done do
+ # Report success immediately
pipe_sent <= pipe_in
end
bloom :snd do
temp :do_send <= (buf.argagg(:choose_rand, [], :whenbuf)*max_delay).pairs do |b,d|
- if (buf.length != 1) || (@budtime - b.whenbuf >= d.delay):
- b
+ if (buf.length != 1) || (@budtime - b.whenbuf >= d.delay)
+ b
end
end
buf <- do_send
pipe_chan <~ do_send { |s| s.msg }
end
+
+ bloom :rcv do
+ pipe_out <= pipe_chan
+ end
end
View
@@ -1,10 +1,15 @@
-require 'rubygems'
-require 'bud'
-
module DeliveryProtocol
state do
+ # At the sender side, used to request that a new message be delivered. The
+ # recipient address is given by the "dst" field.
interface input, :pipe_in, [:dst, :src, :ident] => [:payload]
+
+ # At the sender side, the transport protocol will insert a corresponding
+ # "pipe_sent" fact when a message has been delivered.
interface output, :pipe_sent, [:dst, :src, :ident] => [:payload]
+
+ # At the recipient side, this indicates that a new message has been delivered.
+ interface output, :pipe_out, [:dst, :src, :ident] => [:payload]
end
end
@@ -19,6 +24,10 @@ module BestEffortDelivery
pipe_chan <~ pipe_in
end
+ bloom :rcv do
+ pipe_out <= pipe_chan
+ end
+
bloom :done do
# Report success immediately -- this implementation of "best effort" is more
# like "an effort".
@@ -1,5 +1,3 @@
-require 'rubygems'
-require 'bud'
require 'delivery/delivery'
#intentionally drops messages, but reports success
@@ -26,7 +24,6 @@ module DemonicDelivery
bloom :control do
drop_pct <+- set_drop_pct
-# drop_pct <+ set_drop_pct
end
bloom :snd do
@@ -37,6 +34,10 @@ module DemonicDelivery
end
end
+ bloom :rcv do
+ pipe_out <= pipe_chan
+ end
+
bloom :done do
pipe_sent <= pipe_in
end
View
@@ -1,13 +1,12 @@
-require 'rubygems'
-require 'bud'
-
-require 'delivery/reliable_delivery'
+require 'delivery/reliable'
require 'voting/voting'
require 'membership/membership.rb'
module MulticastProtocol
include MembershipProtocol
+ # XXX: This should provide an interface for the recipient side to read the
+ # multicast without needing to peek at pipe_in.
state do
interface input, :send_mcast, [:ident] => [:payload]
interface output, :mcast_done, [:ident] => [:payload]
@@ -50,8 +49,6 @@ module ReliableMulticast
end
bloom :done_mcast do
- mcast_done <= vote_status do |v|
- "VEE: " + v.inspect
- end
+ mcast_done <= victor {|v| v.content}
end
end
@@ -1,8 +1,6 @@
-require 'rubygems'
-require 'bud'
require 'delivery/delivery'
-# Note that this provides at-least-once semantics. If you need exactly-once, the
+# Note that this provides at-least-once delivery. If you need exactly-once, the
# receiver side can record the message IDs that have been received to avoid
# processing duplicate messages.
module ReliableDelivery
@@ -21,8 +19,9 @@ module ReliableDelivery
bed.pipe_in <= (buf * clock).lefts
end
- bloom :send_ack do
- ack <~ bed.pipe_chan {|p| [p.src, p.dst, p.ident]}
+ bloom :rcv do
+ pipe_out <= bed.pipe_out
+ ack <~ bed.pipe_out {|p| [p.src, p.dst, p.ident]}
end
bloom :done do
View
@@ -1,6 +1,6 @@
require 'rubygems'
require 'bud'
-require 'delivery/reliable_delivery'
+require 'delivery/reliable'
require 'delivery/multicast'
require 'ordering/nonce'
require 'ordering/serializer'
@@ -94,7 +94,7 @@ module ReplicatedKVS
end
# if I am a replica, store the payload of the multicast
- kvs.kvput <= pipe_chan do |d|
+ kvs.kvput <= pipe_out do |d|
if d.payload.fetch(1) != @addy and d.payload[0] == "put"
puts "PL is #{d.payload[1]} class #{d.payload[1].class} siz #{d.payload.length} and PL izz #{d.payload[0]} class #{d.payload[0].class}"
#puts "PL is #{d.payload} class #{d.payload.class} siz #{d.payload.length}"
@@ -117,7 +117,7 @@ module ReplicatedKVS
end
end
- kvs.kvdel <= pipe_chan do |d|
+ kvs.kvdel <= pipe_out do |d|
if d.payload.fetch(1) != @addy and d.payload[0] == "del"
d.payload[1]
end
@@ -11,7 +11,6 @@ def run_cart(program, client, actions=3)
contact.client_action <+ [[addy, 1234, 125, 'diapers', 'Add']]
contact.client_action <+ [[addy, 1234, 126, 'meat', 'Del']]
-
(0..actions).each do |i|
contact.client_action <+ [[addy, 1234, 127 + i, 'beer', 'Add']]
end
@@ -8,18 +8,18 @@ class BED
import BestEffortDelivery => :bed
state do
- table :pipe_chan_perm, bed.pipe_chan.schema
+ table :pipe_out_perm, bed.pipe_out.schema
table :pipe_sent_perm, bed.pipe_sent.schema
- scratch :got_pipe, bed.pipe_chan.schema
+ scratch :got_pipe, bed.pipe_out.schema
# XXX: only necessary because we don't rewrite sync_do blocks
scratch :send_msg, bed.pipe_in.schema
end
bloom do
pipe_sent_perm <= bed.pipe_sent
- pipe_chan_perm <= bed.pipe_chan
- got_pipe <= bed.pipe_chan
+ pipe_out_perm <= bed.pipe_out
+ got_pipe <= bed.pipe_out
bed.pipe_in <= send_msg
end
end
@@ -38,7 +38,7 @@ def broken_test_besteffort_delivery
assert_equal(1, rd.pipe_sent_perm.length)
assert_equal(sendtup, rd.pipe_sent_perm.first)
}
- rd.stop_bg
+ rd.stop
end
def test_bed_delivery
@@ -65,9 +65,9 @@ def test_bed_delivery
tuples.length.times { q.pop }
rcv.sync_do {
- assert_equal(tuples.sort, rcv.pipe_chan_perm.to_a.sort)
+ assert_equal(tuples.sort, rcv.pipe_out_perm.to_a.sort)
}
- snd.stop_bg
- rcv.stop_bg
+ snd.stop
+ rcv.stop
end
end
View
@@ -76,8 +76,8 @@ def test_directorystuff1
assert_equal(["3"], four)
#dump_internal_state(b)
- b.stop_bg
- s.stop_bg
+ b.stop
+ s.stop
end
def dump_internal_state(rt)
@@ -94,7 +94,7 @@ def test_fsmaster
b = FSC.new(@opts)
b.run_bg
do_basic_fs_tests(b)
- b.stop_bg
+ b.stop
end
def test_rms
@@ -129,8 +129,8 @@ def test_rms
m.sync_do
m.sync_do
- m.stop_bg
- b.stop_bg
+ m.stop
+ b.stop
end
def new_datanode(dp, master_port)
@@ -150,8 +150,8 @@ def tnest_addchunks
do_addchunks(b)
dn.stop_datanode
- #dn2.stop_bg
- b.stop_bg
+ #dn2.stop
+ b.stop
end
def assert_resp(inst, reqid, data)
View
@@ -91,7 +91,7 @@ def test_replicated_destructive_cart
rep.run_bg
#rep2.run_bg
cart_test_dist(prog, cli, rep, rep2)
- rep.stop_bg
+ rep.stop
end
def test_replicated_disorderly_cart
@@ -104,7 +104,7 @@ def test_replicated_disorderly_cart
rep.run_bg
#rep2.run_bg
cart_test_dist(prog, cli, rep, rep2)
- rep.stop_bg
+ rep.stop
end
def test_destructive_cart
@@ -142,7 +142,7 @@ def cart_test_internal(program, dotest, client=nil, *others)
# temporarily disabled.
#assert_equal(4, cli.memo.first.array.length, "crap, i got #{cli.memo.first.inspect}") if dotest
}
- program.stop_bg
+ program.stop
end
def add_members(b, *hosts)
Oops, something went wrong.

0 comments on commit 4bc08ef

Please sign in to comment.