From 153546f9654e1f08f177c11f31ac24ed94000443 Mon Sep 17 00:00:00 2001 From: Tao He Date: Sat, 15 Jul 2023 17:07:34 +0800 Subject: [PATCH] Refactor the implementation of etcd transactions. (#236) Fixes #234. Signed-off-by: Tao He --- README.md | 60 ++++- etcd-cpp-api-config.in.cmake | 5 +- etcd/Client.hpp | 74 ++---- etcd/SyncClient.hpp | 92 +++----- etcd/v3/Action.hpp | 23 ++ etcd/v3/AsyncGRPC.hpp | 4 +- etcd/v3/Transaction.hpp | 208 ++++++++++++++--- src/Client.cpp | 153 ++---------- src/Response.cpp | 1 + src/SyncClient.cpp | 138 ++--------- src/v3/AsyncGRPC.cpp | 258 +++++++++++---------- src/v3/Transaction.cpp | 436 +++++++++++++++++++++++------------ src/v3/action_constants.cpp | 2 +- tst/EtcdSyncTest.cpp | 20 +- tst/EtcdTest.cpp | 19 +- tst/TransactionTest.cpp | 54 ++++- 16 files changed, 866 insertions(+), 681 deletions(-) diff --git a/README.md b/README.md index 89a7327..debc603 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,36 @@ dependencies have been successfully installed: cmake .. make -j$(nproc) && make install +## Using this package in your CMake project + +To use this package in your CMake project, you can either + +- install, then find the library using `find_package()`: + + ```cmake + find_package(etcd-cpp-apiv3 REQUIRED) + target_link_libraries(your_target PRIVATE etcd-cpp-api) + ``` + +- or, add this repository as a subdirectory in your project, and link the library directly: + + ```cmake + add_subdirectory(thirdparty/etcd-cpp-apiv3) + target_link_libraries(your_target PRIVATE etcd-cpp-api) + ``` + +- or, use [FetchContent](https://cmake.org/cmake/help/latest/module/FetchContent.html): + + ```cmake + include(FetchContent) + FetchContent_Declare( + etcd-cpp-apiv3 + https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3.git + ) + FetchContent_MakeAvailable(etcd-cpp-apiv3) + target_link_libraries(your_target PRIVATE etcd-cpp-api) + ``` + ## Compatible etcd version The _etcd-cpp-apiv3_ should work well with etcd > 3.0. Feel free to issue an issue to us on @@ -495,6 +525,7 @@ some specific conditions. Values can be deleted with the `rm` method passing the key to be deleted as a parameter. The key should point to an existing value. There are conditional variations for deletion too. +* `rm(std::string const& key)` unconditionally deletes the given key * `rm_if(key, value, old_value)` deletes an already existing value but only if the previous value equals with old_value. If the values does not match returns with "Compare failed" error (code `ERROR_COMPARE_FAILED`) @@ -852,11 +883,32 @@ Transactions in etcd supports set a set of comparison targets to specify the con etcdv3::Transaction txn; // setup the conditions - txn.reset_key("/test/x1"); - txn.init_compare("1", etcdv3::CompareResult::EQUAL, etcdv3::CompareTarget::VALUE); + txn.add_compare_value("/test/x1", "1"); + txn.add_compare_value("/test/x2", "2"); + + // or, compare the last modified revision + txn.add_compare_mod("/test/x3", 0); // not exists + txn.add_compare_mod("/test/x4", etcdv3::CompareResult::GREATER, 1234); // the modified revision is greater than 1234 +``` + +High-level APIs (e.g., `compare_and_create`, `compare_and_swap`) are also provided, e.g., +`fetch-and-add` operation can be implemented as - txn.reset_key("/test/x2"); - txn.init_compare("2", etcdv3::CompareResult::EQUAL, etcdv3::CompareTarget::VALUE); +```cpp + auto fetch_and_add = [](etcd::Client& client, + std::string const& key) -> void { + auto value = stoi(client.get(key).get().value().as_string()); + while (true) { + auto txn = etcdv3::Transaction(); + txn.setup_compare_and_swap(key, std::to_string(value), + std::to_string(value + 1)); + etcd::Response resp = client.txn(txn).get(); + if (resp.is_ok()) { + break; + } + value = stoi(resp.value().as_string()); + } + }; ``` See full example of the usages of transaction APIs, please refer to [./tst/TransactionTest.cpp](./tst/TransactionTest.cpp), diff --git a/etcd-cpp-api-config.in.cmake b/etcd-cpp-api-config.in.cmake index befba7a..c1a9047 100644 --- a/etcd-cpp-api-config.in.cmake +++ b/etcd-cpp-api-config.in.cmake @@ -32,5 +32,8 @@ set(ETCD_CPP_INCLUDE_DIRS "${ETCD_CPP_INCLUDE_DIR}") include(FindPackageMessage) find_package_message(etcd "Found etcd: ${CMAKE_CURRENT_LIST_FILE} (found version \"@etcd-cpp-api_VERSION@\")" - "etcd-cpp-apiv3 version: @etcd-cpp-api_VERSION@\netcd-cpp-apiv3 libraries: ${ETCD_CPP_LIBRARIES}, \netcd-cpp-apiv3 core libraries: ${ETCD_CPP_CORE_LIBRARIES}\ninclude directories: ${ETCD_CPP_INCLUDE_DIRS}" + "etcd-cpp-apiv3 version: @etcd-cpp-api_VERSION@\n" + "etcd-cpp-apiv3 libraries: ${ETCD_CPP_LIBRARIES}\n" + "etcd-cpp-apiv3 core libraries: ${ETCD_CPP_CORE_LIBRARIES}\n" + "include directories: ${ETCD_CPP_INCLUDE_DIRS}" ) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 6fc0da9..4f8a566 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -271,15 +271,6 @@ class Client { */ pplx::task get(std::string const& key, int64_t revision); - /** - * Sets the value of a key. The key will be modified if already exists or - * created if it does not exists. - * @param key is the key to be created or modified - * @param value is the new value to be set - */ - pplx::task set(std::string const& key, std::string const& value, - int ttl = 0); - /** * Sets the value of a key. The key will be modified if already exists or * created if it does not exists. @@ -288,15 +279,7 @@ class Client { * @param leaseId is the lease attached to the key */ pplx::task set(std::string const& key, std::string const& value, - int64_t leaseId); - - /** - * Creates a new key and sets it's value. Fails if the key already exists. - * @param key is the key to be created - * @param value is the value to be set - */ - pplx::task add(std::string const& key, std::string const& value, - int ttl = 0); + const int64_t leaseId = 0); /** * Creates a new key and sets it's value. Fails if the key already exists. @@ -305,7 +288,7 @@ class Client { * @param leaseId is the lease attached to the key */ pplx::task add(std::string const& key, std::string const& value, - int64_t leaseId); + const int64_t leaseId = 0); /** * Put a new key-value pair. @@ -315,12 +298,13 @@ class Client { pplx::task put(std::string const& key, std::string const& value); /** - * Modifies an existing key. Fails if the key does not exists. - * @param key is the key to be modified - * @param value is the new value to be set + * Put a new key-value pair. + * @param key is the key to be put + * @param value is the value to be put + * @param leaseId is the lease id to be associated with the key */ - pplx::task modify(std::string const& key, std::string const& value, - int ttl = 0); + pplx::task put(std::string const& key, std::string const& value, + const int64_t leaseId); /** * Modifies an existing key. Fails if the key does not exists. @@ -329,18 +313,7 @@ class Client { * @param leaseId is the lease attached to the key */ pplx::task modify(std::string const& key, std::string const& value, - int64_t leaseId); - - /** - * Modifies an existing key only if it has a specific value. Fails if the key - * does not exists or the original value differs from the expected one. - * @param key is the key to be modified - * @param value is the new value to be set - * @param old_value is the value to be replaced - */ - pplx::task modify_if(std::string const& key, - std::string const& value, - std::string const& old_value, int ttl = 0); + const int64_t leaseId = 0); /** * Modifies an existing key only if it has a specific value. Fails if the key @@ -352,19 +325,8 @@ class Client { */ pplx::task modify_if(std::string const& key, std::string const& value, - std::string const& old_value, int64_t leaseId); - - /** - * Modifies an existing key only if it has a specific modification index - * value. Fails if the key does not exists or the modification index of the - * previous value differs from the expected one. - * @param key is the key to be modified - * @param value is the new value to be set - * @param old_index is the expected index of the original value - */ - pplx::task modify_if(std::string const& key, - std::string const& value, int64_t old_index, - int ttl = 0); + std::string const& old_value, + const int64_t leaseId = 0); /** * Modifies an existing key only if it has a specific modification index @@ -377,11 +339,13 @@ class Client { */ pplx::task modify_if(std::string const& key, std::string const& value, int64_t old_index, - int64_t leaseId); + const int64_t leaseId = 0); /** * Removes a single key. The key has to point to a plain, non directory entry. * @param key is the key to be deleted + * + * @return Returns etcdv3::ERROR_KEY_NOT_FOUND if the key does not exist. */ pplx::task rm(std::string const& key); @@ -389,6 +353,8 @@ class Client { * Removes a single key but only if it has a specific value. Fails if the key * does not exists or the its value differs from the expected one. * @param key is the key to be deleted + * + * @return Returns etcdv3::ERROR_KEY_NOT_FOUND if the key does not exist. */ pplx::task rm_if(std::string const& key, std::string const& old_value); @@ -399,6 +365,8 @@ class Client { * from the expected one. * @param key is the key to be deleted * @param old_index is the expected index of the existing value + * + * @return Returns etcdv3::ERROR_KEY_NOT_FOUND if the key does not exist. */ pplx::task rm_if(std::string const& key, int64_t old_index); @@ -408,6 +376,8 @@ class Client { * @param key is the directory to be created to be listed * @param recursive if true then delete a whole subtree, otherwise deletes * only an empty directory. + * + * @return Returns etcdv3::ERROR_KEY_NOT_FOUND if the no key been deleted. */ pplx::task rmdir(std::string const& key, bool recursive = false); @@ -419,6 +389,8 @@ class Client { * * @param key is the directory to be created to be listed * @param range_end is the end of key range to be removed. + * + * @return Returns etcdv3::ERROR_KEY_NOT_FOUND if the no key been deleted. */ pplx::task rmdir(std::string const& key, const char* range_end); @@ -427,6 +399,8 @@ class Client { * * @param key is the directory to be created to be listed * @param range_end is the end of key range to be removed. + * + * @return Returns etcdv3::ERROR_KEY_NOT_FOUND if the no key been deleted. */ pplx::task rmdir(std::string const& key, std::string const& range_end); diff --git a/etcd/SyncClient.hpp b/etcd/SyncClient.hpp index 6c96b62..6859096 100644 --- a/etcd/SyncClient.hpp +++ b/etcd/SyncClient.hpp @@ -327,14 +327,6 @@ class SyncClient { */ Response get(std::string const& key, int64_t revision); - /** - * Sets the value of a key. The key will be modified if already exists or - * created if it does not exists. - * @param key is the key to be created or modified - * @param value is the new value to be set - */ - Response set(std::string const& key, std::string const& value, int ttl = 0); - /** * Sets the value of a key. The key will be modified if already exists or * created if it does not exists. @@ -343,14 +335,7 @@ class SyncClient { * @param leaseId is the lease attached to the key */ Response set(std::string const& key, std::string const& value, - int64_t leaseId); - - /** - * Creates a new key and sets it's value. Fails if the key already exists. - * @param key is the key to be created - * @param value is the value to be set - */ - Response add(std::string const& key, std::string const& value, int ttl = 0); + const int64_t leaseId = 0); /** * Creates a new key and sets it's value. Fails if the key already exists. @@ -359,7 +344,7 @@ class SyncClient { * @param leaseId is the lease attached to the key */ Response add(std::string const& key, std::string const& value, - int64_t leaseId); + const int64_t leaseId = 0); /** * Put a new key-value pair. @@ -369,12 +354,13 @@ class SyncClient { Response put(std::string const& key, std::string const& value); /** - * Modifies an existing key. Fails if the key does not exists. - * @param key is the key to be modified - * @param value is the new value to be set + * Put a new key-value pair. + * @param key is the key to be put + * @param value is the value to be put + * @param leaseId is the lease id to be associated with the key */ - Response modify(std::string const& key, std::string const& value, - int ttl = 0); + Response put(std::string const& key, std::string const& value, + const int64_t leaseId); /** * Modifies an existing key. Fails if the key does not exists. @@ -383,17 +369,7 @@ class SyncClient { * @param leaseId is the lease attached to the key */ Response modify(std::string const& key, std::string const& value, - int64_t leaseId); - - /** - * Modifies an existing key only if it has a specific value. Fails if the key - * does not exists or the original value differs from the expected one. - * @param key is the key to be modified - * @param value is the new value to be set - * @param old_value is the value to be replaced - */ - Response modify_if(std::string const& key, std::string const& value, - std::string const& old_value, int ttl = 0); + const int64_t leaseId = 0); /** * Modifies an existing key only if it has a specific value. Fails if the key @@ -404,18 +380,7 @@ class SyncClient { * @param leaseId is the lease attached to the key */ Response modify_if(std::string const& key, std::string const& value, - std::string const& old_value, int64_t leaseId); - - /** - * Modifies an existing key only if it has a specific modification index - * value. Fails if the key does not exists or the modification index of the - * previous value differs from the expected one. - * @param key is the key to be modified - * @param value is the new value to be set - * @param old_index is the expected index of the original value - */ - Response modify_if(std::string const& key, std::string const& value, - int64_t old_index, int ttl = 0); + std::string const& old_value, const int64_t leaseId = 0); /** * Modifies an existing key only if it has a specific modification index @@ -427,11 +392,13 @@ class SyncClient { * @param leaseId is the lease attached to the key */ Response modify_if(std::string const& key, std::string const& value, - int64_t old_index, int64_t leaseId); + int64_t old_index, const int64_t leaseId = 0); /** * Removes a single key. The key has to point to a plain, non directory entry. * @param key is the key to be deleted + * + * @return Returns etcdv3::ERROR_KEY_NOT_FOUND if the key does not exist. */ Response rm(std::string const& key); @@ -439,6 +406,8 @@ class SyncClient { * Removes a single key but only if it has a specific value. Fails if the key * does not exists or the its value differs from the expected one. * @param key is the key to be deleted + * + * @return Returns etcdv3::ERROR_KEY_NOT_FOUND if the key does not exist. */ Response rm_if(std::string const& key, std::string const& old_value); @@ -448,6 +417,8 @@ class SyncClient { * from the expected one. * @param key is the key to be deleted * @param old_index is the expected index of the existing value + * + * @return Returns etcdv3::ERROR_KEY_NOT_FOUND if the key does not exist. */ Response rm_if(std::string const& key, int64_t old_index); @@ -457,6 +428,8 @@ class SyncClient { * @param key is the directory to be created to be listed * @param recursive if true then delete a whole subtree, otherwise deletes * only an empty directory. + * + * @return Returns etcdv3::ERROR_KEY_NOT_FOUND if the no key been deleted. */ Response rmdir(std::string const& key, bool recursive = false); @@ -468,6 +441,8 @@ class SyncClient { * * @param key is the directory to be created to be listed * @param range_end is the end of key range to be removed. + * + * @return Returns etcdv3::ERROR_KEY_NOT_FOUND if the no key been deleted. */ Response rmdir(std::string const& key, const char* range_end); @@ -476,6 +451,8 @@ class SyncClient { * * @param key is the directory to be created to be listed * @param range_end is the end of key range to be removed. + * + * @return Returns etcdv3::ERROR_KEY_NOT_FOUND if the no key been deleted. */ Response rmdir(std::string const& key, std::string const& range_end); @@ -819,22 +796,21 @@ class SyncClient { private: // TODO: use std::unique_ptr<> std::shared_ptr head_internal(); - std::shared_ptr get_internal(std::string const& key, - int64_t revision = 0); - std::shared_ptr set_internal(std::string const& key, - std::string const& value, - int64_t leaseId); - std::shared_ptr add_internal(std::string const& key, - std::string const& value, - int64_t leaseId); + std::shared_ptr get_internal( + std::string const& key, const int64_t revision = 0); + std::shared_ptr add_internal( + std::string const& key, std::string const& value, + const int64_t leaseId = 0); std::shared_ptr put_internal( - std::string const& key, std::string const& value); + std::string const& key, std::string const& value, + const int64_t leaseId = 0); std::shared_ptr modify_internal( - std::string const& key, std::string const& value, int64_t leaseId); + std::string const& key, std::string const& value, + const int64_t leaseId = 0); std::shared_ptr modify_if_internal( std::string const& key, std::string const& value, int64_t old_index, - std::string const& old_value, int64_t leaseId, - etcdv3::AtomicityType const& atomicity_type); + std::string const& old_value, etcdv3::AtomicityType const& atomicity_type, + const int64_t leaseId = 0); std::shared_ptr rm_internal( std::string const& key); std::shared_ptr rm_if_internal( diff --git a/etcd/v3/Action.hpp b/etcd/v3/Action.hpp index 1920758..3f99a26 100644 --- a/etcd/v3/Action.hpp +++ b/etcd/v3/Action.hpp @@ -9,6 +9,8 @@ #include "proto/v3election.grpc.pb.h" #include "proto/v3lock.grpc.pb.h" +#include "etcd/v3/action_constants.hpp" + using grpc::ClientContext; using grpc::CompletionQueue; using grpc::Status; @@ -81,6 +83,27 @@ class Action { namespace detail { std::string string_plus_one(std::string const& value); std::string resolve_etcd_endpoints(std::string const& default_endpoints); + +template +void make_request_with_ranges(Req& req, std::string const& key, + std::string const& range_end, + bool const recursive) { + if (!recursive) { + req.set_key(key); + } else { + if (key.empty()) { + req.set_key(etcdv3::NUL); + req.set_range_end(etcdv3::NUL); + } else { + req.set_key(key); + req.set_range_end(detail::string_plus_one(key)); + } + } + if (!range_end.empty()) { + req.set_range_end(range_end); + } +} + } // namespace detail } // namespace etcdv3 #endif diff --git a/etcd/v3/AsyncGRPC.hpp b/etcd/v3/AsyncGRPC.hpp index 901f7a2..c6a2d7c 100644 --- a/etcd/v3/AsyncGRPC.hpp +++ b/etcd/v3/AsyncGRPC.hpp @@ -76,8 +76,7 @@ class AsyncCampaignResponse : public etcdv3::V3Response { class AsyncDeleteResponse : public etcdv3::V3Response { public: AsyncDeleteResponse(){}; - void ParseResponse(std::string const& key, bool prefix, - DeleteRangeResponse& resp); + void ParseResponse(DeleteRangeResponse& resp); }; class AsyncHeadResponse : public etcdv3::V3Response { @@ -162,7 +161,6 @@ class AsyncTxnResponse : public etcdv3::V3Response { public: AsyncTxnResponse(){}; void ParseResponse(TxnResponse& resp); - void ParseResponse(std::string const& key, bool prefix, TxnResponse& resp); }; class AsyncUnlockResponse : public etcdv3::V3Response { diff --git a/etcd/v3/Transaction.hpp b/etcd/v3/Transaction.hpp index 5ad6c29..bd90dae 100644 --- a/etcd/v3/Transaction.hpp +++ b/etcd/v3/Transaction.hpp @@ -28,38 +28,194 @@ enum class CompareTarget { class Transaction { public: Transaction(); - Transaction(std::string const&); - virtual ~Transaction(); - - // Set a new key for different comparisons and /put/get/delete requests. - void reset_key(std::string const& newkey); - - void init_compare(CompareResult, CompareTarget); - void init_compare(std::string const& old_value, CompareResult, CompareTarget); - void init_compare(int64_t old_value, CompareResult, CompareTarget); - - void setup_basic_failure_operation(std::string const& key); - void setup_set_failure_operation(std::string const& key, - std::string const& value, int64_t leaseid); - void setup_basic_create_sequence(std::string const& key, - std::string const& value, int64_t leaseid); - void setup_compare_and_swap_sequence(std::string const& valueToSwap, - int64_t leaseid); - void setup_delete_sequence(std::string const& key, - std::string const& range_end, bool recursive); - void setup_delete_failure_operation(std::string const& key, - std::string const& range_end, - bool recursive); - void setup_compare_and_delete_operation(std::string const& key); + ~Transaction(); + + union Value { + int64_t version; + int64_t create_revision; + int64_t mod_revision; + std::string value; + int64_t lease; + }; + + void add_compare(std::string const& key, CompareTarget const& target, + CompareResult const& result, Value const& target_value, + std::string const& range_end = ""); + + void add_compare_version(std::string const& key, int64_t const& version, + std::string const& range_end = ""); + void add_compare_version(std::string const& key, CompareResult const& result, + int64_t const& version, + std::string const& range_end = ""); + void add_compare_create(std::string const& key, + int64_t const& create_revision, + std::string const& range_end = ""); + void add_compare_create(std::string const& key, CompareResult const& result, + int64_t const& create_revision, + std::string const& range_end = ""); + void add_compare_mod(std::string const& key, int64_t const& mod_revision, + std::string const& range_end = ""); + void add_compare_mod(std::string const& key, CompareResult const& result, + int64_t const& mod_revision, + std::string const& range_end = ""); + void add_compare_value(std::string const& key, std::string const& value, + std::string const& range_end = ""); + void add_compare_value(std::string const& key, CompareResult const& result, + std::string const& value, + std::string const& range_end = ""); + void add_compare_lease(std::string const& key, int64_t const& lease, + std::string const& range_end = ""); + void add_compare_lease(std::string const& key, CompareResult const& result, + int64_t const& lease, + std::string const& range_end = ""); + + void add_success_range(std::string const& key, + std::string const& range_end = "", + bool const recursive = false, const int64_t limit = 0); + void add_success_put(std::string const& key, std::string const& value, + int64_t const leaseid = 0, const bool prev_kv = false); + void add_success_delete(std::string const& key, + std::string const& range_end = "", + bool const recursive = false, + const bool prev_kv = false); + void add_success_txn(const std::shared_ptr txn); + + void add_failure_range(std::string const& key, + std::string const& range_end = "", + bool const recursive = false, const int64_t limit = 0); + void add_failure_put(std::string const& key, std::string const& value, + int64_t const leaseid = 0, const bool prev_kv = false); + void add_failure_delete(std::string const& key, + std::string const& range_end = "", + bool const recursive = false, + const bool prev_kv = false); + void add_failure_txn(const std::shared_ptr txn); + + /** + * @brief Compare, and create if succeed. If failed, the response will + * contains the previous value in "values()" field. + */ + void setup_compare_and_create(std::string const& key, + std::string const& prev_value, + std::string const& create_key, + std::string const& value, + int64_t const leaseid = 0); + + /** + * @brief Compare, or create if failed. If failed, the response will contains + * the previous value in "values()" field. + */ + void setup_compare_or_create(std::string const& key, + std::string const& prev_value, + std::string const& create_key, + std::string const& value, + int64_t const leaseid = 0); + + /** + * @brief Compare, and swap if succeed. If failed, the response will contains + * the previous value in "values()" field. + */ + void setup_compare_and_swap(std::string const& key, + std::string const& prev_value, + std::string const& value, + int64_t const leaseid = 0); + + /** + * @brief Compare, and swap if failed. If failed, the response will contains + * the previous value in "values()" field. + */ + void setup_compare_or_swap(std::string const& key, + std::string const& prev_value, + std::string const& value, + int64_t const leaseid = 0); + + /** + * @brief Compare, and delete if succeed. If failed, the response will + * contains the previous value in "values()" field. + */ + void setup_compare_and_delete(std::string const& key, + std::string const& prev_value, + std::string const& delete_key, + std::string const& range_end = "", + const bool recursive = false); + + /** + * @brief Compare, or delete if failed. If failed, the response will contains + * the previous value in "values()" field. + */ + void setup_compare_or_delete(std::string const& key, + std::string const& prev_value, + std::string const& delete_key, + std::string const& range_end = "", + const bool recursive = false); + + /** + * @brief Compare, and create if succeed. If failed, the response will + * contains the previous value in "values()" field. + */ + void setup_compare_and_create(std::string const& key, + const int64_t prev_revision, + std::string const& create_key, + std::string const& value, + int64_t const leaseid = 0); + + /** + * @brief Compare, or create if failed. If failed, the response will contains + * the previous value in "values()" field. + */ + void setup_compare_or_create(std::string const& key, + const int64_t prev_revision, + std::string const& create_key, + std::string const& value, + int64_t const leaseid = 0); + + /** + * @brief Compare, and swap if succeed. If failed, the response will contains + * the previous value in "values()" field. + */ + void setup_compare_and_swap(std::string const& key, + const int64_t prev_revision, + std::string const& value, + int64_t const leaseid = 0); + + /** + * @brief Compare, and swap if failed. If failed, the response will contains + * the previous value in "values()" field. + */ + void setup_compare_or_swap(std::string const& key, + const int64_t prev_revision, + std::string const& value, + int64_t const leaseid = 0); + + /** + * @brief Compare, and delete if succeed. If failed, the response will + * contains the previous value in "values()" field. + */ + void setup_compare_and_delete(std::string const& key, + const int64_t prev_revision, + std::string const& delete_key, + std::string const& range_end = "", + const bool recursive = false); + + /** + * @brief Compare, or delete if failed. If failed, the response will contains + * the previous value in "values()" field. + */ + void setup_compare_or_delete(std::string const& key, + const int64_t prev_revision, + std::string const& delete_key, + std::string const& range_end = "", + const bool recursive = false); + + // Keep for backwards compatibility. // update without `get` and no `prev_kv` returned void setup_put(std::string const& key, std::string const& value); void setup_delete(std::string const& key); + void setup_delete(std::string const& key, std::string const& range_end, + const bool recursive = false); std::shared_ptr txn_request; - - private: - std::string key; }; } // namespace etcdv3 diff --git a/src/Client.cpp b/src/Client.cpp index aac023a..0bab966 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -231,61 +231,15 @@ pplx::task etcd::Client::get(std::string const& key, pplx::task etcd::Client::set(std::string const& key, std::string const& value, - int ttl) { - if (ttl > 0) { - return this->leasegrant(ttl).then( - [this, key, value](pplx::task const& task) { - auto resp = task.get(); - if (resp.error_code() == 0) { - return etcd::detail::asyncify( - static_cast>( - Response::create), - this->client->set_internal(key, value, resp.value().lease())); - } else { - return pplx::task_from_result(resp); - } - }); - } else { - return etcd::detail::asyncify( - static_cast>(Response::create), - this->client->set_internal(key, value, 0)); - } -} - -pplx::task etcd::Client::set(std::string const& key, - std::string const& value, - int64_t leaseid) { + const int64_t leaseid) { return etcd::detail::asyncify( - static_cast>(Response::create), - this->client->set_internal(key, value, leaseid)); -} - -pplx::task etcd::Client::add(std::string const& key, - std::string const& value, - int ttl) { - if (ttl > 0) { - return this->leasegrant(ttl).then( - [this, key, value](pplx::task const& task) { - auto resp = task.get(); - if (resp.error_code() == 0) { - return etcd::detail::asyncify( - static_cast>( - Response::create), - this->client->add_internal(key, value, resp.value().lease())); - } else { - return pplx::task_from_result(resp); - } - }); - } else { - return etcd::detail::asyncify( - static_cast>(Response::create), - this->client->add_internal(key, value, 0)); - } + static_cast>(Response::create), + this->client->put_internal(key, value, leaseid)); } pplx::task etcd::Client::add(std::string const& key, std::string const& value, - int64_t leaseid) { + const int64_t leaseid) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->add_internal(key, value, leaseid)); @@ -298,33 +252,17 @@ pplx::task etcd::Client::put(std::string const& key, this->client->put_internal(key, value)); } -pplx::task etcd::Client::modify(std::string const& key, - std::string const& value, - int ttl) { - if (ttl > 0) { - return this->leasegrant(ttl).then( - [this, key, value](pplx::task const& task) { - auto resp = task.get(); - if (resp.error_code() == 0) { - return etcd::detail::asyncify( - static_cast>( - Response::create), - this->client->modify_internal(key, value, - resp.value().lease())); - } else { - return pplx::task_from_result(resp); - } - }); - } else { - return etcd::detail::asyncify( - static_cast>(Response::create), - this->client->modify_internal(key, value, 0)); - } +pplx::task etcd::Client::put(std::string const& key, + std::string const& value, + const int64_t leaseId) { + return etcd::detail::asyncify( + static_cast>(Response::create), + this->client->put_internal(key, value, leaseId)); } pplx::task etcd::Client::modify(std::string const& key, std::string const& value, - int64_t leaseid) { + const int64_t leaseid) { return etcd::detail::asyncify( static_cast>(Response::create), this->client->modify_internal(key, value, leaseid)); @@ -333,78 +271,25 @@ pplx::task etcd::Client::modify(std::string const& key, pplx::task etcd::Client::modify_if(std::string const& key, std::string const& value, std::string const& old_value, - int ttl) { - if (ttl > 0) { - return this->leasegrant(ttl).then( - [this, key, value, old_value](pplx::task const& task) { - auto resp = task.get(); - if (resp.error_code() == 0) { - return etcd::detail::asyncify( - static_cast>( - Response::create), - this->client->modify_if_internal( - key, value, 0, old_value, resp.value().lease(), - etcdv3::AtomicityType::PREV_VALUE)); - } else { - return pplx::task_from_result(resp); - } - }); - } else { - return etcd::detail::asyncify( - static_cast>( - Response::create), - this->client->modify_if_internal(key, value, 0, old_value, 0, - etcdv3::AtomicityType::PREV_VALUE)); - } -} - -pplx::task etcd::Client::modify_if(std::string const& key, - std::string const& value, - std::string const& old_value, - int64_t leaseid) { + const int64_t leaseid) { return etcd::detail::asyncify( static_cast>( Response::create), - this->client->modify_if_internal(key, value, 0, old_value, leaseid, - etcdv3::AtomicityType::PREV_VALUE)); -} - -pplx::task etcd::Client::modify_if(std::string const& key, - std::string const& value, - int64_t old_index, int ttl) { - if (ttl > 0) { - return this->leasegrant(ttl).then( - [this, key, value, old_index](pplx::task const& task) { - auto resp = task.get(); - if (resp.error_code() == 0) { - return etcd::detail::asyncify( - static_cast>( - Response::create), - this->client->modify_if_internal( - key, value, old_index, "", resp.value().lease(), - etcdv3::AtomicityType::PREV_INDEX)); - } else { - return pplx::task_from_result(resp); - } - }); - } else { - return etcd::detail::asyncify( - static_cast>( - Response::create), - this->client->modify_if_internal(key, value, old_index, "", 0, - etcdv3::AtomicityType::PREV_INDEX)); - } + this->client->modify_if_internal(key, value, 0, old_value, + etcdv3::AtomicityType::PREV_VALUE, + leaseid)); } pplx::task etcd::Client::modify_if(std::string const& key, std::string const& value, int64_t old_index, - int64_t leaseid) { + const int64_t leaseid) { return etcd::detail::asyncify( static_cast>( Response::create), - this->client->modify_if_internal(key, value, old_index, "", leaseid, - etcdv3::AtomicityType::PREV_INDEX)); + this->client->modify_if_internal(key, value, old_index, "", + etcdv3::AtomicityType::PREV_INDEX, + leaseid)); } pplx::task etcd::Client::rm(std::string const& key) { diff --git a/src/Response.cpp b/src/Response.cpp index 26da605..6a6a3d6 100644 --- a/src/Response.cpp +++ b/src/Response.cpp @@ -39,6 +39,7 @@ etcd::Response::Response(const etcdv3::V3Response& reply, _values.push_back(Value(val[index])); _keys.push_back(val[index].kvs.key()); } + _value = Value(reply.get_values()[0]); } else { _value = Value(reply.get_value()); } diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp index 96dfbd3..d5a4179 100644 --- a/src/SyncClient.cpp +++ b/src/SyncClient.cpp @@ -474,18 +474,6 @@ etcd::SyncClient::~SyncClient() { channel.reset(); } -/** - * Note [lease with TTL and issue the actual request] - * - * We sometime use the request like `set(key, value, TTL)`, we explain the TTL - * as the time between the user call the `set()` method between the request is - * actually executed in etcd server. Thus, we issue a lease request with that - * TTL value immediately, and pass it to the `set_internal()` method, the later - * may be issues asynchronously. - * - * Thus the TTL could keep the expected semantic even in the async runtime. - */ - etcd::Response etcd::SyncClient::head() { return Response::create(this->head_internal()); } @@ -502,7 +490,8 @@ etcd::Response etcd::SyncClient::get(std::string const& key) { return Response::create(this->get_internal(key)); } -etcd::Response etcd::SyncClient::get(std::string const& key, int64_t revision) { +etcd::Response etcd::SyncClient::get(std::string const& key, + const int64_t revision) { return Response::create(this->get_internal(key, revision)); } @@ -518,62 +507,20 @@ std::shared_ptr etcd::SyncClient::get_internal( return std::make_shared(std::move(params)); } -etcd::Response etcd::SyncClient::set(std::string const& key, - std::string const& value, int ttl) { - // See Note [lease with TTL and issue the actual request] - int64_t leaseId = 0; - if (ttl > 0) { - auto res = this->leasegrant(ttl); - if (!res.is_ok()) { - return etcd::Response(res.error_code(), res.error_message()); - } else { - leaseId = res.value().lease(); - } - } - return Response::create(this->set_internal(key, value, leaseId)); -} - etcd::Response etcd::SyncClient::set(std::string const& key, std::string const& value, - int64_t leaseid) { - return Response::create(this->set_internal(key, value, leaseid)); -} - -std::shared_ptr etcd::SyncClient::set_internal( - std::string const& key, std::string const& value, int64_t leaseid) { - etcdv3::ActionParameters params; - params.key.assign(key); - params.value.assign(value); - params.lease_id = leaseid; - params.auth_token.assign(this->token_authenticator->renew_if_expired()); - params.grpc_timeout = this->grpc_timeout; - params.kv_stub = stubs->kvServiceStub.get(); - return std::make_shared(std::move(params), false); -} - -etcd::Response etcd::SyncClient::add(std::string const& key, - std::string const& value, int ttl) { - // See Note [lease with TTL and issue the actual request] - int64_t leaseId = 0; - if (ttl > 0) { - auto res = this->leasegrant(ttl); - if (!res.is_ok()) { - return etcd::Response(res.error_code(), res.error_message()); - } else { - leaseId = res.value().lease(); - } - } - return Response::create(this->add_internal(key, value, leaseId)); + const int64_t leaseid) { + return Response::create(this->put_internal(key, value, leaseid)); } etcd::Response etcd::SyncClient::add(std::string const& key, std::string const& value, - int64_t leaseid) { + const int64_t leaseid) { return Response::create(this->add_internal(key, value, leaseid)); } std::shared_ptr etcd::SyncClient::add_internal( - std::string const& key, std::string const& value, int64_t leaseid) { + std::string const& key, std::string const& value, const int64_t leaseid) { etcdv3::ActionParameters params; params.key.assign(key); params.value.assign(value); @@ -589,40 +536,32 @@ etcd::Response etcd::SyncClient::put(std::string const& key, return Response::create(this->put_internal(key, value)); } +etcd::Response etcd::SyncClient::put(std::string const& key, + std::string const& value, + const int64_t leaseId) { + return Response::create(this->put_internal(key, value, leaseId)); +} + std::shared_ptr etcd::SyncClient::put_internal( - std::string const& key, std::string const& value) { + std::string const& key, std::string const& value, const int64_t leaseId) { etcdv3::ActionParameters params; params.key.assign(key); params.value.assign(value); + params.lease_id = leaseId; params.auth_token.assign(this->token_authenticator->renew_if_expired()); params.grpc_timeout = this->grpc_timeout; params.kv_stub = stubs->kvServiceStub.get(); return std::make_shared(std::move(params)); } -etcd::Response etcd::SyncClient::modify(std::string const& key, - std::string const& value, int ttl) { - // See Note [lease with TTL and issue the actual request] - int64_t leaseId = 0; - if (ttl > 0) { - auto res = leasegrant(ttl); - if (!res.is_ok()) { - return etcd::Response(res.error_code(), res.error_message()); - } else { - leaseId = res.value().lease(); - } - } - return Response::create(this->modify_internal(key, value, leaseId)); -} - etcd::Response etcd::SyncClient::modify(std::string const& key, std::string const& value, - int64_t leaseid) { + const int64_t leaseid) { return Response::create(this->modify_internal(key, value, leaseid)); } std::shared_ptr etcd::SyncClient::modify_internal( - std::string const& key, std::string const& value, int64_t leaseid) { + std::string const& key, std::string const& value, const int64_t leaseid) { etcdv3::ActionParameters params; params.key.assign(key); params.value.assign(value); @@ -636,58 +575,23 @@ std::shared_ptr etcd::SyncClient::modify_internal( etcd::Response etcd::SyncClient::modify_if(std::string const& key, std::string const& value, std::string const& old_value, - int ttl) { - // See Note [lease with TTL and issue the actual request] - int64_t leaseId = 0; - if (ttl > 0) { - auto res = leasegrant(ttl); - if (!res.is_ok()) { - return etcd::Response(res.error_code(), res.error_message()); - } else { - leaseId = res.value().lease(); - } - } - return Response::create(this->modify_if_internal( - key, value, 0, old_value, leaseId, etcdv3::AtomicityType::PREV_VALUE)); -} - -etcd::Response etcd::SyncClient::modify_if(std::string const& key, - std::string const& value, - std::string const& old_value, - int64_t leaseid) { - return Response::create(this->modify_if_internal( - key, value, 0, old_value, leaseid, etcdv3::AtomicityType::PREV_VALUE)); -} - -etcd::Response etcd::SyncClient::modify_if(std::string const& key, - std::string const& value, - int64_t old_index, int ttl) { - // See Note [lease with TTL and issue the actual request] - int64_t leaseId = 0; - if (ttl > 0) { - auto res = leasegrant(ttl); - if (!res.is_ok()) { - return etcd::Response(res.error_code(), res.error_message()); - } else { - leaseId = res.value().lease(); - } - } + const int64_t leaseid) { return Response::create(this->modify_if_internal( - key, value, old_index, "", leaseId, etcdv3::AtomicityType::PREV_INDEX)); + key, value, 0, old_value, etcdv3::AtomicityType::PREV_VALUE, leaseid)); } etcd::Response etcd::SyncClient::modify_if(std::string const& key, std::string const& value, int64_t old_index, int64_t leaseid) { return Response::create(this->modify_if_internal( - key, value, old_index, "", leaseid, etcdv3::AtomicityType::PREV_INDEX)); + key, value, old_index, "", etcdv3::AtomicityType::PREV_INDEX, leaseid)); } std::shared_ptr etcd::SyncClient::modify_if_internal( std::string const& key, std::string const& value, int64_t old_index, - std::string const& old_value, int64_t leaseId, - etcdv3::AtomicityType const& atomicity_type) { + std::string const& old_value, etcdv3::AtomicityType const& atomicity_type, + const int64_t leaseId) { etcdv3::ActionParameters params; params.key.assign(key); params.value.assign(value); diff --git a/src/v3/AsyncGRPC.cpp b/src/v3/AsyncGRPC.cpp index 3d07ae5..8cb8c7f 100644 --- a/src/v3/AsyncGRPC.cpp +++ b/src/v3/AsyncGRPC.cpp @@ -43,9 +43,7 @@ void etcdv3::AsyncCampaignResponse::ParseResponse(CampaignResponse& reply) { value.kvs.set_lease(leader.lease()); } -void etcdv3::AsyncDeleteResponse::ParseResponse(std::string const& key, - bool prefix, - DeleteRangeResponse& resp) { +void etcdv3::AsyncDeleteResponse::ParseResponse(DeleteRangeResponse& resp) { index = resp.header().revision(); if (resp.prev_kvs_size() == 0) { @@ -57,13 +55,15 @@ void etcdv3::AsyncDeleteResponse::ParseResponse(std::string const& key, etcdv3::KeyValue kv; kv.kvs.CopyFrom(resp.prev_kvs(cnt)); values.push_back(kv); + prev_values.push_back(kv); } - if (!prefix) { - prev_value = values[0]; + // flatten values/prev_values 0 to value/prev_value + if (!values.empty()) { value = values[0]; - value.kvs.clear_value(); - values.clear(); + } + if (!prev_values.empty()) { + prev_value = prev_values[0]; } } } @@ -137,6 +137,7 @@ void etcdv3::AsyncPutResponse::ParseResponse(PutResponse& resp) { // get all previous values etcdv3::KeyValue kv; kv.kvs.CopyFrom(resp.prev_kv()); + prev_values.push_back(kv); prev_value = kv; } @@ -167,39 +168,91 @@ void etcdv3::AsyncResignResponse::ParseResponse(ResignResponse& reply) { void etcdv3::AsyncTxnResponse::ParseResponse(TxnResponse& reply) { index = reply.header().revision(); -} - -void etcdv3::AsyncTxnResponse::ParseResponse(std::string const& key, - bool prefix, TxnResponse& reply) { - index = reply.header().revision(); for (int index = 0; index < reply.responses_size(); index++) { auto resp = reply.responses(index); if (ResponseOp::ResponseCase::kResponseRange == resp.response_case()) { AsyncRangeResponse response; - response.ParseResponse(*(resp.mutable_response_range()), prefix); - - error_code = response.get_error_code(); - error_message = response.get_error_message(); + response.ParseResponse(*(resp.mutable_response_range()), true); - values = response.get_values(); - value = response.get_value(); + if (error_code == 0) { + error_code = response.get_error_code(); + } + if (!response.get_error_message().empty()) { + if (!error_message.empty()) { + error_message += "\n"; + } + error_message += response.get_error_message(); + } + for (auto const& value : response.get_values()) { + values.emplace_back(value); + } + for (auto const& prev_value : response.get_prev_values()) { + prev_values.emplace_back(prev_value); + } } else if (ResponseOp::ResponseCase::kResponsePut == resp.response_case()) { - auto put_resp = resp.response_put(); - if (put_resp.has_prev_kv()) { - prev_value.kvs.CopyFrom(put_resp.prev_kv()); + AsyncPutResponse response; + response.ParseResponse(*(resp.mutable_response_put())); + if (error_code == 0) { + error_code = response.get_error_code(); + } + if (!response.get_error_message().empty()) { + if (!error_message.empty()) { + error_message += "\n"; + } + error_message += response.get_error_message(); + } + for (auto const& value : response.get_values()) { + values.emplace_back(value); + } + for (auto const& prev_value : response.get_prev_values()) { + prev_values.emplace_back(prev_value); } } else if (ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case()) { AsyncDeleteResponse response; - response.ParseResponse(key, prefix, - *(resp.mutable_response_delete_range())); + response.ParseResponse(*(resp.mutable_response_delete_range())); - prev_value.kvs.CopyFrom(response.get_prev_value().kvs); + if (error_code == 0) { + error_code = response.get_error_code(); + } + if (!response.get_error_message().empty()) { + if (!error_message.empty()) { + error_message += "\n"; + } + error_message += response.get_error_message(); + } + for (auto const& value : response.get_values()) { + values.emplace_back(value); + } + for (auto const& prev_value : response.get_prev_values()) { + prev_values.emplace_back(prev_value); + } + } else if (ResponseOp::ResponseCase::kResponseTxn == resp.response_case()) { + AsyncTxnResponse response; + response.ParseResponse(*(resp.mutable_response_txn())); - values = response.get_values(); - value = response.get_value(); + if (error_code == 0) { + error_code = response.get_error_code(); + } + if (!response.get_error_message().empty()) { + if (!error_message.empty()) { + error_message += "\n"; + } + error_message += response.get_error_message(); + } + + // skip + std::cerr << "Not implemented error: unable to parse nested transaction " + "response" + << std::endl; } } + if (!values.empty()) { + value = values[0]; + } + if (!prev_values.empty()) { + prev_value = prev_values[0]; + } } void etcdv3::AsyncUnlockResponse::ParseResponse(UnlockResponse& resp) { @@ -270,20 +323,17 @@ etcdv3::AsyncCampaignResponse etcdv3::AsyncCampaignAction::ParseResponse() { etcdv3::AsyncCompareAndDeleteAction::AsyncCompareAndDeleteAction( etcdv3::ActionParameters&& params, etcdv3::AtomicityType type) : etcdv3::Action(std::move(params)) { - etcdv3::Transaction transaction(parameters.key); + etcdv3::Transaction txn; if (type == etcdv3::AtomicityType::PREV_VALUE) { - transaction.init_compare(parameters.old_value, CompareResult::EQUAL, - CompareTarget::VALUE); + txn.setup_compare_and_delete(parameters.key, parameters.old_value, + parameters.key); } else if (type == etcdv3::AtomicityType::PREV_INDEX) { - transaction.init_compare(parameters.old_revision, CompareResult::EQUAL, - CompareTarget::MOD); + txn.setup_compare_and_delete(parameters.key, parameters.old_revision, + parameters.key); } - transaction.setup_compare_and_delete_operation(parameters.key); - transaction.setup_basic_failure_operation(parameters.key); - response_reader = - parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); + parameters.kv_stub->AsyncTxn(&context, *txn.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*) this); } @@ -295,35 +345,32 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndDeleteAction::ParseResponse() { txn_resp.set_error_code(status.error_code()); txn_resp.set_error_message(status.error_message()); } else { - txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); + txn_resp.ParseResponse(reply); if (!reply.succeeded()) { txn_resp.set_error_code(ERROR_COMPARE_FAILED); txn_resp.set_error_message("etcd-cpp-apiv3: compare failed"); } } - return txn_resp; } etcdv3::AsyncCompareAndSwapAction::AsyncCompareAndSwapAction( etcdv3::ActionParameters&& params, etcdv3::AtomicityType type) : etcdv3::Action(std::move(params)) { - etcdv3::Transaction transaction(parameters.key); + etcdv3::Transaction txn; if (type == etcdv3::AtomicityType::PREV_VALUE) { - transaction.init_compare(parameters.old_value, CompareResult::EQUAL, - CompareTarget::VALUE); + txn.setup_compare_and_swap(parameters.key, parameters.old_value, + parameters.value, parameters.lease_id); } else if (type == etcdv3::AtomicityType::PREV_INDEX) { - transaction.init_compare(parameters.old_revision, CompareResult::EQUAL, - CompareTarget::MOD); + txn.setup_compare_and_swap(parameters.key, parameters.old_revision, + parameters.value, parameters.lease_id); } - - transaction.setup_basic_failure_operation(parameters.key); - transaction.setup_compare_and_swap_sequence(parameters.value, - parameters.lease_id); + // backwards compatibility + txn.add_success_range(parameters.key); response_reader = - parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); + parameters.kv_stub->AsyncTxn(&context, *txn.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*) this); } @@ -335,7 +382,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndSwapAction::ParseResponse() { txn_resp.set_error_code(status.error_code()); txn_resp.set_error_message(status.error_message()); } else { - txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); + txn_resp.ParseResponse(reply); // if there is an error code returned by parseResponse, we must // not overwrite it. @@ -344,30 +391,15 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncCompareAndSwapAction::ParseResponse() { txn_resp.set_error_message("etcd-cpp-apiv3: compare failed"); } } - return txn_resp; } etcdv3::AsyncDeleteAction::AsyncDeleteAction(ActionParameters&& params) : etcdv3::Action(std::move(params)) { DeleteRangeRequest del_request; - if (!parameters.withPrefix) { - del_request.set_key(parameters.key); - } else { - if (parameters.key.empty()) { - // see: WithFromKey in etcdv3/client - del_request.set_key(etcdv3::NUL); - del_request.set_range_end(etcdv3::NUL); - } else { - del_request.set_key(parameters.key); - del_request.set_range_end(detail::string_plus_one(parameters.key)); - } - } - if (!parameters.range_end.empty()) { - del_request.set_range_end(parameters.range_end); - } - - del_request.set_prev_kv(true); + detail::make_request_with_ranges(del_request, parameters.key, + parameters.range_end, parameters.withPrefix); + del_request.set_prev_kv(true /* fetch prev values */); response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_); @@ -382,11 +414,8 @@ etcdv3::AsyncDeleteResponse etcdv3::AsyncDeleteAction::ParseResponse() { del_resp.set_error_code(status.error_code()); del_resp.set_error_message(status.error_message()); } else { - del_resp.ParseResponse( - parameters.key, parameters.withPrefix || !parameters.range_end.empty(), - reply); + del_resp.ParseResponse(reply); } - return del_resp; } @@ -874,21 +903,8 @@ etcdv3::AsyncPutResponse etcdv3::AsyncPutAction::ParseResponse() { etcdv3::AsyncRangeAction::AsyncRangeAction(etcdv3::ActionParameters&& params) : etcdv3::Action(std::move(params)) { RangeRequest get_request; - if (!parameters.withPrefix) { - get_request.set_key(parameters.key); - } else { - if (parameters.key.empty()) { - // see: WithFromKey in etcdv3/client - get_request.set_key(etcdv3::NUL); - get_request.set_range_end(etcdv3::NUL); - } else { - get_request.set_key(parameters.key); - get_request.set_range_end(detail::string_plus_one(parameters.key)); - } - } - if (!parameters.range_end.empty()) { - get_request.set_range_end(parameters.range_end); - } + detail::make_request_with_ranges(get_request, parameters.key, + parameters.range_end, parameters.withPrefix); if (parameters.revision > 0) { get_request.set_revision(parameters.revision); } @@ -951,21 +967,19 @@ etcdv3::AsyncResignResponse etcdv3::AsyncResignAction::ParseResponse() { etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters&& params, bool create) : etcdv3::Action(std::move(params)) { - etcdv3::Transaction transaction(parameters.key); + etcdv3::Transaction txn; isCreate = create; - transaction.init_compare(CompareResult::EQUAL, CompareTarget::VERSION); - - transaction.setup_basic_create_sequence(parameters.key, parameters.value, - parameters.lease_id); - - if (isCreate) { - transaction.setup_basic_failure_operation(parameters.key); + txn.add_compare_mod(parameters.key, 0 /* not exists */); + txn.add_success_put(parameters.key, parameters.value, parameters.lease_id); + // backwards compatibility + txn.add_success_range(parameters.key); + if (create) { + txn.add_failure_put(parameters.key, parameters.value, parameters.lease_id); } else { - transaction.setup_set_failure_operation(parameters.key, parameters.value, - parameters.lease_id); + txn.add_failure_range(parameters.key); } response_reader = - parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); + parameters.kv_stub->AsyncTxn(&context, *txn.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*) this); } @@ -977,7 +991,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncSetAction::ParseResponse() { txn_resp.set_error_code(status.error_code()); txn_resp.set_error_message(status.error_message()); } else { - txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); + txn_resp.ParseResponse(reply); if (!reply.succeeded() && isCreate) { txn_resp.set_error_code(etcdv3::ERROR_KEY_ALREADY_EXISTS); @@ -1003,7 +1017,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncTxnAction::ParseResponse() { txn_resp.set_error_code(status.error_code()); txn_resp.set_error_message(status.error_message()); } else { - txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); + txn_resp.ParseResponse(reply); // if there is an error code returned by parseResponse, we must // not overwrite it. @@ -1042,14 +1056,15 @@ etcdv3::AsyncUnlockResponse etcdv3::AsyncUnlockAction::ParseResponse() { etcdv3::AsyncUpdateAction::AsyncUpdateAction(etcdv3::ActionParameters&& params) : etcdv3::Action(std::move(params)) { - etcdv3::Transaction transaction(parameters.key); - transaction.init_compare(CompareResult::GREATER, CompareTarget::VERSION); - - transaction.setup_compare_and_swap_sequence(parameters.value, - parameters.lease_id); - + etcdv3::Transaction txn; + txn.add_compare_version(parameters.key, CompareResult::GREATER, 0); // exists + txn.add_success_put(parameters.key, parameters.value, parameters.lease_id, + true /* for backwards compatibility */); + // backwards compatibility + txn.add_success_range(parameters.key); + txn.add_failure_range(parameters.key); response_reader = - parameters.kv_stub->AsyncTxn(&context, *transaction.txn_request, &cq_); + parameters.kv_stub->AsyncTxn(&context, *txn.txn_request, &cq_); response_reader->Finish(&reply, &status, (void*) this); } @@ -1061,7 +1076,7 @@ etcdv3::AsyncTxnResponse etcdv3::AsyncUpdateAction::ParseResponse() { txn_resp.set_error_message(status.error_message()); } else { if (reply.succeeded()) { - txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); + txn_resp.ParseResponse(reply); txn_resp.set_action(etcdv3::UPDATE_ACTION); } else { txn_resp.set_error_code(etcdv3::ERROR_KEY_NOT_FOUND); @@ -1076,30 +1091,25 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params) isCancelled.store(false); stream = parameters.watch_stub->AsyncWatch(&context, &cq_, (void*) etcdv3::WATCH_CREATE); - this->watch_id = - std::chrono::high_resolution_clock::now().time_since_epoch().count(); + // The unique watcher id causes the watcher cannot be cancelled as expected + // on Ubuntu 20.04. + // + // See CI failures: + // https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/actions/runs/5561397273/jobs/10159051536 + // + // Added in https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/pull/232 + // Removed in https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/pull/236 + // + // this->watch_id = + // std::chrono::high_resolution_clock::now().time_since_epoch().count(); // #ifndef NDEBUG // std::clog << "etcd-cpp-apiv3: watch_id: " << this->watch_id << std::endl; // #endif WatchRequest watch_req; WatchCreateRequest watch_create_req; - - if (!parameters.withPrefix) { - watch_create_req.set_key(parameters.key); - } else { - if (parameters.key.empty()) { - watch_create_req.set_key(etcdv3::NUL); - watch_create_req.set_range_end(etcdv3::NUL); - } else { - watch_create_req.set_key(parameters.key); - watch_create_req.set_range_end(detail::string_plus_one(parameters.key)); - } - } - if (!parameters.range_end.empty()) { - watch_create_req.set_range_end(parameters.range_end); - } - + detail::make_request_with_ranges(watch_create_req, parameters.key, + parameters.range_end, parameters.withPrefix); watch_create_req.set_prev_kv(true); watch_create_req.set_start_revision(parameters.revision); watch_create_req.set_watch_id(this->watch_id); diff --git a/src/v3/Transaction.cpp b/src/v3/Transaction.cpp index 98f86f9..a7c0c37 100644 --- a/src/v3/Transaction.cpp +++ b/src/v3/Transaction.cpp @@ -2,11 +2,7 @@ #include "proto/rpc.grpc.pb.h" -using etcdserverpb::Compare; -using etcdserverpb::DeleteRangeRequest; -using etcdserverpb::PutRequest; -using etcdserverpb::RangeRequest; -using etcdserverpb::RequestOp; +#include "etcd/v3/Action.hpp" namespace etcdv3 { @@ -28,185 +24,337 @@ etcdv3::Transaction::Transaction() { txn_request.reset(new etcdserverpb::TxnRequest{}); } -etcdv3::Transaction::Transaction(const std::string& key) : key(key) { - txn_request.reset(new etcdserverpb::TxnRequest{}); +etcdv3::Transaction::~Transaction() {} + +void etcdv3::Transaction::add_compare(std::string const& key, + CompareTarget const& target, + CompareResult const& result, + Value const& target_value, + std::string const& range_end) { + switch (target) { + case CompareTarget::VERSION: + add_compare_version(key, result, target_value.version, range_end); + break; + case CompareTarget::CREATE: + add_compare_create(key, result, target_value.create_revision, range_end); + break; + case CompareTarget::MOD: + add_compare_mod(key, result, target_value.mod_revision, range_end); + break; + case CompareTarget::VALUE: + add_compare_value(key, result, target_value.value, range_end); + break; + case CompareTarget::LEASE: + add_compare_lease(key, result, target_value.lease, range_end); + break; + default: + // ignore invalid compare target + break; + } } -void etcdv3::Transaction::reset_key(std::string const& newkey) { key = newkey; } +void etcdv3::Transaction::add_compare_version(std::string const& key, + int64_t const& version, + std::string const& range_end) { + this->add_compare_version(key, CompareResult::EQUAL, version, range_end); +} -void etcdv3::Transaction::init_compare(CompareResult result, - CompareTarget target) { - Compare* compare = txn_request->add_compare(); +void etcdv3::Transaction::add_compare_version(std::string const& key, + CompareResult const& result, + int64_t const& version, + std::string const& range_end) { + auto compare = txn_request->add_compare(); compare->set_result(detail::to_compare_result(result)); - compare->set_target(detail::to_compare_target(target)); + compare->set_target(detail::to_compare_target(CompareTarget::VERSION)); compare->set_key(key); + compare->set_version(version); + compare->set_range_end(range_end); +} - compare->set_version(0); +void etcdv3::Transaction::add_compare_create(std::string const& key, + int64_t const& create_revision, + std::string const& range_end) { + this->add_compare_create(key, CompareResult::EQUAL, create_revision, + range_end); } -void etcdv3::Transaction::init_compare(std::string const& old_value, - CompareResult result, - CompareTarget target) { - Compare* compare = txn_request->add_compare(); +void etcdv3::Transaction::add_compare_create(std::string const& key, + CompareResult const& result, + int64_t const& create_revision, + std::string const& range_end) { + auto compare = txn_request->add_compare(); compare->set_result(detail::to_compare_result(result)); - compare->set_target(detail::to_compare_target(target)); + compare->set_target(detail::to_compare_target(CompareTarget::CREATE)); compare->set_key(key); + compare->set_create_revision(create_revision); + compare->set_range_end(range_end); +} - compare->set_value(old_value); +void etcdv3::Transaction::add_compare_mod(std::string const& key, + int64_t const& mod_revision, + std::string const& range_end) { + this->add_compare_mod(key, CompareResult::EQUAL, mod_revision, range_end); } -void etcdv3::Transaction::init_compare(int64_t old_index, CompareResult result, - CompareTarget target) { - Compare* compare = txn_request->add_compare(); +void etcdv3::Transaction::add_compare_mod(std::string const& key, + CompareResult const& result, + int64_t const& mod_revision, + std::string const& range_end) { + auto compare = txn_request->add_compare(); compare->set_result(detail::to_compare_result(result)); - compare->set_target(detail::to_compare_target(target)); + compare->set_target(detail::to_compare_target(CompareTarget::MOD)); compare->set_key(key); + compare->set_mod_revision(mod_revision); + compare->set_range_end(range_end); +} - compare->set_mod_revision(old_index); +void etcdv3::Transaction::add_compare_value(std::string const& key, + std::string const& value, + std::string const& range_end) { + this->add_compare_value(key, CompareResult::EQUAL, value, range_end); } -/** - * get key on failure - */ -void etcdv3::Transaction::setup_basic_failure_operation( - std::string const& key) { - std::unique_ptr get_request(new RangeRequest()); - get_request->set_key(key); - RequestOp* req_failure = txn_request->add_failure(); - req_failure->set_allocated_request_range(get_request.release()); +void etcdv3::Transaction::add_compare_value(std::string const& key, + CompareResult const& result, + std::string const& value, + std::string const& range_end) { + auto compare = txn_request->add_compare(); + compare->set_result(detail::to_compare_result(result)); + compare->set_target(detail::to_compare_target(CompareTarget::VALUE)); + compare->set_key(key); + compare->set_value(value); + compare->set_range_end(range_end); } -/** - * get key on failure, get key before put, modify and then get updated key - */ -void etcdv3::Transaction::setup_set_failure_operation(std::string const& key, - std::string const& value, - int64_t leaseid) { - std::unique_ptr put_request(new PutRequest()); - put_request->set_key(key); - put_request->set_value(value); - put_request->set_prev_kv(true); - put_request->set_lease(leaseid); - RequestOp* req_failure = txn_request->add_failure(); - req_failure->set_allocated_request_put(put_request.release()); - - std::unique_ptr get_request(new RangeRequest()); - get_request->set_key(key); - req_failure = txn_request->add_failure(); - req_failure->set_allocated_request_range(get_request.release()); -} - -/** - * add key and then get new value of key - */ -void etcdv3::Transaction::setup_basic_create_sequence(std::string const& key, - std::string const& value, - int64_t leaseid) { - std::unique_ptr put_request(new PutRequest()); +void etcdv3::Transaction::add_compare_lease(std::string const& key, + int64_t const& lease, + std::string const& range_end) { + this->add_compare_lease(key, CompareResult::EQUAL, lease, range_end); +} + +void etcdv3::Transaction::add_compare_lease(std::string const& key, + CompareResult const& result, + int64_t const& lease, + std::string const& range_end) { + auto compare = txn_request->add_compare(); + compare->set_result(detail::to_compare_result(result)); + compare->set_target(detail::to_compare_target(CompareTarget::LEASE)); + compare->set_key(key); + compare->set_lease(lease); + compare->set_range_end(range_end); +} + +void etcdv3::Transaction::add_success_range(std::string const& key, + std::string const& range_end, + bool const recursive, + const int64_t limit) { + auto succ = txn_request->add_success(); + auto get_request = succ->mutable_request_range(); + etcdv3::detail::make_request_with_ranges(*get_request, key, range_end, + recursive); + get_request->set_limit(limit); +} + +void etcdv3::Transaction::add_success_put(std::string const& key, + std::string const& value, + int64_t const leaseid, + const bool prev_kv) { + auto succ = txn_request->add_success(); + auto put_request = succ->mutable_request_put(); put_request->set_key(key); put_request->set_value(value); - put_request->set_prev_kv(true); + put_request->set_prev_kv(prev_kv); put_request->set_lease(leaseid); - RequestOp* req_success = txn_request->add_success(); - req_success->set_allocated_request_put(put_request.release()); +} - std::unique_ptr get_request(new RangeRequest()); - get_request->set_key(key); - req_success = txn_request->add_success(); - req_success->set_allocated_request_range(get_request.release()); +void etcdv3::Transaction::add_success_delete(std::string const& key, + std::string const& range_end, + bool const recursive, + const bool prev_kv) { + auto succ = txn_request->add_success(); + auto del_request = succ->mutable_request_delete_range(); + etcdv3::detail::make_request_with_ranges(*del_request, key, range_end, + recursive); + del_request->set_prev_kv(prev_kv); } -/** - * get key value then modify and get new value - */ -void etcdv3::Transaction::setup_compare_and_swap_sequence( - std::string const& value, int64_t leaseid) { - std::unique_ptr put_request(new PutRequest()); +void etcdv3::Transaction::add_success_txn( + const std::shared_ptr txn) { + auto succ = txn_request->add_success(); + auto txn_request = succ->mutable_request_txn(); + txn_request->CopyFrom(*txn->txn_request); +} + +void etcdv3::Transaction::add_failure_range(std::string const& key, + std::string const& range_end, + bool const recursive, + const int64_t limit) { + auto fail = txn_request->add_failure(); + auto get_request = fail->mutable_request_range(); + etcdv3::detail::make_request_with_ranges(*get_request, key, range_end, + recursive); + get_request->set_limit(limit); +} + +void etcdv3::Transaction::add_failure_put(std::string const& key, + std::string const& value, + int64_t const leaseid, + const bool prev_kv) { + auto fail = txn_request->add_failure(); + auto put_request = fail->mutable_request_put(); put_request->set_key(key); put_request->set_value(value); - put_request->set_prev_kv(true); + put_request->set_prev_kv(prev_kv); put_request->set_lease(leaseid); - RequestOp* req_success = txn_request->add_success(); - req_success->set_allocated_request_put(put_request.release()); - - std::unique_ptr get_request(new RangeRequest()); - get_request->set_key(key); - req_success = txn_request->add_success(); - req_success->set_allocated_request_range(get_request.release()); -} - -/** - * get key, delete - */ -void etcdv3::Transaction::setup_delete_sequence(std::string const& key, - std::string const& range_end, - bool recursive) { - std::unique_ptr del_request(new DeleteRangeRequest()); - del_request->set_key(key); - del_request->set_prev_kv(true); - if (recursive) { - del_request->set_range_end(range_end); - } +} - RequestOp* req_success = txn_request->add_success(); - req_success->set_allocated_request_delete_range(del_request.release()); -} - -/** - * get key, delete - */ -void etcdv3::Transaction::setup_delete_failure_operation( - std::string const& key, std::string const& range_end, bool recursive) { - std::unique_ptr get_request(new RangeRequest()); - std::unique_ptr del_request(new DeleteRangeRequest()); - get_request.reset(new RangeRequest()); - get_request->set_key(key); - if (recursive) { - get_request->set_range_end(range_end); - get_request->set_sort_target( - RangeRequest::SortTarget::RangeRequest_SortTarget_KEY); - get_request->set_sort_order( - RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); - } - RequestOp* req_failure = txn_request->add_failure(); - req_failure->set_allocated_request_range(get_request.release()); +void etcdv3::Transaction::add_failure_delete(std::string const& key, + std::string const& range_end, + bool const recursive, + const bool prev_kv) { + auto fail = txn_request->add_failure(); + auto del_request = fail->mutable_request_delete_range(); + etcdv3::detail::make_request_with_ranges(*del_request, key, range_end, + recursive); + del_request->set_prev_kv(prev_kv); +} - del_request.reset(new DeleteRangeRequest()); - del_request->set_key(key); - if (recursive) { - del_request->set_range_end(range_end); - } +void etcdv3::Transaction::add_failure_txn( + const std::shared_ptr txn) { + auto fail = txn_request->add_failure(); + auto txn_request = fail->mutable_request_txn(); + txn_request->CopyFrom(*txn->txn_request); +} + +void etcdv3::Transaction::setup_compare_and_create( + std::string const& key, std::string const& prev_value, + std::string const& create_key, std::string const& value, + int64_t const leaseid) { + this->add_compare_value(key, CompareResult::EQUAL, prev_value); + this->add_success_put(create_key, value, leaseid); + this->add_failure_range(key); +} + +void etcdv3::Transaction::setup_compare_or_create(std::string const& key, + std::string const& prev_value, + std::string const& create_key, + std::string const& value, + int64_t const leaseid) { + this->add_compare_value(key, CompareResult::NOT_EQUAL, prev_value); + this->add_success_put(create_key, value, leaseid); + this->add_failure_range(key); +} + +void etcdv3::Transaction::setup_compare_and_swap(std::string const& key, + std::string const& prev_value, + std::string const& value, + int64_t const leaseid) { + this->add_compare_value(key, CompareResult::EQUAL, prev_value); + this->add_success_put(key, value, leaseid); + this->add_failure_range(key); +} + +void etcdv3::Transaction::setup_compare_or_swap(std::string const& key, + std::string const& prev_value, + std::string const& value, + int64_t const leaseid) { + this->add_compare_value(key, CompareResult::NOT_EQUAL, prev_value); + this->add_success_put(key, value, leaseid); + this->add_failure_range(key); +} + +void etcdv3::Transaction::setup_compare_and_delete( + std::string const& key, std::string const& prev_value, + std::string const& delete_key, std::string const& range_end, + const bool recursive) { + this->add_compare_value(key, CompareResult::EQUAL, prev_value); + this->add_success_delete(delete_key, range_end, recursive, + true /* for backwards compatibility */); + this->add_failure_range(key); +} + +void etcdv3::Transaction::setup_compare_or_delete(std::string const& key, + std::string const& prev_value, + std::string const& delete_key, + std::string const& range_end, + const bool recursive) { + this->add_compare_value(key, CompareResult::NOT_EQUAL, prev_value); + this->add_success_delete(delete_key, range_end, recursive, + true /* for backwards compatibility */); + this->add_failure_range(key); +} + +void etcdv3::Transaction::setup_compare_and_create( + std::string const& key, const int64_t prev_revision, + std::string const& create_key, std::string const& value, + int64_t const leaseid) { + this->add_compare_mod(key, CompareResult::EQUAL, prev_revision); + this->add_success_put(create_key, value, leaseid); + this->add_failure_range(key); +} + +void etcdv3::Transaction::setup_compare_or_create(std::string const& key, + const int64_t prev_revision, + std::string const& create_key, + std::string const& value, + int64_t const leaseid) { + this->add_compare_mod(key, CompareResult::NOT_EQUAL, prev_revision); + this->add_success_put(create_key, value, leaseid); + this->add_failure_range(key); +} + +void etcdv3::Transaction::setup_compare_and_swap(std::string const& key, + const int64_t prev_revision, + std::string const& value, + int64_t const leaseid) { + this->add_compare_mod(key, CompareResult::EQUAL, prev_revision); + this->add_success_put(key, value, leaseid); + this->add_failure_range(key); +} - req_failure = txn_request->add_failure(); - req_failure->set_allocated_request_delete_range(del_request.release()); +void etcdv3::Transaction::setup_compare_or_swap(std::string const& key, + const int64_t prev_revision, + std::string const& value, + int64_t const leaseid) { + this->add_compare_mod(key, CompareResult::NOT_EQUAL, prev_revision); + this->add_success_put(key, value, leaseid); + this->add_failure_range(key); } -void etcdv3::Transaction::setup_compare_and_delete_operation( - std::string const& key) { - std::unique_ptr del_request(new DeleteRangeRequest()); - del_request->set_key(key); - del_request->set_prev_kv(true); - RequestOp* req_success = txn_request->add_success(); - req_success->set_allocated_request_delete_range(del_request.release()); +void etcdv3::Transaction::setup_compare_and_delete( + std::string const& key, const int64_t prev_revision, + std::string const& delete_key, std::string const& range_end, + const bool recursive) { + this->add_compare_mod(key, CompareResult::EQUAL, prev_revision); + this->add_success_delete(delete_key, range_end, recursive, + true /* for backwards compatibility */); + this->add_failure_range(key); +} + +void etcdv3::Transaction::setup_compare_or_delete(std::string const& key, + const int64_t prev_revision, + std::string const& delete_key, + std::string const& range_end, + const bool recursive) { + this->add_compare_mod(key, CompareResult::NOT_EQUAL, prev_revision); + this->add_success_delete(delete_key, range_end, recursive, + true /* for backwards compatibility */); + this->add_failure_range(key); } void etcdv3::Transaction::setup_put(std::string const& key, std::string const& value) { - std::unique_ptr put_request(new PutRequest()); - put_request->set_key(key); - put_request->set_value(value); - put_request->set_prev_kv(false); - RequestOp* req_success = txn_request->add_success(); - req_success->set_allocated_request_put(put_request.release()); + this->add_success_put(key, value); } void etcdv3::Transaction::setup_delete(std::string const& key) { - std::unique_ptr del_request(new DeleteRangeRequest()); - del_request->set_key(key); - del_request->set_prev_kv(false); - - RequestOp* req_success = txn_request->add_success(); - req_success->set_allocated_request_delete_range(del_request.release()); + this->add_success_delete(key, "", false, + true /* for backwards compatibility */); } -etcdv3::Transaction::~Transaction() {} +void etcdv3::Transaction::setup_delete(std::string const& key, + std::string const& range_end, + const bool recursive) { + this->add_success_delete(key, range_end, recursive, + true /* for backwards compatibility */); +} diff --git a/src/v3/action_constants.cpp b/src/v3/action_constants.cpp index dd82f50..8bac56c 100644 --- a/src/v3/action_constants.cpp +++ b/src/v3/action_constants.cpp @@ -5,7 +5,7 @@ char const* etcdv3::COMPARESWAP_ACTION = "compareAndSwap"; char const* etcdv3::UPDATE_ACTION = "update"; char const* etcdv3::SET_ACTION = "set"; char const* etcdv3::GET_ACTION = "get"; -char const* etcdv3::PUT_ACTION = "put"; +char const* etcdv3::PUT_ACTION = "set"; // alias char const* etcdv3::DELETE_ACTION = "delete"; char const* etcdv3::COMPAREDELETE_ACTION = "compareAndDelete"; char const* etcdv3::LOCK_ACTION = "lock"; diff --git a/tst/EtcdSyncTest.cpp b/tst/EtcdSyncTest.cpp index 1dee207..e588c28 100644 --- a/tst/EtcdSyncTest.cpp +++ b/tst/EtcdSyncTest.cpp @@ -12,6 +12,9 @@ TEST_CASE("sync operations") { etcd::SyncClient etcd(etcd_url); etcd.rmdir("/test", true); + etcd::Response res; + int64_t index; + // add CHECK(0 == etcd.add("/test/key1", "42").error_code()); CHECK(etcd::ERROR_KEY_ALREADY_EXISTS == @@ -22,7 +25,9 @@ TEST_CASE("sync operations") { CHECK(0 == etcd.modify("/test/key1", "43").error_code()); CHECK(etcd::ERROR_KEY_NOT_FOUND == etcd.modify("/test/key2", "43").error_code()); // Key not found - CHECK("43" == etcd.modify("/test/key1", "42").prev_value().as_string()); + res = etcd.modify("/test/key1", "42"); + CHECK(0 == res.error_code()); + CHECK("43" == res.prev_value().as_string()); // set CHECK(0 == etcd.set("/test/key1", "43").error_code()); // overwrite @@ -60,7 +65,7 @@ TEST_CASE("sync operations") { // compare and swap etcd.set("/test/key1", "42"); - int64_t index = etcd.modify_if("/test/key1", "43", "42").index(); + index = etcd.modify_if("/test/key1", "43", "42").index(); CHECK(etcd::ERROR_COMPARE_FAILED == etcd.modify_if("/test/key1", "44", "42").error_code()); REQUIRE(etcd.modify_if("/test/key1", "44", index).is_ok()); @@ -71,16 +76,20 @@ TEST_CASE("sync operations") { etcd.set("/test/key1", "42"); CHECK(etcd::ERROR_COMPARE_FAILED == etcd.rm_if("/test/key1", "43").error_code()); - CHECK(0 == etcd.rm_if("/test/key1", "42").error_code()); + res = etcd.rm_if("/test/key1", "42"); + CHECK( + (0 == res.error_code() || etcd::ERROR_KEY_NOT_FOUND == res.error_code())); // atomic compare-and-delete based on prevIndex index = etcd.set("/test/key1", "42").index(); CHECK(etcd::ERROR_COMPARE_FAILED == etcd.rm_if("/test/key1", index - 1).error_code()); - CHECK(0 == etcd.rm_if("/test/key1", index).error_code()); + res = etcd.rm_if("/test/key1", index); + CHECK( + (0 == res.error_code() || etcd::ERROR_KEY_NOT_FOUND == res.error_code())); // leasegrant - etcd::Response res = etcd.leasegrant(60); + res = etcd.leasegrant(60); REQUIRE(res.is_ok()); CHECK(60 == res.value().ttl()); CHECK(0 < res.value().lease()); @@ -96,6 +105,7 @@ TEST_CASE("sync operations") { res = etcd.set("/test/key1", "43", leaseid); REQUIRE(0 == res.error_code()); CHECK("set" == res.action()); + res = etcd.get("/test/key1"); CHECK(leaseid == res.value().lease()); // modify with lease diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index 7739b76..74d6a5a 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -91,12 +91,13 @@ TEST_CASE("set a key") { CHECK(0 == etcd.set("/test", "42").get().error_code()); // Not a file // set with ttl - resp = etcd.set("/test/key1", "50", 10).get(); + resp = etcd.set("/test/key1", "50").get(); REQUIRE(0 == resp.error_code()); // overwrite CHECK("set" == resp.action()); CHECK("43" == resp.prev_value().as_string()); + resp = etcd.get("/test/key1").get(); CHECK("50" == resp.value().as_string()); - CHECK(0 < resp.value().lease()); + CHECK(0 == resp.value().lease()); } TEST_CASE("atomic compare-and-swap") { @@ -115,11 +116,11 @@ TEST_CASE("atomic compare-and-swap") { CHECK(etcd::ERROR_COMPARE_FAILED == res.error_code()); CHECK("etcd-cpp-apiv3: compare failed" == res.error_message()); - // modify fails the second time + // modify fails on non-existing keys res = etcd.modify_if("/test/key222", "44", "42").get(); CHECK(!res.is_ok()); - CHECK(etcd::ERROR_KEY_NOT_FOUND == res.error_code()); - CHECK("etcd-cpp-apiv3: key not found" == res.error_message()); + CHECK(etcd::ERROR_COMPARE_FAILED == res.error_code()); + CHECK("etcd-cpp-apiv3: compare failed" == res.error_message()); } TEST_CASE("delete a value") { @@ -144,10 +145,10 @@ TEST_CASE("delete a value") { CHECK(modify_index == resp.prev_value().modified_index()); CHECK(version == resp.prev_value().version()); CHECK("delete" == resp.action()); - CHECK(modify_index == resp.value().modified_index()); CHECK(create_index == resp.value().created_index()); + CHECK(modify_index == resp.value().modified_index()); CHECK(version == resp.value().version()); - CHECK("" == resp.value().as_string()); + CHECK("43" == resp.value().as_string()); CHECK("/test/key1" == resp.value().key()); } @@ -558,6 +559,8 @@ TEST_CASE("lease grant") { res = etcd.set("/test/key1", "43", leaseid).get(); REQUIRE(0 == res.error_code()); // overwrite CHECK("set" == res.action()); + res = etcd.get("/test/key1").get(); + REQUIRE(0 == res.error_code()); // overwrite CHECK(leaseid == res.value().lease()); // change with lease id @@ -566,6 +569,8 @@ TEST_CASE("lease grant") { res = etcd.set("/test/key1", "43", leaseid).get(); REQUIRE(0 == res.error_code()); // overwrite CHECK("set" == res.action()); + res = etcd.get("/test/key1").get(); + REQUIRE(0 == res.error_code()); // overwrite CHECK(leaseid == res.value().lease()); // failure to attach lease id diff --git a/tst/TransactionTest.cpp b/tst/TransactionTest.cpp index 942a699..ee46a5c 100644 --- a/tst/TransactionTest.cpp +++ b/tst/TransactionTest.cpp @@ -50,13 +50,8 @@ TEST_CASE("add a new key") { etcdv3::Transaction txn; // setup the conditions - txn.reset_key("/test/x1"); - txn.init_compare("1", etcdv3::CompareResult::EQUAL, - etcdv3::CompareTarget::VALUE); - - txn.reset_key("/test/x2"); - txn.init_compare("2", etcdv3::CompareResult::EQUAL, - etcdv3::CompareTarget::VALUE); + txn.add_compare_value("/test/x1", "1"); + txn.add_compare_value("/test/x2", "2"); txn.setup_put("/test/x1", "111"); txn.setup_delete("/test/x2"); @@ -84,6 +79,51 @@ TEST_CASE("add a new key") { } } +TEST_CASE("fetch & add") { + etcd::Client etcd(etcd_url); + etcd.rmdir("/test", true).wait(); + + etcd.set("/test/key", "0").wait(); + + auto fetch_and_add = [](etcd::Client& client, + std::string const& key) -> void { + auto value = stoi(client.get(key).get().value().as_string()); + while (true) { + auto txn = etcdv3::Transaction(); + txn.setup_compare_and_swap(key, std::to_string(value), + std::to_string(value + 1)); + etcd::Response resp = client.txn(txn).get(); + if (resp.is_ok()) { + break; + } + value = stoi(resp.value().as_string()); + } + }; + + // run 1000 times + const size_t rounds = 100; + std::atomic_size_t counter(0); + + std::vector threads; + for (size_t i = 0; i < 10; ++i) { + threads.emplace_back([&]() { + while (counter.fetch_add(1) < rounds) { + fetch_and_add(etcd, "/test/key"); + } + }); + } + for (auto& thr : threads) { + thr.join(); + } + + // check the value + { + etcd::Response resp = etcd.get("/test/key").get(); + REQUIRE(0 == resp.error_code()); + CHECK(resp.value().as_string() == std::to_string(rounds)); + } +} + TEST_CASE("cleanup") { etcd::Client etcd(etcd_url); REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());