From 3f762a9acba32cd9b957c866690619a30e0d2aba Mon Sep 17 00:00:00 2001 From: Chris Hogan Date: Wed, 31 Mar 2021 16:11:11 -0500 Subject: [PATCH 1/5] Add default_placement_policy config option --- src/api/hermes.h | 6 ------ src/config_parser.cc | 18 ++++++++++++++++++ src/hermes_types.h | 8 ++++++++ src/utils.cc | 2 ++ test/config_parser_test.cc | 3 +++ test/data/hermes.conf | 3 +++ 6 files changed, 34 insertions(+), 6 deletions(-) diff --git a/src/api/hermes.h b/src/api/hermes.h index b04f195bc..6946d2054 100644 --- a/src/api/hermes.h +++ b/src/api/hermes.h @@ -83,12 +83,6 @@ class VBucket; class Bucket; -enum class PlacementPolicy { - kRandom, - kRoundRobin, - kMinimizeIoTime, -}; - struct Context { static int default_buffer_organizer_retries; diff --git a/src/config_parser.cc b/src/config_parser.cc index 9a61e541f..eb046b314 100644 --- a/src/config_parser.cc +++ b/src/config_parser.cc @@ -80,6 +80,7 @@ enum ConfigVariable { ConfigVariable_BufferOrganizerPort, ConfigVariable_RpcHostNumberRange, ConfigVariable_RpcNumThreads, + ConfigVariable_PlacementPolicy, ConfigVariable_Count }; @@ -115,6 +116,7 @@ static const char *kConfigVariableStrings[ConfigVariable_Count] = { "buffer_organizer_port", "rpc_host_number_range", "rpc_num_threads", + "placement_policy", }; struct Token { @@ -817,6 +819,22 @@ void ParseTokens(TokenList *tokens, Config *config) { config->rpc_num_threads = ParseInt(&tok); break; } + case ConfigVariable_PlacementPolicy: { + std::string policy = ParseString(&tok); + + if (policy == "MinimizeIoTime") { + config->default_placement_policy = + api::PlacementPolicy::kMinimizeIoTime; + } else if (policy == "Random") { + config->default_placement_policy = api::PlacementPolicy::kRandom; + } else if (policy == "RoundRobin") { + config->default_placement_policy = api::PlacementPolicy::kRoundRobin; + } else { + LOG(FATAL) << "Unknown default_placement_policy: '" << policy << "'" + << std::endl; + } + break; + } default: { HERMES_INVALID_CODE_PATH; break; diff --git a/src/hermes_types.h b/src/hermes_types.h index 1faf5088b..f3763c5d9 100644 --- a/src/hermes_types.h +++ b/src/hermes_types.h @@ -43,6 +43,13 @@ typedef u16 DeviceID; namespace api { typedef std::vector Blob; + +enum class PlacementPolicy { + kRandom, + kRoundRobin, + kMinimizeIoTime, +}; + } // namespace api // TODO(chogan): These constants impose limits on the number of slabs, devices, @@ -160,6 +167,7 @@ struct Config { int buffer_organizer_port; int rpc_host_number_range[2]; int rpc_num_threads; + api::PlacementPolicy default_placement_policy; /** A base name for the BufferPool shared memory segement. Hermes appends the * value of the USER environment variable to this string. diff --git a/src/utils.cc b/src/utils.cc index b0038d473..a441c77e6 100644 --- a/src/utils.cc +++ b/src/utils.cc @@ -110,6 +110,8 @@ void InitDefaultConfig(Config *config) { config->buffer_pool_shmem_name[i] = buffer_pool_shmem_name[i]; } config->buffer_pool_shmem_name[shmem_name_size] = '\0'; + + config->default_placement_policy = api::PlacementPolicy::kMinimizeIoTime; } namespace testing { diff --git a/test/config_parser_test.cc b/test/config_parser_test.cc index 53ceff9e2..4237a973e 100644 --- a/test/config_parser_test.cc +++ b/test/config_parser_test.cc @@ -93,5 +93,8 @@ int main(int argc, char **argv) { Assert(strncmp(config.buffer_pool_shmem_name, expected_shm_name, sizeof(expected_shm_name)) == 0); + Assert(config.default_placement_policy == + hermes::api::PlacementPolicy::kMinimizeIoTime); + return 0; } diff --git a/test/data/hermes.conf b/test/data/hermes.conf index b3bb65e9e..4ae0194ed 100644 --- a/test/data/hermes.conf +++ b/test/data/hermes.conf @@ -98,3 +98,6 @@ rpc_num_threads = 1; # The shared memory prefix for the hermes shared memory segment. A user name # will be automatically appended. buffer_pool_shmem_name = "/hermes_buffer_pool_"; + +# Choose Random, RoundRobin, or MinimizeIotime +default_placement_policy = "MinimizeIoTime"; \ No newline at end of file From 09345b98bb8e17787d9850cee75933d8001b8ac8 Mon Sep 17 00:00:00 2001 From: Chris Hogan Date: Wed, 31 Mar 2021 16:27:24 -0500 Subject: [PATCH 2/5] Pass default policy to Bucket --- src/api/bucket.cc | 3 +-- src/api/bucket.h | 6 ++++++ src/api/hermes.cc | 1 + src/api/hermes.h | 3 ++- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/api/bucket.cc b/src/api/bucket.cc index 903a879a4..fd243dc82 100644 --- a/src/api/bucket.cc +++ b/src/api/bucket.cc @@ -25,8 +25,7 @@ namespace api { Bucket::Bucket(const std::string &initial_name, const std::shared_ptr &h, Context ctx) - : name_(initial_name), hermes_(h) { - (void)ctx; + : name_(initial_name), hermes_(h), placement_policy(ctx.policy) { if (IsBucketNameTooLong(name_)) { id_.as_int = 0; diff --git a/src/api/bucket.h b/src/api/bucket.h index f0358e680..c8674b3ef 100644 --- a/src/api/bucket.h +++ b/src/api/bucket.h @@ -38,6 +38,12 @@ class Bucket { /** internal Hermes object owned by Bucket */ std::shared_ptr hermes_; + /** + * The data PlacementPolicy this Bucket will use. Defaults to the value set in + * the default_placement_policy parameter of the configuration file. + */ + PlacementPolicy placement_policy; + // TODO(chogan): Think about the Big Three Bucket() : name_(""), id_{0, 0}, hermes_(nullptr) { LOG(INFO) << "Create NULL Bucket " << std::endl; diff --git a/src/api/hermes.cc b/src/api/hermes.cc index f7b771cad..a8de2bedf 100644 --- a/src/api/hermes.cc +++ b/src/api/hermes.cc @@ -297,6 +297,7 @@ std::shared_ptr InitHermes(Config *config, bool is_daemon, api::Context::default_buffer_organizer_retries = config->num_buffer_organizer_retries; + api::Context::default_placement_policy = config->default_placement_policy; InitRpcClients(&result->rpc_); diff --git a/src/api/hermes.h b/src/api/hermes.h index 6946d2054..b483990ef 100644 --- a/src/api/hermes.h +++ b/src/api/hermes.h @@ -85,11 +85,12 @@ class Bucket; struct Context { static int default_buffer_organizer_retries; + static PlacementPolicy default_placement_policy; PlacementPolicy policy; int buffer_organizer_retries; - Context() : policy(PlacementPolicy::kRoundRobin), + Context() : policy(default_placement_policy), buffer_organizer_retries(default_buffer_organizer_retries) {} }; From c6efce5374233476f122dbf1fb7c9d57c7eb01de Mon Sep 17 00:00:00 2001 From: Chris Hogan Date: Thu, 1 Apr 2021 16:24:02 -0500 Subject: [PATCH 3/5] WIP: Trouble serializing a Context --- src/api/bucket.cc | 8 +++++- src/api/bucket.h | 55 ++++++++++++++++++++++++++++++---------- src/api/hermes.cc | 1 + src/api/hermes.h | 17 ------------- src/buffer_organizer.cc | 6 ++--- src/buffer_pool.cc | 10 +++----- src/buffer_pool.h | 6 +++-- src/config_parser.cc | 2 +- src/hermes_types.h | 11 ++++++++ src/rpc_thallium.cc | 14 +++++----- src/rpc_thallium.h | 6 +++++ test/buffer_pool_test.cc | 14 +++++----- test/data/hermes.conf | 4 +-- 13 files changed, 96 insertions(+), 58 deletions(-) diff --git a/src/api/bucket.cc b/src/api/bucket.cc index fd243dc82..e386183f2 100644 --- a/src/api/bucket.cc +++ b/src/api/bucket.cc @@ -25,7 +25,7 @@ namespace api { Bucket::Bucket(const std::string &initial_name, const std::shared_ptr &h, Context ctx) - : name_(initial_name), hermes_(h), placement_policy(ctx.policy) { + : name_(initial_name), hermes_(h), ctx_(ctx) { if (IsBucketNameTooLong(name_)) { id_.as_int = 0; @@ -63,6 +63,12 @@ Status Bucket::Put(const std::string &name, const u8 *data, size_t size, return result; } +Status Bucket::Put(const std::string &name, const u8 *data, size_t size) { + Status result = Put(name, data, size, ctx_); + + return result; +} + size_t Bucket::GetBlobSize(Arena *arena, const std::string &name, Context &ctx) { (void)ctx; diff --git a/src/api/bucket.h b/src/api/bucket.h index c8674b3ef..3136c252e 100644 --- a/src/api/bucket.h +++ b/src/api/bucket.h @@ -37,12 +37,8 @@ class Bucket { public: /** internal Hermes object owned by Bucket */ std::shared_ptr hermes_; - - /** - * The data PlacementPolicy this Bucket will use. Defaults to the value set in - * the default_placement_policy parameter of the configuration file. - */ - PlacementPolicy placement_policy; + /** This Bucket's Context. */ + Context ctx_; // TODO(chogan): Think about the Big Three Bucket() : name_(""), id_{0, 0}, hermes_(nullptr) { @@ -78,6 +74,12 @@ class Bucket { template Status Put(const std::string &name, const std::vector &data, Context &ctx); + /** + * + */ + template + Status Put(const std::string &name, const std::vector &data); + /** * \brief Puts a blob to a bucket * @@ -100,10 +102,7 @@ class Bucket { /** * */ - template - Status PlaceBlobs(std::vector &schemas, - const std::vector> &blobs, - const std::vector &names, int retries); + Status Put(const std::string &name, const u8 *data, size_t size); /** * @@ -112,6 +111,21 @@ class Bucket { Status Put(std::vector &names, std::vector> &blobs, Context &ctx); + /** + * + */ + template + Status Put(std::vector &names, + std::vector> &blobs); + + /** + * + */ + template + Status PlaceBlobs(std::vector &schemas, + const std::vector> &blobs, + const std::vector &names, Context &ctx); + /** Get the size in bytes of the Blob referred to by `name` */ size_t GetBlobSize(Arena *arena, const std::string &name, Context &ctx); @@ -172,10 +186,17 @@ Status Bucket::Put(const std::string &name, const std::vector &data, return result; } +template +Status Bucket::Put(const std::string &name, const std::vector &data) { + Status result = Put(name, data, ctx_); + + return result; +} + template Status Bucket::PlaceBlobs(std::vector &schemas, const std::vector> &blobs, - const std::vector &names, int retries) { + const std::vector &names, Context &ctx) { Status result; for (size_t i = 0; i < schemas.size(); ++i) { @@ -186,12 +207,20 @@ Status Bucket::PlaceBlobs(std::vector &schemas, LOG(INFO) << "Attaching blob '" << names[i] << "' to Bucket '" << name_ << "'" << std::endl; result = PlaceBlob(&hermes_->context_, &hermes_->rpc_, schema, blob, - names[i], id_, retries); + names[i], id_, ctx); } return result; } +template +Status Bucket::Put(std::vector &names, + std::vector> &blobs) { + Status result = Put(names, blobs, ctx_); + + return result; +} + template Status Bucket::Put(std::vector &names, std::vector> &blobs, Context &ctx) { @@ -224,7 +253,7 @@ Status Bucket::Put(std::vector &names, HERMES_END_TIMED_BLOCK(); if (ret.Succeeded()) { - ret = PlaceBlobs(schemas, blobs, names, ctx.buffer_organizer_retries); + ret = PlaceBlobs(schemas, blobs, names, ctx); } else { LOG(ERROR) << ret.Msg(); return ret; diff --git a/src/api/hermes.cc b/src/api/hermes.cc index a8de2bedf..828766b39 100644 --- a/src/api/hermes.cc +++ b/src/api/hermes.cc @@ -28,6 +28,7 @@ namespace hermes { namespace api { int Context::default_buffer_organizer_retries; +PlacementPolicy Context::default_placement_policy; Status RenameBucket(const std::string &old_name, const std::string &new_name, diff --git a/src/api/hermes.h b/src/api/hermes.h index b483990ef..e25be48f5 100644 --- a/src/api/hermes.h +++ b/src/api/hermes.h @@ -83,23 +83,6 @@ class VBucket; class Bucket; -struct Context { - static int default_buffer_organizer_retries; - static PlacementPolicy default_placement_policy; - - PlacementPolicy policy; - int buffer_organizer_retries; - - Context() : policy(default_placement_policy), - buffer_organizer_retries(default_buffer_organizer_retries) {} -}; - -struct TraitTag{}; -typedef ID THnd; - -struct BucketTag{}; -typedef ID BHnd; - /** rename a bucket referred to by name only */ Status RenameBucket(const std::string &old_name, const std::string &new_name, diff --git a/src/buffer_organizer.cc b/src/buffer_organizer.cc index 933d94eb1..4d09c1ce7 100644 --- a/src/buffer_organizer.cc +++ b/src/buffer_organizer.cc @@ -16,11 +16,11 @@ namespace hermes { int PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc, - SwapBlob swap_blob, const std::string &name) { + SwapBlob swap_blob, const std::string &name, + api::Context &ctx) { int result = 0; std::vector schemas; std::vector sizes(1, swap_blob.size); - api::Context ctx; Status ret = CalculatePlacement(context, rpc, sizes, schemas, ctx); if (ret.Succeeded()) { @@ -35,7 +35,7 @@ int PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc, } ret = PlaceBlob(context, rpc, schemas[0], blob, name.c_str(), - swap_blob.bucket_id, 0, true); + swap_blob.bucket_id, ctx, true); } else { // TODO(chogan): @errorhandling result = 1; diff --git a/src/buffer_pool.cc b/src/buffer_pool.cc index 099f3cb55..52ce933bd 100644 --- a/src/buffer_pool.cc +++ b/src/buffer_pool.cc @@ -1645,9 +1645,9 @@ size_t ReadFromSwap(SharedMemoryContext *context, Blob blob, } api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc, - PlacementSchema &schema, Blob blob, const std::string &name, - BucketID bucket_id, int retries, - bool called_from_buffer_organizer) { + PlacementSchema &schema, Blob blob, + const std::string &name, BucketID bucket_id, + api::Context &ctx, bool called_from_buffer_organizer) { api::Status result; if (ContainsBlob(context, rpc, bucket_id, name) @@ -1671,8 +1671,6 @@ api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc, AttachBlobToBucket(context, rpc, name.c_str(), bucket_id, buffer_ids); } else { if (called_from_buffer_organizer) { - // TODO(chogan): @errorhandling The BufferOrganizer failed to place a blob - // from swap space into the hierarchy. result = PLACE_SWAP_BLOB_TO_BUF_FAILED; LOG(ERROR) << result.Msg(); } else { @@ -1680,7 +1678,7 @@ api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc, blob.size); result = BLOB_IN_SWAP_PLACE; LOG(WARNING) << result.Msg(); - TriggerBufferOrganizer(rpc, kPlaceInHierarchy, name, swap_blob, retries); + TriggerBufferOrganizer(rpc, kPlaceInHierarchy, name, swap_blob, ctx); } } diff --git a/src/buffer_pool.h b/src/buffer_pool.h index 2c4efb25c..0852d678c 100644 --- a/src/buffer_pool.h +++ b/src/buffer_pool.h @@ -485,10 +485,12 @@ std::vector GetBandwidths(SharedMemoryContext *context); u32 GetBufferSize(SharedMemoryContext *context, RpcContext *rpc, BufferID id); bool BufferIsByteAddressable(SharedMemoryContext *context, BufferID id); int PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc, - SwapBlob swap_blob, const std::string &blob_name); + SwapBlob swap_blob, const std::string &blob_name, + api::Context ctx); api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc, PlacementSchema &schema, Blob blob, - const std::string &name, BucketID bucket_id, int retries, + const std::string &name, BucketID bucket_id, + api::Context ctx, bool called_from_buffer_organizer = false); api::Status StdIoPersistBucket(SharedMemoryContext *context, RpcContext *rpc, Arena *arena, BucketID bucket_id, diff --git a/src/config_parser.cc b/src/config_parser.cc index eb046b314..e79fec1e6 100644 --- a/src/config_parser.cc +++ b/src/config_parser.cc @@ -116,7 +116,7 @@ static const char *kConfigVariableStrings[ConfigVariable_Count] = { "buffer_organizer_port", "rpc_host_number_range", "rpc_num_threads", - "placement_policy", + "default_placement_policy", }; struct Token { diff --git a/src/hermes_types.h b/src/hermes_types.h index f3763c5d9..c4d363bad 100644 --- a/src/hermes_types.h +++ b/src/hermes_types.h @@ -50,6 +50,17 @@ enum class PlacementPolicy { kMinimizeIoTime, }; +struct Context { + static int default_buffer_organizer_retries; + static PlacementPolicy default_placement_policy; + + PlacementPolicy policy; + int buffer_organizer_retries; + + Context() : policy(default_placement_policy), + buffer_organizer_retries(default_buffer_organizer_retries) {} +}; + } // namespace api // TODO(chogan): These constants impose limits on the number of slabs, devices, diff --git a/src/rpc_thallium.cc b/src/rpc_thallium.cc index 872a83ac7..0d67e79a8 100644 --- a/src/rpc_thallium.cc +++ b/src/rpc_thallium.cc @@ -404,13 +404,13 @@ void StartBufferOrganizer(SharedMemoryContext *context, RpcContext *rpc, auto rpc_place_in_hierarchy = [context, rpc](const tl::request &req, SwapBlob swap_blob, const std::string name, - int retries) { + api::Context &ctx) { (void)req; - for (int i = 0; i < retries; ++i) { + for (int i = 0; i < ctx.buffer_organizer_retries; ++i) { LOG(INFO) << "Buffer Organizer placing blob '" << name - << "' in hierarchy. Attempt " << i + 1 << " of " << retries - << std::endl; - int result = PlaceInHierarchy(context, rpc, swap_blob, name); + << "' in hierarchy. Attempt " << i + 1 << " of " + << ctx.buffer_organizer_retries << std::endl; + int result = PlaceInHierarchy(context, rpc, swap_blob, name, ctx); if (result == 0) { break; } else { @@ -450,7 +450,7 @@ void StartBufferOrganizer(SharedMemoryContext *context, RpcContext *rpc, void TriggerBufferOrganizer(RpcContext *rpc, const char *func_name, const std::string &blob_name, SwapBlob swap_blob, - int retries) { + api::Context &ctx) { std::string server_name = GetServerName(rpc, rpc->node_id, true); std::string protocol = GetProtocol(rpc); tl::engine engine(protocol, THALLIUM_CLIENT_MODE, true); @@ -458,7 +458,7 @@ void TriggerBufferOrganizer(RpcContext *rpc, const char *func_name, tl::endpoint server = engine.lookup(server_name); remote_proc.disable_response(); // TODO(chogan): Templatize? - remote_proc.on(server)(swap_blob, blob_name, retries); + remote_proc.on(server)(swap_blob, blob_name, ctx); } void StartGlobalSystemViewStateUpdateThread(SharedMemoryContext *context, diff --git a/src/rpc_thallium.h b/src/rpc_thallium.h index 35bb30453..624c16884 100644 --- a/src/rpc_thallium.h +++ b/src/rpc_thallium.h @@ -117,6 +117,12 @@ void serialize(A &ar, SwapBlob &swap_blob) { ar & swap_blob.bucket_id; } +template +void serialize(A &ar, api::Context &ctx) { + ar & ctx.buffer_organizer_retries; + ar & (int)ctx.policy; +} + #ifndef THALLIUM_USE_CEREAL /** * Lets Thallium know how to serialize a MapType. diff --git a/test/buffer_pool_test.cc b/test/buffer_pool_test.cc index b0bfcd172..d3678d360 100644 --- a/test/buffer_pool_test.cc +++ b/test/buffer_pool_test.cc @@ -318,12 +318,14 @@ int main(int argc, char **argv) { return 1; } - TestGetBuffers(); - TestGetBandwidths(); - TestBlobOverwrite(); - TestSwap(); - TestBufferOrganizer(); - TestBufferingFileCorrectness(); +#define TEST(name) LOG(INFO) << "Running " << #name << std::endl; name(); + + // TEST(TestGetBuffers); + // TEST(TestGetBandwidths); + // TEST(TestBlobOverwrite); + TEST(TestSwap); + // TEST(TestBufferOrganizer); + // TEST(TestBufferingFileCorrectness); MPI_Finalize(); diff --git a/test/data/hermes.conf b/test/data/hermes.conf index 4ae0194ed..717c87ee7 100644 --- a/test/data/hermes.conf +++ b/test/data/hermes.conf @@ -99,5 +99,5 @@ rpc_num_threads = 1; # will be automatically appended. buffer_pool_shmem_name = "/hermes_buffer_pool_"; -# Choose Random, RoundRobin, or MinimizeIotime -default_placement_policy = "MinimizeIoTime"; \ No newline at end of file +# Choose Random, RoundRobin, or MinimizeIoTime +default_placement_policy = "MinimizeIoTime"; From 69fdd7d100d714c8ad11c3b883feb1d1ee0c7b75 Mon Sep 17 00:00:00 2001 From: Chris Hogan Date: Fri, 2 Apr 2021 11:23:13 -0500 Subject: [PATCH 4/5] Fix serialization of Context. Don't run DPE on 0 targets --- src/api/bucket.cc | 1 - src/buffer_pool.h | 4 ++-- src/data_placement_engine.cc | 4 ++++ src/hermes_status.h | 28 ++++++++++++++-------------- src/rpc_thallium.cc | 2 +- src/rpc_thallium.h | 25 ++++++++++++++++++------- test/buffer_pool_test.cc | 24 +++++++++++------------- 7 files changed, 50 insertions(+), 38 deletions(-) diff --git a/src/api/bucket.cc b/src/api/bucket.cc index e386183f2..1c83693c6 100644 --- a/src/api/bucket.cc +++ b/src/api/bucket.cc @@ -26,7 +26,6 @@ namespace api { Bucket::Bucket(const std::string &initial_name, const std::shared_ptr &h, Context ctx) : name_(initial_name), hermes_(h), ctx_(ctx) { - if (IsBucketNameTooLong(name_)) { id_.as_int = 0; throw std::length_error("Bucket name is too long: " + diff --git a/src/buffer_pool.h b/src/buffer_pool.h index 0852d678c..4a863ce79 100644 --- a/src/buffer_pool.h +++ b/src/buffer_pool.h @@ -486,11 +486,11 @@ u32 GetBufferSize(SharedMemoryContext *context, RpcContext *rpc, BufferID id); bool BufferIsByteAddressable(SharedMemoryContext *context, BufferID id); int PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc, SwapBlob swap_blob, const std::string &blob_name, - api::Context ctx); + api::Context &ctx); api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc, PlacementSchema &schema, Blob blob, const std::string &name, BucketID bucket_id, - api::Context ctx, + api::Context &ctx, bool called_from_buffer_organizer = false); api::Status StdIoPersistBucket(SharedMemoryContext *context, RpcContext *rpc, Arena *arena, BucketID bucket_id, diff --git a/src/data_placement_engine.cc b/src/data_placement_engine.cc index 32c7305c2..961eada99 100644 --- a/src/data_placement_engine.cc +++ b/src/data_placement_engine.cc @@ -398,6 +398,10 @@ Status CalculatePlacement(SharedMemoryContext *context, RpcContext *rpc, } } + if (targets.size() == 0) { + continue; + } + std::vector node_state = GetRemainingTargetCapacities(context, rpc, targets); diff --git a/src/hermes_status.h b/src/hermes_status.h index a380955ab..2a00c90c1 100644 --- a/src/hermes_status.h +++ b/src/hermes_status.h @@ -5,29 +5,29 @@ namespace hermes { #define RETURN_CODES(X) \ - X(2, HERMES_OK_MAX, R"(Maximum supported HERMES success with \ - caveat.)") \ + X(2, HERMES_OK_MAX, "Maximum supported HERMES success with " \ + "caveat.") \ X(1, BLOB_IN_SWAP_PLACE, "Blob is placed into swap place.") \ X(0, HERMES_SUCCESS, "No error!") \ X(-1, INVALID_BUCKET, "Bucket ID is invalid.") \ X(-2, BUCKET_NAME_TOO_LONG, "Bucket name exceeds max length (256).") \ X(-3, VBUCKET_NAME_TOO_LONG, "VBucket name exceeds max length (256).") \ X(-4, BLOB_NAME_TOO_LONG, "Blob name exceeds max length (64).") \ - X(-5, INVALID_BLOB, R"(Blob data is invalid. Please check \ - address and size.)") \ + X(-5, INVALID_BLOB, "Blob data is invalid. Please check " \ + "address and size.") \ X(-6, BLOB_NOT_IN_BUCKET, "Blob is not in this bucket.") \ X(-7, BLOB_NOT_LINKED_TO_VBUCKET, "Blob is not linked to this vbucket.") \ X(-8, TRAIT_NOT_VALID, "Selected trait is not valid.") \ X(-9, TRAIT_EXISTS_ALREADY, "Selected trait already exists.") \ X(-10, OFFSET_MAP_EMPTY, "Offset_map is empty.") \ X(-11, BLOB_NOT_LINKED_IN_MAP, "Map doesnt have the blob linked.") \ - X(-12, BUCKET_IN_USE, R"(Bucket cannot be destroyed because its \ - reference count is greater than 1.)") \ - X(-13, DPE_RANDOM_FOUND_NO_TGT, R"(DPE random found no target with enough \ - space for blob.)") \ + X(-12, BUCKET_IN_USE, "Bucket cannot be destroyed because its "\ + "reference count is greater than 1.") \ + X(-13, DPE_RANDOM_FOUND_NO_TGT, "DPE random found no target with enough " \ + "space for blob.") \ X(-14, DPE_GET_INVALID_TGT, "DPE got an invalid target ID.") \ - X(-15, DPE_ORTOOLS_NO_SOLUTION, R"(DPE or-tools does not find a solution \ - with provided constraints.)") \ + X(-15, DPE_ORTOOLS_NO_SOLUTION, "DPE or-tools does not find a solution" \ + "with provided constraints.") \ X(-16, DPE_PLACEMENTSCHEMA_EMPTY, "DPE PlacementSchema is empty.") \ X(-17, READ_BLOB_FAILED, "Read blob from its id failed.") \ X(-18, STDIO_OFFSET_ERROR, "Offset invalid or fseek() failed.") \ @@ -36,10 +36,10 @@ namespace hermes { X(-21, STDIO_FCLOSE_FAILED, "Func fclose failed. System err msg: ") \ X(-22, FCLOSE_FAILED, "Func fclose failed. System err msg: ") \ X(-23, INVALID_FILE, "File is not valid.") \ - X(-24, PLACE_SWAP_BLOB_TO_BUF_FAILED, R"(Place blob from swap space into \ - buffering system failed.)") \ - X(-25, DPE_RR_FIND_TGT_FAILED, R"(DPE round-robin failed to find \ - proper target for blob.)") \ + X(-24, PLACE_SWAP_BLOB_TO_BUF_FAILED, "Place blob from swap space into " \ + "buffering system failed.") \ + X(-25, DPE_RR_FIND_TGT_FAILED, "DPE round-robin failed to find " \ + "proper target for blob.") \ X(-26, HERMES_ERROR_MAX, "Maximum supported HERMES errors.") \ #define RETURN_ENUM(ID, NAME, TEXT) NAME = ID, diff --git a/src/rpc_thallium.cc b/src/rpc_thallium.cc index 0d67e79a8..7aeea2626 100644 --- a/src/rpc_thallium.cc +++ b/src/rpc_thallium.cc @@ -404,7 +404,7 @@ void StartBufferOrganizer(SharedMemoryContext *context, RpcContext *rpc, auto rpc_place_in_hierarchy = [context, rpc](const tl::request &req, SwapBlob swap_blob, const std::string name, - api::Context &ctx) { + api::Context ctx) { (void)req; for (int i = 0; i < ctx.buffer_organizer_retries; ++i) { LOG(INFO) << "Buffer Organizer placing blob '" << name diff --git a/src/rpc_thallium.h b/src/rpc_thallium.h index 624c16884..c5cbd0a08 100644 --- a/src/rpc_thallium.h +++ b/src/rpc_thallium.h @@ -117,12 +117,6 @@ void serialize(A &ar, SwapBlob &swap_blob) { ar & swap_blob.bucket_id; } -template -void serialize(A &ar, api::Context &ctx) { - ar & ctx.buffer_organizer_retries; - ar & (int)ctx.policy; -} - #ifndef THALLIUM_USE_CEREAL /** * Lets Thallium know how to serialize a MapType. @@ -152,7 +146,24 @@ void load(A &ar, MapType &map_type) { ar.read(&val, 1); map_type = (MapType)val; } -#endif +#endif // #ifndef THALLIUM_USE_CEREAL + +namespace api { +template +void save(A &ar, api::Context &ctx) { + ar.write(&ctx.buffer_organizer_retries, 1); + int val = (int)ctx.policy; + ar.write(&val, 1); +} + +template +void load(A &ar, api::Context &ctx) { + int val = 0; + ar.read(&ctx.buffer_organizer_retries, 1); + ar.read(&val, 1); + ctx.policy = (PlacementPolicy)val; +} +} // namespace api std::string GetRpcAddress(Config *config, const std::string &host_number, int port); diff --git a/test/buffer_pool_test.cc b/test/buffer_pool_test.cc index d3678d360..293d54b03 100644 --- a/test/buffer_pool_test.cc +++ b/test/buffer_pool_test.cc @@ -114,7 +114,8 @@ static void TestGetBandwidths() { } static hapi::Status ForceBlobToSwap(hapi::Hermes *hermes, hermes::u64 id, - hapi::Blob &blob, const char *blob_name) { + hapi::Blob &blob, const char *blob_name, + hapi::Context ctx) { using namespace hermes; // NOLINT(*) PlacementSchema schema; schema.push_back({blob.size(), testing::DefaultRamTargetId()}); @@ -123,9 +124,8 @@ static hapi::Status ForceBlobToSwap(hapi::Hermes *hermes, hermes::u64 id, internal_blob.size = blob.size(); hermes::BucketID bucket_id = {}; bucket_id.as_int = id; - int retries = 3; hapi::Status result = PlaceBlob(&hermes->context_, &hermes->rpc_, schema, - internal_blob, blob_name, bucket_id, retries); + internal_blob, blob_name, bucket_id, ctx); return result; } @@ -191,7 +191,7 @@ static void TestSwap() { hapi::Blob data(data_size, 'x'); std::string blob_name("swap_blob"); hapi::Status status = ForceBlobToSwap(hermes.get(), bucket.GetId(), data, - blob_name.c_str()); + blob_name.c_str(), ctx); Assert(status == hermes::BLOB_IN_SWAP_PLACE); // NOTE(chogan): The Blob is in the swap space, but the API behaves as normal. Assert(bucket.ContainsBlob(blob_name)); @@ -228,7 +228,7 @@ static void TestBufferOrganizer() { hapi::Blob data2(KILOBYTES(4), 'y'); std::string blob2_name("bo_blob2"); status = ForceBlobToSwap(hermes.get(), bucket.GetId(), data2, - blob2_name.c_str()); + blob2_name.c_str(), ctx); Assert(status == hermes::BLOB_IN_SWAP_PLACE); Assert(bucket.BlobIsInSwap(blob2_name)); @@ -318,14 +318,12 @@ int main(int argc, char **argv) { return 1; } -#define TEST(name) LOG(INFO) << "Running " << #name << std::endl; name(); - - // TEST(TestGetBuffers); - // TEST(TestGetBandwidths); - // TEST(TestBlobOverwrite); - TEST(TestSwap); - // TEST(TestBufferOrganizer); - // TEST(TestBufferingFileCorrectness); + TestGetBuffers; + TestGetBandwidths; + TestBlobOverwrite; + TestSwap; + TestBufferOrganizer; + TestBufferingFileCorrectness; MPI_Finalize(); From e8b4c0d0f361a46b71920f8b2747b2b6827ed3fd Mon Sep 17 00:00:00 2001 From: Chris Hogan Date: Fri, 2 Apr 2021 11:26:33 -0500 Subject: [PATCH 5/5] Actually call test functions --- test/buffer_pool_test.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/buffer_pool_test.cc b/test/buffer_pool_test.cc index 293d54b03..42fa09321 100644 --- a/test/buffer_pool_test.cc +++ b/test/buffer_pool_test.cc @@ -318,12 +318,12 @@ int main(int argc, char **argv) { return 1; } - TestGetBuffers; - TestGetBandwidths; - TestBlobOverwrite; - TestSwap; - TestBufferOrganizer; - TestBufferingFileCorrectness; + TestGetBuffers(); + TestGetBandwidths(); + TestBlobOverwrite(); + TestSwap(); + TestBufferOrganizer(); + TestBufferingFileCorrectness(); MPI_Finalize();