Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pre-built list of neighborhood Targets to each node #118

Merged
merged 5 commits into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/api/hermes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ std::shared_ptr<api::Hermes> InitHermes(Config *config, bool is_daemon,

InitRpcClients(&result->rpc_);

// NOTE(chogan): Can only initialize the neighborhood Targets once the RPC
// clients have been initialized.
InitNeighborhoodTargets(&result->context_, &result->rpc_);

return result;
}

Expand Down
2 changes: 1 addition & 1 deletion src/data_placement_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ Status CalculatePlacement(SharedMemoryContext *context, RpcContext *rpc,
// TODO(chogan): @optimization We can avoid the copy here when getting local
// targets by just getting a pointer and length. I went with a vector just to
// make the interface nicer when we need neighborhood or global targets.
std::vector<TargetID> targets = GetNodeTargets(context);
std::vector<TargetID> targets = LocalGetNodeTargets(context);
std::vector<u64> node_state = GetRemainingNodeCapacities(context, targets);

switch (api_context.policy) {
Expand Down
76 changes: 76 additions & 0 deletions src/metadata_management.cc
Original file line number Diff line number Diff line change
Expand Up @@ -975,4 +975,80 @@ void DecrementRefcount(SharedMemoryContext *context, RpcContext *rpc,
}
}

u32 GetRelativeNodeId(RpcContext *rpc, int offset) {
int result = rpc->node_id + offset;
assert(result >= 0);
assert(result <= (int)(rpc->num_nodes + 1));

if (result > (int)rpc->num_nodes) {
result = 1;
} else if (result == 0) {
result = rpc->num_nodes;
}

return (u32)result;
}

u32 GetNextNode(RpcContext *rpc) {
u32 result = GetRelativeNodeId(rpc, 1);

return result;
}

u32 GetPreviousNode(RpcContext *rpc) {
u32 result = GetRelativeNodeId(rpc, -1);

return result;
}

std::vector<TargetID> GetNodeTargets(SharedMemoryContext *context,
RpcContext *rpc, u32 target_node) {
std::vector<TargetID> result;

if (target_node == rpc->node_id) {
result = LocalGetNodeTargets(context);
} else {
result = RpcCall<std::vector<TargetID>>(rpc, target_node,
"RemoteGetNodeTargets");
}

return result;
}

std::vector<TargetID> GetNeighborhoodTargets(SharedMemoryContext *context,
RpcContext *rpc) {
// TODO(chogan): Inform the concept of "neighborhood" with a network topology
// NOTE(chogan): For now, each node has 2 neighbors, NodeID-1 and NodeID+1
// (wrapping around for nodes 1 and N).
std::vector<TargetID> result;

switch (rpc->num_nodes) {
case 1: {
// No neighbors
break;
}
case 2: {
// One neighbor
u32 next_node = GetNextNode(rpc);
result = GetNodeTargets(context, rpc, next_node);
break;
}
default: {
// Two neighbors
u32 next_node = GetNextNode(rpc);
std::vector<TargetID> next_targets = GetNodeTargets(context, rpc,
next_node);
u32 prev_node = GetPreviousNode(rpc);
std::vector<TargetID> prev_targets = GetNodeTargets(context, rpc,
prev_node);

result.reserve(next_targets.size() + prev_targets.size());
result.insert(result.end(), next_targets.begin(), next_targets.end());
result.insert(result.end(), prev_targets.begin(), prev_targets.end());
}
}

return result;
}

} // namespace hermes
6 changes: 6 additions & 0 deletions src/metadata_management.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ struct MetadataManager {
size_t map_seed;

IdList node_targets;
IdList neighborhood_targets;

u32 system_view_state_update_interval_ms;
u32 global_system_view_state_node_id;
Expand All @@ -139,6 +140,11 @@ struct RpcContext;
void InitMetadataManager(MetadataManager *mdm, Arena *arena, Config *config,
int node_id);

/**
*
*/
void InitNeighborhoodTargets(SharedMemoryContext *context, RpcContext *rpc);

/**
*
*/
Expand Down
3 changes: 3 additions & 0 deletions src/metadata_management_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ GetRemainingNodeCapacities(SharedMemoryContext *context,
std::string GetSwapFilename(MetadataManager *mdm, u32 node_id);
std::vector<BlobID> LocalGetBlobIds(SharedMemoryContext *context,
BucketID bucket_id);
std::vector<TargetID> LocalGetNodeTargets(SharedMemoryContext *context);
u32 GetNextNode(RpcContext *rpc);
u32 GetPreviousNode(RpcContext *rpc);

} // namespace hermes
#endif // HERMES_METADATA_MANAGEMENT_INTERNAL_H_
7 changes: 6 additions & 1 deletion src/metadata_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,12 @@ size_t GetStoredMapSize(MetadataManager *mdm, MapType map_type);
/**
*
*/
std::vector<TargetID> GetNodeTargets(SharedMemoryContext *context);
std::vector<TargetID> LocalGetNodeTargets(SharedMemoryContext *context);

/**
*
*/
std::vector<TargetID> LocalGetNeighborhoodTargets(SharedMemoryContext *context);

/**
*
Expand Down
40 changes: 35 additions & 5 deletions src/metadata_storage_stb_ds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,12 +493,12 @@ bool LocalDestroyBucket(SharedMemoryContext *context, RpcContext *rpc,
return destroyed;
}

std::vector<TargetID> GetNodeTargets(SharedMemoryContext *context) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
u32 length = mdm->node_targets.length;
std::vector<TargetID> LocalGetTargets(MetadataManager *mdm,
IdList target_list) {
u32 length = target_list.length;
std::vector<TargetID> result(length);

u64 *target_ids = GetIdsPtr(mdm, mdm->node_targets);
u64 *target_ids = GetIdsPtr(mdm, target_list);
for (u32 i = 0; i < length; ++i) {
result[i].as_int = target_ids[i];
}
Expand All @@ -507,6 +507,22 @@ std::vector<TargetID> GetNodeTargets(SharedMemoryContext *context) {
return result;
}

std::vector<TargetID> LocalGetNodeTargets(SharedMemoryContext *context) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
std::vector<TargetID> result = LocalGetTargets(mdm, mdm->node_targets);

return result;
}

std::vector<TargetID>
LocalGetNeighborhoodTargets(SharedMemoryContext *context) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
std::vector<TargetID> result = LocalGetTargets(mdm,
mdm->neighborhood_targets);

return result;
}

void PutToStorage(MetadataManager *mdm, const char *key, u64 val,
MapType map_type) {
Heap *heap = GetMapHeap(mdm);
Expand Down Expand Up @@ -614,6 +630,21 @@ void InitSwapSpaceFilename(MetadataManager *mdm, Arena *arena, Config *config) {
GetOffsetFromMdm(mdm, swap_file_suffix_memory);
}

void InitNeighborhoodTargets(SharedMemoryContext *context, RpcContext *rpc) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
std::vector<TargetID> neighborhood_targets =
GetNeighborhoodTargets(context, rpc);
size_t num_targets = neighborhood_targets.size();
IdList targets = AllocateIdList(mdm, num_targets);
TargetID *ids = (TargetID *)GetIdsPtr(mdm, targets);
for (size_t i = 0; i < num_targets; ++i) {
ids[i] = neighborhood_targets[i];
}
ReleaseIdsPtr(mdm);

mdm->neighborhood_targets = targets;
}

void InitMetadataStorage(SharedMemoryContext *context, MetadataManager *mdm,
Arena *arena, Config *config) {
InitSwapSpaceFilename(mdm, arena, config);
Expand All @@ -639,7 +670,6 @@ void InitMetadataStorage(SharedMemoryContext *context, MetadataManager *mdm,
ReleaseIdsPtr(mdm);
mdm->node_targets = node_targets;


// ID Maps

i64 total_map_capacity = GetHeapFreeList(map_heap)->size / 3;
Expand Down
8 changes: 8 additions & 0 deletions src/rpc_thallium.cc
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,13 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
req.respond(result);
};

function<void(const request&)> rpc_get_node_targets =
[context](const request &req) {
std::vector<TargetID> result = LocalGetNodeTargets(context);

req.respond(result);
};

function<void(const request&)> rpc_finalize =
[rpc](const request &req) {
(void)req;
Expand Down Expand Up @@ -381,6 +388,7 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
rpc_server->define("RemoteGetGlobalDeviceCapacities",
rpc_get_global_device_capacities);
rpc_server->define("RemoteGetBlobIds", rpc_get_blob_ids);
rpc_server->define("RemoteGetNodeTargets", rpc_get_node_targets);
rpc_server->define("RemoteFinalize", rpc_finalize).disable_response();
}

Expand Down
14 changes: 14 additions & 0 deletions test/mdm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,19 @@ static void TestMaxNameLength(HermesPtr hermes) {
bucket.Destroy(ctx);
}

void TestGetRelativeNodeId() {
RpcContext rpc = {};
rpc.num_nodes = 10;
rpc.node_id = 1;

Assert(GetNextNode(&rpc) == 2);
Assert(GetPreviousNode(&rpc) == 10);

rpc.node_id = 10;
Assert(GetNextNode(&rpc) == 1);
Assert(GetPreviousNode(&rpc) == 9);
}

int main(int argc, char **argv) {
int mpi_threads_provided;
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &mpi_threads_provided);
Expand All @@ -208,6 +221,7 @@ int main(int argc, char **argv) {
TestRenameBucket(hermes);
TestBucketRefCounting(hermes);
TestMaxNameLength(hermes);
TestGetRelativeNodeId();

hermes->Finalize(true);

Expand Down