Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Branch: master
Fetching contributors…

Cannot retrieve contributors at this time

79 lines (65 sloc) 2.244 kB
require 'rubygems'
require 'bud'
require 'time'
require 'membership/membership'
# Age threshold for heartbeat_log entries: the :current_output rules in
# HeartbeatAgent delete a log tuple once (hb_timer value - tuple time)
# exceeds this number.
# NOTE(review): presumably seconds, since both operands are `val.to_f`
# readings of the periodic timer — confirm against Bud's `periodic`.
HB_EXPIRE = 4.0
# Abstract interface of the heartbeat service. It builds on
# MembershipProtocol, so implementations also see that protocol's
# collections.
#
# Inputs:
#   payld          - [:pload] payload to attach to outgoing heartbeats.
#   return_address - [] => [:addy] keyless (at most one tuple) address that
#                    should be advertised as the heartbeat source.
# Output:
#   last_heartbeat - [:peer] => [:sender, :time, :pload] one tuple per peer.
module HeartbeatProtocol
  include MembershipProtocol

  state do
    # What the caller feeds in.
    interface input, :payld, [:pload]
    interface input, :return_address, [] => [:addy]

    # What the caller reads back.
    interface output, :last_heartbeat, [:peer] => [:sender, :time, :pload]
  end
end
# Concrete HeartbeatProtocol implementation.
#
# On every hb_timer tick the agent sends its buffered payload to each
# `member` other than itself over the `heartbeat` channel. Incoming
# heartbeats are staged in heartbeat_buffer, stamped with the timer value on
# the next tick, and kept in heartbeat_log until they are older than
# HB_EXPIRE. last_heartbeat exposes the newest log entry for each peer.
module HeartbeatAgent
  include HeartbeatProtocol

  state do
    # Wire format: destination, advertised source, actual sender, payload.
    channel :heartbeat, [:@dst, :src, :sender, :pload]

    # Received heartbeats waiting for a timer tick to be timestamped.
    table :heartbeat_buffer, [:peer, :sender, :pload]
    # Timestamped heartbeats; pruned by the :current_output rules.
    table :heartbeat_log, [:peer, :sender, :time, :pload]

    # Most recent payload handed in through `payld`.
    table :payload_buffer, [:pload]
    # Most recent address handed in through `return_address` (keyless).
    table :my_address, [] => [:addy]

    periodic :hb_timer, 2

    # Scratches reuse existing schemas, so they must follow those declarations.
    scratch :to_del, heartbeat_log.schema
    scratch :last_heartbeat_stg, last_heartbeat.schema
  end

  # Remember the advertised address: a new return_address is stored for the
  # next timestep and whatever my_address held before is deleted.
  bloom :selfness do
    my_address <+ return_address
    my_address <- (my_address * return_address).lefts
  end

  # Send one heartbeat per (tick, member, payload), skipping ourselves.
  bloom :announce do
    # An advertised address exists: use it as the heartbeat source. The join
    # with my_address yields nothing when that table is empty.
    heartbeat <~ (hb_timer * member * payload_buffer * my_address).combos do |_tick, mem, pl, addr|
      [mem.host, addr.addy, ip_port, pl.pload] unless mem.host == addr.addy
    end

    # No advertised address: fall back to ip_port as the source.
    heartbeat <~ (hb_timer * member * payload_buffer).combos do |_tick, mem, pl|
      if my_address.empty?
        [mem.host, ip_port, ip_port, pl.pload] unless mem.host == ip_port
      end
    end
  end

  # Keep only the latest payload: store the incoming one and delete whatever
  # payload_buffer currently holds.
  bloom :buffer do
    payload_buffer <+ payld
    payload_buffer <- (payload_buffer * payld).lefts
  end

  # Stage received heartbeats, then move them into the log on a timer tick.
  bloom :reckon do
    #stdio <~ heartbeat {|h| ["HB RCV @ #{h.src} from #{h.sender}"]}
    # The advertised source (src) becomes the peer key.
    heartbeat_buffer <= heartbeat { |hb| [hb.src, hb.sender, hb.pload] }

    # Timestamp every staged heartbeat with the tick value ...
    heartbeat_log <= (hb_timer * heartbeat_buffer).pairs do |tick, staged|
      [staged.peer, staged.sender, tick.val.to_f, staged.pload]
    end
    # ... and drop it from the staging table.
    heartbeat_buffer <- (hb_timer * heartbeat_buffer).rights
  end

  # Publish the newest entry per peer and expire old log entries.
  bloom :current_output do
    #stdio <~ last_heartbeat.inspected
    # Log entries carrying the maximum time for each peer.
    last_heartbeat_stg <= heartbeat_log.argagg(:max, [heartbeat_log.peer], heartbeat_log.time)

    # Several entries may share (peer, sender, time); pick one payload.
    last_heartbeat <= last_heartbeat_stg.group(
      [last_heartbeat_stg.peer, last_heartbeat_stg.sender, last_heartbeat_stg.time],
      choose(last_heartbeat_stg.pload)
    )

    # On a tick, mark log entries older than HB_EXPIRE and delete them.
    to_del <= (heartbeat_log * hb_timer).pairs do |entry, tick|
      entry if (tick.val.to_f - entry.time) > HB_EXPIRE
    end
    heartbeat_log <- to_del
  end
end
Jump to Line
Something went wrong with that request. Please try again.