Skip to content

Commit

Permalink
Use global mutex for GetOrCreateBucketId
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHogan committed Feb 26, 2021
1 parent 2b7f7d7 commit 429a5d8
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 7 deletions.
4 changes: 0 additions & 4 deletions src/api/hermes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,7 @@ void Hermes::AppBarrier() {

bool Hermes::BucketContainsBlob(const std::string &bucket_name,
const std::string &blob_name) {
MetadataManager *mdm = GetMetadataManagerFromContext(&context_);
BeginTicketMutex(&mdm->bucket_mutex);
BucketID bucket_id = GetBucketIdByName(&context_, &rpc_, bucket_name.c_str());
EndTicketMutex(&mdm->bucket_mutex);

bool result = hermes::ContainsBlob(&context_, &rpc_, bucket_id, blob_name);

return result;
Expand Down
15 changes: 13 additions & 2 deletions src/metadata_management.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

namespace hermes {

const u32 kGlobalMutexNode = 1;

bool IsNameTooLong(const std::string &name, size_t max) {
bool result = false;
if (name.size() + 1 >= max) {
Expand Down Expand Up @@ -275,6 +277,7 @@ BucketID GetOrCreateBucketId(SharedMemoryContext *context, RpcContext *rpc,
const std::string &name) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);

BeginGlobalTicketMutex(rpc);
BeginTicketMutex(&mdm->bucket_mutex);
BucketID result = GetBucketIdByName(context, rpc, name.c_str());

Expand All @@ -286,6 +289,7 @@ BucketID GetOrCreateBucketId(SharedMemoryContext *context, RpcContext *rpc,
result = GetNextFreeBucketId(context, rpc, name);
}
EndTicketMutex(&mdm->bucket_mutex);
EndGlobalTicketMutex(rpc);

return result;
}
Expand Down Expand Up @@ -614,10 +618,8 @@ void LocalRenameBucket(SharedMemoryContext *context, RpcContext *rpc,
BucketID id, const std::string &old_name,
const std::string &new_name) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
BeginTicketMutex(&mdm->bucket_mutex);
DeleteId(mdm, rpc, old_name, kMapType_Bucket);
PutBucketId(mdm, rpc, new_name, id);
EndTicketMutex(&mdm->bucket_mutex);
}

void RenameBucket(SharedMemoryContext *context, RpcContext *rpc, BucketID id,
Expand Down Expand Up @@ -1053,4 +1055,13 @@ std::vector<TargetID> GetNeighborhoodTargets(SharedMemoryContext *context,
return result;
}

void BeginGlobalTicketMutex(RpcContext *rpc) {
[[maybe_unused]]
bool result = RpcCall<bool>(rpc, kGlobalMutexNode, "BeginGlobalTicketMutex");
}

void EndGlobalTicketMutex(RpcContext *rpc) {
[[maybe_unused]]
bool result = RpcCall<bool>(rpc, kGlobalMutexNode, "EndGlobalTicketMutex");
}
} // namespace hermes
10 changes: 10 additions & 0 deletions src/metadata_management.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,16 @@ void DecrementRefcount(SharedMemoryContext *context, RpcContext *rpc,
*/
bool IsNullBlobId(BlobID id);

/**
*
*/
void BeginGlobalTicketMutex(RpcContext *rpc);

/**
*
*/
void EndGlobalTicketMutex(RpcContext *rpc);

} // namespace hermes

#endif // HERMES_METADATA_MANAGEMENT_H_
18 changes: 18 additions & 0 deletions src/rpc_thallium.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,24 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
state->engine->finalize();
};

// TEMP(chogan): Only one node needs this
auto rpc_begin_global_ticket_mutex = [context, rpc](const tl::request &req) {
ThalliumState *state = GetThalliumState(rpc);
BeginTicketMutex(&state->global_mutex);

req.respond(true);
};

auto rpc_end_global_ticket_mutex = [context, rpc](const tl::request &req) {
ThalliumState *state = GetThalliumState(rpc);
EndTicketMutex(&state->global_mutex);

req.respond(true);
};
rpc_server->define("BeginGlobalTicketMutex", rpc_begin_global_ticket_mutex);
rpc_server->define("EndGlobalTicketMutex", rpc_end_global_ticket_mutex);
//

// TODO(chogan): Currently these three are only used for testing.
rpc_server->define("GetBuffers", rpc_get_buffers);
rpc_server->define("SplitBuffers", rpc_split_buffers).disable_response();
Expand Down
1 change: 1 addition & 0 deletions src/rpc_thallium.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct ThalliumState {
tl::engine *engine;
tl::engine *bo_engine;
ABT_xstream execution_stream;
TicketMutex global_mutex;
};

struct ClientThalliumState {
Expand Down
1 change: 0 additions & 1 deletion test/end_to_end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ int main(int argc, char **argv) {

// Each rank puts and gets its portion of a blob to a shared bucket
hapi::Bucket shared_bucket(std::string("test_bucket"), hermes, ctx);

TestPutGetBucket(shared_bucket, app_rank, app_size);

if (app_rank != 0) {
Expand Down

0 comments on commit 429a5d8

Please sign in to comment.