Skip to content

Commit

Permalink
Fixes a possible bug about watcher's id (#232)
Browse files Browse the repository at this point in the history
Signed-off-by: Tao He <sighingnow@gmail.com>
  • Loading branch information
sighingnow committed Jul 1, 2023
1 parent 32fae70 commit fe9f17e
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
3 changes: 3 additions & 0 deletions src/v3/Action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ void etcdv3::Action::waitForResponse()
break;
}
case CompletionQueue::NextStatus::GOT_EVENT: {
if (!ok) {
status = grpc::Status(grpc::StatusCode::ABORTED, "Failed to execute the action: not ok or invalid tag");
}
break;
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/v3/AsyncGRPC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,7 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(
{
isCancelled.store(false);
stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)etcdv3::WATCH_CREATE);
this->watch_id = std::chrono::steady_clock::now().time_since_epoch().count();

WatchRequest watch_req;
WatchCreateRequest watch_create_req;
Expand All @@ -1199,6 +1200,7 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(

watch_create_req.set_prev_kv(true);
watch_create_req.set_start_revision(parameters.revision);
watch_create_req.set_watch_id(this->watch_id);

watch_req.mutable_create_request()->CopyFrom(watch_create_req);

Expand Down Expand Up @@ -1254,6 +1256,9 @@ void etcdv3::AsyncWatchAction::waitForResponse()
continue;
}

// record the watcher id
this->watch_id = reply.watch_id();

// we stop watch under two conditions:
//
// 1. watch for a future revision, return immediately with empty events set
Expand Down Expand Up @@ -1339,6 +1344,9 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
continue;
}

// record the watcher id
this->watch_id = reply.watch_id();

// for the callback case, we don't invoke callback immediately if watching
// for a future revision, we wait until there are some effective events.
if(reply.events_size())
Expand Down
14 changes: 12 additions & 2 deletions tst/WatcherTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ static int watcher_called = 0;
void printResponse(etcd::Response const & resp)
{
if (resp.error_code()) {
std::cout << resp.error_code() << ": " << resp.error_message() << std::endl;
std::cout << "Watcher "<< resp.watch_id()
<< " fails with " << resp.error_code() << ": " << resp.error_message() << std::endl;
}
else {
std::cout << resp.action() << " " << resp.value().as_string() << std::endl;
std::cout << "Watcher " << resp.watch_id()
<< " responses with " << resp.action() << " " << resp.value().as_string() << std::endl;
std::cout << "Previous value: " << resp.prev_value().as_string() << std::endl;

std::cout << "Events size: " << resp.events().size() << std::endl;
Expand Down Expand Up @@ -191,6 +193,14 @@ TEST_CASE("watch changes on the same key (#212)")
}
}

TEST_CASE("create two watcher")
{
etcd::Watcher w1(etcd_url, "/test", printResponse, true);
etcd::Watcher w2(etcd_url, "/test", printResponse, true);

std::this_thread::sleep_for(std::chrono::seconds(5));
}

// TEST_CASE("request cancellation")
// {
// etcd::Client etcd(etcd_url);
Expand Down

0 comments on commit fe9f17e

Please sign in to comment.