Skip to content

Commit

Permalink
Fixes the implementation of Observe() (#231)
Browse files Browse the repository at this point in the history
Signed-off-by: Tao He <sighingnow@gmail.com>
  • Loading branch information
sighingnow committed Jul 1, 2023
1 parent 09f665f commit 32fae70
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 13 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ pplx::task<Response> proclaim(std::string const &name, int64_t lease_id,

pplx::task<Response> leader(std::string const &name);

std::unique_ptr<Observer> observe(std::string const &name);
std::unique_ptr<SyncClient::Observer> observe(std::string const &name);

pplx::task<Response> resign(std::string const &name, int64_t lease_id,
std::string const &key, int64_t revision);
Expand All @@ -891,7 +891,7 @@ The `Observer` returned by `observe()` can be use to monitor the changes of elec
The observer stream will be canceled when been destructed.
```c++
std::unique_ptr<etcd::Observer> observer = etcd.observe("test");
std::unique_ptr<etcd::SyncClient::Observer> observer = etcd.observe("test");
// wait one change event, blocked execution
etcd::Response resp = observer->WaitOnce();
Expand All @@ -906,7 +906,7 @@ The observer stream will be canceled when been destructed.
observer.reset(nullptr);
```

for more details, please refer to [etcd/Client.hpp](./etcd/Client.hpp).
for more details, please refer to [etcd/Client.hpp](./etcd/Client.hpp) and [tst/ElectionTest.cpp](./tst/ElectionTest.cpp).

### TODO

Expand Down
1 change: 1 addition & 0 deletions etcd/v3/action_constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ namespace etcdv3
extern char const * WATCH_FINISH;

extern char const * ELECTION_OBSERVE_CREATE;
extern char const * ELECTION_OBSERVE_FINISH;

extern const int ERROR_GRPC_OK;
extern const int ERROR_GRPC_CANCELLED;
Expand Down
2 changes: 1 addition & 1 deletion src/v3/Action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void etcdv3::ActionParameters::dump(std::ostream &os) const {
os << " grpc_timeout: " << grpc_timeout.count() << "(ms)" << std::endl;
}

void etcdv3::Action::waitForResponse()
void etcdv3::Action::waitForResponse()
{
void* got_tag;
bool ok = false;
Expand Down
27 changes: 18 additions & 9 deletions src/v3/AsyncGRPC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -817,10 +817,7 @@ void etcdv3::AsyncObserveAction::waitForResponse()
response_reader->Read(&reply, (void *)this);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*)this) {
auto response = ParseResponse();
if (response.get_error_code() == 0) {
// issue the next read
response_reader->Read(&reply, (void *)this);
} else {
if (response.get_error_code() != 0) {
this->CancelObserve();
}
} else {
Expand All @@ -835,13 +832,25 @@ void etcdv3::AsyncObserveAction::CancelObserve()
if (!isCancelled.exchange(true)) {
void* got_tag;
bool ok = false;
response_reader->Finish(&status, (void *)this);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) {
// ok
} else {
std::cerr << "Failed to finish a election observing connection" << std::endl;

response_reader->Finish(&status, (void *)ELECTION_OBSERVE_FINISH);

// FIXME: not sure why the `Next()` after `Finish()` blocks forever.
// Using the `AsyncNext()` without a timeout to ensure the cancel is done.
switch (cq_.AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::microseconds(1))) {
case CompletionQueue::NextStatus::TIMEOUT:
case CompletionQueue::NextStatus::SHUTDOWN:
// ignore
break;
case CompletionQueue::NextStatus::GOT_EVENT:
if (!ok || got_tag != (void *)ELECTION_OBSERVE_FINISH) {
std::cerr << "Failed to finish a election observing connection" << std::endl;
}
}

// cancel on-the-fly calls
context.TryCancel();

cq_.Shutdown();
}
}
Expand Down
1 change: 1 addition & 0 deletions src/v3/action_constants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ char const * etcdv3::WATCH_WRITES_DONE = "watch writes done";
char const * etcdv3::WATCH_FINISH = "watch finish";

char const * etcdv3::ELECTION_OBSERVE_CREATE = "observe create";
char const * etcdv3::ELECTION_OBSERVE_FINISH = "observe finish";

const int etcdv3::ERROR_GRPC_OK = 0;
const int etcdv3::ERROR_GRPC_CANCELLED = 1;
Expand Down
54 changes: 54 additions & 0 deletions tst/ElectionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,60 @@ TEST_CASE("campaign and resign")
REQUIRE(0 == resp5.error_code());
}

TEST_CASE("campaign and observe")
{
etcd::Client etcd(etcd_url);

auto keepalive = etcd.leasekeepalive(60).get();
auto lease_id = keepalive->Lease();

auto observer_thread = std::thread([&etcd]() {
std::unique_ptr<etcd::SyncClient::Observer> observer = etcd.observe("test");
// wait many change events, blocked execution
for (size_t i = 0; i < 10; ++i) {
etcd::Response resp = observer->WaitOnce();
std::cout << "observe " << resp.value().key() << " as the leader: " << resp.value().as_string() << std::endl;
}
std::cout << "finish the observe" << std::endl;
// cancel the observers
observer.reset(nullptr);
});

std::this_thread::sleep_for(std::chrono::seconds(1));

for (int i = 0; i < 5; ++i) {
// campaign
auto resp1 = etcd.campaign("test", lease_id, "xxxx").get();
REQUIRE(0 == resp1.error_code());
std::cout << "key " << resp1.value().key() << " becomes the leader"
<< std::endl;

// proclaim
auto resp3 = etcd.proclaim("test", lease_id, resp1.value().key(),
resp1.value().created_index(),
"tttt - " + std::to_string(i))
.get();
REQUIRE(0 == resp3.error_code());

// leader
{
auto resp4 = etcd.leader("test").get();
REQUIRE(0 == resp4.error_code());
REQUIRE(resp1.value().key() == resp4.value().key());
REQUIRE("tttt - " + std::to_string(i) == resp4.value().as_string());
}

// resign
auto resp5 = etcd.resign("test", lease_id, resp1.value().key(),
resp1.value().created_index())
.get();
REQUIRE(0 == resp5.error_code());
std::this_thread::sleep_for(std::chrono::seconds(1));
}

observer_thread.join();
}

TEST_CASE("cleanup")
{
etcd::Client etcd(etcd_url);
Expand Down

0 comments on commit 32fae70

Please sign in to comment.