diff --git a/src/metadata_management.cc b/src/metadata_management.cc index 3dd428f7f..6d3a1ce85 100644 --- a/src/metadata_management.cc +++ b/src/metadata_management.cc @@ -24,8 +24,6 @@ namespace hermes { -const u32 kGlobalMutexNode = 1; - bool IsNameTooLong(const std::string &name, size_t max) { bool result = false; if (name.size() + 1 >= max) { @@ -277,7 +275,7 @@ BucketID GetOrCreateBucketId(SharedMemoryContext *context, RpcContext *rpc, const std::string &name) { MetadataManager *mdm = GetMetadataManagerFromContext(context); - BeginGlobalTicketMutex(rpc); + BeginGlobalTicketMutex(context, rpc); BeginTicketMutex(&mdm->bucket_mutex); BucketID result = GetBucketIdByName(context, rpc, name.c_str()); @@ -289,7 +287,7 @@ BucketID GetOrCreateBucketId(SharedMemoryContext *context, RpcContext *rpc, result = GetNextFreeBucketId(context, rpc, name); } EndTicketMutex(&mdm->bucket_mutex); - EndGlobalTicketMutex(rpc); + EndGlobalTicketMutex(context, rpc); return result; } @@ -1055,13 +1053,33 @@ std::vector GetNeighborhoodTargets(SharedMemoryContext *context, return result; } -void BeginGlobalTicketMutex(RpcContext *rpc) { - [[maybe_unused]] - bool result = RpcCall(rpc, kGlobalMutexNode, "BeginGlobalTicketMutex"); +void LocalBeginGlobalTicketMutex(MetadataManager *mdm) { + BeginTicketMutex(&mdm->global_mutex); +} + +void LocalEndGlobalTicketMutex(MetadataManager *mdm) { + EndTicketMutex(&mdm->global_mutex); } -void EndGlobalTicketMutex(RpcContext *rpc) { - [[maybe_unused]] - bool result = RpcCall(rpc, kGlobalMutexNode, "EndGlobalTicketMutex"); +void BeginGlobalTicketMutex(SharedMemoryContext *context, RpcContext *rpc) { + if (rpc->node_id == kGlobalMutexNodeId) { + MetadataManager *mdm = GetMetadataManagerFromContext(context); + LocalBeginGlobalTicketMutex(mdm); + } else { + [[maybe_unused]] + bool result = RpcCall(rpc, kGlobalMutexNodeId, + "RemoteBeginGlobalTicketMutex"); + } +} + +void EndGlobalTicketMutex(SharedMemoryContext *context, RpcContext *rpc) { + if (rpc->node_id == kGlobalMutexNodeId) { + MetadataManager *mdm = GetMetadataManagerFromContext(context); + LocalEndGlobalTicketMutex(mdm); + } else { + [[maybe_unused]] + bool result = RpcCall(rpc, kGlobalMutexNodeId, + "RemoteEndGlobalTicketMutex"); + } } } // namespace hermes diff --git a/src/metadata_management.h b/src/metadata_management.h index 7c2d3d2fa..ca1304990 100644 --- a/src/metadata_management.h +++ b/src/metadata_management.h @@ -23,6 +23,8 @@ namespace hermes { +static const u32 kGlobalMutexNodeId = 1; + struct RpcContext; enum MapType { @@ -103,6 +105,15 @@ struct MetadataManager { // TODO(chogan): @optimization Should the TicketMutexes here be reader/writer // locks? + // TODO(chogan): @optimization Hopefully this is used rarely. If it becomes + // something that's commonly used, we need to come up with something smarter. + /** Mutex shared by all nodes for operations that require synchronization. + * + * Should only be accessed via BeginGlobalTicketMutex() and + * EndGlobalTicketMutex(). + */ + TicketMutex global_mutex; + /** Lock for accessing `BucketInfo` structures located at * `bucket_info_offset` */ TicketMutex bucket_mutex; @@ -299,12 +310,22 @@ bool IsNullBlobId(BlobID id); /** * */ -void BeginGlobalTicketMutex(RpcContext *rpc); +void BeginGlobalTicketMutex(SharedMemoryContext *context, RpcContext *rpc); + +/** + * + */ +void EndGlobalTicketMutex(SharedMemoryContext *context, RpcContext *rpc); + +/** + * + */ +void LocalBeginGlobalTicketMutex(MetadataManager *mdm); /** * */ -void EndGlobalTicketMutex(RpcContext *rpc); +void LocalEndGlobalTicketMutex(MetadataManager *mdm); } // namespace hermes diff --git a/src/rpc_thallium.cc b/src/rpc_thallium.cc index 2030d650c..f1419148f 100644 --- a/src/rpc_thallium.cc +++ b/src/rpc_thallium.cc @@ -347,22 +347,26 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, state->engine->finalize(); }; - // TEMP(chogan): Only one node needs this + // TODO(chogan): Only one node needs this. Separate RPC server? auto rpc_begin_global_ticket_mutex = [context, rpc](const tl::request &req) { - ThalliumState *state = GetThalliumState(rpc); - BeginTicketMutex(&state->global_mutex); + DLOG_ASSERT(rpc->node_id == kGlobalMutexNodeId); + MetadataManager *mdm = GetMetadataManagerFromContext(context); + LocalBeginGlobalTicketMutex(mdm); req.respond(true); }; auto rpc_end_global_ticket_mutex = [context, rpc](const tl::request &req) { - ThalliumState *state = GetThalliumState(rpc); - EndTicketMutex(&state->global_mutex); + DLOG_ASSERT(rpc->node_id == kGlobalMutexNodeId); + MetadataManager *mdm = GetMetadataManagerFromContext(context); + LocalEndGlobalTicketMutex(mdm); req.respond(true); }; - rpc_server->define("BeginGlobalTicketMutex", rpc_begin_global_ticket_mutex); - rpc_server->define("EndGlobalTicketMutex", rpc_end_global_ticket_mutex); + rpc_server->define("RemoteBeginGlobalTicketMutex", + rpc_begin_global_ticket_mutex); + rpc_server->define("RemoteEndGlobalTicketMutex", + rpc_end_global_ticket_mutex); // // TODO(chogan): Currently these three are only used for testing.