Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use neighborhood Targets if a Put fails to place to local Targets #163

Merged
merged 2 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 10 additions & 37 deletions src/api/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,49 +46,22 @@ bool Bucket::IsValid() const {

Status Bucket::Put(const std::string &name, const u8 *data, size_t size,
Context &ctx) {
Status ret;

if (IsBlobNameTooLong(name)) {
// TODO(chogan): @errorhandling
ret = BLOB_NAME_TOO_LONG;
LOG(ERROR) << ret.Msg();
return ret;
}
Status result;

if (size > 0 && nullptr == data) {
ret = INVALID_BLOB;
LOG(ERROR) << ret.Msg();
return ret;
result = INVALID_BLOB;
LOG(ERROR) << result.Msg();
}

if (IsValid()) {
std::vector<size_t> sizes(1, size);
std::vector<PlacementSchema> schemas;
HERMES_BEGIN_TIMED_BLOCK("CalculatePlacement");
ret = CalculatePlacement(&hermes_->context_, &hermes_->rpc_, sizes, schemas,
ctx);
HERMES_END_TIMED_BLOCK();

if (ret.Succeeded()) {
std::vector<std::string> names(1, name);
std::vector<std::vector<u8>> blobs(1);
blobs[0].resize(size);
// TODO(chogan): Create a PreallocatedMemory allocator for std::vector so
// that a single-blob-Put doesn't perform a copy
memcpy(blobs[0].data(), data, size);
ret = PlaceBlobs(schemas, blobs, names, ctx.buffer_organizer_retries);
} else {
// TODO(chogan): @errorhandling No space left or contraints unsatisfiable.
LOG(ERROR) << ret.Msg();
return ret;
}
} else {
ret = INVALID_BUCKET;
LOG(ERROR) << ret.Msg();
return ret;
if (result.Succeeded()) {
std::vector<std::string> names{name};
// TODO(chogan): Create a PreallocatedMemory allocator for std::vector so
// that a single-blob-Put doesn't perform a copy
std::vector<Blob> blobs{Blob{data, data + size}};
result = Put(names, blobs, ctx);
}

return ret;
return result;
}

size_t Bucket::GetBlobSize(Arena *arena, const std::string &name,
Expand Down
3 changes: 0 additions & 3 deletions src/api/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ Status Bucket::Put(std::vector<std::string> &names,

for (auto &name : names) {
if (IsBlobNameTooLong(name)) {
// TODO(chogan): @errorhandling
ret = BLOB_NAME_TOO_LONG;
LOG(ERROR) << ret.Msg();
return ret;
Expand Down Expand Up @@ -221,12 +220,10 @@ Status Bucket::Put(std::vector<std::string> &names,
if (ret.Succeeded()) {
ret = PlaceBlobs(schemas, blobs, names, ctx.buffer_organizer_retries);
} else {
// TODO(chogan): @errorhandling No space left or contraints unsatisfiable.
LOG(ERROR) << ret.Msg();
return ret;
}
} else {
// TODO(chogan): @errorhandling
ret = INVALID_BUCKET;
LOG(ERROR) << ret.Msg();
return ret;
Expand Down
84 changes: 56 additions & 28 deletions src/data_placement_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,45 +361,73 @@ PlacementSchema AggregateBlobSchema(PlacementSchema &schema) {
return result;
}

enum Topology {
Topology_Local,
Topology_Neighborhood,
Topology_Global,

Topology_Count
};

Status CalculatePlacement(SharedMemoryContext *context, RpcContext *rpc,
std::vector<size_t> &blob_sizes,
std::vector<PlacementSchema> &output,
const api::Context &api_context) {
(void)rpc;
std::vector<PlacementSchema> output_tmp;
Status result;

// TODO(chogan): For now we just look at the node level targets as the default
// path. Eventually we will need the ability to escalate to neighborhoods, and
// the entire cluster.

// TODO(chogan): @optimization We can avoid the copy here when getting local
// targets by just getting a pointer and length. I went with a vector just to
// make the interface nicer when we need neighborhood or global targets.
std::vector<TargetID> targets = LocalGetNodeTargets(context);
std::vector<u64> node_state = GetRemainingNodeCapacities(context, targets);

switch (api_context.policy) {
// TODO(KIMMY): check device capacity against blob size
case api::PlacementPolicy::kRandom: {
std::multimap<u64, TargetID> ordered_cap;
for (size_t i = 0; i < node_state.size(); ++i) {
ordered_cap.insert(std::pair<u64, TargetID>(node_state[i], targets[i]));
// NOTE(chogan): Start with local targets and gradually expand the target list
// until the placement succeeds
for (int i = 0; i < Topology_Count; ++i) {
std::vector<TargetID> targets;

switch (i) {
case Topology_Local: {
// TODO(chogan): @optimization We can avoid the copy here when getting
// local targets by just getting a pointer and length.
targets = LocalGetNodeTargets(context);
break;
}
case Topology_Neighborhood: {
targets = GetNeighborhoodTargets(context, rpc);
break;
}
case Topology_Global: {
// TODO(chogan): GetGlobalTargets(context, rpc);
break;
}

result = RandomPlacement(blob_sizes, ordered_cap, output_tmp);
break;
}
case api::PlacementPolicy::kRoundRobin: {
result = RoundRobinPlacement(blob_sizes, node_state,
output_tmp, targets);
break;

std::vector<u64> node_state =
GetRemainingTargetCapacities(context, rpc, targets);

switch (api_context.policy) {
// TODO(KIMMY): check device capacity against blob size
case api::PlacementPolicy::kRandom: {
std::multimap<u64, TargetID> ordered_cap;
for (size_t i = 0; i < node_state.size(); ++i) {
ordered_cap.insert(std::pair<u64, TargetID>(node_state[i],
targets[i]));
}

result = RandomPlacement(blob_sizes, ordered_cap, output_tmp);
break;
}
case api::PlacementPolicy::kRoundRobin: {
result = RoundRobinPlacement(blob_sizes, node_state,
output_tmp, targets);
break;
}
case api::PlacementPolicy::kMinimizeIoTime: {
std::vector<f32> bandwidths = GetBandwidths(context);

result = MinimizeIoTimePlacement(blob_sizes, node_state, bandwidths,
targets, output_tmp);
break;
}
}
case api::PlacementPolicy::kMinimizeIoTime: {
std::vector<f32> bandwidths = GetBandwidths(context);

result = MinimizeIoTimePlacement(blob_sizes, node_state, bandwidths,
targets, output_tmp);
if (!result.Failed()) {
break;
}
}
Expand Down
44 changes: 29 additions & 15 deletions src/metadata_management.cc
Original file line number Diff line number Diff line change
Expand Up @@ -807,27 +807,13 @@ void DecrementRefcount(SharedMemoryContext *context, RpcContext *rpc,
}
}

u64 LocalGetRemainingCapacity(SharedMemoryContext *context, TargetID id) {
u64 LocalGetRemainingTargetCapacity(SharedMemoryContext *context, TargetID id) {
Target *target = GetTargetFromId(context, id);
u64 result = target->remaining_space.load();

return result;
}

u64 GetRemainingCapacity(SharedMemoryContext *context, RpcContext *rpc,
TargetID id) {
u32 target_node = id.bits.node_id;

u64 result = 0;
if (target_node == rpc->node_id) {
result = LocalGetRemainingCapacity(context, id);
} else {
result = RpcCall<u64>(rpc, target_node, "RemoteGetRemainingCapacity", id);
}

return result;
}

SystemViewState *GetLocalSystemViewState(MetadataManager *mdm) {
SystemViewState *result =
(SystemViewState *)((u8 *)mdm + mdm->system_view_state_offset);
Expand Down Expand Up @@ -1185,4 +1171,32 @@ std::vector<TargetID> GetNeighborhoodTargets(SharedMemoryContext *context,

return result;
}

u64 GetRemainingTargetCapacity(SharedMemoryContext *context, RpcContext *rpc,
TargetID target_id) {
u64 result = 0;
u32 target_node = target_id.bits.node_id;

if (target_node == rpc->node_id) {
result = LocalGetRemainingTargetCapacity(context, target_id);
} else {
result = RpcCall<u64>(rpc, target_node, "RemoteGetRemainingTargetCapacity",
target_id);
}

return result;
}

std::vector<u64>
GetRemainingTargetCapacities(SharedMemoryContext *context, RpcContext *rpc,
const std::vector<TargetID> &targets) {
std::vector<u64> result(targets.size());

for (size_t i = 0; i < targets.size(); ++i) {
result[i] = GetRemainingTargetCapacity(context, rpc, targets[i]);
}

return result;
}

} // namespace hermes
11 changes: 11 additions & 0 deletions src/metadata_management.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,17 @@ bool IsVBucketNameTooLong(const std::string &name);
TargetID FindTargetIdFromDeviceId(const std::vector<TargetID> &targets,
DeviceID device_id);

/**
*
*/
std::vector<TargetID> GetNeighborhoodTargets(SharedMemoryContext *context,
RpcContext *rpc);
/**
*
*/
std::vector<u64>
GetRemainingTargetCapacities(SharedMemoryContext *context, RpcContext *rpc,
const std::vector<TargetID> &targets);
/**
*
*/
Expand Down
5 changes: 1 addition & 4 deletions src/metadata_management_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ u64 LocalGet(MetadataManager *mdm, const char *key, MapType map_type);
void LocalPut(MetadataManager *mdm, const char *key, u64 val, MapType map_type);
void LocalDelete(MetadataManager *mdm, const char *key, MapType map_type);

u64 LocalGetRemainingCapacity(SharedMemoryContext *context, TargetID id);
u64 LocalGetRemainingTargetCapacity(SharedMemoryContext *context, TargetID id);
void LocalUpdateGlobalSystemViewState(SharedMemoryContext *context,
std::vector<i64> adjustments);
SystemViewState *GetLocalSystemViewState(SharedMemoryContext *context);
Expand All @@ -90,9 +90,6 @@ void StartGlobalSystemViewStateUpdateThread(SharedMemoryContext *context,
void InitMetadataStorage(SharedMemoryContext *context, MetadataManager *mdm,
Arena *arena, Config *config);

std::vector<u64>
GetRemainingNodeCapacities(SharedMemoryContext *context,
const std::vector<TargetID> &targets);
std::string GetSwapFilename(MetadataManager *mdm, u32 node_id);
std::vector<BlobID> LocalGetBlobIds(SharedMemoryContext *context,
BucketID bucket_id);
Expand Down
12 changes: 0 additions & 12 deletions src/metadata_storage_stb_ds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -583,18 +583,6 @@ size_t GetStoredMapSize(MetadataManager *mdm, MapType map_type) {
return result;
}

std::vector<u64>
GetRemainingNodeCapacities(SharedMemoryContext *context,
const std::vector<TargetID> &targets) {
std::vector<u64> result(targets.size());

for (size_t i = 0; i < targets.size(); ++i) {
result[i] = LocalGetRemainingCapacity(context, targets[i]);
}

return result;
}

u32 HashStringForStorage(MetadataManager *mdm, RpcContext *rpc,
const char *str) {
int result =
Expand Down
5 changes: 3 additions & 2 deletions src/rpc_thallium.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
};

auto rpc_get_remaining_capacity = [context](const request &req, TargetID id) {
u64 result = LocalGetRemainingCapacity(context, id);
u64 result = LocalGetRemainingTargetCapacity(context, id);

req.respond(result);
};
Expand Down Expand Up @@ -371,7 +371,8 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
rpc_server->define("RemoteDecrementRefcount", rpc_decrement_refcount_bucket);
rpc_server->define("RemoteDecrementRefcountVBucket",
rpc_decrement_refcount_vbucket);
rpc_server->define("RemoteGetRemainingCapacity", rpc_get_remaining_capacity);
rpc_server->define("RemoteGetRemainingTargetCapacity",
rpc_get_remaining_capacity);
rpc_server->define("RemoteUpdateGlobalSystemViewState",
rpc_update_global_system_view_state);
rpc_server->define("RemoteGetGlobalDeviceCapacities",
Expand Down