From 01b523e4bf80ee336ca2420bd0494be0a2f5c3da Mon Sep 17 00:00:00 2001
From: Chris Hogan
Date: Tue, 16 Mar 2021 14:38:38 -0500
Subject: [PATCH 1/2] Remove duplication from Put

---
 src/api/bucket.cc | 47 ++++++++++-------------------------------------
 src/api/bucket.h  |  3 ---
 2 files changed, 10 insertions(+), 40 deletions(-)

diff --git a/src/api/bucket.cc b/src/api/bucket.cc
index ee6f88847..903a879a4 100644
--- a/src/api/bucket.cc
+++ b/src/api/bucket.cc
@@ -46,49 +46,22 @@ bool Bucket::IsValid() const {
 
 Status Bucket::Put(const std::string &name, const u8 *data, size_t size,
                    Context &ctx) {
-  Status ret;
-
-  if (IsBlobNameTooLong(name)) {
-    // TODO(chogan): @errorhandling
-    ret = BLOB_NAME_TOO_LONG;
-    LOG(ERROR) << ret.Msg();
-    return ret;
-  }
+  Status result;
 
   if (size > 0 && nullptr == data) {
-    ret = INVALID_BLOB;
-    LOG(ERROR) << ret.Msg();
-    return ret;
+    result = INVALID_BLOB;
+    LOG(ERROR) << result.Msg();
   }
 
-  if (IsValid()) {
-    std::vector<size_t> sizes(1, size);
-    std::vector<PlacementSchema> schemas;
-    HERMES_BEGIN_TIMED_BLOCK("CalculatePlacement");
-    ret = CalculatePlacement(&hermes_->context_, &hermes_->rpc_, sizes, schemas,
-                             ctx);
-    HERMES_END_TIMED_BLOCK();
-
-    if (ret.Succeeded()) {
-      std::vector<std::string> names(1, name);
-      std::vector<std::vector<u8>> blobs(1);
-      blobs[0].resize(size);
-      // TODO(chogan): Create a PreallocatedMemory allocator for std::vector so
-      // that a single-blob-Put doesn't perform a copy
-      memcpy(blobs[0].data(), data, size);
-      ret = PlaceBlobs(schemas, blobs, names, ctx.buffer_organizer_retries);
-    } else {
-      // TODO(chogan): @errorhandling No space left or contraints unsatisfiable.
-      LOG(ERROR) << ret.Msg();
-      return ret;
-    }
-  } else {
-    ret = INVALID_BUCKET;
-    LOG(ERROR) << ret.Msg();
-    return ret;
+  if (result.Succeeded()) {
+    std::vector<std::string> names{name};
+    // TODO(chogan): Create a PreallocatedMemory allocator for std::vector so
+    // that a single-blob-Put doesn't perform a copy
+    std::vector<Blob> blobs{Blob{data, data + size}};
+    result = Put(names, blobs, ctx);
   }
 
-  return ret;
+  return result;
 }
 
 size_t Bucket::GetBlobSize(Arena *arena, const std::string &name,
diff --git a/src/api/bucket.h b/src/api/bucket.h
index e64aa8d2a..f0358e680 100644
--- a/src/api/bucket.h
+++ b/src/api/bucket.h
@@ -193,7 +193,6 @@ Status Bucket::Put(std::vector<std::string> &names,
 
   for (auto &name : names) {
     if (IsBlobNameTooLong(name)) {
-      // TODO(chogan): @errorhandling
      ret = BLOB_NAME_TOO_LONG;
      LOG(ERROR) << ret.Msg();
      return ret;
@@ -221,12 +220,10 @@ Status Bucket::Put(std::vector<std::string> &names,
    if (ret.Succeeded()) {
      ret = PlaceBlobs(schemas, blobs, names, ctx.buffer_organizer_retries);
    } else {
-      // TODO(chogan): @errorhandling No space left or contraints unsatisfiable.
      LOG(ERROR) << ret.Msg();
      return ret;
    }
  } else {
-    // TODO(chogan): @errorhandling
    ret = INVALID_BUCKET;
    LOG(ERROR) << ret.Msg();
    return ret;

From bfef763cfd8e3d6f1cfb56c63ddc6ff34d56143d Mon Sep 17 00:00:00 2001
From: Chris Hogan
Date: Wed, 17 Mar 2021 14:27:11 -0500
Subject: [PATCH 2/2] Expand data placement targets

If data placement fails with local targets, we then try again with a
neighborhood target list.
---
 src/data_placement_engine.cc       | 84 ++++++++++++++++++++----------
 src/metadata_management.cc         | 44 ++++++++++------
 src/metadata_management.h          | 11 ++++
 src/metadata_management_internal.h |  5 +-
 src/metadata_storage_stb_ds.cc     | 12 -----
 src/rpc_thallium.cc                |  5 +-
 6 files changed, 100 insertions(+), 61 deletions(-)

diff --git a/src/data_placement_engine.cc b/src/data_placement_engine.cc
index dbc948f25..32c7305c2 100644
--- a/src/data_placement_engine.cc
+++ b/src/data_placement_engine.cc
@@ -361,45 +361,73 @@ PlacementSchema AggregateBlobSchema(PlacementSchema &schema) {
   return result;
 }
 
+enum Topology {
+  Topology_Local,
+  Topology_Neighborhood,
+  Topology_Global,
+
+  Topology_Count
+};
+
 Status CalculatePlacement(SharedMemoryContext *context, RpcContext *rpc,
                           std::vector<size_t> &blob_sizes,
                           std::vector<PlacementSchema> &output,
                           const api::Context &api_context) {
-  (void)rpc;
   std::vector<PlacementSchema> output_tmp;
   Status result;
 
-  // TODO(chogan): For now we just look at the node level targets as the default
-  // path. Eventually we will need the ability to escalate to neighborhoods, and
-  // the entire cluster.
-
-  // TODO(chogan): @optimization We can avoid the copy here when getting local
-  // targets by just getting a pointer and length. I went with a vector just to
-  // make the interface nicer when we need neighborhood or global targets.
-  std::vector<TargetID> targets = LocalGetNodeTargets(context);
-  std::vector<u64> node_state = GetRemainingNodeCapacities(context, targets);
-
-  switch (api_context.policy) {
-    // TODO(KIMMY): check device capacity against blob size
-    case api::PlacementPolicy::kRandom: {
-      std::multimap<u64, TargetID> ordered_cap;
-      for (size_t i = 0; i < node_state.size(); ++i) {
-        ordered_cap.insert(std::pair<u64, TargetID>(node_state[i], targets[i]));
+  // NOTE(chogan): Start with local targets and gradually expand the target list
+  // until the placement succeeds
+  for (int i = 0; i < Topology_Count; ++i) {
+    std::vector<TargetID> targets;
+
+    switch (i) {
+      case Topology_Local: {
+        // TODO(chogan): @optimization We can avoid the copy here when getting
+        // local targets by just getting a pointer and length.
+        targets = LocalGetNodeTargets(context);
+        break;
+      }
+      case Topology_Neighborhood: {
+        targets = GetNeighborhoodTargets(context, rpc);
+        break;
+      }
+      case Topology_Global: {
+        // TODO(chogan): GetGlobalTargets(context, rpc);
+        break;
       }
-
-      result = RandomPlacement(blob_sizes, ordered_cap, output_tmp);
-      break;
     }
-    case api::PlacementPolicy::kRoundRobin: {
-      result = RoundRobinPlacement(blob_sizes, node_state,
-                                   output_tmp, targets);
-      break;
+
+    std::vector<u64> node_state =
+        GetRemainingTargetCapacities(context, rpc, targets);
+
+    switch (api_context.policy) {
+      // TODO(KIMMY): check device capacity against blob size
+      case api::PlacementPolicy::kRandom: {
+        std::multimap<u64, TargetID> ordered_cap;
+        for (size_t i = 0; i < node_state.size(); ++i) {
+          ordered_cap.insert(std::pair<u64, TargetID>(node_state[i],
+                                                      targets[i]));
+        }
+
+        result = RandomPlacement(blob_sizes, ordered_cap, output_tmp);
+        break;
+      }
+      case api::PlacementPolicy::kRoundRobin: {
+        result = RoundRobinPlacement(blob_sizes, node_state,
+                                     output_tmp, targets);
+        break;
+      }
+      case api::PlacementPolicy::kMinimizeIoTime: {
+        std::vector<f32> bandwidths = GetBandwidths(context);
+
+        result = MinimizeIoTimePlacement(blob_sizes, node_state, bandwidths,
+                                         targets, output_tmp);
+        break;
+      }
     }
-    case api::PlacementPolicy::kMinimizeIoTime: {
-      std::vector<f32> bandwidths = GetBandwidths(context);
-      result = MinimizeIoTimePlacement(blob_sizes, node_state, bandwidths,
-                                       targets, output_tmp);
+
+    if (!result.Failed()) {
       break;
     }
   }
diff --git a/src/metadata_management.cc b/src/metadata_management.cc
index 922ae9935..e95cdb0ec 100644
--- a/src/metadata_management.cc
+++ b/src/metadata_management.cc
@@ -807,27 +807,13 @@ void DecrementRefcount(SharedMemoryContext *context, RpcContext *rpc,
   }
 }
 
-u64 LocalGetRemainingCapacity(SharedMemoryContext *context, TargetID id) {
+u64 LocalGetRemainingTargetCapacity(SharedMemoryContext *context, TargetID id) {
   Target *target = GetTargetFromId(context, id);
   u64 result = target->remaining_space.load();
 
   return result;
 }
 
-u64 GetRemainingCapacity(SharedMemoryContext *context, RpcContext *rpc,
-                         TargetID id) {
-  u32 target_node = id.bits.node_id;
-
-  u64 result = 0;
-  if (target_node == rpc->node_id) {
-    result = LocalGetRemainingCapacity(context, id);
-  } else {
-    result = RpcCall<u64>(rpc, target_node, "RemoteGetRemainingCapacity", id);
-  }
-
-  return result;
-}
-
 SystemViewState *GetLocalSystemViewState(MetadataManager *mdm) {
   SystemViewState *result =
     (SystemViewState *)((u8 *)mdm + mdm->system_view_state_offset);
@@ -1185,4 +1171,32 @@ std::vector<TargetID> GetNeighborhoodTargets(SharedMemoryContext *context,
 
   return result;
 }
+
+u64 GetRemainingTargetCapacity(SharedMemoryContext *context, RpcContext *rpc,
+                               TargetID target_id) {
+  u64 result = 0;
+  u32 target_node = target_id.bits.node_id;
+
+  if (target_node == rpc->node_id) {
+    result = LocalGetRemainingTargetCapacity(context, target_id);
+  } else {
+    result = RpcCall<u64>(rpc, target_node, "RemoteGetRemainingTargetCapacity",
+                          target_id);
+  }
+
+  return result;
+}
+
+std::vector<u64>
+GetRemainingTargetCapacities(SharedMemoryContext *context, RpcContext *rpc,
+                             const std::vector<TargetID> &targets) {
+  std::vector<u64> result(targets.size());
+
+  for (size_t i = 0; i < targets.size(); ++i) {
+    result[i] = GetRemainingTargetCapacity(context, rpc, targets[i]);
+  }
+
+  return result;
+}
+
 }  // namespace hermes
diff --git a/src/metadata_management.h b/src/metadata_management.h
index 9f8a502d7..ad9c31ed3 100644
--- a/src/metadata_management.h
+++ b/src/metadata_management.h
@@ -267,6 +267,17 @@ bool IsVBucketNameTooLong(const std::string &name);
 TargetID FindTargetIdFromDeviceId(const std::vector<TargetID> &targets,
                                   DeviceID device_id);
 
+/**
+ *
+ */
+std::vector<TargetID> GetNeighborhoodTargets(SharedMemoryContext *context,
+                                             RpcContext *rpc);
+/**
+ *
+ */
+std::vector<u64>
+GetRemainingTargetCapacities(SharedMemoryContext *context, RpcContext *rpc,
+                             const std::vector<TargetID> &targets);
 /**
  *
  */
diff --git a/src/metadata_management_internal.h b/src/metadata_management_internal.h
index 113fd7635..b4f1b0f33 100644
--- a/src/metadata_management_internal.h
+++ b/src/metadata_management_internal.h
@@ -72,7 +72,7 @@ u64 LocalGet(MetadataManager *mdm, const char *key, MapType map_type);
 void LocalPut(MetadataManager *mdm, const char *key, u64 val,
               MapType map_type);
 void LocalDelete(MetadataManager *mdm, const char *key, MapType map_type);
-u64 LocalGetRemainingCapacity(SharedMemoryContext *context, TargetID id);
+u64 LocalGetRemainingTargetCapacity(SharedMemoryContext *context, TargetID id);
 void LocalUpdateGlobalSystemViewState(SharedMemoryContext *context,
                                       std::vector<i64> adjustments);
 SystemViewState *GetLocalSystemViewState(SharedMemoryContext *context);
@@ -90,9 +90,6 @@ void StartGlobalSystemViewStateUpdateThread(SharedMemoryContext *context,
 void InitMetadataStorage(SharedMemoryContext *context, MetadataManager *mdm,
                          Arena *arena, Config *config);
 
-std::vector<u64>
-GetRemainingNodeCapacities(SharedMemoryContext *context,
-                           const std::vector<TargetID> &targets);
 std::string GetSwapFilename(MetadataManager *mdm, u32 node_id);
 std::vector<BlobID> LocalGetBlobIds(SharedMemoryContext *context,
                                     BucketID bucket_id);
diff --git a/src/metadata_storage_stb_ds.cc b/src/metadata_storage_stb_ds.cc
index 08d291b62..674ddb528 100644
--- a/src/metadata_storage_stb_ds.cc
+++ b/src/metadata_storage_stb_ds.cc
@@ -583,18 +583,6 @@ size_t GetStoredMapSize(MetadataManager *mdm, MapType map_type) {
   return result;
 }
 
-std::vector<u64>
-GetRemainingNodeCapacities(SharedMemoryContext *context,
-                           const std::vector<TargetID> &targets) {
-  std::vector<u64> result(targets.size());
-
-  for (size_t i = 0; i < targets.size(); ++i) {
-    result[i] = LocalGetRemainingCapacity(context, targets[i]);
-  }
-
-  return result;
-}
-
 u32 HashStringForStorage(MetadataManager *mdm, RpcContext *rpc,
                          const char *str) {
   int result =
diff --git a/src/rpc_thallium.cc b/src/rpc_thallium.cc
index 001615a53..872a83ac7 100644
--- a/src/rpc_thallium.cc
+++ b/src/rpc_thallium.cc
@@ -293,7 +293,7 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
   };
 
   auto rpc_get_remaining_capacity = [context](const request &req, TargetID id) {
-    u64 result = LocalGetRemainingCapacity(context, id);
+    u64 result = LocalGetRemainingTargetCapacity(context, id);
     req.respond(result);
   };
 
@@ -371,7 +371,8 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
   rpc_server->define("RemoteDecrementRefcount", rpc_decrement_refcount_bucket);
   rpc_server->define("RemoteDecrementRefcountVBucket",
                      rpc_decrement_refcount_vbucket);
-  rpc_server->define("RemoteGetRemainingCapacity", rpc_get_remaining_capacity);
+  rpc_server->define("RemoteGetRemainingTargetCapacity",
+                     rpc_get_remaining_capacity);
   rpc_server->define("RemoteUpdateGlobalSystemViewState",
                      rpc_update_global_system_view_state);
   rpc_server->define("RemoteGetGlobalDeviceCapacities",