-
Notifications
You must be signed in to change notification settings - Fork 10
/
leader_election.rb
147 lines (130 loc) · 4.76 KB
/
leader_election.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
require 'rubygems'
require 'bud'
require 'delivery/delivery'
require 'delivery/multicast'
require 'membership/membership'
require 'ordering/sequences'
require 'util/colour'
# @abstract LeaderMembership is the module for Paxos leader election.
# A given node in Paxos should include this module.
module LeaderMembership
include MembershipProtocol
include MulticastProtocol
include SequencesProtocol
include DeliveryProtocol
include Colour
# Each node believes it is its own leader when it first starts up.
bootstrap do
me <= [[ip_port]]
new_leader <= [[ip_port]]
add_member <= [[ip_port, ip_port]]
get_count <= [[:mcast_msg]]
end
state do
# Currently known leader
table :leader, [] => [:host]
# My own address
table :me, [] => [:host]
# Scratches for potential new leaders
scratch :new_leader, [] => [:host]
scratch :potential_new_leader, [:host]
scratch :temp_new_leader, leader.schema
# Scratches for potential new members
scratch :potential_member, [:host]
# Scratches to maintain if we received a leader vote message or
# a list of members
scratch :leader_vote, [:src, :host]
scratch :member_list, [:src, :members]
scratch :members_to_send, [:host]
scratch :should_increment_mcast_count, [:key]
end
bloom :debug do
magenta <= leader { |l| ["leader: #{l.host}"] }
blue <= leader_vote { |lv| ["leader_vote: #{lv.inspect}"] }
green <= member_list { |ml| ["member_list: #{ml.inspect}"] }
red <= pipe_out { |po| ["pipe_out: #{po.inspect}"] }
end
# Each node, when receiving a message from pipe_out, needs to determine
# the type of message. Messages are determined by the following: the
# payload looks like [:vote, :host] or [:members, [:mem1, :mem2, ...]]
# Because of the different types of messages, we need to demultiplex
# the messages into the appropriate scratches.
bloom :demux do
leader_vote <= pipe_out do |p|
if p.payload[0] == "vote"
[p.src, p.payload[1]]
end
end
member_list <= pipe_out do |p|
if p.payload[0] == "members"
[p.src, p.payload[1]]
end
end
end
# From leader_vote messages, add the source and the host to a scratch of
# potential members. From member_list messages, add them to potential
# members. Those who are not in the member list should be added to the
# membership.
bloom :add_member do
potential_member <= leader_vote { |u| [u.src] }
potential_member <= leader_vote { |u| [u.host] }
potential_member <= member_list.flat_map do |ml|
ml.members.each.map { |m| [m] }
end
add_member <= potential_member { |n| [n.host, n.host] }
end
# Changes the leader under one of two conditions:
# 1. I get a leader_vote proposing a leader with a lower host
# 2. Another node has joined without notifying me and its host is
# lowest in my list of members.
bloom :change_leader do
potential_new_leader <= (leader_vote * leader).pairs do |lv, l|
if lv.host < l.host
[lv.host]
end
end
temp_new_leader <= member.group([], min(:host))
potential_new_leader <= temp_new_leader.notin(leader, :host => :host)
new_leader <= potential_new_leader.group([], min(:host))
leader <+- new_leader
end
# If there is a new leader, multicast the message to everyone in my list
# of members.
# If the one who told me of a "possible" new leader is wrong, send the
# correct leader back to that source.
bloom :notify do
get_count <= [[:mcast_msg]]
temp :did_add_member <= added_member.group([], count(:ident))
mcast_send <= (return_count *
new_leader).pairs do |r, n|
if r.ident == :mcast_msg
["vote_#{r.tally}", [:vote, n.host]]
end
end
increment_count <= leader_vote { |lv| [[:unicast, lv.host]] }
get_count <= leader_vote { |lv| [[:unicast, lv.host]] }
pipe_in <= (return_count * leader_vote * leader).combos do |r, lv, l|
if lv.host > l.host and r.ident == [:unicast, lv.host]
[lv.src, ip_port, "vote_#{r.tally}", [:vote, l.host]]
end
end
end
bloom :increment_mcast_count do
should_increment_mcast_count <= temp_new_leader { |n| [:mcast_msg] }
should_increment_mcast_count <= did_add_member { |n| [:mcast_msg] }
increment_count <= should_increment_mcast_count
end
# I send to my members my list of memers if I added someone new and
# I am the leader
bloom :leader_specific do
members_to_send <= member { |m| [m.host] }
mcast_send <= (return_count *
did_add_member *
leader * me).combos(leader.host =>
me.host) do |r, d, l, m|
if r.ident == :mcast_msg
["member_#{r.tally}", [:members, members_to_send.map { |ms| ms }.flatten]]
end
end
end
end