Skip to content

Commit

Permalink
Fixes the watcher cannot be cancelled issue with etcd 3.x (#238)
Browse files Browse the repository at this point in the history
Signed-off-by: Tao He <sighingnow@gmail.com>
  • Loading branch information
sighingnow committed Jul 15, 2023
1 parent 153546f commit 068f37b
Showing 1 changed file with 83 additions and 25 deletions.
108 changes: 83 additions & 25 deletions src/v3/AsyncGRPC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1091,17 +1091,8 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params)
isCancelled.store(false);
stream = parameters.watch_stub->AsyncWatch(&context, &cq_,
(void*) etcdv3::WATCH_CREATE);
// The unique watcher id causes the watcher cannot be cancelled as expected
// on Ubuntu 20.04.
//
// See CI failures:
// https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/actions/runs/5561397273/jobs/10159051536
//
// Added in https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/pull/232
// Removed in https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/pull/236
//
// this->watch_id =
// std::chrono::high_resolution_clock::now().time_since_epoch().count();
this->watch_id =
std::chrono::high_resolution_clock::now().time_since_epoch().count();
// #ifndef NDEBUG
// std::clog << "etcd-cpp-apiv3: watch_id: " << this->watch_id << std::endl;
// #endif
Expand Down Expand Up @@ -1135,11 +1126,52 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params)
}
}

/**
* Notes: `Cancel` and `waitForResponse` of watchers.
*
* We meet failures about failed to cancel the watcher on Ubuntu 20.04
* due to unable to receive the "etcdv3::WATCH_FINISH" tag from the gRPC
* completion queue.
*
* See CI:
* https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/actions/runs/5561458372/jobs/10159155857
*
* To address the problem, we use the `AsyncNext()` to wait for the
* the last token from the completion queue (wait for 1 second, once
* we called the method `stream->Finish()`).
*
* Remark: the issue might be caused by lower version etcd.
*/

void etcdv3::AsyncWatchAction::waitForResponse() {
void* got_tag;
bool ok = false;
bool the_final_round = false;

while (cq_.Next(&got_tag, &ok)) {
while (true) {
if (!the_final_round) {
if (!cq_.Next(&got_tag, &ok)) {
break;
}
} else {
auto deadline =
std::chrono::system_clock::now() + std::chrono::seconds(1);
switch (cq_.AsyncNext(&got_tag, &ok, deadline)) {
case CompletionQueue::NextStatus::TIMEOUT:
case CompletionQueue::NextStatus::SHUTDOWN: {
std::cerr << "[warn] watcher does't exit normally" << std::endl;
// pretend to be received a "WATCH_FINISH" tag: shutdown
context.TryCancel();
cq_.Shutdown();
ok = false; // jump out
break;
}
case CompletionQueue::NextStatus::GOT_EVENT: {
// normal execution flow
break;
}
}
}
if (ok == false) {
break;
}
Expand All @@ -1148,6 +1180,7 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
continue;
}
if (got_tag == (void*) etcdv3::WATCH_WRITES_DONE) {
the_final_round = true;
stream->Finish(&status, (void*) etcdv3::WATCH_FINISH);
continue;
}
Expand All @@ -1168,7 +1201,7 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
// we stop watch under two conditions:
//
// 1. watch for a future revision, return immediately with empty events
// set
// set
// 2. receive any effective events.
if ((reply.created() &&
reply.header().revision() < parameters.revision) ||
Expand Down Expand Up @@ -1200,23 +1233,36 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
}
}

void etcdv3::AsyncWatchAction::CancelWatch() {
if (!isCancelled.exchange(true)) {
WatchRequest cancel_req;
cancel_req.mutable_cancel_request()->set_watch_id(this->watch_id);
stream->Write(cancel_req, (void*) etcdv3::WATCH_WRITE_CANCEL);
isCancelled.store(true);
}
}

bool etcdv3::AsyncWatchAction::Cancelled() const { return isCancelled.load(); }

void etcdv3::AsyncWatchAction::waitForResponse(
std::function<void(etcd::Response)> callback) {
void* got_tag;
bool ok = false;
bool the_final_round = false;

while (cq_.Next(&got_tag, &ok)) {
while (true) {
if (!the_final_round) {
if (!cq_.Next(&got_tag, &ok)) {
break;
}
} else {
auto deadline =
std::chrono::system_clock::now() + std::chrono::seconds(1);
switch (cq_.AsyncNext(&got_tag, &ok, deadline)) {
case CompletionQueue::NextStatus::TIMEOUT:
case CompletionQueue::NextStatus::SHUTDOWN: {
std::cerr << "[warn] watcher does't exit normally" << std::endl;
// pretend to be received a "WATCH_FINISH" tag: shutdown
context.TryCancel();
cq_.Shutdown();
ok = false; // jump out
break;
}
case CompletionQueue::NextStatus::GOT_EVENT: {
// normal execution flow
break;
}
}
}
if (ok == false) {
break;
}
Expand All @@ -1225,6 +1271,7 @@ void etcdv3::AsyncWatchAction::waitForResponse(
continue;
}
if (got_tag == (void*) etcdv3::WATCH_WRITES_DONE) {
the_final_round = true;
stream->Finish(&status, (void*) etcdv3::WATCH_FINISH);
continue;
}
Expand Down Expand Up @@ -1267,6 +1314,17 @@ void etcdv3::AsyncWatchAction::waitForResponse(
}
}

void etcdv3::AsyncWatchAction::CancelWatch() {
if (!isCancelled.exchange(true)) {
WatchRequest cancel_req;
cancel_req.mutable_cancel_request()->set_watch_id(this->watch_id);
stream->Write(cancel_req, (void*) etcdv3::WATCH_WRITE_CANCEL);
isCancelled.store(true);
}
}

bool etcdv3::AsyncWatchAction::Cancelled() const { return isCancelled.load(); }

etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse() {
AsyncWatchResponse watch_resp;
watch_resp.set_action(etcdv3::WATCH_ACTION);
Expand Down

0 comments on commit 068f37b

Please sign in to comment.