Skip to content

Commit

Permalink
Expand data placement targets
Browse files Browse the repository at this point in the history
If data placement fails with local targets, we then try again with a
neighborhood target list.
  • Loading branch information
ChristopherHogan committed Mar 18, 2021
1 parent 01b523e commit bfef763
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 61 deletions.
84 changes: 56 additions & 28 deletions src/data_placement_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,45 +361,73 @@ PlacementSchema AggregateBlobSchema(PlacementSchema &schema) {
return result;
}

enum Topology {
Topology_Local,
Topology_Neighborhood,
Topology_Global,

Topology_Count
};

Status CalculatePlacement(SharedMemoryContext *context, RpcContext *rpc,
std::vector<size_t> &blob_sizes,
std::vector<PlacementSchema> &output,
const api::Context &api_context) {
(void)rpc;
std::vector<PlacementSchema> output_tmp;
Status result;

// TODO(chogan): For now we just look at the node level targets as the default
// path. Eventually we will need the ability to escalate to neighborhoods, and
// the entire cluster.

// TODO(chogan): @optimization We can avoid the copy here when getting local
// targets by just getting a pointer and length. I went with a vector just to
// make the interface nicer when we need neighborhood or global targets.
std::vector<TargetID> targets = LocalGetNodeTargets(context);
std::vector<u64> node_state = GetRemainingNodeCapacities(context, targets);

switch (api_context.policy) {
// TODO(KIMMY): check device capacity against blob size
case api::PlacementPolicy::kRandom: {
std::multimap<u64, TargetID> ordered_cap;
for (size_t i = 0; i < node_state.size(); ++i) {
ordered_cap.insert(std::pair<u64, TargetID>(node_state[i], targets[i]));
// NOTE(chogan): Start with local targets and gradually expand the target list
// until the placement succeeds
for (int i = 0; i < Topology_Count; ++i) {
std::vector<TargetID> targets;

switch (i) {
case Topology_Local: {
// TODO(chogan): @optimization We can avoid the copy here when getting
// local targets by just getting a pointer and length.
targets = LocalGetNodeTargets(context);
break;
}
case Topology_Neighborhood: {
targets = GetNeighborhoodTargets(context, rpc);
break;
}
case Topology_Global: {
// TODO(chogan): GetGlobalTargets(context, rpc);
break;
}

result = RandomPlacement(blob_sizes, ordered_cap, output_tmp);
break;
}
case api::PlacementPolicy::kRoundRobin: {
result = RoundRobinPlacement(blob_sizes, node_state,
output_tmp, targets);
break;

std::vector<u64> node_state =
GetRemainingTargetCapacities(context, rpc, targets);

switch (api_context.policy) {
// TODO(KIMMY): check device capacity against blob size
case api::PlacementPolicy::kRandom: {
std::multimap<u64, TargetID> ordered_cap;
for (size_t i = 0; i < node_state.size(); ++i) {
ordered_cap.insert(std::pair<u64, TargetID>(node_state[i],
targets[i]));
}

result = RandomPlacement(blob_sizes, ordered_cap, output_tmp);
break;
}
case api::PlacementPolicy::kRoundRobin: {
result = RoundRobinPlacement(blob_sizes, node_state,
output_tmp, targets);
break;
}
case api::PlacementPolicy::kMinimizeIoTime: {
std::vector<f32> bandwidths = GetBandwidths(context);

result = MinimizeIoTimePlacement(blob_sizes, node_state, bandwidths,
targets, output_tmp);
break;
}
}
case api::PlacementPolicy::kMinimizeIoTime: {
std::vector<f32> bandwidths = GetBandwidths(context);

result = MinimizeIoTimePlacement(blob_sizes, node_state, bandwidths,
targets, output_tmp);
if (!result.Failed()) {
break;
}
}
Expand Down
44 changes: 29 additions & 15 deletions src/metadata_management.cc
Original file line number Diff line number Diff line change
Expand Up @@ -807,27 +807,13 @@ void DecrementRefcount(SharedMemoryContext *context, RpcContext *rpc,
}
}

u64 LocalGetRemainingCapacity(SharedMemoryContext *context, TargetID id) {
u64 LocalGetRemainingTargetCapacity(SharedMemoryContext *context, TargetID id) {
Target *target = GetTargetFromId(context, id);
u64 result = target->remaining_space.load();

return result;
}

u64 GetRemainingCapacity(SharedMemoryContext *context, RpcContext *rpc,
TargetID id) {
u32 target_node = id.bits.node_id;

u64 result = 0;
if (target_node == rpc->node_id) {
result = LocalGetRemainingCapacity(context, id);
} else {
result = RpcCall<u64>(rpc, target_node, "RemoteGetRemainingCapacity", id);
}

return result;
}

SystemViewState *GetLocalSystemViewState(MetadataManager *mdm) {
SystemViewState *result =
(SystemViewState *)((u8 *)mdm + mdm->system_view_state_offset);
Expand Down Expand Up @@ -1185,4 +1171,32 @@ std::vector<TargetID> GetNeighborhoodTargets(SharedMemoryContext *context,

return result;
}

u64 GetRemainingTargetCapacity(SharedMemoryContext *context, RpcContext *rpc,
TargetID target_id) {
u64 result = 0;
u32 target_node = target_id.bits.node_id;

if (target_node == rpc->node_id) {
result = LocalGetRemainingTargetCapacity(context, target_id);
} else {
result = RpcCall<u64>(rpc, target_node, "RemoteGetRemainingTargetCapacity",
target_id);
}

return result;
}

std::vector<u64>
GetRemainingTargetCapacities(SharedMemoryContext *context, RpcContext *rpc,
const std::vector<TargetID> &targets) {
std::vector<u64> result(targets.size());

for (size_t i = 0; i < targets.size(); ++i) {
result[i] = GetRemainingTargetCapacity(context, rpc, targets[i]);
}

return result;
}

} // namespace hermes
11 changes: 11 additions & 0 deletions src/metadata_management.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,17 @@ bool IsVBucketNameTooLong(const std::string &name);
TargetID FindTargetIdFromDeviceId(const std::vector<TargetID> &targets,
DeviceID device_id);

/**
*
*/
std::vector<TargetID> GetNeighborhoodTargets(SharedMemoryContext *context,
RpcContext *rpc);
/**
*
*/
std::vector<u64>
GetRemainingTargetCapacities(SharedMemoryContext *context, RpcContext *rpc,
const std::vector<TargetID> &targets);
/**
*
*/
Expand Down
5 changes: 1 addition & 4 deletions src/metadata_management_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ u64 LocalGet(MetadataManager *mdm, const char *key, MapType map_type);
void LocalPut(MetadataManager *mdm, const char *key, u64 val, MapType map_type);
void LocalDelete(MetadataManager *mdm, const char *key, MapType map_type);

u64 LocalGetRemainingCapacity(SharedMemoryContext *context, TargetID id);
u64 LocalGetRemainingTargetCapacity(SharedMemoryContext *context, TargetID id);
void LocalUpdateGlobalSystemViewState(SharedMemoryContext *context,
std::vector<i64> adjustments);
SystemViewState *GetLocalSystemViewState(SharedMemoryContext *context);
Expand All @@ -90,9 +90,6 @@ void StartGlobalSystemViewStateUpdateThread(SharedMemoryContext *context,
void InitMetadataStorage(SharedMemoryContext *context, MetadataManager *mdm,
Arena *arena, Config *config);

std::vector<u64>
GetRemainingNodeCapacities(SharedMemoryContext *context,
const std::vector<TargetID> &targets);
std::string GetSwapFilename(MetadataManager *mdm, u32 node_id);
std::vector<BlobID> LocalGetBlobIds(SharedMemoryContext *context,
BucketID bucket_id);
Expand Down
12 changes: 0 additions & 12 deletions src/metadata_storage_stb_ds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -583,18 +583,6 @@ size_t GetStoredMapSize(MetadataManager *mdm, MapType map_type) {
return result;
}

std::vector<u64>
GetRemainingNodeCapacities(SharedMemoryContext *context,
const std::vector<TargetID> &targets) {
std::vector<u64> result(targets.size());

for (size_t i = 0; i < targets.size(); ++i) {
result[i] = LocalGetRemainingCapacity(context, targets[i]);
}

return result;
}

u32 HashStringForStorage(MetadataManager *mdm, RpcContext *rpc,
const char *str) {
int result =
Expand Down
5 changes: 3 additions & 2 deletions src/rpc_thallium.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
};

auto rpc_get_remaining_capacity = [context](const request &req, TargetID id) {
u64 result = LocalGetRemainingCapacity(context, id);
u64 result = LocalGetRemainingTargetCapacity(context, id);

req.respond(result);
};
Expand Down Expand Up @@ -371,7 +371,8 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
rpc_server->define("RemoteDecrementRefcount", rpc_decrement_refcount_bucket);
rpc_server->define("RemoteDecrementRefcountVBucket",
rpc_decrement_refcount_vbucket);
rpc_server->define("RemoteGetRemainingCapacity", rpc_get_remaining_capacity);
rpc_server->define("RemoteGetRemainingTargetCapacity",
rpc_get_remaining_capacity);
rpc_server->define("RemoteUpdateGlobalSystemViewState",
rpc_update_global_system_view_state);
rpc_server->define("RemoteGetGlobalDeviceCapacities",
Expand Down

0 comments on commit bfef763

Please sign in to comment.