
Commit

Implement global mutex
ChristopherHogan committed Mar 2, 2021
1 parent 429a5d8 commit 4bcb171
Showing 3 changed files with 62 additions and 19 deletions.
38 changes: 28 additions & 10 deletions src/metadata_management.cc
@@ -24,8 +24,6 @@
 
 namespace hermes {
 
-const u32 kGlobalMutexNode = 1;
-
 bool IsNameTooLong(const std::string &name, size_t max) {
   bool result = false;
   if (name.size() + 1 >= max) {
@@ -277,7 +275,7 @@ BucketID GetOrCreateBucketId(SharedMemoryContext *context, RpcContext *rpc,
                              const std::string &name) {
   MetadataManager *mdm = GetMetadataManagerFromContext(context);
 
-  BeginGlobalTicketMutex(rpc);
+  BeginGlobalTicketMutex(context, rpc);
   BeginTicketMutex(&mdm->bucket_mutex);
   BucketID result = GetBucketIdByName(context, rpc, name.c_str());
 
@@ -289,7 +287,7 @@ BucketID GetOrCreateBucketId(SharedMemoryContext *context, RpcContext *rpc,
     result = GetNextFreeBucketId(context, rpc, name);
   }
   EndTicketMutex(&mdm->bucket_mutex);
-  EndGlobalTicketMutex(rpc);
+  EndGlobalTicketMutex(context, rpc);
 
   return result;
 }
@@ -1055,13 +1053,33 @@ std::vector<TargetID> GetNeighborhoodTargets(SharedMemoryContext *context,
   return result;
 }
 
-void BeginGlobalTicketMutex(RpcContext *rpc) {
-  [[maybe_unused]]
-  bool result = RpcCall<bool>(rpc, kGlobalMutexNode, "BeginGlobalTicketMutex");
+void LocalBeginGlobalTicketMutex(MetadataManager *mdm) {
+  BeginTicketMutex(&mdm->global_mutex);
 }
 
-void EndGlobalTicketMutex(RpcContext *rpc) {
-  [[maybe_unused]]
-  bool result = RpcCall<bool>(rpc, kGlobalMutexNode, "EndGlobalTicketMutex");
+void LocalEndGlobalTicketMutex(MetadataManager *mdm) {
+  EndTicketMutex(&mdm->global_mutex);
+}
+
+void BeginGlobalTicketMutex(SharedMemoryContext *context, RpcContext *rpc) {
+  if (rpc->node_id == kGlobalMutexNodeId) {
+    MetadataManager *mdm = GetMetadataManagerFromContext(context);
+    LocalBeginGlobalTicketMutex(mdm);
+  } else {
+    [[maybe_unused]]
+    bool result = RpcCall<bool>(rpc, kGlobalMutexNodeId,
+                                "RemoteBeginGlobalTicketMutex");
+  }
+}
+
+void EndGlobalTicketMutex(SharedMemoryContext *context, RpcContext *rpc) {
+  if (rpc->node_id == kGlobalMutexNodeId) {
+    MetadataManager *mdm = GetMetadataManagerFromContext(context);
+    LocalEndGlobalTicketMutex(mdm);
+  } else {
+    [[maybe_unused]]
+    bool result = RpcCall<bool>(rpc, kGlobalMutexNodeId,
+                                "RemoteEndGlobalTicketMutex");
+  }
 }
 }  // namespace hermes
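
The new `LocalBeginGlobalTicketMutex`/`LocalEndGlobalTicketMutex` helpers simply take and release `mdm->global_mutex` through the existing `BeginTicketMutex`/`EndTicketMutex` primitives, which are not part of this diff. For readers unfamiliar with ticket locks, the sketch below shows the standard pattern those primitives follow; the names mirror Hermes, but the bodies are an illustrative approximation, not the project's actual implementation.

```cpp
#include <atomic>
#include <cstdint>

using u32 = uint32_t;

// Illustrative ticket lock: each thread takes a ticket and spins until served.
struct TicketMutex {
  std::atomic<u32> ticket{0};   // next ticket to hand out
  std::atomic<u32> serving{0};  // ticket currently admitted to the critical section
};

inline void BeginTicketMutex(TicketMutex *mutex) {
  u32 my_ticket = mutex->ticket.fetch_add(1);
  while (my_ticket != mutex->serving.load()) {
    // spin; a real implementation might yield or pause here
  }
}

inline void EndTicketMutex(TicketMutex *mutex) {
  mutex->serving.fetch_add(1);  // admit the next waiter in FIFO order
}
```

The FIFO admission order is what makes a ticket lock a reasonable fit here: acquisitions of the global mutex are served in arrival order, so no node is starved.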
25 changes: 23 additions & 2 deletions src/metadata_management.h
@@ -23,6 +23,8 @@
 
 namespace hermes {
 
+static const u32 kGlobalMutexNodeId = 1;
+
 struct RpcContext;
 
 enum MapType {
Expand Down Expand Up @@ -103,6 +105,15 @@ struct MetadataManager {
// TODO(chogan): @optimization Should the TicketMutexes here be reader/writer
// locks?

// TODO(chogan): @optimization Hopefully this is used rarely. If it becomes
// something that's commonly used, we need to come up with something smarter.
/** Mutex shared by all nodes for operations that require synchronization.
*
* Should only be accessed via BeginGlobalTicketMutex() and
* EndGlobalTicketMutex().
*/
TicketMutex global_mutex;

/** Lock for accessing `BucketInfo` structures located at
* `bucket_info_offset` */
TicketMutex bucket_mutex;
Expand Down Expand Up @@ -299,12 +310,22 @@ bool IsNullBlobId(BlobID id);
/**
*
*/
void BeginGlobalTicketMutex(RpcContext *rpc);
void BeginGlobalTicketMutex(SharedMemoryContext *context, RpcContext *rpc);

/**
*
*/
void EndGlobalTicketMutex(SharedMemoryContext *context, RpcContext *rpc);

/**
*
*/
void LocalBeginGlobalTicketMutex(MetadataManager *mdm);

/**
*
*/
void EndGlobalTicketMutex(RpcContext *rpc);
void LocalEndGlobalTicketMutex(MetadataManager *mdm);

} // namespace hermes

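Taken together, these declarations give callers a single acquire/release pair that hides whether the lock lives on the local node or behind an RPC. A minimal usage sketch follows; the wrapper function and the operation inside the critical section are hypothetical, and only the Begin/End calls come from this commit.

```cpp
// Hypothetical caller: serialize a metadata operation across all nodes.
void UpdateSharedMetadata(SharedMemoryContext *context, RpcContext *rpc) {
  BeginGlobalTicketMutex(context, rpc);  // local lock on node 1, RPC otherwise
  // ... cluster-wide critical section, e.g. the bucket creation path in
  // GetOrCreateBucketId() above ...
  EndGlobalTicketMutex(context, rpc);
}
```

Because only node `kGlobalMutexNodeId` owns the underlying `TicketMutex`, every other node pays one network round trip per acquire and one per release, which is consistent with the TODO noting that this should remain a rarely used path.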
18 changes: 11 additions & 7 deletions src/rpc_thallium.cc
@@ -347,22 +347,26 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
     state->engine->finalize();
   };
 
-  // TEMP(chogan): Only one node needs this
+  // TODO(chogan): Only one node needs this. Separate RPC server?
   auto rpc_begin_global_ticket_mutex = [context, rpc](const tl::request &req) {
-    ThalliumState *state = GetThalliumState(rpc);
-    BeginTicketMutex(&state->global_mutex);
+    DLOG_ASSERT(rpc->node_id == kGlobalMutexNodeId);
+    MetadataManager *mdm = GetMetadataManagerFromContext(context);
+    LocalBeginGlobalTicketMutex(mdm);
 
     req.respond(true);
   };
 
   auto rpc_end_global_ticket_mutex = [context, rpc](const tl::request &req) {
-    ThalliumState *state = GetThalliumState(rpc);
-    EndTicketMutex(&state->global_mutex);
+    DLOG_ASSERT(rpc->node_id == kGlobalMutexNodeId);
+    MetadataManager *mdm = GetMetadataManagerFromContext(context);
+    LocalEndGlobalTicketMutex(mdm);
 
     req.respond(true);
   };
-  rpc_server->define("BeginGlobalTicketMutex", rpc_begin_global_ticket_mutex);
-  rpc_server->define("EndGlobalTicketMutex", rpc_end_global_ticket_mutex);
+  rpc_server->define("RemoteBeginGlobalTicketMutex",
+                     rpc_begin_global_ticket_mutex);
+  rpc_server->define("RemoteEndGlobalTicketMutex",
+                     rpc_end_global_ticket_mutex);
   //
 
   // TODO(chogan): Currently these three are only used for testing.
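The handlers above are the server side; the client side is the `RpcCall<bool>(rpc, kGlobalMutexNodeId, "Remote...")` calls in metadata_management.cc. `RpcCall` itself is not part of this diff, but with Thallium it boils down to roughly the following; the wrapper function name, engine setup, and server address are placeholders, and the real Hermes wrapper differs in its details.

```cpp
#include <string>
#include <thallium.hpp>

namespace tl = thallium;

// Roughly what a client-side RpcCall<bool> does for the global mutex RPCs.
bool BeginGlobalMutexOverRpc(tl::engine &engine, const std::string &server_addr) {
  tl::remote_procedure rpc =
      engine.define("RemoteBeginGlobalTicketMutex");  // must match the server's define()
  tl::endpoint server = engine.lookup(server_addr);   // address of node kGlobalMutexNodeId
  bool acknowledged = rpc.on(server)();               // blocks until req.respond(true)
  return acknowledged;
}
```

On the server, the matching `req.respond(true)` is what that final conversion to `bool` unpacks, so the caller does not return until the remote lock operation has completed.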
