Skip to content

Commit

Permalink
Verify failed elections are reported back.
Browse files Browse the repository at this point in the history
This was actually not working, paranoia about code coverage pays
off sometimes.
  • Loading branch information
coryan committed Jul 3, 2017
1 parent d21a920 commit fd395c0
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 31 deletions.
3 changes: 3 additions & 0 deletions jb/etcd/detail/leader_election_runner_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class leader_election_runner_impl : public leader_election_runner {
}
// ... block until all pending operations complete ...
async_ops_block();
// ... if there is a pending callback we need to let them know the
// election failed ...
make_callback(false);
// ... now we are really done with remote resources ...
set_state("resign() end", state::resigned);
}
Expand Down
23 changes: 10 additions & 13 deletions jb/etcd/leader_election_participant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,22 @@ void leader_election_participant::campaign() {
// asynchronous operations to wait until the campaign is done. It
// will call the provided lambda when it is done ...
campaign([&elected, this](bool result) {
// ... the callback receives a future, hopefully with "true"
// indicating the election was successful, but it could be an
// exception. We capture both case ..
try {
// ... the callback receives a bool, hopefully with "true"
// indicating the election was successful. Setting the value
// could raise, but that only happens when
if (result) {
elected.set_value(result);
} catch (...) {
JB_LOG(info) << this->key() << " failed to set election result";
// elected.set_exception(std::current_exception());
} else {
std::ostringstream os;
os << "election aborted for " << std::hex << this->key();
elected.set_exception(
std::make_exception_ptr(std::runtime_error(os.str())));
}
});
// ... block until the promise is satisfied, notice that get()
// raises the exception if that was the result ...
JB_LOG(trace) << key() << " blocked running election";
if (elected.get_future().get() != true) {
// ... we also raise if the campaign failed ...
std::ostringstream os;
os << "Unexpected false value after running campaign, key=" << key();
throw std::runtime_error(os.str());
}
elected.get_future().get();
}

void leader_election_participant::campaign_impl(
Expand Down
83 changes: 65 additions & 18 deletions jb/etcd/leader_election_participant_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ BOOST_AUTO_TEST_CASE(leader_election_participant_basic) {
auto queue = std::make_shared<jb::etcd::active_completion_queue>();

// ... create a unique election name ...
jb::etcd::session election_session(
queue, etcd_channel, std::chrono::milliseconds(3000));
using namespace std::chrono_literals;
jb::etcd::session election_session(queue, etcd_channel, 3000ms);
BOOST_CHECK_NE(election_session.lease_id(), 0);

std::ostringstream os;
Expand All @@ -27,8 +27,7 @@ BOOST_AUTO_TEST_CASE(leader_election_participant_basic) {

{
jb::etcd::leader_election_participant tested(
queue, etcd_channel, election_name, "42", [](bool) {},
std::chrono::seconds(3));
queue, etcd_channel, election_name, "42", [](bool) {}, 3000ms);
BOOST_TEST_CHECKPOINT("participant object constructed");
BOOST_CHECK_EQUAL(tested.value(), "42");
BOOST_CHECK_EQUAL(
Expand All @@ -51,8 +50,8 @@ BOOST_AUTO_TEST_CASE(leader_election_participant_switch_leader) {
BOOST_TEST_CHECKPOINT("queue created and thread requested");

// ... create a unique election name ...
jb::etcd::session election_session(
queue, etcd_channel, std::chrono::milliseconds(3000));
using namespace std::chrono_literals;
jb::etcd::session election_session(queue, etcd_channel, 3000ms);
BOOST_CHECK_NE(election_session.lease_id(), 0);

std::ostringstream os;
Expand All @@ -61,21 +60,18 @@ BOOST_AUTO_TEST_CASE(leader_election_participant_switch_leader) {
BOOST_TEST_MESSAGE("testing with election-name=" << election_name);

jb::etcd::leader_election_participant participant_a(
queue, etcd_channel, election_name, "session_a", std::chrono::seconds(3));
queue, etcd_channel, election_name, "session_a", 3000ms);
BOOST_CHECK_EQUAL(participant_a.value(), "session_a");

jb::etcd::session session_b(
queue, etcd_channel, std::chrono::milliseconds(3000));
jb::etcd::session session_b(queue, etcd_channel, 3000ms);
BOOST_CHECK_NE(session_b.lease_id(), 0);
std::promise<bool> elected_b;
jb::etcd::leader_election_participant participant_b(
queue, etcd_channel, election_name, "session_b",
[&elected_b](bool r) { elected_b.set_value(r); },
std::chrono::seconds(3));
[&elected_b](bool r) { elected_b.set_value(r); }, 3000ms);
BOOST_CHECK_EQUAL(participant_b.value(), "session_b");

jb::etcd::session session_c(
queue, etcd_channel, std::chrono::milliseconds(3000));
jb::etcd::session session_c(queue, etcd_channel, 3000ms);
BOOST_CHECK_NE(session_b.lease_id(), 0);
std::promise<bool> elected_c;
jb::etcd::leader_election_participant participant_c(
Expand All @@ -84,14 +80,12 @@ BOOST_AUTO_TEST_CASE(leader_election_participant_switch_leader) {
std::chrono::seconds(3));
BOOST_CHECK_EQUAL(participant_c.value(), "session_c");

std::this_thread::sleep_for(std::chrono::seconds(5));
std::this_thread::sleep_for(5000ms);

auto future_c = elected_c.get_future();
BOOST_CHECK_EQUAL(
std::future_status::timeout, future_c.wait_for(std::chrono::seconds(0)));
BOOST_CHECK_EQUAL(std::future_status::timeout, future_c.wait_for(0ms));
auto future_b = elected_b.get_future();
BOOST_CHECK_EQUAL(
std::future_status::timeout, future_b.wait_for(std::chrono::seconds(0)));
BOOST_CHECK_EQUAL(std::future_status::timeout, future_b.wait_for(0ms));

for (int i = 0; i != 2; ++i) {
BOOST_TEST_CHECKPOINT("iteration i=" << i);
Expand Down Expand Up @@ -121,3 +115,56 @@ BOOST_AUTO_TEST_CASE(leader_election_participant_switch_leader) {
BOOST_TEST_CHECKPOINT("cleanup");
election_session.revoke();
}

/**
* @test Verify that an election participant handles aborted elections.
*/
BOOST_AUTO_TEST_CASE(leader_election_participant_abort) {
std::string const etcd_address = "localhost:2379";
auto etcd_channel =
grpc::CreateChannel(etcd_address, grpc::InsecureChannelCredentials());
auto queue = std::make_shared<jb::etcd::active_completion_queue>();
BOOST_TEST_CHECKPOINT("queue created and thread requested");

// ... create a unique election name ...
using namespace std::chrono_literals;
jb::etcd::session election_session(queue, etcd_channel, 3000ms);
BOOST_CHECK_NE(election_session.lease_id(), 0);

std::ostringstream os;
os << "test-election/" << std::hex << election_session.lease_id();
auto election_name = os.str();
BOOST_TEST_MESSAGE("testing with election-name=" << election_name);

jb::etcd::leader_election_participant participant_a(
queue, etcd_channel, election_name, "session_a", 3000ms);
BOOST_CHECK_EQUAL(participant_a.value(), "session_a");

jb::etcd::session session_b(queue, etcd_channel, 3000ms);
BOOST_CHECK_NE(session_b.lease_id(), 0);
std::promise<bool> elected_b;
jb::etcd::leader_election_participant participant_b(
queue, etcd_channel, election_name, "session_b",
[&elected_b](bool r) { elected_b.set_value(r); },
std::chrono::seconds(3));
BOOST_CHECK_EQUAL(participant_b.value(), "session_b");

std::this_thread::sleep_for(500ms);

auto future_b = elected_b.get_future();
BOOST_CHECK_EQUAL(std::future_status::timeout, future_b.wait_for(0ms));

BOOST_TEST_CHECKPOINT("b::resign");
participant_b.resign();

// ... if future_b is not ready the next call will block, so we make
// a hard requirement ...
BOOST_REQUIRE_EQUAL(std::future_status::ready, future_b.wait_for(0ms));
BOOST_CHECK_EQUAL(future_b.get(), false);

BOOST_TEST_CHECKPOINT("a::resign");
participant_a.resign();

BOOST_TEST_CHECKPOINT("cleanup");
election_session.revoke();
}

0 comments on commit fd395c0

Please sign in to comment.