Skip to content

Commit

Permalink
elector: hoist Elector data bits out into a new ElectionLogic class
Browse files (browse the repository at this point in the history)
Signed-off-by: Greg Farnum <gfarnum@redhat.com>
  • Loading branch information
gregsfortytwo committed Aug 19, 2019
1 parent 0c85f8f commit cda3b25
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 78 deletions.
143 changes: 76 additions & 67 deletions src/mon/Elector.cc
Expand Up @@ -24,7 +24,7 @@

#define dout_subsys ceph_subsys_mon
#undef dout_prefix
#define dout_prefix _prefix(_dout, mon, epoch)
#define dout_prefix _prefix(_dout, mon, logic.epoch)
static ostream& _prefix(std::ostream *_dout, Monitor *mon, epoch_t epoch) {
return *_dout << "mon." << mon->name << "@" << mon->rank
<< "(" << mon->get_state_name()
Expand All @@ -34,19 +34,19 @@ static ostream& _prefix(std::ostream *_dout, Monitor *mon, epoch_t epoch) {

void Elector::init()
{
epoch = mon->store->get(Monitor::MONITOR_NAME, "election_epoch");
if (!epoch) {
logic.epoch = mon->store->get(Monitor::MONITOR_NAME, "election_epoch");
if (!logic.epoch) {
dout(1) << "init, first boot, initializing epoch at 1 " << dendl;
epoch = 1;
} else if (epoch % 2) {
dout(1) << "init, last seen epoch " << epoch
logic.epoch = 1;
} else if (logic.epoch % 2) {
dout(1) << "init, last seen epoch " << logic.epoch
<< ", mid-election, bumping" << dendl;
++epoch;
++logic.epoch;
auto t(std::make_shared<MonitorDBStore::Transaction>());
t->put(Monitor::MONITOR_NAME, "election_epoch", epoch);
t->put(Monitor::MONITOR_NAME, "election_epoch", logic.epoch);
mon->store->apply_transaction(t);
} else {
dout(1) << "init, last seen epoch " << epoch << dendl;
dout(1) << "init, last seen epoch " << logic.epoch << dendl;
}
}

Expand All @@ -57,18 +57,19 @@ void Elector::shutdown()

void Elector::bump_epoch(epoch_t e)
{
dout(10) << "bump_epoch " << epoch << " to " << e << dendl;
ceph_assert(epoch <= e);
epoch = e;
dout(10) << "bump_epoch " << logic.epoch << " to " << e << dendl;
ceph_assert(logic.epoch <= e);
logic.epoch = e;
auto t(std::make_shared<MonitorDBStore::Transaction>());
t->put(Monitor::MONITOR_NAME, "election_epoch", epoch);
t->put(Monitor::MONITOR_NAME, "election_epoch", logic.epoch);
mon->store->apply_transaction(t);

mon->join_election();

// clear up some state
electing_me = false;
acked_me.clear();
logic.electing_me = false;
logic.acked_me.clear();
peer_info.clear();
}


Expand All @@ -80,31 +81,33 @@ void Elector::start()
}
dout(5) << "start -- can i be leader?" << dendl;

acked_me.clear();
logic.acked_me.clear();
peer_info.clear();
init();

// start by trying to elect me
if (epoch % 2 == 0) {
bump_epoch(epoch+1); // odd == election cycle
if (logic.epoch % 2 == 0) {
bump_epoch(logic.epoch+1); // odd == election cycle
} else {
// do a trivial db write just to ensure it is writeable.
auto t(std::make_shared<MonitorDBStore::Transaction>());
t->put(Monitor::MONITOR_NAME, "election_writeable_test", rand());
int r = mon->store->apply_transaction(t);
ceph_assert(r >= 0);
}
electing_me = true;
acked_me[mon->rank].cluster_features = CEPH_FEATURES_ALL;
acked_me[mon->rank].mon_release = ceph_release();
acked_me[mon->rank].mon_features = ceph::features::mon::get_supported();
mon->collect_metadata(&acked_me[mon->rank].metadata);
leader_acked = -1;
logic.electing_me = true;
logic.acked_me.insert(mon->rank);
peer_info[mon->rank].cluster_features = CEPH_FEATURES_ALL;
peer_info[mon->rank].mon_release = ceph_release();
peer_info[mon->rank].mon_features = ceph::features::mon::get_supported();
mon->collect_metadata(&peer_info[mon->rank].metadata);
logic.leader_acked = -1;

// bcast to everyone else
for (unsigned i=0; i<mon->monmap->size(); ++i) {
if ((int)i == mon->rank) continue;
MMonElection *m =
new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap);
new MMonElection(MMonElection::OP_PROPOSE, logic.epoch, mon->monmap);
m->mon_features = ceph::features::mon::get_supported();
m->mon_release = ceph_release();
mon->send_mon_message(m, i);
Expand All @@ -117,15 +120,16 @@ void Elector::defer(int who)
{
dout(5) << "defer to " << who << dendl;

if (electing_me) {
if (logic.electing_me) {
// drop out
acked_me.clear();
electing_me = false;
logic.acked_me.clear();
peer_info.clear();
logic.electing_me = false;
}

// ack them
leader_acked = who;
MMonElection *m = new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap);
logic.leader_acked = who;
MMonElection *m = new MMonElection(MMonElection::OP_ACK, logic.epoch, mon->monmap);
m->mon_features = ceph::features::mon::get_supported();
m->mon_release = ceph_release();
mon->collect_metadata(&m->metadata);
Expand Down Expand Up @@ -175,8 +179,8 @@ void Elector::expire()
dout(5) << "election timer expired" << dendl;

// did i win?
if (electing_me &&
acked_me.size() > (unsigned)(mon->monmap->size() / 2)) {
if (logic.electing_me &&
logic.acked_me.size() > (unsigned)(mon->monmap->size() / 2)) {
// i win
victory();
} else {
Expand All @@ -191,16 +195,19 @@ void Elector::expire()

void Elector::victory()
{
leader_acked = -1;
electing_me = false;
logic.leader_acked = -1;
logic.electing_me = false;

uint64_t cluster_features = CEPH_FEATURES_ALL;
mon_feature_t mon_features = ceph::features::mon::get_supported();
set<int> quorum;
map<int,Metadata> metadata;
ceph_release_t min_mon_release{ceph_release_t::unknown};
for (const auto& [id, info] : acked_me) {
for (auto id : logic.acked_me) {
quorum.insert(id);
auto i = peer_info.find(id);
ceph_assert(i != peer_info.end());
auto& info = i->second;
cluster_features &= info.cluster_features;
mon_features &= info.mon_features;
metadata[id] = info.metadata;
Expand All @@ -212,15 +219,15 @@ void Elector::victory()

cancel_timer();

ceph_assert(epoch % 2 == 1); // election
bump_epoch(epoch+1); // is over!
ceph_assert(logic.epoch % 2 == 1); // election
bump_epoch(logic.epoch+1); // is over!

// tell everyone!
for (set<int>::iterator p = quorum.begin();
p != quorum.end();
++p) {
if (*p == mon->rank) continue;
MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch,
MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, logic.epoch,
mon->monmap);
m->quorum = quorum;
m->quorum_features = cluster_features;
Expand All @@ -231,7 +238,7 @@ void Elector::victory()
}

// tell monitor
mon->win_election(epoch, quorum,
mon->win_election(logic.epoch, quorum,
cluster_features, mon_features, min_mon_release,
metadata);
}
Expand Down Expand Up @@ -274,11 +281,11 @@ void Elector::handle_propose(MonOpRequestRef op)
<< " without required mon_features " << missing
<< dendl;
nak_old_peer(op);
} else if (m->epoch > epoch) {
} else if (m->epoch > logic.epoch) {
bump_epoch(m->epoch);
} else if (m->epoch < epoch) {
} else if (m->epoch < logic.epoch) {
// got an "old" propose,
if (epoch % 2 == 0 && // in a non-election cycle
if (logic.epoch % 2 == 0 && // in a non-election cycle
mon->quorum.count(from) == 0) { // from someone outside the quorum
// a mon just started up, call a new election so they can rejoin!
dout(5) << " got propose from old epoch, quorum is " << mon->quorum
Expand All @@ -293,24 +300,24 @@ void Elector::handle_propose(MonOpRequestRef op)

if (mon->rank < from) {
// i would win over them.
if (leader_acked >= 0) { // we already acked someone
ceph_assert(leader_acked < from); // and they still win, of course
dout(5) << "no, we already acked " << leader_acked << dendl;
if (logic.leader_acked >= 0) { // we already acked someone
ceph_assert(logic.leader_acked < from); // and they still win, of course
dout(5) << "no, we already acked " << logic.leader_acked << dendl;
} else {
// wait, i should win!
if (!electing_me) {
if (!logic.electing_me) {
mon->start_election();
}
}
} else {
// they would win over me
if (leader_acked < 0 || // haven't acked anyone yet, or
leader_acked > from || // they would win over who you did ack, or
leader_acked == from) { // this is the guy we're already deferring to
if (logic.leader_acked < 0 || // haven't acked anyone yet, or
logic.leader_acked > from || // they would win over who you did ack, or
logic.leader_acked == from) { // this is the guy we're already deferring to
defer(from);
} else {
// ignore them!
dout(5) << "no, we already acked " << leader_acked << dendl;
dout(5) << "no, we already acked " << logic.leader_acked << dendl;
}
}
}
Expand All @@ -323,13 +330,13 @@ void Elector::handle_ack(MonOpRequestRef op)
int from = m->get_source().num();

ceph_assert(m->epoch % 2 == 1); // election
if (m->epoch > epoch) {
if (m->epoch > logic.epoch) {
dout(5) << "woah, that's a newer epoch, i must have rebooted. bumping and re-starting!" << dendl;
bump_epoch(m->epoch);
start();
return;
}
ceph_assert(m->epoch == epoch);
ceph_assert(m->epoch == logic.epoch);
uint64_t required_features = mon->get_required_features();
if ((required_features ^ m->get_connection()->get_features()) &
required_features) {
Expand All @@ -347,17 +354,19 @@ void Elector::handle_ack(MonOpRequestRef op)
return;
}

if (electing_me) {
if (logic.electing_me) {
// thanks
acked_me[from].cluster_features = m->get_connection()->get_features();
acked_me[from].mon_features = m->mon_features;
acked_me[from].mon_release = m->mon_release;
acked_me[from].metadata = m->metadata;
peer_info[from].cluster_features = m->get_connection()->get_features();
peer_info[from].mon_features = m->mon_features;
peer_info[from].mon_release = m->mon_release;
peer_info[from].metadata = m->metadata;
dout(5) << " so far i have {";
for (map<int, elector_info_t>::const_iterator p = acked_me.begin();
p != acked_me.end();
++p) {
if (p != acked_me.begin())
for (auto q = logic.acked_me.begin();
q != logic.acked_me.end();
++q) {
auto p = peer_info.find(*q);
ceph_assert(p != peer_info.end());
if (q != logic.acked_me.begin())
*_dout << ",";
*_dout << " mon." << p->first << ":"
<< " features " << p->second.cluster_features
Expand All @@ -366,13 +375,13 @@ void Elector::handle_ack(MonOpRequestRef op)
*_dout << " }" << dendl;

// is that _everyone_?
if (acked_me.size() == mon->monmap->size()) {
if (logic.acked_me.size() == mon->monmap->size()) {
// if yes, shortcut to election finish
victory();
}
} else {
// ignore, i'm deferring already.
ceph_assert(leader_acked >= 0);
ceph_assert(logic.leader_acked >= 0);
}
}

Expand All @@ -390,10 +399,10 @@ void Elector::handle_victory(MonOpRequestRef op)
ceph_assert(from < mon->rank);
ceph_assert(m->epoch % 2 == 0);

leader_acked = -1;
logic.leader_acked = -1;

// i should have seen this election if i'm getting the victory.
if (m->epoch != epoch + 1) {
if (m->epoch != logic.epoch + 1) {
dout(5) << "woah, that's a funny epoch, i must have rebooted. bumping and re-starting!" << dendl;
bump_epoch(m->epoch);
start();
Expand All @@ -403,7 +412,7 @@ void Elector::handle_victory(MonOpRequestRef op)
bump_epoch(m->epoch);

// they win
mon->lose_election(epoch, m->quorum, from,
mon->lose_election(logic.epoch, m->quorum, from,
m->quorum_features, m->mon_features, m->mon_release);

// cancel my timer
Expand Down Expand Up @@ -531,7 +540,7 @@ void Elector::dispatch(MonOpRequestRef op)
return;
}

if (em->epoch < epoch) {
if (em->epoch < logic.epoch) {
dout(5) << "old epoch, dropping" << dendl;
break;
}
Expand Down

0 comments on commit cda3b25

Please sign in to comment.