This repository has been archived by the owner on Dec 7, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 65
/
node.rb
142 lines (118 loc) · 3.3 KB
/
node.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
module DCell
# Error signalling that a request went unanswered: no response was received
# within the given timeout.
class NoResponseError < StandardError
end
# Error signalling that the remote node appears to be dead.
class DeadNodeError < StandardError
end
# A node in a DCell cluster
class Node
# Celluloid and its finite-state-machine module; the `finalizer`, `state`,
# `default_state` and `transition` calls in this class rely on them.
include Celluloid
include Celluloid::FSM
# Node behaviour split across modules that are defined outside this file.
# NOTE(review): presumably these supply init_comm / init_rpc / init_actors and
# the other helpers called below — confirm against those modules.
include DCell::Node::Communication
include DCell::Node::RPC
include DCell::Node::Actors
# Read-only access to the node identifier set in #initialize.
attr_reader :id
# Register #shutdown as the Celluloid finalizer for this actor.
finalizer :shutdown
# FSM: a node starts out :disconnected. The block attached to a state is the
# callback run when that state is entered.
default_state :disconnected
state :shutdown do
on_shutdown
end
# :disconnected may only move on to :connected or :shutdown.
state :disconnected, to: [:connected, :shutdown]
state :connected do
on_connected
end
state :partitioned do
on_partitioned
end
# Access sugar to NodeManager methods: everything below is added to Node's
# singleton class, so it becomes callable on the Node class itself.
class << self
# NOTE(review): Enumerable needs an #each; presumably NodeManager supplies it — confirm.
include Enumerable
# Extends the singleton class with Forwardable's delegation macros.
# NOTE(review): presumably used by NodeManager; it is not used in this file.
extend Forwardable
include NodeManager
end
# Build a node: initialise the FSM, then the RPC, actor, default and
# communication state, in that order.
#
# id     - node identifier, stored in @id
# addr   - address handed to init_comm
# server - when truthy, start the self-rescheduling TTL refresh; otherwise the
#          directory entry for this id must report itself alive
#
# Raises DeadNodeError when server is falsy and Directory[id] is not alive.
def initialize(id, addr, server = false)
  super(self) # FSM's constructor
  @id = id
  init_rpc
  init_actors
  init_defaults
  init_comm(addr)

  if server
    update_ttl
    return
  end
  return if Directory[@id].alive?

  Logger.warn("Node '#{@id}' looks dead")
  raise DeadNodeError
end
# Find an actor registered with a given name on this node.
#
# Sends a Message::Find request for +name+. When the reply is nil, nil is
# returned and nothing else happens. Otherwise an actor proxy is built from
# the reply and handed to add_actor, whose result is returned.
#
# NOTE(review): the reply is presumably the remote actor's method list (the
# proxy constructor receives it as its third argument) — confirm.
def find(name)
  request = Message::Find.new(Thread.mailbox, name)
  interface = send_request(request)
  return if interface.nil?

  rsocket # open relay pipe to avoid race conditions
  add_actor(DCell::ActorProxy.create.new(self, name, interface))
end
# Lets callers write node[name] as shorthand for node.find(name).
alias_method :[], :find
# List all registered actors on this node.
#
# Sends a Message::List request and converts every entry of the reply to a
# symbol. The conversion happens in place, so the reply array itself is
# returned.
def actors
  reply = send_request(Message::List.new(Thread.mailbox))
  reply.map! { |actor_name| actor_name.to_sym }
end
# Exposes #actors under the additional name #all.
alias_method :all, :actors
# Send a ping message with a given timeout.
#
# timeout - passed through to send_request unchanged (nil by default)
#
# Returns whatever send_request returns for the ping.
def ping(timeout = nil)
  send_request(Message::Ping.new(Thread.mailbox), :request, timeout)
end
# Friendlier inspection: shows only the node id and the inspected address
# instead of every instance variable.
def inspect
  format('#<DCell::Node[%s] @addr=%s>', @id, @addr.inspect)
end
##################################################
# Internal API
##################################################
# Reset the TTL timer handle and copy the heartbeat, request and TTL
# settings from DCell into this node's instance variables.
def init_defaults
  settings = DCell
  @ttl = nil
  @heartbeat_rate = settings.heartbeat_rate
  @heartbeat_timeout = settings.heartbeat_timeout
  @request_timeout = settings.request_timeout
  @ttl_rate = settings.ttl_rate
end
# Detach from the node: kill its actors, cancel outstanding requests, call
# remote_dead (defined outside this file) and finally terminate this actor.
# The steps run strictly in that order.
def detach
  %i[kill_actors cancel_requests remote_dead].each { |step| __send__(step) }
  terminate
end
# Graceful termination of the node.
#
# Moves the FSM to :shutdown, kills the node's actors, closes communication,
# removes the node from NodeCache and its mailbox from MailboxManager, and
# finally strips every instance variable from the object.
def shutdown
  transition(:shutdown)
  kill_actors
  close_comm
  NodeCache.delete(@id)
  MailboxManager.delete(Thread.mailbox)
  instance_variables.each { |ivar| remove_instance_variable(ivar) }
end
# Update TTL in registry, then re-arm the timer so the refresh runs again
# after @ttl_rate. The timer handle is kept in @ttl so it can be cancelled.
def update_ttl
  entry = Directory[@id]
  entry.update_ttl
  @ttl = after(@ttl_rate) { update_ttl }
end
# FSM callback for the :shutdown state: log the disconnect.
def on_shutdown
  Logger.info(format('Disconnected from %s', @id))
end
# FSM callback for the :connected state.
#
# Sends a heartbeat and, for any node whose id differs from DCell.id, arms a
# delayed transition to :partitioned after @heartbeat_timeout. Logs the
# connection in both cases.
def on_connected
  send_heartbeat
  remote = @id != DCell.id
  transition(:partitioned, delay: @heartbeat_timeout) if remote
  Logger.info("Connected to #{@id}")
end
# FSM callback for the :partitioned state.
#
# Cancels the heartbeat and TTL timers when they are set, logs the
# interruption, then detaches.
def on_partitioned
  [@heartbeat, @ttl].each { |timer| timer.cancel if timer }
  Logger.warn("Communication with #{@id} interrupted")
  detach
end
end
end